Skip to content

Commit a3431e5

Browse files
committed
feat: add dynamic module registration
1 parent c78013e commit a3431e5

File tree

7 files changed

+192
-105
lines changed

7 files changed

+192
-105
lines changed

cmd/config.go

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,24 @@ import (
66
"github.com/spf13/viper"
77
)
88

9-
func LoadConfig[V any](cmd *cobra.Command) (*V, error){
9+
func LoadConfig[V any](cmd *cobra.Command) (*V, error) {
10+
var cfg V
11+
if err := MapConfig(cmd, &cfg); err != nil {
12+
return nil, err
13+
}
14+
15+
return &cfg, nil
16+
}
17+
18+
func MapConfig(cmd *cobra.Command, cfg any) error {
1019
v := viper.New()
1120
if err := v.BindPFlags(cmd.Flags()); err != nil {
12-
return nil, fmt.Errorf("binding flags: %w", err)
21+
return fmt.Errorf("binding flags: %w", err)
1322
}
1423

15-
var cfg V
16-
if err := v.Unmarshal(&cfg); err != nil {
17-
return nil, fmt.Errorf("unmarshalling config: %w", err)
24+
if err := v.Unmarshal(cfg); err != nil {
25+
return fmt.Errorf("unmarshalling config: %w", err)
1826
}
1927

20-
return &cfg, nil
28+
return nil
2129
}

cmd/serve.go

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,6 @@ import (
3838
)
3939

4040
type ServeConfig struct {
41-
WorkerConfiguration `mapstructure:",squash"`
42-
4341
Bind string `mapstructure:"bind"`
4442
BallastSizeInBytes uint `mapstructure:"ballast-size"`
4543
NumscriptCacheMaxCount uint `mapstructure:"numscript-cache-max-count"`
@@ -125,15 +123,15 @@ func NewServeCommand() *cobra.Command {
125123
}),
126124
fx.Decorate(func(
127125
params struct {
128-
fx.In
126+
fx.In
129127

130-
Handler chi.Router
131-
HealthController *health.HealthController
132-
Logger logging.Logger
128+
Handler chi.Router
129+
HealthController *health.HealthController
130+
Logger logging.Logger
133131

134-
MeterProvider *metric.MeterProvider `optional:"true"`
135-
Exporter *otlpmetrics.InMemoryExporter `optional:"true"`
136-
},
132+
MeterProvider *metric.MeterProvider `optional:"true"`
133+
Exporter *otlpmetrics.InMemoryExporter `optional:"true"`
134+
},
137135
) chi.Router {
138136
return assembleFinalRouter(
139137
service.IsDebug(cmd),
@@ -150,10 +148,13 @@ func NewServeCommand() *cobra.Command {
150148
}
151149

152150
if cfg.WorkerEnabled {
153-
options = append(options, worker.NewFXModule(worker.ModuleConfig{
154-
Schedule: cfg.HashLogsBlockCRONSpec,
155-
MaxBlockSize: cfg.HashLogsBlockMaxSize,
156-
}))
151+
workerModule, err := worker.NewFXModule(func(v any) error {
152+
return MapConfig(cmd, v)
153+
})
154+
if err != nil {
155+
return fmt.Errorf("creating worker module: %w", err)
156+
}
157+
options = append(options, workerModule)
157158
}
158159

159160
return service.New(cmd.OutOrStdout(), options...).Run(cmd)

cmd/worker.go

Lines changed: 32 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -11,21 +11,37 @@ import (
1111
"github.com/formancehq/ledger/internal/worker"
1212
"github.com/spf13/cobra"
1313
"go.uber.org/fx"
14+
"reflect"
15+
"strconv"
1416
)
1517

16-
const (
17-
WorkerAsyncBlockHasherMaxBlockSizeFlag = "worker-async-block-hasher-max-block-size"
18-
WorkerAsyncBlockHasherScheduleFlag = "worker-async-block-hasher-schedule"
19-
)
18+
func addWorkerFlags(cmd *cobra.Command) {
19+
for _, runnerFactory := range worker.AllRunnerFactories() {
20+
typeOfRunnerFactory := reflect.TypeOf(runnerFactory)
21+
method, _ := typeOfRunnerFactory.MethodByName("CreateRunner")
22+
configType := method.Type.In(1)
2023

21-
type WorkerConfiguration struct {
22-
HashLogsBlockMaxSize int `mapstructure:"worker-async-block-hasher-max-block-size"`
23-
HashLogsBlockCRONSpec string `mapstructure:"worker-async-block-hasher-schedule"`
24-
}
24+
for i := 0; i < configType.NumField(); i++ {
25+
field := configType.Field(i)
26+
fieldTag := field.Tag
27+
flag := field.Tag.Get("mapstructure")
28+
description := fieldTag.Get("description")
29+
defaultValue := fieldTag.Get("default")
2530

26-
func addWorkerFlags(cmd *cobra.Command) {
27-
cmd.Flags().Int(WorkerAsyncBlockHasherMaxBlockSizeFlag, 1000, "Max block size")
28-
cmd.Flags().String(WorkerAsyncBlockHasherScheduleFlag, "0 * * * * *", "Schedule")
31+
switch field.Type.Kind() {
32+
case reflect.Int, reflect.Int64, reflect.Int32, reflect.Int16, reflect.Int8:
33+
defaultValue, err := strconv.ParseInt(defaultValue, 10, 64)
34+
if err != nil {
35+
panic(err)
36+
}
37+
cmd.Flags().Int(flag, int(defaultValue), description)
38+
case reflect.String:
39+
cmd.Flags().String(flag, defaultValue, description)
40+
default:
41+
panic(fmt.Sprintf("cannot config flag %s as type %T is not handled", flag, field.Type))
42+
}
43+
}
44+
}
2945
}
3046

3147
func NewWorkerCommand() *cobra.Command {
@@ -38,9 +54,11 @@ func NewWorkerCommand() *cobra.Command {
3854
return err
3955
}
4056

41-
cfg, err := LoadConfig[WorkerConfiguration](cmd)
57+
workerModule, err := worker.NewFXModule(func(v any) error {
58+
return MapConfig(cmd, v)
59+
})
4260
if err != nil {
43-
return fmt.Errorf("loading config: %w", err)
61+
return fmt.Errorf("creating worker module: %w", err)
4462
}
4563

4664
return service.New(cmd.OutOrStdout(),
@@ -50,10 +68,7 @@ func NewWorkerCommand() *cobra.Command {
5068
otlpmetrics.FXModuleFromFlags(cmd),
5169
bunconnect.Module(*connectionOptions, service.IsDebug(cmd)),
5270
storage.NewFXModule(storage.ModuleConfig{}),
53-
worker.NewFXModule(worker.ModuleConfig{
54-
MaxBlockSize: cfg.HashLogsBlockMaxSize,
55-
Schedule: cfg.HashLogsBlockCRONSpec,
56-
}),
71+
workerModule,
5772
).Run(cmd)
5873
},
5974
}

internal/worker/async_block.go renamed to internal/storage/async_block.go

Lines changed: 54 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package worker
1+
package storage
22

33
import (
44
"context"
@@ -19,16 +19,17 @@ import (
1919
)
2020

2121
type AsyncBlockRunnerConfig struct {
22-
MaxBlockSize int
23-
Schedule cron.Schedule
22+
HashLogsBlockMaxSize int `mapstructure:"worker-async-block-hasher-max-block-size" description:"Max block size" default:"1000"`
23+
HashLogsBlockCRONSpec string `mapstructure:"worker-async-block-hasher-schedule" description:"Schedule" default:"0 * * * * *"`
2424
}
2525

2626
type AsyncBlockRunner struct {
27-
stopChannel chan chan struct{}
28-
logger logging.Logger
29-
db *bun.DB
30-
cfg AsyncBlockRunnerConfig
31-
tracer trace.Tracer
27+
stopChannel chan chan struct{}
28+
logger logging.Logger
29+
db *bun.DB
30+
tracer trace.Tracer
31+
maxBlockSize int
32+
schedule cron.Schedule
3233
}
3334

3435
func (r *AsyncBlockRunner) Name() string {
@@ -38,7 +39,7 @@ func (r *AsyncBlockRunner) Name() string {
3839
func (r *AsyncBlockRunner) Run(ctx context.Context) error {
3940

4041
now := time.Now()
41-
next := r.cfg.Schedule.Next(now).Sub(now)
42+
next := r.schedule.Next(now).Sub(now)
4243

4344
for {
4445
select {
@@ -48,7 +49,7 @@ func (r *AsyncBlockRunner) Run(ctx context.Context) error {
4849
}
4950

5051
now = time.Now()
51-
next = r.cfg.Schedule.Next(now).Sub(now)
52+
next = r.schedule.Next(now).Sub(now)
5253
case ch := <-r.stopChannel:
5354
close(ch)
5455
return nil
@@ -106,17 +107,24 @@ func (r *AsyncBlockRunner) processLedger(ctx context.Context, l ledger.Ledger) e
106107
var err error
107108
_, err = r.db.NewRaw(fmt.Sprintf(`
108109
call "%s".create_blocks(?, ?)
109-
`, l.Bucket), l.Name, r.cfg.MaxBlockSize).
110+
`, l.Bucket), l.Name, r.maxBlockSize).
110111
Exec(ctx)
111112
return err
112113
}
113114

114-
func NewAsyncBlockRunner(logger logging.Logger, db *bun.DB, cfg AsyncBlockRunnerConfig, opts ...Option) *AsyncBlockRunner {
115+
func NewAsyncBlockRunner(
116+
logger logging.Logger,
117+
db *bun.DB,
118+
schedule cron.Schedule,
119+
maxBlockSize int,
120+
opts ...Option,
121+
) *AsyncBlockRunner {
115122
ret := &AsyncBlockRunner{
116-
stopChannel: make(chan chan struct{}),
117-
logger: logger,
118-
db: db,
119-
cfg: cfg,
123+
stopChannel: make(chan chan struct{}),
124+
logger: logger,
125+
db: db,
126+
schedule: schedule,
127+
maxBlockSize: maxBlockSize,
120128
}
121129

122130
for _, opt := range append(defaultOptions, opts...) {
@@ -137,3 +145,33 @@ func WithTracer(tracer trace.Tracer) Option {
137145
var defaultOptions = []Option{
138146
WithTracer(noop.Tracer{}),
139147
}
148+
149+
type AsyncBlockRunnerFactory struct{}
150+
151+
func (f *AsyncBlockRunnerFactory) CreateRunner(config AsyncBlockRunnerConfig) (any, error) {
152+
return func(
153+
logger logging.Logger,
154+
db *bun.DB,
155+
traceProvider trace.TracerProvider,
156+
) (Runner, error) {
157+
parser := cron.NewParser(cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow)
158+
schedule, err := parser.Parse(config.HashLogsBlockCRONSpec)
159+
if err != nil {
160+
return nil, err
161+
}
162+
163+
return NewAsyncBlockRunner(
164+
logger,
165+
db,
166+
schedule,
167+
config.HashLogsBlockMaxSize,
168+
WithTracer(traceProvider.Tracer("AsyncBlockRunner")),
169+
), nil
170+
}, nil
171+
}
172+
173+
var _ RunnerFactory[AsyncBlockRunnerConfig] = (*AsyncBlockRunnerFactory)(nil)
174+
175+
func init() {
176+
RegisterRunnerFactory(&AsyncBlockRunnerFactory{})
177+
}

internal/worker/fx.go

Lines changed: 0 additions & 50 deletions
This file was deleted.

internal/worker/module.go

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
package worker
2+
3+
import (
4+
"context"
5+
"go.uber.org/fx"
6+
"reflect"
7+
)
8+
9+
func NewFXModule(configLoader func(v any) error) (fx.Option, error) {
10+
11+
options := make([]fx.Option, 0)
12+
for _, factory := range runnerFactories {
13+
vFactory := reflect.TypeOf(factory)
14+
method, _ := vFactory.MethodByName("CreateRunner")
15+
configType := reflect.New(method.Type.In(1)).Interface()
16+
if err := configLoader(configType); err != nil {
17+
return nil, err
18+
}
19+
20+
ret := reflect.ValueOf(factory).
21+
MethodByName("CreateRunner").
22+
Call([]reflect.Value{
23+
reflect.ValueOf(configType).Elem(),
24+
})
25+
if ret[1].Interface() != nil {
26+
return nil, ret[1].Interface().(error)
27+
}
28+
29+
runnerConstructor := ret[0].Interface()
30+
31+
options = append(options, fx.Provide(
32+
fx.Annotate(runnerConstructor, fx.ResultTags(`group:"runners"`)),
33+
))
34+
}
35+
36+
options = append(options,
37+
fx.Invoke(fx.Annotate(func(runners []Runner, lc fx.Lifecycle) {
38+
for _, runner := range runners {
39+
lc.Append(fx.Hook{
40+
OnStart: func(ctx context.Context) error {
41+
go func() {
42+
if err := runner.Run(context.WithoutCancel(ctx)); err != nil {
43+
panic(err)
44+
}
45+
}()
46+
return nil
47+
},
48+
OnStop: runner.Stop,
49+
})
50+
}
51+
}, fx.ParamTags(`group:"runners"`))),
52+
)
53+
54+
return fx.Options(options...), nil
55+
}
56+
57+
type Runner interface {
58+
Run(ctx context.Context) error
59+
Stop(ctx context.Context) error
60+
}
61+
62+
type RunnerFactory[Config any] interface {
63+
// CreateRunner returns a constructor for the runner
64+
// It should be passable to fx
65+
CreateRunner(config Config) (any, error)
66+
}
67+
68+
var runnerFactories = make([]any, 0)
69+
70+
func AllRunnerFactories() []any {
71+
return runnerFactories
72+
}
73+
74+
func RegisterRunnerFactory[Config any](factory RunnerFactory[Config]) {
75+
runnerFactories = append(runnerFactories, factory)
76+
}

0 commit comments

Comments
 (0)