diff --git a/cmd/wal-listener/init.go b/cmd/wal-listener/init.go index c7aed6db..2ed96f02 100644 --- a/cmd/wal-listener/init.go +++ b/cmd/wal-listener/init.go @@ -8,8 +8,8 @@ import ( "github.com/jackc/pgx" "github.com/nats-io/nats.go" - "github.com/ihippik/wal-listener/v2/config" - "github.com/ihippik/wal-listener/v2/publisher" + "github.com/ihippik/wal-listener/v2/internal/config" + "github.com/ihippik/wal-listener/v2/internal/publisher" ) // initPgxConnections initialise db and replication connections. diff --git a/cmd/wal-listener/main.go b/cmd/wal-listener/main.go index 6010dda7..fb054b8e 100644 --- a/cmd/wal-listener/main.go +++ b/cmd/wal-listener/main.go @@ -11,8 +11,9 @@ import ( scfg "github.com/ihippik/config" "github.com/urfave/cli/v2" - "github.com/ihippik/wal-listener/v2/config" - "github.com/ihippik/wal-listener/v2/listener" + "github.com/ihippik/wal-listener/v2/internal/config" + "github.com/ihippik/wal-listener/v2/internal/listener" + "github.com/ihippik/wal-listener/v2/internal/listener/transaction" ) func main() { @@ -73,19 +74,19 @@ func main() { } }() - service := listener.NewWalListener( + svc := listener.NewWalListener( cfg, logger, listener.NewRepository(conn), rConn, pub, - listener.NewBinaryParser(logger, binary.BigEndian), + transaction.NewBinaryParser(logger, binary.BigEndian), config.NewMetrics(), ) - go service.InitHandlers(ctx) + go svc.InitHandlers(ctx) - if err := service.Process(ctx); err != nil { + if err = svc.Process(ctx); err != nil { slog.Error("service process failed", "err", err.Error()) } diff --git a/config/config.go b/internal/config/config.go similarity index 100% rename from config/config.go rename to internal/config/config.go diff --git a/config/config_test.go b/internal/config/config_test.go similarity index 100% rename from config/config_test.go rename to internal/config/config_test.go diff --git a/config/metrics.go b/internal/config/metrics.go similarity index 100% rename from config/metrics.go rename to internal/config/metrics.go diff --git a/listener/listener.go b/internal/listener/listener.go similarity index 93% rename from listener/listener.go rename to internal/listener/listener.go index 82f202ee..dda7654b 100644 --- a/listener/listener.go +++ b/internal/listener/listener.go @@ -2,6 +2,7 @@ package listener import ( "context" + "errors" "fmt" "log/slog" "net/http" @@ -15,8 +16,9 @@ import ( "github.com/jackc/pgx" "golang.org/x/sync/errgroup" - "github.com/ihippik/wal-listener/v2/config" - "github.com/ihippik/wal-listener/v2/publisher" + "github.com/ihippik/wal-listener/v2/internal/config" + tx "github.com/ihippik/wal-listener/v2/internal/listener/transaction" + "github.com/ihippik/wal-listener/v2/internal/publisher" ) // Logical decoding plugin. @@ -27,7 +29,7 @@ type eventPublisher interface { } type parser interface { - ParseWalMessage([]byte, *WalTransaction) error + ParseWalMessage([]byte, *tx.WAL) error } type replication interface { @@ -69,6 +71,12 @@ type Listener struct { isAlive atomic.Bool } +var ( + errReplConnectionIsLost = errors.New("replication connection to postgres is lost") + errConnectionIsLost = errors.New("db connection to postgres is lost") + errReplDidNotStart = errors.New("replication did not start") +) + // NewWalListener create and initialize new service instance. func NewWalListener( cfg *config.Config, @@ -310,7 +318,7 @@ func (l *Listener) Stream(ctx context.Context) error { }, } - tx := NewWalTransaction(l.log, pool, l.monitor) + txWAL := tx.NewWAL(l.log, pool, l.monitor) for { if err := ctx.Err(); err != nil { @@ -328,7 +336,7 @@ func (l *Listener) Stream(ctx context.Context) error { continue } - if err = l.processMessage(ctx, msg, tx); err != nil { + if err = l.processMessage(ctx, msg, txWAL); err != nil { return fmt.Errorf("process message: %w", err) } @@ -336,7 +344,7 @@ func (l *Listener) Stream(ctx context.Context) error { } } -func (l *Listener) processMessage(ctx context.Context, msg *pgx.ReplicationMessage, tx *WalTransaction) error { +func (l *Listener) processMessage(ctx context.Context, msg *pgx.ReplicationMessage, txWAL *tx.WAL) error { if msg.WalMessage == nil { l.log.Debug("empty wal-message") return nil @@ -344,13 +352,13 @@ func (l *Listener) processMessage(ctx context.Context, msg *pgx.ReplicationMessa l.log.Debug("WAL message has been received", slog.Uint64("wal", msg.WalMessage.WalStart)) - if err := l.parser.ParseWalMessage(msg.WalMessage.WalData, tx); err != nil { + if err := l.parser.ParseWalMessage(msg.WalMessage.WalData, txWAL); err != nil { l.monitor.IncProblematicEvents(problemKindParse) return fmt.Errorf("parse: %w", err) } - if tx.CommitTime != nil { - for event := range tx.CreateEventsWithFilter(ctx, l.cfg.Listener.Filter.Tables) { + if txWAL.CommitTime != nil { + for event := range txWAL.CreateEventsWithFilter(ctx, l.cfg.Listener.Filter.Tables) { subjectName := event.SubjectName(l.cfg) if err := l.publisher.Publish(ctx, subjectName, event); err != nil { @@ -368,10 +376,10 @@ func (l *Listener) processMessage(ctx context.Context, msg *pgx.ReplicationMessa slog.Uint64("lsn", l.readLSN()), ) - tx.pool.Put(event) + txWAL.RetrieveEvent(event) } - tx.Clear() + txWAL.Clear() } if msg.WalMessage.WalStart > l.readLSN() { diff --git a/listener/listener_test.go b/internal/listener/listener_test.go similarity index 98% rename from listener/listener_test.go rename to internal/listener/listener_test.go index f5111863..b8e3aa47 100644 --- a/listener/listener_test.go +++ b/internal/listener/listener_test.go @@ -15,8 +15,9 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" - "github.com/ihippik/wal-listener/v2/config" - "github.com/ihippik/wal-listener/v2/publisher" + "github.com/ihippik/wal-listener/v2/internal/config" + tx "github.com/ihippik/wal-listener/v2/internal/listener/transaction" + "github.com/ihippik/wal-listener/v2/internal/publisher" ) var ( @@ -467,7 +468,7 @@ func TestListener_Stream(t *testing.T) { repo.On("NewStandbyStatus", walPositions).Return(status, err).After(10 * time.Millisecond) } - setParseWalMessageOnce := func(msg []byte, tx *WalTransaction, err error) { + setParseWalMessageOnce := func(msg []byte, tx *tx.WAL, err error) { prs.On("ParseWalMessage", msg, tx).Return(err) } @@ -560,15 +561,7 @@ func TestListener_Stream(t *testing.T) { setParseWalMessageOnce( []byte(`some bytes`), - &WalTransaction{ - monitor: metrics, - log: logger, - LSN: 0, - BeginTime: nil, - CommitTime: nil, - RelationStore: make(map[int32]RelationData), - Actions: nil, - }, + tx.NewWAL(logger, nil, metrics), nil, ) diff --git a/listener/monitor_test.go b/internal/listener/monitor_mock_test.go similarity index 100% rename from listener/monitor_test.go rename to internal/listener/monitor_mock_test.go diff --git a/listener/parser_mock_test.go b/internal/listener/parser_mock_test.go similarity index 55% rename from listener/parser_mock_test.go rename to internal/listener/parser_mock_test.go index 3fb864a3..fc37df5b 100644 --- a/listener/parser_mock_test.go +++ b/internal/listener/parser_mock_test.go @@ -4,30 +4,27 @@ import ( "time" "github.com/stretchr/testify/mock" + + trx "github.com/ihippik/wal-listener/v2/internal/listener/transaction" ) type parserMock struct { mock.Mock } -func (p *parserMock) ParseWalMessage(msg []byte, tx *WalTransaction) error { +func (p *parserMock) ParseWalMessage(msg []byte, tx *trx.WAL) error { args := p.Called(msg, tx) now := time.Now() tx.BeginTime = &now tx.CommitTime = &now - tx.Actions = []ActionData{ + tx.Actions = []trx.ActionData{ { Schema: "public", Table: "users", Kind: "INSERT", - NewColumns: []Column{ - { - name: "id", - value: 1, - valueType: 23, - isKey: true, - }, + NewColumns: []trx.Column{ + trx.InitColumn(nil, "id", 1, 23, true), }, }, } diff --git a/listener/publisher_mock_test.go b/internal/listener/publisher_mock_test.go similarity index 83% rename from listener/publisher_mock_test.go rename to internal/listener/publisher_mock_test.go index 9ae567d6..bf917559 100644 --- a/listener/publisher_mock_test.go +++ b/internal/listener/publisher_mock_test.go @@ -5,7 +5,7 @@ import ( "github.com/stretchr/testify/mock" - "github.com/ihippik/wal-listener/v2/publisher" + "github.com/ihippik/wal-listener/v2/internal/publisher" ) type publisherMock struct { diff --git a/listener/replicator_mock_test.go b/internal/listener/replicator_mock_test.go similarity index 100% rename from listener/replicator_mock_test.go rename to internal/listener/replicator_mock_test.go diff --git a/listener/repository.go b/internal/listener/repository.go similarity index 100% rename from listener/repository.go rename to internal/listener/repository.go diff --git a/listener/repository_mock_test.go b/internal/listener/repository_mock_test.go similarity index 100% rename from listener/repository_mock_test.go rename to internal/listener/repository_mock_test.go diff --git a/internal/listener/transaction/data.go b/internal/listener/transaction/data.go new file mode 100644 index 00000000..b7f47a5a --- /dev/null +++ b/internal/listener/transaction/data.go @@ -0,0 +1,124 @@ +package transaction + +import ( + "log/slog" + "strconv" + "time" + + "github.com/goccy/go-json" + "github.com/google/uuid" +) + +// ActionKind kind of action on WAL message. +type ActionKind string + +// kind of WAL message. +const ( + ActionKindInsert ActionKind = "INSERT" + ActionKindUpdate ActionKind = "UPDATE" + ActionKindDelete ActionKind = "DELETE" +) + +func (k ActionKind) string() string { + return string(k) +} + +// RelationData kind of WAL message data. +type RelationData struct { + Schema string + Table string + Columns []Column +} + +// ActionData kind of WAL message data. +type ActionData struct { + Schema string + Table string + Kind ActionKind + OldColumns []Column + NewColumns []Column +} + +// Column of the table with which changes occur. +type Column struct { + log *slog.Logger + name string + value any + valueType int + isKey bool +} + +// InitColumn create new Column instance with data.s +func InitColumn(log *slog.Logger, name string, value any, valueType int, isKey bool) Column { + return Column{log: log, name: name, value: value, valueType: valueType, isKey: isKey} +} + +// AssertValue converts bytes to a specific type depending +// on the type of this data in the database table. +func (c *Column) AssertValue(src []byte) { + var ( + val any + err error + ) + + if src == nil { + c.value = nil + return + } + + strSrc := string(src) + + const ( + timestampLayout = "2006-01-02 15:04:05" + timestampWithTZLayout = "2006-01-02 15:04:05.999999999-07" + ) + + switch c.valueType { + case BoolOID: + val, err = strconv.ParseBool(strSrc) + case Int2OID, Int4OID: + val, err = strconv.Atoi(strSrc) + case Int8OID: + val, err = strconv.ParseInt(strSrc, 10, 64) + case TextOID, VarcharOID: + val = strSrc + case TimestampOID: + val, err = time.Parse(timestampLayout, strSrc) + case TimestamptzOID: + val, err = time.ParseInLocation(timestampWithTZLayout, strSrc, time.UTC) + case DateOID, TimeOID: + val = strSrc + case UUIDOID: + val, err = uuid.Parse(strSrc) + case JSONBOID: + var m any + + if src[0] == '[' { + m = make([]any, 0) + } else { + m = make(map[string]any) + } + + err = json.Unmarshal(src, &m) + val = m + default: + c.log.Debug( + "unknown oid type", + slog.Int("pg_type", c.valueType), + slog.String("column_name", c.name), + ) + + val = strSrc + } + + if err != nil { + c.log.Error( + "column data parse error", + slog.String("err", err.Error()), + slog.Int("pg_type", c.valueType), + slog.String("column_name", c.name), + ) + } + + c.value = val +} diff --git a/internal/listener/transaction/monitor_mock_test.go b/internal/listener/transaction/monitor_mock_test.go new file mode 100644 index 00000000..d43bba8e --- /dev/null +++ b/internal/listener/transaction/monitor_mock_test.go @@ -0,0 +1,9 @@ +package transaction + +type monitorMock struct{} + +func (m *monitorMock) IncPublishedEvents(subject, table string) {} + +func (m *monitorMock) IncFilterSkippedEvents(table string) {} + +func (m *monitorMock) IncProblematicEvents(kind string) {} diff --git a/listener/parser.go b/internal/listener/transaction/parser.go similarity index 91% rename from listener/parser.go rename to internal/listener/transaction/parser.go index c79d1067..01f7e5b2 100644 --- a/listener/parser.go +++ b/internal/listener/transaction/parser.go @@ -1,8 +1,9 @@ -package listener +package transaction import ( "bytes" "encoding/binary" + "errors" "fmt" "log/slog" "time" @@ -16,6 +17,12 @@ type BinaryParser struct { buffer *bytes.Buffer } +var ( + ErrEmptyWALMessage = errors.New("empty WAL message") + ErrMessageLost = errors.New("messages are lost") + ErrUnknownMessageType = errors.New("unknown message type") +) + // NewBinaryParser create instance of binary parser. func NewBinaryParser(logger *slog.Logger, byteOrder binary.ByteOrder) *BinaryParser { return &BinaryParser{ @@ -25,9 +32,9 @@ func NewBinaryParser(logger *slog.Logger, byteOrder binary.ByteOrder) *BinaryPar } // ParseWalMessage parse postgres WAL message. -func (p *BinaryParser) ParseWalMessage(msg []byte, tx *WalTransaction) error { +func (p *BinaryParser) ParseWalMessage(msg []byte, tx *WAL) error { if len(msg) == 0 { - return errEmptyWALMessage + return ErrEmptyWALMessage } p.msgType = msg[0] @@ -55,7 +62,7 @@ func (p *BinaryParser) ParseWalMessage(msg []byte, tx *WalTransaction) error { ) if tx.LSN > 0 && tx.LSN != commit.LSN { - return fmt.Errorf("commit: %w", errMessageLost) + return fmt.Errorf("commit: %w", ErrMessageLost) } tx.CommitTime = &commit.Timestamp @@ -71,7 +78,7 @@ func (p *BinaryParser) ParseWalMessage(msg []byte, tx *WalTransaction) error { ) if tx.LSN == 0 { - return fmt.Errorf("commit: %w", errMessageLost) + return fmt.Errorf("commit: %w", ErrMessageLost) } rd := RelationData{ @@ -80,12 +87,7 @@ func (p *BinaryParser) ParseWalMessage(msg []byte, tx *WalTransaction) error { } for _, rf := range relation.Columns { - c := Column{ - log: p.log, - name: rf.Name, - valueType: int(rf.TypeID), - isKey: rf.Key, - } + c := InitColumn(p.log, rf.Name, nil, int(rf.TypeID), rf.Key) rd.Columns = append(rd.Columns, c) } @@ -147,7 +149,7 @@ func (p *BinaryParser) ParseWalMessage(msg []byte, tx *WalTransaction) error { tx.Actions = append(tx.Actions, action) default: - return fmt.Errorf("%w : %s", errUnknownMessageType, []byte{p.msgType}) + return fmt.Errorf("%w : %s", ErrUnknownMessageType, []byte{p.msgType}) } return nil diff --git a/listener/parser_test.go b/internal/listener/transaction/parser_test.go similarity index 97% rename from listener/parser_test.go rename to internal/listener/transaction/parser_test.go index b0dd330a..93baf7d7 100644 --- a/listener/parser_test.go +++ b/internal/listener/transaction/parser_test.go @@ -1,4 +1,4 @@ -package listener +package transaction import ( "bytes" @@ -461,7 +461,7 @@ func TestBinaryParser_getBeginMsg(t *testing.T) { func TestBinaryParser_ParseWalMessage(t *testing.T) { type args struct { msg []byte - tx *WalTransaction + tx *WAL } logger := slog.New(slog.NewJSONHandler(io.Discard, nil)) @@ -471,7 +471,7 @@ func TestBinaryParser_ParseWalMessage(t *testing.T) { name string args args wantErr bool - want *WalTransaction + want *WAL }{ { name: "empty data", @@ -487,9 +487,9 @@ func TestBinaryParser_ParseWalMessage(t *testing.T) { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 5, }, - tx: NewWalTransaction(logger, nil, metrics), + tx: NewWAL(logger, nil, metrics), }, - want: &WalTransaction{ + want: &WAL{ pool: nil, log: logger, LSN: 7, @@ -510,7 +510,7 @@ func TestBinaryParser_ParseWalMessage(t *testing.T) { 0, 0, 0, 0, 0, 0, 0, 8, 0, 0, 0, 0, 0, 0, 0, 0, }, - tx: &WalTransaction{ + tx: &WAL{ log: logger, LSN: 7, monitor: metrics, @@ -518,7 +518,7 @@ func TestBinaryParser_ParseWalMessage(t *testing.T) { RelationStore: make(map[int32]RelationData), }, }, - want: &WalTransaction{ + want: &WAL{ log: logger, LSN: 7, monitor: metrics, @@ -553,7 +553,7 @@ func TestBinaryParser_ParseWalMessage(t *testing.T) { 0, 0, 0, 23, 0, 0, 0, 1, }, - tx: &WalTransaction{ + tx: &WAL{ log: logger, LSN: 3, monitor: metrics, @@ -562,7 +562,7 @@ func TestBinaryParser_ParseWalMessage(t *testing.T) { RelationStore: make(map[int32]RelationData), }, }, - want: &WalTransaction{ + want: &WAL{ log: logger, monitor: metrics, LSN: 3, @@ -608,7 +608,7 @@ func TestBinaryParser_ParseWalMessage(t *testing.T) { 0, 0, 0, 6, 49, 48, }, - tx: &WalTransaction{ + tx: &WAL{ monitor: metrics, log: logger, LSN: 4, @@ -631,7 +631,7 @@ func TestBinaryParser_ParseWalMessage(t *testing.T) { }, }, }, - want: &WalTransaction{ + want: &WAL{ monitor: metrics, log: logger, LSN: 4, @@ -701,7 +701,7 @@ func TestBinaryParser_ParseWalMessage(t *testing.T) { 0, 0, 0, 2, 56, 48, }, - tx: &WalTransaction{ + tx: &WAL{ log: logger, monitor: metrics, LSN: 4, @@ -724,7 +724,7 @@ func TestBinaryParser_ParseWalMessage(t *testing.T) { }, }, }, - want: &WalTransaction{ + want: &WAL{ monitor: metrics, log: logger, LSN: 4, @@ -792,7 +792,7 @@ func TestBinaryParser_ParseWalMessage(t *testing.T) { 0, 0, 0, 2, 55, 55, }, - tx: &WalTransaction{ + tx: &WAL{ monitor: metrics, log: logger, LSN: 4, @@ -815,7 +815,7 @@ func TestBinaryParser_ParseWalMessage(t *testing.T) { }, }, }, - want: &WalTransaction{ + want: &WAL{ monitor: metrics, log: logger, LSN: 4, @@ -868,7 +868,7 @@ func TestBinaryParser_ParseWalMessage(t *testing.T) { 0, 0, 0, 2, 55, 55, }, - tx: &WalTransaction{ + tx: &WAL{ monitor: metrics, log: logger, LSN: 4, @@ -891,7 +891,7 @@ func TestBinaryParser_ParseWalMessage(t *testing.T) { }, }, }, - want: &WalTransaction{ + want: &WAL{ monitor: metrics, log: logger, LSN: 4, diff --git a/listener/pg_type.go b/internal/listener/transaction/pg_type.go similarity index 94% rename from listener/pg_type.go rename to internal/listener/transaction/pg_type.go index 7387c6bd..a9fcab2c 100644 --- a/listener/pg_type.go +++ b/internal/listener/transaction/pg_type.go @@ -1,4 +1,4 @@ -package listener +package transaction // PostgreSQL OIDs // https://github.com/postgres/postgres/blob/master/src/include/catalog/pg_type.dat diff --git a/listener/protocol.go b/internal/listener/transaction/protocol.go similarity index 99% rename from listener/protocol.go rename to internal/listener/transaction/protocol.go index 5bd08406..545dd264 100644 --- a/listener/protocol.go +++ b/internal/listener/transaction/protocol.go @@ -1,4 +1,4 @@ -package listener +package transaction import ( "time" diff --git a/internal/listener/transaction/wal.go b/internal/listener/transaction/wal.go new file mode 100644 index 00000000..675395ae --- /dev/null +++ b/internal/listener/transaction/wal.go @@ -0,0 +1,183 @@ +package transaction + +import ( + "context" + "errors" + "log/slog" + "strings" + "sync" + "time" + + "github.com/google/uuid" + + "github.com/ihippik/wal-listener/v2/internal/publisher" +) + +type monitor interface { + IncFilterSkippedEvents(table string) +} + +// WAL transaction specified WAL message. +type WAL struct { + log *slog.Logger + monitor monitor + LSN int64 + BeginTime *time.Time + CommitTime *time.Time + RelationStore map[int32]RelationData + Actions []ActionData + pool *sync.Pool +} + +var errRelationNotFound = errors.New("relation not found") + +// NewWAL create and initialize new WAL transaction. +func NewWAL(log *slog.Logger, pool *sync.Pool, monitor monitor) *WAL { + const aproxData = 300 + + return &WAL{ + pool: pool, + log: log, + monitor: monitor, + RelationStore: make(map[int32]RelationData), + Actions: make([]ActionData, 0, aproxData), + } +} + +// Clear transaction data. +func (w *WAL) Clear() { + w.CommitTime = nil + w.BeginTime = nil + w.Actions = nil +} + +func (w *WAL) RetrieveEvent(event *publisher.Event) { + w.pool.Put(event) +} + +func (w *WAL) getPoolEvent() *publisher.Event { + return w.pool.Get().(*publisher.Event) +} + +// CreateActionData create action from WAL message data. +func (w *WAL) CreateActionData( + relationID int32, + oldRows []TupleData, + newRows []TupleData, + kind ActionKind, +) (a ActionData, err error) { + rel, ok := w.RelationStore[relationID] + if !ok { + return a, errRelationNotFound + } + + a = ActionData{ + Schema: rel.Schema, + Table: rel.Table, + Kind: kind, + } + + oldColumns := make([]Column, 0, len(oldRows)) + + for num, row := range oldRows { + column := InitColumn( + w.log, + rel.Columns[num].name, + nil, + rel.Columns[num].valueType, + rel.Columns[num].isKey, + ) + + column.AssertValue(row.Value) + oldColumns = append(oldColumns, column) + } + + a.OldColumns = oldColumns + + newColumns := make([]Column, 0, len(newRows)) + + for num, row := range newRows { + column := InitColumn( + w.log, + rel.Columns[num].name, + nil, + rel.Columns[num].valueType, + rel.Columns[num].isKey, + ) + column.AssertValue(row.Value) + newColumns = append(newColumns, column) + } + + a.NewColumns = newColumns + + return a, nil +} + +// CreateEventsWithFilter filter WAL message by table, +// action and create events for each value. +func (w *WAL) CreateEventsWithFilter(ctx context.Context, tableMap map[string][]string) <-chan *publisher.Event { + output := make(chan *publisher.Event) + + go func(ctx context.Context) { + for _, item := range w.Actions { + if err := ctx.Err(); err != nil { + w.log.Debug("create events with filter: context canceled") + break + } + + dataOld := make(map[string]any, len(item.OldColumns)) + + for _, val := range item.OldColumns { + dataOld[val.name] = val.value + } + + data := make(map[string]any, len(item.NewColumns)) + + for _, val := range item.NewColumns { + data[val.name] = val.value + } + + event := w.getPoolEvent() + + event.ID = uuid.New() + event.Schema = item.Schema + event.Table = item.Table + event.Action = item.Kind.string() + event.Data = data + event.DataOld = dataOld + event.EventTime = *w.CommitTime + + actions, validTable := tableMap[item.Table] + + validAction := inArray(actions, item.Kind.string()) + if validTable && validAction { + output <- event + continue + } + + w.monitor.IncFilterSkippedEvents(item.Table) + + w.log.Debug( + "wal-message was skipped by filter", + slog.String("schema", item.Schema), + slog.String("table", item.Table), + slog.String("action", string(item.Kind)), + ) + } + + close(output) + }(ctx) + + return output +} + +// inArray checks whether the value is in an array. +func inArray(arr []string, value string) bool { + for _, v := range arr { + if strings.EqualFold(v, value) { + return true + } + } + + return false +} diff --git a/listener/wal_transaction_test.go b/internal/listener/transaction/wal_test.go similarity index 99% rename from listener/wal_transaction_test.go rename to internal/listener/transaction/wal_test.go index 353a4a17..710d45b7 100644 --- a/listener/wal_transaction_test.go +++ b/internal/listener/transaction/wal_test.go @@ -1,4 +1,4 @@ -package listener +package transaction import ( "io" @@ -135,7 +135,7 @@ func TestWalTransaction_CreateActionData(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - w := WalTransaction{ + w := WAL{ log: logger, LSN: tt.fields.LSN, BeginTime: tt.fields.BeginTime, diff --git a/publisher/event.go b/internal/publisher/event.go similarity index 93% rename from publisher/event.go rename to internal/publisher/event.go index 6af60aec..2ad90769 100644 --- a/publisher/event.go +++ b/internal/publisher/event.go @@ -5,7 +5,8 @@ import ( "time" "github.com/google/uuid" - "github.com/ihippik/wal-listener/v2/config" + + "github.com/ihippik/wal-listener/v2/internal/config" ) // Event structure for publishing to the NATS server. diff --git a/publisher/kafka.go b/internal/publisher/kafka.go similarity index 97% rename from publisher/kafka.go rename to internal/publisher/kafka.go index 1245c990..7285d259 100644 --- a/publisher/kafka.go +++ b/internal/publisher/kafka.go @@ -10,7 +10,7 @@ import ( "github.com/IBM/sarama" "github.com/goccy/go-json" - "github.com/ihippik/wal-listener/v2/config" + "github.com/ihippik/wal-listener/v2/internal/config" ) // KafkaPublisher represent event publisher with Kafka broker. diff --git a/publisher/nats.go b/internal/publisher/nats.go similarity index 100% rename from publisher/nats.go rename to internal/publisher/nats.go diff --git a/publisher/nats_test.go b/internal/publisher/nats_test.go similarity index 95% rename from publisher/nats_test.go rename to internal/publisher/nats_test.go index c2ba798a..eaeb40c5 100644 --- a/publisher/nats_test.go +++ b/internal/publisher/nats_test.go @@ -3,7 +3,7 @@ package publisher import ( "testing" - "github.com/ihippik/wal-listener/v2/config" + "github.com/ihippik/wal-listener/v2/internal/config" ) func TestEvent_GetSubjectName(t *testing.T) { diff --git a/publisher/pubsub.go b/internal/publisher/pubsub.go similarity index 100% rename from publisher/pubsub.go rename to internal/publisher/pubsub.go diff --git a/publisher/pubsub_connection.go b/internal/publisher/pubsub_connection.go similarity index 100% rename from publisher/pubsub_connection.go rename to internal/publisher/pubsub_connection.go diff --git a/publisher/rabbit.go b/internal/publisher/rabbit.go similarity index 97% rename from publisher/rabbit.go rename to internal/publisher/rabbit.go index c59aaa78..fa682330 100644 --- a/publisher/rabbit.go +++ b/internal/publisher/rabbit.go @@ -3,9 +3,9 @@ package publisher import ( "context" "fmt" + "github.com/ihippik/wal-listener/v2/internal/config" "github.com/goccy/go-json" - "github.com/ihippik/wal-listener/v2/config" "github.com/wagslane/go-rabbitmq" ) diff --git a/listener/errors.go b/listener/errors.go deleted file mode 100644 index d2a996e9..00000000 --- a/listener/errors.go +++ /dev/null @@ -1,14 +0,0 @@ -package listener - -import "errors" - -// Variable with connection errors. -var ( - errReplConnectionIsLost = errors.New("replication connection to postgres is lost") - errConnectionIsLost = errors.New("db connection to postgres is lost") - errMessageLost = errors.New("messages are lost") - errEmptyWALMessage = errors.New("empty WAL message") - errUnknownMessageType = errors.New("unknown message type") - errRelationNotFound = errors.New("relation not found") - errReplDidNotStart = errors.New("replication did not start") -) diff --git a/listener/wal_transaction.go b/listener/wal_transaction.go deleted file mode 100644 index 2aac107c..00000000 --- a/listener/wal_transaction.go +++ /dev/null @@ -1,280 +0,0 @@ -package listener - -import ( - "context" - "log/slog" - "strconv" - "strings" - "sync" - "time" - - "github.com/goccy/go-json" - "github.com/google/uuid" - - "github.com/ihippik/wal-listener/v2/publisher" -) - -// ActionKind kind of action on WAL message. -type ActionKind string - -// kind of WAL message. -const ( - ActionKindInsert ActionKind = "INSERT" - ActionKindUpdate ActionKind = "UPDATE" - ActionKindDelete ActionKind = "DELETE" -) - -type transactionMonitor interface { - IncFilterSkippedEvents(table string) -} - -// WalTransaction transaction specified WAL message. -type WalTransaction struct { - log *slog.Logger - monitor transactionMonitor - LSN int64 - BeginTime *time.Time - CommitTime *time.Time - RelationStore map[int32]RelationData - Actions []ActionData - pool *sync.Pool -} - -// NewWalTransaction create and initialize new WAL transaction. -func NewWalTransaction(log *slog.Logger, pool *sync.Pool, monitor transactionMonitor) *WalTransaction { - const aproxData = 300 - - return &WalTransaction{ - pool: pool, - log: log, - monitor: monitor, - RelationStore: make(map[int32]RelationData), - Actions: make([]ActionData, 0, aproxData), - } -} - -func (k ActionKind) string() string { - return string(k) -} - -// RelationData kind of WAL message data. -type RelationData struct { - Schema string - Table string - Columns []Column -} - -// ActionData kind of WAL message data. -type ActionData struct { - Schema string - Table string - Kind ActionKind - OldColumns []Column - NewColumns []Column -} - -// Column of the table with which changes occur. -type Column struct { - log *slog.Logger - name string - value any - valueType int - isKey bool -} - -// AssertValue converts bytes to a specific type depending -// on the type of this data in the database table. -func (c *Column) AssertValue(src []byte) { - var ( - val any - err error - ) - - if src == nil { - c.value = nil - return - } - - strSrc := string(src) - - const ( - timestampLayout = "2006-01-02 15:04:05" - timestampWithTZLayout = "2006-01-02 15:04:05.999999999-07" - ) - - switch c.valueType { - case BoolOID: - val, err = strconv.ParseBool(strSrc) - case Int2OID, Int4OID: - val, err = strconv.Atoi(strSrc) - case Int8OID: - val, err = strconv.ParseInt(strSrc, 10, 64) - case TextOID, VarcharOID: - val = strSrc - case TimestampOID: - val, err = time.Parse(timestampLayout, strSrc) - case TimestamptzOID: - val, err = time.ParseInLocation(timestampWithTZLayout, strSrc, time.UTC) - case DateOID, TimeOID: - val = strSrc - case UUIDOID: - val, err = uuid.Parse(strSrc) - case JSONBOID: - var m any - - if src[0] == '[' { - m = make([]any, 0) - } else { - m = make(map[string]any) - } - - err = json.Unmarshal(src, &m) - val = m - default: - c.log.Debug( - "unknown oid type", - slog.Int("pg_type", c.valueType), - slog.String("column_name", c.name), - ) - - val = strSrc - } - - if err != nil { - c.log.Error( - "column data parse error", - slog.String("err", err.Error()), - slog.Int("pg_type", c.valueType), - slog.String("column_name", c.name), - ) - } - - c.value = val -} - -// Clear transaction data. -func (w *WalTransaction) Clear() { - w.CommitTime = nil - w.BeginTime = nil - w.Actions = nil -} - -// CreateActionData create action from WAL message data. -func (w *WalTransaction) CreateActionData( - relationID int32, - oldRows []TupleData, - newRows []TupleData, - kind ActionKind, -) (a ActionData, err error) { - rel, ok := w.RelationStore[relationID] - if !ok { - return a, errRelationNotFound - } - - a = ActionData{ - Schema: rel.Schema, - Table: rel.Table, - Kind: kind, - } - - oldColumns := make([]Column, 0, len(oldRows)) - - for num, row := range oldRows { - column := Column{ - log: w.log, - name: rel.Columns[num].name, - valueType: rel.Columns[num].valueType, - isKey: rel.Columns[num].isKey, - } - - column.AssertValue(row.Value) - oldColumns = append(oldColumns, column) - } - - a.OldColumns = oldColumns - - newColumns := make([]Column, 0, len(newRows)) - - for num, row := range newRows { - column := Column{ - log: w.log, - name: rel.Columns[num].name, - valueType: rel.Columns[num].valueType, - isKey: rel.Columns[num].isKey, - } - column.AssertValue(row.Value) - newColumns = append(newColumns, column) - } - - a.NewColumns = newColumns - - return a, nil -} - -// CreateEventsWithFilter filter WAL message by table, -// action and create events for each value. -func (w *WalTransaction) CreateEventsWithFilter(ctx context.Context, tableMap map[string][]string) <-chan *publisher.Event { - output := make(chan *publisher.Event) - - go func(ctx context.Context) { - for _, item := range w.Actions { - if err := ctx.Err(); err != nil { - w.log.Debug("create events with filter: context canceled") - break - } - - dataOld := make(map[string]any, len(item.OldColumns)) - - for _, val := range item.OldColumns { - dataOld[val.name] = val.value - } - - data := make(map[string]any, len(item.NewColumns)) - - for _, val := range item.NewColumns { - data[val.name] = val.value - } - - event := w.pool.Get().(*publisher.Event) - event.ID = uuid.New() - event.Schema = item.Schema - event.Table = item.Table - event.Action = item.Kind.string() - event.Data = data - event.DataOld = dataOld - event.EventTime = *w.CommitTime - - actions, validTable := tableMap[item.Table] - - validAction := inArray(actions, item.Kind.string()) - if validTable && validAction { - output <- event - continue - } - - w.monitor.IncFilterSkippedEvents(item.Table) - - w.log.Debug( - "wal-message was skipped by filter", - slog.String("schema", item.Schema), - slog.String("table", item.Table), - slog.String("action", string(item.Kind)), - ) - } - - close(output) - }(ctx) - - return output -} - -// inArray checks whether the value is in an array. -func inArray(arr []string, value string) bool { - for _, v := range arr { - if strings.EqualFold(v, value) { - return true - } - } - - return false -}