Hi. What I should use instead of <wait_for_flow_ru...
# prefect-community
v
Hi. What I should use instead of wait_for_flow_run(Prefect 1) task in Prefect 2?
1
o
Add this as a parameter to your task calls: wait_for=[task_to_wait_for]
v
I use create_flow_run that imminently submits the flow run, so it will not help just use wait_for.
o
Ah. For some reason I thought you were asking about tasks. Not sure, I know there's a wait_for for subflows as well but I am not familiar with create_flow_run.
n
Hi @Vadym Dytyniak 👋 In prefect 2, we can simply call flows from inside of flows! • So if you want your subflow to run on the same infrastructure as the parent, you can do something like
Copy code
@flow
def child_flow():
   time.sleep(5)
   return "something"

@flow
def parent_flow():
   result = child_flow()
   
   print("this will only happen once {result!r} returns!")
• or if you want to run a separate deployment and wait for it, you can use
run_deployment
Copy code
from prefect.deployments import run_deployment

some_other_deployed_flow = "test-flow/my-deployment"

@flow
def parent_flow():
     flow_run_model = run_deployment(name=some_other_deployed_flow)

     print("this will only happen once {flow_run_model!r} returns!")
v
Hi @Nate. So, run_deployment is waiting for the flow_run by default?
n
yes! there's a
timeout
parameter you can set to zero if you don't want to wait for it
v
It is great news! Thanks a lot!
😄 1
n
sure thing!
v
Will it work with submit to fire for example 100 flow runs and wait for them?
o
Unfortunately run_deployment doesn't let you run it concurrently, otherwise I'd be using it.. Unless I've missed something!
n
Unfortunately run_deployment doesn't let you run it concurrently
@Oscar Björhn what do you mean? There's nothing preventing you from calling
run_deployment
concurrently to get many instances of the same deployment running at the same time in a production use case, I would suggest wrapping the
run_deployment
call in a task so you can leverage mapping / concurrency offered by task runners
o
Hmm, alright. I believe I tried running it with submit and it didn't work, but it's been a month or so and I might be mistaken. I'll have to try it out again.
Oh, it's because it's not a task, so it can't be used with submit, and therefor can't be wait_fored. That's probably it. My mainflow depends on starting a bunch of subflows and waiting for them to complete before starting more subflows.
n
that sounds correct! 👍
o
yeah, so it might still be fine in Vadym's case. Sorry about the confusion, not my best thread replies. 😄
n
no problem - we're all just here to learn! 🙂
v
Hi @Nate. I was trying to submit
run_deployment
wrapped in the task about 60 times and flow just hangs, but works fine for ~20. I am not sure I have full understanding how ConcurrentTaskRunner works. Do you have any idea what can be the issue?
Copy code
@task
def run_deployment_task(
    flow_name: str,
    parameters: dict,
    flow_run_name: str,
    scheduled_time: datetime.datetime,
) -> FlowRun:
    return run_deployment(
        name=flow_name,
        parameters=parameters,
        flow_run_name=flow_run_name,
        scheduled_time=scheduled_time,
    )

for run_params in parameters:
        named_run_deployment_task = run_deployment_task.with_options(
            name=run_params['flow_run_name'],
        )
        named_run_deployment_task.submit(
            flow_name=flow_name + '/' + deployment_name,
            parameters=run_params['run_params'],
            flow_run_name=run_params['flow_run_name'],
            scheduled_time=run_params['schedule'],
        )
n
hi @Vadym Dytyniak I don't see anything that looks wrong in your code, are there any logs on the flows that hang?
v
bunch of:
Copy code
18:43:26.322 | INFO    | Flow run 'amber-lynx' - Created task run 'sample/sample (2022-04-01)-b461a5d7-64' for task 'sample/sample (2022-04-01)'
18:43:26.322 | INFO    | Flow run 'amber-lynx' - Submitted task run 'sample/sample (2022-04-01)-b461a5d7-64' for execution.
I am submitting them as sync task, so I am not sure how many I can submit to be executed concurrently.
@Nate Any thoughts?
We have the same issue in other flow as well, looks like critical bug. And there is no way to control concurrent for ConcurrentTaskRunner?
n
hi @Vadym Dytyniak do you mind opening a new thread with a minimal reproducible example of the issue that you're seeing? could you also share your prefect version?
this thread may be useful context