Skip to content

Commit 9027ade

Browse files
committed
Fix immediate tasks from content app getting stuck in waiting
1 parent 8804484 commit 9027ade

File tree

3 files changed

+11
-8
lines changed

3 files changed

+11
-8
lines changed
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fixed immediate tasks from the content app getting stuck in waiting.

pulpcore/content/handler.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -903,7 +903,7 @@ async def _match_and_stream(self, path, request):
903903
ca = ra.content_artifact
904904
# Try to add content to repository if present & supported
905905
if repository and repository.PULL_THROUGH_SUPPORTED:
906-
await sync_to_async(repository.pull_through_add_content)(ca)
906+
await repository.async_pull_through_add_content(ca)
907907
# Try to stream the ContentArtifact if already created
908908
if ca.artifact:
909909
return await self._serve_content_artifact(ca, headers, request)
@@ -1347,7 +1347,7 @@ async def finalize():
13471347
)
13481348
# Try to add content to repository if present & supported
13491349
if repository and repository.PULL_THROUGH_SUPPORTED:
1350-
await sync_to_async(repository.pull_through_add_content)(ca)
1350+
await repository.async_pull_through_add_content(ca)
13511351
await response.write_eof()
13521352

13531353
if response.status == 404:

pulpcore/tasking/tasks.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -235,12 +235,13 @@ def dispatch(
235235

236236
execute_now = immediate and not called_from_content_app()
237237
assert deferred or immediate, "A task must be at least `deferred` or `immediate`."
238-
send_wakeup_signal = True if not immediate else False
238+
send_wakeup_signal = not execute_now
239239
function_name = get_function_name(func)
240240
versions = get_version(versions, function_name)
241241
colliding_resources, resources = get_resources(exclusive_resources, shared_resources, immediate)
242+
app_lock = None if not execute_now else AppStatus.objects.current() # Lazy evaluation...
242243
task_payload = get_task_payload(
243-
function_name, task_group, args, kwargs, resources, versions, immediate, deferred
244+
function_name, task_group, args, kwargs, resources, versions, immediate, deferred, app_lock
244245
)
245246
task = Task.objects.create(**task_payload)
246247
task.refresh_from_db() # The database will have assigned a timestamp for us.
@@ -278,9 +279,10 @@ async def adispatch(
278279
function_name = get_function_name(func)
279280
versions = get_version(versions, function_name)
280281
colliding_resources, resources = get_resources(exclusive_resources, shared_resources, immediate)
281-
send_wakeup_signal = False
282+
send_wakeup_signal = not execute_now
283+
app_lock = None if not execute_now else AppStatus.objects.current() # Lazy evaluation...
282284
task_payload = get_task_payload(
283-
function_name, task_group, args, kwargs, resources, versions, immediate, deferred
285+
function_name, task_group, args, kwargs, resources, versions, immediate, deferred, app_lock
284286
)
285287
task = await Task.objects.acreate(**task_payload)
286288
await task.arefresh_from_db() # The database will have assigned a timestamp for us.
@@ -302,7 +304,7 @@ async def adispatch(
302304

303305

304306
def get_task_payload(
305-
function_name, task_group, args, kwargs, resources, versions, immediate, deferred
307+
function_name, task_group, args, kwargs, resources, versions, immediate, deferred, app_lock
306308
):
307309
payload = {
308310
"state": TASK_STATES.WAITING,
@@ -317,7 +319,7 @@ def get_task_payload(
317319
"immediate": immediate,
318320
"deferred": deferred,
319321
"profile_options": x_task_diagnostics_var.get(None),
320-
"app_lock": None if not immediate else AppStatus.objects.current(), # Lazy evaluation...
322+
"app_lock": app_lock,
321323
}
322324
return payload
323325

0 commit comments

Comments
 (0)