diff --git a/.gitignore b/.gitignore index bdcfa88..ce25f3a 100644 --- a/.gitignore +++ b/.gitignore @@ -23,3 +23,4 @@ erl_crash.dump /bench/graphs/ coverage_output.log /.claude/settings.local.json +/.serena/ diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..d0e7e09 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,139 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +## Project + +ExFix is an Elixir implementation of the FIX (Financial Information eXchange) Session Protocol FIXT.1.1. It only supports session initiator mode (buy-side). Published on hex.pm (v0.2.9), Apache 2.0 license. Zero runtime dependencies — only Erlang/OTP primitives. + +--- + +## How You Must Work: OpenSpec-Driven Development + +This project uses **OpenSpec** for spec-driven development. All code changes — features, refactors, and non-trivial fixes — must flow through `openspec/`. Treat `openspec/` as the single source of truth for requirements, active changes, and implementation plans. + +**You must never make code changes directly from chat.** If a user asks to "just edit the code" or "quickly patch this", refuse and guide them into the proper OpenSpec flow. + +### Workflow + +1. **Always read `openspec/config.yaml` before starting work** — it contains project context, conventions, and rules for proposals/specs/design/tasks. +2. **For any requested change:** + - If an appropriate change exists under `openspec/changes/`, work only through its `proposal.md` and `tasks.md`. + - If no change exists, create one using `/opsx:propose` before touching any code. +3. **The OpenSpec loop:** + - **Propose** → Capture *what* and *why* in `openspec/changes//proposal.md` + - **Tasks** → Break implementation into steps in `tasks.md` + - **Apply** → Implement tasks via `/opsx:apply`, marking complete as you go + - **Archive** → Once verified, archive via `/opsx:archive` + +### Commands + +- `/opsx:propose` — Create a new change with proposal and tasks +- `/opsx:apply` — Implement tasks from an existing change's `tasks.md` +- `/opsx:explore` — Think through ideas, investigate problems, clarify requirements +- `/opsx:archive` — Archive a completed change + +### Rules (Mandatory) + +- **No bypassing OpenSpec.** You cannot apply non-trivial code changes without a change folder under `openspec/changes/`. +- **Specs over chat.** If chat instructions contradict `openspec/specs/` or an active change, follow the written specs and highlight the discrepancy. +- **Prefer minimal change.** Change only what the tasks require; no opportunistic refactors unless explicitly included. +- **Backwards compatibility.** This is a library on hex.pm — consider impact on existing behaviours (`SessionHandler`, `Dictionary`, `SessionRegistry`). +- **Specs stay in sync.** Never silently diverge from specs. If you need to deviate, update the spec/proposal first. + +### Schema: Lightweight + +This project uses the `lightweight` schema for fixes, refactors, and minor improvements: `proposal.md` → `tasks.md` (no design document). Each task should be completable in < 2 hours, include test tasks, and specify affected files. + +--- + +## Build & Test Commands + +```bash +mix compile # Compile +mix test # Run all tests +mix test test/session_test.exs # Single test file +mix test test/session_test.exs:42 # Single test by line +mix test --only tag_name # Tests by tag +mix credo # Lint +mix dialyzer # Static type checking (slow first run) +mix docs # Generate ExDoc documentation +mix bench -d 2 # Run benchmarks +``` + +Requires Elixir ~> 1.18, Erlang/OTP 27. + +--- + +## Architecture + +### Supervision Tree + +``` +ExFix.Application (Supervisor, rest_for_one) +├── ExFix.DefaultSessionRegistry (GenServer, ETS-backed) +└── ExFix.SessionSup (DynamicSupervisor) + └── ExFix.SessionWorker (GenServer, one per session) +``` + +### Key Modules & Layers + +**Public API** — `ExFix` (`lib/ex_fix.ex`): Three functions: `start_session_initiator/5`, `send_message!/2`, `stop_session/2`. Builds a `SessionConfig` and delegates to the registry. + +**Session Protocol FSM** — `ExFix.Session` (`lib/ex_fix/session.ex`): Pure functional state machine. Takes a `%Session{}` struct + input, returns `{result_tag, messages_to_send, updated_session}` where `result_tag` is `:ok | :continue | :resend | :logout | :stop`. Handles all FIX session-level messages (Logon, Heartbeat, TestRequest, ResendRequest, Reject, SequenceReset, Logout). **No side effects** — the caller (`SessionWorker`) handles I/O. + +**Session Worker** — `ExFix.SessionWorker` (`lib/ex_fix/session_worker.ex`): GenServer that owns the TCP/SSL socket. Receives network data, feeds it to `Session`, sends outgoing messages, manages heartbeat timers. Registered as `:"ex_fix_session_#{name}"`. + +**Session Registry** — `ExFix.SessionRegistry` (behaviour) / `ExFix.DefaultSessionRegistry` (default impl): Tracks session lifecycle status via ETS. Monitors session worker processes for crash detection and reconnection. The registry module is injectable per-session. + +**Parser** — `ExFix.Parser` (`lib/ex_fix/parser.ex`): Two-phase parsing. `parse1/4` extracts frame, validates checksum/length, identifies the "subject" field. `parse2/1` completes field parsing. This split lets the network process quickly route messages by subject to dedicated worker processes. + +**Messages** — `InMessage` (received, fields as `[{tag_string, value_string}]`) and `OutMessage` (outgoing, built with `new/1` + `set_field/3`). Fields are FIX tag numbers as strings (e.g., `"35"` for MsgType, `"49"` for SenderCompID). + +### Session Handler Behaviour + +Users implement `ExFix.SessionHandler` with 4 callbacks: `on_logon/2`, `on_logout/2`, `on_app_message/4`, `on_session_message/4`. The `env` parameter is a user-provided map from config. `on_app_message` receives partially-parsed messages (`complete: false`) — call `Parser.parse2(msg)` to finish parsing. + +### Session Status Types + +Two distinct status domains: +- **Protocol status** (`Session.session_status`): `:offline | :connecting | :online | :disconnecting | :disconnected` — used within `Session` FSM +- **Registry status** (`Session.registry_status`): adds `:connected | :reconnecting` — used by `DefaultSessionRegistry` and `SessionWorker.terminate/2` + +### Data Flow (Incoming) + +``` +Socket → SessionWorker.handle_info(:tcp/:ssl) → handle_data/2 + → Session.handle_incoming_data/2 → Parser.parse1/4 + → Session.process_valid_message/4 → Session.process_incoming_message/5 + → returns {result_tag, msgs_to_send, session} + → SessionWorker sends msgs via transport, updates state +``` + +### App Config + +Config keys in `config/config.exs` (compile-time via `Application.compile_env`): +- `warning_on_garbled_messages` (boolean) +- `session_registry` (module) +- `default_dictionary` (module) +- `logout_timeout` (ms, default 2000, set to 20 in test) +- `rx_heartbeat_tolerance` (float multiplier, default 1.2) + +--- + +## Test Infrastructure + +Tests use mock transport (`TestTransport`), test handler (`FixDummySessionHandler`), and test registry (`TestSessionRegistry`) defined in `test/test_helper.exs`. The mock transport simulates socket communication via process messages. Helper `msg/1` converts pipe-delimited strings to FIX binary (e.g., `msg("8=FIXT.1.1|9=5|...")`). + +--- + +## Conventions + +- FIX field tags are always strings, not integers (e.g., `"35"` not `35`) +- Fields stored as keyword-like tuples: `{tag_string, value_string}` +- Module attributes for FIX constants: `@msg_type_logon "A"`, `@field_text "58"` +- Performance-critical functions use `@compile {:inline, ...}` +- ETS used for out-queue (ordered_set, private per session) and registry (public, named) +- Extension via behaviours (`SessionHandler`, `Dictionary`, `SessionRegistry`), not protocols +- Pre-existing compiler warnings in `session.ex` (struct update typing) are not regressions +- Code and commit language: English diff --git a/lib/ex_fix/session.ex b/lib/ex_fix/session.ex index 69a82aa..b6200ed 100644 --- a/lib/ex_fix/session.ex +++ b/lib/ex_fix/session.ex @@ -530,6 +530,19 @@ defmodule ExFix.Session do end def handle_timeout(%Session{config: config, out_lastseq: out_lastseq} = session, :rx) do + %SessionConfig{name: session_name, session_handler: session_handler, env: env} = config + + if function_exported?(session_handler, :on_error, 4) do + try do + session_handler.on_error(session_name, :heartbeat_timeout, %{}, env) + rescue + e -> + Logger.error(fn -> + "[fix.error] [#{session_name}] on_error callback raised: #{inspect(e)}" + end) + end + end + out_lastseq = out_lastseq + 1 text = "Data not received" logout_msg = build_message(config, @msg_type_logout, out_lastseq, [{@field_text, text}]) @@ -604,15 +617,27 @@ defmodule ExFix.Session do _expected_seqnum, %InMessage{valid: false, error_reason: :garbled} = msg ) do + %Session{config: %SessionConfig{name: session_name, session_handler: session_handler, env: env}} = + session + if @warning_on_garbled_messages do Logger.warning(fn -> - %Session{config: %SessionConfig{name: session_name}} = session - "[fix.warning] [#{session_name}] Garbled: " <> :unicode.characters_to_binary(msg.original_fix_msg, :latin1, :utf8) end) end + if function_exported?(session_handler, :on_error, 4) do + try do + session_handler.on_error(session_name, :garbled_message, %{raw_message: msg.original_fix_msg}, env) + rescue + e -> + Logger.error(fn -> + "[fix.error] [#{session_name}] on_error callback raised: #{inspect(e)}" + end) + end + end + {:ok, [], %Session{session | extra_bytes: msg.other_msgs}} end diff --git a/lib/ex_fix/session_handler.ex b/lib/ex_fix/session_handler.ex index 5431c32..bf8322d 100644 --- a/lib/ex_fix/session_handler.ex +++ b/lib/ex_fix/session_handler.ex @@ -48,4 +48,25 @@ defmodule ExFix.SessionHandler do disconnection, which occurs first. """ @callback on_logout(session_name :: Session.session_name(), env :: map()) :: any() + + @doc """ + Called when a session-level error occurs. + + Error types: + - `:connect_error` — TCP/SSL connection failure. Details: `%{reason: term()}` + - `:transport_error` — TCP/SSL send failure. Details: `%{reason: term()}` + - `:garbled_message` — message failed checksum or body-length validation. Details: `%{raw_message: binary()}` + - `:heartbeat_timeout` — counterparty heartbeat not received within tolerance. Details: `%{}` + + This callback is optional. Handlers that do not implement it will not receive + error notifications. + """ + @callback on_error( + session_name :: Session.session_name(), + error_type :: atom(), + details :: map(), + env :: map() + ) :: any() + + @optional_callbacks [on_error: 4] end diff --git a/lib/ex_fix/session_worker.ex b/lib/ex_fix/session_worker.ex index 281e018..d2a52b8 100644 --- a/lib/ex_fix/session_worker.ex +++ b/lib/ex_fix/session_worker.ex @@ -231,11 +231,14 @@ defmodule ExFix.SessionWorker do name: fix_session_name, transport: transport, client: client, + session: session, log_outgoing_msg: log_outgoing_msg, tx_timer: tx_timer }, resend \\ false ) do + %Session{config: %SessionConfig{session_handler: session_handler, env: env}} = session + for msg <- msgs_to_send do data = Serializer.serialize(msg, DateTime.utc_now(), resend) @@ -246,7 +249,18 @@ defmodule ExFix.SessionWorker do end) end - transport.send(client, data) + case transport.send(client, data) do + {:error, reason} -> + Logger.error(fn -> + "[fix.error] [#{fix_session_name}] Transport send error: #{inspect(reason)}" + end) + + maybe_notify_error(session_handler, fix_session_name, :transport_error, %{reason: reason}, env) + + _ -> + :ok + end + send(tx_timer, :msg) end @@ -281,6 +295,8 @@ defmodule ExFix.SessionWorker do {:error, reason} -> Logger.error("Cannot open socket: #{inspect(reason)}") + %SessionConfig{session_handler: session_handler, env: env} = config + maybe_notify_error(session_handler, fix_session_name, :connect_error, %{reason: reason}, env) {:stop, reason, state} end end @@ -292,4 +308,17 @@ defmodule ExFix.SessionWorker do session_registry.session_update_status(name, :connected) rx_timer end + + defp maybe_notify_error(session_handler, session_name, error_type, details, env) do + if function_exported?(session_handler, :on_error, 4) do + try do + session_handler.on_error(session_name, error_type, details, env) + rescue + e -> + Logger.error(fn -> + "[fix.error] [#{session_name}] on_error callback raised: #{inspect(e)}" + end) + end + end + end end diff --git a/openspec/changes/archive/2026-03-05-callback-on-error/.openspec.yaml b/openspec/changes/archive/2026-03-05-callback-on-error/.openspec.yaml new file mode 100644 index 0000000..5aae5cf --- /dev/null +++ b/openspec/changes/archive/2026-03-05-callback-on-error/.openspec.yaml @@ -0,0 +1,2 @@ +schema: spec-driven +created: 2026-03-04 diff --git a/openspec/changes/archive/2026-03-05-callback-on-error/design.md b/openspec/changes/archive/2026-03-05-callback-on-error/design.md new file mode 100644 index 0000000..046fde0 --- /dev/null +++ b/openspec/changes/archive/2026-03-05-callback-on-error/design.md @@ -0,0 +1,104 @@ +## Context + +ExFix surfaces session errors only via `Logger`. Applications have no programmatic way to react to transport failures, garbled messages, or heartbeat timeouts. The `SessionHandler` behaviour has 4 callbacks (`on_logon`, `on_logout`, `on_app_message`, `on_session_message`) — none for errors. + +Current error sites: +- `SessionWorker.connect_and_send_logon/2` — `Logger.error` on socket connect failure +- `SessionWorker.do_send_messages/3` — `transport.send` result ignored +- `Session.process_invalid_message/3` — garbled messages logged (if config enabled) +- `Session.handle_timeout/2` — heartbeat timeout triggers logout but no handler notification + +## Goals / Non-Goals + +**Goals:** +- Add an optional `on_error/4` callback to `SessionHandler` +- Invoke it at all error sites — from `Session` for protocol-level errors, from `SessionWorker` for transport-level errors +- Maintain full backwards compatibility — existing handlers compile and work without changes + +**Non-Goals:** +- Error recovery actions (e.g., auto-reconnect based on callback return) — future work +- Replacing `Logger` calls — `on_error` complements logging, doesn't replace it +- Surfacing application-level message errors (business logic) — only session/transport errors + +## Decisions + +### 1. Optional callback via `@optional_callbacks` + +**Choice**: Use `@optional_callbacks [on_error: 4]` on the `SessionHandler` behaviour. + +**Alternatives considered**: +- Separate `ErrorHandler` behaviour — adds configuration complexity and a new concept for users. The error is tightly coupled to the session, so extending the existing behaviour is simpler. +- Default implementation via `__using__` macro — ExFix doesn't use `__using__` anywhere, introducing it for one callback would be inconsistent. + +**Rationale**: `@optional_callbacks` is idiomatic Elixir, zero breaking changes, and the caller checks with `function_exported?/3`. + +### 2. Call site placement: Session for protocol errors, SessionWorker for transport errors + +**Choice**: `on_error/4` is called from the same layer where the error originates: +- **`Session`** calls `on_error` for: `:garbled_message`, `:heartbeat_timeout` +- **`SessionWorker`** calls `on_error` for: `:connect_error`, `:transport_error` + +``` +┌──────────────────┐ ┌────────-─────────┐ ┌──────────────────┐ +│ Session │ │ SessionWorker │ │ SessionHandler │ +│ (state machine) │ │ (GenServer) │ │ (user callback) │ +│ │─calls──▶│ │─calls─▶│ on_error/4 │ +│ on_error for: │ │ on_error for: │ │ │ +│ :garbled_message │ │ :connect_error │ └──────────────────┘ +│ :heartbeat_timeout │ :transport_error │ +└──────────────────┘ └─────────────-────┘ +``` + +**Rationale**: `Session` already calls handler callbacks directly (`on_session_message` in 8 places, `on_app_message` in 2 places, `on_logon` once, `on_logout` 3 times). It is not a pure FSM — it performs side effects via handler callbacks as part of its message processing. Having `Session` call `on_error` for protocol-level errors follows the established pattern exactly. + +Transport errors (`connect_error`, `transport_error`) genuinely originate in `SessionWorker` (socket operations), so they are notified from there. + +### 3. Error type atoms + details map + +**Choice**: `on_error(session_name, error_type, details, env)` where: +- `error_type` — atom: `:connect_error | :transport_error | :garbled_message | :heartbeat_timeout` +- `details` — map with error-specific context: + +| Error Type | Details Map | +|---|---| +| `:connect_error` | `%{reason: term()}` | +| `:transport_error` | `%{reason: term()}` | +| `:garbled_message` | `%{raw_message: binary()}` | +| `:heartbeat_timeout` | `%{}` | + +**Alternatives considered**: +- Struct per error type — over-engineering for 4 error types, harder to extend +- Tuple `{error_type, reason}` — less extensible than a map for future fields + +### 4. Garbled message notification — direct call from Session + +`Session.process_invalid_message/3` handles garbled messages (`:garbled` error reason). It will call `on_error` directly, guarded by `function_exported?/3`, same as how it calls `on_session_message` elsewhere. The `SessionConfig` already contains `session_handler` and `env`, so no new plumbing is needed. + +### 5. Heartbeat timeout notification — direct call from Session + +`Session.handle_timeout/2` already distinguishes first vs. second RX timeout via `last_test_req_id_sent`: +- First timeout (`last_test_req_id_sent: nil`) → sends TestRequest, returns `{:ok, ...}` +- Second timeout (`last_test_req_id_sent` set) → sends Logout, returns `{:logout, ...}` + +The second clause will call `on_error` with `:heartbeat_timeout` before returning `{:logout, ...}`. + +### 6. `function_exported?/3` — call each time, no caching + +**Choice**: Call `function_exported?(session_handler, :on_error, 4)` at each error site without caching. + +**Rationale**: `function_exported?/3` is a BIF that performs a fast module table lookup. Error paths are infrequent (not the hot path), so the overhead is negligible. Caching in `SessionConfig` would add a struct field and constructor change for zero measurable benefit. Both `Session` and `SessionWorker` have access to the handler module via `SessionConfig`. + +### 7. `:invalid_message` excluded + +The proposal originally listed `:invalid_message` for validation failures (CompID mismatch, SendingTime accuracy, BeginString error). These are excluded because they are already surfaced to the handler via existing callbacks: +- CompID mismatch → `on_logout` + Reject message visible in `on_session_message` +- SendingTime accuracy → Reject message visible in `on_session_message` +- BeginString error → Logout message sent (handler notified via normal flow) + +Adding a redundant `:invalid_message` notification would create confusing double-notification. If needed in the future, it can be added as a separate change. + +## Risks / Trade-offs + +- **[Performance]** `function_exported?/3` check on every error → Mitigation: errors are infrequent (not hot path), and the BIF is fast. +- **[Handler exceptions]** User's `on_error/4` could raise → Mitigation: wrap calls in `try/rescue` with Logger fallback. Same risk exists for other callbacks but hasn't been an issue. +- **[Backwards compat]** New optional callback is non-breaking. Existing code continues to work unchanged. diff --git a/openspec/changes/archive/2026-03-05-callback-on-error/proposal.md b/openspec/changes/archive/2026-03-05-callback-on-error/proposal.md new file mode 100644 index 0000000..d13670a --- /dev/null +++ b/openspec/changes/archive/2026-03-05-callback-on-error/proposal.md @@ -0,0 +1,33 @@ +## Why + +Applications using ExFix have no programmatic way to react to session-level errors. Connection failures, garbled messages, transport send errors, and heartbeat timeouts are logged via `Logger` but never surfaced to the `SessionHandler` implementation. In financial trading systems, the ability to react to errors (e.g., switch venue, alert operations, adjust order routing) is critical. + +## What Changes + +- Add an `on_error/4` optional callback to the `ExFix.SessionHandler` behaviour. +- Invoke the callback when session-level errors occur — from `Session` for protocol errors, from `SessionWorker` for transport errors. +- The callback is optional via `@optional_callbacks` — existing handler implementations continue to work without changes. + +Error types to surface: +- `:connect_error` — TCP/SSL connection failures +- `:transport_error` — TCP/SSL send failures +- `:garbled_message` — messages that fail checksum or body-length validation +- `:heartbeat_timeout` — counterparty heartbeat not received within tolerance + +Note: `:invalid_message` (CompID mismatch, SendingTime accuracy, BeginString error) was considered but excluded. These validation failures are already surfaced to the handler via existing callbacks (`on_session_message` receives the Reject messages, `on_logout` is called for CompID/BeginString errors). Adding a separate `:invalid_message` notification would create redundant double-notification. + +## Capabilities + +### New Capabilities +- `error-callback`: Optional `on_error/4` callback on `SessionHandler` behaviour for programmatic error notification + +### Modified Capabilities +- `transport`: Add `on_error/4` invocation for `:connect_error` and `:transport_error` at transport error sites in `SessionWorker` + +## Impact + +- **`ExFix.SessionHandler`** — new optional callback added to the behaviour. Non-breaking: existing implementations compile without changes. +- **`ExFix.Session`** — calls `on_error/4` for `:garbled_message` and `:heartbeat_timeout` (follows existing pattern of calling handler callbacks directly). +- **`ExFix.SessionWorker`** — calls `on_error/4` for `:connect_error` and `:transport_error` at transport error sites. Must check if the handler implements the callback before invoking. +- **FIX protocol conformance** — no impact. This is an observability addition, not a protocol change. +- **No new dependencies** — uses only existing OTP primitives. diff --git a/openspec/changes/archive/2026-03-05-callback-on-error/specs/error-callback/spec.md b/openspec/changes/archive/2026-03-05-callback-on-error/specs/error-callback/spec.md new file mode 100644 index 0000000..d3e9518 --- /dev/null +++ b/openspec/changes/archive/2026-03-05-callback-on-error/specs/error-callback/spec.md @@ -0,0 +1,91 @@ +## ADDED Requirements + +### Requirement: Optional on_error callback in SessionHandler + +The `ExFix.SessionHandler` behaviour SHALL define an optional callback `on_error/4` with the signature: + +```elixir +@callback on_error( + session_name :: Session.session_name(), + error_type :: atom(), + details :: map(), + env :: map() +) :: any() +``` + +The callback SHALL be declared with `@optional_callbacks [on_error: 4]`. Handlers that do not implement it SHALL continue to compile and function without changes. + +#### Scenario: Handler implements on_error +- **GIVEN** a `SessionHandler` module that implements `on_error/4` +- **WHEN** a session-level error occurs +- **THEN** the system SHALL invoke `on_error/4` with the session name, error type atom, details map, and env + +#### Scenario: Handler does not implement on_error +- **GIVEN** a `SessionHandler` module that does not implement `on_error/4` +- **WHEN** a session-level error occurs +- **THEN** the system SHALL NOT raise an error and SHALL continue normal operation + +#### Scenario: on_error raises an exception +- **GIVEN** a `SessionHandler` whose `on_error/4` raises an exception +- **WHEN** the callback is invoked +- **THEN** the system SHALL rescue the exception, log it via `Logger.error`, and continue normal session operation + +### Requirement: Error type and details contract + +Each error notification SHALL include an `error_type` atom and a `details` map. The guaranteed details keys per error type are: + +| Error Type | Details Map | Source | +|---|---|---| +| `:connect_error` | `%{reason: term()}` | `SessionWorker` | +| `:transport_error` | `%{reason: term()}` | `SessionWorker` | +| `:garbled_message` | `%{raw_message: binary()}` | `Session` | +| `:heartbeat_timeout` | `%{}` | `Session` | + +### Requirement: Transport connect error notification + +The system SHALL invoke `on_error/4` with error type `:connect_error` when a TCP/SSL connection attempt fails. + +#### Scenario: TCP connection refused +- **GIVEN** a session configured to connect to a host +- **WHEN** `transport_mod.connect/3` returns `{:error, reason}` +- **THEN** `SessionWorker` SHALL invoke `on_error(session_name, :connect_error, %{reason: reason}, env)` +- **AND** the existing `Logger.error` call SHALL remain + +### Requirement: Transport send error notification + +The system SHALL invoke `on_error/4` with error type `:transport_error` when sending data over the socket fails. + +#### Scenario: Send fails on closed socket +- **GIVEN** an active session with an established connection +- **WHEN** `transport.send(client, data)` returns `{:error, reason}` +- **THEN** `SessionWorker` SHALL invoke `on_error(session_name, :transport_error, %{reason: reason}, env)` +- **AND** the system SHALL log the error via `Logger.error` + +### Requirement: Garbled message notification + +The system SHALL invoke `on_error/4` with error type `:garbled_message` when a garbled FIX message is received. + +#### Scenario: Message fails checksum validation +- **GIVEN** an active session receiving data +- **WHEN** the parser returns an `InMessage` with `error_reason: :garbled` +- **THEN** `Session.process_invalid_message/3` SHALL invoke `on_error(session_name, :garbled_message, %{raw_message: binary}, env)` directly (same pattern as existing `on_session_message` calls) +- **AND** the existing garbled-message warning behavior (controlled by `warning_on_garbled_messages` config) SHALL remain unchanged + +#### Scenario: Message fails body length validation +- **GIVEN** an active session receiving data +- **WHEN** the parser returns an `InMessage` with `error_reason: :garbled` due to body length mismatch +- **THEN** `Session.process_invalid_message/3` SHALL invoke `on_error(session_name, :garbled_message, %{raw_message: binary}, env)` + +### Requirement: Heartbeat timeout notification + +The system SHALL invoke `on_error/4` with error type `:heartbeat_timeout` when the counterparty fails to respond within the heartbeat tolerance and a logout is initiated. + +#### Scenario: No data received after test request +- **GIVEN** an active session where `last_test_req_id_sent` is not nil (a TestRequest was already sent on the first timeout) +- **WHEN** the RX timer fires again (second consecutive timeout), triggering `Session.handle_timeout/2` to return `{:logout, ...}` +- **THEN** `Session.handle_timeout/2` SHALL invoke `on_error(session_name, :heartbeat_timeout, %{}, env)` before returning the logout result +- **AND** the system SHALL proceed with its existing logout behavior + +### Requirement: Callback guard via function_exported? + +Before invoking `on_error/4`, the system SHALL check `function_exported?(session_handler, :on_error, 4)`. This check SHALL be performed at each call site without caching, as error paths are infrequent. diff --git a/openspec/changes/archive/2026-03-05-callback-on-error/tasks.md b/openspec/changes/archive/2026-03-05-callback-on-error/tasks.md new file mode 100644 index 0000000..81b82d9 --- /dev/null +++ b/openspec/changes/archive/2026-03-05-callback-on-error/tasks.md @@ -0,0 +1,33 @@ +## 1. SessionHandler behaviour + +- [x] 1.1 Add `on_error/4` callback and `@optional_callbacks` declaration to `ExFix.SessionHandler` (`lib/ex_fix/session_handler.ex`). Include `@doc` and `@callback` spec. + +## 2. Session — garbled message notification + +- [x] 2.1 In `Session.process_invalid_message/3` (garbled clause, `lib/ex_fix/session.ex`), add a guarded call to `session_handler.on_error(session_name, :garbled_message, %{raw_message: msg.original_fix_msg}, env)`. Guard with `function_exported?(session_handler, :on_error, 4)`. Wrap in `try/rescue` with `Logger.error` fallback. + +## 3. Session — heartbeat timeout notification + +- [x] 3.1 In `Session.handle_timeout/2` second RX clause (`last_test_req_id_sent` is set, `lib/ex_fix/session.ex`), add a guarded call to `session_handler.on_error(session_name, :heartbeat_timeout, %{}, env)` before returning `{:logout, ...}`. Guard and rescue same as 2.1. + +## 4. SessionWorker — transport error notifications + +- [x] 4.1 Add private `maybe_notify_error/4` helper to `SessionWorker` (`lib/ex_fix/session_worker.ex`) that checks `function_exported?(session_handler, :on_error, 4)` and calls handler's `on_error/4` wrapped in `try/rescue`. +- [x] 4.2 Call `maybe_notify_error` with `:connect_error` in `connect_and_send_logon/2` on `{:error, reason}` (`lib/ex_fix/session_worker.ex`). +- [x] 4.3 Check `transport.send` return value in `do_send_messages/3` and call `maybe_notify_error` with `:transport_error` on `{:error, reason}` (`lib/ex_fix/session_worker.ex`). + +## 5. Test helpers + +- [x] 5.1 Add `on_error/4` implementation to `FixDummySessionHandler` in `test/test_helper.exs` that sends `{:on_error, error_type, details}` to the test process (via `env.test_pid` or similar). +- [x] 5.2 Create a second test handler `FixDummySessionHandlerNoError` without `on_error/4` to test the optional callback path (`test/test_helper.exs`). +- [x] 5.3 Extend `TestTransport` with a configurable error mode (implemented as separate FailConnectTransport/FailSendTransport modules in session_worker_test.exs instead, as process dictionary doesn't work cross-process) (e.g., `send/2` returns `{:error, reason}` when a flag is set via process dictionary or agent) to enable testing `:transport_error` dispatch. + +## 6. Tests + +- [x] 6.1 Test that a handler implementing `on_error/4` receives `:garbled_message` when a garbled FIX message is processed (`test/session_test.exs`). +- [x] 6.2 Test that a handler NOT implementing `on_error/4` does not crash on garbled messages (`test/session_test.exs`). +- [x] 6.3 Test that `:connect_error` is dispatched when `transport.connect` fails (`test/session_worker_test.exs`). +- [x] 6.4 Test that `:transport_error` is dispatched when `transport.send` returns an error (`test/session_worker_test.exs`). +- [x] 6.5 Test that `:heartbeat_timeout` is dispatched on second consecutive RX timeout (`test/session_test.exs`). +- [x] 6.6 Test that `on_error/4` exceptions are rescued and logged without crashing the session (`test/session_test.exs` or `test/session_worker_test.exs`). +- [x] 6.7 Run `mix credo` and `mix dialyzer` to verify no new warnings. diff --git a/openspec/specs/error-callback/spec.md b/openspec/specs/error-callback/spec.md new file mode 100644 index 0000000..d3e9518 --- /dev/null +++ b/openspec/specs/error-callback/spec.md @@ -0,0 +1,91 @@ +## ADDED Requirements + +### Requirement: Optional on_error callback in SessionHandler + +The `ExFix.SessionHandler` behaviour SHALL define an optional callback `on_error/4` with the signature: + +```elixir +@callback on_error( + session_name :: Session.session_name(), + error_type :: atom(), + details :: map(), + env :: map() +) :: any() +``` + +The callback SHALL be declared with `@optional_callbacks [on_error: 4]`. Handlers that do not implement it SHALL continue to compile and function without changes. + +#### Scenario: Handler implements on_error +- **GIVEN** a `SessionHandler` module that implements `on_error/4` +- **WHEN** a session-level error occurs +- **THEN** the system SHALL invoke `on_error/4` with the session name, error type atom, details map, and env + +#### Scenario: Handler does not implement on_error +- **GIVEN** a `SessionHandler` module that does not implement `on_error/4` +- **WHEN** a session-level error occurs +- **THEN** the system SHALL NOT raise an error and SHALL continue normal operation + +#### Scenario: on_error raises an exception +- **GIVEN** a `SessionHandler` whose `on_error/4` raises an exception +- **WHEN** the callback is invoked +- **THEN** the system SHALL rescue the exception, log it via `Logger.error`, and continue normal session operation + +### Requirement: Error type and details contract + +Each error notification SHALL include an `error_type` atom and a `details` map. The guaranteed details keys per error type are: + +| Error Type | Details Map | Source | +|---|---|---| +| `:connect_error` | `%{reason: term()}` | `SessionWorker` | +| `:transport_error` | `%{reason: term()}` | `SessionWorker` | +| `:garbled_message` | `%{raw_message: binary()}` | `Session` | +| `:heartbeat_timeout` | `%{}` | `Session` | + +### Requirement: Transport connect error notification + +The system SHALL invoke `on_error/4` with error type `:connect_error` when a TCP/SSL connection attempt fails. + +#### Scenario: TCP connection refused +- **GIVEN** a session configured to connect to a host +- **WHEN** `transport_mod.connect/3` returns `{:error, reason}` +- **THEN** `SessionWorker` SHALL invoke `on_error(session_name, :connect_error, %{reason: reason}, env)` +- **AND** the existing `Logger.error` call SHALL remain + +### Requirement: Transport send error notification + +The system SHALL invoke `on_error/4` with error type `:transport_error` when sending data over the socket fails. + +#### Scenario: Send fails on closed socket +- **GIVEN** an active session with an established connection +- **WHEN** `transport.send(client, data)` returns `{:error, reason}` +- **THEN** `SessionWorker` SHALL invoke `on_error(session_name, :transport_error, %{reason: reason}, env)` +- **AND** the system SHALL log the error via `Logger.error` + +### Requirement: Garbled message notification + +The system SHALL invoke `on_error/4` with error type `:garbled_message` when a garbled FIX message is received. + +#### Scenario: Message fails checksum validation +- **GIVEN** an active session receiving data +- **WHEN** the parser returns an `InMessage` with `error_reason: :garbled` +- **THEN** `Session.process_invalid_message/3` SHALL invoke `on_error(session_name, :garbled_message, %{raw_message: binary}, env)` directly (same pattern as existing `on_session_message` calls) +- **AND** the existing garbled-message warning behavior (controlled by `warning_on_garbled_messages` config) SHALL remain unchanged + +#### Scenario: Message fails body length validation +- **GIVEN** an active session receiving data +- **WHEN** the parser returns an `InMessage` with `error_reason: :garbled` due to body length mismatch +- **THEN** `Session.process_invalid_message/3` SHALL invoke `on_error(session_name, :garbled_message, %{raw_message: binary}, env)` + +### Requirement: Heartbeat timeout notification + +The system SHALL invoke `on_error/4` with error type `:heartbeat_timeout` when the counterparty fails to respond within the heartbeat tolerance and a logout is initiated. + +#### Scenario: No data received after test request +- **GIVEN** an active session where `last_test_req_id_sent` is not nil (a TestRequest was already sent on the first timeout) +- **WHEN** the RX timer fires again (second consecutive timeout), triggering `Session.handle_timeout/2` to return `{:logout, ...}` +- **THEN** `Session.handle_timeout/2` SHALL invoke `on_error(session_name, :heartbeat_timeout, %{}, env)` before returning the logout result +- **AND** the system SHALL proceed with its existing logout behavior + +### Requirement: Callback guard via function_exported? + +Before invoking `on_error/4`, the system SHALL check `function_exported?(session_handler, :on_error, 4)`. This check SHALL be performed at each call site without caching, as error paths are infrequent. diff --git a/test/session_test.exs b/test/session_test.exs index 50c2ac7..4fa5a95 100644 --- a/test/session_test.exs +++ b/test/session_test.exs @@ -1319,4 +1319,76 @@ defmodule ExFix.SessionTest do assert_receive {:session_msg, "test", @msg_type_reject, %InMessage{seqnum: 99}, _env} end + + ## + ## on_error callback tests + ## + + test "on_error receives :garbled_message for garbled FIX message", %{config: cfg} do + cfg = %SessionConfig{cfg | env: %{test_pid: self()}} + {:ok, session} = Session.init(cfg) + session = %Session{session | status: :online, in_lastseq: 10, out_lastseq: 5} + + incoming_data = msg("8=FIXT.1.1|9=$$$|garbled|10=$$$|") + {:ok, _msgs_to_send, _session} = Session.handle_incoming_data(session, incoming_data) + + assert_receive {:on_error, "test", :garbled_message, %{raw_message: _raw}} + end + + test "handler without on_error/4 does not crash on garbled messages", %{config: cfg} do + cfg = %SessionConfig{ + cfg + | session_handler: ExFix.TestHelper.FixDummySessionHandlerNoError + } + + {:ok, session} = Session.init(cfg) + session = %Session{session | status: :online, in_lastseq: 10, out_lastseq: 5} + + incoming_data = msg("8=FIXT.1.1|9=$$$|garbled|10=$$$|") + {:ok, msgs_to_send, session} = Session.handle_incoming_data(session, incoming_data) + + assert Session.get_status(session) == :online + assert msgs_to_send == [] + end + + test "on_error receives :heartbeat_timeout on second consecutive RX timeout", %{config: cfg} do + cfg = %SessionConfig{cfg | env: %{test_pid: self()}} + {:ok, session} = Session.init(cfg) + session = %Session{session | status: :online, in_lastseq: 10, out_lastseq: 5} + + # First timeout: sends TestRequest, no on_error + {:ok, _msgs, session} = Session.handle_timeout(session, :rx) + refute_receive {:on_error, _, :heartbeat_timeout, _} + + # Second timeout: triggers logout and on_error + {:logout, _msgs, _session} = Session.handle_timeout(session, :rx) + assert_receive {:on_error, "test", :heartbeat_timeout, %{}} + end + + test "on_error/4 exceptions are rescued without crashing the session", %{config: cfg} do + defmodule RaisingHandler do + @behaviour ExFix.SessionHandler + + def on_logon(_session_name, _env), do: :ok + def on_app_message(_session_name, _msg_type, _msg, _env), do: :ok + def on_session_message(_session_name, _msg_type, _msg, _env), do: :ok + def on_logout(_session_name, _env), do: :ok + + def on_error(_session_name, _error_type, _details, _env) do + raise "intentional test error" + end + end + + cfg = %SessionConfig{cfg | session_handler: RaisingHandler} + {:ok, session} = Session.init(cfg) + session = %Session{session | status: :online, in_lastseq: 10, out_lastseq: 5} + + incoming_data = msg("8=FIXT.1.1|9=$$$|garbled|10=$$$|") + + # Should not raise despite handler raising + {:ok, msgs_to_send, session} = Session.handle_incoming_data(session, incoming_data) + + assert Session.get_status(session) == :online + assert msgs_to_send == [] + end end diff --git a/test/session_worker_test.exs b/test/session_worker_test.exs index d15ccd2..7a627b4 100644 --- a/test/session_worker_test.exs +++ b/test/session_worker_test.exs @@ -206,4 +206,91 @@ defmodule ExFix.SessionWorkerTest do assert Parser.parse(logout, DefaultDictionary, 1).msg_type == "5" assert_receive {:DOWN, ^ref, :process, ^pid, :normal}, 1000 end + + ## + ## on_error callback tests + ## + + defmodule FailConnectTransport do + def connect(_host, _port, _options) do + {:error, :econnrefused} + end + + def send(_conn, _data), do: :ok + def close(_conn), do: :ok + end + + defmodule FailSendTransport do + def connect(_host, _port, options) do + {:ok, options[:test_pid]} + end + + def send(_conn, _data) do + {:error, :closed} + end + + def close(_conn), do: :ok + + def receive_data(session_name, data, socket_protocol \\ :tcp) do + Process.send(:"ex_fix_session_#{session_name}", {socket_protocol, self(), data}, []) + end + end + + defmodule ErrorNotifyHandler do + @behaviour ExFix.SessionHandler + + def on_logon(_session_name, _env), do: :ok + def on_app_message(_session_name, _msg_type, _msg, _env), do: :ok + def on_session_message(_session_name, _msg_type, _msg, _env), do: :ok + def on_logout(_session_name, _env), do: :ok + + def on_error(_session_name, error_type, details, env) do + send(env.test_pid, {:on_error, error_type, details}) + end + end + + test "on_error receives :connect_error when transport.connect fails" do + Process.flag(:trap_exit, true) + + cfg = %SessionConfig{ + name: "connect_err", + mode: :initiator, + sender_comp_id: "SENDER", + target_comp_id: "TARGET", + session_handler: ErrorNotifyHandler, + dictionary: DefaultDictionary, + transport_mod: FailConnectTransport, + transport_options: [test_pid: self()], + env: %{test_pid: self()} + } + + {:ok, pid} = SessionWorker.start_link(cfg, TestSessionRegistry) + + assert_receive {:on_error, :connect_error, %{reason: :econnrefused}}, 1000 + assert_receive {:EXIT, ^pid, :econnrefused}, 1000 + end + + test "on_error receives :transport_error when transport.send fails" do + Process.flag(:trap_exit, true) + + cfg = %SessionConfig{ + name: "send_err", + mode: :initiator, + sender_comp_id: "SENDER", + target_comp_id: "TARGET", + session_handler: ErrorNotifyHandler, + dictionary: DefaultDictionary, + transport_mod: FailSendTransport, + transport_options: [test_pid: self()], + env: %{test_pid: self()} + } + + {:ok, pid} = SessionWorker.start_link(cfg, TestSessionRegistry) + + # The logon message send will fail, triggering :transport_error + assert_receive {:on_error, :transport_error, %{reason: :closed}}, 1000 + # Clean up + Process.exit(pid, :kill) + assert_receive {:EXIT, ^pid, :killed}, 1000 + end end diff --git a/test/test_helper.exs b/test/test_helper.exs index d8b56ae..e0a599f 100644 --- a/test/test_helper.exs +++ b/test/test_helper.exs @@ -20,6 +20,29 @@ defmodule ExFix.TestHelper do end def on_logout(_session_id, _env), do: :ok + + def on_error(session_name, error_type, details, env) do + test_pid = Map.get(env, :test_pid, self()) + send(test_pid, {:on_error, session_name, error_type, details}) + end + end + + defmodule FixDummySessionHandlerNoError do + @behaviour ExFix.SessionHandler + + def on_logon(session_name, env) do + send(self(), {:logon, session_name, env}) + end + + def on_app_message(session_name, msg_type, msg, env) do + send(self(), {:msg, session_name, msg_type, msg, env}) + end + + def on_session_message(session_name, msg_type, msg, env) do + send(self(), {:session_msg, session_name, msg_type, msg, env}) + end + + def on_logout(_session_id, _env), do: :ok end defmodule FixEmptySessionHandler do