Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@

# Dependency directories (remove the comment below to include it)
# vendor/
./config.yml
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ COPY --from=builder /etc/passwd /etc/passwd
COPY --from=builder /etc/group /etc/group

COPY --from=builder /app .
COPY /config.yml .

USER goof:goof

Expand Down
21 changes: 0 additions & 21 deletions cmd/wal-listener/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"os"

"github.com/nats-io/nats.go"
"github.com/sirupsen/logrus"
"github.com/urfave/cli/v2"

Expand Down Expand Up @@ -45,25 +44,6 @@ func main() {

logger := initLogger(cfg.Logger, version)

initSentry(cfg.Monitoring.SentryDSN, logger)

go initMetrics(cfg.Monitoring.PromAddr, logger)

natsConn, err := nats.Connect(cfg.Nats.Address)
if err != nil {
return fmt.Errorf("nats connection: %w", err)
}
defer natsConn.Close()

js, err := natsConn.JetStream()
if err != nil {
return fmt.Errorf("jet stream: %w", err)
}

if err := createStream(logger, js, cfg.Nats.StreamName); err != nil {
return fmt.Errorf("create Nats stream: %w", err)
}

conn, rConn, err := initPgxConnections(cfg.Database)
if err != nil {
return fmt.Errorf("pgx connection: %w", err)
Expand All @@ -74,7 +54,6 @@ func main() {
logger,
listener.NewRepository(conn),
rConn,
listener.NewNatsPublisher(js),
listener.NewBinaryParser(binary.BigEndian),
)

Expand Down
Empty file added config.yml
Empty file.
29 changes: 0 additions & 29 deletions config_example.yml

This file was deleted.

849 changes: 849 additions & 0 deletions go.sum

Large diffs are not rendered by default.

11 changes: 4 additions & 7 deletions listener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ type Listener struct {
log *logrus.Entry
mu sync.RWMutex
slotName string
publisher publisher
replicator replication
repository repository
parser parser
Expand Down Expand Up @@ -83,14 +82,12 @@ func NewWalListener(
log *logrus.Entry,
repo repository,
repl replication,
publ publisher,
parser parser,
) *Listener {
return &Listener{
log: log,
slotName: cfg.Listener.SlotName,
cfg: cfg,
publisher: publ,
repository: repo,
replicator: repl,
parser: parser,
Expand Down Expand Up @@ -250,10 +247,10 @@ func (l *Listener) Stream(ctx context.Context) {
for _, event := range natsEvents {
subjectName := event.SubjectName(l.cfg)

if err = l.publisher.Publish(subjectName, event); err != nil {
l.errChannel <- fmt.Errorf("publish message: %w", err)
continue
}
// if err = l.publisher.Publish(subjectName, event); err != nil {
// l.errChannel <- fmt.Errorf("publish message: %w", err)
// continue
// }

publishedEvents.With(prometheus.Labels{"subject": subjectName, "table": event.Table}).Inc()

Expand Down
18 changes: 9 additions & 9 deletions listener/nats_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"fmt"
"time"

"github.com/goccy/go-json"
// "github.com/goccy/go-json"
"github.com/google/uuid"
"github.com/nats-io/nats.go"

Expand All @@ -29,14 +29,14 @@ type Event struct {

// Publish serializes the event and publishes it on the bus.
func (n NatsPublisher) Publish(subject string, event Event) error {
msg, err := json.Marshal(event)
if err != nil {
return fmt.Errorf("marshal err: %w", err)
}

if _, err := n.js.Publish(subject, msg); err != nil {
return fmt.Errorf("failed to publish: %w", err)
}
// msg, err := json.Marshal(event)
// if err != nil {
// return fmt.Errorf("marshal err: %w", err)
// }

// if _, err := n.js.Publish(subject, msg); err != nil {
// return fmt.Errorf("failed to publish: %w", err)
// }

return nil
}
Expand Down
10 changes: 6 additions & 4 deletions listener/wal_transaction.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package listener

import (
"strconv"
"strings"
"time"

"fmt"
"github.com/goccy/go-json"
"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"strconv"
"strings"
"time"
)

// ActionKind kind of action on WAL message.
Expand Down Expand Up @@ -208,6 +208,8 @@ func (w *WalTransaction) CreateEventsWithFilter(tableMap map[string][]string) []
}

filterSkippedEvents.With(prometheus.Labels{"table": item.Table}).Inc()
bs, _ := json.Marshal(event.Data)
fmt.Println(string(bs))

logrus.WithFields(
logrus.Fields{
Expand Down