Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 24 additions & 1 deletion cmd/portal/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"fmt"
"strings"

"github.com/hibiken/asynq"

"github.com/vultisig/verifier/config"
"github.com/vultisig/verifier/internal/portal"
"github.com/vultisig/verifier/internal/storage"
Expand Down Expand Up @@ -34,7 +36,28 @@ func main() {
panic(err)
}

server := portal.NewServer(*cfg, pool, db, assetStorage)
var queueClient *asynq.Client
redisCfg := cfg.Redis
if redisCfg.URI != "" {
redisConnOpt, parseErr := asynq.ParseRedisURI(redisCfg.URI)
if parseErr != nil {
panic(parseErr)
}
queueClient = asynq.NewClient(redisConnOpt)
} else if redisCfg.Host != "" {
redisConnOpt := asynq.RedisClientOpt{
Addr: redisCfg.Host + ":" + redisCfg.Port,
Username: redisCfg.User,
Password: redisCfg.Password,
DB: redisCfg.DB,
}
queueClient = asynq.NewClient(redisConnOpt)
}
if queueClient != nil {
defer queueClient.Close()
}

server := portal.NewServer(*cfg, pool, db, assetStorage, queueClient)
if err := server.Start(); err != nil {
panic(err)
}
Expand Down
21 changes: 20 additions & 1 deletion cmd/worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/hibiken/asynq"

