Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions processor/pipeline_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
obskit "github.com/rudderlabs/rudder-observability-kit/go/labels"
"github.com/rudderlabs/rudder-server/jobsdb"
"github.com/rudderlabs/rudder-server/processor/types"
"github.com/rudderlabs/rudder-server/rruntime"
"github.com/rudderlabs/rudder-server/utils/tracing"
Expand Down Expand Up @@ -210,7 +209,7 @@ func (w *pipelineWorker) start() {
ctx: subJob.ctx,
rsourcesStats: subJob.rsourcesStats,
dedupKeys: make(map[string]struct{}),
procErrorJobsByDestID: make(map[string][]*jobsdb.JobT),
procErrorJobsByDestID: make(map[string][]procErrorJob),
sourceDupStats: make(map[dupStatKey]int),
start: subJob.start,
}
Expand Down
17 changes: 12 additions & 5 deletions processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2519,7 +2519,7 @@ func (proc *Handle) destinationTransformStage(partition string, in *userTransfor
defer proc.stats.statDtransformStageCount(partition).Count(len(in.statusList))
}

procErrorJobsByDestID := make(map[string][]*jobsdb.JobT)
procErrorJobsByDestID := make(map[string][]procErrorJob)
var batchDestJobs []*jobsdb.JobT
var destJobs []*jobsdb.JobT
var droppedJobs []*jobsdb.JobT
Expand Down Expand Up @@ -2593,6 +2593,13 @@ func (proc *Handle) destinationTransformStage(partition string, in *userTransfor
}
}

// procErrorJob wraps a jobsdb.JobT with the original parsed events,
// avoiding an expensive marshal/unmarshal round-trip through EventPayload.
type procErrorJob struct {
*jobsdb.JobT
events []types.SingularEventT
}

