Skip to content

Commit

Permalink
Some fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
ihippik committed Sep 20, 2024
1 parent bd1d1d0 commit f00d5a6
Show file tree
Hide file tree
Showing 7 changed files with 20 additions and 16 deletions.
2 changes: 1 addition & 1 deletion cmd/wal-listener/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ package main
import (
"context"
"fmt"
"log/slog"

"github.com/jackc/pgx"
"github.com/nats-io/nats.go"
"log/slog"

"github.com/ihippik/wal-listener/v2/internal/config"
"github.com/ihippik/wal-listener/v2/internal/publisher"
Expand Down
21 changes: 10 additions & 11 deletions internal/listener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"golang.org/x/sync/errgroup"

"github.com/ihippik/wal-listener/v2/internal/config"
"github.com/ihippik/wal-listener/v2/internal/listener/transaction"
tx "github.com/ihippik/wal-listener/v2/internal/listener/transaction"
"github.com/ihippik/wal-listener/v2/internal/publisher"
)

Expand All @@ -29,7 +29,7 @@ type eventPublisher interface {
}

type parser interface {
ParseWalMessage([]byte, *transaction.WAL) error
ParseWalMessage([]byte, *tx.WAL) error
}

type replication interface {
Expand Down Expand Up @@ -71,7 +71,6 @@ type Listener struct {
isAlive atomic.Bool
}

// Variable with connection errors.
var (
errReplConnectionIsLost = errors.New("replication connection to postgres is lost")
errConnectionIsLost = errors.New("db connection to postgres is lost")
Expand Down Expand Up @@ -319,7 +318,7 @@ func (l *Listener) Stream(ctx context.Context) error {
},
}

tx := transaction.NewWAL(l.log, pool, l.monitor)
txWAL := tx.NewWAL(l.log, pool, l.monitor)

for {
if err := ctx.Err(); err != nil {
Expand All @@ -337,29 +336,29 @@ func (l *Listener) Stream(ctx context.Context) error {
continue
}

if err = l.processMessage(ctx, msg, tx); err != nil {
if err = l.processMessage(ctx, msg, txWAL); err != nil {
return fmt.Errorf("process message: %w", err)
}

l.processHeartBeat(msg)
}
}

func (l *Listener) processMessage(ctx context.Context, msg *pgx.ReplicationMessage, tx *transaction.WAL) error {
func (l *Listener) processMessage(ctx context.Context, msg *pgx.ReplicationMessage, txWAL *tx.WAL) error {
if msg.WalMessage == nil {
l.log.Debug("empty wal-message")
return nil
}

l.log.Debug("WAL message has been received", slog.Uint64("wal", msg.WalMessage.WalStart))

if err := l.parser.ParseWalMessage(msg.WalMessage.WalData, tx); err != nil {
if err := l.parser.ParseWalMessage(msg.WalMessage.WalData, txWAL); err != nil {
l.monitor.IncProblematicEvents(problemKindParse)
return fmt.Errorf("parse: %w", err)
}

if tx.CommitTime != nil {
for event := range tx.CreateEventsWithFilter(ctx, l.cfg.Listener.Filter.Tables) {
if txWAL.CommitTime != nil {
for event := range txWAL.CreateEventsWithFilter(ctx, l.cfg.Listener.Filter.Tables) {
subjectName := event.SubjectName(l.cfg)

if err := l.publisher.Publish(ctx, subjectName, event); err != nil {
Expand All @@ -377,10 +376,10 @@ func (l *Listener) processMessage(ctx context.Context, msg *pgx.ReplicationMessa
slog.Uint64("lsn", l.readLSN()),
)

tx.RetrieveEvent(event)
txWAL.RetrieveEvent(event)
}

tx.Clear()
txWAL.Clear()
}

if msg.WalMessage.WalStart > l.readLSN() {
Expand Down
3 changes: 2 additions & 1 deletion internal/listener/publisher_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package listener

import (
"context"
"github.com/ihippik/wal-listener/v2/internal/publisher"

"github.com/stretchr/testify/mock"

"github.com/ihippik/wal-listener/v2/internal/publisher"
)

type publisherMock struct {
Expand Down
1 change: 1 addition & 0 deletions internal/listener/transaction/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/google/uuid"

"github.com/ihippik/wal-listener/v2/internal/publisher"
)

Expand Down
3 changes: 2 additions & 1 deletion internal/publisher/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package publisher

import (
"fmt"
"github.com/ihippik/wal-listener/v2/internal/config"
"time"

"github.com/google/uuid"

"github.com/ihippik/wal-listener/v2/internal/config"
)

// Event structure for publishing to the NATS server.
Expand Down
3 changes: 2 additions & 1 deletion internal/publisher/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ import (
"crypto/tls"
"crypto/x509"
"fmt"
"github.com/ihippik/wal-listener/v2/internal/config"
"os"

"github.com/IBM/sarama"
"github.com/goccy/go-json"

"github.com/ihippik/wal-listener/v2/internal/config"
)

// KafkaPublisher represent event publisher with Kafka broker.
Expand Down
3 changes: 2 additions & 1 deletion internal/publisher/nats_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package publisher

import (
"github.com/ihippik/wal-listener/v2/internal/config"
"testing"

"github.com/ihippik/wal-listener/v2/internal/config"
)

func TestEvent_GetSubjectName(t *testing.T) {
Expand Down

0 comments on commit f00d5a6

Please sign in to comment.