From d30bdee6377b8aa65bf2027dd1f11ba25f0e51d1 Mon Sep 17 00:00:00 2001 From: Maxwell Brown Date: Tue, 23 Dec 2025 15:36:34 -0500 Subject: [PATCH 1/4] add stream method to arrow flight service --- packages/amp/src/ArrowFlight.ts | 262 +++++++++++++++++++++----------- packages/amp/src/Models.ts | 108 +++++++++++++ 2 files changed, 285 insertions(+), 85 deletions(-) diff --git a/packages/amp/src/ArrowFlight.ts b/packages/amp/src/ArrowFlight.ts index 18b5847..06f182f 100644 --- a/packages/amp/src/ArrowFlight.ts +++ b/packages/amp/src/ArrowFlight.ts @@ -9,21 +9,23 @@ import { type Transport as ConnectTransport } from "@connectrpc/connect" import * as Arr from "effect/Array" -import * as Console from "effect/Console" +import * as Cause from "effect/Cause" import * as Context from "effect/Context" import * as Effect from "effect/Effect" +import { identity } from "effect/Function" import * as Layer from "effect/Layer" import * as Option from "effect/Option" +import * as Predicate from "effect/Predicate" import * as Redacted from "effect/Redacted" import * as Schema from "effect/Schema" import * as Stream from "effect/Stream" import { Auth } from "./Auth.ts" -import { decodeDictionaryBatch, decodeRecordBatch, DictionaryRegistry } from "./internal/arrow-flight-ipc/Decoder.ts" +import { decodeRecordBatch, DictionaryRegistry } from "./internal/arrow-flight-ipc/Decoder.ts" import { recordBatchToJson } from "./internal/arrow-flight-ipc/Json.ts" -import { readColumnValues } from "./internal/arrow-flight-ipc/Readers.ts" -import { parseDictionaryBatch, parseRecordBatch } from "./internal/arrow-flight-ipc/RecordBatch.ts" +import { parseRecordBatch } from "./internal/arrow-flight-ipc/RecordBatch.ts" import { type ArrowSchema, getMessageType, MessageHeaderType, parseSchema } from "./internal/arrow-flight-ipc/Schema.ts" -import type { AuthInfo } from "./Models.ts" +import type { AuthInfo, BlockRange, RecordBatchMetadata } from "./Models.ts" +import { RecordBatchMetadataFromUint8Array } 
from "./Models.ts" import { FlightDescriptor_DescriptorType, FlightDescriptorSchema, FlightService } from "./Protobuf/Flight_pb.ts" import { CommandStatementQuerySchema } from "./Protobuf/FlightSql_pb.ts" @@ -201,6 +203,44 @@ export class ParseSchemaError extends Schema.TaggedError( cause: Schema.Defect }) {} +// ============================================================================= +// Types +// ============================================================================= + +/** + * Represents the result received from the `ArrowFlight` service when a query + * is successfully executed. + */ +export interface QueryResult { + readonly data: A + readonly metadata: RecordBatchMetadata +} + +/** + * Represents options that can be passed to `ArrowFlight.query` to control how + * the query is executed. + */ +export interface QueryOptions { + readonly schema?: Schema.Any | undefined + /** + * Sets the `stream` Amp query setting to `true`. + */ + readonly stream?: boolean | undefined + /** + * A set of block ranges which will be converted into a resume watermark + * header and sent with the query. This allows resumption of streaming queries. + */ + readonly resumeWatermark?: ReadonlyArray | undefined +} + +/** + * A utility type to extract the result type for a query. + */ +export type ExtractQueryResult = Options extends { + readonly schema: Schema.Schema +} ? QueryResult<_A> + : Record + // ============================================================================= // Arrow Flight Service // ============================================================================= @@ -209,16 +249,19 @@ export class ParseSchemaError extends Schema.TaggedError( /** * A service which can be used to execute queries against an Arrow Flight API. */ -export class ArrowFlight extends Context.Tag("@edgeandnode/amp/ArrowFlight") /** - * Executes an Arrow Flight SQL query and returns + * Executes an Arrow Flight SQL query and returns a stream of results. 
*/ - readonly query: (sql: string) => Effect.Effect + readonly stream: ( + sql: string, + options?: Options + ) => Stream.Stream, ArrowFlightError> }>() {} const make = Effect.gen(function*() { @@ -226,95 +269,123 @@ const make = Effect.gen(function*() { const transport = yield* Transport const client = createClient(FlightService, transport) + const decodeRecordBatchMetadata = Schema.decode(RecordBatchMetadataFromUint8Array) + /** * Execute a SQL query and return a stream of rows. */ - const query = Effect.fn("ArrowFlight.request")(function*(query: string) { - // Setup the query context with authentication information, if available - const contextValues = createContextValues() - const authInfo = Option.isSome(auth) - ? yield* auth.value.getCachedAuthInfo - : Option.none() - if (Option.isSome(authInfo)) { - contextValues.set(AuthInfoContextKey, authInfo.value) - } + const stream = (query: string, options?: QueryOptions) => + Effect.gen(function*() { + const contextValues = createContextValues() + const authInfo = Option.isSome(auth) + ? yield* auth.value.getCachedAuthInfo + : Option.none() + + // Setup the query context with authentication information, if available + if (Option.isSome(authInfo)) { + contextValues.set(AuthInfoContextKey, authInfo.value) + } - const cmd = create(CommandStatementQuerySchema, { query }) - const any = anyPack(CommandStatementQuerySchema, cmd) - const desc = create(FlightDescriptorSchema, { - type: FlightDescriptor_DescriptorType.CMD, - cmd: toBinary(AnySchema, any) - }) - - const flightInfo = yield* Effect.tryPromise({ - try: (signal) => client.getFlightInfo(desc, { signal, contextValues }), - catch: (cause) => new RpcError({ cause, method: "getFlightInfo" }) - }) - - if (flightInfo.endpoint.length !== 1) { - return yield* flightInfo.endpoint.length <= 0 - ? 
new NoEndpointsError({ query }) - : new MultipleEndpointsError({ query }) - } + const cmd = create(CommandStatementQuerySchema, { query }) + const any = anyPack(CommandStatementQuerySchema, cmd) + const desc = create(FlightDescriptorSchema, { + type: FlightDescriptor_DescriptorType.CMD, + cmd: toBinary(AnySchema, any) + }) + + // Setup the query headers + const headers = new Headers() + if (Predicate.isNotUndefined(options?.stream)) { + headers.set("amp-stream", "true") + } + if (Predicate.isNotUndefined(options?.resumeWatermark)) { + headers.set("amp-resume", blockRangesToResumeWatermark(options.resumeWatermark)) + } - const { ticket } = flightInfo.endpoint[0]! + const flightInfo = yield* Effect.tryPromise({ + try: (signal) => client.getFlightInfo(desc, { contextValues, headers, signal }), + catch: (cause) => new RpcError({ cause, method: "getFlightInfo" }) + }) - if (ticket === undefined) { - return yield* new TicketNotFoundError({ query }) - } + if (flightInfo.endpoint.length !== 1) { + return yield* flightInfo.endpoint.length <= 0 + ? 
new NoEndpointsError({ query }) + : new MultipleEndpointsError({ query }) + } - const flightDataStream = Stream.unwrapScoped(Effect.gen(function*() { - const controller = yield* Effect.acquireRelease( - Effect.sync(() => new AbortController()), - (controller) => Effect.sync(() => controller.abort()) - ) - return Stream.fromAsyncIterable( - client.doGet(ticket, { signal: controller.signal, contextValues }), - (cause) => new RpcError({ cause, method: "doGet" }) - ) - })) - - let schema: ArrowSchema | undefined - const dictionaryRegistry = new DictionaryRegistry() - - // Convert FlightData stream to a stream of rows - return yield* flightDataStream.pipe( - Stream.runForEach(Effect.fnUntraced(function*(flightData) { - const messageType = yield* Effect.orDie(getMessageType(flightData)) - - switch (messageType) { - case MessageHeaderType.SCHEMA: { - schema = yield* parseSchema(flightData).pipe( - Effect.mapError((cause) => new ParseSchemaError({ cause })) - ) - break - } - case MessageHeaderType.DICTIONARY_BATCH: { - const dictionaryBatch = yield* parseDictionaryBatch(flightData).pipe( - Effect.mapError((cause) => new ParseDictionaryBatchError({ cause })) - ) - decodeDictionaryBatch(dictionaryBatch, flightData.dataBody, schema!, dictionaryRegistry, readColumnValues) - break - } - case MessageHeaderType.RECORD_BATCH: { - const recordBatch = yield* parseRecordBatch(flightData).pipe( - Effect.mapError((cause) => new ParseRecordBatchError({ cause })) - ) - const decodedRecordBatch = decodeRecordBatch(recordBatch, flightData.dataBody, schema!) - const json = recordBatchToJson(decodedRecordBatch, { dictionaryRegistry }) - yield* Console.dir(json, { depth: null, colors: true }) - break - } - } + const { ticket } = flightInfo.endpoint[0]! 
+ + if (ticket === undefined) { + return yield* new TicketNotFoundError({ query }) + } - return yield* Effect.void + const flightDataStream = Stream.unwrapScoped(Effect.gen(function*() { + const controller = yield* Effect.acquireRelease( + Effect.sync(() => new AbortController()), + (controller) => Effect.sync(() => controller.abort()) + ) + return Stream.fromAsyncIterable( + client.doGet(ticket, { signal: controller.signal, contextValues }), + (cause) => new RpcError({ cause, method: "doGet" }) + ) })) - ) - }) + + let schema: ArrowSchema | undefined + const dictionaryRegistry = new DictionaryRegistry() + const dataSchema: Schema.Array$ = Schema.Array(options?.schema ?? Schema.Any as any) + const decodeRecordBatchData = Schema.decode(dataSchema) + + // Convert FlightData stream to a stream of rows + return flightDataStream.pipe( + Stream.mapEffect(Effect.fnUntraced(function*(flightData): Effect.fn.Return< + Option.Option>, + ArrowFlightError + > { + const messageType = yield* Effect.orDie(getMessageType(flightData)) + + switch (messageType) { + case MessageHeaderType.SCHEMA: { + schema = yield* parseSchema(flightData).pipe( + Effect.mapError((cause) => new ParseSchemaError({ cause })) + ) + return Option.none>() + } + case MessageHeaderType.DICTIONARY_BATCH: { + // TODO: figure out what to do (if anything) with dictionary batches + // const dictionaryBatch = yield* parseDictionaryBatch(flightData).pipe( + // Effect.mapError((cause) => new ParseDictionaryBatchError({ cause })) + // ) + // decodeDictionaryBatch(dictionaryBatch, flightData.dataBody, schema!, dictionaryRegistry, readColumnValues) + return Option.none>() + } + case MessageHeaderType.RECORD_BATCH: { + const metadata = yield* decodeRecordBatchMetadata(flightData.appMetadata).pipe( + Effect.mapError((cause) => new ParseRecordBatchError({ cause })) + ) + const recordBatch = yield* parseRecordBatch(flightData).pipe( + Effect.mapError((cause) => new ParseRecordBatchError({ cause })) + ) + const 
decodedRecordBatch = decodeRecordBatch(recordBatch, flightData.dataBody, schema!) + const json = recordBatchToJson(decodedRecordBatch, { dictionaryRegistry }) + const data = yield* decodeRecordBatchData(json).pipe( + Effect.mapError((cause) => new ParseRecordBatchError({ cause })) + ) + return Option.some({ data, metadata }) + } + } + + return yield* Effect.die(new Cause.RuntimeException(`Invalid message type received: ${messageType}`)) + })), + Stream.filterMap(identity) + ) + }).pipe( + Stream.unwrap, + Stream.withSpan("ArrowFlight.stream") + ) as any return { client, - query + stream } as const }) @@ -323,3 +394,24 @@ const make = Effect.gen(function*() { * service and depends upon some implementation of a `Transport`. */ export const layer: Layer.Layer = Layer.effect(ArrowFlight, make) + +// ============================================================================= +// Internal Utilities +// ============================================================================= + +/** + * Converts a list of block ranges into a resume watermark string. + * + * @param ranges - The block ranges to convert. + * @returns A resume watermark string. 
+ */ +const blockRangesToResumeWatermark = (ranges: ReadonlyArray): string => { + const watermarks: Record = {} + for (const range of ranges) { + watermarks[range.network] = { + number: range.numbers.end, + hash: range.hash + } + } + return JSON.stringify(watermarks) +} diff --git a/packages/amp/src/Models.ts b/packages/amp/src/Models.ts index c69f1f1..e083b1c 100644 --- a/packages/amp/src/Models.ts +++ b/packages/amp/src/Models.ts @@ -1,3 +1,5 @@ +import * as Encoding from "effect/Encoding" +import * as ParseResult from "effect/ParseResult" import * as Schema from "effect/Schema" import { isAddress } from "viem" @@ -48,3 +50,109 @@ export const AuthInfo = Schema.Struct({ expiry: Schema.Int.pipe(Schema.positive(), Schema.optional) }).annotations({ identifier: "AuthInfo" }) export type AuthInfo = typeof AuthInfo.Type + +/** + * Represents a block number. + */ +export const BlockNumber = Schema.NonNegativeInt.pipe( + Schema.brand("Amp/Models/BlockNumber") +).annotations({ + identifier: "BlockNumber", + description: "A block number" +}) +export type BlockNumber = typeof BlockNumber.Type + +/** + * Represents a block hash. + */ +export const BlockHash = Schema.NonEmptyTrimmedString.pipe( + Schema.pattern(/^0x[a-z0-9]{64}$/), + Schema.brand("Amp/Models/BlockHash") +).annotations({ identifier: "BlockHash" }) +export type BlockHash = typeof BlockHash.Type + +/** + * Represents a blockchain network. + */ +export const Network = Schema.Lowercase.pipe( + Schema.brand("Amp/Models/Network") +).annotations({ + title: "Network", + description: "a blockchain network", + examples: ["mainnet" as Network] +}) +export type Network = typeof Network.Type + +/** + * Represents a range of blocks from a given network. + */ +export const BlockRange = Schema.Struct({ + /** + * The name of the network from which the associated blocks were extracted. + */ + network: Network, + /** + * A start and end index representing the inclusive range of block numbers.
+ */ + numbers: Schema.Struct({ start: BlockNumber, end: BlockNumber }), + /** + * The hash associated with the end block. + */ + hash: BlockHash, + /** + * The hash associated with the parent of the start block, if present + */ + prevHash: Schema.optional(BlockHash).pipe( + Schema.fromKey("prev_hash") + ) +}).annotations({ + identifier: "BlockRange", + description: "A range of blocks on a given network" +}) +export type BlockRange = typeof BlockRange.Type + +/** + * Represents metadata carrying information about the block ranges covered by + * the associated Apache Arrow RecordBatch. + */ +export const RecordBatchMetadata = Schema.Struct({ + /** + * The block ranges included in the associated Apache Arrow RecordBatch. + */ + ranges: Schema.Array(BlockRange), + /** + * Indicates whether this is the final record batch associated to the ranges. + */ + rangesComplete: Schema.Boolean.pipe( + Schema.propertySignature, + Schema.fromKey("ranges_complete") + ) +}).annotations({ + identifier: "RecordBatchMetadata", + description: "Metadata carrying information about the block ranges covered by this record batch" +}) +export type RecordBatchMetadata = typeof RecordBatchMetadata.Type + +/** + * Represents the conversion of the binary `appMetadata` received from a + * `FlightData` response into metadata about the associated Arrow Flight + * RecordBatch. 
+ */ +export const RecordBatchMetadataFromUint8Array = Schema.transformOrFail( + Schema.Uint8ArrayFromSelf, + Schema.parseJson(RecordBatchMetadata), + { + strict: true, + encode: (decoded, _, ast) => + ParseResult.try({ + try: () => new TextEncoder().encode(decoded), + catch: () => new ParseResult.Type(ast, decoded, "Failed to encode record batch metadata") + }), + decode: (encoded, _, ast) => + ParseResult.try({ + try: () => new TextDecoder().decode(encoded), + catch: () => new ParseResult.Type(ast, encoded, "Failed to decode record batch metadata") + }) + } +).pipe(Schema.asSchema) +export type RecordBatchMetadataFromUint8Array = typeof RecordBatchMetadataFromUint8Array.Type From 526d5a91c9aab9a6d6947e1a637a6a71190d598f Mon Sep 17 00:00:00 2001 From: Maxwell Brown Date: Tue, 23 Dec 2025 15:50:54 -0500 Subject: [PATCH 2/4] further restrict the schema shape if none provided --- packages/amp/src/ArrowFlight.ts | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/packages/amp/src/ArrowFlight.ts b/packages/amp/src/ArrowFlight.ts index 06f182f..c2495a1 100644 --- a/packages/amp/src/ArrowFlight.ts +++ b/packages/amp/src/ArrowFlight.ts @@ -332,7 +332,17 @@ const make = Effect.gen(function*() { let schema: ArrowSchema | undefined const dictionaryRegistry = new DictionaryRegistry() - const dataSchema: Schema.Array$ = Schema.Array(options?.schema ?? Schema.Any as any) + const dataSchema: Schema.Array$< + Schema.Record$< + typeof Schema.String, + typeof Schema.Unknown + > + > = Schema.Array( + options?.schema ??
Schema.Record({ + key: Schema.String, + value: Schema.Unknown + }) as any + ) const decodeRecordBatchData = Schema.decode(dataSchema) // Convert FlightData stream to a stream of rows From 7df5736abfea05ce0bc20eb1e30fe419e623703b Mon Sep 17 00:00:00 2001 From: Maxwell Brown Date: Tue, 23 Dec 2025 15:53:50 -0500 Subject: [PATCH 3/4] remove unused import --- packages/amp/src/Models.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/amp/src/Models.ts b/packages/amp/src/Models.ts index e083b1c..b81ce17 100644 --- a/packages/amp/src/Models.ts +++ b/packages/amp/src/Models.ts @@ -1,4 +1,3 @@ -import * as Encoding from "effect/Encoding" import * as ParseResult from "effect/ParseResult" import * as Schema from "effect/Schema" import { isAddress } from "viem" From 91ac01c1c072a32f7af16e57650fcd89650aef93 Mon Sep 17 00:00:00 2001 From: Maxwell Brown Date: Tue, 23 Dec 2025 16:11:43 -0500 Subject: [PATCH 4/4] rename to streamQuery --- packages/amp/src/ArrowFlight.ts | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/packages/amp/src/ArrowFlight.ts b/packages/amp/src/ArrowFlight.ts index c2495a1..cfb09ff 100644 --- a/packages/amp/src/ArrowFlight.ts +++ b/packages/amp/src/ArrowFlight.ts @@ -255,10 +255,18 @@ export class ArrowFlight extends Context.Tag("Amp/ArrowFlight") + /** + * Executes an Arrow Flight SQL query and returns all results as an array. + */ + readonly query: ( + sql: string, + options?: Options + ) => Effect.Effect>, ArrowFlightError> + /** + * Executes an Arrow Flight SQL query and returns a stream of results. + */ - readonly stream: ( + readonly streamQuery: ( sql: string, options?: Options ) => Stream.Stream, ArrowFlightError> @@ -274,7 +282,7 @@ const make = Effect.gen(function*() { /** * Execute a SQL query and return a stream of rows.
*/ - const stream = (query: string, options?: QueryOptions) => + const streamQuery = (query: string, options?: QueryOptions) => Effect.gen(function*() { const contextValues = createContextValues() const authInfo = Option.isSome(auth) @@ -393,9 +401,17 @@ const make = Effect.gen(function*() { Stream.withSpan("ArrowFlight.stream") ) as any + const query = Effect.fn("ArrowFlight.query")( + function*(query: string, options?: QueryOptions) { + const chunk = yield* Stream.runCollect(streamQuery(query, options)) + return Array.from(chunk) + } + ) as any + return { client, - stream + query, + streamQuery } as const })