Prashob Nair
11/24/2021, 6:23 PMcreate_flow_run
to start a new flow 2 mins after the previous task.I have set the below parameter as follows but the flow run starts immediately instead.Please let me know where I'm going wrong.Thanks!
scheduled_start_time=pendulum.now().add(minutes=2),
Jake Kaplan
11/24/2021, 6:24 PMJake Kaplan
11/24/2021, 6:25 PMPrashob Nair
11/24/2021, 6:39 PMfrom prefect.tasks.prefect import create_flow_run
@task()
def task_a():
...
with Flow("FLOW_NAME",) as flow:
a = task_a
flow_task = create_flow_run(
flow_name="your-flow-name",
project_name="PROJECT_NAME",
parameters={"key": "value"},
scheduled_start_time=pendulum.now().add(minutes=2),
upstream_tasks=[a])
Prashob Nair
11/24/2021, 6:42 PMyour-flow-name
to start executing 2 mins after a
has completed.Jake Kaplan
11/24/2021, 6:51 PMfrom prefect import Flow, task
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
import pendulum
import time
@task
def upstream_task():
time.sleep(10)
print("finished upstream 10 seconds later")
with Flow("parent-flow") as flow:
a = upstream_task()
child_run_id = create_flow_run(project_name="hello world",
flow_name="hello-flow",
labels=["test-label"],
scheduled_start_time=pendulum.now().add(seconds=30),
upstream_tasks=[a])
wait_for_flow_run(child_run_id)
flow.run()
Jake Kaplan
11/24/2021, 6:51 PM[2021-11-24 13:48:48-0500] INFO - prefect.FlowRunner | Beginning Flow run for 'parent-flow'
[2021-11-24 13:48:48-0500] INFO - prefect.TaskRunner | Task 'upstream_task': Starting task run...
finished upstream 10 seconds later
[2021-11-24 13:48:58-0500] INFO - prefect.TaskRunner | Task 'upstream_task': Finished task run for task with final state: 'Success'
[2021-11-24 13:48:58-0500] INFO - prefect.TaskRunner | Task 'create_flow_run': Starting task run...
[2021-11-24 13:48:58-0500] INFO - prefect.create_flow_run | Creating flow run '6b023120-3f28-4cf5-99b2-70a621ecc19a-hello-flow' for flow 'hello-flow'...
[2021-11-24 13:48:59-0500] INFO - prefect.create_flow_run | Created flow run '6b023120-3f28-4cf5-99b2-70a621ecc19a-hello-flow': <https://cloud.prefect.io/jake-prefect-io-s-account/flow-run/4850eb55-2c7d-494f-ab4a-55e32aa12608>
[2021-11-24 13:48:59-0500] INFO - prefect.TaskRunner | Task 'create_flow_run': Finished task run for task with final state: 'Success'
[2021-11-24 13:48:59-0500] INFO - prefect.TaskRunner | Task 'wait_for_flow_run': Starting task run...
[2021-11-24 13:49:00-0500] INFO - prefect.wait_for_flow_run | Flow '6b023120-3f28-4cf5-99b2-70a621ecc19a-hello-flow': Entered state <Scheduled>: Flow run scheduled.
[2021-11-24 13:49:22-0500] INFO - prefect.wait_for_flow_run | Flow '6b023120-3f28-4cf5-99b2-70a621ecc19a-hello-flow': Entered state <Submitted>: Submitted for execution
[2021-11-24 13:49:22-0500] INFO - prefect.wait_for_flow_run | Flow '6b023120-3f28-4cf5-99b2-70a621ecc19a-hello-flow': Entered state <Running>: Running flow.
[2021-11-24 13:49:24-0500] INFO - prefect.wait_for_flow_run | Flow '6b023120-3f28-4cf5-99b2-70a621ecc19a-hello-flow': Entered state <Success>: All reference tasks succeeded.
[2021-11-24 13:49:26-0500] INFO - prefect.TaskRunner | Task 'wait_for_flow_run': Finished task run for task with final state: 'Success'
[2021-11-24 13:49:26-0500] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
Jake Kaplan
11/24/2021, 6:53 PMwait_for_flow_run
is not required but should allow you to view if what you want to happen is actually occurring. Are you able to see the created flow run in the UI at all or something similar to the logs that I pasted?Jake Kaplan
11/24/2021, 7:21 PMscheduled_start_time
appears to be based on registration time not run time. Let me look into if there is a different way to do this and will get back to you shortly!Prashob Nair
11/24/2021, 7:22 PMAnna Geller
Prashob Nair
11/24/2021, 7:24 PMAnna Geller
Anna Geller
upstream_tasks=[a]
your child flow will start only after task a is finished, so there is no reason to add 2 minutes in between here to respect the dependencies. Immediately after task a started, the child flow can start and the dependency is met. Does it make sense? or do you still want to add this 2 min delay in between? If so, why?Prashob Nair
11/24/2021, 7:33 PMAnna Geller
Prashob Nair
11/24/2021, 7:39 PMAnna Geller
import pendulum
from prefect.engine.signals import RETRY
@task
def check_if_data_arrived_in_snowflake(**kwargs):
bool_data_arrived = check_if_available() # check in DB if data arrived
if bool_data_arrived is False:
raise RETRY("Data is not there yet, retrying in 1 minute.", start_time=pendulum.now().add(minutes=1))
Anna Geller
@task
def in_minutes(minutes: int):
return pendulum.now().add(minutes=minutes)
with Flow("parent-flow") as flow:
a = upstream_task()
child_run_id = create_flow_run(project_name="hello world",
flow_name="hello-flow",
labels=["test-label"],
scheduled_start_time=in_minutes(2, upstream_tasks=[a]),
upstream_tasks=[a])
wait_for_flow_run(child_run_id)
Additionally, we opened a Github issue for that - if you want to contribute, feel free to submit a PR: https://github.com/PrefectHQ/prefect/issues/5171Yehor Sikachov
12/19/2021, 10:17 PMValueError: No results found while querying for flows where {'name': {'_eq': 'hello-flow'}, 'archived': {'_eq': False}, 'project': {'name': {'_eq': 'hello world'}}}
Complete logs are below
[2021-12-20 00:14:20+0200] INFO - prefect.FlowRunner | Beginning Flow run for 'parent-flow'
[2021-12-20 00:14:20+0200] INFO - prefect.TaskRunner | Task 'upstream_task': Starting task run...
finished upstream 10 seconds later
[2021-12-20 00:14:30+0200] INFO - prefect.TaskRunner | Task 'upstream_task': Finished task run for task with final state: 'Success'
[2021-12-20 00:14:30+0200] INFO - prefect.TaskRunner | Task 'create_flow_run': Starting task run...
[2021-12-20 00:14:31+0200] ERROR - prefect.TaskRunner | Task 'create_flow_run': Exception encountered during task execution!
Traceback (most recent call last):
File "/home/ysikachov/projects/prefect_proj/venv3/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 876, in get_task_run_state
value = prefect.utilities.executors.run_task_with_timeout(
File "/home/ysikachov/projects/prefect_proj/venv3/lib/python3.8/site-packages/prefect/utilities/executors.py", line 467, in run_task_with_timeout
return task.run(*args, **kwargs) # type: ignore
File "/home/ysikachov/projects/prefect_proj/venv3/lib/python3.8/site-packages/prefect/tasks/prefect/flow_run.py", line 123, in create_flow_run
flow = FlowView.from_flow_name(flow_name, project_name=project_name)
File "/home/ysikachov/projects/prefect_proj/venv3/lib/python3.8/site-packages/prefect/backend/flow.py", line 204, in from_flow_name
flows = cls._query_for_flows(
File "/home/ysikachov/projects/prefect_proj/venv3/lib/python3.8/site-packages/prefect/backend/flow.py", line 299, in _query_for_flows
raise ValueError(
ValueError: No results found while querying for flows where {'name': {'_eq': 'hello-flow'}, 'archived': {'_eq': False}, 'project': {'name': {'_eq': 'hello world'}}}
[2021-12-20 00:14:31+0200] INFO - prefect.TaskRunner | Task 'create_flow_run': Finished task run for task with final state: 'Failed'
[2021-12-20 00:14:31+0200] INFO - prefect.TaskRunner | Task 'wait_for_flow_run': Starting task run...
[2021-12-20 00:14:31+0200] INFO - prefect.TaskRunner | Task 'wait_for_flow_run': Finished task run for task with final state: 'TriggerFailed'
[2021-12-20 00:14:31+0200] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
I am wondering if I missing some dependencies or did something wrong during installationAnna Geller
scheduled_start_time
 will be based on the value of pendulum.now().add(seconds=30)
frozen at registration time, rather than being evaluated at runtime. Therefore, we suggested the workaround passing the callable as in here:
@task
def in_minutes(minutes: int):
return pendulum.now().add(minutes=minutes)
with Flow("parent-flow") as flow:
a = upstream_task()
child_run_id = create_flow_run(project_name="hello world",
flow_name="hello-flow",
labels=["test-label"],
scheduled_start_time=in_minutes(2, upstream_tasks=[a]),
upstream_tasks=[a])
wait_for_flow_run(child_run_id)
If you use this pattern with Callable, there is no need to make any changes and submit any PR. But if you wish to add a delta to this, you can contribute, as described in the open issue.
LMK if you have any other questions.