Thread
#prefect-community
    Brett Naul

    6 months ago
    q about mapping and PAUSE signals: the mapping docs say
    Even though the user didn't create them explicitly, the children tasks of a mapped task are first-class Prefect tasks. They can do anything a "normal" task can do, including succeed, fail, retry, pause, or skip.
    this seems kinda true, but when you try to raise PAUSE inside a mapped task it seems to just loop infinitely, whereas for a normal task you see
    <Task: pause> is currently Paused; enter 'y' to resume:
    . anyone have a strong opinion on whether this is a bug or just not a supported usage?
    import prefect
    from prefect import Flow, task
    from prefect.engine.signals import PAUSE
    
    @task
    def pause(i):
        # Pause on the first attempt; context["resume"] is set once the task is resumed.
        if not prefect.context.get("resume"):
            raise PAUSE("pausing")
        return i
    
    with Flow("f") as f:
        # pause.map(i=[1])  # infinite loop
        pause(i=1)          # works normally
    
    f.run()
    mapped version output
    [2022-03-02 08:53:41-0500] INFO - prefect.FlowRunner | Beginning Flow run for 'f'
    INFO:prefect.FlowRunner:Beginning Flow run for 'f'
    [2022-03-02 08:53:41-0500] INFO - prefect.TaskRunner | Task 'pause': Starting task run...
    INFO:prefect.TaskRunner:Task 'pause': Starting task run...
    [2022-03-02 08:53:41-0500] INFO - prefect.TaskRunner | Task 'pause': Finished task run for task with final state: 'Mapped'
    INFO:prefect.TaskRunner:Task 'pause': Finished task run for task with final state: 'Mapped'
    [2022-03-02 08:53:41-0500] INFO - prefect.TaskRunner | Task 'pause[0]': Starting task run...
    INFO:prefect.TaskRunner:Task 'pause[0]': Starting task run...
    [2022-03-02 08:53:41-0500] INFO - prefect.TaskRunner | PAUSE signal raised: PAUSE('pausing')
    INFO:prefect.TaskRunner:PAUSE signal raised: PAUSE('pausing')
    [2022-03-02 08:53:41-0500] INFO - prefect.TaskRunner | Task 'pause[0]': Finished task run for task with final state: 'Paused'
    INFO:prefect.TaskRunner:Task 'pause[0]': Finished task run for task with final state: 'Paused'
    [2022-03-02 08:53:41-0500] INFO - prefect.FlowRunner | Flow run RUNNING: terminal tasks are incomplete.
    INFO:prefect.FlowRunner:Flow run RUNNING: terminal tasks are incomplete.
    [2022-03-02 08:53:41-0500] INFO - prefect.FlowRunner | Beginning Flow run for 'f'
    INFO:prefect.FlowRunner:Beginning Flow run for 'f'
    [2022-03-02 08:53:41-0500] INFO - prefect.TaskRunner | Task 'pause': Starting task run...
    INFO:prefect.TaskRunner:Task 'pause': Starting task run...
    [2022-03-02 08:53:41-0500] INFO - prefect.TaskRunner | Task 'pause': Finished task run for task with final state: 'Mapped'
    ...etc etc
    Kevin Kho

    6 months ago
    I think there are a lot of things that behave differently with
    flow.run()
    compared to agent-deployed runs. For example, only one Flow can be fired at a time even if the schedule has two runs. For this one specifically, I would expect the Cloud runs to work even if
    flow.run()
    does not. But yeah, I can see how this can be painful because it means you can’t test locally whatsoever. I’d open an issue.
    @Marvin open “Pause does not work with map for flow.run()”
    Marvin

    6 months ago
    Anna Geller

    6 months ago
    I'd say this is similar to the flow-of-flows orchestrator pattern in that this is a feature that requires a backend flow run to work. To resume a paused run, you would need to do it through the UI or GraphQL API so I don't think we can support that just using
    flow.run()
    However, if similar functionality were introduced in Orion one day, this would be doable because in Orion every flow run, even the local ones, is recorded in the backend. So instead of opening an issue here, I would vote for adding a feature request for Orion so that this can be added there one day. @Brett Naul are you fine with that?
    Kevin Kho

    6 months ago
    Flow of flows can work though with flow.run if the other flows are registered, so you can somewhat test it. This just hangs, so it’s painful if you have multiple maps that end in the pause, I think
    Anna Geller

    6 months ago
    Correct, and I totally get the problem that it's a bit painful to test this locally so we can definitely keep the issue open, but I'm not sure if this is doable in a local flow run.
    create_flow_run
    task triggered in
    flow.run()
    still triggers a change in the backend (a new child flow run is added). Likewise, raising a PAUSE signal that can be resumed would also need to be recorded in a backend to work properly, just like the
    create_flow_run
    task (I think).
    Kevin Kho

    6 months ago
    Ahhh I see what you mean
    Brett Naul

    6 months ago
    To resume a paused run, you would need to do it through the UI or GraphQL API so I don't think we can support that just using 
    flow.run()
    I don't quite follow this; why would this be true in the mapped context but not unmapped? in the unmapped case we get the simple input prompt
    <Task: pause> is currently Paused; enter 'y' to resume:
    , I can see why in say a DaskExecutor context this would not work but for the simple serial executor it seems like this same prompt would work just fine within a mapped task
    Anna Geller

    6 months ago
    Gotcha. I think the problem is that
    flow.run()
    is a single process and to have the functionality you described,
    flow.run()
    would need to be able to work across multiple mapped processes/threads. How would this even work - spinning up a separate terminal window for each? 😄
    Brett Naul

    6 months ago
    still don't see a difference between say
    x = pause(1)
    y = pause(2)
    and
    xy = pause.map([1, 2])
    for a serial executor that is running everything in one and the same thread+process, all the same considerations apply to both, but the former works and the latter doesn't. I think a good analogy is a
    breakpoint()
    which similarly requires user input. if you swap
    raise PAUSE()
    for
    breakpoint()
    then everything I'm saying still holds, and the above two blocks both work as expected (you see a prompt, you press
    c
    , the task completes). so there is something different about
    PAUSE()
    that is specific to its implementation, not an inherent limitation
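To make the comparison above concrete, here is a minimal pure-Python sketch of a serial runner in which two explicit calls and a mapped call go through the same pause-and-resume path. It is a toy model only, not Prefect's actual task runner: `Pause`, `run_task`, `run_unmapped`, `run_mapped`, and the `approve` callback (standing in for the "enter 'y' to resume" prompt) are all hypothetical names invented for illustration.

```python
class Pause(Exception):
    """Toy stand-in for a pause signal (hypothetical, not prefect's PAUSE)."""


def pause(i, resume):
    # Mirrors the task in the thread: pause on the first attempt, return on resume.
    if not resume:
        raise Pause("pausing")
    return i


def run_task(fn, arg, approve):
    """Run fn once; if it pauses, ask approve(label) and rerun it as resumed."""
    try:
        return fn(arg, resume=False)
    except Pause:
        if not approve(f"{fn.__name__}({arg!r})"):
            raise  # the user declined to resume, so the pause propagates
        return fn(arg, resume=True)


def run_unmapped(approve):
    # Equivalent of: x = pause(1); y = pause(2)
    x = run_task(pause, 1, approve)
    y = run_task(pause, 2, approve)
    return [x, y]


def run_mapped(approve):
    # Equivalent of: xy = pause.map([1, 2]), run serially in one process
    return [run_task(pause, i, approve) for i in [1, 2]]
```

For an interactive run, `approve` could be something like `lambda label: input(f"{label} is paused; enter 'y' to resume: ") == "y"`. In this toy model the mapped and unmapped versions prompt once per child and then complete, which is the behavior the serial-executor argument expects.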
    Michael Adkins

    6 months ago
    This does look like a bug. It may or may not be easy to fix though.
    Brett Naul

    6 months ago
    thanks @Michael Adkins, if you suspect it might be an easy fix I can look into it. probably not worth it if you think it goes pretty deep into the like core task runner logic
    Michael Adkins

    6 months ago
    I’m not sure without looking myself 🙂 most of that predates me.