
Conversation

@taylordowns2000 (Member) commented Sep 8, 2025

This PR does two things:

  1. Fixes #3565 ("Runs are marked as lost when they should be re-enqueued")
  2. Renames some variables to be less ambiguous - it was not clear whether they held seconds or milliseconds

Question: Should we add some sort of state to the run_channel to make sure that it cannot be joined by more than one worker? (Imagine we roll back after 30 seconds, and right at that moment worker A joins the run channel. Worker B will then join also once it gets its own claim. It would be awesome to either invalidate the first key or block joining the run channel once we decide to roll back. Is that possible?)

Validation steps

  1. set RUN_CHANNEL_JOIN_TIMEOUT_SECONDS=15
  2. add Process.sleep(11_000) as described in the issue (line 64/65 in worker_channel.ex in this PR)
  3. run a run and see what happens!
  4. it should be claimed, the worker should time out, and then after 30 seconds the run should be put back into the :available state.
  5. if you remove the timeout, everything should work as usual
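
A hypothetical sketch of step 2, just to show where the delay sits; the real handle_in/3 clause in worker_channel.ex is shaped differently, and claim_runs/2 is a made-up placeholder:

# Sketch only: delay the claim reply so the worker gives up before it ever
# joins the run channels for the runs it was just handed.
def handle_in("claim", %{"demand" => demand}, socket) do
  {:ok, runs} = claim_runs(demand, socket)

  # Step 2 of the validation steps above
  Process.sleep(11_000)

  {:reply, {:ok, %{runs: runs}}, socket}
end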

AI Usage

Please disclose how you've used AI in this work (it's cool, we just want to know!):

  • Code generation (copilot but not intellisense)
  • Learning or fact checking
  • Strategy / design
  • Optimisation / refactoring
  • Translation / spellchecking / doc gen
  • Other
  • I have not used AI
  • Description of changes

AI review of changes

Add Claim Timeout Mechanism for Worker Channel

Summary

This PR implements a timeout mechanism for the WorkerChannel to prevent runs from being stuck in :claimed state when workers disconnect or fail to join run channels after claiming runs. The mechanism works across clustered Lightning nodes using Phoenix.PubSub for cross-node communication.

Problem

Previously, when a worker claimed runs and then disconnected before joining the corresponding run channels, those runs would remain in :claimed state indefinitely, preventing other workers from claiming them. This created a resource leak where runs became unavailable.

Solution

  • Timeout Mechanism: After a worker claims runs, a configurable timeout (default 30 seconds) is started
  • Automatic Rollback: If the client doesn't join run channels within the timeout period, claimed runs are automatically rolled back to :available state
  • Cross-Node Communication: Uses Phoenix.PubSub to work across clustered Lightning nodes
  • Optimization: Only starts timeout when there are actually runs to track (no timeout for 0 runs)
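
A minimal sketch of the claim-timeout flow described above, assuming a :claim_timeout message name, a claim_runs/2 placeholder, and a Lightning.Config.run_channel_join_timeout_seconds/0 callback (none of these names are confirmed by the diff):

# On claim: only start a timer when runs were actually handed out.
def handle_in("claim", %{"demand" => demand}, socket) do
  {:ok, runs} = claim_runs(demand, socket)

  socket =
    if runs == [] do
      socket
    else
      timeout_ms = Lightning.Config.run_channel_join_timeout_seconds() * 1000
      ref = Process.send_after(self(), :claim_timeout, timeout_ms)
      assign(socket, claim_timeout_ref: ref, pending_runs: runs)
    end

  {:reply, {:ok, %{runs: runs}}, socket}
end

# On timeout: if no :run_channel_joined notifications have cleared the
# pending list, roll the claimed runs back to :available.
def handle_info(:claim_timeout, socket) do
  Runs.rollback_claimed_runs(socket.assigns.pending_runs)
  {:noreply, assign(socket, claim_timeout_ref: nil, pending_runs: nil)}
end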

Key Changes

Core Implementation
  • lib/lightning_web/channels/worker_channel.ex: Added timeout logic, PubSub subscription, and rollback handling
  • lib/lightning_web/channels/run_channel.ex: Added notification broadcast when run channels are joined
  • lib/lightning/runs.ex: Added rollback_claimed_runs/1 function for database rollback
Configuration
  • lib/lightning/config/bootstrap.ex: Added RUN_CHANNEL_JOIN_TIMEOUT_SECONDS environment variable
  • lib/lightning/config.ex: Added configuration callback and renamed existing callbacks for consistency
Testing
  • test/lightning_web/channels/worker_channel_test.exs: Added comprehensive tests for timeout and cancellation scenarios
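
The rollback_claimed_runs/1 function added in lib/lightning/runs.ex might look roughly like the sketch below; only the Repo.update_all over run ids is visible in the diff quoted later in this thread, and the :claimed guard and log message are assumptions:

def rollback_claimed_runs([]), do: {:ok, 0}

def rollback_claimed_runs(runs) do
  run_ids = Enum.map(runs, & &1.id)

  {count, _} =
    from(r in Run, where: r.id in ^run_ids and r.state == :claimed)
    |> Repo.update_all(set: [state: :available])

  Logger.info("Rolled back #{count} claimed runs to :available")
  {:ok, count}
end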

Areas for Reviewers to Focus On

🔍 Critical Review Areas
  1. Cross-Node Communication Logic (worker_channel.ex:39-44, 168-179)

    • Verify PubSub subscription/unsubscription is handled correctly
    • Check that worker_id filtering prevents message cross-contamination
    • Ensure proper cleanup in terminate/2
  2. Timeout and Rollback Logic (worker_channel.ex:83-102, 119-135)

    • Verify timeout only starts when original_runs count > 0
    • Check that rollback correctly resets run state to :available
    • Ensure timeout cancellation works when all runs are joined
  3. Database Rollback Function (runs.ex:350-374)

    • Verify the rollback_claimed_runs/1 function correctly updates run state
    • Check that it handles empty lists gracefully
    • Ensure proper logging of rollback operations
  4. Configuration Changes (config.ex:327-330, 453, 693-695)

    • Verify new configuration follows existing patterns
    • Check that callback renaming doesn't break existing functionality
    • Ensure proper default values
🧪 Test Coverage
  1. Timeout Test (worker_channel_test.exs:139-170)

    • Verify test correctly simulates timeout scenario
    • Check that manual timeout triggering works in test environment
    • Ensure rollback verification is comprehensive
  2. Cancellation Test (worker_channel_test.exs:177-210)

    • Verify PubSub message format matches implementation
    • Check that timeout cancellation prevents rollback
    • Ensure test cleanup removes configuration
⚠️ Potential Issues to Watch For
  1. Memory Leaks: Ensure PubSub subscriptions are properly cleaned up
  2. Race Conditions: Verify timeout cancellation is atomic
  3. Configuration: Check that environment variable naming is consistent
  4. Error Handling: Ensure graceful handling of edge cases (empty runs, nil worker_id)

Configuration

The timeout can be configured via the RUN_CHANNEL_JOIN_TIMEOUT_SECONDS environment variable (default: 30 seconds).
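
A generic illustration of reading that variable at boot (bootstrap.ex uses its own env-parsing helpers, and the application key name here is an assumption):

# In config/runtime.exs style configuration; illustrative only.
run_channel_join_timeout_seconds =
  System.get_env("RUN_CHANNEL_JOIN_TIMEOUT_SECONDS", "30")
  |> String.to_integer()

config :lightning,
  run_channel_join_timeout_seconds: run_channel_join_timeout_seconds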

Testing

All existing tests pass, and new tests cover:

  • Timeout rollback when client doesn't join run channels
  • Timeout cancellation when client successfully joins run channels
  • Cross-node communication via PubSub
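
The timeout test could take roughly this shape, assuming Phoenix.ChannelTest helpers, a :claim_timeout message the test can send directly, and a claim reply whose runs carry string "id" keys (the message name, reply shape, and exact module names are all assumptions):

test "rolls claimed runs back to :available when no run channel is joined", %{socket: socket} do
  ref = push(socket, "claim", %{"demand" => 1})
  assert_reply ref, :ok, %{runs: [%{"id" => run_id}]}

  # Trigger the timeout directly rather than waiting for Process.send_after/3
  send(socket.channel_pid, :claim_timeout)
  # Synchronise: the channel has processed the message once get_state returns
  _ = :sys.get_state(socket.channel_pid)

  assert %{state: :available} = Lightning.Repo.get!(Lightning.Run, run_id)
end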

Backward Compatibility

This change is fully backward compatible. Existing functionality remains unchanged, and the new mechanism only activates when runs are claimed.

You can read more details in our Responsible AI Policy

Pre-submission checklist

  • I have performed a self-review of my code.
  • I have implemented and tested all related authorization policies. (e.g., :owner, :admin, :editor, :viewer)
  • I have updated the changelog.
  • I have ticked a box in "AI usage" in this PR


codecov bot commented Sep 9, 2025

Codecov Report

❌ Patch coverage is 71.18644% with 17 lines in your changes missing coverage. Please review.
✅ Project coverage is 90.21%. Comparing base (a859751) to head (9249a70).
⚠️ Report is 2 commits behind head on main.

Files with missing lines                        Patch %   Lines missing
lib/lightning_web/channels/worker_channel.ex    66.66%    14 ⚠️
lib/lightning_web/channels/run_channel.ex       50.00%     2 ⚠️
lib/lightning/runs.ex                           75.00%     1 ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #3566      +/-   ##
==========================================
- Coverage   90.27%   90.21%   -0.07%     
==========================================
  Files         386      386              
  Lines       15869    15921      +52     
==========================================
+ Hits        14326    14363      +37     
- Misses       1543     1558      +15     


@taylordowns2000 taylordowns2000 changed the title Only reply if the socket is still alive Rollback "claimed" runs if worker doesn't join run channel Sep 10, 2025
@taylordowns2000 taylordowns2000 changed the title Rollback "claimed" runs if worker doesn't join run channel Rollback "claimed" runs to "available" if worker doesn't join run channel Sep 10, 2025
@taylordowns2000 (Member, Author) commented:

@rorymckinley, @josephjclark, @stuartc, I feel pretty good about this, with one straggling question... reposting it from the PR description down here:

Question: Should we add some sort of state to the run_channel to make sure that it cannot be joined by more than one worker? (Imagine we roll back after 30 seconds, and right at that moment worker A joins the run channel. Worker B will then join also once it gets its own claim. It would be awesome to either invalidate the first key or block joining the run channel once we decide to roll back. Is that possible?)

@rorymckinley (Collaborator) left a comment:

@taylordowns2000 I have some silly questions about the implementation and also I think there are places where we could screw down the tests a bit more.


# Notify the worker channel that this run channel has been joined
# Use PubSub for cross-node communication in clustered environments
case socket.assigns[:worker_id] do
Collaborator:

@taylordowns2000 run_channel_test.exs does not seem to have been updated to cover this additional functionality?

"Worker socket disconnected before claim reply, rolling back transaction for runs: #{Enum.map_join(original_runs, ", ", & &1.id)}"
)

Runs.rollback_claimed_runs(original_runs)
Collaborator:

@taylordowns2000 There does not appear to be test coverage here - is there a cheap way to test that the Process.alive? check failed?


# Store the timeout reference and original runs in socket assigns
assign(socket,
claim_timeout_ref: timeout_ref,
Collaborator:

@taylordowns2000 If I am reading this correctly, the reference to the timeout is set for the socket - does that make us vulnerable to the following:

Worker 1 sends a claim request for 1 run (R1)
Lightning sends back run details and sets timeout T1
Before Worker 1 joins the run channel and within the join timeout, Worker 1 sends through a claim for another run.
Lightning sends back run details (R2) and sets timeout T2.
Now, T1 has been replaced with T2, i.e. have we lost timeout coverage of R1?
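
One way to close that gap would be to key timers by claim instead of storing a single ref on the socket; a purely illustrative fragment inside the claim handler, not necessarily what the follow-up commit does (timeout_ms and runs come from the surrounding clause):

# Track one timer per claim so a second claim cannot clobber the first.
claim_ref = make_ref()
timer = Process.send_after(self(), {:claim_timeout, claim_ref}, timeout_ms)

claim_timeouts =
  Map.put(socket.assigns[:claim_timeouts] || %{}, claim_ref, %{timer: timer, runs: runs})

assign(socket, claim_timeouts: claim_timeouts)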

Member Author (taylordowns2000):

great catch. investigating now

Member Author (@taylordowns2000), Sep 10, 2025:

@rorymckinley, I think you were right... does this do the trick? 0a874d9

Collaborator:

@taylordowns2000 It looks like it - I have a question re: this comment.

Is that functionality still coming?

Member Author (taylordowns2000):

No, we shouldn't roll them back. We should let the Janitor have its way with them if the worker channel closes

Member Author (taylordowns2000):

I've removed the comment. Thanks for catching it!

Collaborator:

Thanks @taylordowns2000 - will you be adding tests later?

Member Author (@taylordowns2000), Sep 11, 2025:

Only if Joe/Stu confirm they want this approach in the app.

Logger.info("Successfully rolled back #{count} runs")

# Clear the timeout reference and pending runs from socket assigns
socket = assign(socket, claim_timeout_ref: nil, pending_runs: nil)
Collaborator:

@taylordowns2000 This line does not appear to have test coverage?

{:noreply, socket}
end
else
# Still have pending runs, update the list
Collaborator:

@taylordowns2000 There does not appear to be test coverage for the case where there are runs remaining (and the timeout does not get cancelled)?

handle_info({:run_channel_joined, run_id}, socket)

_ ->
# Message not for this worker, ignore
Collaborator:

@taylordowns2000 Do we have a test to cover the case where the message is not for this worker?

"Worker channel terminating with pending runs, rolling back: #{Enum.map_join(pending_runs, ", ", & &1.id)}"
)

Runs.rollback_claimed_runs(pending_runs)
Collaborator:

@taylordowns2000 This line does not appear to have test coverage?

# No worker ID available, continue normally
:ok

worker_id ->
Collaborator:

@taylordowns2000 Are we vulnerable to a race condition here? E.g., we retrieve the run, broadcast a message that the timer can be reset, and then send data back to the worker? In the gap between the broadcast and something acting on it, the timeout could kick in, unclaiming the run and potentially allowing a situation where two workers are running it.

This feels like the 'ack-ack' scenario (although we can use the run channel response as the 'ack-ack'), and I think that addresses your question about the Worker A / Worker B scenario.

When a worker joins the run channel, I think that Lightning should confirm that the run is still claimed, lock the run, and apply a change that makes rollback impossible (it feels like the responsible way to do this would be to introduce a new state), and only then send the :ok back to the worker.

If any of those conditions is not met, the worker gets an error.
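
A sketch of that join-time confirmation, assuming a hypothetical new state (the :confirmed suggested further down) and a confirm_claim/1 helper, neither of which is part of this PR:

def confirm_claim(run_id) do
  Repo.transaction(fn ->
    run =
      from(r in Run, where: r.id == ^run_id, lock: "FOR UPDATE")
      |> Repo.one()

    case run do
      %Run{state: :claimed} = run ->
        # Lock held: move out of :claimed so the timeout can no longer roll it back
        run |> Ecto.Changeset.change(state: :confirmed) |> Repo.update!()

      _ ->
        # Already rolled back (or never claimed): reject the run channel join
        Repo.rollback(:not_claimed)
    end
  end)
end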

Member Author (taylordowns2000):

@rorymckinley, would this be handled better by ensuring that a run channel can only be joined once? Or only by a single worker? (This was a thought I had in the initial PR description.)

Member Author (taylordowns2000):

Eish, or would that prevent workers from re-connecting to the run channel if they got temporarily disconnected?

Collaborator:

@taylordowns2000 Even if a single-connect is an option, I do not think it would solve the problem entirely, and it may introduce a new problem:

Showing my workings, apologies for the repetition of what I said before.

I can't see any obvious place where we are checking the state of a run before we send stuff back to the worker (probably never needed it before?) - so race 1 would be that the run has already been rolled back (essentially Worker A sans Worker B). A single connect does not seem to solve this.

Race 2 would be that the run has not been rolled back, but in between the broadcast and the action the run gets rolled back. Using the Highlander approach you suggest would prevent another worker from picking up the run once it has been rolled back, but:

  • The first worker would think that it is ok to work on this, but Lightning thinks it is available. What happens when the worker sends through 'started' - would Lightning need a moment to compose itself?
  • If we only allow one connection to the run, would that block legitimate cases where we do want another worker to connect? Say Worker A does connect too late: the run has been rolled back but not yet claimed. In this case we would tell Worker A to fly a kite and we would want Worker B to be able to connect, but the Highlander principle would block it?

Collaborator:

And, oops: I forgot to add that I do not know enough about workers to know whether a worker will reconnect mid-run to the run channel - @josephjclark?

Member Author (taylordowns2000):

Eish. Yeah, it does sound like adding another state (:locked?) might be useful. Let's see what @stuartc and @josephjclark think here before I invest more time in this.

Collaborator:

:confirmed ?

Collaborator:

I believe the worker will just reconnect when the connection is lost. The socket should handle this. I'll test locally later and see if I can confirm.


{count, _} =
from(r in Run, where: r.id in ^run_ids)
|> Repo.update_all(
Collaborator:

@taylordowns2000 To avoid race conditions, it may be worth locking each record, checking that the record should be rolled back and then rolling it back.
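
A rough illustration of that lock-check-rollback variant, not the code in this PR (run_ids is assumed from the surrounding function):

Repo.transaction(fn ->
  # Re-check under a row lock which runs are still :claimed before updating
  still_claimed =
    from(r in Run, where: r.id in ^run_ids and r.state == :claimed, lock: "FOR UPDATE")
    |> Repo.all()
    |> Enum.map(& &1.id)

  from(r in Run, where: r.id in ^still_claimed)
  |> Repo.update_all(set: [state: :available])
end)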

@github-project-automation github-project-automation bot moved this from New Issues to In review in v2 Sep 10, 2025
@josephjclark (Collaborator) commented:

@taylordowns2000

Should we add some sort of state to the run_channel to make sure that it cannot be joined by more than one worker?

Not sure, I keep changing my mind.

It definitely feels like there are a few teeny tiny gaps where a claim might resolve just after the timeout, or two workers might join, or the rollback might occur just after a worker starts, or maybe the wrong worker joins. These are really tiny gaps, but possible. They might just be impossible to close?

If a worker is disconnected mid-run, it should reconnect and continue sending events. In fact I've just tested this and it works great (although some events may time out if the connection is down for long enough)

So you could add validation that only one worker can be in the channel at a time, or that after a worker joins, only another worker with the same id can rejoin. But none of those are ironclad.

I would leave it: this PR probably fixes 99% of the LostBeforeStart cases. Let's put that into the wild and come back and think about the remaining 1% if it remains a problem.

@stuartc (Member) commented Sep 11, 2025:

One comment here, will digest the "ack" aspect tomorrow.

The idea of checking that the transport pid is alive feels odd to me: the socket and channel processes are (afaik) tied to the transport process, so if the transport were dead they shouldn't be running either, i.e. that check will always be true. I haven't seen logs indicating a message failed because the transport was dead.

The original conversation was about a theory I had that transactions were continuing after the socket had closed. Since the claim handler is synchronous right now, I find it hard to believe the client is still listening for replies after 60s, yet we observed multiple claim transactions on the db still going after 60s. So I'm interested in trying to replicate this. Either the client should have disconnected (and the channel process dies and takes the transaction with it), or the worker should still be waiting for a response. I really want to determine whether the worker is still connected, at least via a socket, and just isn't going to consume the response.

So the logic seems ok as a safeguard for a worst case scenario, since the reply isn't ack'd in our current design. But it shouldn't replace understanding what's happening (and I'm thinking it might get in the way of debugging the scenario above).

<% else %>
<Common.combobox
items={assigns[:projects] || []}
items={(assigns[:projects] || []) |> Enum.sort_by(& &1.name)}
Collaborator:

Cheeky. I think Elias has fixed this on main now?


{:reply, {:ok, %{runs: runs}}, socket}
# Check if the socket is still alive
if Process.alive?(socket.transport_pid) do
Collaborator:

Yeah, I'm with @stuartc here - checking the socket is alive isn't the right thing.

Is this a hangover from the original PR that needs removing? Or is there still some value?
