Feature Request: Binlog Timestamp Watermarking for VStream #16477

Closed
twthorn opened this issue Jul 25, 2024 · 11 comments · Fixed by #16593

Comments

@twthorn
Contributor

twthorn commented Jul 25, 2024

Feature Description

Watermarking

Provide an option for specifying a time interval that enables periodic per-shard binlog events, each indicating that all binlogs for that shard up to timestamp t have been received by the VStream client. Send this event regularly at the configured interval, regardless of lag or throttling.

Note

Ideally, these watermark events are generated by inserts into a table, just like the changes to the other tables the client is tracking. This provides a stronger correctness guarantee, since it uses the same code path that we are trying to verify has processed up to timestamp t. It is acceptable if they are received simply as table change events for a specially named table (the client can handle these events in a custom way).

Use Case(s)

There are two common parts of a CDC pipeline using VStreams:

  • VStream client - reads the stream and writes to a per-table message queue
  • Consumer - reads from the queue, writes to a data store

For processing the write to the data store, typically there is an async system for merging together the change log records to create a consistent snapshot of the database. This can be done on a best effort basis (whatever data is available, merge it in). However, other use cases require that the data be complete for a time window, from t0 (lower bound) to t1 (upper bound) (e.g., one day). In this case, the downstream system creating this database snapshot for a complete time window needs a way to know that the upstream system has processed up to timestamp t1 (upper bound of the time window).

With this feature, the following would be done:

  • VStream client - publishes each watermark event periodically to a queue

The downstream system can then check the latest timestamp of each shard's watermark event in the queue. If all timestamps for all shards are >= t1, then we know that the VStream client has processed all binlogs up to timestamp t1, and the snapshot job for the time window can be started.
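The downstream check described above can be sketched in a few lines of Go. This is only an illustration under assumptions: the function name `windowComplete`, the map of latest watermark timestamps keyed by shard, and the use of Unix seconds are all hypothetical and not part of Vitess.

```go
package main

import "fmt"

// windowComplete reports whether every expected shard has published a
// watermark with a timestamp at or after t1 (hypothetical helper).
// latest maps shard name -> newest watermark timestamp seen in the queue.
func windowComplete(latest map[string]int64, shards []string, t1 int64) bool {
	for _, shard := range shards {
		ts, ok := latest[shard]
		if !ok || ts < t1 {
			// A missing shard or an older watermark means the window
			// may still be incomplete.
			return false
		}
	}
	return true
}

func main() {
	shards := []string{"-80", "80-"}
	latest := map[string]int64{"-80": 1723751668, "80-": 1723751667}

	fmt.Println(windowComplete(latest, shards, 1723751667)) // true
	fmt.Println(windowComplete(latest, shards, 1723751668)) // false: shard 80- is behind
}
```

Requiring an entry for every expected shard matters: a shard that has not yet produced any watermark should block the snapshot job rather than be silently ignored.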

@twthorn twthorn added Needs Triage This issue needs to be correctly labelled and triaged Type: Feature labels Jul 25, 2024
@mattlord mattlord added Component: VReplication and removed Needs Triage This issue needs to be correctly labelled and triaged labels Jul 25, 2024
@rohit-nayak-ps
Contributor

Let me know if my understanding is correct, based on this issue and the discussion in https://vitess.slack.com/archives/C0PQY0PTK/p1721154609745629:

  • The periodic heartbeats will need to be enabled using the vttablet flags --heartbeat_enable and --heartbeat_interval.
  • We need an option in the VStream API to also send events from _vt.heartbeat.
  • The vstream would then forward each RowEvent produced by updates to the _vt.heartbeat table as a per-shard RowEvent carrying the timestamp of the heartbeat. Since this event is part of the binlog stream, you would be assured that you have all other updates as of the heartbeat's timestamp.

Does this meet the requirements outlined in this issue or am I missing something?

@twthorn
Contributor Author

twthorn commented Aug 1, 2024

@rohit-nayak-ps Yes, that approach will completely satisfy the requirements

@rohit-nayak-ps rohit-nayak-ps self-assigned this Aug 1, 2024
@rohit-nayak-ps
Contributor

@twthorn, thanks. I will update here once I have a working PR so that you can test and validate the approach.

@twthorn
Contributor Author

twthorn commented Aug 12, 2024

Hi @rohit-nayak-ps wanted to check in on this. Anything I can do to help? Please let me know. Thanks again for your work on this issue!

@rohit-nayak-ps
Contributor

It is a work in progress: I will have a draft PR for it in a day or two. Please test it locally at that point to make sure it matches your expectations before we get it reviewed.

@rohit-nayak-ps
Contributor

@twthorn, can you please try out #16593?

@twthorn
Contributor Author

twthorn commented Aug 15, 2024

I ran this locally with the following diff:

diff --git a/examples/common/scripts/vttablet-up.sh b/examples/common/scripts/vttablet-up.sh
index daa40aee89..03c54038d0 100755
--- a/examples/common/scripts/vttablet-up.sh
+++ b/examples/common/scripts/vttablet-up.sh
@@ -51,9 +51,10 @@ vttablet \
  --restore_from_backup \
  --port $port \
  --grpc_port $grpc_port \
+ --heartbeat_enable \
+ --heartbeat_interval 1s \
  --service_map 'grpc-queryservice,grpc-tabletmanager,grpc-updatestream' \
  --pid_file $VTDATAROOT/$tablet_dir/vttablet.pid \
- --heartbeat_on_demand_duration=5s \
  > $VTDATAROOT/$tablet_dir/vttablet.out 2>&1 &

 # Block waiting for the tablet to be listening
diff --git a/examples/local/vstream_client.go b/examples/local/vstream_client.go
index 98d2129f89..7dd7005807 100644
--- a/examples/local/vstream_client.go
+++ b/examples/local/vstream_client.go
@@ -75,6 +75,7 @@ func main() {
        flags := &vtgatepb.VStreamFlags{
                //MinimizeSkew:      false,
                //HeartbeatInterval: 60, //seconds
+               StreamKeyspaceHeartbeats: true,
        }
        reader, err := conn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags)
        for {

I then took the following steps:

git checkout rohit/vstream-vt-heartbeat && make build

pushd examples/local

./101_initial_cluster.sh; mysql < ../common/insert_commerce_data.sql; ./201_customer_tablets.sh ; ./202_move_tables.sh; ./203_switch_reads.sh; ./204_switch_writes.sh; ./205_clean_commerce.sh; ./301_customer_sharded.sh; ./302_new_shards.sh; ./303_reshard.sh; ./304_switch_reads.sh; ./305_switch_writes.sh; ./306_down_shard_0.sh; ./307_delete_shard_0.sh

go run vstream_client.go

This produced the following sample of heartbeat events:

[type:BEGIN timestamp:1723751665 current_time:1723751665443063000 keyspace:"customer" shard:"-80" type:FIELD field_event:{table_name:"customer.heartbeat" fields:{name:"keyspaceShard" type:VARBINARY table:"heartbeat" org_table:"heartbeat" database:"_vt" org_name:"keyspaceShard" column_length:256 charset:63 flags:20611} fields:{name:"tabletUid" type:UINT32 table:"heartbeat" org_table:"heartbeat" database:"_vt" org_name:"tabletUid" column_length:10 charset:63 flags:36897} fields:{name:"ts" type:UINT64 table:"heartbeat" org_table:"heartbeat" database:"_vt" org_name:"ts" column_length:20 charset:63 flags:36897} keyspace:"customer" shard:"-80" is_internal:true} keyspace:"customer" shard:"-80" type:ROW timestamp:1723751665 row_event:{table_name:"customer.heartbeat" row_changes:{before:{lengths:12 lengths:3 lengths:19 values:"customer:-803011723751664440851000"} after:{lengths:12 lengths:3 lengths:19 values:"customer:-803011723751665442620000"}} keyspace:"customer" shard:"-80" flags:1 is_internal:true} current_time:1723751665443420000 keyspace:"customer" shard:"-80" type:VGTID vgtid:{shard_gtids:{keyspace:"customer" shard:"-80" gtid:"MySQL56/09bdd6a4-5b3f-11ef-aa3c-67bc771f38ba:1-534"} shard_gtids:{keyspace:"customer" shard:"80-" gtid:"MySQL56/0e11ada2-5b3f-11ef-b9a0-b1e36139fb41:1-519"}} keyspace:"customer" shard:"-80" type:COMMIT timestamp:1723751665 current_time:1723751665443426000 keyspace:"customer" shard:"-80"]
[type:BEGIN timestamp:1723751665 current_time:1723751665452348000 keyspace:"customer" shard:"80-" type:FIELD field_event:{table_name:"customer.heartbeat" fields:{name:"keyspaceShard" type:VARBINARY table:"heartbeat" org_table:"heartbeat" database:"_vt" org_name:"keyspaceShard" column_length:256 charset:63 flags:20611} fields:{name:"tabletUid" type:UINT32 table:"heartbeat" org_table:"heartbeat" database:"_vt" org_name:"tabletUid" column_length:10 charset:63 flags:36897} fields:{name:"ts" type:UINT64 table:"heartbeat" org_table:"heartbeat" database:"_vt" org_name:"ts" column_length:20 charset:63 flags:36897} keyspace:"customer" shard:"80-" is_internal:true} keyspace:"customer" shard:"80-" type:ROW timestamp:1723751665 row_event:{table_name:"customer.heartbeat" row_changes:{before:{lengths:12 lengths:3 lengths:19 values:"customer:80-4001723751664450280000"} after:{lengths:12 lengths:3 lengths:19 values:"customer:80-4001723751665451941000"}} keyspace:"customer" shard:"80-" flags:1 is_internal:true} current_time:1723751665452793000 keyspace:"customer" shard:"80-" type:VGTID vgtid:{shard_gtids:{keyspace:"customer" shard:"-80" gtid:"MySQL56/09bdd6a4-5b3f-11ef-aa3c-67bc771f38ba:1-534"} shard_gtids:{keyspace:"customer" shard:"80-" gtid:"MySQL56/0e11ada2-5b3f-11ef-b9a0-b1e36139fb41:1-520"}} keyspace:"customer" shard:"80-" type:COMMIT timestamp:1723751665 current_time:1723751665452800000 keyspace:"customer" shard:"80-"]
[type:BEGIN timestamp:1723751666 current_time:1723751666447638000 keyspace:"customer" shard:"-80" type:ROW timestamp:1723751666 row_event:{table_name:"customer.heartbeat" row_changes:{before:{lengths:12 lengths:3 lengths:19 values:"customer:-803011723751665442620000"} after:{lengths:12 lengths:3 lengths:19 values:"customer:-803011723751666444732000"}} keyspace:"customer" shard:"-80" flags:1 is_internal:true} current_time:1723751666447696000 keyspace:"customer" shard:"-80" type:VGTID vgtid:{shard_gtids:{keyspace:"customer" shard:"-80" gtid:"MySQL56/09bdd6a4-5b3f-11ef-aa3c-67bc771f38ba:1-535"} shard_gtids:{keyspace:"customer" shard:"80-" gtid:"MySQL56/0e11ada2-5b3f-11ef-b9a0-b1e36139fb41:1-520"}} keyspace:"customer" shard:"-80" type:COMMIT timestamp:1723751666 current_time:1723751666447723000 keyspace:"customer" shard:"-80"]
[type:BEGIN timestamp:1723751666 current_time:1723751666453692000 keyspace:"customer" shard:"80-" type:ROW timestamp:1723751666 row_event:{table_name:"customer.heartbeat" row_changes:{before:{lengths:12 lengths:3 lengths:19 values:"customer:80-4001723751665451941000"} after:{lengths:12 lengths:3 lengths:19 values:"customer:80-4001723751666452930000"}} keyspace:"customer" shard:"80-" flags:1 is_internal:true} current_time:1723751666453723000 keyspace:"customer" shard:"80-" type:VGTID vgtid:{shard_gtids:{keyspace:"customer" shard:"-80" gtid:"MySQL56/09bdd6a4-5b3f-11ef-aa3c-67bc771f38ba:1-535"} shard_gtids:{keyspace:"customer" shard:"80-" gtid:"MySQL56/0e11ada2-5b3f-11ef-b9a0-b1e36139fb41:1-521"}} keyspace:"customer" shard:"80-" type:COMMIT timestamp:1723751666 current_time:1723751666453737000 keyspace:"customer" shard:"80-"]
[type:BEGIN timestamp:1723751667 current_time:1723751667451644000 keyspace:"customer" shard:"-80" type:ROW timestamp:1723751667 row_event:{table_name:"customer.heartbeat" row_changes:{before:{lengths:12 lengths:3 lengths:19 values:"customer:-803011723751666444732000"} after:{lengths:12 lengths:3 lengths:19 values:"customer:-803011723751667448773000"}} keyspace:"customer" shard:"-80" flags:1 is_internal:true} current_time:1723751667451722000 keyspace:"customer" shard:"-80" type:VGTID vgtid:{shard_gtids:{keyspace:"customer" shard:"-80" gtid:"MySQL56/09bdd6a4-5b3f-11ef-aa3c-67bc771f38ba:1-536"} shard_gtids:{keyspace:"customer" shard:"80-" gtid:"MySQL56/0e11ada2-5b3f-11ef-b9a0-b1e36139fb41:1-521"}} keyspace:"customer" shard:"-80" type:COMMIT timestamp:1723751667 current_time:1723751667451747000 keyspace:"customer" shard:"-80"]
[type:BEGIN timestamp:1723751667 current_time:1723751667455731000 keyspace:"customer" shard:"80-" type:ROW timestamp:1723751667 row_event:{table_name:"customer.heartbeat" row_changes:{before:{lengths:12 lengths:3 lengths:19 values:"customer:80-4001723751666452930000"} after:{lengths:12 lengths:3 lengths:19 values:"customer:80-4001723751667454664000"}} keyspace:"customer" shard:"80-" flags:1 is_internal:true} current_time:1723751667455774000 keyspace:"customer" shard:"80-" type:VGTID vgtid:{shard_gtids:{keyspace:"customer" shard:"-80" gtid:"MySQL56/09bdd6a4-5b3f-11ef-aa3c-67bc771f38ba:1-536"} shard_gtids:{keyspace:"customer" shard:"80-" gtid:"MySQL56/0e11ada2-5b3f-11ef-b9a0-b1e36139fb41:1-522"}} keyspace:"customer" shard:"80-" type:COMMIT timestamp:1723751667 current_time:1723751667455794000 keyspace:"customer" shard:"80-"]
[type:BEGIN timestamp:1723751668 current_time:1723751668457068000 keyspace:"customer" shard:"-80" type:ROW timestamp:1723751668 row_event:{table_name:"customer.heartbeat" row_changes:{before:{lengths:12 lengths:3 lengths:19 values:"customer:-803011723751667448773000"} after:{lengths:12 lengths:3 lengths:19 values:"customer:-803011723751668454792000"}} keyspace:"customer" shard:"-80" flags:1 is_internal:true} current_time:1723751668457105000 keyspace:"customer" shard:"-80" type:VGTID vgtid:{shard_gtids:{keyspace:"customer" shard:"-80" gtid:"MySQL56/09bdd6a4-5b3f-11ef-aa3c-67bc771f38ba:1-537"} shard_gtids:{keyspace:"customer" shard:"80-" gtid:"MySQL56/0e11ada2-5b3f-11ef-b9a0-b1e36139fb41:1-522"}} keyspace:"customer" shard:"-80" type:COMMIT timestamp:1723751668 current_time:1723751668457117000 keyspace:"customer" shard:"-80"]
[type:BEGIN timestamp:1723751668 current_time:1723751668459451000 keyspace:"customer" shard:"80-" type:ROW timestamp:1723751668 row_event:{table_name:"customer.heartbeat" row_changes:{before:{lengths:12 lengths:3 lengths:19 values:"customer:80-4001723751667454664000"} after:{lengths:12 lengths:3 lengths:19 values:"customer:80-4001723751668457295000"}} keyspace:"customer" shard:"80-" flags:1 is_internal:true} current_time:1723751668459501000 keyspace:"customer" shard:"80-" type:VGTID vgtid:{shard_gtids:{keyspace:"customer" shard:"-80" gtid:"MySQL56/09bdd6a4-5b3f-11ef-aa3c-67bc771f38ba:1-537"} shard_gtids:{keyspace:"customer" shard:"80-" gtid:"MySQL56/0e11ada2-5b3f-11ef-b9a0-b1e36139fb41:1-523"}} keyspace:"customer" shard:"80-" type:COMMIT timestamp:1723751668 current_time:1723751668459523000 keyspace:"customer" shard:"80-"]

The heartbeat events for _vt.heartbeat are received periodically for the table. They have the expected BEGIN/FIELD/ROW/COMMIT format. The ROW events contain both the timestamp of the row and the time that the change was applied in the database. Events are received for all shards of the keyspace.

With this, any VStream client can establish a generic binlog watermarking strategy for all shards of a keyspace. @rohit-nayak-ps, I can confirm this meets our requirements. Thank you again for the help on this.
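For readers decoding the sample output, here is a minimal Go sketch of how a client might split a heartbeat row into its three columns (keyspaceShard, tabletUid, ts) using the `lengths` list. The helper names `splitRow` and `heartbeatTime` are hypothetical, the function works on plain strings rather than the Vitess protobuf types, and treating `ts` as nanoseconds since the epoch is an inference from the sample values, not something stated in the thread.

```go
package main

import (
	"fmt"
	"strconv"
	"time"
)

// splitRow cuts the concatenated row values into columns using the
// per-column lengths (hypothetical helper; not a Vitess API).
func splitRow(values string, lengths []int) ([]string, error) {
	cols := make([]string, 0, len(lengths))
	pos := 0
	for _, n := range lengths {
		if n < 0 || pos+n > len(values) {
			return nil, fmt.Errorf("invalid length %d at offset %d", n, pos)
		}
		cols = append(cols, values[pos:pos+n])
		pos += n
	}
	return cols, nil
}

// heartbeatTime returns the keyspaceShard column and the ts column parsed
// as nanoseconds since the Unix epoch (an assumption based on the sample).
func heartbeatTime(values string, lengths []int) (string, time.Time, error) {
	cols, err := splitRow(values, lengths)
	if err != nil {
		return "", time.Time{}, err
	}
	if len(cols) != 3 {
		return "", time.Time{}, fmt.Errorf("expected 3 columns, got %d", len(cols))
	}
	ns, err := strconv.ParseInt(cols[2], 10, 64)
	if err != nil {
		return "", time.Time{}, err
	}
	return cols[0], time.Unix(0, ns), nil
}

func main() {
	// "after" image of the first ROW event in the sample above.
	ks, ts, err := heartbeatTime("customer:-803011723751665442620000", []int{12, 3, 19})
	if err != nil {
		panic(err)
	}
	fmt.Println(ks, ts.Unix()) // customer:-80 1723751665
}
```

The parsed `ts` value is the heartbeat's own write time, while the event-level `timestamp` field is the binlog timestamp; either could serve as the watermark depending on the precision needed.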

@rohit-nayak-ps
Contributor

OK, great, thanks for testing this out. I will work further on the PR tomorrow: need to add more tests probably and clean it up a bit before marking it ready for review.

@deepthi
Member

deepthi commented Aug 16, 2024

Enabling heartbeats on vttablets will mean that a binlog entry is generated for each interval. The interval defaults to 1s, and that is also what the proposed PR was tested with.

  • @twthorn what is the desired interval at which you would want to generate these?
  • @rohit-nayak-ps based on that we should come up with a recommended value for the flag.

@twthorn
Contributor Author

twthorn commented Aug 20, 2024

@deepthi If we had the option for vstream clients, the desired interval we would set is 1 minute. But 1 second is also acceptable (it would just mean more data than is necessary for accurate watermarking).

@twthorn
Contributor Author

twthorn commented Aug 27, 2024

Thank you all for the help on this, really appreciate it!

Projects
Status: Done