Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for new status and forceclose server commands #229

Merged
merged 32 commits into from
Mar 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
287bef4
add errormonitor also to subtask
jkrumbiegel Dec 13, 2024
5061f03
add Dates dep
jkrumbiegel Dec 13, 2024
e34fbbc
add run start and finish timestamps to `File`
jkrumbiegel Dec 13, 2024
689fa8a
add a "workers" message type that returns worker metadata
jkrumbiegel Dec 13, 2024
ff3bfd3
reformat docstring
jkrumbiegel Dec 16, 2024
b1a0b52
remove forced error again
jkrumbiegel Jan 6, 2025
9bdd7ff
add tests
jkrumbiegel Jan 6, 2025
fb568d3
format
jkrumbiegel Jan 6, 2025
7d017d7
Merge branch 'main' into jk/workers-status
jkrumbiegel Mar 7, 2025
7f54131
switch to string with full server status via "status" message
jkrumbiegel Mar 7, 2025
4ee3468
print server timeout and time left
jkrumbiegel Mar 7, 2025
61e29df
fix test
jkrumbiegel Mar 7, 2025
0825441
format
jkrumbiegel Mar 7, 2025
e4623df
add info about when server started
jkrumbiegel Mar 10, 2025
4668620
formatting
jkrumbiegel Mar 10, 2025
2eb69f6
add changelog
jkrumbiegel Mar 10, 2025
018719c
read QNR_VERSION at compile time
jkrumbiegel Mar 10, 2025
a3420cc
break after error to not continue with `nothing` as json
jkrumbiegel Mar 10, 2025
c1ba1b3
formatting
jkrumbiegel Mar 11, 2025
4cb83d2
throw errors when run/close are called on a notebook that's running
jkrumbiegel Mar 11, 2025
030d26c
add tests for notebook run collisions
jkrumbiegel Mar 11, 2025
bd09652
remove obsolete concurrency test
jkrumbiegel Mar 12, 2025
28e14c3
add env to printout
jkrumbiegel Mar 12, 2025
2c2536a
add changelog about failure behavior
jkrumbiegel Mar 12, 2025
3afcec2
add `forceclose` server command and tests
jkrumbiegel Mar 12, 2025
f7ae33d
format
jkrumbiegel Mar 12, 2025
6cb5fbb
`empty!(channel)` is not available on previous julia versions
jkrumbiegel Mar 12, 2025
2af2080
`Base.errormonitor` is also not available on earlier julias
jkrumbiegel Mar 12, 2025
0eb7bbe
fix issue when `evaluate!` throws
jkrumbiegel Mar 12, 2025
7a7e89f
fix message
jkrumbiegel Mar 12, 2025
3a38fa7
add forceclose mention to changelog
jkrumbiegel Mar 12, 2025
e1d047b
Merge branch 'main' into jk/workers-status
jkrumbiegel Mar 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

### Added

- New socket server command `status` that returns a string describing the current server status, including information for each active worker, as well as `forceclose` which is the forced version of `close` that can shut down a worker even if it's currently running [#229](https://github.com/PumasAI/QuartoNotebookRunner.jl/pull/229).

### Changed

- Socket server commands `run`, `close` and `stop` will error if a file lock is currently held (a file worker is running) instead of waiting potentially a long time for that lock to open. Those locks were always intended only as mutation protection and not a queueing mechanism, they only behaved that way by accident and in case some worker hangs, it's impractical that further `quarto render` commands just add on top of the pile without a message [#229](https://github.com/PumasAI/QuartoNotebookRunner.jl/pull/229).

## [v0.14.0] - 2025-02-26

### Added
Expand Down
4 changes: 4 additions & 0 deletions src/QuartoNotebookRunner.jl
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ export Server, render, run!, close!

# Includes.

const QNR_VERSION =
VersionNumber(TOML.parsefile(joinpath(@__DIR__, "..", "Project.toml"))["version"])
include_dependency(joinpath(@__DIR__, "..", "Project.toml"))

include("UserError.jl")
include("Malt.jl")
include("WorkerSetup.jl")
Expand Down
144 changes: 130 additions & 14 deletions src/server.jl
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
lock::ReentrantLock
timeout::Float64
timeout_timer::Union{Nothing,Timer}
run_started::Union{Nothing,Dates.DateTime}
run_finished::Union{Nothing,Dates.DateTime}
run_decision_channel::Channel{Symbol}

function File(path::String, options::Union{String,Dict{String,Any}})
if isfile(path)
Expand All @@ -24,6 +27,7 @@
exeflags, env = _exeflags_and_env(merged_options)
timeout = _extract_timeout(merged_options)


exe, _exeflags = _julia_exe(exeflags)
worker =
cd(() -> Malt.Worker(; exe, exeflags = _exeflags, env), dirname(path))
Expand All @@ -38,6 +42,9 @@
ReentrantLock(),
timeout,
nothing,
nothing,
nothing,
Channel{Symbol}(32), # we don't want an unbuffered channel because we might want to `put!` to it without blocking
)
init!(file, merged_options)
return file
Expand Down Expand Up @@ -1427,41 +1434,96 @@
options::Union{String,Dict{String,Any}} = Dict{String,Any}(),
chunk_callback = (i, n, c) -> nothing,
)
borrow_file!(server, path; optionally_create = true) do file
if file.timeout_timer !== nothing
close(file.timeout_timer)
file.timeout_timer = nothing
end
result = evaluate!(file, output; showprogress, options, markdown, chunk_callback)
if file.timeout > 0
file.timeout_timer = Timer(file.timeout) do _
try
borrow_file!(server, path; optionally_create = true) do file
if file.timeout_timer !== nothing
close(file.timeout_timer)
file.timeout_timer = nothing
end
file.run_started = Dates.now()
file.run_finished = nothing

