Skip to content

Commit 33a6b54

Browse files
authored
fix(kafkajs): handle kafka tombstone messages (#3985)
1 parent f99adac commit 33a6b54

File tree

4 files changed

+59
-7
lines changed

4 files changed

+59
-7
lines changed

CHANGELOG.asciidoc

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,23 @@ Notes:
3333
3434
See the <<upgrade-to-v4>> guide.
3535
36+
==== Unreleased
37+
38+
[float]
39+
===== Breaking changes
40+
41+
[float]
42+
===== Features
43+
44+
[float]
45+
===== Bug fixes
46+
47+
* Fix message handling for tombstone messages in `kafkajs` instrumentation.
48+
({pull}3985[#3985])
49+
50+
[float]
51+
===== Chores
52+
3653
[[release-notes-4.5.2]]
3754
==== 4.5.2 - 2024/04/12
3855

lib/instrumentation/modules/kafkajs.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ module.exports = function (mod, agent, { version, enabled }) {
138138
config.captureBody === 'all' ||
139139
config.captureBody === 'transactions'
140140
) {
141-
messageCtx.body = message.value.toString();
141+
messageCtx.body = message.value?.toString();
142142
}
143143

144144
if (message.headers && config.captureHeaders) {

test/instrumentation/modules/kafkajs/fixtures/use-kafkajs-each-message.js

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ async function useKafkajsClient(kafkaClient, options) {
6464
let eachMessagesConsumed = 0;
6565
await consumer.run({
6666
eachMessage: async function ({ message }) {
67-
log.info(`message received: ${message.value.toString()}`);
67+
log.info(`message received: ${message.value?.toString()}`);
6868
eachMessagesConsumed++;
6969
},
7070
});
@@ -76,9 +76,14 @@ async function useKafkajsClient(kafkaClient, options) {
7676
data = await producer.send({
7777
topic,
7878
messages: [
79-
{ value: 'each message 1', headers: { foo: 'string' } },
80-
{ value: 'each message 2', headers: { foo: Buffer.from('buffer') } },
81-
{ value: 'each message 3', headers: { auth: 'this_is_a_secret' } },
79+
{ value: 'each message 1', headers: { foo: 'foo 1' } },
80+
{ value: 'each message 2', headers: { foo: Buffer.from('foo 2') } },
81+
{
82+
value: 'each message 3',
83+
headers: { foo: 'foo 3', auth: 'this_is_a_secret' },
84+
},
85+
// https://github.com/elastic/apm-agent-nodejs/issues/3980
86+
{ value: null, headers: { foo: 'foo 4' } },
8287
],
8388
});
8489
log.info({ data }, 'messages sent');

test/instrumentation/modules/kafkajs/kafkajs.test.js

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ const testFixtures = [
172172
message: {
173173
queue: { name: kafkaTopic },
174174
headers: {
175-
foo: 'buffer',
175+
foo: 'foo 1',
176176
traceparent: `00-${tx.trace_id}-${parentId}-01`,
177177
tracestate: 'es=s:1',
178178
},
@@ -189,7 +189,7 @@ const testFixtures = [
189189
message: {
190190
queue: { name: kafkaTopic },
191191
headers: {
192-
foo: 'string',
192+
foo: 'foo 2',
193193
traceparent: `00-${tx.trace_id}-${parentId}-01`,
194194
tracestate: 'es=s:1',
195195
},
@@ -206,6 +206,7 @@ const testFixtures = [
206206
message: {
207207
queue: { name: kafkaTopic },
208208
headers: {
209+
foo: 'foo 3',
209210
auth: '[REDACTED]',
210211
traceparent: `00-${tx.trace_id}-${parentId}-01`,
211212
tracestate: 'es=s:1',
@@ -214,6 +215,23 @@ const testFixtures = [
214215
},
215216
outcome: 'success',
216217
});
218+
219+
t.deepEqual(transactions.shift(), {
220+
name: `Kafka RECEIVE from ${kafkaTopic}`,
221+
type: 'messaging',
222+
context: {
223+
service: {},
224+
message: {
225+
queue: { name: kafkaTopic },
226+
headers: {
227+
foo: 'foo 4',
228+
traceparent: `00-${tx.trace_id}-${parentId}-01`,
229+
tracestate: 'es=s:1',
230+
},
231+
},
232+
},
233+
outcome: 'success',
234+
});
217235
t.equal(transactions.length, 0, 'all transactions accounted for');
218236
},
219237
},
@@ -665,6 +683,18 @@ const testFixtures = [
665683
},
666684
outcome: 'success',
667685
});
686+
687+
t.deepEqual(transactions.shift(), {
688+
name: `Kafka RECEIVE from ${kafkaTopic}`,
689+
type: 'messaging',
690+
context: {
691+
service: {},
692+
message: {
693+
queue: { name: kafkaTopic },
694+
},
695+
},
696+
outcome: 'success',
697+
});
668698
t.equal(transactions.length, 0, 'all transactions accounted for');
669699
},
670700
},

0 commit comments

Comments
 (0)