#prefect-community

Brett Naul

03/02/2022, 1:53 PM
q about mapping and PAUSE signals: the mapping docs say
Even though the user didn't create them explicitly, the children tasks of a mapped task are first-class Prefect tasks. They can do anything a "normal" task can do, including succeed, fail, retry, pause, or skip.
this seems kinda true, but when you try to raise PAUSE inside a mapped task it seems to just loop infinitely, whereas for a normal task you see `<Task: pause> is currently Paused; enter 'y' to resume:`. anyone have a strong opinion on whether this is a bug or just not a supported usage?
from prefect import Flow, task
from prefect.engine.signals import PAUSE

@task
def pause(i):
    import prefect
    if not prefect.context.get('resume'):
        raise PAUSE("pausing")
    return i

with Flow("f") as f:
    # pause.map(i=[1])  # infinite loop
    pause(i=1)          # works normally

f.run()
mapped version output
[2022-03-02 08:53:41-0500] INFO - prefect.FlowRunner | Beginning Flow run for 'f'
INFO:prefect.FlowRunner:Beginning Flow run for 'f'
[2022-03-02 08:53:41-0500] INFO - prefect.TaskRunner | Task 'pause': Starting task run...
INFO:prefect.TaskRunner:Task 'pause': Starting task run...
[2022-03-02 08:53:41-0500] INFO - prefect.TaskRunner | Task 'pause': Finished task run for task with final state: 'Mapped'
INFO:prefect.TaskRunner:Task 'pause': Finished task run for task with final state: 'Mapped'
[2022-03-02 08:53:41-0500] INFO - prefect.TaskRunner | Task 'pause[0]': Starting task run...
INFO:prefect.TaskRunner:Task 'pause[0]': Starting task run...
[2022-03-02 08:53:41-0500] INFO - prefect.TaskRunner | PAUSE signal raised: PAUSE('pausing')
INFO:prefect.TaskRunner:PAUSE signal raised: PAUSE('pausing')
[2022-03-02 08:53:41-0500] INFO - prefect.TaskRunner | Task 'pause[0]': Finished task run for task with final state: 'Paused'
INFO:prefect.TaskRunner:Task 'pause[0]': Finished task run for task with final state: 'Paused'
[2022-03-02 08:53:41-0500] INFO - prefect.FlowRunner | Flow run RUNNING: terminal tasks are incomplete.
INFO:prefect.FlowRunner:Flow run RUNNING: terminal tasks are incomplete.
[2022-03-02 08:53:41-0500] INFO - prefect.FlowRunner | Beginning Flow run for 'f'
INFO:prefect.FlowRunner:Beginning Flow run for 'f'
[2022-03-02 08:53:41-0500] INFO - prefect.TaskRunner | Task 'pause': Starting task run...
INFO:prefect.TaskRunner:Task 'pause': Starting task run...
[2022-03-02 08:53:41-0500] INFO - prefect.TaskRunner | Task 'pause': Finished task run for task with final state: 'Mapped'
...etc etc
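One way to picture the repeating "Beginning Flow run" cycle in the log is a toy model written without Prefect. It assumes, as a guess that is neither confirmed in this thread nor checked against Prefect's source, that the local runner only looks for a Paused state on top-level tasks and re-runs the flow while anything is still Paused. Every name here (`run_once`, `run_locally`, the state strings) is hypothetical.

```python
def run_once(mapped, resumed):
    """One pass of a toy flow with a single task named 'pause'.

    Returns a {task_name: state} dict. A task is 'Paused' until its
    name appears in `resumed`.
    """
    states = {}
    if mapped:
        states["pause"] = "Mapped"  # the parent only reports 'Mapped'
        states["pause[0]"] = "Success" if "pause[0]" in resumed else "Paused"
    else:
        states["pause"] = "Success" if "pause" in resumed else "Paused"
    return states


def run_locally(mapped, approve, max_passes=5):
    """Re-run the toy flow until nothing is Paused, or stop at max_passes.

    ASSUMPTION: only top-level task names are checked for 'Paused',
    so a paused mapped child is never offered for approval.
    """
    top_level = ["pause"]
    resumed = set()
    for n in range(1, max_passes + 1):
        states = run_once(mapped, resumed)
        if "Paused" not in states.values():
            return n, states
        for name in top_level:
            if states[name] == "Paused" and approve(name):
                resumed.add(name)
    return max_passes, states
```

Under that assumption the unmapped flow finishes on its second pass once `approve` returns true, while the mapped flow uses up every pass with `pause[0]` still Paused, which matches the shape of the log but says nothing definite about the real cause.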

Kevin Kho

03/02/2022, 2:04 PM
I think there are a lot of things that behave differently under `flow.run()` compared to agent-deployed runs. For example, only one Flow can be fired at a time even if the schedule has two runs. For this one specifically, I would expect the Cloud runs to work even if `flow.run()` does not. But yeah, I can see how this can be painful because it means you can’t test locally whatsoever. I’d open an issue.
@Marvin open “Pause does not work with map for flow.run()”

Anna Geller

03/02/2022, 2:13 PM
I'd say this is similar to the flow-of-flows orchestrator pattern in that this is a feature that requires a backend flow run to work. To resume a paused run, you would need to do it through the UI or GraphQL API, so I don't think we can support that just using `flow.run()`. However, if similar functionality were introduced to Orion one day, this would be doable because in Orion every flow run, even a local one, is recorded in the backend. So instead of opening an issue here, I would vote for adding a feature request for Orion so that this can be added there one day. @Brett Naul are you fine with that?

Kevin Kho

03/02/2022, 2:14 PM
Flow of flows can work with `flow.run()` though, if the other flows are registered, so you can somewhat test it. This just hangs, so it’s painful if you have multiple maps that end in the pause, I think

Anna Geller

03/02/2022, 2:20 PM
Correct, and I totally get the problem that it's a bit painful to test this locally, so we can definitely keep the issue open, but I'm not sure if this is doable in a local flow run. A `create_flow_run` task triggered in `flow.run()` still triggers a change in the backend (a new child flow run is added). Similarly, raising a PAUSE signal that can be resumed would also need to be recorded in a backend to work properly, like the `create_flow_run` task (I think).
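The idea that a resumable pause has to be recorded somewhere outside the running task can be sketched with a toy in-memory store. This is only an illustration of the argument, not Prefect's backend or API: `ToyBackend`, `run_task`, and the state strings are all hypothetical names.

```python
class ToyBackend:
    """Hypothetical in-memory stand-in for a backend that records task run states."""

    def __init__(self):
        self.states = {}

    def set_state(self, task_run_id, state):
        self.states[task_run_id] = state

    def get_state(self, task_run_id):
        # Task runs the backend has never seen count as 'Pending'.
        return self.states.get(task_run_id, "Pending")


def run_task(backend, task_run_id, fn, *args):
    """Run fn only after something outside the task has set 'Resume'."""
    state = backend.get_state(task_run_id)
    if state == "Pending":
        # First attempt: record the pause instead of doing the work.
        backend.set_state(task_run_id, "Paused")
    elif state == "Resume":
        backend.set_state(task_run_id, "Success")
        return fn(*args)
    return None


backend = ToyBackend()
first = run_task(backend, "pause[0]", lambda i: i, 1)   # records 'Paused'
backend.set_state("pause[0]", "Resume")                 # stands in for a UI or API action
second = run_task(backend, "pause[0]", lambda i: i, 1)  # now actually runs
```

In this sketch the resume decision lives in the store, so it survives between attempts; whether a local `flow.run()` needs that, or could get by with an in-process prompt, is exactly what the rest of the thread debates.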

Kevin Kho

03/02/2022, 2:21 PM
Ahhh I see what you mean
👍 1

Brett Naul

03/02/2022, 2:25 PM
“To resume a paused run, you would need to do it through the UI or GraphQL API so I don't think we can support that just using `flow.run()`”
I don't quite follow this; why would this be true in the mapped context but not unmapped? in the unmapped case we get the simple input prompt `<Task: pause> is currently Paused; enter 'y' to resume:`. I can see why in, say, a DaskExecutor context this would not work, but for the simple serial executor it seems like this same prompt would work just fine within a mapped task

Anna Geller

03/02/2022, 2:30 PM
Gotcha. I think the problem is that `flow.run()` is a single process, and to have the functionality you described, `flow.run()` would need to be able to work across multiple mapped processes/threads. How would this even work - spinning up a separate terminal window for each? 😄

Brett Naul

03/02/2022, 2:40 PM
still don't see a difference between say
x = pause(1)
y = pause(2)
and
xy = pause.map([1, 2])
for a serial executor that is running everything in the same thread+process, all the same considerations apply to both, but the former works and the latter doesn't. I think a good analogy is a `breakpoint()`, which similarly requires user input. if you swap `raise PAUSE()` for `breakpoint()` then everything I'm saying still holds, and the above two blocks both work as expected (you see a prompt, you press `c`, the task completes). so there is something different about `PAUSE()` that is specific to its implementation, not an inherent limitation
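The argument above can be made concrete with a small Prefect-free sketch: in a single thread, nothing stops a runner from prompting once per paused call, whether the calls were written out by hand or generated by a map. The `Pause` exception, `pause`, and `run_serial` are hypothetical stand-ins, not Prefect's `PAUSE` signal or its task runner.

```python
class Pause(Exception):
    """Hypothetical stand-in for a pause signal (not prefect.engine.signals.PAUSE)."""


def pause(i, resumed):
    # Mirrors the shape of the task in the original snippet.
    if not resumed:
        raise Pause("pausing")
    return i


def run_serial(calls, ask):
    """Run (name, arg) pairs one after another in the current thread.

    When a call pauses, ask for approval and retry it once if approved.
    """
    results = {}
    for name, arg in calls:
        try:
            results[name] = pause(arg, resumed=False)
        except Pause:
            answer = ask(f"<Task: {name}> is currently Paused; enter 'y' to resume: ")
            if answer.strip().lower() == "y":
                results[name] = pause(arg, resumed=True)
    return results


# Two explicit calls versus the children a map over [1, 2] would produce.
unmapped = [("x", 1), ("y", 2)]
mapped = [(f"xy[{n}]", arg) for n, arg in enumerate([1, 2])]
```

Passing the built-in `input` as `ask` gives an interactive prompt per paused call; passing a function that always returns `"y"` makes the same path testable without a terminal. Both lists go through identical code, which is the sense in which mapping adds no inherent obstacle under a serial executor.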

Zanie

03/02/2022, 5:02 PM
This does look like a bug. It may or may not be easy to fix though.
🙏 1

Brett Naul

03/03/2022, 1:01 PM
thanks @Zanie, if you suspect it might be an easy fix I can look into it. probably not worth it if you think it goes pretty deep into the like core task runner logic
upvote 1

Zanie

03/03/2022, 2:50 PM
I’m not sure without looking myself 🙂 most of that predates me.