# we want to be able to force close the worker while `evaluate!` is running,
# so we run `evaluate!` in a task and wait for the `run_decision_channel`
# further down. Depending on the value fetched from that channel, we either
# know that evaluation has finished, or that force closing was requested.
while !isempty(file.run_decision_channel)
take!(file.run_decision_channel) # empty! not defined on channels in earlier julia versions
end

Check warning on line 1452 in src/server.jl

View check run for this annotation

Codecov / codecov/patch

src/server.jl#L1451-L1452

Added lines #L1451 - L1452 were not covered by tests

result_task = Threads.@spawn begin
try
evaluate!(file, output; showprogress, options, markdown, chunk_callback)
finally
put!(file.run_decision_channel, :evaluate_finished)
end
end

# block until a decision is reached
decision = take!(file.run_decision_channel)

# :forceclose might have been set from another task
if decision === :forceclose
close!(server, file.path) # this is in the same task, so reentrant lock allows access
error("File was force-closed during run")
elseif decision === :evaluate_finished
result = try
fetch(result_task)
catch err
# throw the original exception, not the wrapping TaskFailedException
rethrow(err.task.exception)
end
else
error("Invalid decision $decision")

Check warning on line 1477 in src/server.jl

View check run for this annotation

Codecov / codecov/patch

src/server.jl#L1477

Added line #L1477 was not covered by tests
end

file.run_finished = Dates.now()
if file.timeout > 0
file.timeout_timer = Timer(file.timeout) do _
close!(server, file.path)
@debug "File at $(file.path) timed out after $(file.timeout) seconds of inactivity."
end
else
close!(server, file.path)
@debug "File at $(file.path) timed out after $(file.timeout) seconds of inactivity."
end
return result
end
catch err
if err isa FileBusyError
throw(
UserError(
"Tried to run file \"$path\" but the corresponding worker is busy.",
),
)
else
close!(server, file.path)
rethrow(err)
end
return result
end
end

struct NoFileEntryError <: Exception
path::String
end

struct FileBusyError <: Exception
path::String
end

"""
borrow_file!(f, server, path; optionally_create = false, options = Dict{String,Any}())
borrow_file!(f, server, path; wait = false, optionally_create = false, options = Dict{String,Any}())

Executes `f(file)` while the `file`'s `ReentrantLock` is locked.
All actions on a `Server`'s `File` should be wrapped in this
so that no two tasks can mutate the `File` at the same time.
When `optionally_create` is `true`, the `File` will be created on the server
if it doesn't exist, in which case it is passed `options`.
If `wait = false`, `borrow_file!` will throw a `FileBusyError` if the lock cannot be attained immediately.
"""
function borrow_file!(
f,
server,
path;
wait = false,
optionally_create = false,
options = Dict{String,Any}(),
)
Expand Down Expand Up @@ -1497,7 +1559,18 @@
# no file exists or it doesn't match the one we have, we recurse into `borrow_file!`.
# This could in principle go on forever but is very unlikely to with a small number of
# concurrent users.
lock(file.lock) do

if wait
lock(file.lock)
lock_attained = true

Check warning on line 1565 in src/server.jl

View check run for this annotation

Codecov / codecov/patch

src/server.jl#L1564-L1565

Added lines #L1564 - L1565 were not covered by tests
else
lock_attained = trylock(file.lock)
end

try
if !lock_attained
throw(FileBusyError(apath))
end
current_file = lock(server.lock) do
get(server.workers, apath, nothing)
end
Expand All @@ -1506,6 +1579,8 @@
else
return f(file)
end
finally
lock_attained && unlock(file.lock)
end
end
end
Expand Down Expand Up @@ -1560,14 +1635,55 @@
end
return true
catch err
if !(err isa NoFileEntryError)
if err isa FileBusyError
throw(
UserError(
"Tried to close file \"$path\" but the corresponding worker is busy.",
),
)
elseif !(err isa NoFileEntryError)

Check warning on line 1644 in src/server.jl

View check run for this annotation

Codecov / codecov/patch

src/server.jl#L1644

Added line #L1644 was not covered by tests
rethrow(err)
else
false
end
end
end

function forceclose!(server::Server, path::String)
apath = abspath(path)
file = lock(server.lock) do
if haskey(server.workers, apath)
return server.workers[apath]
else
throw(NoFileEntryError(apath))

Check warning on line 1658 in src/server.jl

View check run for this annotation

Codecov / codecov/patch

src/server.jl#L1658

Added line #L1658 was not covered by tests
end
end
# if the worker is not actually running we need to fall back to normal closing,
# for that we try to get the file lock now
lock_attained = trylock(file.lock)
try
# if we've attained the lock, we can close normally
if lock_attained
close!(server, path)
# but if not, we request a forced close via the run decision channel that
# is being waited for in `run!` function
else
put!(file.run_decision_channel, :forceclose)
t = time()
while Malt.isrunning(file.worker)
timeout = 10
(time() - t) > timeout && error(
"Force close was requested but worker was still running after $timeout seconds.",
)
sleep(0.1)
end
end
finally
lock_attained && unlock(file.lock)
end
return
end

json_reader(str) = JSON3.read(str, Any)
yaml_reader(str) = YAML.load(str)

Expand Down
Loading
Loading