diff --git a/processor/pipeline_worker.go b/processor/pipeline_worker.go index 8f6e6cb072..1e324e1bf4 100644 --- a/processor/pipeline_worker.go +++ b/processor/pipeline_worker.go @@ -9,7 +9,6 @@ import ( "github.com/rudderlabs/rudder-go-kit/logger" "github.com/rudderlabs/rudder-go-kit/stats" obskit "github.com/rudderlabs/rudder-observability-kit/go/labels" - "github.com/rudderlabs/rudder-server/jobsdb" "github.com/rudderlabs/rudder-server/processor/types" "github.com/rudderlabs/rudder-server/rruntime" "github.com/rudderlabs/rudder-server/utils/tracing" @@ -210,7 +209,7 @@ func (w *pipelineWorker) start() { ctx: subJob.ctx, rsourcesStats: subJob.rsourcesStats, dedupKeys: make(map[string]struct{}), - procErrorJobsByDestID: make(map[string][]*jobsdb.JobT), + procErrorJobsByDestID: make(map[string][]procErrorJob), sourceDupStats: make(map[dupStatKey]int), start: subJob.start, } diff --git a/processor/processor.go b/processor/processor.go index abb56defe8..7f208a127c 100644 --- a/processor/processor.go +++ b/processor/processor.go @@ -2519,7 +2519,7 @@ func (proc *Handle) destinationTransformStage(partition string, in *userTransfor defer proc.stats.statDtransformStageCount(partition).Count(len(in.statusList)) } - procErrorJobsByDestID := make(map[string][]*jobsdb.JobT) + procErrorJobsByDestID := make(map[string][]procErrorJob) var batchDestJobs []*jobsdb.JobT var destJobs []*jobsdb.JobT var droppedJobs []*jobsdb.JobT @@ -2593,6 +2593,13 @@ func (proc *Handle) destinationTransformStage(partition string, in *userTransfor } } +// procErrorJob wraps a jobsdb.JobT with the original parsed events, +// avoiding an expensive marshal/unmarshal round-trip through EventPayload. +type procErrorJob struct { + *jobsdb.JobT + events []types.SingularEventT +} + type storeMessage struct { ctx context.Context trackedUsersReports []*trackedusers.UsersReport @@ -2601,7 +2608,7 @@ type storeMessage struct { batchDestJobs []*jobsdb.JobT droppedJobs []*jobsdb.JobT - procErrorJobsByDestID map[string][]*jobsdb.JobT + procErrorJobsByDestID map[string][]procErrorJob routerDestIDs []string reportMetrics []*reportingtypes.PUReportedMetric @@ -2943,7 +2950,7 @@ func (proc *Handle) userTransformAndFilter(ctx context.Context, partition, srcAn commonMetaData := eventList[0].Metadata.CommonMetadata() reportMetrics := make([]*reportingtypes.PUReportedMetric, 0) - procErrorJobsByDestID := make(map[string][]*jobsdb.JobT) + procErrorJobsByDestID := make(map[string][]procErrorJob) droppedJobs := make([]*jobsdb.JobT, 0) proc.config.configSubscriberLock.RLock() @@ -3140,7 +3147,7 @@ func (proc *Handle) userTransformAndFilter(ctx context.Context, partition, srcAn nonSuccessMetrics := proc.getNonSuccessfulMetrics(response, eventList, commonMetaData, eventsByMessageID, inPU, reportingtypes.USER_TRANSFORMER) droppedJobs = append(droppedJobs, append(proc.getDroppedJobs(response, eventList), append(nonSuccessMetrics.failedJobs, nonSuccessMetrics.filteredJobs...)...)...) if _, ok := procErrorJobsByDestID[destID]; !ok { - procErrorJobsByDestID[destID] = make([]*jobsdb.JobT, 0) + procErrorJobsByDestID[destID] = make([]procErrorJob, 0) } procErrorJobsByDestID[destID] = append(procErrorJobsByDestID[destID], nonSuccessMetrics.failedJobs...) userTransformationStat.numOutputSuccessEvents.Count(len(eventsToTransform)) @@ -3231,7 +3238,7 @@ func (proc *Handle) userTransformAndFilter(ctx context.Context, partition, srcAn nonSuccessMetrics := proc.getNonSuccessfulMetrics(response, eventList, commonMetaData, eventsByMessageID, inPU, reportingtypes.EVENT_FILTER) droppedJobs = append(droppedJobs, append(proc.getDroppedJobs(response, eventsToTransform), append(nonSuccessMetrics.failedJobs, nonSuccessMetrics.filteredJobs...)...)...) if _, ok := procErrorJobsByDestID[destID]; !ok { - procErrorJobsByDestID[destID] = make([]*jobsdb.JobT, 0) + procErrorJobsByDestID[destID] = make([]procErrorJob, 0) } procErrorJobsByDestID[destID] = append(procErrorJobsByDestID[destID], nonSuccessMetrics.failedJobs...) eventsToTransform, successMetrics, successCountMap, successCountMetadataMap = proc.getTransformerEvents(response, commonMetaData, eventsByMessageID, destination, connection, inPU, reportingtypes.EVENT_FILTER) diff --git a/warehouse/router/identities.go b/warehouse/router/identities.go index 9c2f5d3a1f..7e0e1e4d14 100644 --- a/warehouse/router/identities.go +++ b/warehouse/router/identities.go @@ -164,7 +164,8 @@ func (r *Router) hasLocalIdentityData(warehouse model.Warehouse) (exists bool) { func (r *Router) hasWarehouseData(ctx context.Context, warehouse model.Warehouse) (bool, error) { whManager, err := manager.New(r.destType, r.conf, r.logger, r.statsFactory) if err != nil { - panic(err) + r.logger.Errorn("[WH]: Failed to create warehouse manager", obskit.Error(err)) + return false, err } empty, err := whManager.IsEmpty(ctx, warehouse) @@ -297,7 +298,8 @@ func (r *Router) initPrePopulateDestIdentitiesUpload(warehouse model.Warehouse) marshalledSchema, err := jsonrs.Marshal(schema) if err != nil { - panic(err) + r.logger.Errorn("[WH]: Failed to marshal schema for identity upload", obskit.Error(err)) + return model.Upload{} } sqlStatement := fmt.Sprintf(`INSERT INTO %s ( @@ -334,7 +336,8 @@ func (r *Router) initPrePopulateDestIdentitiesUpload(warehouse model.Warehouse) var uploadID int64 err = row.Scan(&uploadID) if err != nil { - panic(err) + r.logger.Errorn("[WH]: Failed to scan upload ID", obskit.Error(err)) + return model.Upload{} } upload := model.Upload{ @@ -420,7 +423,8 @@ func (r *Router) populateHistoricIdentities(ctx context.Context, warehouse model whManager, err := manager.New(r.destType, r.conf, r.logger, r.statsFactory) if err != nil { - panic(err) + r.logger.Errorn("[WH]: Failed to create warehouse manager for identity population", obskit.Error(err)) + return } job := r.uploadJobFactory.NewUploadJob(ctx, &model.UploadJob{ diff --git a/warehouse/router/state.go b/warehouse/router/state.go index 22b8112625..5473fbb370 100644 --- a/warehouse/router/state.go +++ b/warehouse/router/state.go @@ -84,7 +84,7 @@ func init() { func inProgressState(currentState string) string { uploadState, ok := stateTransitions[currentState] if !ok { - panic(fmt.Errorf("invalid state: %s", currentState)) + return "" } return uploadState.inProgress }