type storeMessage struct {
ctx context.Context
trackedUsersReports []*trackedusers.UsersReport
Expand All @@ -2601,7 +2608,7 @@ type storeMessage struct {
batchDestJobs []*jobsdb.JobT
droppedJobs []*jobsdb.JobT

procErrorJobsByDestID map[string][]*jobsdb.JobT
procErrorJobsByDestID map[string][]procErrorJob
routerDestIDs []string

reportMetrics []*reportingtypes.PUReportedMetric
Expand Down Expand Up @@ -2943,7 +2950,7 @@ func (proc *Handle) userTransformAndFilter(ctx context.Context, partition, srcAn
commonMetaData := eventList[0].Metadata.CommonMetadata()

reportMetrics := make([]*reportingtypes.PUReportedMetric, 0)
procErrorJobsByDestID := make(map[string][]*jobsdb.JobT)
procErrorJobsByDestID := make(map[string][]procErrorJob)
droppedJobs := make([]*jobsdb.JobT, 0)

proc.config.configSubscriberLock.RLock()
Expand Down Expand Up @@ -3140,7 +3147,7 @@ func (proc *Handle) userTransformAndFilter(ctx context.Context, partition, srcAn
nonSuccessMetrics := proc.getNonSuccessfulMetrics(response, eventList, commonMetaData, eventsByMessageID, inPU, reportingtypes.USER_TRANSFORMER)
droppedJobs = append(droppedJobs, append(proc.getDroppedJobs(response, eventList), append(nonSuccessMetrics.failedJobs, nonSuccessMetrics.filteredJobs...)...)...)
if _, ok := procErrorJobsByDestID[destID]; !ok {
procErrorJobsByDestID[destID] = make([]*jobsdb.JobT, 0)
procErrorJobsByDestID[destID] = make([]procErrorJob, 0)
}
procErrorJobsByDestID[destID] = append(procErrorJobsByDestID[destID], nonSuccessMetrics.failedJobs...)
userTransformationStat.numOutputSuccessEvents.Count(len(eventsToTransform))
Expand Down Expand Up @@ -3231,7 +3238,7 @@ func (proc *Handle) userTransformAndFilter(ctx context.Context, partition, srcAn
nonSuccessMetrics := proc.getNonSuccessfulMetrics(response, eventList, commonMetaData, eventsByMessageID, inPU, reportingtypes.EVENT_FILTER)
droppedJobs = append(droppedJobs, append(proc.getDroppedJobs(response, eventsToTransform), append(nonSuccessMetrics.failedJobs, nonSuccessMetrics.filteredJobs...)...)...)
if _, ok := procErrorJobsByDestID[destID]; !ok {
procErrorJobsByDestID[destID] = make([]*jobsdb.JobT, 0)
procErrorJobsByDestID[destID] = make([]procErrorJob, 0)
}
procErrorJobsByDestID[destID] = append(procErrorJobsByDestID[destID], nonSuccessMetrics.failedJobs...)
eventsToTransform, successMetrics, successCountMap, successCountMetadataMap = proc.getTransformerEvents(response, commonMetaData, eventsByMessageID, destination, connection, inPU, reportingtypes.EVENT_FILTER)
Expand Down
12 changes: 8 additions & 4 deletions warehouse/router/identities.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,8 @@ func (r *Router) hasLocalIdentityData(warehouse model.Warehouse) (exists bool) {
func (r *Router) hasWarehouseData(ctx context.Context, warehouse model.Warehouse) (bool, error) {
whManager, err := manager.New(r.destType, r.conf, r.logger, r.statsFactory)
if err != nil {
panic(err)
r.logger.Errorn("[WH]: Failed to create warehouse manager", obskit.Error(err))
return false, err
}

empty, err := whManager.IsEmpty(ctx, warehouse)
Expand Down Expand Up @@ -297,7 +298,8 @@ func (r *Router) initPrePopulateDestIdentitiesUpload(warehouse model.Warehouse)

marshalledSchema, err := jsonrs.Marshal(schema)
if err != nil {
panic(err)
r.logger.Errorn("[WH]: Failed to marshal schema for identity upload", obskit.Error(err))
return model.Upload{}
}

sqlStatement := fmt.Sprintf(`INSERT INTO %s (
Expand Down Expand Up @@ -334,7 +336,8 @@ func (r *Router) initPrePopulateDestIdentitiesUpload(warehouse model.Warehouse)
var uploadID int64
err = row.Scan(&uploadID)
if err != nil {
panic(err)
r.logger.Errorn("[WH]: Failed to scan upload ID", obskit.Error(err))
return model.Upload{}
}

upload := model.Upload{
Expand Down Expand Up @@ -420,7 +423,8 @@ func (r *Router) populateHistoricIdentities(ctx context.Context, warehouse model

whManager, err := manager.New(r.destType, r.conf, r.logger, r.statsFactory)
if err != nil {
panic(err)
r.logger.Errorn("[WH]: Failed to create warehouse manager for identity population", obskit.Error(err))
return
}

job := r.uploadJobFactory.NewUploadJob(ctx, &model.UploadJob{
Expand Down
2 changes: 1 addition & 1 deletion warehouse/router/state.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package router

import (
"fmt"

Check failure on line 4 in warehouse/router/state.go

View workflow job for this annotation

GitHub Actions / Integration (enterprise)

"fmt" imported and not used

Check failure on line 4 in warehouse/router/state.go

View workflow job for this annotation

GitHub Actions / lint

"fmt" imported and not used (typecheck)

Check failure on line 4 in warehouse/router/state.go

View workflow job for this annotation

GitHub Actions / Unit

"fmt" imported and not used

"github.com/rudderlabs/rudder-server/warehouse/internal/model"
)
Expand Down Expand Up @@ -84,7 +84,7 @@
func inProgressState(currentState string) string {
uploadState, ok := stateTransitions[currentState]
if !ok {
panic(fmt.Errorf("invalid state: %s", currentState))
return ""
}
return uploadState.inProgress
}
Expand Down
Loading