Brett Naul
03/02/2022, 1:53 PM
> Even though the user didn't create them explicitly, the children tasks of a mapped task are first-class Prefect tasks. They can do anything a "normal" task can do, including succeed, fail, retry, pause, or skip.
This seems kinda true, but when you try to raise `PAUSE` inside a mapped task it seems to just loop infinitely, whereas for a normal task you see `<Task: pause> is currently Paused; enter 'y' to resume:`. Does anyone have a strong opinion on whether this is a bug or just an unsupported usage?
```python
from prefect import Flow, task
from prefect.engine.signals import PAUSE
import prefect


@task
def pause(i):
    # Raise PAUSE unless this task run is being resumed
    if not prefect.context.get("resume"):
        raise PAUSE("pausing")
    return i


with Flow("f") as f:
    # pause.map(i=[1])  # infinite loop
    pause(i=1)  # works normally

f.run()
```
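A possible explanation of the log that follows, offered as a hypothesis rather than something confirmed in Prefect's source: the local runner only offers the resume prompt when a task's own state is Paused. The mapped parent finishes as 'Mapped', with the Paused state living on its child 'pause[0]', so no prompt appears, the terminal task stays incomplete, and the flow run begins again. The toy model below uses hypothetical names (`ToyState`, `run_pause_task`, `toy_local_run`) and caps reruns at `max_runs` so that it terminates.

```python
from dataclasses import dataclass, field
from typing import Callable, List, Optional


@dataclass
class ToyState:
    """Hypothetical stand-in for a task's final state after one flow run."""
    name: str  # "Paused", "Success", or "Mapped"
    children: List["ToyState"] = field(default_factory=list)


def run_pause_task(mapped: bool, resumed: bool) -> ToyState:
    """Toy `pause` task: ends Paused unless this run was resumed."""
    leaf = ToyState("Success" if resumed else "Paused")
    # A mapped parent reports 'Mapped' and nests its child's state (pause[0]).
    return ToyState("Mapped", [leaf]) if mapped else leaf


def is_finished(state: ToyState) -> bool:
    if state.children:
        return all(is_finished(child) for child in state.children)
    return state.name == "Success"


def toy_local_run(mapped: bool, ask: Callable[[str], str], max_runs: int = 5) -> Optional[int]:
    """Return how many flow runs it took to finish, or None after max_runs."""
    resumed = False
    for run in range(1, max_runs + 1):
        state = run_pause_task(mapped, resumed)
        if is_finished(state):
            return run
        # Hypothesis: only the top-level state is checked for Paused, so a
        # Paused child nested under 'Mapped' never triggers the prompt.
        if state.name == "Paused":
            answer = ask("<Task: pause> is currently Paused; enter 'y' to resume:")
            resumed = answer.strip().lower() == "y"
        # The terminal task is still incomplete, so the flow runs again
        # (max_runs stands in for "forever").
    return None


print(toy_local_run(mapped=False, ask=lambda prompt: "y"))  # 2
print(toy_local_run(mapped=True, ask=lambda prompt: "y"))  # None
```

In the unmapped case the prompt fires once and the second run finishes; in the mapped case the prompt is never shown, which matches the repeated "Beginning Flow run" lines.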
```
[2022-03-02 08:53:41-0500] INFO - prefect.FlowRunner | Beginning Flow run for 'f'
INFO:prefect.FlowRunner:Beginning Flow run for 'f'
[2022-03-02 08:53:41-0500] INFO - prefect.TaskRunner | Task 'pause': Starting task run...
INFO:prefect.TaskRunner:Task 'pause': Starting task run...
[2022-03-02 08:53:41-0500] INFO - prefect.TaskRunner | Task 'pause': Finished task run for task with final state: 'Mapped'
INFO:prefect.TaskRunner:Task 'pause': Finished task run for task with final state: 'Mapped'
[2022-03-02 08:53:41-0500] INFO - prefect.TaskRunner | Task 'pause[0]': Starting task run...
INFO:prefect.TaskRunner:Task 'pause[0]': Starting task run...
[2022-03-02 08:53:41-0500] INFO - prefect.TaskRunner | PAUSE signal raised: PAUSE('pausing')
INFO:prefect.TaskRunner:PAUSE signal raised: PAUSE('pausing')
[2022-03-02 08:53:41-0500] INFO - prefect.TaskRunner | Task 'pause[0]': Finished task run for task with final state: 'Paused'
INFO:prefect.TaskRunner:Task 'pause[0]': Finished task run for task with final state: 'Paused'
[2022-03-02 08:53:41-0500] INFO - prefect.FlowRunner | Flow run RUNNING: terminal tasks are incomplete.
INFO:prefect.FlowRunner:Flow run RUNNING: terminal tasks are incomplete.
[2022-03-02 08:53:41-0500] INFO - prefect.FlowRunner | Beginning Flow run for 'f'
INFO:prefect.FlowRunner:Beginning Flow run for 'f'
[2022-03-02 08:53:41-0500] INFO - prefect.TaskRunner | Task 'pause': Starting task run...
INFO:prefect.TaskRunner:Task 'pause': Starting task run...
[2022-03-02 08:53:41-0500] INFO - prefect.TaskRunner | Task 'pause': Finished task run for task with final state: 'Mapped'
...etc etc
```
Kevin Kho
`flow.run()` compared to agent-deployed runs. For example, only one Flow can be fired at a time even if the schedule has two runs.
For this one specifically, I would expect the Cloud runs to work even if `flow.run()` does not. But yeah, I can see how this can be painful, because it means you can't test locally whatsoever. I'd open an issue.
Marvin
03/02/2022, 2:05 PM
Anna Geller
To resume a paused run, you would need to do it through the UI or GraphQL API so I don't think we can support that just using `flow.run()`.
However, if similar functionality were introduced to Orion one day, this would be doable, because in Orion every flow run, even the local ones, is recorded in the backend. So instead of opening an issue here, I would vote for adding a feature request for Orion so that this can be added there one day. @Brett Naul are you fine with that?
Kevin Kho
Anna Geller
The `create_flow_run` task triggered in `flow.run()` still causes a change in the backend (a new child flow run is added). Similarly, raising a PAUSE state that can be resumed would also need to be recorded in a backend to work properly, just like the `create_flow_run` task (I think).
Kevin Kho
Brett Naul
03/02/2022, 2:25 PM
> To resume a paused run, you would need to do it through the UI or GraphQL API so I don't think we can support that just using `flow.run()`
I don't quite follow this; why would this be true in the mapped context but not the unmapped one? In the unmapped case we get the simple input prompt `<Task: pause> is currently Paused; enter 'y' to resume:`. I can see why this would not work in, say, a DaskExecutor context, but for the simple serial executor it seems like this same prompt would work just fine within a mapped task.
Anna Geller
`flow.run()` is a single process, and to have the functionality you described, `flow.run()` would need to be able to work across multiple mapped processes/threads. How would this even work, spinning up a separate terminal window for each? 😄
Brett Naul
03/02/2022, 2:40 PM
```
x = pause(1)
y = pause(2)
```
and
```
xy = pause.map([1, 2])
```
For a serial executor that runs everything in one thread and process, all the same considerations apply to both, but the former works and the latter doesn't.
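Brett's point can be sketched without Prefect: in a purely serial, single-process runner, mapping reduces to a loop, so each child can get the same interactive prompt the unmapped path shows, one after another. This is a toy model, not Prefect's executor. `Pause`, `pause_task`, `run_serial_map`, and the injectable `ask` callback are hypothetical names, and resuming is modeled by rerunning the child with `resume=True`, mirroring the `prefect.context.get("resume")` check in the reproduction.

```python
from typing import Callable, List


class Pause(Exception):
    """Hypothetical stand-in for prefect.engine.signals.PAUSE."""


def pause_task(i: int, resume: bool) -> int:
    # Same logic as the reproduction: pause unless this run was resumed.
    if not resume:
        raise Pause("pausing")
    return i


def run_serial_map(
    fn: Callable[[int, bool], int],
    items: List[int],
    ask: Callable[[str], str] = input,
) -> List[int]:
    """Run fn over items one at a time, prompting once per paused child."""
    results = []
    for index, item in enumerate(items):
        try:
            results.append(fn(item, False))
        except Pause:
            # Single thread, single process: prompts simply happen in order,
            # just as they would for two separate unmapped calls.
            answer = ask(f"pause[{index}] is currently Paused; enter 'y' to resume: ")
            if answer.strip().lower() != "y":
                raise
            results.append(fn(item, True))
    return results


print(run_serial_map(pause_task, [1, 2], ask=lambda prompt: "y"))  # [1, 2]
```

Injecting `ask` instead of calling `input()` directly keeps the sketch testable; with the default it prompts on the terminal, once per child.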
I think a good analogy is `breakpoint()`, which similarly requires user input. If you swap `raise PAUSE()` for `breakpoint()`, then everything I'm saying still holds, and the above two blocks both work as expected (you see a prompt, you press `c`, the task completes). So there is something different about `PAUSE()` that is specific to its implementation, not an inherent limitation.
Zanie
Brett Naul
03/03/2022, 1:01 PM
Zanie