TL;DR: The Apache Beam Go SDK lacked a native Spanner Change Streams connector, forcing Go pipelines to depend on the Java implementation via cross-language transforms. I've implemented `ReadChangeStream` as a Splittable DoFn (SDF) that requires no external metadata table, achieves per-partition parallelism through aggressive splitting, and relies solely on the Beam runner's own checkpoint storage for durability. The PR is live at apache/beam#37991.
Why this needed to exist
Google Cloud Spanner's change streams let you subscribe to a real-time feed of mutations — inserts, updates, and deletes — across your database. They're the backbone of CDC (change data capture) pipelines: syncing to a data warehouse, propagating events, or building audit logs.
Apache Beam has had a Java implementation of this for some time. The Java SDK's SpannerIO.readChangeStream works well, but it carries a significant operational dependency: a metadata Spanner database that it uses to coordinate partition state across workers. This is fine for Java-centric teams, but for teams running Go pipelines — or anyone wanting to avoid tying their pipeline to the JVM — it's an obstacle.
We wanted a connector that didn't tie us to the Java implementation, so I sat down and wrote one from scratch.
What are Spanner Change Streams?
Before getting into the implementation, a quick primer if you haven't used them before.
You create a change stream in Spanner with a DDL statement:
```sql
CREATE CHANGE STREAM MyStream
  FOR ALL
  OPTIONS (retention_period = '7d');
```

Once created, Spanner lets you query it via a special SQL function:
```sql
SELECT * FROM READ_MyStream(
  start_timestamp        => @start,
  end_timestamp          => @end,
  partition_token        => @token,
  heartbeat_milliseconds => 10000
);
```

Each query response is a stream of rows, each of which is one of three record types:
- `DataChangeRecord` — the thing you actually care about. Contains the table name, the modified rows, old/new values, commit timestamp, and transaction metadata.
- `HeartbeatRecord` — a periodic pulse emitted when there's no activity. Used to advance the watermark.
- `ChildPartitionsRecord` — tells you about new partition tokens you need to start querying. Spanner reshards change stream partitions dynamically, so these show up as the database grows.
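The three variants arrive interleaved on one row stream, so a consumer ends up dispatching on which variant a row holds. A minimal sketch of that shape — the field sets here are illustrative stand-ins, not the connector's actual struct definitions:

```go
package main

import "fmt"

// Simplified sketches of the three record types (illustrative fields only).
type DataChangeRecord struct{ Table string }
type HeartbeatRecord struct{ TimestampMillis int64 }
type ChildPartitionsRecord struct{ ChildTokens []string }

// ChangeRecord is a tagged union: exactly one field is non-nil per row.
type ChangeRecord struct {
	DataChange      *DataChangeRecord
	Heartbeat       *HeartbeatRecord
	ChildPartitions *ChildPartitionsRecord
}

// kind reports which variant a record holds.
func kind(r ChangeRecord) string {
	switch {
	case r.DataChange != nil:
		return "data_change"
	case r.Heartbeat != nil:
		return "heartbeat"
	case r.ChildPartitions != nil:
		return "child_partitions"
	}
	return "unknown"
}

func main() {
	fmt.Println(kind(ChangeRecord{Heartbeat: &HeartbeatRecord{TimestampMillis: 1}}))
	// → heartbeat
}
```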
The partition tree starts from a root query with an empty token. That root query returns the initial set of leaf partitions. Each leaf may eventually return more ChildPartitionsRecord rows as Spanner reshards internally.
Root query (token="")
└── ChildPartitionsRecord → [token-A, token-B, ...]
├── token-A → DataChangeRecords ... → PartitionEndRecord
└── token-B → DataChangeRecords ... → ChildPartitionsRecord → [token-C]
└── token-C → ...
The challenge for a Beam connector is that you don't know the partition tree upfront — you discover it dynamically as you read. This rules out the usual static-input patterns in Beam. Enter Splittable DoFns.
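The dynamic-discovery behaviour can be simulated with a toy partition tree. This sketch is purely illustrative — the map-based tree and token names are made up, and the real connector discovers children from streamed `ChildPartitionsRecord` rows rather than a lookup table:

```go
package main

import "fmt"

// discoverPartitions walks a toy partition tree the way a connector would:
// start from the root (empty token), and each "query" may reveal child
// tokens that must themselves be queried. The tree is only known as it is
// traversed — there is no upfront list of partitions.
func discoverPartitions(children map[string][]string) []string {
	var discovered []string
	queue := []string{""} // root query uses the empty token
	for len(queue) > 0 {
		token := queue[0]
		queue = queue[1:]
		if token != "" {
			discovered = append(discovered, token)
		}
		// Simulated ChildPartitionsRecords for this token.
		queue = append(queue, children[token]...)
	}
	return discovered
}

func main() {
	// Toy tree matching the diagram above: root -> {A, B}, B -> {C}.
	tree := map[string][]string{
		"":        {"token-A", "token-B"},
		"token-B": {"token-C"},
	}
	fmt.Println(discoverPartitions(tree)) // [token-A token-B token-C]
}
```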
Splittable DoFns: the right tool for the job
A Splittable DoFn (SDF) is Beam's mechanism for sources that are too complex for a simple Read transform — sources that are unbounded, have dynamic fan-out, or need fine-grained checkpointing. The Beam runner controls the SDF lifecycle:
- It calls `ProcessElement` on a worker.
- It can call `TrySplit` to split an in-progress element into a primary (keep processing) and residual (hand to another worker).
- It serialises the restriction (the SDF's state) to durable storage on checkpoint.
For change streams, the restriction encodes the partition work queue. The entire connector state lives here — no external coordination required.
beam.Impulse ──► readChangeStreamFn (SDF)
│
│ Restriction = PartitionQueueRestriction
│ {Pending: []PartitionWork, Bounded: bool}
│
▼
PCollection<DataChangeRecord>
The partition queue restriction
PartitionQueueRestriction is the heart of the design. It's a simple struct:
```go
type PartitionQueueRestriction struct {
	Pending []PartitionWork `json:"pending"`
	Bounded bool            `json:"bounded"`
}

type PartitionWork struct {
	Token          string    `json:"token"`
	StartTimestamp time.Time `json:"start"`
	EndTimestamp   time.Time `json:"end"` // zero = unbounded
}
```

`Pending[0]` is the active partition being read. When a `ChildPartitionsRecord` arrives, the child tokens are appended to `Pending` via `TryClaim`. When a partition ends, `Pending[0]` is dequeued. The Beam runner serialises this entire struct to durable storage (GCS for Dataflow) on every checkpoint — that's your durability, with no Spanner metadata table involved.
The custom coder is registered at init time so the runner knows how to serialise and deserialise it:
```go
coder.RegisterCoder(
	reflect.TypeOf((*PartitionQueueRestriction)(nil)).Elem(),
	encodePartitionQueueRestriction,
	decodePartitionQueueRestriction,
)
```

Achieving parallelism: aggressive TrySplit
A naive queue-based design would process all partitions sequentially on a single worker. That's clearly not acceptable for a large Spanner database that might have hundreds of partitions.
The solution is aggressive TrySplit. When the runner asks to split at any fraction greater than zero:

- Primary keeps only `Pending[0]` — the currently active partition.
- Residual gets `Pending[1:]` — everything else.
The runner then recursively splits the residual, eventually producing one restriction per partition. Each restriction is scheduled on a separate worker.
Initial: [root, A, B, C, D]
Split 1: Primary=[root] Residual=[A, B, C, D]
Split 2: Primary=[A] Residual=[B, C, D]
Split 3: Primary=[B] Residual=[C, D]
...
This gives per-partition parallelism with no coordination overhead. The workers don't need to talk to each other — the runner handles it all through the SDF split protocol.
There's one caveat: the initial partition set isn't known until the root query runs and returns its first ChildPartitionsRecord. Until that point, there's a single restriction on a single worker. In practice this window is short — typically one defaultCheckpointInterval (10 seconds) — before splitting begins and parallelism kicks in.
The TrySplit(fraction == 0) case (self-checkpoint) works differently: the primary becomes empty (signalling done for this ProcessElement invocation) and the entire restriction becomes the residual. This is how the SDF periodically yields to allow the runner to serialise the checkpoint.
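Both split behaviours — fraction zero for self-checkpointing, nonzero for fan-out — can be sketched as follows. The function name `trySplit`, the simplified string-token queue, and the return convention are illustrative, not the SDF's exact signature:

```go
package main

import "fmt"

type PartitionQueueRestriction struct {
	Pending []string // partition tokens, simplified from PartitionWork
	Bounded bool
}

// trySplit mimics the two split behaviours described above:
//   - fraction == 0: self-checkpoint. The primary becomes empty (this
//     ProcessElement invocation is done) and everything moves to the
//     residual for the runner to serialise and reschedule.
//   - fraction > 0: fan-out. The primary keeps only the active partition
//     Pending[0]; the residual takes Pending[1:].
func trySplit(r PartitionQueueRestriction, fraction float64) (primary, residual PartitionQueueRestriction) {
	if fraction == 0 {
		return PartitionQueueRestriction{Bounded: r.Bounded}, r
	}
	if len(r.Pending) <= 1 {
		// Nothing to hand off: keep the whole restriction.
		return r, PartitionQueueRestriction{Bounded: r.Bounded}
	}
	primary = PartitionQueueRestriction{Pending: r.Pending[:1], Bounded: r.Bounded}
	residual = PartitionQueueRestriction{Pending: r.Pending[1:], Bounded: r.Bounded}
	return primary, residual
}

func main() {
	r := PartitionQueueRestriction{Pending: []string{"root", "A", "B", "C", "D"}}
	p, res := trySplit(r, 0.5)
	fmt.Println(p.Pending, res.Pending) // [root] [A B C D]
}
```

Repeatedly applying the nonzero case to each residual yields the one-restriction-per-partition end state shown in the diagram above.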
Watermark correctness
Getting the watermark right is one of the trickier parts of any streaming source. An incorrect watermark can cause records to be silently dropped as "late data" by downstream windowing operations.
The changeStreamWatermarkEstimator tracks two values:
| Field | Meaning | Initial value |
|---|---|---|
| `maxObserved` | Highest commit or heartbeat timestamp seen so far | `math.MinInt64` |
| `minPending` | Minimum `StartTimestamp` across all queued partitions | `math.MaxInt64` |
CurrentWatermark() returns min(maxObserved, minPending).
The minPending component is critical. Without it, consider this scenario: partition A has advanced the watermark to T1, but partition B is queued with data starting at T0 < T1. When B's data arrives, it's already behind the watermark and would be treated as late. By holding the watermark back to min(maxObserved, minPending), we ensure the watermark never advances past data that hasn't been emitted yet.
After aggressive splitting, each restriction holds exactly one partition, so in steady state minPending == maxObserved == the partition's current position. The dual-state design is only load-bearing during the brief window before splitting occurs.
Transient error resilience
Spanner streaming reads surface transient gRPC errors during backend maintenance (UNAVAILABLE) or leadership changes (ABORTED). Rather than failing the entire bundle on these, the implementation triggers a checkpoint-and-retry:
UNAVAILABLE / ABORTED → ResumeProcessingIn(1 second)
Because the restriction records the last committed timestamp, the retry resumes from exactly where reading left off. Non-retryable errors fail the bundle normally.
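The classification step amounts to a small predicate over gRPC status codes. Sketched here with local constants standing in for `google.golang.org/grpc/codes` (whose `Aborted` and `Unavailable` values are 10 and 14) so the snippet stays dependency-free; the function name is mine:

```go
package main

import "fmt"

// Local stand-ins for the relevant gRPC status codes
// (grpc codes.Aborted = 10, codes.Unavailable = 14).
type code int

const (
	codeAborted         code = 10
	codeUnavailable     code = 14
	codeInvalidArgument code = 3
)

// isRetryable reports whether a streaming-read error should trigger
// checkpoint-and-retry instead of failing the bundle.
func isRetryable(c code) bool {
	switch c {
	case codeUnavailable, codeAborted:
		return true // transient: backend maintenance or leadership change
	default:
		return false // non-retryable: fail the bundle normally
	}
}

func main() {
	fmt.Println(isRetryable(codeUnavailable), isRetryable(codeInvalidArgument))
	// → true false
}
```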
The public API
Using it from a Go pipeline is straightforward:
```go
import "github.com/apache/beam/sdks/v2/go/pkg/beam/io/spannerio"

records := spannerio.ReadChangeStream(
	s,
	"projects/my-project/instances/my-instance/databases/my-db",
	"MyStream",  // change stream name
	startTime,   // inclusive start timestamp
	time.Time{}, // zero value = unbounded (runs indefinitely)
	10_000,      // heartbeat interval in milliseconds
)
// records is a beam.PCollection of spannerio.DataChangeRecord
```

Each element in the output PCollection is a `DataChangeRecord`:
```go
type DataChangeRecord struct {
	PartitionToken                       string
	CommitTimestamp                      time.Time
	RecordSequence                       string
	ServerTransactionID                  string
	IsLastRecordInTransactionInPartition bool
	Table                                string
	ColumnMetadata                       []*ColumnMetadata
	Mods                                 []*Mod
	ModType                              ModType // Insert, Update, or Delete
	ValueCaptureType                     ValueCaptureType
	NumberOfRecordsInTransaction         int32
	NumberOfPartitionsInTransaction      int32
	TransactionTag                       string
	IsSystemTransaction                  bool
}
```

`Mod.Keys`, `Mod.OldValues`, and `Mod.NewValues` hold the actual row data as `[]*ModValue`, where each `ModValue` contains a column name and its value JSON-encoded in the Spanner format.
How it differs from the Java implementation
The design deliberately diverges from the Java SDK's approach in one key area:
| Concern | Java | Go |
|---|---|---|
| Partition coordination | Spanner metadata table | SDF restriction (runner state) |
| External dependencies | Requires a metadata Spanner DB | None beyond the source database |
| Durability | Metadata table survives runner restarts | Runner checkpoint storage (GCS on Dataflow) |
| Partition deduplication | Metadata table tracks seen tokens | Not needed — newer API guarantees each token appears in exactly one parent's ChildPartitionsRecord |
The trade-off is that the Go implementation relies on the runner's checkpoint storage rather than a persistent external store. For Dataflow, this is backed by GCS and is just as durable. The benefit is that you don't need to provision or manage a separate metadata database — a meaningful operational simplification.
Observability
The connector emits three Beam metrics under the `spannerio.changestream` namespace:

| Metric | Description |
|---|---|
| `records_emitted` | Total DataChangeRecords emitted |
| `partitions_completed` | Partitions that reached a natural PartitionEndRecord |
| `errors_transient` | Transient errors that triggered a checkpoint-and-retry |
These are accessible via the Beam metrics API or your runner's monitoring UI (Dataflow Monitoring for GCP users).
Known limitations
A few things worth knowing before reaching for this in production:
- **No initial parallelism.** All partitions are discovered dynamically. Expect roughly 10 seconds of single-worker operation at pipeline start while the root query completes.
- **At-least-once delivery.** Records can be re-emitted after a worker failure. If you need exactly-once semantics, deduplicate downstream — a Spanner UPSERT keyed on `(PartitionToken, CommitTimestamp, RecordSequence, ServerTransactionID)` is the natural choice.
- **Heartbeat records are not emitted.** They advance the watermark internally but don't appear in the output PCollection. If you need explicit heartbeat visibility, add a side output in a downstream DoFn.
- **SQL injection guard is name-pattern only.** The change stream name is interpolated directly into SQL (Spanner doesn't support parameterised function names), so the connector panics at startup if the name doesn't match `[A-Za-z_][A-Za-z0-9_]*`. This is the complete safety measure — keep that in mind if you're building tooling that constructs stream names dynamically.
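That last guard boils down to an anchored regular-expression check. A sketch — the function name `validateStreamName` is mine, not the PR's:

```go
package main

import (
	"fmt"
	"regexp"
)

// streamNamePattern anchors the identifier pattern from the post so the
// whole name must match, not just a prefix.
var streamNamePattern = regexp.MustCompile(`^[A-Za-z_][A-Za-z0-9_]*$`)

// validateStreamName panics on names that could smuggle SQL into the
// interpolated READ_<stream> query.
func validateStreamName(name string) {
	if !streamNamePattern.MatchString(name) {
		panic(fmt.Sprintf("invalid change stream name: %q", name))
	}
}

func main() {
	validateStreamName("MyStream") // ok: valid identifier
	fmt.Println(streamNamePattern.MatchString("My-Stream; DROP TABLE x"))
	// → false
}
```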