Skip to content

Commit

Permalink
Added: dirty bot implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
capcom6 committed Dec 27, 2023
1 parent 26ccc5f commit 59c921a
Show file tree
Hide file tree
Showing 17 changed files with 309 additions and 139 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ monitor:
CONFIG_PATH=./configs/monitor.yml go run ./cmd/monitor/main.go

bot:
go run ./cmd/bot/main.go
CONFIG_PATH=./configs/bot.yml go run ./cmd/bot/main.go

init-dev:
go mod download \
Expand Down
95 changes: 90 additions & 5 deletions internal/botx/app.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,103 @@
package botx

import (
"fmt"
"context"
"sync"

"github.com/capcom6/go-infra-fx/logger"
"github.com/capcom6/service-monitor-tgbot/internal/botx/config"
"github.com/capcom6/service-monitor-tgbot/internal/botx/telegram"
"github.com/capcom6/service-monitor-tgbot/pkg/eventbus"
"github.com/capcom6/service-monitor-tgbot/pkg/events"
"go.uber.org/fx"
"go.uber.org/fx/fxevent"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

func Run() {
module := fx.Module(
"bot",
fx.Invoke(func() {
fmt.Println("Bot")
}),
logger.Module,
config.Module,
telegram.Module,
eventbus.Module,
fx.Provide(NewMessages),
fx.Invoke(start),
)

fx.New(module).Run()
fx.New(
module,
fx.WithLogger(func(logger *zap.Logger) fxevent.Logger {
logOption := fxevent.ZapLogger{Logger: logger}
logOption.UseLogLevel(zapcore.DebugLevel)
return &logOption
}),
).Run()
}

func start(eventbus eventbus.EventBus, telegram *telegram.TelegramBot, messages *Messages, logger *zap.Logger, lc fx.Lifecycle) error {
ctx, cancel := context.WithCancel(context.Background())

wg := &sync.WaitGroup{}

lc.Append(fx.Hook{
OnStop: func(ctx context.Context) error {
cancel()
wg.Wait()
return nil
},
})

ch, err := eventbus.Receive(ctx)
if err != nil {
return err
}

wg.Add(1)
go func() {
defer wg.Done()

logger.Debug("start receive events")
defer logger.Debug("stop receive events")

event := events.Event[events.ServiceStateChanged]{}
for payload := range ch {
if err := event.Decode(payload); err != nil {
continue
}
if event.Name != events.EventNameServiceStateChanged {
continue
}

v := event.Payload

msg := ""
if v.State == events.ServiceStateOffline {
context := OfflineContext{
OnlineContext: OnlineContext{
Name: telegram.EscapeText(v.Name),
},
Error: telegram.EscapeText(v.Error),
}
msg, err = messages.Render(TemplateOffline, context)
} else {
context := OnlineContext{
Name: telegram.EscapeText(v.Name),
}
msg, err = messages.Render(TemplateOnline, context)
}

if err != nil {
logger.Error("can't render template", zap.Error(err))
continue
}

if err := telegram.SendMessage(msg); err != nil {
logger.Error("can't send message", zap.Error(err))
}
}
}()

return nil
}
5 changes: 5 additions & 0 deletions internal/botx/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package config

type Config struct {
Telegram Telegram `yaml:"telegram"`
EventBus EventBus `yaml:"eventBus"`
}

type Telegram struct {
Expand All @@ -13,3 +14,7 @@ type Telegram struct {
}

type TelegramMessages map[string]string

type EventBus struct {
DSN string `yaml:"dsn" envconfig:"EVENTBUS__DSN" validate:"required"`
}
3 changes: 3 additions & 0 deletions internal/botx/config/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,8 @@ var (

defaultConfig Config = Config{
Telegram: Telegram{Token: "", ChatID: 0, WebhookURL: "", Debug: false, Messages: defaultTelegramMessages},
EventBus: EventBus{
DSN: "redis://localhost:6379/0?channel=events",
},
}
)
6 changes: 6 additions & 0 deletions internal/botx/config/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package config

import (
"github.com/capcom6/go-infra-fx/config"
"github.com/capcom6/service-monitor-tgbot/pkg/eventbus"
"go.uber.org/fx"
"go.uber.org/zap"
)
Expand All @@ -20,4 +21,9 @@ var Module = fx.Module(
fx.Provide(func(cfg Config) TelegramMessages {
return cfg.Telegram.Messages
}),
fx.Provide(func(cfg Config) eventbus.Config {
return eventbus.Config{
DSN: cfg.EventBus.DSN,
}
}),
)
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package infrastructure
package telegram

import (
"go.uber.org/fx"
"go.uber.org/zap"
)

var Module = fx.Module(
"infrastructure",
"telegram",
fx.Decorate(func(log *zap.Logger) *zap.Logger {
return log.Named("infrastructure")
return log.Named("telegram")
}),
fx.Provide(NewTelegramBot),
)
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package infrastructure
package telegram

import (
"github.com/capcom6/service-monitor-tgbot/internal/botx/config"
Expand Down Expand Up @@ -42,35 +42,3 @@ func (b *TelegramBot) SendMessage(text string) error {
func (b *TelegramBot) EscapeText(text string) string {
return tgbotapi.EscapeText(tgbotapi.ModeMarkdownV2, text)
}

// func (b *TelegramBot) Api() (*tgbotapi.BotAPI, error) {
// b.mux.Lock()
// defer b.mux.Unlock()

// if b.api != nil {
// return b.api, nil
// }

// api, err := tgbotapi.NewBotAPI(b.Config.Token)
// if err != nil {
// return nil, err
// }

// api.Debug = b.Config.Debug
// b.api = api

// return api, nil
// }

// func (b *TelegramBot) Listen(ctx context.Context) (tgbotapi.UpdatesChannel, error) {
// u := tgbotapi.NewUpdate(0)
// u.Timeout = 60
// u.AllowedUpdates = []string{"message", "callback_query"}

// go func() {
// <-ctx.Done()
// b.api.StopReceivingUpdates()
// }()

// return b.api.GetUpdatesChan(u), nil
// }
88 changes: 49 additions & 39 deletions internal/monitorx/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ import (

"github.com/capcom6/go-infra-fx/logger"
"github.com/capcom6/service-monitor-tgbot/internal/monitorx/config"
"github.com/capcom6/service-monitor-tgbot/internal/monitorx/eventbus"
"github.com/capcom6/service-monitor-tgbot/internal/monitorx/monitor"
"github.com/capcom6/service-monitor-tgbot/internal/monitorx/storage"
"github.com/capcom6/service-monitor-tgbot/pkg/eventbus"
"github.com/capcom6/service-monitor-tgbot/pkg/events"
"go.uber.org/fx"
"go.uber.org/fx/fxevent"
"go.uber.org/zap"
Expand All @@ -18,49 +19,13 @@ import (

func Run() {
module := fx.Module(
"bot",
"monitor",
logger.Module,
config.Module,
monitor.Module,
storage.Module,
eventbus.Module,
fx.Invoke(func(lc fx.Lifecycle, logger *zap.Logger, monitorMod *monitor.MonitorModule, eventbus eventbus.EventBus) error {
ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
logger.Info("Started")
return nil
},
OnStop: func(ctx context.Context) error {
cancel()
wg.Wait()
logger.Info("Stopped")
return nil
},
})

ch, err := monitorMod.Monitor(ctx)
if err != nil {
return err
}

wg.Add(1)
go func() {
defer wg.Done()
for v := range ch {
logger.Debug("probe", zap.Any("state", v))

ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
if err := eventbus.Send(ctx, v); err != nil {
logger.Error("failed to send event", zap.Error(err))
}
cancel()
}
}()

return nil
}),
fx.Invoke(start),
)

fx.New(
Expand All @@ -72,3 +37,48 @@ func Run() {
}),
).Run()
}

func start(lc fx.Lifecycle, logger *zap.Logger, monitorMod *monitor.MonitorModule, eventbus eventbus.EventBus) error {
ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
logger.Info("Started")
return nil
},
OnStop: func(ctx context.Context) error {
cancel()
wg.Wait()
logger.Info("Stopped")
return nil
},
})

