diff --git a/frontend/package.json b/frontend/package.json index 791320f217..f06f60e730 100644 --- a/frontend/package.json +++ b/frontend/package.json @@ -65,6 +65,7 @@ "@rivet-gg/icons": "workspace:*", "@rivetkit/engine-api-full": "workspace:*", "@rivetkit/example-registry": "workspace:^", + "@rivetkit/traces": "workspace:*", "@sentry/react": "^8.55.0", "@sentry/vite-plugin": "^2.23.1", "@shikijs/langs": "^3.12.2", diff --git a/frontend/src/components/actors/actor-clear-events-log-button.tsx b/frontend/src/components/actors/actor-clear-events-log-button.tsx deleted file mode 100644 index b5b72f6ddc..0000000000 --- a/frontend/src/components/actors/actor-clear-events-log-button.tsx +++ /dev/null @@ -1,30 +0,0 @@ -import { faBroomWide, Icon } from "@rivet-gg/icons"; -import { useMutation } from "@tanstack/react-query"; -import { Button } from "../ui/button"; -import { WithTooltip } from "../ui/tooltip"; -import { useActorInspector } from "./actor-inspector-context"; -import type { ActorId } from "./queries"; - -export function ActorClearEventsLogButton({ actorId }: { actorId: ActorId }) { - const { mutate, isPending } = useMutation( - useActorInspector().actorClearEventsMutationOptions(actorId), - ); - - return ( - { - mutate(); - }} - > - - - } - /> - ); -} diff --git a/frontend/src/components/actors/actor-events-list.tsx b/frontend/src/components/actors/actor-events-list.tsx deleted file mode 100644 index 959c831d0f..0000000000 --- a/frontend/src/components/actors/actor-events-list.tsx +++ /dev/null @@ -1,245 +0,0 @@ -import { - faHammer, - faLink, - faMegaphone, - faTowerBroadcast, - Icon, -} from "@rivet-gg/icons"; -import { useQuery } from "@tanstack/react-query"; -import { format } from "date-fns"; -import { type PropsWithChildren, useEffect, useRef } from "react"; -import { match, P } from "ts-pattern"; -import { Badge } from "../ui/badge"; -import { - type TransformedInspectorEvent, - useActorInspector, -} from "./actor-inspector-context"; -import { ActorObjectInspector } from "./console/actor-inspector"; -import type { ActorId } from "./queries"; - -interface ActorEventsListProps { - actorId: ActorId; - search: string; - filter: string[]; -} - -export function ActorEventsList({ - actorId, - search, - filter, -}: ActorEventsListProps) { - const actorInspector = useActorInspector(); - const { data, isLoading, isError } = useQuery( - actorInspector.actorEventsQueryOptions(actorId), - ); - - if (isLoading) { - return Loading events...; - } - - if (isError) { - return ( - - Realtime Events Preview is currently unavailable. -
- See console/logs for more details. -
- ); - } - - const filteredEvents = data?.filter?.((event) => { - const constraints = []; - - if ("name" in event.body.val) { - constraints.push( - event.body.val.name - .toLowerCase() - .includes(search.toLowerCase()), - ); - } - if ("eventName" in event.body.val) { - constraints.push( - event.body.val.eventName - .toLowerCase() - .includes(search.toLowerCase()), - ); - } - if (filter.length > 0) { - const type = event.body.tag; - constraints.push(filter.includes(type)); - } - return constraints.every(Boolean); - }); - - if (filteredEvents?.length === 0) { - return No events found.; - } - - return filteredEvents?.map((event) => { - return ; - }); -} - -function Event(props: TransformedInspectorEvent) { - const ref = useRef(null); - - useEffect(() => { - if (ref.current && props.timestamp.getTime() > Date.now() - 1000) { - ref.current.animate( - [ - { backgroundColor: "transparent" }, - { backgroundColor: "hsl(var(--primary) / 15%)" }, - { backgroundColor: "transparent" }, - ], - { - duration: 1000, - fill: "forwards", - easing: "ease-in-out", - }, - ); - } - }, [props.timestamp]); - - return match(props.body) - .with({ tag: "ActionEvent" }, (body) => { - return ( - -
- {props.timestamp - ? format( - props.timestamp, - "LLL dd HH:mm:ss", - ).toUpperCase() - : null} -
-
- {body.val.connId.split("-")[0]} -
-
- - - Action - -
-
{body.val.name}
-
- -
-
- ); - }) - .with( - { tag: P.union("SubscribeEvent", "UnSubscribeEvent") }, - (body) => { - return ( - -
- {props.timestamp - ? format( - props.timestamp, - "LLL dd HH:mm:ss", - ).toUpperCase() - : null} -
-
- {body.val.connId.split("-")[0]} -
-
- - - {body.tag === "SubscribeEvent" - ? "Subscribe" - : "Unsubscribe"} - -
-
- {body.val.eventName} -
-
- - ); - }, - ) - .with({ tag: "BroadcastEvent" }, (body) => { - return ( - -
- {props.timestamp - ? format( - props.timestamp, - "LLL dd HH:mm:ss", - ).toUpperCase() - : null} -
-
-
- - - Broadcast - -
-
- {body.val.eventName} -
-
- -
- - ); - }) - .with({ tag: "FiredEvent" }, (body) => { - return ( - -
- {props.timestamp - ? format( - props.timestamp, - "LLL dd HH:mm:ss", - ).toUpperCase() - : null} -
-
- {body.val.connId.split("-")[0]} -
-
- - - Send - -
-
- {body.val.eventName} -
-
- -
-
- ); - }) - .exhaustive(); -} - -function EventContainer({ - ref, - children, -}: { - ref: React.RefObject; - children: React.ReactNode; -}) { - return ( -
- {children} -
- ); -} - -function Info({ children }: PropsWithChildren) { - return ( -
- {children} -
- ); -} diff --git a/frontend/src/components/actors/actor-events-tab.tsx b/frontend/src/components/actors/actor-events-tab.tsx deleted file mode 100644 index 6aa70d1400..0000000000 --- a/frontend/src/components/actors/actor-events-tab.tsx +++ /dev/null @@ -1,52 +0,0 @@ -import { faSpinnerThird, Icon } from "@rivet-gg/icons"; -import { useQuery } from "@tanstack/react-query"; -import { ActorEvents } from "./actor-events"; -import { useActorInspector } from "./actor-inspector-context"; -import { Info } from "./actor-state-tab"; -import { useDataProvider } from "./data-provider"; -import type { ActorId } from "./queries"; - -interface ActorEventsTabProps { - actorId: ActorId; -} - -export function ActorEventsTab({ actorId }: ActorEventsTabProps) { - const { data: destroyedAt } = useQuery( - useDataProvider().actorDestroyedAtQueryOptions(actorId), - ); - - const { isError, isLoading } = useQuery( - useActorInspector().actorEventsQueryOptions(actorId), - ); - - if (destroyedAt) { - return ( -
- State Preview is unavailable for inactive Actors. -
- ); - } - - if (isLoading) { - return ( - -
- - Loading Events... -
-
- ); - } - - if (isError) { - return ( - - Database Studio is currently unavailable. -
- See console/logs for more details. -
- ); - } - - return ; -} diff --git a/frontend/src/components/actors/actor-events.tsx b/frontend/src/components/actors/actor-events.tsx deleted file mode 100644 index 671641d5f8..0000000000 --- a/frontend/src/components/actors/actor-events.tsx +++ /dev/null @@ -1,213 +0,0 @@ -import { faPause, faPlay, Icon } from "@rivet-gg/icons"; -import { useQuery } from "@tanstack/react-query"; -import { - startTransition, - useCallback, - useEffect, - useRef, - useState, -} from "react"; -import { useResizeObserver } from "usehooks-ts"; -import { - Button, - LiveBadge, - LogsView, - PauseBadge, - ScrollArea, - ToggleGroup, - ToggleGroupItem, - WithTooltip, -} from "@/components"; -import { ActorClearEventsLogButton } from "./actor-clear-events-log-button"; -import { useActorDetailsSettings } from "./actor-details-settings"; -import { ActorDetailsSettingsButton } from "./actor-details-settings-button"; -import { ActorEventsList } from "./actor-events-list"; -import { - type TransformedInspectorEvent, - useActorInspector, -} from "./actor-inspector-context"; -import type { ActorId } from "./queries"; - -export type EventsTypeFilter = TransformedInspectorEvent["body"]["tag"]; - -interface ActorEventsProps { - actorId: ActorId; -} - -export function ActorEvents({ actorId }: ActorEventsProps) { - const [search, setSearch] = useState(""); - const [logsFilter, setLogsFilter] = useState([ - "ActionEvent", - "SubscribeEvent", - "BroadcastEvent", - "FiredEvent", - ]); - - const ref = useRef(null); - const [settings] = useActorDetailsSettings(); - - const actorQueries = useActorInspector(); - const { data } = useQuery(actorQueries.actorEventsQueryOptions(actorId)); - const { onScroll } = useScrollToBottom(ref, [data]); - - return ( -
-
-
-
- - startTransition(() => setSearch(e.target.value)) - } - /> -
- { - setLogsFilter(value as EventsTypeFilter[]); - }} - className="gap-0 text-xs p-2 border-r" - > - - Action - - - Subscription - - - Broadcast - - - Send - - -
- - - -
- -
-
-
-
-
- -
-
-
- Timestamp -
-
Connection
-
Event
-
Name
-
Data
-
- - -
-
-
-
- ); -} - -ActorEvents.Skeleton = () => { - return ( -
- -
- ); -}; - -function useScrollToBottom( - ref: React.RefObject, - deps: unknown[], -) { - const [settings] = useActorDetailsSettings(); - const [follow, setFollow] = useState(true); - const shouldFollow = () => settings.autoFollowLogs && follow; - const shouldScanForNew = useRef(false); - useResizeObserver({ - // @ts-expect-error -- TS2322 -- Type 'HTMLDivElement' is not assignable to type 'Element | null'. - ref, - onResize: () => { - if (shouldFollow()) { - // https://github.com/TanStack/virtual/issues/537 - requestAnimationFrame(() => { - ref.current?.scrollTo({ - top: ref.current.scrollHeight, - behavior: "instant", - }); - }); - } - }, - }); - - const onScroll = useCallback((e: React.UIEvent) => { - if (shouldScanForNew.current) { - return; - } - setFollow( - e.currentTarget.scrollHeight - e.currentTarget.scrollTop <= - e.currentTarget.clientHeight, - ); - }, []); - - useEffect( - () => { - if (!shouldFollow()) { - return () => {}; - } - shouldScanForNew.current = true; - // https://github.com/TanStack/virtual/issues/537 - const rafId = requestAnimationFrame(() => { - ref.current?.scrollTo({ - top: ref.current.scrollHeight, - behavior: "instant", - }); - shouldScanForNew.current = false; - }); - - return () => { - cancelAnimationFrame(rafId); - shouldScanForNew.current = false; - }; - }, - // biome-ignore lint/correctness/useExhaustiveDependencies: deps is passed from caller - deps, - ); - - return { onScroll }; -} diff --git a/frontend/src/components/actors/actor-inspector-context.tsx b/frontend/src/components/actors/actor-inspector-context.tsx index 5949a12f23..6a6e9e3363 100644 --- a/frontend/src/components/actors/actor-inspector-context.tsx +++ b/frontend/src/components/actors/actor-inspector-context.tsx @@ -10,11 +10,13 @@ import { createContext, useContext, useMemo, useRef } from "react"; import type ReconnectingWebSocket from "reconnectingwebsocket"; import { type Connection, - type Event, type ToServer, TO_CLIENT_VERSIONED as toClient, TO_SERVER_VERSIONED as toServer, + type QueueStatus, } from "rivetkit/inspector"; +import type { ReadRangeOptions, ReadRangeWire } from "@rivetkit/traces"; +import { decodeReadRangeWire } from "@rivetkit/traces/reader"; import { toast } from "sonner"; import { match } from "ts-pattern"; import z from "zod"; @@ -30,25 +32,140 @@ export const actorInspectorQueriesKeys = { ["actor", actorId, "connections"] as const, actorDatabase: (actorId: ActorId) => ["actor", actorId, "database"] as const, - actorEvents: (actorId: ActorId) => ["actor", actorId, "events"] as const, actorRpcs: (actorId: ActorId) => ["actor", actorId, "rpcs"] as const, - actorClearEvents: (actorId: ActorId) => - ["actor", actorId, "clear-events"] as const, + actorTraces: (actorId: ActorId) => ["actor", actorId, "traces"] as const, + actorQueueStatus: (actorId: ActorId, limit: number) => + ["actor", actorId, "queue", limit] as const, + actorQueueSize: (actorId: ActorId) => + ["actor", actorId, "queue", "size"] as const, actorWakeUp: (actorId: ActorId) => ["actor", actorId, "wake-up"] as const, }; +type QueueStatusSummary = { + size: number; + maxSize: number; + truncated: boolean; + messages: Array<{ + id: string; + name: string; + createdAtMs: number; + }>; +}; + interface ActorInspectorApi { ping: () => Promise; executeAction: (name: string, args: unknown[]) => Promise; patchState: (state: unknown) => Promise; getConnections: () => Promise; - getEvents: () => Promise; getState: () => Promise<{ isEnabled: boolean; state: unknown }>; getRpcs: () => Promise; - clearEvents: () => Promise; + getTraces: (options: ReadRangeOptions) => Promise; + getQueueStatus: (limit: number) => Promise; getMetadata: () => Promise<{ version: string }>; } +type FeatureSupport = { + supported: boolean; + minVersion: string; + currentVersion?: string; + message: string; +}; + +const MIN_RIVETKIT_VERSION_TRACES = "2.0.40"; +const MIN_RIVETKIT_VERSION_QUEUE = "2.0.40"; +const INSPECTOR_ERROR_EVENTS_DROPPED = "inspector.events_dropped"; + +function parseSemver(version?: string) { + if (!version) { + return null; + } + const match = version.match(/^(\d+)\.(\d+)\.(\d+)/); + if (!match) { + return null; + } + return { + major: Number(match[1]), + minor: Number(match[2]), + patch: Number(match[3]), + }; +} + +function compareSemver( + a: { major: number; minor: number; patch: number }, + b: { major: number; minor: number; patch: number }, +) { + if (a.major !== b.major) { + return a.major - b.major; + } + if (a.minor !== b.minor) { + return a.minor - b.minor; + } + return a.patch - b.patch; +} + +function isVersionAtLeast(version: string | undefined, minVersion: string) { + const parsed = parseSemver(version); + const minParsed = parseSemver(minVersion); + if (!parsed || !minParsed) { + return false; + } + return compareSemver(parsed, minParsed) >= 0; +} + +function buildFeatureSupport( + currentVersion: string | undefined, + minVersion: string, + label: string, +): FeatureSupport { + const supported = isVersionAtLeast(currentVersion, minVersion); + if (!currentVersion) { + return { + supported: false, + minVersion, + currentVersion, + message: `${label} requires RivetKit ${minVersion}+. Please upgrade.`, + }; + } + return { + supported, + minVersion, + currentVersion, + message: supported + ? "" + : `${label} requires RivetKit ${minVersion}+ (current ${currentVersion}). Please upgrade.`, + }; +} + +function getInspectorProtocolVersion(version: string | undefined) { + const parsed = parseSemver(version); + if (!parsed) { + return 2; + } + if (isVersionAtLeast(version, MIN_RIVETKIT_VERSION_QUEUE)) { + return 4; + } + if (isVersionAtLeast(version, MIN_RIVETKIT_VERSION_TRACES)) { + return 3; + } + if (parsed.major >= 2) { + return 2; + } + return 1; +} + +function normalizeQueueStatus(status: QueueStatus): QueueStatusSummary { + return { + size: Number(status.size), + maxSize: Number(status.maxSize), + truncated: status.truncated, + messages: status.messages.map((message) => ({ + id: message.id.toString(), + name: message.name, + createdAtMs: Number(message.createdAtMs), + })), + }; +} + export const createDefaultActorInspectorContext = ({ api, }: { @@ -139,34 +256,46 @@ export const createDefaultActorInspectorContext = ({ }); }, - actorEventsQueryOptions(actorId: ActorId) { + actorRpcsQueryOptions(actorId: ActorId) { return queryOptions({ staleTime: Infinity, - queryKey: actorInspectorQueriesKeys.actorEvents(actorId), + queryKey: actorInspectorQueriesKeys.actorRpcs(actorId), queryFn: () => { - return api.getEvents(); + return api.getRpcs(); }, }); }, - actorRpcsQueryOptions(actorId: ActorId) { + actorTracesQueryOptions(actorId: ActorId, options: ReadRangeOptions) { return queryOptions({ - staleTime: Infinity, - queryKey: actorInspectorQueriesKeys.actorRpcs(actorId), + staleTime: 0, + queryKey: [ + ...actorInspectorQueriesKeys.actorTraces(actorId), + options.startMs, + options.endMs, + options.limit, + ], queryFn: () => { - return api.getRpcs(); + return api.getTraces(options); }, }); }, - - actorClearEventsMutationOptions(actorId: ActorId) { - return mutationOptions({ - mutationKey: ["actor", actorId, "clear-events"], - mutationFn: async () => { - return api.clearEvents(); + actorQueueStatusQueryOptions(actorId: ActorId, limit: number) { + return queryOptions({ + staleTime: 0, + queryKey: actorInspectorQueriesKeys.actorQueueStatus(actorId, limit), + queryFn: () => { + return api.getQueueStatus(limit); }, }); }, + actorQueueSizeQueryOptions(actorId: ActorId) { + return queryOptions({ + staleTime: Infinity, + queryKey: actorInspectorQueriesKeys.actorQueueSize(actorId), + queryFn: () => 0, + }); + }, actorPingQueryOptions(actorId: ActorId) { return queryOptions({ @@ -298,7 +427,16 @@ export const actorMetadataQueryOptions = ({ export type ActorInspectorContext = ReturnType< typeof createDefaultActorInspectorContext -> & { connectionStatus: ConnectionStatus; isInspectorAvailable: boolean }; +> & { + connectionStatus: ConnectionStatus; + isInspectorAvailable: boolean; + rivetkitVersion?: string; + inspectorProtocolVersion: number; + features: { + traces: FeatureSupport; + queue: FeatureSupport; + }; +}; const ActorInspectorContext = createContext({} as ActorInspectorContext); @@ -332,13 +470,33 @@ export const ActorInspectorProvider = ({ const actionsManager = useRef(new ActionsManager()); - const { isSuccess: isActorMetadataSuccess } = useQuery({ + const { data: actorMetadata, isSuccess: isActorMetadataSuccess } = useQuery({ ...actorMetadataQueryOptions({ actorId, credentials }), }); const { isSuccess: isActorDataSuccess } = useActorInspectorData(actorId); const isInspectorAvailable = isActorMetadataSuccess && isActorDataSuccess; + const rivetkitVersion = actorMetadata?.version; + const inspectorProtocolVersion = useMemo( + () => getInspectorProtocolVersion(rivetkitVersion), + [rivetkitVersion], + ); + const features = useMemo( + () => ({ + traces: buildFeatureSupport( + rivetkitVersion, + MIN_RIVETKIT_VERSION_TRACES, + "Traces", + ), + queue: buildFeatureSupport( + rivetkitVersion, + MIN_RIVETKIT_VERSION_QUEUE, + "Queue", + ), + }), + [rivetkitVersion], + ); const onMessage = useMemo(() => { return createMessageHandler({ queryClient, actorId, actionsManager }); @@ -373,7 +531,7 @@ export const ActorInspectorProvider = ({ args: new Uint8Array(cbor.encode(args)).buffer, }, }, - }), + }, inspectorProtocolVersion), ); return promise; @@ -389,7 +547,7 @@ export const ActorInspectorProvider = ({ .buffer, }, }, - }), + }, inspectorProtocolVersion), ); }, @@ -403,72 +561,84 @@ export const ActorInspectorProvider = ({ tag: "ConnectionsRequest", val: { id: BigInt(id) }, }, - }), + }, inspectorProtocolVersion), ); return promise; }, - getEvents: async () => { - const { id, promise } = - actionsManager.current.createResolver< - TransformedInspectorEvent[] - >(); + getState: async () => { + const { id, promise } = actionsManager.current.createResolver<{ + isEnabled: boolean; + state: unknown; + }>(); sendMessage( serverMessage({ body: { - tag: "EventsRequest", + tag: "StateRequest", val: { id: BigInt(id) }, }, - }), + }, inspectorProtocolVersion), ); return promise; }, - getState: async () => { - const { id, promise } = actionsManager.current.createResolver<{ - isEnabled: boolean; - state: unknown; - }>(); + getRpcs() { + const { id, promise } = + actionsManager.current.createResolver(); sendMessage( serverMessage({ body: { - tag: "StateRequest", + tag: "RpcsListRequest", val: { id: BigInt(id) }, }, - }), + }, inspectorProtocolVersion), ); return promise; }, + getTraces: async ({ startMs, endMs, limit }) => { + const { id, promise } = + actionsManager.current.createResolver({ + timeoutMs: 10_000, + }); - clearEvents: async () => { - const { id, promise } = actionsManager.current.createResolver(); sendMessage( serverMessage({ body: { - tag: "ClearEventsRequest", - val: { id: BigInt(id) }, + tag: "TraceQueryRequest", + val: { + id: BigInt(id), + startMs: BigInt(Math.floor(startMs)), + endMs: BigInt(Math.floor(endMs)), + limit: BigInt(limit), + }, }, - }), + }, inspectorProtocolVersion), ); + return promise; }, - - getRpcs() { + getQueueStatus: async (limit) => { + const safeLimit = Math.max(0, Math.floor(limit)); const { id, promise } = - actionsManager.current.createResolver(); + actionsManager.current.createResolver({ + timeoutMs: 10_000, + }); sendMessage( serverMessage({ body: { - tag: "RpcsListRequest", - val: { id: BigInt(id) }, + tag: "QueueRequest", + val: { + id: BigInt(id), + limit: BigInt(safeLimit), + }, }, - }), + }, inspectorProtocolVersion), ); return promise; @@ -478,17 +648,27 @@ export const ActorInspectorProvider = ({ return getActorMetadataProxy.current(); }, } satisfies ActorInspectorApi; - }, [sendMessage, reconnect]); + }, [sendMessage, reconnect, inspectorProtocolVersion]); const value = useMemo(() => { return { connectionStatus: status, isInspectorAvailable, + rivetkitVersion, + inspectorProtocolVersion, + features, ...createDefaultActorInspectorContext({ api, }), }; - }, [api, status, isInspectorAvailable]); + }, [ + api, + status, + isInspectorAvailable, + rivetkitVersion, + inspectorProtocolVersion, + features, + ]); return ( @@ -508,9 +688,17 @@ const createMessageHandler = actionsManager: React.RefObject; }) => async (e: ReconnectingWebSocket.MessageEvent) => { - const message = toClient.deserializeWithEmbeddedVersion( - new Uint8Array(await e.data.arrayBuffer()), - ); + let message: ReturnType< + typeof toClient.deserializeWithEmbeddedVersion + >; + try { + message = toClient.deserializeWithEmbeddedVersion( + new Uint8Array(await e.data.arrayBuffer()), + ); + } catch (error) { + console.warn("Failed to decode inspector message", error); + return; + } match(message.body) .with({ tag: "Init" }, (body) => { @@ -529,11 +717,6 @@ const createMessageHandler = transformConnections(body.val.connections), ); - queryClient.setQueryData( - actorInspectorQueriesKeys.actorEvents(actorId), - transformEvents(body.val.events), - ); - queryClient.setQueryData( actorInspectorQueriesKeys.actorIsStateEnabled(actorId), body.val.isStateEnabled, @@ -546,13 +729,6 @@ const createMessageHandler = transformConnections(body.val.connections), ); }) - .with({ tag: "EventsResponse" }, (body) => { - const { rid } = body.val; - actionsManager.current.resolve( - Number(rid), - transformEvents(body.val.events), - ); - }) .with({ tag: "StateResponse" }, (body) => { const { rid } = body.val; actionsManager.current.resolve( @@ -586,80 +762,42 @@ const createMessageHandler = { isEnabled: true, state: transformState(body.val.state) }, ); }) - .with({ tag: "EventsUpdated" }, (body) => { - queryClient.setQueryData( - actorInspectorQueriesKeys.actorEvents(actorId), - transformEvents(body.val.events), - ); - }) .with({ tag: "RpcsListResponse" }, (body) => { const { rid } = body.val; actionsManager.current.resolve(Number(rid), body.val.rpcs); }) + .with({ tag: "TraceQueryResponse" }, (body) => { + const { rid } = body.val; + actionsManager.current.resolve( + Number(rid), + decodeReadRangeWire(new Uint8Array(body.val.payload)), + ); + }) + .with({ tag: "QueueResponse" }, (body) => { + const { rid, status } = body.val; + actionsManager.current.resolve( + Number(rid), + normalizeQueueStatus(status), + ); + }) + .with({ tag: "QueueUpdated" }, (body) => { + queryClient.setQueryData( + actorInspectorQueriesKeys.actorQueueSize(actorId), + Number(body.val.queueSize), + ); + queryClient.invalidateQueries({ + queryKey: ["actor", actorId, "queue"], + }); + }) .with({ tag: "Error" }, (body) => { + if (body.val.message === INSPECTOR_ERROR_EVENTS_DROPPED) { + return; + } toast.error(`Inspector error: ${body.val.message}`); }) .exhaustive(); }; -function transformEvents(events: readonly Event[]) { - return events.map((event) => { - const base = { - ...event, - timestamp: new Date(Number(event.timestamp)), - }; - - return match(event.body) - .with({ tag: "FiredEvent" }, (body) => ({ - ...base, - body: { - ...body, - val: { - ...body.val, - args: cbor.decode(new Uint8Array(body.val.args)), - }, - }, - })) - .with({ tag: "ActionEvent" }, (body) => ({ - ...base, - body: { - ...body, - val: { - ...body.val, - args: cbor.decode(new Uint8Array(body.val.args)), - }, - }, - })) - .with({ tag: "BroadcastEvent" }, (body) => ({ - ...base, - body: { - ...body, - val: { - ...body.val, - args: cbor.decode(new Uint8Array(body.val.args)), - }, - }, - })) - .with({ tag: "SubscribeEvent" }, (body) => ({ - ...base, - body: { - ...body, - }, - })) - .with({ tag: "UnSubscribeEvent" }, (body) => ({ - ...base, - body: { - ...body, - }, - })) - .exhaustive(); - }); -} - -export type TransformedInspectorEvent = ReturnType< - typeof transformEvents ->[number]; - function transformConnections(connections: readonly Connection[]) { return connections.map((connection) => ({ ...connection, @@ -671,8 +809,8 @@ function transformState(state: ArrayBuffer) { return cbor.decode(new Uint8Array(state)); } -function serverMessage(data: ToServer) { - return toServer.serializeWithEmbeddedVersion(data, 1); +function serverMessage(data: ToServer, version: number) { + return toServer.serializeWithEmbeddedVersion(data, version); } class ActionsManager { @@ -680,10 +818,13 @@ class ActionsManager { private nextId = 1; - createResolver(): { id: number; promise: Promise } { + createResolver(options?: { + timeoutMs?: number; + }): { id: number; promise: Promise } { const id = this.nextId++; const { promise, resolve, reject } = Promise.withResolvers(); this.suspensions.set(id, { promise, resolve, reject }); + const timeoutMs = options?.timeoutMs ?? 2_000; // set a timeout to reject the promise if not resolved in time setTimeout(() => { @@ -691,7 +832,7 @@ class ActionsManager { reject(new Error("Action timed out")); this.suspensions.delete(id); } - }, 2_000); + }, timeoutMs); return { id, promise }; } diff --git a/frontend/src/components/actors/actor-queue-tab.tsx b/frontend/src/components/actors/actor-queue-tab.tsx new file mode 100644 index 0000000000..fd70fc312e --- /dev/null +++ b/frontend/src/components/actors/actor-queue-tab.tsx @@ -0,0 +1,31 @@ +import { useQuery } from "@tanstack/react-query"; +import { Info } from "./actor-state-tab"; +import { useActorInspector } from "./actor-inspector-context"; +import { useDataProvider } from "./data-provider"; +import { ActorQueue } from "./actor-queue"; +import type { ActorId } from "./queries"; + +interface ActorQueueTabProps { + actorId: ActorId; +} + +export function ActorQueueTab({ actorId }: ActorQueueTabProps) { + const inspector = useActorInspector(); + const { data: destroyedAt } = useQuery( + useDataProvider().actorDestroyedAtQueryOptions(actorId), + ); + + if (destroyedAt) { + return ( + + Queue data is unavailable for inactive Actors. + + ); + } + + if (!inspector.features.queue.supported) { + return {inspector.features.queue.message}; + } + + return ; +} diff --git a/frontend/src/components/actors/actor-queue.tsx b/frontend/src/components/actors/actor-queue.tsx new file mode 100644 index 0000000000..6d8b87147b --- /dev/null +++ b/frontend/src/components/actors/actor-queue.tsx @@ -0,0 +1,88 @@ +import { faSpinnerThird, Icon } from "@rivet-gg/icons"; +import { useQuery } from "@tanstack/react-query"; +import { format } from "date-fns"; +import { LiveBadge, ScrollArea } from "@/components"; +import { useActorInspector } from "./actor-inspector-context"; +import type { ActorId } from "./queries"; + +const DEFAULT_QUEUE_LIMIT = 200; + +export function ActorQueue({ actorId }: { actorId: ActorId }) { + const inspector = useActorInspector(); + const queueStatusQuery = useQuery({ + ...inspector.actorQueueStatusQueryOptions(actorId, DEFAULT_QUEUE_LIMIT), + enabled: + inspector.isInspectorAvailable && + inspector.features.queue.supported, + refetchOnWindowFocus: false, + }); + const queueSizeQuery = useQuery( + inspector.actorQueueSizeQueryOptions(actorId), + ); + + if (queueStatusQuery.isLoading) { + return ( +
+ + Loading queue... +
+ ); + } + + if (queueStatusQuery.isError || !queueStatusQuery.data) { + return ( +
+ Queue data is currently unavailable. +
+ ); + } + + const status = queueStatusQuery.data; + const size = + Number.isFinite(status.size) ? status.size : queueSizeQuery.data ?? 0; + + return ( + +
+ +
+ Queue size {size} / {status.maxSize} +
+
+
+ {status.messages.length === 0 ? ( +
+ Queue is empty. +
+ ) : ( + status.messages.map((message) => ( +
+
+
+ {message.name} +
+
+ {format( + new Date(message.createdAtMs), + "p", + )} +
+
+
+ ID {message.id} +
+
+ )) + )} + {status.truncated ? ( +
+ Showing the first {DEFAULT_QUEUE_LIMIT} messages. +
+ ) : null} +
+
+ ); +} diff --git a/frontend/src/components/actors/actor-traces-tab.tsx b/frontend/src/components/actors/actor-traces-tab.tsx new file mode 100644 index 0000000000..6a63f21c9e --- /dev/null +++ b/frontend/src/components/actors/actor-traces-tab.tsx @@ -0,0 +1,31 @@ +import { useQuery } from "@tanstack/react-query"; +import { Info } from "./actor-state-tab"; +import { useDataProvider } from "./data-provider"; +import type { ActorId } from "./queries"; +import { ActorTraces } from "./actor-traces"; +import { useActorInspector } from "./actor-inspector-context"; + +interface ActorTracesTabProps { + actorId: ActorId; +} + +export function ActorTracesTab({ actorId }: ActorTracesTabProps) { + const inspector = useActorInspector(); + const { data: destroyedAt } = useQuery( + useDataProvider().actorDestroyedAtQueryOptions(actorId), + ); + + if (destroyedAt) { + return ( + + Traces are unavailable for inactive Actors. + + ); + } + + if (!inspector.features.traces.supported) { + return {inspector.features.traces.message}; + } + + return ; +} diff --git a/frontend/src/components/actors/actor-traces.tsx b/frontend/src/components/actors/actor-traces.tsx new file mode 100644 index 0000000000..4ad108b85e --- /dev/null +++ b/frontend/src/components/actors/actor-traces.tsx @@ -0,0 +1,609 @@ +import { faChevronDown, faSpinnerThird, Icon } from "@rivet-gg/icons"; +import { useQuery } from "@tanstack/react-query"; +import { format } from "date-fns"; +import { + useMemo, + useState, + type ReactElement, +} from "react"; +import type { DateRange } from "../datepicker"; +import { RangeDatePicker } from "../datepicker"; +import { Button } from "../ui/button"; +import { + Select, + SelectContent, + SelectItem, + SelectTrigger, + SelectValue, +} from "../ui/select"; +import { cn } from "../lib/utils"; +import { ActorObjectInspector } from "./console/actor-inspector"; +import { useActorInspector } from "./actor-inspector-context"; +import type { ActorId } from "./queries"; +import { readRangeWireToOtlp } from "@rivetkit/traces/reader"; +import type { + OtlpAnyValue, + OtlpExportTraceServiceRequestJson, + OtlpKeyValue, + OtlpSpan, + OtlpSpanEvent, +} from "@rivetkit/traces"; + +const PRESET_OPTIONS = [ + { label: "5 min", ms: 5 * 60 * 1000 }, + { label: "15 min", ms: 15 * 60 * 1000 }, + { label: "30 min", ms: 30 * 60 * 1000 }, + { label: "1 hour", ms: 60 * 60 * 1000 }, + { label: "3 hours", ms: 3 * 60 * 60 * 1000 }, + { label: "6 hours", ms: 6 * 60 * 60 * 1000 }, + { label: "12 hours", ms: 12 * 60 * 60 * 1000 }, + { label: "24 hours", ms: 24 * 60 * 60 * 1000 }, + { label: "2 days", ms: 2 * 24 * 60 * 60 * 1000 }, + { label: "7 days", ms: 7 * 24 * 60 * 60 * 1000 }, + { label: "14 days", ms: 14 * 24 * 60 * 60 * 1000 }, +]; + +const DEFAULT_PRESET_MS = 30 * 60 * 1000; +const GAP_THRESHOLD_MS = 500; +const DEFAULT_LIMIT = 1000; + +type SpanNode = { + span: OtlpSpan; + startNs: bigint; + endNs: bigint | null; + children: SpanNode[]; + events: OtlpSpanEvent[]; +}; + +type TraceItem = + | { type: "span"; node: SpanNode; timeNs: bigint } + | { type: "event"; event: OtlpSpanEvent; timeNs: bigint }; + +export function ActorTraces({ actorId }: { actorId: ActorId }) { + const inspector = useActorInspector(); + const [isLive, setIsLive] = useState(true); + const [presetMs, setPresetMs] = useState(DEFAULT_PRESET_MS); + const [customRange, setCustomRange] = useState(() => { + const now = Date.now(); + return { + from: new Date(now - DEFAULT_PRESET_MS), + to: new Date(now), + }; + }); + + const query = useQuery({ + queryKey: [ + "actor", + actorId, + "traces", + isLive, + presetMs, + customRange?.from?.getTime(), + customRange?.to?.getTime(), + DEFAULT_LIMIT, + ], + queryFn: async () => { + const now = Date.now(); + const rangeStartMs = isLive + ? now - presetMs + : customRange?.from?.getTime() ?? now - presetMs; + const rangeEndMs = isLive + ? now + : customRange?.to?.getTime() ?? now; + const startMs = Math.min(rangeStartMs, rangeEndMs); + const endMs = Math.max(rangeStartMs, rangeEndMs); + return inspector.api.getTraces({ + startMs, + endMs, + limit: DEFAULT_LIMIT, + }); + }, + enabled: inspector.isInspectorAvailable && inspector.features.traces.supported, + refetchInterval: isLive ? 1000 : false, + staleTime: 0, + }); + + const queryResult = useMemo(() => { + if (!query.data) { + return null; + } + return readRangeWireToOtlp(query.data); + }, [query.data]); + + const traceTree = useMemo(() => { + if (!queryResult) { + return []; + } + const spans = extractSpans(queryResult.otlp); + return buildSpanTree(spans); + }, [queryResult]); + + const nowMs = Date.now(); + const nowNs = BigInt(nowMs) * 1_000_000n; + const liveRange = { + from: new Date(nowMs - presetMs), + to: new Date(nowMs), + }; + const displayRange = isLive ? liveRange : customRange; + + const onPresetChange = (value: string) => { + const ms = Number(value); + if (Number.isNaN(ms)) { + return; + } + setPresetMs(ms); + if (!isLive) { + const now = Date.now(); + setCustomRange({ + from: new Date(now - ms), + to: new Date(now), + }); + } + }; + + if (query.isLoading) { + return ( +
+ + Loading traces... +
+ ); + } + + if (query.isError) { + return ( +
+ Traces are currently unavailable. +
+ ); + } + + return ( +
+
+ + +
+ { + setCustomRange(range); + setIsLive(false); + }} + /> +
+
+ {displayRange?.from && displayRange?.to + ? `${format(displayRange.from, "PPpp")} → ${format( + displayRange.to, + "PPpp", + )}` + : "Select a time range"} +
+
+ +
+ {traceTree.length === 0 ? ( +
+ No traces found for this time range. +
+ ) : ( + renderItemsWithGaps( + traceTree.map((node) => ({ + type: "span" as const, + node, + timeNs: node.startNs, + })), + 0, + nowNs, + ) + )} + {queryResult?.clamped ? ( +
+ Results truncated at {DEFAULT_LIMIT} spans. +
+ ) : null} +
+
+ ); +} + +function renderItemsWithGaps( + items: TraceItem[], + depth: number, + nowNs: bigint, +): ReactElement[] { + const sorted = [...items].sort((a, b) => + a.timeNs < b.timeNs ? -1 : a.timeNs > b.timeNs ? 1 : 0, + ); + const nodes: ReactElement[] = []; + for (let i = 0; i < sorted.length; i++) { + const item = sorted[i]; + if (i > 0) { + const prev = sorted[i - 1]; + const gapMs = nsToMs(item.timeNs - prev.timeNs); + if (gapMs > GAP_THRESHOLD_MS) { + nodes.push( + , + ); + } + } + if (item.type === "span") { + nodes.push( + , + ); + } else { + nodes.push( + , + ); + } + } + return nodes; +} + +function TraceSpanItem({ + node, + depth, + nowNs, +}: { + node: SpanNode; + depth: number; + nowNs: bigint; +}) { + const [isOpen, setIsOpen] = useState(false); + const startMs = nsToMs(node.startNs); + const endNs = node.endNs ?? nowNs; + const durationMs = Math.max(0, nsToMs(endNs - node.startNs)); + const subeventCount = node.children.length + node.events.length; + const subeventLabel = subeventCount === 1 ? "subevent" : "subevents"; + const isActive = node.endNs == null; + const items = useMemo(() => { + const result: TraceItem[] = []; + for (const child of node.children) { + result.push({ type: "span", node: child, timeNs: child.startNs }); + } + for (const event of node.events) { + result.push({ + type: "event", + event, + timeNs: BigInt(event.timeUnixNano), + }); + } + return result; + }, [node]); + + const details = buildSpanDetails(node.span); + + return ( +
0 && "ml-4", + )} + > + + {isOpen ? ( +
+ {details ? ( +
+
+ Span details +
+ +
+ ) : null} + {items.length === 0 ? ( +
+ No subevents. +
+ ) : ( +
+ {renderItemsWithGaps(items, depth + 1, nowNs)} +
+ )} +
+ ) : null} +
+ ); +} + +function TraceEventRow({ + event, + depth, +}: { + event: OtlpSpanEvent; + depth: number; +}) { + const eventMs = nsToMs(BigInt(event.timeUnixNano)); + const attributes = otlpAttributesToObject(event.attributes); + return ( +
0 && "ml-4")}> +
+
+ {event.name || "Event"} +
+
+ {format(new Date(eventMs), "p")} +
+
+ {attributes ? ( +
+ +
+ ) : null} +
+ ); +} + +function GapMarker({ ms, depth }: { ms: number; depth: number }) { + return ( +
0 && "ml-4", + )} + > +
+ {formatGap(ms)} +
+
+ ); +} + +function extractSpans( + otlp: OtlpExportTraceServiceRequestJson, +): OtlpSpan[] { + const spans: OtlpSpan[] = []; + for (const resource of otlp.resourceSpans ?? []) { + for (const scope of resource.scopeSpans ?? []) { + spans.push(...(scope.spans ?? [])); + } + } + return spans; +} + +function buildSpanTree(spans: OtlpSpan[]): SpanNode[] { + const byId = new Map(); + for (const span of spans) { + byId.set(span.spanId, { + span, + startNs: BigInt(span.startTimeUnixNano), + endNs: span.endTimeUnixNano ? BigInt(span.endTimeUnixNano) : null, + children: [], + events: span.events ?? [], + }); + } + + const roots: SpanNode[] = []; + for (const node of byId.values()) { + const parentId = node.span.parentSpanId; + if (parentId && byId.has(parentId)) { + byId.get(parentId)?.children.push(node); + } else { + roots.push(node); + } + } + + for (const node of byId.values()) { + node.children.sort((a, b) => + a.startNs < b.startNs ? -1 : a.startNs > b.startNs ? 1 : 0, + ); + node.events.sort((a, b) => + BigInt(a.timeUnixNano) < BigInt(b.timeUnixNano) + ? -1 + : BigInt(a.timeUnixNano) > BigInt(b.timeUnixNano) + ? 1 + : 0, + ); + } + + roots.sort((a, b) => (a.startNs < b.startNs ? -1 : 1)); + return roots; +} + +function buildSpanDetails(span: OtlpSpan): Record | null { + const attributes = otlpAttributesToObject(span.attributes); + const links = span.links?.map((link) => ({ + traceId: link.traceId, + spanId: link.spanId, + traceState: link.traceState, + attributes: otlpAttributesToObject(link.attributes), + droppedAttributesCount: link.droppedAttributesCount, + })); + const details: Record = {}; + if (attributes && Object.keys(attributes).length > 0) { + details.attributes = attributes; + } + if (span.status) { + details.status = span.status; + } + if (links && links.length > 0) { + details.links = links; + } + if (span.traceState) { + details.traceState = span.traceState; + } + if (span.flags !== undefined) { + details.flags = span.flags; + } + return Object.keys(details).length > 0 ? details : null; +} + +function otlpAttributesToObject( + attributes?: OtlpKeyValue[], +): Record | null { + if (!attributes || attributes.length === 0) { + return null; + } + const out: Record = {}; + for (const entry of attributes) { + if (!entry.key) { + continue; + } + out[entry.key] = otlpAnyValueToJs(entry.value); + } + return out; +} + +function otlpAnyValueToJs(value?: OtlpAnyValue): unknown { + if (!value) { + return null; + } + if (value.stringValue !== undefined) { + return value.stringValue; + } + if (value.boolValue !== undefined) { + return value.boolValue; + } + if (value.intValue !== undefined) { + return value.intValue; + } + if (value.doubleValue !== undefined) { + return value.doubleValue; + } + if (value.bytesValue !== undefined) { + return value.bytesValue; + } + if (value.arrayValue?.values) { + return value.arrayValue.values.map((item) => + otlpAnyValueToJs(item), + ); + } + if (value.kvlistValue?.values) { + const obj: Record = {}; + for (const entry of value.kvlistValue.values) { + obj[entry.key] = otlpAnyValueToJs(entry.value); + } + return obj; + } + return null; +} + +function nsToMs(ns: bigint): number { + return Number(ns / 1_000_000n); +} + +function formatDuration(ms: number): string { + if (ms < 1000) { + return `${Math.round(ms)}ms`; + } + const seconds = ms / 1000; + if (seconds < 60) { + return `${seconds < 10 ? seconds.toFixed(1) : Math.round(seconds)}s`; + } + const minutes = Math.floor(seconds / 60); + const remSeconds = Math.round(seconds % 60); + if (minutes < 60) { + return `${minutes}m ${remSeconds}s`; + } + const hours = Math.floor(minutes / 60); + const remMinutes = minutes % 60; + if (hours < 24) { + return `${hours}h ${remMinutes}m`; + } + const days = Math.floor(hours / 24); + const remHours = hours % 24; + return `${days}d ${remHours}h`; +} + +function formatGap(ms: number): string { + const seconds = ms / 1000; + if (seconds < 60) { + const value = + seconds < 10 ? seconds.toFixed(1) : Math.round(seconds).toString(); + return `${value} ${Number(value) === 1 ? "second" : "seconds"}`; + } + const minutes = Math.floor(seconds / 60); + if (minutes < 60) { + return `${minutes} ${minutes === 1 ? "minute" : "minutes"}`; + } + const hours = Math.floor(minutes / 60); + if (hours < 24) { + return `${hours} ${hours === 1 ? "hour" : "hours"}`; + } + const days = Math.floor(hours / 24); + return `${days} ${days === 1 ? "day" : "days"}`; +} diff --git a/frontend/src/components/actors/actors-actor-details.tsx b/frontend/src/components/actors/actors-actor-details.tsx index 3e447dc3e6..13eb7ad3ea 100644 --- a/frontend/src/components/actors/actors-actor-details.tsx +++ b/frontend/src/components/actors/actors-actor-details.tsx @@ -13,11 +13,12 @@ import { ActorConfigTab } from "./actor-config-tab"; import { ActorConnectionsTab } from "./actor-connections-tab"; import { ActorDatabaseTab } from "./actor-db-tab"; import { ActorDetailsSettingsProvider } from "./actor-details-settings"; -import { ActorEventsTab } from "./actor-events-tab"; import { ActorLogsTab } from "./actor-logs-tab"; +import { ActorQueueTab } from "./actor-queue-tab"; import { ActorStateTab } from "./actor-state-tab"; import { QueriedActorStatus } from "./actor-status"; import { ActorStopButton } from "./actor-stop-button"; +import { ActorTracesTab } from "./actor-traces-tab"; import { useActorsView } from "./actors-view-context-provider"; import { ActorConsole } from "./console/actor-console"; import { @@ -100,7 +101,8 @@ export function ActorTabs({ className?: string; children?: ReactNode; }) { - const value = disabled ? undefined : tab || "state"; + const normalizedTab = tab === "events" ? "traces" : tab; + const value = disabled ? undefined : normalizedTab || "state"; const guardContent = useInspectorGuard(); @@ -132,10 +134,18 @@ export function ActorTabs({ - Events + Queue + + + + Traces {/* - - {guardContent || } + + {guardContent || } + + + + {guardContent || } + state: optional + isStateEnabled: bool + rpcs: list + isDatabaseEnabled: bool + queueSize: uint +} + +type ConnectionsResponse struct { + rid: uint + connections: list +} + +type StateResponse struct { + rid: uint + state: optional + isStateEnabled: bool +} + +type ActionResponse struct { + rid: uint + output: data +} + +type TraceQueryResponse struct { + rid: uint + payload: data +} + +type QueueMessageSummary struct { + id: uint + name: str + createdAtMs: uint +} + +type QueueStatus struct { + size: uint + maxSize: uint + messages: list + truncated: bool +} + +type QueueResponse struct { + rid: uint + status: QueueStatus +} + +type StateUpdated struct { + state: State +} + +type QueueUpdated struct { + queueSize: uint +} + +type RpcsListResponse struct { + rid: uint + rpcs: list +} + +type ConnectionsUpdated struct { + connections: list +} + +type Error struct { + message: str +} + +type ToClientBody union { + StateResponse | + ConnectionsResponse | + ActionResponse | + ConnectionsUpdated | + QueueUpdated | + StateUpdated | + RpcsListResponse | + TraceQueryResponse | + QueueResponse | + Error | + Init +} + +type ToClient struct { + body: ToClientBody +} diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/instance/mod.ts b/rivetkit-typescript/packages/rivetkit/src/actor/instance/mod.ts index b04526f138..79b04e52f7 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/instance/mod.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/instance/mod.ts @@ -918,7 +918,7 @@ export class ActorInstance { #emitLogEvent(level: string, args: unknown[]) { const span = this.#traces.getCurrentSpan(); - if (!span) { + if (!span || !span.isActive()) { return; } diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/instance/queue-manager.ts b/rivetkit-typescript/packages/rivetkit/src/actor/instance/queue-manager.ts index 8ca8517bcb..f82da25570 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/instance/queue-manager.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/instance/queue-manager.ts @@ -94,6 +94,7 @@ export class QueueManager { }); await this.#rebuildMetadata(); } + this.#actor.inspector.updateQueueSize(this.#metadata.size); } /** Adds a message to the queue with the given name and body. */ diff --git a/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/mod.ts b/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/mod.ts index bee76d9168..214d6423e2 100644 --- a/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/mod.ts +++ b/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/mod.ts @@ -10,7 +10,6 @@ import { RegistryConfig, RegistryConfigSchema, } from "@/registry/config"; -import { getPort } from "@/test/mod"; import { logger } from "./log"; import { runActionFeaturesTests } from "./tests/action-features"; import { runActorConnTests } from "./tests/actor-conn"; @@ -240,17 +239,24 @@ export async function createTestRuntime( // TODO: I think this whole function is fucked, we should probably switch to calling registry.serve() directly // Start server - const port = await getPort(); const server = honoServe({ fetch: router.fetch, hostname: "127.0.0.1", - port, + port: 0, }); + if (!server.listening) { + await new Promise((resolve) => { + server.once("listening", () => resolve()); + }); + } invariant( nodeWebSocket.injectWebSocket !== undefined, "should have injectWebSocket", ); nodeWebSocket.injectWebSocket(server); + const address = server.address(); + invariant(address && typeof address !== "string", "missing server address"); + const port = address.port; const serverEndpoint = `http://127.0.0.1:${port}`; logger().info({ msg: "test serer listening", port }); diff --git a/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/raw-websocket.ts b/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/raw-websocket.ts index ad0354a9e4..c18d1d37ba 100644 --- a/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/raw-websocket.ts +++ b/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/raw-websocket.ts @@ -478,7 +478,7 @@ export function runRawWebSocketTests(driverTestConfig: DriverTestConfig) { // Test WebSocket with ONLY query parameters on the base path // This tests the case where path is "/websocket?foo=bar" without trailing slash - const ws = await actor.websocket("?token=secret&session=123"); + const ws = await actor.webSocket("?token=secret&session=123"); await new Promise((resolve, reject) => { ws.addEventListener("open", () => resolve(), { once: true }); diff --git a/rivetkit-typescript/packages/rivetkit/src/inspector/actor-inspector.ts b/rivetkit-typescript/packages/rivetkit/src/inspector/actor-inspector.ts index 6969306e0a..d2d542846a 100644 --- a/rivetkit-typescript/packages/rivetkit/src/inspector/actor-inspector.ts +++ b/rivetkit-typescript/packages/rivetkit/src/inspector/actor-inspector.ts @@ -38,6 +38,26 @@ export class ActorInspector { return this.#lastQueueSize; } + async getQueueStatus(limit: number): Promise { + const maxSize = this.actor.config.options.maxQueueSize; + const safeLimit = Math.max(0, Math.floor(limit)); + const messages = await this.actor.queueManager.getMessages(); + const sorted = messages.sort( + (a, b) => a.createdAt - b.createdAt, + ); + const limited = safeLimit > 0 ? sorted.slice(0, safeLimit) : []; + return { + size: BigInt(this.#lastQueueSize), + maxSize: BigInt(maxSize), + truncated: sorted.length > limited.length, + messages: limited.map((message) => ({ + id: message.id, + name: message.name, + createdAtMs: BigInt(message.createdAt), + })), + }; + } + updateQueueSize(size: number) { if (this.#lastQueueSize === size) { return; diff --git a/rivetkit-typescript/packages/rivetkit/src/inspector/handler.ts b/rivetkit-typescript/packages/rivetkit/src/inspector/handler.ts index d3b2ddcd20..932007195f 100644 --- a/rivetkit-typescript/packages/rivetkit/src/inspector/handler.ts +++ b/rivetkit-typescript/packages/rivetkit/src/inspector/handler.ts @@ -18,6 +18,7 @@ export async function handleWebSocketInspectorConnect({ actor: AnyActorInstance; }): Promise { const inspector = actor.inspector; + const maxQueueStatusLimit = 200; const listeners: Unsubscribe[] = []; return { @@ -139,6 +140,20 @@ export async function handleWebSocketInspectorConnect({ }, }, }); + } else if (message.body.tag === "QueueRequest") { + const { id, limit } = message.body.val; + const status = await inspector.getQueueStatus( + Math.min(Number(limit), maxQueueStatusLimit), + ); + sendMessage(ws, { + body: { + tag: "QueueResponse", + val: { + rid: id, + status, + }, + }, + }); } else { assertUnreachable(message.body); } diff --git a/rivetkit-typescript/packages/rivetkit/src/registry/index.ts b/rivetkit-typescript/packages/rivetkit/src/registry/index.ts index 23707179ff..a020a0c058 100644 --- a/rivetkit-typescript/packages/rivetkit/src/registry/index.ts +++ b/rivetkit-typescript/packages/rivetkit/src/registry/index.ts @@ -1,6 +1,6 @@ import type { Client } from "@/client/client"; import { createClient } from "@/client/mod"; -import { isDev } from "@/utils/env-vars"; +import { getNodeEnv, isDev } from "@/utils/env-vars"; import { type RegistryActors, type RegistryConfig, @@ -49,10 +49,12 @@ export class Registry { this.#config = config; // Auto-prepare on next tick (gives time for sync config modification) - setTimeout(() => { - // biome-ignore lint/nursery/noFloatingPromises: fire-and-forget auto-prepare - this.#ensureRuntime(); - }, 0); + if (getNodeEnv() !== "test") { + setTimeout(() => { + // biome-ignore lint/nursery/noFloatingPromises: fire-and-forget auto-prepare + this.#ensureRuntime(); + }, 0); + } } /** Creates runtime if not already created. Idempotent. */ diff --git a/rivetkit-typescript/packages/rivetkit/src/schemas/actor-inspector/mod.ts b/rivetkit-typescript/packages/rivetkit/src/schemas/actor-inspector/mod.ts index 8341c0c4a1..6797a7306b 100644 --- a/rivetkit-typescript/packages/rivetkit/src/schemas/actor-inspector/mod.ts +++ b/rivetkit-typescript/packages/rivetkit/src/schemas/actor-inspector/mod.ts @@ -1 +1 @@ -export * from "../../../dist/schemas/actor-inspector/v3"; +export * from "../../../dist/schemas/actor-inspector/v4"; diff --git a/rivetkit-typescript/packages/rivetkit/src/schemas/actor-inspector/versioned.ts b/rivetkit-typescript/packages/rivetkit/src/schemas/actor-inspector/versioned.ts index 8609d00737..873074b074 100644 --- a/rivetkit-typescript/packages/rivetkit/src/schemas/actor-inspector/versioned.ts +++ b/rivetkit-typescript/packages/rivetkit/src/schemas/actor-inspector/versioned.ts @@ -3,8 +3,11 @@ import { createVersionedDataHandler } from "vbare"; import * as v1 from "../../../dist/schemas/actor-inspector/v1"; import * as v2 from "../../../dist/schemas/actor-inspector/v2"; import * as v3 from "../../../dist/schemas/actor-inspector/v3"; +import * as v4 from "../../../dist/schemas/actor-inspector/v4"; -export const CURRENT_VERSION = 3; +export const CURRENT_VERSION = 4; + +const EVENTS_DROPPED_ERROR = "inspector.events_dropped"; // Converter from v1 to v2: Add queueSize field to Init message const v1ToClientToV2 = (v1Data: v1.ToClient): v2.ToClient => { @@ -58,7 +61,14 @@ const v2ToClientToV3 = (v2Data: v2.ToClient): v3.ToClient => { v2Data.body.tag === "EventsUpdated" || v2Data.body.tag === "EventsResponse" ) { - throw new Error("Cannot convert events responses to v3"); + return { + body: { + tag: "Error", + val: { + message: EVENTS_DROPPED_ERROR, + }, + }, + }; } return v2Data as unknown as v3.ToClient; }; @@ -83,6 +93,19 @@ const v3ToClientToV2 = (v3Data: v3.ToClient): v2.ToClient => { return v3Data as unknown as v2.ToClient; }; +// Converter from v3 to v4: No changes to client structure +const v3ToClientToV4 = (v3Data: v3.ToClient): v4.ToClient => { + return v3Data as unknown as v4.ToClient; +}; + +// Converter from v4 to v3: Drop queue responses +const v4ToClientToV3 = (v4Data: v4.ToClient): v3.ToClient => { + if (v4Data.body.tag === "QueueResponse") { + throw new Error("Cannot convert QueueResponse to v3"); + } + return v4Data as unknown as v3.ToClient; +}; + // ToServer is identical between v1 and v2 const v1ToServerToV2 = (v1Data: v1.ToServer): v2.ToServer => { return v1Data as unknown as v2.ToServer; @@ -111,7 +134,20 @@ const v3ToServerToV2 = (v3Data: v3.ToServer): v2.ToServer => { return v3Data as unknown as v2.ToServer; }; -export const TO_SERVER_VERSIONED = createVersionedDataHandler({ +// Converter from v3 to v4: No changes to server structure +const v3ToServerToV4 = (v3Data: v3.ToServer): v4.ToServer => { + return v3Data as unknown as v4.ToServer; +}; + +// Converter from v4 to v3: Drop queue request +const v4ToServerToV3 = (v4Data: v4.ToServer): v3.ToServer => { + if (v4Data.body.tag === "QueueRequest") { + throw new Error("Cannot convert QueueRequest to v3"); + } + return v4Data as unknown as v3.ToServer; +}; + +export const TO_SERVER_VERSIONED = createVersionedDataHandler({ serializeVersion: (data, version) => { switch (version) { case 1: @@ -120,6 +156,8 @@ export const TO_SERVER_VERSIONED = createVersionedDataHandler({ return v2.encodeToServer(data as v2.ToServer); case 3: return v3.encodeToServer(data); + case 4: + return v4.encodeToServer(data); default: throw new Error(`Unknown version ${version}`); } @@ -132,15 +170,17 @@ export const TO_SERVER_VERSIONED = createVersionedDataHandler({ return v2.decodeToServer(bytes); case 3: return v3.decodeToServer(bytes); + case 4: + return v4.decodeToServer(bytes); default: throw new Error(`Unknown version ${version}`); } }, - deserializeConverters: () => [v1ToServerToV2, v2ToServerToV3], - serializeConverters: () => [v3ToServerToV2, v2ToServerToV1], + deserializeConverters: () => [v1ToServerToV2, v2ToServerToV3, v3ToServerToV4], + serializeConverters: () => [v4ToServerToV3, v3ToServerToV2, v2ToServerToV1], }); -export const TO_CLIENT_VERSIONED = createVersionedDataHandler({ +export const TO_CLIENT_VERSIONED = createVersionedDataHandler({ serializeVersion: (data, version) => { switch (version) { case 1: @@ -149,6 +189,8 @@ export const TO_CLIENT_VERSIONED = createVersionedDataHandler({ return v2.encodeToClient(data as v2.ToClient); case 3: return v3.encodeToClient(data); + case 4: + return v4.encodeToClient(data); default: throw new Error(`Unknown version ${version}`); } @@ -161,10 +203,12 @@ export const TO_CLIENT_VERSIONED = createVersionedDataHandler({ return v2.decodeToClient(bytes); case 3: return v3.decodeToClient(bytes); + case 4: + return v4.decodeToClient(bytes); default: throw new Error(`Unknown version ${version}`); } }, - deserializeConverters: () => [v1ToClientToV2, v2ToClientToV3], - serializeConverters: () => [v3ToClientToV2, v2ToClientToV1], + deserializeConverters: () => [v1ToClientToV2, v2ToClientToV3, v3ToClientToV4], + serializeConverters: () => [v4ToClientToV3, v3ToClientToV2, v2ToClientToV1], }); diff --git a/rivetkit-typescript/packages/rivetkit/src/test/mod.ts b/rivetkit-typescript/packages/rivetkit/src/test/mod.ts index fc8451a87c..636d1fec82 100644 --- a/rivetkit-typescript/packages/rivetkit/src/test/mod.ts +++ b/rivetkit-typescript/packages/rivetkit/src/test/mod.ts @@ -1,4 +1,3 @@ -import { createServer } from "node:net"; import { serve as honoServe } from "@hono/node-server"; import { createNodeWebSocket } from "@hono/node-ws"; import invariant from "invariant"; @@ -58,17 +57,24 @@ export async function setupTest>( // TODO: I think this whole function is fucked, we should probably switch to calling registry.serve() directly // Start server - const port = await getPort(); const server = honoServe({ fetch: router.fetch, hostname: "127.0.0.1", - port, + port: 0, }); + if (!server.listening) { + await new Promise((resolve) => { + server.once("listening", () => resolve()); + }); + } invariant( nodeWebSocket.injectWebSocket !== undefined, "should have injectWebSocket", ); nodeWebSocket.injectWebSocket(server); + const address = server.address(); + invariant(address && typeof address !== "string", "missing server address"); + const port = address.port; const endpoint = `http://127.0.0.1:${port}`; logger().info({ msg: "test server listening", port }); @@ -88,53 +94,3 @@ export async function setupTest>( return { client }; } - -export async function getPort(): Promise { - // Pick random port between 10000 and 65535 (avoiding well-known and registered ports) - const MIN_PORT = 10000; - const MAX_PORT = 65535; - const getRandomPort = () => - Math.floor(Math.random() * (MAX_PORT - MIN_PORT + 1)) + MIN_PORT; - - let port = getRandomPort(); - let maxAttempts = 10; - - while (maxAttempts > 0) { - try { - // Try to create a server on the port to check if it's available - const server = await new Promise((resolve, reject) => { - const server = createServer(); - - server.once("error", (err: Error & { code?: string }) => { - if (err.code === "EADDRINUSE") { - reject(new Error(`Port ${port} is in use`)); - } else { - reject(err); - } - }); - - server.once("listening", () => { - resolve(server); - }); - - server.listen(port); - }); - - // Close the server since we're just checking availability - await new Promise((resolve) => { - server.close(() => resolve()); - }); - - return port; - } catch (err) { - // If port is in use, try a different one - maxAttempts--; - if (maxAttempts <= 0) { - break; - } - port = getRandomPort(); - } - } - - throw new Error("Could not find an available port after multiple attempts"); -} diff --git a/rivetkit-typescript/packages/traces/package.json b/rivetkit-typescript/packages/traces/package.json index 153eb21ba5..08eab034e0 100644 --- a/rivetkit-typescript/packages/traces/package.json +++ b/rivetkit-typescript/packages/traces/package.json @@ -20,13 +20,23 @@ "types": "./dist/tsup/index.d.cts", "default": "./dist/tsup/index.cjs" } + }, + "./reader": { + "import": { + "types": "./dist/tsup/reader.d.ts", + "default": "./dist/tsup/reader.js" + }, + "require": { + "types": "./dist/tsup/reader.d.cts", + "default": "./dist/tsup/reader.cjs" + } } }, "engines": { "node": ">=18.0.0" }, "scripts": { - "build": "pnpm run compile:bare && tsup src/index.ts", + "build": "pnpm run compile:bare && tsup src/index.ts src/reader.ts", "compile:bare": "tsx scripts/compile-bare.ts compile schemas/v1.bare -o dist/schemas/v1.ts", "check-types": "pnpm run compile:bare && tsc --noEmit", "test": "pnpm run compile:bare && vitest run" diff --git a/rivetkit-typescript/packages/traces/src/index.ts b/rivetkit-typescript/packages/traces/src/index.ts index 61caafb352..9a28d7b501 100644 --- a/rivetkit-typescript/packages/traces/src/index.ts +++ b/rivetkit-typescript/packages/traces/src/index.ts @@ -1,9 +1,11 @@ export { createTraces, +} from "./traces.js"; +export { decodeReadRangeWire, encodeReadRangeWire, readRangeWireToOtlp, -} from "./traces.js"; +} from "./read-range.js"; export type { EndSpanOptions, EventOptions, diff --git a/rivetkit-typescript/packages/traces/src/otlp.ts b/rivetkit-typescript/packages/traces/src/otlp.ts index c1f65ae1ed..0d01f7822f 100644 --- a/rivetkit-typescript/packages/traces/src/otlp.ts +++ b/rivetkit-typescript/packages/traces/src/otlp.ts @@ -1,4 +1,3 @@ -import { Buffer } from "node:buffer"; import { decode as decodeCbor } from "cbor-x"; export interface OtlpAnyValue { @@ -92,7 +91,19 @@ export function hexFromBytes(bytes: Uint8Array): string { } export function base64FromBytes(bytes: Uint8Array): string { - return Buffer.from(bytes).toString("base64"); + const bufferCtor = (globalThis as { Buffer?: { from: (data: Uint8Array) => { toString: (encoding: string) => string } } }).Buffer; + if (bufferCtor) { + return bufferCtor.from(bytes).toString("base64"); + } + let binary = ""; + for (let i = 0; i < bytes.length; i++) { + binary += String.fromCharCode(bytes[i]); + } + const btoaFn = (globalThis as { btoa?: (data: string) => string }).btoa; + if (!btoaFn) { + throw new Error("No base64 encoder available"); + } + return btoaFn(binary); } export function anyValueFromCborBytes(bytes: Uint8Array): OtlpAnyValue { diff --git a/rivetkit-typescript/packages/traces/src/read-range.ts b/rivetkit-typescript/packages/traces/src/read-range.ts new file mode 100644 index 0000000000..43b25c2d8e --- /dev/null +++ b/rivetkit-typescript/packages/traces/src/read-range.ts @@ -0,0 +1,515 @@ +import { decode as decodeCbor } from "cbor-x"; +import { + CURRENT_VERSION, + READ_RANGE_VERSIONED, + type Attributes, + type Chunk, + type ReadRangeWire, + type Record as TraceRecord, + type RecordBody, + type SpanId, + type SpanLink, + type SpanSnapshot, + type SpanStart, + type SpanStatus, + SpanStatusCode, + type TraceId, +} from "../schemas/versioned.js"; +import { + anyValueFromJs, + hexFromBytes, + type OtlpExportTraceServiceRequestJson, + type OtlpKeyValue, + type OtlpResource, + type OtlpSpan, + type OtlpSpanEvent, + type OtlpSpanLink, + type OtlpSpanStatus, +} from "./otlp.js"; + +type AttributeMap = Map; + +type LinkState = { + traceId: TraceId; + spanId: SpanId; + traceState: string | null; + attributes: AttributeMap; + droppedAttributesCount: number; +}; + +type SpanEventState = { + name: string; + timeUnixNs: bigint; + attributes: AttributeMap; + droppedAttributesCount: number; +}; + +type SpanBuilder = { + traceId: TraceId; + spanId: SpanId; + parentSpanId: SpanId | null; + name: string; + kind: number; + traceState: string | null; + flags: number; + attributes: AttributeMap; + droppedAttributesCount: number; + links: LinkState[]; + droppedLinksCount: number; + status: SpanStatus | null; + startTimeUnixNs: bigint; + endTimeUnixNs: bigint | null; + events: SpanEventState[]; +}; + +type RecordEntry = { + record: TraceRecord; + strings: readonly string[]; + absNs: bigint; + sequence: number; +}; + +type BaseRecordEntry = { + record: TraceRecord; + strings: readonly string[]; + absNs: bigint; +}; + +function toUint8Array(buffer: ArrayBuffer): Uint8Array { + return new Uint8Array(buffer); +} + +function normalizeBytes(input: Uint8Array | ArrayBuffer): Uint8Array { + return input instanceof Uint8Array ? input : new Uint8Array(input); +} + +function spanKey(spanId: Uint8Array | SpanId): string { + return hexFromBytes(normalizeBytes(spanId)); +} + +export function encodeReadRangeWire(wire: ReadRangeWire): Uint8Array { + return READ_RANGE_VERSIONED.serializeWithEmbeddedVersion( + wire, + CURRENT_VERSION, + ); +} + +export function decodeReadRangeWire(bytes: Uint8Array): ReadRangeWire { + return READ_RANGE_VERSIONED.deserializeWithEmbeddedVersion(bytes); +} + +export function readRangeWireToOtlp( + wire: ReadRangeWire, + resource?: OtlpResource, +): { otlp: OtlpExportTraceServiceRequestJson; clamped: boolean } { + const startMs = + typeof wire.startTimeMs === "bigint" + ? wire.startTimeMs + : BigInt(Math.floor(wire.startTimeMs)); + const endMs = + typeof wire.endTimeMs === "bigint" + ? wire.endTimeMs + : BigInt(Math.floor(wire.endTimeMs)); + const limit = + typeof wire.limit === "bigint" ? Number(wire.limit) : wire.limit; + + if (limit <= 0 || endMs <= startMs) { + return { otlp: emptyExport(resource), clamped: wire.clamped }; + } + + const startNs = startMs * 1_000_000n; + const endNs = endMs * 1_000_000n; + const baseRecords = buildBaseRecordMap(wire.baseChunks); + const sequenceRef = { value: 0 }; + const records: RecordEntry[] = []; + for (const chunk of wire.chunks) { + collectRecordEntries(records, chunk, startNs, endNs, sequenceRef); + } + + records.sort((a, b) => { + if (a.absNs < b.absNs) return -1; + if (a.absNs > b.absNs) return 1; + return a.sequence - b.sequence; + }); + + const { spans, reachedSpanLimit } = buildSpansFromRecords( + records, + baseRecords, + limit, + ); + const exported = spans.map(toOtlpSpan); + return { + otlp: buildExport(exported, resource), + clamped: wire.clamped || reachedSpanLimit, + }; +} + +function collectRecordEntries( + collector: RecordEntry[], + chunk: Chunk, + startNs: bigint, + endNs: bigint, + sequenceRef: { value: number }, +): void { + for (const record of chunk.records) { + const absNs = chunk.baseUnixNs + record.timeOffsetNs; + if (absNs < startNs || absNs >= endNs) { + continue; + } + collector.push({ + record, + strings: chunk.strings, + absNs, + sequence: sequenceRef.value++, + }); + } +} + +function buildBaseRecordMap( + chunks: readonly Chunk[], +): Map { + const map = new Map(); + for (const chunk of chunks) { + for (const record of chunk.records) { + const absNs = chunk.baseUnixNs + record.timeOffsetNs; + map.set(spanKey(recordSpanId(record.body)), { + record, + strings: chunk.strings, + absNs, + }); + } + } + return map; +} + +function buildSpansFromRecords( + records: RecordEntry[], + baseRecords: Map, + limit: number, +): { spans: SpanBuilder[]; reachedSpanLimit: boolean } { + const spans = new Map(); + let reachedSpanLimit = false; + + for (const entry of records) { + const body = entry.record.body; + const id = recordSpanId(body); + const key = spanKey(id); + let span = spans.get(key); + if (!span) { + if (spans.size >= limit) { + reachedSpanLimit = true; + continue; + } + const baseRecord = baseRecords.get(key); + if (baseRecord) { + span = initSpanFromBaseRecord( + baseRecord.record.body, + baseRecord.strings, + baseRecord.absNs, + ); + } + if (!span) { + span = initSpanFromRecord( + body, + entry.absNs, + entry.strings, + ); + } + if (!span) { + continue; + } + spans.set(key, span); + } + applyRecord(span, body, entry.absNs, entry.strings); + } + + return { spans: Array.from(spans.values()), reachedSpanLimit }; +} + +function recordSpanId(body: RecordBody): SpanId { + switch (body.tag) { + case "SpanStart": + return body.val.spanId; + case "SpanEvent": + return body.val.spanId; + case "SpanUpdate": + return body.val.spanId; + case "SpanEnd": + return body.val.spanId; + case "SpanSnapshot": + return body.val.spanId; + } +} + +function initSpanFromBaseRecord( + base: RecordBody, + strings: readonly string[], + absNs: bigint, +): SpanBuilder | undefined { + switch (base.tag) { + case "SpanStart": + return initSpanFromStart(base.val, absNs, strings); + case "SpanSnapshot": + return initSpanFromSnapshot(base.val, strings); + default: + return undefined; + } +} + +function initSpanFromRecord( + body: RecordBody, + absNs: bigint, + strings: readonly string[], +): SpanBuilder | undefined { + switch (body.tag) { + case "SpanStart": + return initSpanFromStart(body.val, absNs, strings); + case "SpanSnapshot": + return initSpanFromSnapshot(body.val, strings); + default: + return undefined; + } +} + +function initSpanFromStart( + start: SpanStart, + absNs: bigint | null, + strings: readonly string[], +): SpanBuilder { + return { + traceId: start.traceId, + spanId: start.spanId, + parentSpanId: start.parentSpanId, + name: strings[start.name] ?? "", + kind: start.kind, + traceState: start.traceState, + flags: start.flags, + attributes: decodeAttributeList(start.attributes, strings), + droppedAttributesCount: start.droppedAttributesCount, + links: decodeLinks(start.links, strings), + droppedLinksCount: start.droppedLinksCount, + status: null, + startTimeUnixNs: absNs ?? 0n, + endTimeUnixNs: null, + events: [], + }; +} + +function initSpanFromSnapshot( + snapshot: SpanSnapshot, + strings: readonly string[], +): SpanBuilder { + return { + traceId: snapshot.traceId, + spanId: snapshot.spanId, + parentSpanId: snapshot.parentSpanId, + name: strings[snapshot.name] ?? "", + kind: snapshot.kind, + traceState: snapshot.traceState, + flags: snapshot.flags, + attributes: decodeAttributeList(snapshot.attributes, strings), + droppedAttributesCount: snapshot.droppedAttributesCount, + links: decodeLinks(snapshot.links, strings), + droppedLinksCount: snapshot.droppedLinksCount, + status: snapshot.status, + startTimeUnixNs: snapshot.startTimeUnixNs, + endTimeUnixNs: null, + events: [], + }; +} + +function applyRecord( + span: SpanBuilder, + body: RecordBody, + absNs: bigint, + strings: readonly string[], +): void { + switch (body.tag) { + case "SpanStart": + if (span.startTimeUnixNs === 0n) { + span.startTimeUnixNs = absNs; + } + return; + case "SpanSnapshot": + span.traceId = body.val.traceId; + span.parentSpanId = body.val.parentSpanId; + span.name = strings[body.val.name] ?? ""; + span.kind = body.val.kind; + span.traceState = body.val.traceState; + span.flags = body.val.flags; + span.attributes = decodeAttributeList( + body.val.attributes, + strings, + ); + span.droppedAttributesCount = body.val.droppedAttributesCount; + span.links = decodeLinks(body.val.links, strings); + span.droppedLinksCount = body.val.droppedLinksCount; + span.status = body.val.status; + span.startTimeUnixNs = body.val.startTimeUnixNs; + return; + case "SpanUpdate": + applyAttributes(span.attributes, body.val.attributes, strings); + span.droppedAttributesCount += body.val.droppedAttributesCount; + if (body.val.status) { + span.status = body.val.status; + } + return; + case "SpanEvent": + span.events.push({ + name: strings[body.val.name] ?? "", + timeUnixNs: absNs, + attributes: decodeAttributeList( + body.val.attributes, + strings, + ), + droppedAttributesCount: body.val.droppedAttributesCount, + }); + return; + case "SpanEnd": + span.endTimeUnixNs = absNs; + if (body.val.status) { + span.status = body.val.status; + } + return; + } +} + +function decodeAttributeList( + attributes: Attributes, + strings: readonly string[], +): AttributeMap { + const map = new Map(); + for (const kv of attributes) { + const key = strings[kv.key] ?? ""; + try { + map.set(key, decodeCbor(toUint8Array(kv.value)) as unknown); + } catch { + continue; + } + } + return map; +} + +function applyAttributes( + map: AttributeMap, + attributes: Attributes, + strings: readonly string[], +): void { + for (const kv of attributes) { + const key = strings[kv.key] ?? ""; + try { + map.set(key, decodeCbor(toUint8Array(kv.value)) as unknown); + } catch { + continue; + } + } +} + +function decodeLinks( + links: readonly SpanLink[], + strings: readonly string[], +): LinkState[] { + return links.map((link) => ({ + traceId: link.traceId, + spanId: link.spanId, + traceState: link.traceState, + attributes: decodeAttributeList(link.attributes, strings), + droppedAttributesCount: link.droppedAttributesCount, + })); +} + +function toOtlpSpan(span: SpanBuilder): OtlpSpan { + const attributes = mapToOtlpAttributes(span.attributes); + const events = span.events.map((event) => toOtlpEvent(event)); + const links = span.links.map((link) => toOtlpLink(link)); + const status = span.status ? toOtlpStatus(span.status) : undefined; + return { + traceId: hexFromBytes(normalizeBytes(span.traceId)), + spanId: hexFromBytes(normalizeBytes(span.spanId)), + parentSpanId: span.parentSpanId + ? hexFromBytes(normalizeBytes(span.parentSpanId)) + : undefined, + name: span.name, + kind: span.kind, + traceState: span.traceState ?? undefined, + flags: span.flags || undefined, + startTimeUnixNano: span.startTimeUnixNs.toString(), + endTimeUnixNano: span.endTimeUnixNs + ? span.endTimeUnixNs.toString() + : undefined, + attributes: attributes.length > 0 ? attributes : undefined, + droppedAttributesCount: span.droppedAttributesCount || undefined, + events: events.length > 0 ? events : undefined, + links: links.length > 0 ? links : undefined, + droppedLinksCount: span.droppedLinksCount || undefined, + status, + }; +} + +function toOtlpEvent(event: SpanEventState): OtlpSpanEvent { + const attributes = mapToOtlpAttributes(event.attributes); + return { + timeUnixNano: event.timeUnixNs.toString(), + name: event.name, + attributes: attributes.length > 0 ? attributes : undefined, + droppedAttributesCount: event.droppedAttributesCount || undefined, + }; +} + +function toOtlpLink(link: LinkState): OtlpSpanLink { + const attributes = mapToOtlpAttributes(link.attributes); + return { + traceId: hexFromBytes(normalizeBytes(link.traceId)), + spanId: hexFromBytes(normalizeBytes(link.spanId)), + traceState: link.traceState ?? undefined, + attributes: attributes.length > 0 ? attributes : undefined, + droppedAttributesCount: link.droppedAttributesCount || undefined, + }; +} + +function toOtlpStatus(status: SpanStatus): OtlpSpanStatus { + const code = + status.code === SpanStatusCode.OK + ? 1 + : status.code === SpanStatusCode.ERROR + ? 2 + : 0; + return { + code, + message: status.message ?? undefined, + }; +} + +function mapToOtlpAttributes(map: AttributeMap): OtlpKeyValue[] { + const list: OtlpKeyValue[] = []; + for (const [key, value] of map.entries()) { + if (value === undefined || typeof value === "function") { + continue; + } + if (typeof value === "symbol") { + continue; + } + list.push({ key, value: anyValueFromJs(value) }); + } + return list; +} + +function emptyExport( + resourceValue?: OtlpResource, +): OtlpExportTraceServiceRequestJson { + return buildExport([], resourceValue); +} + +function buildExport( + spans: OtlpSpan[], + resourceValue?: OtlpResource, +): OtlpExportTraceServiceRequestJson { + return { + resourceSpans: [ + { + resource: resourceValue, + scopeSpans: [{ spans }], + }, + ], + }; +} diff --git a/rivetkit-typescript/packages/traces/src/reader.ts b/rivetkit-typescript/packages/traces/src/reader.ts new file mode 100644 index 0000000000..d7748bff91 --- /dev/null +++ b/rivetkit-typescript/packages/traces/src/reader.ts @@ -0,0 +1,5 @@ +export { + decodeReadRangeWire, + encodeReadRangeWire, + readRangeWireToOtlp, +} from "./read-range.js"; diff --git a/rivetkit-typescript/packages/traces/src/traces.ts b/rivetkit-typescript/packages/traces/src/traces.ts index 795d041787..36c4e80c30 100644 --- a/rivetkit-typescript/packages/traces/src/traces.ts +++ b/rivetkit-typescript/packages/traces/src/traces.ts @@ -7,13 +7,11 @@ import { pack, unpack } from "fdb-tuple"; import { CHUNK_VERSIONED, CURRENT_VERSION, - READ_RANGE_VERSIONED, encodeRecord, type ActiveSpanRef, type Attributes, type Chunk, type KeyValue, - type ReadRangeWire, type Record as TraceRecord, type RecordBody, type SpanEnd, @@ -30,21 +28,17 @@ import { type TraceId, } from "../schemas/versioned.js"; import { - anyValueFromJs, hexFromBytes, type OtlpExportTraceServiceRequestJson, - type OtlpKeyValue, type OtlpResource, - type OtlpSpan, - type OtlpSpanEvent, - type OtlpSpanLink, - type OtlpSpanStatus, } from "./otlp.js"; +import { readRangeWireToOtlp } from "./read-range.js"; import type { EndSpanOptions, EventOptions, ReadRangeOptions, ReadRangeResult, + ReadRangeWire, SpanHandle, SpanStatusInput, StartSpanOptions, @@ -105,31 +99,6 @@ type LinkState = { droppedAttributesCount: number; }; -type SpanBuilder = { - traceId: TraceId; - spanId: SpanId; - parentSpanId: SpanId | null; - name: string; - kind: number; - traceState: string | null; - flags: number; - attributes: AttributeMap; - droppedAttributesCount: number; - links: LinkState[]; - droppedLinksCount: number; - status: SpanStatus | null; - startTimeUnixNs: bigint; - endTimeUnixNs: bigint | null; - events: SpanEventState[]; -}; - -type SpanEventState = { - name: string; - timeUnixNs: bigint; - attributes: AttributeMap; - droppedAttributesCount: number; -}; - type ChunkState = { bucketStartSec: number; chunkId: number; @@ -141,19 +110,6 @@ type ChunkState = { createdAtMonoMs: number; }; -type RecordEntry = { - record: TraceRecord; - strings: readonly string[]; - absNs: bigint; - sequence: number; -}; - -type BaseRecordEntry = { - record: TraceRecord; - strings: readonly string[]; - absNs: bigint; -}; - type PendingChunk = { key: Uint8Array; bucketStartSec: number; @@ -460,6 +416,35 @@ export function createTraces( return map; } + function decodeAttributeList( + attributes: Attributes, + strings: readonly string[], + ): AttributeMap { + const map = new Map(); + for (const kv of attributes) { + const key = strings[kv.key] ?? ""; + try { + map.set(key, decodeCbor(toUint8Array(kv.value)) as unknown); + } catch { + continue; + } + } + return map; + } + + function decodeLinks( + links: readonly SpanLink[], + strings: readonly string[], + ): LinkState[] { + return links.map((link) => ({ + traceId: link.traceId, + spanId: link.spanId, + traceState: link.traceState, + attributes: decodeAttributeList(link.attributes, strings), + droppedAttributesCount: link.droppedAttributesCount, + })); + } + function encodeLinkState( links: LinkState[], ): { links: SpanLink[]; dropped: number } { @@ -788,7 +773,11 @@ export function createTraces( } function getCurrentSpan(): SpanHandle | null { - return spanContext.getStore() ?? null; + const handle = spanContext.getStore() ?? null; + if (!handle) { + return null; + } + return isActive(handle) ? handle : null; } async function readRangeWire( @@ -870,27 +859,6 @@ export function createTraces( return readRangeWireToOtlp(wire, resource); } - function collectRecords( - collector: RecordEntry[], - chunk: Chunk, - startNs: bigint, - endNs: bigint, - sequenceRef: { value: number }, - ): void { - for (const record of chunk.records) { - const absNs = chunk.baseUnixNs + record.timeOffsetNs; - if (absNs < startNs || absNs >= endNs) { - continue; - } - collector.push({ - record, - strings: chunk.strings, - absNs, - sequence: sequenceRef.value++, - }); - } - } - function filterChunkRecords( chunk: Chunk, startNs: bigint, @@ -935,6 +903,21 @@ export function createTraces( return false; } + function recordSpanId(body: RecordBody): SpanId { + switch (body.tag) { + case "SpanStart": + return body.val.spanId; + case "SpanEvent": + return body.val.spanId; + case "SpanUpdate": + return body.val.spanId; + case "SpanEnd": + return body.val.spanId; + case "SpanSnapshot": + return body.val.spanId; + } + } + function currentChunkAsChunk(): Chunk { return { baseUnixNs: currentChunk.baseUnixNs, @@ -1204,430 +1187,3 @@ export function createTraces( readRangeWire, }; } - -export function encodeReadRangeWire(wire: ReadRangeWire): Uint8Array { - return READ_RANGE_VERSIONED.serializeWithEmbeddedVersion( - wire, - CURRENT_VERSION, - ); -} - -export function decodeReadRangeWire(bytes: Uint8Array): ReadRangeWire { - return READ_RANGE_VERSIONED.deserializeWithEmbeddedVersion(bytes); -} - -export function readRangeWireToOtlp( - wire: ReadRangeWire, - resource?: OtlpResource, -): ReadRangeResult { - const startMs = - typeof wire.startTimeMs === "bigint" - ? wire.startTimeMs - : BigInt(Math.floor(wire.startTimeMs)); - const endMs = - typeof wire.endTimeMs === "bigint" - ? wire.endTimeMs - : BigInt(Math.floor(wire.endTimeMs)); - const limit = - typeof wire.limit === "bigint" ? Number(wire.limit) : wire.limit; - - if (limit <= 0 || endMs <= startMs) { - return { otlp: emptyExport(resource), clamped: wire.clamped }; - } - - const startNs = startMs * 1_000_000n; - const endNs = endMs * 1_000_000n; - const baseRecords = buildBaseRecordMap(wire.baseChunks); - const sequenceRef = { value: 0 }; - const records: RecordEntry[] = []; - for (const chunk of wire.chunks) { - collectRecordEntries(records, chunk, startNs, endNs, sequenceRef); - } - - records.sort((a, b) => { - if (a.absNs < b.absNs) return -1; - if (a.absNs > b.absNs) return 1; - return a.sequence - b.sequence; - }); - - const { spans, reachedSpanLimit } = buildSpansFromRecords( - records, - baseRecords, - limit, - ); - const exported = spans.map(toOtlpSpan); - return { - otlp: buildExport(exported, resource), - clamped: wire.clamped || reachedSpanLimit, - }; -} - -function collectRecordEntries( - collector: RecordEntry[], - chunk: Chunk, - startNs: bigint, - endNs: bigint, - sequenceRef: { value: number }, -): void { - for (const record of chunk.records) { - const absNs = chunk.baseUnixNs + record.timeOffsetNs; - if (absNs < startNs || absNs >= endNs) { - continue; - } - collector.push({ - record, - strings: chunk.strings, - absNs, - sequence: sequenceRef.value++, - }); - } -} - -function buildBaseRecordMap( - chunks: readonly Chunk[], -): Map { - const map = new Map(); - for (const chunk of chunks) { - for (const record of chunk.records) { - const absNs = chunk.baseUnixNs + record.timeOffsetNs; - map.set(spanKey(recordSpanId(record.body)), { - record, - strings: chunk.strings, - absNs, - }); - } - } - return map; -} - -function buildSpansFromRecords( - records: RecordEntry[], - baseRecords: Map, - limit: number, -): { spans: SpanBuilder[]; reachedSpanLimit: boolean } { - const spans = new Map(); - let reachedSpanLimit = false; - - for (const entry of records) { - const body = entry.record.body; - const id = recordSpanId(body); - const key = spanKey(id); - let span = spans.get(key); - if (!span) { - if (spans.size >= limit) { - reachedSpanLimit = true; - continue; - } - const baseRecord = baseRecords.get(key); - if (baseRecord) { - span = initSpanFromBaseRecord( - baseRecord.record.body, - baseRecord.strings, - baseRecord.absNs, - ); - } - if (!span) { - span = initSpanFromRecord( - body, - entry.absNs, - entry.strings, - ); - } - if (!span) { - continue; - } - spans.set(key, span); - } - applyRecord(span, body, entry.absNs, entry.strings); - } - - return { spans: Array.from(spans.values()), reachedSpanLimit }; -} - -function recordSpanId(body: RecordBody): SpanId { - switch (body.tag) { - case "SpanStart": - return body.val.spanId; - case "SpanEvent": - return body.val.spanId; - case "SpanUpdate": - return body.val.spanId; - case "SpanEnd": - return body.val.spanId; - case "SpanSnapshot": - return body.val.spanId; - } -} - -function initSpanFromBaseRecord( - base: RecordBody, - strings: readonly string[], - absNs: bigint, -): SpanBuilder | undefined { - switch (base.tag) { - case "SpanStart": - return initSpanFromStart(base.val, absNs, strings); - case "SpanSnapshot": - return initSpanFromSnapshot(base.val, strings); - default: - return undefined; - } -} - -function initSpanFromRecord( - body: RecordBody, - absNs: bigint, - strings: readonly string[], -): SpanBuilder | undefined { - switch (body.tag) { - case "SpanStart": - return initSpanFromStart(body.val, absNs, strings); - case "SpanSnapshot": - return initSpanFromSnapshot(body.val, strings); - default: - return undefined; - } -} - -function initSpanFromStart( - start: SpanStart, - absNs: bigint | null, - strings: readonly string[], -): SpanBuilder { - return { - traceId: start.traceId, - spanId: start.spanId, - parentSpanId: start.parentSpanId, - name: strings[start.name] ?? "", - kind: start.kind, - traceState: start.traceState, - flags: start.flags, - attributes: decodeAttributeList(start.attributes, strings), - droppedAttributesCount: start.droppedAttributesCount, - links: decodeLinks(start.links, strings), - droppedLinksCount: start.droppedLinksCount, - status: null, - startTimeUnixNs: absNs ?? 0n, - endTimeUnixNs: null, - events: [], - }; -} - -function initSpanFromSnapshot( - snapshot: SpanSnapshot, - strings: readonly string[], -): SpanBuilder { - return { - traceId: snapshot.traceId, - spanId: snapshot.spanId, - parentSpanId: snapshot.parentSpanId, - name: strings[snapshot.name] ?? "", - kind: snapshot.kind, - traceState: snapshot.traceState, - flags: snapshot.flags, - attributes: decodeAttributeList(snapshot.attributes, strings), - droppedAttributesCount: snapshot.droppedAttributesCount, - links: decodeLinks(snapshot.links, strings), - droppedLinksCount: snapshot.droppedLinksCount, - status: snapshot.status, - startTimeUnixNs: snapshot.startTimeUnixNs, - endTimeUnixNs: null, - events: [], - }; -} - -function applyRecord( - span: SpanBuilder, - body: RecordBody, - absNs: bigint, - strings: readonly string[], -): void { - switch (body.tag) { - case "SpanStart": - if (span.startTimeUnixNs === 0n) { - span.startTimeUnixNs = absNs; - } - return; - case "SpanSnapshot": - span.traceId = body.val.traceId; - span.parentSpanId = body.val.parentSpanId; - span.name = strings[body.val.name] ?? ""; - span.kind = body.val.kind; - span.traceState = body.val.traceState; - span.flags = body.val.flags; - span.attributes = decodeAttributeList( - body.val.attributes, - strings, - ); - span.droppedAttributesCount = body.val.droppedAttributesCount; - span.links = decodeLinks(body.val.links, strings); - span.droppedLinksCount = body.val.droppedLinksCount; - span.status = body.val.status; - span.startTimeUnixNs = body.val.startTimeUnixNs; - return; - case "SpanUpdate": - applyAttributes(span.attributes, body.val.attributes, strings); - span.droppedAttributesCount += body.val.droppedAttributesCount; - if (body.val.status) { - span.status = body.val.status; - } - return; - case "SpanEvent": - span.events.push({ - name: strings[body.val.name] ?? "", - timeUnixNs: absNs, - attributes: decodeAttributeList( - body.val.attributes, - strings, - ), - droppedAttributesCount: body.val.droppedAttributesCount, - }); - return; - case "SpanEnd": - span.endTimeUnixNs = absNs; - if (body.val.status) { - span.status = body.val.status; - } - return; - } -} - -function decodeAttributeList( - attributes: Attributes, - strings: readonly string[], -): AttributeMap { - const map = new Map(); - for (const kv of attributes) { - const key = strings[kv.key] ?? ""; - try { - map.set(key, decodeCbor(toUint8Array(kv.value)) as unknown); - } catch { - continue; - } - } - return map; -} - -function applyAttributes( - map: AttributeMap, - attributes: Attributes, - strings: readonly string[], -): void { - for (const kv of attributes) { - const key = strings[kv.key] ?? ""; - try { - map.set(key, decodeCbor(toUint8Array(kv.value)) as unknown); - } catch { - continue; - } - } -} - -function decodeLinks( - links: readonly SpanLink[], - strings: readonly string[], -): LinkState[] { - return links.map((link) => ({ - traceId: link.traceId, - spanId: link.spanId, - traceState: link.traceState, - attributes: decodeAttributeList(link.attributes, strings), - droppedAttributesCount: link.droppedAttributesCount, - })); -} - -function toOtlpSpan(span: SpanBuilder): OtlpSpan { - const attributes = mapToOtlpAttributes(span.attributes); - const events = span.events.map((event) => toOtlpEvent(event)); - const links = span.links.map((link) => toOtlpLink(link)); - const status = span.status ? toOtlpStatus(span.status) : undefined; - return { - traceId: hexFromBytes(normalizeBytes(span.traceId)), - spanId: hexFromBytes(normalizeBytes(span.spanId)), - parentSpanId: span.parentSpanId - ? hexFromBytes(normalizeBytes(span.parentSpanId)) - : undefined, - name: span.name, - kind: span.kind, - traceState: span.traceState ?? undefined, - flags: span.flags || undefined, - startTimeUnixNano: span.startTimeUnixNs.toString(), - endTimeUnixNano: span.endTimeUnixNs - ? span.endTimeUnixNs.toString() - : undefined, - attributes: attributes.length > 0 ? attributes : undefined, - droppedAttributesCount: span.droppedAttributesCount || undefined, - events: events.length > 0 ? events : undefined, - links: links.length > 0 ? links : undefined, - droppedLinksCount: span.droppedLinksCount || undefined, - status, - }; -} - -function toOtlpEvent(event: SpanEventState): OtlpSpanEvent { - const attributes = mapToOtlpAttributes(event.attributes); - return { - timeUnixNano: event.timeUnixNs.toString(), - name: event.name, - attributes: attributes.length > 0 ? attributes : undefined, - droppedAttributesCount: event.droppedAttributesCount || undefined, - }; -} - -function toOtlpLink(link: LinkState): OtlpSpanLink { - const attributes = mapToOtlpAttributes(link.attributes); - return { - traceId: hexFromBytes(normalizeBytes(link.traceId)), - spanId: hexFromBytes(normalizeBytes(link.spanId)), - traceState: link.traceState ?? undefined, - attributes: attributes.length > 0 ? attributes : undefined, - droppedAttributesCount: link.droppedAttributesCount || undefined, - }; -} - -function toOtlpStatus(status: SpanStatus): OtlpSpanStatus { - const code = - status.code === SpanStatusCode.OK - ? 1 - : status.code === SpanStatusCode.ERROR - ? 2 - : 0; - return { - code, - message: status.message ?? undefined, - }; -} - -function mapToOtlpAttributes(map: AttributeMap): OtlpKeyValue[] { - const list: OtlpKeyValue[] = []; - for (const [key, value] of map.entries()) { - if (value === undefined || typeof value === "function") { - continue; - } - if (typeof value === "symbol") { - continue; - } - list.push({ key, value: anyValueFromJs(value) }); - } - return list; -} - -function emptyExport( - resourceValue?: OtlpResource, -): OtlpExportTraceServiceRequestJson { - return buildExport([], resourceValue); -} - -function buildExport( - spans: OtlpSpan[], - resourceValue?: OtlpResource, -): OtlpExportTraceServiceRequestJson { - return { - resourceSpans: [ - { - resource: resourceValue, - scopeSpans: [{ spans }], - }, - ], - }; -} diff --git a/rivetkit-typescript/packages/workflow-engine/TODO.md b/rivetkit-typescript/packages/workflow-engine/TODO.md index 05e9abd49e..ae1425893b 100644 --- a/rivetkit-typescript/packages/workflow-engine/TODO.md +++ b/rivetkit-typescript/packages/workflow-engine/TODO.md @@ -10,6 +10,10 @@ - rename to the queue-based handlers (next) +## keep syncing c.state flushing with workflow flushing + +- if we modify c.state in the workflow, it should roll back + ## review workflow state vs actors tate ## review variables available in workflow context that should not be