diff --git a/cmd/portal/main.go b/cmd/portal/main.go index e3900381..1080eda2 100644 --- a/cmd/portal/main.go +++ b/cmd/portal/main.go @@ -4,6 +4,8 @@ import ( "fmt" "strings" + "github.com/hibiken/asynq" + "github.com/vultisig/verifier/config" "github.com/vultisig/verifier/internal/portal" "github.com/vultisig/verifier/internal/storage" @@ -34,7 +36,28 @@ func main() { panic(err) } - server := portal.NewServer(*cfg, pool, db, assetStorage) + var queueClient *asynq.Client + redisCfg := cfg.Redis + if redisCfg.URI != "" { + redisConnOpt, parseErr := asynq.ParseRedisURI(redisCfg.URI) + if parseErr != nil { + panic(parseErr) + } + queueClient = asynq.NewClient(redisConnOpt) + } else if redisCfg.Host != "" { + redisConnOpt := asynq.RedisClientOpt{ + Addr: redisCfg.Host + ":" + redisCfg.Port, + Username: redisCfg.User, + Password: redisCfg.Password, + DB: redisCfg.DB, + } + queueClient = asynq.NewClient(redisConnOpt) + } + if queueClient != nil { + defer queueClient.Close() + } + + server := portal.NewServer(*cfg, pool, db, assetStorage, queueClient) if err := server.Start(); err != nil { panic(err) } diff --git a/cmd/worker/main.go b/cmd/worker/main.go index 4c15ef9a..58b5cc8c 100644 --- a/cmd/worker/main.go +++ b/cmd/worker/main.go @@ -10,6 +10,7 @@ import ( "github.com/hibiken/asynq" "github.com/vultisig/verifier/config" + "github.com/vultisig/verifier/internal/email" "github.com/vultisig/verifier/internal/fee_manager" "github.com/vultisig/verifier/internal/health" "github.com/vultisig/verifier/internal/logging" @@ -92,6 +93,7 @@ func main() { Queues: map[string]int{ tasks.QUEUE_NAME: 10, vault.EmailQueueName: 100, + email.QueueName: 100, "scheduled_plugin_queue": 10, }, }, @@ -151,7 +153,6 @@ func main() { mux := asynq.NewServeMux() - // Wrap handlers with metrics collection mux.HandleFunc(tasks.TypeKeyGenerationDKLS, workerMetrics.Handler("keygen", vaultMgmService.HandleKeyGenerationDKLS)) mux.HandleFunc(tasks.TypeKeySignDKLS, @@ -163,6 +164,24 @@ func main() { mux.HandleFunc(tasks.TypePolicyDeactivate, workerMetrics.Handler("policy_deactivate", policyService.HandlePolicyDeactivate)) + mandrillClient := email.NewMandrillClient( + cfg.Mandrill.APIKey, + cfg.Mandrill.SendingDomain, + logger, + ) + if mandrillClient.IsConfigured() { + emailHandler := email.NewHandler(mandrillClient, logger) + mux.HandleFunc(email.TypePortalProposal, + workerMetrics.Handler("email_proposal", emailHandler.HandleProposal)) + mux.HandleFunc(email.TypePortalApproval, + workerMetrics.Handler("email_approval", emailHandler.HandleApproval)) + mux.HandleFunc(email.TypePortalPublish, + workerMetrics.Handler("email_publish", emailHandler.HandlePublish)) + logger.Info("email handlers registered") + } else { + logger.Warn("mandrill not configured: portal email handlers disabled") + } + if err := srv.Run(mux); err != nil { panic(fmt.Errorf("could not run server: %w", err)) } diff --git a/config/config.go b/config/config.go index 6d63c679..db33051f 100644 --- a/config/config.go +++ b/config/config.go @@ -24,6 +24,7 @@ type WorkerConfig struct { Fees FeesConfig `mapstructure:"fees" json:"fees"` Metrics MetricsConfig `mapstructure:"metrics" json:"metrics,omitempty"` HealthPort int `mapstructure:"health_port" json:"health_port,omitempty"` + Mandrill MandrillConfig `mapstructure:"mandrill" json:"mandrill,omitempty"` } type VerifierConfig struct { @@ -58,6 +59,11 @@ type MetricsConfig struct { Port int `mapstructure:"port" json:"port,omitempty"` } +type MandrillConfig struct { + APIKey string `mapstructure:"api_key" json:"api_key,omitempty"` + SendingDomain string `mapstructure:"sending_domain" json:"sending_domain,omitempty"` +} + type PluginAssetsConfig struct { Host string `mapstructure:"host" json:"host"` Region string `mapstructure:"region" json:"region"` @@ -109,6 +115,7 @@ type PortalConfig struct { BaseURL string `mapstructure:"base_url" json:"base_url,omitempty"` // Base URL for magic links } `mapstructure:"server" json:"server"` Database config.Database `mapstructure:"database" json:"database,omitempty"` + Redis config.Redis `mapstructure:"redis" json:"redis,omitempty"` MaxApiKeysPerPlugin int `mapstructure:"max_api_keys_per_plugin" json:"max_api_keys_per_plugin,omitempty"` PluginAssets PluginAssetsConfig `mapstructure:"plugin_assets" json:"plugin_assets,omitempty"` MaxMediaImagesPerPlugin int `mapstructure:"max_media_images_per_plugin" json:"max_media_images_per_plugin,omitempty"` @@ -119,14 +126,11 @@ type PortalConfig struct { } type PortalEmailConfig struct { - MandrillAPIKey string `mapstructure:"mandrill_api_key" json:"mandrill_api_key,omitempty"` - FromEmail string `mapstructure:"from_email" json:"from_email,omitempty"` - FromName string `mapstructure:"from_name" json:"from_name,omitempty"` NotificationEmails []string `mapstructure:"notification_emails" json:"notification_emails,omitempty"` } func (c PortalEmailConfig) IsConfigured() bool { - return c.MandrillAPIKey != "" && c.FromEmail != "" && len(c.NotificationEmails) > 0 + return len(c.NotificationEmails) > 0 } func GetConfigure() (*WorkerConfig, error) { diff --git a/internal/email/handler.go b/internal/email/handler.go new file mode 100644 index 00000000..f36dfd08 --- /dev/null +++ b/internal/email/handler.go @@ -0,0 +1,115 @@ +package email + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/hibiken/asynq" + "github.com/sirupsen/logrus" +) + +type Handler struct { + client *MandrillClient + logger *logrus.Logger +} + +func NewHandler(client *MandrillClient, logger *logrus.Logger) *Handler { + return &Handler{ + client: client, + logger: logger, + } +} + +func (h *Handler) HandleProposal(ctx context.Context, t *asynq.Task) error { + var task PortalProposalTask + err := json.Unmarshal(t.Payload(), &task) + if err != nil { + return fmt.Errorf("unmarshal task: %w: %w", err, asynq.SkipRetry) + } + + if len(task.NotificationEmails) == 0 { + h.logger.WithField("plugin_id", task.PluginID).Warn("no notification emails configured") + return nil + } + + recipients := make([]MandrillTo, len(task.NotificationEmails)) + for i, addr := range task.NotificationEmails { + recipients[i] = MandrillTo{ + Email: addr, + Type: "to", + } + } + + mergeVars := make([]MandrillVar, len(task.NotificationEmails)) + for i, addr := range task.NotificationEmails { + mergeVars[i] = MandrillVar{ + Rcpt: addr, + Vars: []MandrillMergeVarContent{ + {Name: MergeVarPluginID, Content: task.PluginID}, + {Name: MergeVarPluginTitle, Content: task.Title}, + {Name: MergeVarContactEmail, Content: task.ContactEmail}, + {Name: MergeVarProposalURL, Content: task.ProposalURL}, + }, + } + } + + err = h.client.SendTemplate(ctx, TemplatePortalProposal, recipients, mergeVars) + if err != nil { + h.logger.WithError(err).WithField("plugin_id", task.PluginID).Error("failed to send proposal email") + return fmt.Errorf("send template: %w", err) + } + + h.logger.WithField("plugin_id", task.PluginID).Info("proposal notification sent") + return nil +} + +func (h *Handler) HandleApproval(ctx context.Context, t *asynq.Task) error { + var task PortalApprovalTask + err := json.Unmarshal(t.Payload(), &task) + if err != nil { + return fmt.Errorf("unmarshal task: %w: %w", err, asynq.SkipRetry) + } + + return h.sendContactEmail(ctx, TemplatePortalApproval, task.ContactEmail, task.PluginID, []MandrillMergeVarContent{ + {Name: MergeVarPluginID, Content: task.PluginID}, + {Name: MergeVarPluginTitle, Content: task.Title}, + }) +} + +func (h *Handler) HandlePublish(ctx context.Context, t *asynq.Task) error { + var task PortalPublishTask + err := json.Unmarshal(t.Payload(), &task) + if err != nil { + return fmt.Errorf("unmarshal task: %w: %w", err, asynq.SkipRetry) + } + + return h.sendContactEmail(ctx, TemplatePortalPublish, task.ContactEmail, task.PluginID, []MandrillMergeVarContent{ + {Name: MergeVarPluginID, Content: task.PluginID}, + {Name: MergeVarPluginTitle, Content: task.Title}, + {Name: MergeVarPluginURL, Content: task.PluginURL}, + }) +} + +func (h *Handler) sendContactEmail(ctx context.Context, template string, contactEmail string, pluginID string, vars []MandrillMergeVarContent) error { + if contactEmail == "" { + return nil + } + + recipients := []MandrillTo{ + {Email: contactEmail, Type: "to"}, + } + + mergeVars := []MandrillVar{ + {Rcpt: contactEmail, Vars: vars}, + } + + err := h.client.SendTemplate(ctx, template, recipients, mergeVars) + if err != nil { + h.logger.WithError(err).WithField("plugin_id", pluginID).Errorf("failed to send %s email", template) + return fmt.Errorf("send template: %w", err) + } + + h.logger.WithField("plugin_id", pluginID).Infof("%s notification sent", template) + return nil +} diff --git a/internal/email/handler_test.go b/internal/email/handler_test.go new file mode 100644 index 00000000..5a98abbe --- /dev/null +++ b/internal/email/handler_test.go @@ -0,0 +1,252 @@ +package email + +import ( + "context" + "encoding/json" + "errors" + "net/http" + "net/http/httptest" + "testing" + + "github.com/hibiken/asynq" + "github.com/sirupsen/logrus" +) + +func TestHandler_HandleProposal_Success(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`[{"email":"admin@test.com","status":"sent"}]`)) + })) + defer server.Close() + + logger := logrus.New() + client := NewMandrillClientWithURL("test-api-key", "test.com", server.URL, logger) + handler := NewHandler(client, logger) + + task := PortalProposalTask{ + PluginID: "plugin-123", + Title: "Test Plugin", + ContactEmail: "dev@example.com", + ProposalURL: "https://portal.test.com/admin/plugin-proposals/plugin-123", + NotificationEmails: []string{"admin@test.com"}, + } + payload, _ := json.Marshal(task) + asynqTask := asynq.NewTask(TypePortalProposal, payload) + + err := handler.HandleProposal(context.Background(), asynqTask) + if err != nil { + t.Errorf("expected no error, got %v", err) + } +} + +func TestHandler_HandleProposal_UnmarshalError(t *testing.T) { + logger := logrus.New() + client := NewMandrillClient("test-api-key", "test.com", logger) + handler := NewHandler(client, logger) + + asynqTask := asynq.NewTask(TypePortalProposal, []byte("invalid json")) + + err := handler.HandleProposal(context.Background(), asynqTask) + if err == nil { + t.Fatal("expected error, got nil") + } + + if !errors.Is(err, asynq.SkipRetry) { + t.Errorf("expected SkipRetry error, got %v", err) + } +} + +func TestHandler_HandleProposal_MandrillError(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + _, _ = w.Write([]byte(`{"status":"error","message":"Internal error"}`)) + })) + defer server.Close() + + logger := logrus.New() + client := NewMandrillClientWithURL("test-api-key", "test.com", server.URL, logger) + handler := NewHandler(client, logger) + + task := PortalProposalTask{ + PluginID: "plugin-123", + Title: "Test Plugin", + ContactEmail: "dev@example.com", + ProposalURL: "https://portal.test.com/admin/plugin-proposals/plugin-123", + NotificationEmails: []string{"admin@test.com"}, + } + payload, _ := json.Marshal(task) + asynqTask := asynq.NewTask(TypePortalProposal, payload) + + err := handler.HandleProposal(context.Background(), asynqTask) + if err == nil { + t.Fatal("expected error, got nil") + } +} + +func TestHandler_HandleApproval_Success(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`[{"email":"dev@example.com","status":"sent"}]`)) + })) + defer server.Close() + + logger := logrus.New() + client := NewMandrillClientWithURL("test-api-key", "test.com", server.URL, logger) + handler := NewHandler(client, logger) + + task := PortalApprovalTask{ + PluginID: "plugin-123", + Title: "Approved Plugin", + ContactEmail: "dev@example.com", + } + payload, _ := json.Marshal(task) + asynqTask := asynq.NewTask(TypePortalApproval, payload) + + err := handler.HandleApproval(context.Background(), asynqTask) + if err != nil { + t.Errorf("expected no error, got %v", err) + } +} + +func TestHandler_HandleApproval_EmptyContactEmail(t *testing.T) { + logger := logrus.New() + client := NewMandrillClient("test-api-key", "test.com", logger) + handler := NewHandler(client, logger) + + task := PortalApprovalTask{ + PluginID: "plugin-123", + Title: "Test Plugin", + ContactEmail: "", + } + payload, _ := json.Marshal(task) + asynqTask := asynq.NewTask(TypePortalApproval, payload) + + err := handler.HandleApproval(context.Background(), asynqTask) + if err != nil { + t.Errorf("expected no error for empty contact email, got %v", err) + } +} + +func TestHandler_HandleApproval_UnmarshalError(t *testing.T) { + logger := logrus.New() + client := NewMandrillClient("test-api-key", "test.com", logger) + handler := NewHandler(client, logger) + + asynqTask := asynq.NewTask(TypePortalApproval, []byte("invalid json")) + + err := handler.HandleApproval(context.Background(), asynqTask) + if err == nil { + t.Fatal("expected error, got nil") + } + + if !errors.Is(err, asynq.SkipRetry) { + t.Errorf("expected SkipRetry error, got %v", err) + } +} + +func TestHandler_HandlePublish_Success(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`[{"email":"dev@example.com","status":"queued"}]`)) + })) + defer server.Close() + + logger := logrus.New() + client := NewMandrillClientWithURL("test-api-key", "test.com", server.URL, logger) + handler := NewHandler(client, logger) + + task := PortalPublishTask{ + PluginID: "plugin-123", + Title: "Published Plugin", + ContactEmail: "dev@example.com", + PluginURL: "https://portal.test.com/plugins/plugin-123", + } + payload, _ := json.Marshal(task) + asynqTask := asynq.NewTask(TypePortalPublish, payload) + + err := handler.HandlePublish(context.Background(), asynqTask) + if err != nil { + t.Errorf("expected no error, got %v", err) + } +} + +func TestHandler_HandlePublish_EmptyContactEmail(t *testing.T) { + logger := logrus.New() + client := NewMandrillClient("test-api-key", "test.com", logger) + handler := NewHandler(client, logger) + + task := PortalPublishTask{ + PluginID: "plugin-123", + Title: "Test Plugin", + ContactEmail: "", + PluginURL: "https://portal.test.com/plugins/plugin-123", + } + payload, _ := json.Marshal(task) + asynqTask := asynq.NewTask(TypePortalPublish, payload) + + err := handler.HandlePublish(context.Background(), asynqTask) + if err != nil { + t.Errorf("expected no error for empty contact email, got %v", err) + } +} + +func TestHandler_HandlePublish_UnmarshalError(t *testing.T) { + logger := logrus.New() + client := NewMandrillClient("test-api-key", "test.com", logger) + handler := NewHandler(client, logger) + + asynqTask := asynq.NewTask(TypePortalPublish, []byte("invalid json")) + + err := handler.HandlePublish(context.Background(), asynqTask) + if err == nil { + t.Fatal("expected error, got nil") + } + + if !errors.Is(err, asynq.SkipRetry) { + t.Errorf("expected SkipRetry error, got %v", err) + } +} + +func TestHandler_HandlePublish_MandrillReject(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`[{"email":"dev@example.com","status":"rejected","reject_reason":"hard-bounce"}]`)) + })) + defer server.Close() + + logger := logrus.New() + client := NewMandrillClientWithURL("test-api-key", "test.com", server.URL, logger) + handler := NewHandler(client, logger) + + task := PortalPublishTask{ + PluginID: "plugin-123", + Title: "Test Plugin", + ContactEmail: "dev@example.com", + PluginURL: "https://portal.test.com/plugins/plugin-123", + } + payload, _ := json.Marshal(task) + asynqTask := asynq.NewTask(TypePortalPublish, payload) + + err := handler.HandlePublish(context.Background(), asynqTask) + if err == nil { + t.Fatal("expected error for rejected email, got nil") + } +} + +func TestMandrillClient_IsConfigured(t *testing.T) { + logger := logrus.New() + + t.Run("configured", func(t *testing.T) { + client := NewMandrillClient("test-api-key", "test.com", logger) + if !client.IsConfigured() { + t.Error("expected client to be configured") + } + }) + + t.Run("not configured - empty api key", func(t *testing.T) { + client := NewMandrillClient("", "test.com", logger) + if client.IsConfigured() { + t.Error("expected client to not be configured") + } + }) +} diff --git a/internal/email/mandrill.go b/internal/email/mandrill.go new file mode 100644 index 00000000..13825344 --- /dev/null +++ b/internal/email/mandrill.go @@ -0,0 +1,140 @@ +package email + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "time" + + "github.com/sirupsen/logrus" +) + +const defaultMandrillURL = "https://mandrillapp.com/api/1.0/messages/send-template" + +type MandrillTo struct { + Email string `json:"email"` + Name string `json:"name,omitempty"` + Type string `json:"type"` +} + +type MandrillMergeVarContent struct { + Name string `json:"name"` + Content string `json:"content"` +} + +type MandrillVar struct { + Rcpt string `json:"rcpt"` + Vars []MandrillMergeVarContent `json:"vars"` +} + +type MandrillMessage struct { + To []MandrillTo `json:"to"` + SendingDomain string `json:"sending_domain,omitempty"` + MergeVars []MandrillVar `json:"merge_vars,omitempty"` +} + +type MandrillPayload struct { + Key string `json:"key"` + TemplateName string `json:"template_name"` + TemplateContent []MandrillMergeVarContent `json:"template_content"` + Message MandrillMessage `json:"message"` +} + +type MandrillSendResult struct { + Email string `json:"email"` + Status string `json:"status"` + RejectReason string `json:"reject_reason,omitempty"` + ID string `json:"_id,omitempty"` +} + +type MandrillClient struct { + apiKey string + sendingDomain string + baseURL string + httpClient *http.Client + logger *logrus.Logger +} + +func NewMandrillClient(apiKey, sendingDomain string, logger *logrus.Logger) *MandrillClient { + return &MandrillClient{ + apiKey: apiKey, + sendingDomain: sendingDomain, + baseURL: defaultMandrillURL, + httpClient: &http.Client{ + Timeout: 30 * time.Second, + }, + logger: logger, + } +} + +func NewMandrillClientWithURL(apiKey, sendingDomain, baseURL string, logger *logrus.Logger) *MandrillClient { + return &MandrillClient{ + apiKey: apiKey, + sendingDomain: sendingDomain, + baseURL: baseURL, + httpClient: &http.Client{ + Timeout: 30 * time.Second, + }, + logger: logger, + } +} + +func (c *MandrillClient) IsConfigured() bool { + return c.apiKey != "" +} + +func (c *MandrillClient) SendTemplate(ctx context.Context, templateName string, recipients []MandrillTo, mergeVars []MandrillVar) error { + payload := MandrillPayload{ + Key: c.apiKey, + TemplateName: templateName, + TemplateContent: []MandrillMergeVarContent{}, + Message: MandrillMessage{ + To: recipients, + SendingDomain: c.sendingDomain, + MergeVars: mergeVars, + }, + } + + body, err := json.Marshal(payload) + if err != nil { + return fmt.Errorf("marshal payload: %w", err) + } + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.baseURL, bytes.NewReader(body)) + if err != nil { + return fmt.Errorf("create request: %w", err) + } + req.Header.Set("Content-Type", "application/json") + + resp, err := c.httpClient.Do(req) + if err != nil { + return fmt.Errorf("send request: %w", err) + } + defer resp.Body.Close() + + respBody, err := io.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("read response: %w", err) + } + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("mandrill returned status %d: %s", resp.StatusCode, string(respBody)) + } + + var results []MandrillSendResult + err = json.Unmarshal(respBody, &results) + if err != nil { + return fmt.Errorf("unmarshal response: %w", err) + } + + for _, r := range results { + if r.Status != "sent" && r.Status != "queued" { + return fmt.Errorf("email to %s failed: %s (%s)", r.Email, r.Status, r.RejectReason) + } + } + + return nil +} diff --git a/internal/email/tasks.go b/internal/email/tasks.go new file mode 100644 index 00000000..0ceaad51 --- /dev/null +++ b/internal/email/tasks.go @@ -0,0 +1,40 @@ +package email + +const ( + QueueName = "portal:email_queue" + + TypePortalProposal = "email:portal:proposal" + TypePortalApproval = "email:portal:approval" + TypePortalPublish = "email:portal:publish" + + TemplatePortalProposal = "vultisig-portal-proposal" + TemplatePortalApproval = "vultisig-portal-approval" + TemplatePortalPublish = "vultisig-portal-publish" + + MergeVarPluginID = "PLUGIN_ID" + MergeVarPluginTitle = "PLUGIN_TITLE" + MergeVarContactEmail = "CONTACT_EMAIL" + MergeVarProposalURL = "PROPOSAL_URL" + MergeVarPluginURL = "PLUGIN_URL" +) + +type PortalProposalTask struct { + PluginID string `json:"plugin_id"` + Title string `json:"title"` + ContactEmail string `json:"contact_email"` + ProposalURL string `json:"proposal_url"` + NotificationEmails []string `json:"notification_emails"` +} + +type PortalApprovalTask struct { + PluginID string `json:"plugin_id"` + Title string `json:"title"` + ContactEmail string `json:"contact_email"` +} + +type PortalPublishTask struct { + PluginID string `json:"plugin_id"` + Title string `json:"title"` + ContactEmail string `json:"contact_email"` + PluginURL string `json:"plugin_url"` +} diff --git a/internal/portal/email.go b/internal/portal/email.go index d5978422..52ad7759 100644 --- a/internal/portal/email.go +++ b/internal/portal/email.go @@ -1,32 +1,19 @@ package portal import ( - "bytes" - "context" "encoding/json" "fmt" - "html" - "io" - "net/http" "net/url" "strings" "time" + "github.com/hibiken/asynq" "github.com/sirupsen/logrus" "github.com/vultisig/verifier/config" + "github.com/vultisig/verifier/internal/email" ) -const mandrillSendURL = "https://mandrillapp.com/api/1.0/messages/send.json" - -func maskEmail(email string) string { - at := strings.Index(email, "@") - if at <= 1 { - return "***" - } - return email[:1] + strings.Repeat("*", at-1) + email[at:] -} - type EmailSender interface { IsConfigured() bool SendProposalNotificationAsync(pluginID, title, contactEmail string) @@ -37,50 +24,21 @@ type EmailSender interface { type EmailService struct { cfg config.PortalEmailConfig portalURL string - mandrillURL string - client *http.Client + queueClient *asynq.Client logger *logrus.Logger } -func NewEmailService(cfg config.PortalEmailConfig, portalURL string, logger *logrus.Logger) *EmailService { +func NewEmailService(cfg config.PortalEmailConfig, portalURL string, queueClient *asynq.Client, logger *logrus.Logger) *EmailService { return &EmailService{ cfg: cfg, portalURL: strings.TrimRight(portalURL, "/"), - mandrillURL: mandrillSendURL, + queueClient: queueClient, logger: logger, - client: &http.Client{ - Timeout: 30 * time.Second, - }, } } func (s *EmailService) IsConfigured() bool { - return s.cfg.IsConfigured() -} - -type mandrillMessage struct { - Key string `json:"key"` - Message mandrillMessageBody `json:"message"` -} - -type mandrillMessageBody struct { - FromEmail string `json:"from_email"` - FromName string `json:"from_name"` - To []mandrillRecipient `json:"to"` - Subject string `json:"subject"` - HTML string `json:"html"` - Text string `json:"text"` -} - -type mandrillRecipient struct { - Email string `json:"email"` - Type string `json:"type"` -} - -type mandrillSendResult struct { - Email string `json:"email"` - Status string `json:"status"` - RejectReason string `json:"reject_reason,omitempty"` + return s.queueClient != nil && len(s.cfg.NotificationEmails) > 0 } func (s *EmailService) SendProposalNotificationAsync(pluginID, title, contactEmail string) { @@ -88,106 +46,37 @@ func (s *EmailService) SendProposalNotificationAsync(pluginID, title, contactEma return } - go func() { - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - - err := s.sendProposalNotification(ctx, pluginID, title, contactEmail) - if err != nil { - s.logger.WithError(err).WithFields(logrus.Fields{ - "plugin_id": pluginID, - }).Error("failed to send proposal notification email") - } - }() -} - -func (s *EmailService) sendProposalNotification(ctx context.Context, pluginID, title, contactEmail string) error { - pid := html.EscapeString(pluginID) - t := html.EscapeString(title) - ce := html.EscapeString(contactEmail) - proposalURL := fmt.Sprintf("%s/admin/plugin-proposals/%s", s.portalURL, url.PathEscape(pluginID)) - subject := fmt.Sprintf("New Plugin Proposal: %s", t) - htmlBody := fmt.Sprintf(` -
A new plugin proposal has been submitted for review.
-| Plugin ID: | -%s | -
| Title: | -%s | -
| Contact Email: | -%s | -
Your plugin proposal %s has been approved.
-To complete the listing process:
-Thank you for contributing to Vultisig!
-`, t) - - text := fmt.Sprintf(`Plugin Proposal Approved - -Your plugin proposal "%s" has been approved. - -To complete the listing process: -1. Pay the listing fee through the developer portal -2. Once payment is confirmed, your plugin will be published automatically - -Thank you for contributing to Vultisig! -`, title) + task := email.PortalApprovalTask{ + PluginID: pluginID, + Title: title, + ContactEmail: contactEmail, + } - return s.sendToRecipient(ctx, contactEmail, subject, htmlBody, text) + err := s.enqueue(email.TypePortalApproval, task) + if err != nil { + s.logger.WithError(err).WithField("plugin_id", pluginID).Error("failed to enqueue approval email") + } } func (s *EmailService) SendPublishNotificationAsync(pluginID, title, contactEmail string) { @@ -195,110 +84,35 @@ func (s *EmailService) SendPublishNotificationAsync(pluginID, title, contactEmai return } - go func() { - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - - err := s.sendPublishNotification(ctx, pluginID, title, contactEmail) - if err != nil { - s.logger.WithError(err).WithFields(logrus.Fields{ - "plugin_id": pluginID, - "email": maskEmail(contactEmail), - }).Error("failed to send publish notification email") - } - }() -} - -func (s *EmailService) sendPublishNotification(ctx context.Context, pluginID, title, contactEmail string) error { - t := html.EscapeString(title) - pid := html.EscapeString(pluginID) pluginURL := fmt.Sprintf("%s/plugins/%s", s.portalURL, url.PathEscape(pluginID)) - subject := fmt.Sprintf("Your Plugin Is Now Live: %s", t) - htmlBody := fmt.Sprintf(` -Your plugin %s is now live on the Vultisig marketplace.
- -Plugin ID: %s
-Thank you for contributing to Vultisig!
-`, t, html.EscapeString(pluginURL), pid) - - text := fmt.Sprintf(`Plugin Published! - -Your plugin "%s" is now live on the Vultisig marketplace. - -View your plugin: %s - -Plugin ID: %s - -Thank you for contributing to Vultisig! -`, title, pluginURL, pluginID) - - return s.sendToRecipient(ctx, contactEmail, subject, htmlBody, text) -} - -func (s *EmailService) sendToAdmins(ctx context.Context, subject, htmlBody, text string) error { - recipients := make([]mandrillRecipient, len(s.cfg.NotificationEmails)) - for i, email := range s.cfg.NotificationEmails { - recipients[i] = mandrillRecipient{Email: email, Type: "to"} - } - return s.sendEmail(ctx, recipients, subject, htmlBody, text) -} - -func (s *EmailService) sendToRecipient(ctx context.Context, email, subject, htmlBody, text string) error { - recipients := []mandrillRecipient{{Email: email, Type: "to"}} - return s.sendEmail(ctx, recipients, subject, htmlBody, text) -} - -func (s *EmailService) sendEmail(ctx context.Context, recipients []mandrillRecipient, subject, htmlBody, text string) error { - msg := mandrillMessage{ - Key: s.cfg.MandrillAPIKey, - Message: mandrillMessageBody{ - FromEmail: s.cfg.FromEmail, - FromName: s.cfg.FromName, - To: recipients, - Subject: subject, - HTML: htmlBody, - Text: text, - }, + task := email.PortalPublishTask{ + PluginID: pluginID, + Title: title, + ContactEmail: contactEmail, + PluginURL: pluginURL, } - body, err := json.Marshal(msg) + err := s.enqueue(email.TypePortalPublish, task) if err != nil { - return fmt.Errorf("marshal email request: %w", err) + s.logger.WithError(err).WithField("plugin_id", pluginID).Error("failed to enqueue publish email") } +} - req, err := http.NewRequestWithContext(ctx, http.MethodPost, s.mandrillURL, bytes.NewReader(body)) - if err != nil { - return fmt.Errorf("create email request: %w", err) - } - req.Header.Set("Content-Type", "application/json") - - resp, err := s.client.Do(req) - if err != nil { - return fmt.Errorf("send email request: %w", err) - } - defer resp.Body.Close() - - respBody, err := io.ReadAll(resp.Body) +func (s *EmailService) enqueue(taskType string, payload interface{}) error { + buf, err := json.Marshal(payload) if err != nil { - return fmt.Errorf("read response body: %w", err) + return fmt.Errorf("marshal task: %w", err) } - if resp.StatusCode != http.StatusOK { - return fmt.Errorf("mandrill returned status %d: %s", resp.StatusCode, string(respBody)) - } - - var results []mandrillSendResult - err = json.Unmarshal(respBody, &results) + _, err = s.queueClient.Enqueue( + asynq.NewTask(taskType, buf), + asynq.Queue(email.QueueName), + asynq.Retention(24*time.Hour), + asynq.MaxRetry(3), + ) if err != nil { - return fmt.Errorf("unmarshal response: %w", err) - } - - for _, r := range results { - if r.Status != "sent" && r.Status != "queued" { - return fmt.Errorf("email to %s failed: %s (%s)", maskEmail(r.Email), r.Status, r.RejectReason) - } + return fmt.Errorf("enqueue task: %w", err) } return nil diff --git a/internal/portal/email_test.go b/internal/portal/email_test.go index 02db6373..ce03833e 100644 --- a/internal/portal/email_test.go +++ b/internal/portal/email_test.go @@ -1,13 +1,8 @@ package portal import ( - "context" - "encoding/json" - "net/http" - "net/http/httptest" "sync" "testing" - "time" "github.com/sirupsen/logrus" "github.com/vultisig/verifier/config" @@ -172,244 +167,52 @@ func TestMockEmailSender_SendPublishNotification(t *testing.T) { } func TestEmailService_IsConfigured(t *testing.T) { - tests := []struct { - name string - cfg config.PortalEmailConfig - expected bool - }{ - { - name: "not configured - empty config", - cfg: config.PortalEmailConfig{}, - expected: false, - }, - { - name: "not configured - missing api key", - cfg: config.PortalEmailConfig{ - FromEmail: "noreply@vultisig.com", - NotificationEmails: []string{"admin@vultisig.com"}, - }, - expected: false, - }, - { - name: "not configured - missing from email", - cfg: config.PortalEmailConfig{ - MandrillAPIKey: "test-api-key", - NotificationEmails: []string{"admin@vultisig.com"}, - }, - expected: false, - }, - { - name: "not configured - missing notification emails", - cfg: config.PortalEmailConfig{ - MandrillAPIKey: "test-api-key", - FromEmail: "noreply@vultisig.com", - }, - expected: false, - }, - { - name: "configured", - cfg: config.PortalEmailConfig{ - MandrillAPIKey: "test-api-key", - FromEmail: "noreply@vultisig.com", - FromName: "Vultisig", - NotificationEmails: []string{"admin@vultisig.com"}, - }, - expected: true, - }, - } + logger := logrus.New() - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - svc := NewEmailService(tt.cfg, "https://portal.vultisig.com", logrus.New()) - if svc.IsConfigured() != tt.expected { - t.Errorf("IsConfigured() = %v, expected %v", svc.IsConfigured(), tt.expected) - } - }) - } + t.Run("not configured - nil queue client", func(t *testing.T) { + cfg := config.PortalEmailConfig{ + NotificationEmails: []string{"admin@vultisig.com"}, + } + svc := NewEmailService(cfg, "https://portal.vultisig.com", nil, logger) + if svc.IsConfigured() { + t.Error("expected not configured with nil queue client") + } + }) + + t.Run("not configured - missing notification emails", func(t *testing.T) { + cfg := config.PortalEmailConfig{} + svc := NewEmailService(cfg, "https://portal.vultisig.com", nil, logger) + if svc.IsConfigured() { + t.Error("expected not configured without notification emails") + } + }) } -func TestEmailService_SendProposalNotification_NotConfigured(t *testing.T) { - svc := NewEmailService(config.PortalEmailConfig{}, "https://portal.vultisig.com", logrus.New()) +func TestEmailService_SendNotification_NotConfigured(t *testing.T) { + logger := logrus.New() + svc := NewEmailService(config.PortalEmailConfig{}, "https://portal.vultisig.com", nil, logger) svc.SendProposalNotificationAsync("test-plugin", "Test", "test@example.com") + svc.SendApprovalNotificationAsync("test-plugin", "Test", "test@example.com") + svc.SendPublishNotificationAsync("test-plugin", "Test", "test@example.com") } func TestEmailService_SendApprovalNotification_EmptyEmail(t *testing.T) { + logger := logrus.New() cfg := config.PortalEmailConfig{ - MandrillAPIKey: "test-api-key", - FromEmail: "noreply@vultisig.com", NotificationEmails: []string{"admin@vultisig.com"}, } - svc := NewEmailService(cfg, "https://portal.vultisig.com", logrus.New()) + svc := NewEmailService(cfg, "https://portal.vultisig.com", nil, logger) svc.SendApprovalNotificationAsync("test-plugin", "Test", "") } func TestEmailService_SendPublishNotification_EmptyEmail(t *testing.T) { + logger := logrus.New() cfg := config.PortalEmailConfig{ - MandrillAPIKey: "test-api-key", - FromEmail: "noreply@vultisig.com", NotificationEmails: []string{"admin@vultisig.com"}, } - svc := NewEmailService(cfg, "https://portal.vultisig.com", logrus.New()) + svc := NewEmailService(cfg, "https://portal.vultisig.com", nil, logger) svc.SendPublishNotificationAsync("test-plugin", "Test", "") } - -func TestEmailService_SendEmail_Success(t *testing.T) { - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if r.Method != http.MethodPost { - t.Errorf("expected POST, got %s", r.Method) - } - if r.Header.Get("Content-Type") != "application/json" { - t.Errorf("expected Content-Type application/json, got %s", r.Header.Get("Content-Type")) - } - - var msg mandrillMessage - err := json.NewDecoder(r.Body).Decode(&msg) - if err != nil { - t.Errorf("failed to decode request: %v", err) - } - - if msg.Key != "test-api-key" { - t.Errorf("expected API key 'test-api-key', got '%s'", msg.Key) - } - if msg.Message.FromEmail != "noreply@vultisig.com" { - t.Errorf("expected from email 'noreply@vultisig.com', got '%s'", msg.Message.FromEmail) - } - if len(msg.Message.To) != 1 { - t.Errorf("expected 1 recipient, got %d", len(msg.Message.To)) - } - - w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode([]mandrillSendResult{ - {Email: msg.Message.To[0].Email, Status: "sent"}, - }) - })) - defer server.Close() - - cfg := config.PortalEmailConfig{ - MandrillAPIKey: "test-api-key", - FromEmail: "noreply@vultisig.com", - FromName: "Vultisig", - NotificationEmails: []string{"admin@vultisig.com"}, - } - svc := NewEmailService(cfg, "https://portal.vultisig.com", logrus.New()) - svc.client = server.Client() - - originalURL := mandrillSendURL - defer func() { - if mandrillSendURL != originalURL { - t.Error("mandrillSendURL should not be changed") - } - }() - - ctx := context.Background() - recipients := []mandrillRecipient{{Email: "test@example.com", Type: "to"}} - - err := svc.sendEmailTo(ctx, server.URL, recipients, "Test Subject", "HTML
", "Text") - if err != nil { - t.Errorf("sendEmailTo failed: %v", err) - } -} - -func TestEmailService_SendEmail_APIError(t *testing.T) { - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusUnauthorized) - w.Write([]byte(`{"status":"error","message":"Invalid API key"}`)) - })) - defer server.Close() - - cfg := config.PortalEmailConfig{ - MandrillAPIKey: "invalid-key", - FromEmail: "noreply@vultisig.com", - NotificationEmails: []string{"admin@vultisig.com"}, - } - svc := NewEmailService(cfg, "https://portal.vultisig.com", logrus.New()) - svc.client = server.Client() - - ctx := context.Background() - recipients := []mandrillRecipient{{Email: "test@example.com", Type: "to"}} - - err := svc.sendEmailTo(ctx, server.URL, recipients, "Test", "HTML
", "Text") - if err == nil { - t.Error("expected error for invalid API key") - } -} - -func TestEmailService_SendEmail_RejectedEmail(t *testing.T) { - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode([]mandrillSendResult{ - {Email: "invalid@example.com", Status: "rejected", RejectReason: "invalid-sender"}, - }) - })) - defer server.Close() - - cfg := config.PortalEmailConfig{ - MandrillAPIKey: "test-api-key", - FromEmail: "noreply@vultisig.com", - NotificationEmails: []string{"admin@vultisig.com"}, - } - svc := NewEmailService(cfg, "https://portal.vultisig.com", logrus.New()) - svc.client = server.Client() - - ctx := context.Background() - recipients := []mandrillRecipient{{Email: "invalid@example.com", Type: "to"}} - - err := svc.sendEmailTo(ctx, server.URL, recipients, "Test", "HTML
", "Text") - if err == nil { - t.Error("expected error for rejected email") - } -} - -func TestEmailService_SendProposalNotification_Async(t *testing.T) { - var wg sync.WaitGroup - wg.Add(1) - - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - defer wg.Done() - - var msg mandrillMessage - json.NewDecoder(r.Body).Decode(&msg) - - if msg.Message.Subject == "" { - t.Error("expected non-empty subject") - } - - w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode([]mandrillSendResult{ - {Email: msg.Message.To[0].Email, Status: "queued"}, - }) - })) - defer server.Close() - - cfg := config.PortalEmailConfig{ - MandrillAPIKey: "test-api-key", - FromEmail: "noreply@vultisig.com", - FromName: "Vultisig", - NotificationEmails: []string{"admin@vultisig.com"}, - } - svc := NewEmailService(cfg, "https://portal.vultisig.com", logrus.New()) - svc.client = server.Client() - svc.mandrillURL = server.URL - - svc.SendProposalNotificationAsync("test-plugin", "Test Plugin", "dev@example.com") - - done := make(chan struct{}) - go func() { - wg.Wait() - close(done) - }() - - select { - case <-done: - case <-time.After(5 * time.Second): - t.Error("timeout waiting for async email send") - } -} - -func (s *EmailService) sendEmailTo(ctx context.Context, url string, recipients []mandrillRecipient, subject, htmlBody, text string) error { - s.mandrillURL = url - return s.sendEmail(ctx, recipients, subject, htmlBody, text) -} diff --git a/internal/portal/server.go b/internal/portal/server.go index 6a25083f..6cb79ba7 100644 --- a/internal/portal/server.go +++ b/internal/portal/server.go @@ -14,6 +14,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/google/uuid" + "github.com/hibiken/asynq" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgtype" "github.com/jackc/pgx/v5/pgxpool" @@ -44,7 +45,7 @@ type Server struct { listingFeeClient *ListingFeeClient } -func NewServer(cfg config.PortalConfig, pool *pgxpool.Pool, db *postgres.PostgresBackend, assetStorage storage.PluginAssetStorage) *Server { +func NewServer(cfg config.PortalConfig, pool *pgxpool.Pool, db *postgres.PostgresBackend, assetStorage storage.PluginAssetStorage, queueClient *asynq.Client) *Server { logger := logrus.WithField("service", "portal").Logger var listingFeeClient *ListingFeeClient if cfg.DeveloperServiceURL != "" { @@ -59,7 +60,7 @@ func NewServer(cfg config.PortalConfig, pool *pgxpool.Pool, db *postgres.Postgre inviteService: NewInviteService(cfg.Server.HMACSecret, cfg.Server.BaseURL), db: db, assetStorage: assetStorage, - emailService: NewEmailService(cfg.Email, cfg.Server.BaseURL, logger), + emailService: NewEmailService(cfg.Email, cfg.Server.BaseURL, queueClient, logger), listingFeeClient: listingFeeClient, } }