ch, err := monitorMod.Monitor(ctx)
if err != nil {
return err
}

wg.Add(1)
go func() {
defer wg.Done()
for v := range ch {
logger.Debug("probe", zap.Any("state", v))

event := events.NewServiceStateChangedEvent(v.Id, v.Name, string(v.State), v.Error)
payload, err := event.Encode()
if err != nil {
logger.Error("failed to encode event", zap.Error(err))
continue
}

ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
if err := eventbus.Send(ctx, payload); err != nil {
logger.Error("failed to send event", zap.Error(err))
}
cancel()
}
}()

return nil
}
4 changes: 2 additions & 2 deletions internal/monitorx/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ type Config struct {
}

type Storage struct {
DSN string `yaml:"dsn"`
DSN string `yaml:"dsn" envconfig:"STORAGE__DSN" validate:"required"`
}

type EventBus struct {
DSN string `yaml:"dsn"`
DSN string `yaml:"dsn" envconfig:"EVENTBUS__DSN" validate:"required"`
}
5 changes: 3 additions & 2 deletions internal/monitorx/config/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ package config

var (
defaultConfig Config = Config{
Storage: Storage{
DSN: "file://./config.yaml",
Storage: Storage{DSN: "file://./config.yaml"},
EventBus: EventBus{
DSN: "redis://localhost:6379/0?channel=events",
},
}
)
2 changes: 1 addition & 1 deletion internal/monitorx/config/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package config

import (
"github.com/capcom6/go-infra-fx/config"
"github.com/capcom6/service-monitor-tgbot/internal/monitorx/eventbus"
"github.com/capcom6/service-monitor-tgbot/internal/monitorx/storage"
"github.com/capcom6/service-monitor-tgbot/pkg/eventbus"
"go.uber.org/fx"
"go.uber.org/zap"
)
Expand Down
Loading

0 comments on commit 59c921a

Please sign in to comment.