Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/tidy-pans-fall.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@onflow/transport-http": patch
---

Gracefully handle empty heartbeat data when streaming events
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
import {accountStatusesHandler} from "./account-statuses"

describe("accountStatusesHandler", () => {
it("should handle messages with account events", () => {
const onData = jest.fn()
const onError = jest.fn()

const subscriber = accountStatusesHandler.createSubscriber(
{accountAddresses: ["0x1234"]},
onData,
onError
)

subscriber.onData({
block_id: "block123",
height: "12345",
account_events: {
"0x1234": [
{
type: "flow.AccountKeyAdded",
transaction_id: "tx123",
transaction_index: "0",
event_index: "0",
payload: Buffer.from(JSON.stringify({value: "test"})).toString(
"base64"
),
},
],
},
message_index: "0",
})

expect(onData).toHaveBeenCalledTimes(1)
expect(onData).toHaveBeenCalledWith({
accountStatusEvent: {
accountAddress: "0x1234",
blockId: "block123",
blockHeight: 12345,
type: "flow.AccountKeyAdded",
transactionId: "tx123",
transactionIndex: 0,
eventIndex: 0,
payload: {value: "test"},
},
})
})

it("should handle heartbeat messages without account_events field", () => {
const onData = jest.fn()
const onError = jest.fn()

const subscriber = accountStatusesHandler.createSubscriber(
{accountAddresses: ["0x1234"], startBlockHeight: 100},
onData,
onError
)

const heartbeat: any = {
block_id: "block123",
height: "12345",
message_index: "0",
}

expect(() => subscriber.onData(heartbeat)).not.toThrow()
expect(onData).not.toHaveBeenCalled()
expect(subscriber.getConnectionArgs().start_block_height).toBe(12346)
})

it("should sort events by transaction and event index", () => {
const onData = jest.fn()

const subscriber = accountStatusesHandler.createSubscriber(
{accountAddresses: ["0x1234"]},
onData,
jest.fn()
)

subscriber.onData({
block_id: "block123",
height: "12345",
account_events: {
"0x1234": [
{
type: "flow.Event",
transaction_id: "tx1",
transaction_index: "1",
event_index: "0",
payload: Buffer.from(JSON.stringify({value: "second"})).toString(
"base64"
),
},
{
type: "flow.Event",
transaction_id: "tx1",
transaction_index: "0",
event_index: "0",
payload: Buffer.from(JSON.stringify({value: "first"})).toString(
"base64"
),
},
],
},
message_index: "0",
})

expect(onData).toHaveBeenCalledTimes(2)
expect(onData.mock.calls[0][0].accountStatusEvent.payload.value).toBe(
"first"
)
expect(onData.mock.calls[1][0].accountStatusEvent.payload.value).toBe(
"second"
)
})
})
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type AccountStatusesArgsDto = {
type AccountStatusesDataDto = {
block_id: string
height: string
account_events: {
account_events?: {
[address: string]: {
type: string
transaction_id: string
Expand All @@ -48,31 +48,37 @@ export const accountStatusesHandler = createSubscriptionHandler<{

return {
onData(rawData: AccountStatusesDataDto) {
const data: AccountStatusesData[] = []
for (const [address, events] of Object.entries(
rawData.account_events
)) {
for (const event of events) {
// Parse the raw data
const parsedData: AccountStatusesData = {
accountStatusEvent: {
accountAddress: address,
blockId: rawData.block_id,
blockHeight: Number(rawData.height),
type: event.type,
transactionId: event.transaction_id,
transactionIndex: Number(event.transaction_index),
eventIndex: Number(event.event_index),
payload: JSON.parse(
Buffer.from(event.payload, "base64").toString()
),
},
}
// The API may send messages without an account_events field when there are no events (heartbeat)
// Process events if they exist
if (rawData.account_events) {
const data: AccountStatusesData[] = []

// Collect all events from all accounts
for (const [address, events] of Object.entries(
rawData.account_events
)) {
for (const event of events) {
// Parse the raw data
const parsedData: AccountStatusesData = {
accountStatusEvent: {
accountAddress: address,
blockId: rawData.block_id,
blockHeight: Number(rawData.height),
type: event.type,
transactionId: event.transaction_id,
transactionIndex: Number(event.transaction_index),
eventIndex: Number(event.event_index),
payload: JSON.parse(
Buffer.from(event.payload, "base64").toString()
),
},
}

data.push(parsedData)
data.push(parsedData)
}
}

// Sort the messages by increasing message index
// Sort the messages by increasing transaction and event index
data.sort((a, b) => {
const txIndexDiff =
a.accountStatusEvent.transactionIndex -
Expand All @@ -84,17 +90,17 @@ export const accountStatusesHandler = createSubscriptionHandler<{
)
})

// Emit the messages
// Emit all the messages
for (const message of data) {
onData(message)
}
}

// Update the resume args
resumeArgs = {
...resumeArgs,
startBlockHeight: Number(BigInt(rawData.height) + BigInt(1)),
startBlockId: undefined,
}
// Always update resume args (for both heartbeat and event messages)
resumeArgs = {
...resumeArgs,
startBlockHeight: Number(BigInt(rawData.height) + BigInt(1)),
startBlockId: undefined,
}
},
onError(error: Error) {
Expand Down
66 changes: 66 additions & 0 deletions packages/transport-http/src/subscribe/handlers/events.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import {eventsHandler} from "./events"

describe("eventsHandler", () => {
it("should handle messages with events", () => {
const onData = jest.fn()
const onError = jest.fn()

const subscriber = eventsHandler.createSubscriber(
{eventTypes: ["A.Contract.Event"]},
onData,
onError
)

subscriber.onData({
block_id: "block123",
block_height: "12345",
block_timestamp: "2024-01-01T00:00:00Z",
events: [
{
type: "A.Contract.Event",
transaction_id: "tx123",
transaction_index: "0",
event_index: "0",
payload: Buffer.from(JSON.stringify({value: "test"})).toString(
"base64"
),
},
],
})

expect(onData).toHaveBeenCalledTimes(1)
expect(onData).toHaveBeenCalledWith({
event: {
blockId: "block123",
blockHeight: 12345,
blockTimestamp: "2024-01-01T00:00:00Z",
type: "A.Contract.Event",
transactionId: "tx123",
transactionIndex: 0,
eventIndex: 0,
payload: {value: "test"},
},
})
})

it("should handle heartbeat messages without events field", () => {
const onData = jest.fn()
const onError = jest.fn()

const subscriber = eventsHandler.createSubscriber(
{eventTypes: ["A.Contract.Event"], startBlockHeight: 100},
onData,
onError
)

const heartbeat: any = {
block_id: "block123",
block_height: "12345",
block_timestamp: "2024-01-01T00:00:00Z",
}

expect(() => subscriber.onData(heartbeat)).not.toThrow()
expect(onData).not.toHaveBeenCalled()
expect(subscriber.getConnectionArgs().start_block_height).toBe("12346")
})
})
42 changes: 23 additions & 19 deletions packages/transport-http/src/subscribe/handlers/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type EventsDataDto = {
block_id: string
block_height: string
block_timestamp: string
events: {
events?: {
type: string
transaction_id: string
transaction_index: string
Expand Down Expand Up @@ -59,24 +59,28 @@ export const eventsHandler = createSubscriptionHandler<{

return {
onData(rawData: EventsDataDto) {
for (const event of rawData.events) {
// Parse the raw data
const result: EventsData = {
event: {
blockId: rawData.block_id,
blockHeight: Number(rawData.block_height),
blockTimestamp: rawData.block_timestamp,
type: event.type,
transactionId: event.transaction_id,
transactionIndex: Number(event.transaction_index),
eventIndex: Number(event.event_index),
payload: JSON.parse(
Buffer.from(event.payload, "base64").toString()
),
},
}
// The API may send messages without an events field when there are no events in a block
// Handle this gracefully by checking if events exists and is an array
if (rawData.events && Array.isArray(rawData.events)) {
for (const event of rawData.events) {
// Parse the raw data
const result: EventsData = {
event: {
blockId: rawData.block_id,
blockHeight: Number(rawData.block_height),
blockTimestamp: rawData.block_timestamp,
type: event.type,
transactionId: event.transaction_id,
transactionIndex: Number(event.transaction_index),
eventIndex: Number(event.event_index),
payload: JSON.parse(
Buffer.from(event.payload, "base64").toString()
),
},
}

onData(result)
onData(result)
}
}

// Update the resume args
Expand All @@ -96,7 +100,7 @@ export const eventsHandler = createSubscriptionHandler<{
contracts: resumeArgs.contracts,
}

if ("startHeight" in resumeArgs && resumeArgs.startBlockHeight) {
if ("startBlockHeight" in resumeArgs && resumeArgs.startBlockHeight) {
return {
...encodedArgs,
start_block_height: String(resumeArgs.startBlockHeight),
Expand Down
Loading