
Vadym Dytyniak

11/09/2022, 3:09 PM
Hi. What should I use in Prefect 2 instead of the wait_for_flow_run task from Prefect 1?

Oscar Björhn

11/09/2022, 3:11 PM
Add this as a parameter to your task calls: wait_for=[task_to_wait_for]

Vadym Dytyniak

11/09/2022, 3:12 PM
I use create_flow_run, which immediately submits the flow run, so just using wait_for will not help.

Oscar Björhn

11/09/2022, 3:13 PM
Ah. For some reason I thought you were asking about tasks. Not sure, I know there's a wait_for for subflows as well but I am not familiar with create_flow_run.

Nate

11/09/2022, 3:22 PM
Hi @Vadym Dytyniak 👋 In Prefect 2, we can simply call flows from inside of flows!
• So if you want your subflow to run on the same infrastructure as the parent, you can do something like
import time

from prefect import flow

@flow
def child_flow():
    time.sleep(5)
    return "something"

@flow
def parent_flow():
    result = child_flow()  # blocks until the subflow finishes

    print(f"this will only happen once {result!r} returns!")
• or if you want to run a separate deployment and wait for it, you can use run_deployment
from prefect import flow
from prefect.deployments import run_deployment

some_other_deployed_flow = "test-flow/my-deployment"

@flow
def parent_flow():
    flow_run_model = run_deployment(name=some_other_deployed_flow)  # waits for the flow run by default

    print(f"this will only happen once {flow_run_model!r} returns!")

Vadym Dytyniak

11/09/2022, 3:24 PM
Hi @Nate. So run_deployment waits for the flow run by default?

Nate

11/09/2022, 3:25 PM
yes! there's a timeout parameter you can set to zero if you don't want to wait for it
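
The timeout behaviour described above could be wrapped up roughly like this. It is only a sketch: `timeout_for` and `trigger` are hypothetical helper names, and the meaning of `None` (keep polling until the flow run finishes) is an assumption based on the Prefect 2 `run_deployment` docs rather than something stated in this thread.

```python
from typing import Optional


def timeout_for(wait: bool, max_seconds: Optional[float] = None) -> Optional[float]:
    """Translate a wait / no-wait choice into a value for run_deployment's timeout.

    0    -> return as soon as the flow run is created (do not wait)
    None -> assumed to mean "poll until the flow run finishes" (the default)
    N    -> wait at most N seconds
    """
    if not wait:
        return 0
    return max_seconds


def trigger(deployment: str, wait: bool = True):
    """Hypothetical wrapper: start a deployment and optionally wait for it."""
    # Imported lazily so timeout_for stays usable where Prefect is not installed.
    from prefect.deployments import run_deployment

    return run_deployment(name=deployment, timeout=timeout_for(wait))
```

Inside a flow, `trigger("test-flow/my-deployment", wait=False)` would then be the fire-and-forget variant, while the default call waits for the run.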

Vadym Dytyniak

11/09/2022, 3:25 PM
It is great news! Thanks a lot!
😄 1

Nate

11/09/2022, 3:25 PM
sure thing!

Vadym Dytyniak

11/09/2022, 3:36 PM
Will it work with submit to fire, for example, 100 flow runs and wait for them?

Oscar Björhn

11/09/2022, 3:41 PM
Unfortunately run_deployment doesn't let you run it concurrently, otherwise I'd be using it.. Unless I've missed something!

Nate

11/09/2022, 4:27 PM
Unfortunately run_deployment doesn't let you run it concurrently
@Oscar Björhn what do you mean? There's nothing preventing you from calling run_deployment concurrently to get many instances of the same deployment running at the same time. In a production use case, I would suggest wrapping the run_deployment call in a task so you can leverage the mapping / concurrency offered by task runners.

Oscar Björhn

11/09/2022, 4:30 PM
Hmm, alright. I believe I tried running it with submit and it didn't work, but it's been a month or so and I might be mistaken. I'll have to try it out again.
Oh, it's because it's not a task, so it can't be used with submit, and therefore can't be used with wait_for. That's probably it. My main flow depends on starting a bunch of subflows and waiting for them to complete before starting more subflows.

Nate

11/09/2022, 4:31 PM
that sounds correct! 👍

Oscar Björhn

11/09/2022, 4:31 PM
yeah, so it might still be fine in Vadym's case. Sorry about the confusion, not my best thread replies. 😄

Nate

11/09/2022, 4:31 PM
no problem - we're all just here to learn! 🙂

Vadym Dytyniak

11/11/2022, 12:29 PM
Hi @Nate. I tried submitting run_deployment wrapped in a task about 60 times and the flow just hangs, but it works fine for ~20. I am not sure I fully understand how ConcurrentTaskRunner works. Do you have any idea what the issue could be?
import datetime

from prefect import task
from prefect.deployments import run_deployment
# FlowRun is Prefect's flow run schema; its import path depends on the Prefect version.

@task
def run_deployment_task(
    flow_name: str,
    parameters: dict,
    flow_run_name: str,
    scheduled_time: datetime.datetime,
) -> FlowRun:
    return run_deployment(
        name=flow_name,
        parameters=parameters,
        flow_run_name=flow_run_name,
        scheduled_time=scheduled_time,
    )

# Inside the parent flow: submit one named task per set of run parameters.
for run_params in parameters:
    named_run_deployment_task = run_deployment_task.with_options(
        name=run_params['flow_run_name'],
    )
    named_run_deployment_task.submit(
        flow_name=flow_name + '/' + deployment_name,
        parameters=run_params['run_params'],
        flow_run_name=run_params['flow_run_name'],
        scheduled_time=run_params['schedule'],
    )

Nate

11/11/2022, 4:22 PM
hi @Vadym Dytyniak I don't see anything that looks wrong in your code, are there any logs on the flows that hang?

Vadym Dytyniak

11/11/2022, 4:45 PM
bunch of:
18:43:26.322 | INFO    | Flow run 'amber-lynx' - Created task run 'sample/sample (2022-04-01)-b461a5d7-64' for task 'sample/sample (2022-04-01)'
18:43:26.322 | INFO    | Flow run 'amber-lynx' - Submitted task run 'sample/sample (2022-04-01)-b461a5d7-64' for execution.
I am submitting them as sync tasks, so I am not sure how many I can submit to be executed concurrently.
@Nate Any thoughts?
We have the same issue in another flow as well; it looks like a critical bug. And is there no way to control concurrency for ConcurrentTaskRunner?
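
One way to cap how many submissions are in flight, whatever the task runner does, is to submit in fixed-size batches and wait for each batch before starting the next. The sketch below is not a confirmed fix for the hang described here. `run_in_batches` is a hypothetical helper, and a standard-library thread pool stands in for Prefect, on the assumption that whatever `submit` returns exposes a `.result()` method the way Prefect 2 futures do.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Iterable, List


def run_in_batches(
    items: Iterable[Any],
    submit: Callable[[Any], Any],
    batch_size: int,
) -> List[Any]:
    """Submit items in groups of batch_size, waiting for each group to finish."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    pending = list(items)
    results: List[Any] = []
    for start in range(0, len(pending), batch_size):
        batch = pending[start:start + batch_size]
        # Start the whole batch first so its members can run concurrently.
        futures = [submit(item) for item in batch]
        # Block until every future in this batch is done before starting the next.
        results.extend(future.result() for future in futures)
    return results


# Stand-in for a task runner: thread-pool futures also expose .result().
with ThreadPoolExecutor() as pool:
    squares = run_in_batches(range(5), lambda n: pool.submit(pow, n, 2), batch_size=2)
```

In a Prefect 2 flow, `submit` could be a small function that calls `run_deployment_task.submit(...)` with the right arguments, and a `batch_size` of around 20 would match the number reported to work in this thread.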

Nate

11/28/2022, 4:04 PM
hi @Vadym Dytyniak do you mind opening a new thread with a minimal reproducible example of the issue that you're seeing? could you also share your prefect version?
this thread may be useful context