Apache Beam, Spanner Change Streams in Go

Apache Beam is a solid ETL framework, but the Go SDK still needs some work. I decided to implement Spanner Change Streams support in the Go SDK.


TL;DR: The Apache Beam Go SDK lacked a native Spanner Change Streams connector, forcing Go pipelines to depend on the Java implementation via cross-language transforms. I've implemented ReadChangeStream as a Splittable DoFn (SDF) that requires no external metadata table, achieves per-partition parallelism through aggressive splitting, and relies solely on the Beam runner's own checkpoint storage for durability. The PR is live at apache/beam#37991.


Why this needed to exist

Google Cloud Spanner's change streams let you subscribe to a real-time feed of mutations — inserts, updates, and deletes — across your database. They're the backbone of CDC (change data capture) pipelines: syncing to a data warehouse, propagating events, or building audit logs.

Apache Beam has had a Java implementation of this for some time. The Java SDK's SpannerIO.readChangeStream works well, but it carries a significant operational dependency: a metadata Spanner database that it uses to coordinate partition state across workers. This is fine for Java-centric teams, but for teams running Go pipelines — or anyone wanting to avoid tying their pipeline to the JVM — it's an obstacle.

We wanted to avoid being tied to the Java implementation, so I sat down and wrote one from scratch.


What are Spanner Change Streams?

Before getting into the implementation, a quick primer if you haven't used them before.

You create a change stream in Spanner with a DDL statement:

CREATE CHANGE STREAM MyStream
FOR ALL
OPTIONS (retention_period = '7d');

Once created, Spanner lets you query it via a special SQL function:

SELECT * FROM READ_MyStream(
  start_timestamp => @start,
  end_timestamp   => @end,
  partition_token => @token,
  heartbeat_milliseconds => 10000
);

Each query response is a stream of rows, each of which is one of three record types:

  • DataChangeRecord — the thing you actually care about. Contains the table name, the modified rows, old/new values, commit timestamp, and transaction metadata.
  • HeartbeatRecord — a periodic pulse emitted when there's no activity. Used to advance the watermark.
  • ChildPartitionsRecord — tells you about new partition tokens you need to start querying. Spanner reshards change stream partitions dynamically, so these show up as the database grows.

The partition tree starts from a root query with an empty token. That root query returns the initial set of leaf partitions. Each leaf may eventually return more ChildPartitionsRecord rows as Spanner reshards internally.

Root query (token="")
    └── ChildPartitionsRecord → [token-A, token-B, ...]
            ├── token-A  →  DataChangeRecords ...  →  PartitionEndRecord
            └── token-B  →  DataChangeRecords ...  →  ChildPartitionsRecord → [token-C]
                                                            └── token-C  →  ...
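In connector code, handling the rows from each partition query reduces to a type dispatch. A minimal sketch, using hypothetical stand-in types rather than the connector's real decoded structs:

```go
package main

import "fmt"

// Hypothetical stand-ins for the three record types returned by a change
// stream query; the real connector decodes these from Spanner rows.
type DataChangeRecord struct{ Table string }
type HeartbeatRecord struct{ Timestamp int64 }
type ChildPartitionsRecord struct{ ChildTokens []string }

// dispatch shows the shape of the per-row handling loop: data records are
// emitted, heartbeats advance the watermark, child partitions are queued.
func dispatch(rec interface{}) string {
	switch r := rec.(type) {
	case DataChangeRecord:
		return "emit:" + r.Table
	case HeartbeatRecord:
		return "advance-watermark"
	case ChildPartitionsRecord:
		return fmt.Sprintf("enqueue:%d", len(r.ChildTokens))
	default:
		return "skip"
	}
}

func main() {
	fmt.Println(dispatch(DataChangeRecord{Table: "Users"}))
	fmt.Println(dispatch(ChildPartitionsRecord{ChildTokens: []string{"token-A", "token-B"}}))
}
```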

The challenge for a Beam connector is that you don't know the partition tree upfront — you discover it dynamically as you read. This rules out the usual static-input patterns in Beam. Enter Splittable DoFns.


Splittable DoFns: the right tool for the job

A Splittable DoFn (SDF) is Beam's mechanism for sources that are too complex for a simple Read transform — sources that are unbounded, have dynamic fan-out, or need fine-grained checkpointing. The Beam runner controls the SDF lifecycle:

  • It calls ProcessElement on a worker.
  • It can call TrySplit to split an in-progress element into a primary (keep processing) and residual (hand to another worker).
  • It serialises the restriction (the SDF's state) to durable storage on checkpoint.

For change streams, the restriction encodes the partition work queue. The entire connector state lives here — no external coordination required.

beam.Impulse ──► readChangeStreamFn (SDF)
                      │
                      │  Restriction = PartitionQueueRestriction
                      │  {Pending: []PartitionWork, Bounded: bool}
                      │
                      ▼
               PCollection<DataChangeRecord>

The partition queue restriction

PartitionQueueRestriction is the heart of the design. It's a simple struct:

type PartitionQueueRestriction struct {
    Pending []PartitionWork `json:"pending"`
    Bounded bool            `json:"bounded"`
}

type PartitionWork struct {
    Token          string    `json:"token"`
    StartTimestamp time.Time `json:"start"`
    EndTimestamp   time.Time `json:"end"` // zero = unbounded
}

Pending[0] is the active partition being read. When a ChildPartitionsRecord arrives, the child tokens are appended to Pending via TryClaim. When a partition ends, Pending[0] is dequeued. The Beam runner serialises this entire struct to durable storage (GCS for Dataflow) on every checkpoint — that's your durability, with no Spanner metadata table involved.

The custom coder is registered at init time so the runner knows how to serialise and deserialise it:

coder.RegisterCoder(
    reflect.TypeOf((*PartitionQueueRestriction)(nil)).Elem(),
    encodePartitionQueueRestriction,
    decodePartitionQueueRestriction,
)
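Given the json tags on the struct, a plausible shape for the encode/decode pair is a straight JSON round trip (an assumption on my part, not the PR's verbatim code):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Minimal stand-ins, enough to show the coder shape.
type PartitionWork struct {
	Token string `json:"token"`
}

type PartitionQueueRestriction struct {
	Pending []PartitionWork `json:"pending"`
	Bounded bool            `json:"bounded"`
}

// encodePartitionQueueRestriction serialises the restriction for the
// runner's durable checkpoint storage.
func encodePartitionQueueRestriction(r PartitionQueueRestriction) ([]byte, error) {
	return json.Marshal(r)
}

// decodePartitionQueueRestriction restores it when a residual is scheduled.
func decodePartitionQueueRestriction(b []byte) (PartitionQueueRestriction, error) {
	var r PartitionQueueRestriction
	err := json.Unmarshal(b, &r)
	return r, err
}

func main() {
	r := PartitionQueueRestriction{Pending: []PartitionWork{{Token: "t"}}, Bounded: true}
	b, _ := encodePartitionQueueRestriction(r)
	fmt.Println(string(b))
}
```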

Achieving parallelism: aggressive TrySplit

A naive queue-based design would process all partitions sequentially on a single worker. That's clearly not acceptable for a large Spanner database that might have hundreds of partitions.

The solution is aggressive TrySplit. When the runner asks to split at any fraction greater than zero:

  • Primary keeps only Pending[0] — the currently active partition.
  • Residual gets Pending[1:] — everything else.

The runner then recursively splits the residual, eventually producing one restriction per partition. Each restriction is scheduled on a separate worker.

Initial:   [root, A, B, C, D]
Split 1:   Primary=[root]   Residual=[A, B, C, D]
Split 2:   Primary=[A]      Residual=[B, C, D]
Split 3:   Primary=[B]      Residual=[C, D]
...

This gives per-partition parallelism with no coordination overhead. The workers don't need to talk to each other — the runner handles it all through the SDF split protocol.

There's one caveat: the initial partition set isn't known until the root query runs and returns its first ChildPartitionsRecord. Until that point, there's a single restriction on a single worker. In practice this window is short — typically one defaultCheckpointInterval (10 seconds) — before splitting begins and parallelism kicks in.

The TrySplit(fraction == 0) case (self-checkpoint) works differently: the primary becomes empty (signalling done for this ProcessElement invocation) and the entire restriction becomes the residual. This is how the SDF periodically yields to allow the runner to serialise the checkpoint.
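The whole split policy fits in a few lines. A sketch (illustrative signature, not the PR's exact one):

```go
package main

import "fmt"

type PartitionWork struct{ Token string }

type PartitionQueueRestriction struct {
	Pending []PartitionWork
	Bounded bool
}

// trySplit: fraction 0 is a self-checkpoint, anything greater peels off
// everything except the currently active partition.
func trySplit(r PartitionQueueRestriction, fraction float64) (primary, residual PartitionQueueRestriction) {
	if fraction == 0 {
		// Self-checkpoint: primary is empty (this invocation is done),
		// the whole queue resumes later as the residual.
		return PartitionQueueRestriction{Bounded: r.Bounded}, r
	}
	if len(r.Pending) < 2 {
		// Nothing to hand off.
		return r, PartitionQueueRestriction{Bounded: r.Bounded}
	}
	primary = PartitionQueueRestriction{Pending: r.Pending[:1], Bounded: r.Bounded}
	residual = PartitionQueueRestriction{Pending: r.Pending[1:], Bounded: r.Bounded}
	return primary, residual
}

func main() {
	r := PartitionQueueRestriction{Pending: []PartitionWork{{"root"}, {"A"}, {"B"}}}
	p, res := trySplit(r, 0.5)
	fmt.Println(len(p.Pending), len(res.Pending)) // 1 2
}
```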


Watermark correctness

Getting the watermark right is one of the trickier parts of any streaming source. An incorrect watermark can cause records to be silently dropped as "late data" by downstream windowing operations.

The changeStreamWatermarkEstimator tracks two values:

Field         Meaning                                               Initial value
maxObserved   Highest commit or heartbeat timestamp seen so far     math.MinInt64
minPending    Minimum StartTimestamp across all queued partitions   math.MaxInt64

CurrentWatermark() returns min(maxObserved, minPending).

The minPending component is critical. Without it, consider this scenario: partition A has advanced the watermark to T1, but partition B is queued with data starting at T0 < T1. When B's data arrives, it's already behind the watermark and would be treated as late. By holding the watermark back to min(maxObserved, minPending), we ensure the watermark never advances past data that hasn't been emitted yet.

After aggressive splitting, each restriction holds exactly one partition, so in steady state minPending == maxObserved == the partition's current position. The dual-state design is only load-bearing during the brief window before splitting occurs.
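The estimator's core logic is small enough to sketch in full. This version uses unix nanos for brevity; the real estimator works with time.Time and Beam's watermark types:

```go
package main

import (
	"fmt"
	"math"
)

// watermarkEstimator is a sketch of changeStreamWatermarkEstimator.
type watermarkEstimator struct {
	maxObserved int64 // highest commit/heartbeat timestamp seen so far
	minPending  int64 // minimum StartTimestamp across queued partitions
}

func newWatermarkEstimator() *watermarkEstimator {
	return &watermarkEstimator{maxObserved: math.MinInt64, minPending: math.MaxInt64}
}

// observe records a commit or heartbeat timestamp.
func (w *watermarkEstimator) observe(ts int64) {
	if ts > w.maxObserved {
		w.maxObserved = ts
	}
}

// setMinPending updates the minimum start timestamp of queued partitions.
func (w *watermarkEstimator) setMinPending(ts int64) { w.minPending = ts }

// currentWatermark returns min(maxObserved, minPending), so the watermark
// never advances past data that hasn't been emitted yet.
func (w *watermarkEstimator) currentWatermark() int64 {
	if w.maxObserved < w.minPending {
		return w.maxObserved
	}
	return w.minPending
}

func main() {
	w := newWatermarkEstimator()
	w.observe(100)      // partition A has advanced to T=100
	w.setMinPending(50) // partition B is queued with data starting at T=50
	fmt.Println(w.currentWatermark()) // 50: held back for partition B
}
```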


Transient error resilience

Spanner streaming reads surface transient gRPC errors during backend maintenance (UNAVAILABLE) or leadership changes (ABORTED). Rather than failing the entire bundle on these, the implementation triggers a checkpoint-and-retry:

UNAVAILABLE / ABORTED  →  ResumeProcessingIn(1 second)

Because the restriction records the last committed timestamp, the retry resumes from exactly where reading left off. Non-retryable errors fail the bundle normally.
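The retryable-error check itself is a simple classification. A sketch using raw gRPC wire values for self-containment (real code would use google.golang.org/grpc/codes and return a checkpoint continuation such as the Beam Go SDK's sdf.ResumeProcessingIn):

```go
package main

import "fmt"

// gRPC status codes by their wire values (UNAVAILABLE = 14, ABORTED = 10).
const (
	codeAborted     = 10
	codeUnavailable = 14
)

// isRetryable classifies transient errors: UNAVAILABLE and ABORTED trigger
// a checkpoint-and-retry; everything else fails the bundle normally.
func isRetryable(code int) bool {
	return code == codeUnavailable || code == codeAborted
}

func main() {
	fmt.Println(isRetryable(codeUnavailable)) // true
	fmt.Println(isRetryable(5))               // false: NOT_FOUND is not transient
}
```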


The public API

Using it from a Go pipeline is straightforward:

import "github.com/apache/beam/sdks/v2/go/pkg/beam/io/spannerio"

records := spannerio.ReadChangeStream(
    s,
    "projects/my-project/instances/my-instance/databases/my-db",
    "MyStream",       // change stream name
    startTime,        // inclusive start timestamp
    time.Time{},      // zero value = unbounded (runs indefinitely)
    10_000,           // heartbeat interval in milliseconds
)
// records is a beam.PCollection of spannerio.DataChangeRecord

Each element in the output PCollection is a DataChangeRecord:

type DataChangeRecord struct {
    PartitionToken                       string
    CommitTimestamp                      time.Time
    RecordSequence                       string
    ServerTransactionID                  string
    IsLastRecordInTransactionInPartition bool
    Table                                string
    ColumnMetadata                       []*ColumnMetadata
    Mods                                 []*Mod
    ModType                              ModType   // Insert, Update, or Delete
    ValueCaptureType                     ValueCaptureType
    NumberOfRecordsInTransaction         int32
    NumberOfPartitionsInTransaction      int32
    TransactionTag                       string
    IsSystemTransaction                  bool
}

Mod.Keys, Mod.OldValues, and Mod.NewValues hold the actual row data as []*ModValue, where each ModValue contains a column name and its value JSON-encoded in the Spanner format.


How it differs from the Java implementation

The design deliberately diverges from the Java SDK's approach in one key area:

Concern                   Java                                      Go
Partition coordination    Spanner metadata table                    SDF restriction (runner state)
External dependencies     Requires a metadata Spanner DB            None beyond the source database
Durability                Metadata table survives runner restarts   Runner checkpoint storage (GCS on Dataflow)
Partition deduplication   Metadata table tracks seen tokens         Not needed; the newer API guarantees each token appears in exactly one parent's ChildPartitionsRecord

The trade-off is that the Go implementation relies on the runner's checkpoint storage rather than a persistent external store. For Dataflow, this is backed by GCS and is just as durable. The benefit is that you don't need to provision or manage a separate metadata database — a meaningful operational simplification.


Observability

The connector emits three Beam metrics under the spannerio.changestream namespace:

Metric                 Description
records_emitted        Total DataChangeRecords emitted
partitions_completed   Partitions that reached a natural PartitionEndRecord
errors_transient       Transient errors that triggered a checkpoint-and-retry

These are accessible via the Beam metrics API or your runner's monitoring UI (Dataflow Monitoring for GCP users).


Known limitations

A few things worth knowing before reaching for this in production:

  1. No initial parallelism. All partitions are discovered dynamically. Expect roughly 10 seconds of single-worker operation at pipeline start while the root query completes.

  2. At-least-once delivery. Records can be re-emitted after a worker failure. If you need exactly-once semantics, deduplicate downstream — a Spanner UPSERT keyed on (PartitionToken, CommitTimestamp, RecordSequence, ServerTransactionID) is the natural choice.

  3. Heartbeat records are not emitted. They advance the watermark internally but don't appear in the output PCollection. If you need explicit heartbeat visibility, add a side output in a downstream DoFn.

  4. SQL injection guard is name-pattern only. The change stream name is interpolated directly into SQL (Spanner doesn't support parameterised function names), so the connector panics at startup if the name doesn't match [A-Za-z_][A-Za-z0-9_]*. This is the complete safety measure — keep that in mind if you're building tooling that constructs stream names dynamically.