From c3552071ddb6f3baa9930673a44f1558050816e2 Mon Sep 17 00:00:00 2001 From: Slava Markeyev Date: Fri, 26 Jul 2024 10:27:29 -0700 Subject: [PATCH] add time ms and txn to encoded json (#132) * add time ms and txn to encoded json * Fix jq query --- itests/common.bash | 4 ++-- marshaller/marshaller.go | 6 +++++- marshaller/marshaller_test.go | 10 +++++----- 3 files changed, 12 insertions(+), 8 deletions(-) diff --git a/itests/common.bash b/itests/common.bash index ce4cc13..db848ba 100644 --- a/itests/common.bash +++ b/itests/common.bash @@ -198,9 +198,9 @@ _verify() { log "Checking ./tests/$BATS_TEST_DESCRIPTION/golden/$file against ./tests/$BATS_TEST_DESCRIPTION/output/$file" if [ "$SORT" == "false" ]; then - output=$(diff <(cat "./tests/$BATS_TEST_DESCRIPTION/golden/$file") <(cat "./tests/$BATS_TEST_DESCRIPTION/output/$file" | jq 'del(.lsn, .time)' -c -M) || true) + output=$(diff <(cat "./tests/$BATS_TEST_DESCRIPTION/golden/$file") <(cat "./tests/$BATS_TEST_DESCRIPTION/output/$file" | jq 'del(.lsn, .time, .time_ms, .txn)' -c -M) || true) else - output=$(diff <(cat "./tests/$BATS_TEST_DESCRIPTION/golden/$file") <(sort "./tests/$BATS_TEST_DESCRIPTION/output/$file" | jq 'del(.lsn, .time)' -c -M) || true) + output=$(diff <(cat "./tests/$BATS_TEST_DESCRIPTION/golden/$file") <(sort "./tests/$BATS_TEST_DESCRIPTION/output/$file" | jq 'del(.lsn, .time, .time_ms, .txn)' -c -M) || true) fi if ! $output; then diff --git a/marshaller/marshaller.go b/marshaller/marshaller.go index 6f2e97e..69d95a7 100644 --- a/marshaller/marshaller.go +++ b/marshaller/marshaller.go @@ -117,7 +117,9 @@ func New(shutdownHandler shutdown.ShutdownHandler, // jsonWalEntry is a helper struct which has json field tags type jsonWalEntry struct { Time string `json:"time"` - Lsn string `json:"lsn"` // Log Sequence Number that determines position in WAL + TimeMs int64 `json:"time_ms"` // epoch ms + Txn string `json:"txn"` // txn id + time key + Lsn string `json:"lsn"` // Log Sequence Number that determines position in WAL Table string `json:"table"` Operation string `json:"operation"` Columns map[string]map[string]map[string]string `json:"columns"` @@ -307,6 +309,8 @@ func marshalWalToJson(msg *replication.WalMessage, noMarshalOldValue bool) ([]by // Construct WalEntry reusedWalEntry.Time = t + reusedWalEntry.TimeMs = msg.ServerTime + reusedWalEntry.Txn = msg.TimeBasedKey reusedWalEntry.Lsn = *(*string)(unsafe.Pointer(&lsnBytes)) reusedWalEntry.Table = msg.Pr.Relation reusedWalEntry.Operation = msg.Pr.Operation diff --git a/marshaller/marshaller_test.go b/marshaller/marshaller_test.go index 91f43f3..edf4008 100644 --- a/marshaller/marshaller_test.go +++ b/marshaller/marshaller_test.go @@ -172,7 +172,7 @@ func TestBasicInsertMessage(t *testing.T) { assert.Equal(t, "test.users", marshalled.Table) assert.Equal(t, "0", marshalled.Transaction) - expectedJson := "{\"time\":\"1970-01-01T00:00:00Z\",\"lsn\":\"0/1\",\"table\":\"test.users\",\"operation\":\"INSERT\",\"columns\":{\"first_name\":{\"new\":{\"q\":\"true\",\"t\":\"string\",\"v\":\"Foo\"}}}}" + expectedJson := "{\"time\":\"1970-01-01T00:00:00Z\",\"time_ms\":0,\"txn\":\"0-0\",\"lsn\":\"0/1\",\"table\":\"test.users\",\"operation\":\"INSERT\",\"columns\":{\"first_name\":{\"new\":{\"q\":\"true\",\"t\":\"string\",\"v\":\"Foo\"}}}}" assert.Equal(t, expectedJson, string(marshalled.Json)) } @@ -208,7 +208,7 @@ func TestBasicUpdateMessage(t *testing.T) { assert.Equal(t, "test.users", marshalled.Table) assert.Equal(t, "0", marshalled.Transaction) - expectedJson := "{\"time\":\"1970-01-01T00:00:00Z\",\"lsn\":\"0/1\",\"table\":\"test.users\",\"operation\":\"INSERT\",\"columns\":{\"first_name\":{\"new\":{\"q\":\"true\",\"t\":\"string\",\"v\":\"Foo\"},\"old\":{\"q\":\"true\",\"t\":\"string\",\"v\":\"Bar\"}}}}" + expectedJson := "{\"time\":\"1970-01-01T00:00:00Z\",\"time_ms\":0,\"txn\":\"0-0\",\"lsn\":\"0/1\",\"table\":\"test.users\",\"operation\":\"INSERT\",\"columns\":{\"first_name\":{\"new\":{\"q\":\"true\",\"t\":\"string\",\"v\":\"Foo\"},\"old\":{\"q\":\"true\",\"t\":\"string\",\"v\":\"Bar\"}}}}" assert.Equal(t, expectedJson, string(marshalled.Json)) } @@ -244,7 +244,7 @@ func TestBasicUpdateNoOldMessage(t *testing.T) { assert.Equal(t, "test.users", marshalled.Table) assert.Equal(t, "0", marshalled.Transaction) - expectedJson := "{\"time\":\"1970-01-01T00:00:00Z\",\"lsn\":\"0/1\",\"table\":\"test.users\",\"operation\":\"INSERT\",\"columns\":{\"first_name\":{\"new\":{\"q\":\"true\",\"t\":\"string\",\"v\":\"Foo\"}}}}" + expectedJson := "{\"time\":\"1970-01-01T00:00:00Z\",\"time_ms\":0,\"txn\":\"0-0\",\"lsn\":\"0/1\",\"table\":\"test.users\",\"operation\":\"INSERT\",\"columns\":{\"first_name\":{\"new\":{\"q\":\"true\",\"t\":\"string\",\"v\":\"Foo\"}}}}" assert.Equal(t, expectedJson, string(marshalled.Json)) } @@ -289,7 +289,7 @@ func TestToastValue(t *testing.T) { assert.Equal(t, "test.users", marshalled.Table) assert.Equal(t, "0", marshalled.Transaction) - expectedJson := "{\"time\":\"1970-01-01T00:00:00Z\",\"lsn\":\"0/1\",\"table\":\"test.users\",\"operation\":\"INSERT\",\"columns\":{\"first_name\":{\"new\":{\"q\":\"true\",\"t\":\"string\",\"v\":\"Bar\"},\"old\":{\"q\":\"true\",\"t\":\"string\",\"v\":\"Bar\"}}}}" + expectedJson := "{\"time\":\"1970-01-01T00:00:00Z\",\"time_ms\":0,\"txn\":\"0-0\",\"lsn\":\"0/1\",\"table\":\"test.users\",\"operation\":\"INSERT\",\"columns\":{\"first_name\":{\"new\":{\"q\":\"true\",\"t\":\"string\",\"v\":\"Bar\"},\"old\":{\"q\":\"true\",\"t\":\"string\",\"v\":\"Bar\"}}}}" assert.Equal(t, expectedJson, string(marshalled.Json)) } @@ -334,7 +334,7 @@ func TestToastNoOldValue(t *testing.T) { assert.Equal(t, "test.users", marshalled.Table) assert.Equal(t, "0", marshalled.Transaction) - expectedJson := "{\"time\":\"1970-01-01T00:00:00Z\",\"lsn\":\"0/1\",\"table\":\"test.users\",\"operation\":\"INSERT\",\"columns\":{\"first_name\":{\"new\":{\"q\":\"true\",\"t\":\"string\",\"v\":\"Bar\"}}}}" + expectedJson := "{\"time\":\"1970-01-01T00:00:00Z\",\"time_ms\":0,\"txn\":\"0-0\",\"lsn\":\"0/1\",\"table\":\"test.users\",\"operation\":\"INSERT\",\"columns\":{\"first_name\":{\"new\":{\"q\":\"true\",\"t\":\"string\",\"v\":\"Bar\"}}}}" assert.Equal(t, expectedJson, string(marshalled.Json)) }