1 change: 1 addition & 0 deletions .gitignore
@@ -23,3 +23,4 @@ erl_crash.dump
 /bench/graphs/
 coverage_output.log
 /.claude/settings.local.json
+/.serena/
139 changes: 139 additions & 0 deletions CLAUDE.md
@@ -0,0 +1,139 @@
# CLAUDE.md

This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.

## Project

ExFix is an Elixir implementation of the FIX (Financial Information eXchange) Session Protocol FIXT.1.1. It only supports session initiator mode (buy-side). Published on hex.pm (v0.2.9), Apache 2.0 license. Zero runtime dependencies — only Erlang/OTP primitives.

---

## How You Must Work: OpenSpec-Driven Development

This project uses **OpenSpec** for spec-driven development. All code changes — features, refactors, and non-trivial fixes — must flow through `openspec/`. Treat `openspec/` as the single source of truth for requirements, active changes, and implementation plans.

**You must never make code changes directly from chat.** If a user asks to "just edit the code" or "quickly patch this", refuse and guide them into the proper OpenSpec flow.

### Workflow

1. **Always read `openspec/config.yaml` before starting work** — it contains project context, conventions, and rules for proposals/specs/design/tasks.
2. **For any requested change:**
   - If an appropriate change exists under `openspec/changes/`, work only through its `proposal.md` and `tasks.md`.
   - If no change exists, create one using `/opsx:propose` before touching any code.
3. **The OpenSpec loop:**
   - **Propose** → Capture *what* and *why* in `openspec/changes/<change-id>/proposal.md`
   - **Tasks** → Break implementation into steps in `tasks.md`
   - **Apply** → Implement tasks via `/opsx:apply`, marking complete as you go
   - **Archive** → Once verified, archive via `/opsx:archive`

### Commands

- `/opsx:propose` — Create a new change with proposal and tasks
- `/opsx:apply` — Implement tasks from an existing change's `tasks.md`
- `/opsx:explore` — Think through ideas, investigate problems, clarify requirements
- `/opsx:archive` — Archive a completed change

### Rules (Mandatory)

- **No bypassing OpenSpec.** You cannot apply non-trivial code changes without a change folder under `openspec/changes/`.
- **Specs over chat.** If chat instructions contradict `openspec/specs/` or an active change, follow the written specs and highlight the discrepancy.
- **Prefer minimal change.** Change only what the tasks require; no opportunistic refactors unless explicitly included.
- **Backwards compatibility.** This is a library on hex.pm — consider impact on existing behaviours (`SessionHandler`, `Dictionary`, `SessionRegistry`).
- **Specs stay in sync.** Never silently diverge from specs. If you need to deviate, update the spec/proposal first.

### Schema: Lightweight

This project uses the `lightweight` schema for fixes, refactors, and minor improvements: `proposal.md` → `tasks.md` (no design document). Each task should be completable in < 2 hours, include test tasks, and specify affected files.

---

## Build & Test Commands

```bash
mix compile # Compile
mix test # Run all tests
mix test test/session_test.exs # Single test file
mix test test/session_test.exs:42 # Single test by line
mix test --only tag_name # Tests by tag
mix credo # Lint
mix dialyzer # Static type checking (slow first run)
mix docs # Generate ExDoc documentation
mix bench -d 2 # Run benchmarks
```

Requires Elixir ~> 1.18, Erlang/OTP 27.

---

## Architecture

### Supervision Tree

```
ExFix.Application (Supervisor, rest_for_one)
├── ExFix.DefaultSessionRegistry (GenServer, ETS-backed)
└── ExFix.SessionSup (DynamicSupervisor)
└── ExFix.SessionWorker (GenServer, one per session)
```

### Key Modules & Layers

**Public API** — `ExFix` (`lib/ex_fix.ex`): Three functions: `start_session_initiator/5`, `send_message!/2`, `stop_session/2`. Builds a `SessionConfig` and delegates to the registry.

**Session Protocol FSM** — `ExFix.Session` (`lib/ex_fix/session.ex`): Functional state machine. Takes a `%Session{}` struct + input, returns `{result_tag, messages_to_send, updated_session}` where `result_tag` is `:ok | :continue | :resend | :logout | :stop`. Handles all FIX session-level messages (Logon, Heartbeat, TestRequest, ResendRequest, Reject, SequenceReset, Logout). **No socket I/O**: the caller (`SessionWorker`) handles the network. `Session` does invoke `SessionHandler` callbacks while processing messages, so it is not free of side effects.

**Session Worker** — `ExFix.SessionWorker` (`lib/ex_fix/session_worker.ex`): GenServer that owns the TCP/SSL socket. Receives network data, feeds it to `Session`, sends outgoing messages, manages heartbeat timers. Registered as `:"ex_fix_session_#{name}"`.

**Session Registry** — `ExFix.SessionRegistry` (behaviour) / `ExFix.DefaultSessionRegistry` (default impl): Tracks session lifecycle status via ETS. Monitors session worker processes for crash detection and reconnection. The registry module is injectable per-session.

**Parser** — `ExFix.Parser` (`lib/ex_fix/parser.ex`): Two-phase parsing. `parse1/4` extracts frame, validates checksum/length, identifies the "subject" field. `parse2/1` completes field parsing. This split lets the network process quickly route messages by subject to dedicated worker processes.

**Messages** — `InMessage` (received, fields as `[{tag_string, value_string}]`) and `OutMessage` (outgoing, built with `new/1` + `set_field/3`). Fields are FIX tag numbers as strings (e.g., `"35"` for MsgType, `"49"` for SenderCompID).

### Session Handler Behaviour

Users implement `ExFix.SessionHandler` with 4 required callbacks: `on_logon/2`, `on_logout/2`, `on_app_message/4`, `on_session_message/4`, plus the optional `on_error/4` for session-level errors. The `env` parameter is a user-provided map from config. `on_app_message` receives partially parsed messages (`complete: false`); call `Parser.parse2(msg)` to finish parsing.

### Session Status Types

Two distinct status domains:
- **Protocol status** (`Session.session_status`): `:offline | :connecting | :online | :disconnecting | :disconnected` — used within `Session` FSM
- **Registry status** (`Session.registry_status`): adds `:connected | :reconnecting` — used by `DefaultSessionRegistry` and `SessionWorker.terminate/2`

### Data Flow (Incoming)

```
Socket → SessionWorker.handle_info(:tcp/:ssl) → handle_data/2
→ Session.handle_incoming_data/2 → Parser.parse1/4
→ Session.process_valid_message/4 → Session.process_incoming_message/5
→ returns {result_tag, msgs_to_send, session}
→ SessionWorker sends msgs via transport, updates state
```

### App Config

Config keys in `config/config.exs` (compile-time via `Application.compile_env`):
- `warning_on_garbled_messages` (boolean)
- `session_registry` (module)
- `default_dictionary` (module)
- `logout_timeout` (ms, default 2000, set to 20 in test)
- `rx_heartbeat_tolerance` (float multiplier, default 1.2)

---

## Test Infrastructure

Tests use mock transport (`TestTransport`), test handler (`FixDummySessionHandler`), and test registry (`TestSessionRegistry`) defined in `test/test_helper.exs`. The mock transport simulates socket communication via process messages. Helper `msg/1` converts pipe-delimited strings to FIX binary (e.g., `msg("8=FIXT.1.1|9=5|...")`).

---

## Conventions

- FIX field tags are always strings, not integers (e.g., `"35"` not `35`)
- Fields stored as keyword-like tuples: `{tag_string, value_string}`
- Module attributes for FIX constants: `@msg_type_logon "A"`, `@field_text "58"`
- Performance-critical functions use `@compile {:inline, ...}`
- ETS used for out-queue (ordered_set, private per session) and registry (public, named)
- Extension via behaviours (`SessionHandler`, `Dictionary`, `SessionRegistry`), not protocols
- Pre-existing compiler warnings in `session.ex` (struct update typing) are not regressions
- Code and commit language: English
29 changes: 27 additions & 2 deletions lib/ex_fix/session.ex
@@ -530,6 +530,19 @@ defmodule ExFix.Session do
   end

   def handle_timeout(%Session{config: config, out_lastseq: out_lastseq} = session, :rx) do
+    %SessionConfig{name: session_name, session_handler: session_handler, env: env} = config
+
+    if function_exported?(session_handler, :on_error, 4) do
+      try do
+        session_handler.on_error(session_name, :heartbeat_timeout, %{}, env)
+      rescue
+        e ->
+          Logger.error(fn ->
+            "[fix.error] [#{session_name}] on_error callback raised: #{inspect(e)}"
+          end)
+      end

Duplicated error notification pattern across session module

Low Severity

The function_exported? + try/rescue + Logger.error pattern for invoking on_error is copy-pasted twice inline in session.ex (for :heartbeat_timeout and :garbled_message), while session_worker.ex correctly extracts the identical logic into a reusable maybe_notify_error/5 helper. A similar private helper in session.ex would eliminate the duplication and keep the error-handling contract consistent across both modules.
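
A minimal sketch of the private helper this comment suggests for `session.ex`, assuming it mirrors `maybe_notify_error/5` from `session_worker.ex`. The module name `ErrorNotifySketch` and the two handler modules are hypothetical and exist only so the snippet runs on its own; in `session.ex` the function would be a `defp`.

```elixir
defmodule ErrorNotifySketch do
  require Logger

  # Invokes the optional on_error/4 callback if the handler exports it.
  # An exception raised by the callback is logged instead of propagating.
  def maybe_notify_error(session_handler, session_name, error_type, details, env) do
    if function_exported?(session_handler, :on_error, 4) do
      try do
        session_handler.on_error(session_name, error_type, details, env)
      rescue
        e ->
          Logger.error(fn ->
            "[fix.error] [#{session_name}] on_error callback raised: #{inspect(e)}"
          end)
      end
    end
  end
end

# Hypothetical handler that does not implement on_error/4.
defmodule SilentHandler do
  def on_logon(_session_name, _env), do: :ok
end

# Hypothetical handler that implements on_error/4.
defmodule NotifiedHandler do
  def on_error(session_name, error_type, _details, _env) do
    {:handled, session_name, error_type}
  end
end
```

With such a helper, both call sites in `session.ex` would shrink to a single line each, for example `maybe_notify_error(session_handler, session_name, :heartbeat_timeout, %{}, env)`.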


+    end
+
     out_lastseq = out_lastseq + 1
     text = "Data not received"
     logout_msg = build_message(config, @msg_type_logout, out_lastseq, [{@field_text, text}])
@@ -604,15 +617,27 @@ defmodule ExFix.Session do
          _expected_seqnum,
          %InMessage{valid: false, error_reason: :garbled} = msg
        ) do
+    %Session{config: %SessionConfig{name: session_name, session_handler: session_handler, env: env}} =
+      session
+
     if @warning_on_garbled_messages do
       Logger.warning(fn ->
-        %Session{config: %SessionConfig{name: session_name}} = session
-
         "[fix.warning] [#{session_name}] Garbled: " <>
           :unicode.characters_to_binary(msg.original_fix_msg, :latin1, :utf8)
       end)
     end

+    if function_exported?(session_handler, :on_error, 4) do
+      try do
+        session_handler.on_error(session_name, :garbled_message, %{raw_message: msg.original_fix_msg}, env)
+      rescue
+        e ->
+          Logger.error(fn ->
+            "[fix.error] [#{session_name}] on_error callback raised: #{inspect(e)}"
+          end)
+      end
+    end
+
     {:ok, [], %Session{session | extra_bytes: msg.other_msgs}}
   end

21 changes: 21 additions & 0 deletions lib/ex_fix/session_handler.ex
@@ -48,4 +48,25 @@ defmodule ExFix.SessionHandler do
   disconnection, which occurs first.
   """
   @callback on_logout(session_name :: Session.session_name(), env :: map()) :: any()
+
+  @doc """
+  Called when a session-level error occurs.
+
+  Error types:
+  - `:connect_error` — TCP/SSL connection failure. Details: `%{reason: term()}`
+  - `:transport_error` — TCP/SSL send failure. Details: `%{reason: term()}`
+  - `:garbled_message` — message failed checksum or body-length validation. Details: `%{raw_message: binary()}`
+  - `:heartbeat_timeout` — counterparty heartbeat not received within tolerance. Details: `%{}`
+
+  This callback is optional. Handlers that do not implement it will not receive
+  error notifications.
+  """
+  @callback on_error(
+              session_name :: Session.session_name(),
+              error_type :: atom(),
+              details :: map(),
+              env :: map()
+            ) :: any()
+
+  @optional_callbacks [on_error: 4]
end
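
A sketch of how an application might implement the new callback, matching the four error types documented above. Everything here is an assumption made for illustration: the module name `ErrorHandlerSketch`, the `:notify` key in `env`, and the `:transport` / `:protocol` grouping are not part of ExFix. A real handler would also declare `@behaviour ExFix.SessionHandler` and implement the required callbacks.

```elixir
defmodule ErrorHandlerSketch do
  # Forwards session errors to a process named in the user-provided env map.
  # The :notify key is a hypothetical convention for this example.
  def on_error(session_name, error_type, details, %{notify: pid}) when is_pid(pid) do
    send(pid, {:fix_error, session_name, classify(error_type), details})
    :ok
  end

  # No notification target configured: ignore the error.
  def on_error(_session_name, _error_type, _details, _env), do: :ok

  # Group the documented error types by the layer they originate from.
  defp classify(type) when type in [:connect_error, :transport_error], do: :transport
  defp classify(type) when type in [:garbled_message, :heartbeat_timeout], do: :protocol
  defp classify(_other), do: :unknown
end
```

Matching on the `details` map per error type (for example `%{reason: reason}` for `:connect_error`) would work equally well; the catch-all clause keeps the handler safe if further error types are added later.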
31 changes: 30 additions & 1 deletion lib/ex_fix/session_worker.ex
@@ -231,11 +231,14 @@ defmodule ExFix.SessionWorker do
            name: fix_session_name,
            transport: transport,
            client: client,
+           session: session,
            log_outgoing_msg: log_outgoing_msg,
            tx_timer: tx_timer
          },
          resend \\ false
        ) do
+    %Session{config: %SessionConfig{session_handler: session_handler, env: env}} = session
+
     for msg <- msgs_to_send do
       data = Serializer.serialize(msg, DateTime.utc_now(), resend)

@@ -246,7 +249,18 @@ defmodule ExFix.SessionWorker do
         end)
       end

-      transport.send(client, data)
+      case transport.send(client, data) do
+        {:error, reason} ->
+          Logger.error(fn ->
+            "[fix.error] [#{fix_session_name}] Transport send error: #{inspect(reason)}"
+          end)
+
+          maybe_notify_error(session_handler, fix_session_name, :transport_error, %{reason: reason}, env)
+
+        _ ->
+          :ok
+      end
+
       send(tx_timer, :msg)
     end

@@ -281,6 +295,8 @@ defmodule ExFix.SessionWorker do

       {:error, reason} ->
         Logger.error("Cannot open socket: #{inspect(reason)}")
+        %SessionConfig{session_handler: session_handler, env: env} = config
+        maybe_notify_error(session_handler, fix_session_name, :connect_error, %{reason: reason}, env)
         {:stop, reason, state}
     end
   end
@@ -292,4 +308,17 @@ defmodule ExFix.SessionWorker do
     session_registry.session_update_status(name, :connected)
     rx_timer
   end
+
+  defp maybe_notify_error(session_handler, session_name, error_type, details, env) do
+    if function_exported?(session_handler, :on_error, 4) do
+      try do
+        session_handler.on_error(session_name, error_type, details, env)
+      rescue
+        e ->
+          Logger.error(fn ->
+            "[fix.error] [#{session_name}] on_error callback raised: #{inspect(e)}"
+          end)
+      end
+    end
+  end
 end
@@ -0,0 +1,2 @@
schema: spec-driven
created: 2026-03-04
104 changes: 104 additions & 0 deletions openspec/changes/archive/2026-03-05-callback-on-error/design.md
@@ -0,0 +1,104 @@
## Context

ExFix surfaces session errors only via `Logger`. Applications have no programmatic way to react to transport failures, garbled messages, or heartbeat timeouts. The `SessionHandler` behaviour has 4 callbacks (`on_logon`, `on_logout`, `on_app_message`, `on_session_message`) — none for errors.

Current error sites:
- `SessionWorker.connect_and_send_logon/2` — `Logger.error` on socket connect failure
- `SessionWorker.do_send_messages/3` — `transport.send` result ignored
- `Session.process_invalid_message/3` — garbled messages logged (if config enabled)
- `Session.handle_timeout/2` — heartbeat timeout triggers logout but no handler notification

## Goals / Non-Goals

**Goals:**
- Add an optional `on_error/4` callback to `SessionHandler`
- Invoke it at all error sites — from `Session` for protocol-level errors, from `SessionWorker` for transport-level errors
- Maintain full backwards compatibility — existing handlers compile and work without changes

**Non-Goals:**
- Error recovery actions (e.g., auto-reconnect based on callback return) — future work
- Replacing `Logger` calls — `on_error` complements logging, doesn't replace it
- Surfacing application-level message errors (business logic) — only session/transport errors

## Decisions

### 1. Optional callback via `@optional_callbacks`

**Choice**: Use `@optional_callbacks [on_error: 4]` on the `SessionHandler` behaviour.

**Alternatives considered**:
- Separate `ErrorHandler` behaviour — adds configuration complexity and a new concept for users. The error is tightly coupled to the session, so extending the existing behaviour is simpler.
- Default implementation via `__using__` macro — ExFix doesn't use `__using__` anywhere, introducing it for one callback would be inconsistent.

**Rationale**: `@optional_callbacks` is idiomatic Elixir, zero breaking changes, and the caller checks with `function_exported?/3`.
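
The mechanism can be sketched in isolation. The behaviour and handler modules below are hypothetical stand-ins (not the real `ExFix.SessionHandler`), reduced to one required and one optional callback to show that an existing handler keeps compiling and that the caller can detect the optional callback at runtime.

```elixir
defmodule HandlerSketch do
  # Reduced stand-in for a handler behaviour: one required, one optional callback.
  @callback on_logon(session_name :: String.t(), env :: map()) :: any()
  @callback on_error(
              session_name :: String.t(),
              error_type :: atom(),
              details :: map(),
              env :: map()
            ) :: any()

  @optional_callbacks [on_error: 4]
end

# A handler written before on_error/4 existed: no missing-callback warning.
defmodule LegacyHandler do
  @behaviour HandlerSketch

  @impl true
  def on_logon(_session_name, _env), do: :ok
end

# A handler that opts in to error notifications.
defmodule ErrorAwareHandler do
  @behaviour HandlerSketch

  @impl true
  def on_logon(_session_name, _env), do: :ok

  @impl true
  def on_error(_session_name, error_type, _details, _env), do: {:seen, error_type}
end
```

One caveat worth keeping in mind: `function_exported?/3` does not load the module, so it returns `false` for a handler module that has not been loaded yet.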

### 2. Call site placement: Session for protocol errors, SessionWorker for transport errors

**Choice**: `on_error/4` is called from the same layer where the error originates:
- **`Session`** calls `on_error` for: `:garbled_message`, `:heartbeat_timeout`
- **`SessionWorker`** calls `on_error` for: `:connect_error`, `:transport_error`

```
┌────────────────────┐         ┌──────────────────┐        ┌──────────────────┐
│ Session            │         │ SessionWorker    │        │ SessionHandler   │
│ (state machine)    │         │ (GenServer)      │        │ (user callback)  │
│                    │─calls──▶│                  │─calls─▶│ on_error/4       │
│ on_error for:      │         │ on_error for:    │        │                  │
│ :garbled_message   │         │ :connect_error   │        └──────────────────┘
│ :heartbeat_timeout │         │ :transport_error │
└────────────────────┘         └──────────────────┘
```

**Rationale**: `Session` already calls handler callbacks directly (`on_session_message` in 8 places, `on_app_message` in 2 places, `on_logon` once, `on_logout` 3 times). It is not a pure FSM — it performs side effects via handler callbacks as part of its message processing. Having `Session` call `on_error` for protocol-level errors follows the established pattern exactly.

Transport errors (`connect_error`, `transport_error`) genuinely originate in `SessionWorker` (socket operations), so they are notified from there.

### 3. Error type atoms + details map

**Choice**: `on_error(session_name, error_type, details, env)` where:
- `error_type` — atom: `:connect_error | :transport_error | :garbled_message | :heartbeat_timeout`
- `details` — map with error-specific context:

| Error Type | Details Map |
|---|---|
| `:connect_error` | `%{reason: term()}` |
| `:transport_error` | `%{reason: term()}` |
| `:garbled_message` | `%{raw_message: binary()}` |
| `:heartbeat_timeout` | `%{}` |

**Alternatives considered**:
- Struct per error type — over-engineering for 4 error types, harder to extend
- Tuple `{error_type, reason}` — less extensible than a map for future fields

### 4. Garbled message notification — direct call from Session

`Session.process_invalid_message/3` handles garbled messages (`:garbled` error reason). It will call `on_error` directly, guarded by `function_exported?/3`, same as how it calls `on_session_message` elsewhere. The `SessionConfig` already contains `session_handler` and `env`, so no new plumbing is needed.

### 5. Heartbeat timeout notification — direct call from Session

`Session.handle_timeout/2` already distinguishes first vs. second RX timeout via `last_test_req_id_sent`:
- First timeout (`last_test_req_id_sent: nil`) → sends TestRequest, returns `{:ok, ...}`
- Second timeout (`last_test_req_id_sent` set) → sends Logout, returns `{:logout, ...}`

The second clause will call `on_error` with `:heartbeat_timeout` before returning `{:logout, ...}`.
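
The two-clause shape described above can be illustrated with a toy model. This is a sketch under stated assumptions, not the real `Session.handle_timeout/2`: the struct, the message tuples, the generated TestRequest id, and the `notify` function argument are all hypothetical simplifications.

```elixir
defmodule RxTimeoutSketch do
  # Toy session state holding only the fields relevant to RX timeouts.
  defstruct last_test_req_id_sent: nil, out_lastseq: 0

  # First RX timeout: no TestRequest is outstanding, so send one and stay online.
  def handle_rx_timeout(%__MODULE__{last_test_req_id_sent: nil, out_lastseq: seq} = session, _notify) do
    test_req_id = "TEST-#{seq + 1}"
    updated = %{session | last_test_req_id_sent: test_req_id, out_lastseq: seq + 1}
    {:ok, [{:test_request, test_req_id}], updated}
  end

  # Second RX timeout: the TestRequest went unanswered, so notify and log out.
  def handle_rx_timeout(%__MODULE__{out_lastseq: seq} = session, notify) do
    notify.(:heartbeat_timeout, %{})
    {:logout, [{:logout, "Data not received"}], %{session | out_lastseq: seq + 1}}
  end
end
```

Because the notification sits only in the second clause, a single missed heartbeat that is answered after the TestRequest never reaches the handler.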

### 6. `function_exported?/3` — call each time, no caching

**Choice**: Call `function_exported?(session_handler, :on_error, 4)` at each error site without caching.

**Rationale**: `function_exported?/3` is a BIF that performs a fast module table lookup. Error paths are infrequent (not the hot path), so the overhead is negligible. Caching in `SessionConfig` would add a struct field and constructor change for zero measurable benefit. Both `Session` and `SessionWorker` have access to the handler module via `SessionConfig`.

### 7. `:invalid_message` excluded

The proposal originally listed `:invalid_message` for validation failures (CompID mismatch, SendingTime accuracy, BeginString error). These are excluded because they are already surfaced to the handler via existing callbacks:
- CompID mismatch → `on_logout` + Reject message visible in `on_session_message`
- SendingTime accuracy → Reject message visible in `on_session_message`
- BeginString error → Logout message sent (handler notified via normal flow)

Adding a redundant `:invalid_message` notification would create confusing double-notification. If needed in the future, it can be added as a separate change.

## Risks / Trade-offs

- **[Performance]** `function_exported?/3` check on every error → Mitigation: errors are infrequent (not hot path), and the BIF is fast.
- **[Handler exceptions]** User's `on_error/4` could raise → Mitigation: wrap calls in `try/rescue` with Logger fallback. Same risk exists for other callbacks but hasn't been an issue.
- **[Backwards compat]** New optional callback is non-breaking. Existing code continues to work unchanged.