Vadym Dytyniak
11/09/2022, 3:09 PM
Oscar Björhn
11/09/2022, 3:11 PM
Vadym Dytyniak
11/09/2022, 3:12 PM
Oscar Björhn
11/09/2022, 3:13 PM
Nate
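11/09/2022, 3:22 PM
import time

from prefect import flow

@flow
def child_flow():
    time.sleep(5)
    return "something"

@flow
def parent_flow():
    # Calling a flow inside another flow runs it as a subflow and blocks until it returns
    result = child_flow()
    print(f"this will only happen once {result!r} returns!")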
• or if you want to run a separate deployment and wait for it, you can use run_deployment
from prefect import flow
from prefect.deployments import run_deployment

some_other_deployed_flow = "test-flow/my-deployment"

@flow
def parent_flow():
    # By default run_deployment waits until the triggered flow run finishes
    flow_run_model = run_deployment(name=some_other_deployed_flow)
    print(f"this will only happen once {flow_run_model!r} returns!")
Vadym Dytyniak
11/09/2022, 3:24 PM
Nate
11/09/2022, 3:25 PM
run_deployment has a timeout parameter you can set to zero if you don't want to wait for it
Vadym Dytyniak
11/09/2022, 3:25 PM
Nate
11/09/2022, 3:25 PM
Vadym Dytyniak
11/09/2022, 3:36 PM
Oscar Björhn
11/09/2022, 3:41 PM
Nate
11/09/2022, 4:27 PM
"Unfortunately run_deployment doesn't let you run it concurrently"
@Oscar Björhn what do you mean? There's nothing preventing you from calling run_deployment concurrently to get many instances of the same deployment running at the same time
in a production use case, I would suggest wrapping the run_deployment call in a task so you can leverage mapping / concurrency offered by task runners
Oscar Björhn
11/09/2022, 4:30 PM
Oscar Björhn
11/09/2022, 4:30 PM
Nate
11/09/2022, 4:31 PM
Oscar Björhn
11/09/2022, 4:31 PM
Nate
11/09/2022, 4:31 PM
Vadym Dytyniak
11/11/2022, 12:29 PM
I am submitting run_deployment wrapped in a task about 60 times and the flow just hangs, but it works fine for ~20. I am not sure I fully understand how ConcurrentTaskRunner works. Do you have any idea what the issue could be?
Vadym Dytyniak
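11/11/2022, 12:31 PM
import datetime

from prefect import task
from prefect.client.schemas import FlowRun  # import path may differ between Prefect versions
from prefect.deployments import run_deployment

@task
def run_deployment_task(
    flow_name: str,
    parameters: dict,
    flow_run_name: str,
    scheduled_time: datetime.datetime,
) -> FlowRun:
    # Trigger the deployment and wait for the resulting flow run to finish
    return run_deployment(
        name=flow_name,
        parameters=parameters,
        flow_run_name=flow_run_name,
        scheduled_time=scheduled_time,
    )

# Inside the parent flow: submit one task run per set of run parameters
for run_params in parameters:
    named_run_deployment_task = run_deployment_task.with_options(
        name=run_params['flow_run_name'],
    )
    named_run_deployment_task.submit(
        flow_name=flow_name + '/' + deployment_name,
        parameters=run_params['run_params'],
        flow_run_name=run_params['flow_run_name'],
        scheduled_time=run_params['schedule'],
    )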
Nate
11/11/2022, 4:22 PM
Vadym Dytyniak
11/11/2022, 4:45 PM
18:43:26.322 | INFO | Flow run 'amber-lynx' - Created task run 'sample/sample (2022-04-01)-b461a5d7-64' for task 'sample/sample (2022-04-01)'
18:43:26.322 | INFO | Flow run 'amber-lynx' - Submitted task run 'sample/sample (2022-04-01)-b461a5d7-64' for execution.
Vadym Dytyniak
11/11/2022, 4:47 PM
Vadym Dytyniak
11/28/2022, 3:53 PM
Vadym Dytyniak
11/28/2022, 3:53 PM
Nate
11/28/2022, 4:04 PM
Nate
11/28/2022, 4:25 PM