-
Notifications
You must be signed in to change notification settings - Fork 1.8k
feat(cdp): segment destination mapping #31336
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PR Summary
Here's my review of the changes for the Segment destination mapping functionality:
Added comprehensive Amplitude destination integration with extensive test coverage and utility functions for PostHog's CDP.
Key changes:
- Implemented core Amplitude actions (logEvent, logPurchase, identifyUser, groupIdentifyUser) with proper type definitions and field mappings
- Added robust user property handling with support for UTM tracking, referrer data, and user agent parsing
- Created regional endpoint support for both North America and Europe with configurable batch/single event processing
- Built timestamp conversion utilities and session ID formatting for proper event timing
- Introduced comprehensive test coverage across all major components with edge case handling
Note: The template.ts file needs significant revision as it contains deprecated code, hardcoded values and inadequate error handling.
34 file(s) reviewed, 45 comment(s)
Edit PR Review Bot Settings | Greptile
...er/src/cdp/legacy-plugins/_segmentDestinations/amplitude/__tests__/convert-timestamp.test.ts
Outdated
Show resolved
Hide resolved
...rc/cdp/legacy-plugins/_segmentDestinations/amplitude/__tests__/merge-user-properties.test.ts
Outdated
Show resolved
Hide resolved
plugin-server/src/cdp/legacy-plugins/_segmentDestinations/amplitude/__tests__/amplitude.test.ts
Outdated
Show resolved
Hide resolved
plugin-server/src/cdp/legacy-plugins/_segmentDestinations/amplitude/__tests__/amplitude.test.ts
Outdated
Show resolved
Hide resolved
plugin-server/src/cdp/legacy-plugins/_segmentDestinations/amplitude/__tests__/amplitude.test.ts
Outdated
Show resolved
Hide resolved
plugin-server/src/cdp/legacy-plugins/_segmentDestinations/amplitude/referrer.ts
Outdated
Show resolved
Hide resolved
plugin-server/src/cdp/legacy-plugins/_segmentDestinations/amplitude/properties.ts
Outdated
Show resolved
Hide resolved
plugin-server/src/cdp/legacy-plugins/_segmentDestinations/amplitude/regional-endpoints.ts
Outdated
Show resolved
Hide resolved
plugin-server/src/cdp/legacy-plugins/_segmentDestinations/amplitude/regional-endpoints.ts
Outdated
Show resolved
Hide resolved
plugin-server/src/cdp/legacy-plugins/_segmentDestinations/amplitude/utm.ts
Outdated
Show resolved
Hide resolved
Size Change: +10 B (0%) Total Size: 3.71 MB ℹ️ View Unchanged
|
plugin-server/src/cdp/services/legacy-plugin-executor.service.ts
Outdated
Show resolved
Hide resolved
default: field.type !== 'object' || typeof field.default !== 'undefined' && ('@path' in field.default) ? translateInputs(field.default) : Object.fromEntries(Object.entries(field.properties ?? {}).map(([key, _]) => { | ||
const defaultVal = field.default as Record<string, object> ?? {} | ||
return [key, translateInputs(defaultVal[key])] | ||
})), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The default value logic has two potential issues:
-
The condition
typeof field.default !== 'undefined' && ('@path' in field.default)
may cause runtime errors iffield.default
is undefined, as the second part would attempt to check a property on an undefined value. -
The type assertion
field.default as Record<string, object>
could be unsafe whenfield.default
is undefined.
Consider using optional chaining and nullish coalescing to handle these cases more safely:
default: field.type !== 'object' || (field.default && '@path' in field.default)
? translateInputs(field.default)
: Object.fromEntries(Object.entries(field.properties ?? {}).map(([key, _]) => {
const defaultVal = field.default ? (field.default as Record<string, object>) : {};
return [key, translateInputs(defaultVal[key])];
})),
default: field.type !== 'object' || typeof field.default !== 'undefined' && ('@path' in field.default) ? translateInputs(field.default) : Object.fromEntries(Object.entries(field.properties ?? {}).map(([key, _]) => { | |
const defaultVal = field.default as Record<string, object> ?? {} | |
return [key, translateInputs(defaultVal[key])] | |
})), | |
default: field.type !== 'object' || (field.default && '@path' in field.default) ? translateInputs(field.default) : Object.fromEntries(Object.entries(field.properties ?? {}).map(([key, _]) => { | |
const defaultVal = field.default ? (field.default as Record<string, object>) : {} | |
return [key, translateInputs(defaultVal[key])] | |
})), |
Spotted by Diamond
Is this helpful? React 👍 or 👎 to let us know.
📸 UI snapshots have been updated1 snapshot changes in total. 0 added, 1 modified, 0 deleted:
Triggered by this commit. |
📸 UI snapshots have been updated1 snapshot changes in total. 0 added, 1 modified, 0 deleted:
Triggered by this commit. |
plugin-server/src/cdp/services/segment-destination-executor.service.ts
Outdated
Show resolved
Hide resolved
plugin-server/src/cdp/services/segment-destination-executor.service.ts
Outdated
Show resolved
Hide resolved
plugin-server/src/cdp/services/segment-destination-executor.service.ts
Outdated
Show resolved
Hide resolved
plugin-server/src/cdp/services/segment-destination-executor.service.ts
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
all of this code is untested. please add tests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added tests
|
||
public async processInvocations(invocations: HogFunctionInvocation[]): Promise<HogFunctionInvocationResult[]> { | ||
// Segment plugins fire fetch requests and so need to be run in true parallel | ||
return await Promise.all( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fail-fast or fail-safe?
If one plugin throws an error, Promise.all will reject immediately. Ask yourself:
Do you want all or nothing (fail-fast)? → Promise.all is fine.
Do you want to collect all results/errors? → Use Promise.allSettled.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@meikelmosby not fully sure on this one - technically, we want to capture all results & errors from all invocations, but wouldn't the same apply to the plugin worker?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
going to keep as is for now - https://posthog.slack.com/archives/C06GG249PR6/p1747918574348349
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Jumping in - the case here is basically that the executor code should only throw if there is a genuine programming error. If so we essentially have no choice but to crash out so there isn't much of a difference between all and allSettled, as we have no course of action to partially handle a batch.
Generally it would be good if we could partially handle a batch, but thats not how kafka works so this is an issue with out architecture moreso than this specific code path
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
adding some tests would be good here as well..
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the tests for the executor cover this one as well
frontend/src/scenes/pipeline/hogfunctions/HogFunctionConfiguration.tsx
Outdated
Show resolved
Hide resolved
|
||
public async processInvocations(invocations: HogFunctionInvocation[]): Promise<HogFunctionInvocationResult[]> { | ||
// Segment plugins fire fetch requests and so need to be run in true parallel | ||
return await Promise.all( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Jumping in - the case here is basically that the executor code should only throw if there is a genuine programming error. If so we essentially have no choice but to crash out so there isn't much of a difference between all and allSettled, as we have no course of action to partially handle a batch.
Generally it would be good if we could partially handle a batch, but thats not how kafka works so this is an issue with out architecture moreso than this specific code path
Want to get this merged with all destinations hidden for now. https://github.com/PostHog/posthog/blob/external-destinations/plugin-server/src/cdp/segment/segment-templates.ts#L321-L331 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is good to go. Still some follow up needed for sure but lets iterate
} | ||
}) | ||
|
||
expect(invocationResults[0].logs).toMatchInlineSnapshot(` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When you have really big snapshots like this we should just use toMatchSnapshot
- inline ones are good when there is a limited number of options. Now the file is so huge its virtually unreadable.
pluginExecutionDuration.observe(performance.now() - start) | ||
} catch (e) { | ||
if (e instanceof RetryError) { | ||
// NOTE: Schedule as a retry to cyclotron? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This still needs to be implemented?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can be left out for this PR but needs to be done as follow up asap
Important
👉 Stay up-to-date with PostHog coding conventions for a smoother review.
Changes
PostHog / Segment view
PostHog
Segment
Todos:
ability to select an entire object (we can technically do this using aconvert to string and use{event.properties}
string field){event.properties}
for nowTODOs:
cant use config cachingcreated a newsegment
queue with a consumer that has the caching logic removedtypescript: fix fetchResponse return typewill use @ts-ignore for nowRefer to this PR for the followup status #32651
Follow-up:
Does this work well for both Cloud and self-hosted?
How did you test this code?