"github.com/vultisig/verifier/config"
"github.com/vultisig/verifier/internal/email"
"github.com/vultisig/verifier/internal/fee_manager"
"github.com/vultisig/verifier/internal/health"
"github.com/vultisig/verifier/internal/logging"
Expand Down Expand Up @@ -92,6 +93,7 @@ func main() {
Queues: map[string]int{
tasks.QUEUE_NAME: 10,
vault.EmailQueueName: 100,
email.QueueName: 100,
"scheduled_plugin_queue": 10,
},
},
Expand Down Expand Up @@ -151,7 +153,6 @@ func main() {

mux := asynq.NewServeMux()

// Wrap handlers with metrics collection
mux.HandleFunc(tasks.TypeKeyGenerationDKLS,
workerMetrics.Handler("keygen", vaultMgmService.HandleKeyGenerationDKLS))
mux.HandleFunc(tasks.TypeKeySignDKLS,
Expand All @@ -163,6 +164,24 @@ func main() {
mux.HandleFunc(tasks.TypePolicyDeactivate,
workerMetrics.Handler("policy_deactivate", policyService.HandlePolicyDeactivate))

mandrillClient := email.NewMandrillClient(
cfg.Mandrill.APIKey,
cfg.Mandrill.SendingDomain,
logger,
)
if mandrillClient.IsConfigured() {
emailHandler := email.NewHandler(mandrillClient, logger)
mux.HandleFunc(email.TypePortalProposal,
workerMetrics.Handler("email_proposal", emailHandler.HandleProposal))
mux.HandleFunc(email.TypePortalApproval,
workerMetrics.Handler("email_approval", emailHandler.HandleApproval))
mux.HandleFunc(email.TypePortalPublish,
workerMetrics.Handler("email_publish", emailHandler.HandlePublish))
logger.Info("email handlers registered")
} else {
logger.Warn("mandrill not configured: portal email handlers disabled")
}

if err := srv.Run(mux); err != nil {
panic(fmt.Errorf("could not run server: %w", err))
}
Expand Down
12 changes: 8 additions & 4 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type WorkerConfig struct {
Fees FeesConfig `mapstructure:"fees" json:"fees"`
Metrics MetricsConfig `mapstructure:"metrics" json:"metrics,omitempty"`
HealthPort int `mapstructure:"health_port" json:"health_port,omitempty"`
Mandrill MandrillConfig `mapstructure:"mandrill" json:"mandrill,omitempty"`
}

type VerifierConfig struct {
Expand Down Expand Up @@ -58,6 +59,11 @@ type MetricsConfig struct {
Port int `mapstructure:"port" json:"port,omitempty"`
}

type MandrillConfig struct {
APIKey string `mapstructure:"api_key" json:"api_key,omitempty"`
SendingDomain string `mapstructure:"sending_domain" json:"sending_domain,omitempty"`
}

type PluginAssetsConfig struct {
Host string `mapstructure:"host" json:"host"`
Region string `mapstructure:"region" json:"region"`
Expand Down Expand Up @@ -109,6 +115,7 @@ type PortalConfig struct {
BaseURL string `mapstructure:"base_url" json:"base_url,omitempty"` // Base URL for magic links
} `mapstructure:"server" json:"server"`
Database config.Database `mapstructure:"database" json:"database,omitempty"`
Redis config.Redis `mapstructure:"redis" json:"redis,omitempty"`
MaxApiKeysPerPlugin int `mapstructure:"max_api_keys_per_plugin" json:"max_api_keys_per_plugin,omitempty"`
PluginAssets PluginAssetsConfig `mapstructure:"plugin_assets" json:"plugin_assets,omitempty"`
MaxMediaImagesPerPlugin int `mapstructure:"max_media_images_per_plugin" json:"max_media_images_per_plugin,omitempty"`
Expand All @@ -119,14 +126,11 @@ type PortalConfig struct {
}

type PortalEmailConfig struct {
MandrillAPIKey string `mapstructure:"mandrill_api_key" json:"mandrill_api_key,omitempty"`
FromEmail string `mapstructure:"from_email" json:"from_email,omitempty"`
FromName string `mapstructure:"from_name" json:"from_name,omitempty"`
NotificationEmails []string `mapstructure:"notification_emails" json:"notification_emails,omitempty"`
}

func (c PortalEmailConfig) IsConfigured() bool {
return c.MandrillAPIKey != "" && c.FromEmail != "" && len(c.NotificationEmails) > 0
return len(c.NotificationEmails) > 0
}

func GetConfigure() (*WorkerConfig, error) {
Expand Down
115 changes: 115 additions & 0 deletions internal/email/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package email

import (
"context"
"encoding/json"
"fmt"

"github.com/hibiken/asynq"
"github.com/sirupsen/logrus"
)

type Handler struct {
client *MandrillClient
logger *logrus.Logger
}

func NewHandler(client *MandrillClient, logger *logrus.Logger) *Handler {
return &Handler{
client: client,
logger: logger,
}
}

func (h *Handler) HandleProposal(ctx context.Context, t *asynq.Task) error {
var task PortalProposalTask
err := json.Unmarshal(t.Payload(), &task)
if err != nil {
return fmt.Errorf("unmarshal task: %w: %w", err, asynq.SkipRetry)
}

if len(task.NotificationEmails) == 0 {
h.logger.WithField("plugin_id", task.PluginID).Warn("no notification emails configured")
return nil
}

recipients := make([]MandrillTo, len(task.NotificationEmails))
for i, addr := range task.NotificationEmails {
recipients[i] = MandrillTo{
Email: addr,
Type: "to",
}
}

mergeVars := make([]MandrillVar, len(task.NotificationEmails))
for i, addr := range task.NotificationEmails {
mergeVars[i] = MandrillVar{
Rcpt: addr,
Vars: []MandrillMergeVarContent{
{Name: MergeVarPluginID, Content: task.PluginID},
{Name: MergeVarPluginTitle, Content: task.Title},
{Name: MergeVarContactEmail, Content: task.ContactEmail},
{Name: MergeVarProposalURL, Content: task.ProposalURL},
},
}
}

err = h.client.SendTemplate(ctx, TemplatePortalProposal, recipients, mergeVars)
if err != nil {
h.logger.WithError(err).WithField("plugin_id", task.PluginID).Error("failed to send proposal email")
return fmt.Errorf("send template: %w", err)
}

h.logger.WithField("plugin_id", task.PluginID).Info("proposal notification sent")
return nil
}

func (h *Handler) HandleApproval(ctx context.Context, t *asynq.Task) error {
var task PortalApprovalTask
err := json.Unmarshal(t.Payload(), &task)
if err != nil {
return fmt.Errorf("unmarshal task: %w: %w", err, asynq.SkipRetry)
}

return h.sendContactEmail(ctx, TemplatePortalApproval, task.ContactEmail, task.PluginID, []MandrillMergeVarContent{
{Name: MergeVarPluginID, Content: task.PluginID},
{Name: MergeVarPluginTitle, Content: task.Title},
})
}

func (h *Handler) HandlePublish(ctx context.Context, t *asynq.Task) error {
var task PortalPublishTask
err := json.Unmarshal(t.Payload(), &task)
if err != nil {
return fmt.Errorf("unmarshal task: %w: %w", err, asynq.SkipRetry)
}

return h.sendContactEmail(ctx, TemplatePortalPublish, task.ContactEmail, task.PluginID, []MandrillMergeVarContent{
{Name: MergeVarPluginID, Content: task.PluginID},
{Name: MergeVarPluginTitle, Content: task.Title},
{Name: MergeVarPluginURL, Content: task.PluginURL},
})
}

func (h *Handler) sendContactEmail(ctx context.Context, template string, contactEmail string, pluginID string, vars []MandrillMergeVarContent) error {
if contactEmail == "" {
return nil
}

recipients := []MandrillTo{
{Email: contactEmail, Type: "to"},
}

mergeVars := []MandrillVar{
{Rcpt: contactEmail, Vars: vars},
}

err := h.client.SendTemplate(ctx, template, recipients, mergeVars)
if err != nil {
h.logger.WithError(err).WithField("plugin_id", pluginID).Errorf("failed to send %s email", template)
return fmt.Errorf("send template: %w", err)
}

h.logger.WithField("plugin_id", pluginID).Infof("%s notification sent", template)
return nil
}
Loading