Hello! I'm trying to use `create_flow_run` to star...
# ask-community
p
Hello! I'm trying to use `create_flow_run` to start a new flow 2 mins after the previous task. I have set the parameter as shown below, but the flow run starts immediately instead. Please let me know where I'm going wrong. Thanks!
Copy code
scheduled_start_time=pendulum.now().add(minutes=2),
j
Hi Prashob, let me see if I can find an answer for you
do you mind sharing an example of your flow?
p
Copy code
import pendulum
from prefect import Flow, task
from prefect.tasks.prefect import create_flow_run


@task
def task_a():
    ...


with Flow("FLOW_NAME") as flow:
    a = task_a()  # call the task so it is added to the flow

    flow_task = create_flow_run(
        flow_name="your-flow-name",
        project_name="PROJECT_NAME",
        parameters={"key": "value"},
        scheduled_start_time=pendulum.now().add(minutes=2),
        upstream_tasks=[a],
    )
I want `your-flow-name` to start executing 2 mins after `a` has completed.
j
That makes total sense. I am able to do this with something like the following, which seems to match what you are doing:
Copy code
from prefect import Flow, task
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
import pendulum
import time


@task
def upstream_task():
    time.sleep(10)
    print("finished upstream 10 seconds later")


with Flow("parent-flow") as flow:
  a = upstream_task()
  child_run_id = create_flow_run(project_name="hello world",
                                 flow_name="hello-flow",
                                 labels=["test-label"],
                                 scheduled_start_time=pendulum.now().add(seconds=30),
                                 upstream_tasks=[a])
  wait_for_flow_run(child_run_id)

flow.run()
and the logs:
Copy code
[2021-11-24 13:48:48-0500] INFO - prefect.FlowRunner | Beginning Flow run for 'parent-flow'
[2021-11-24 13:48:48-0500] INFO - prefect.TaskRunner | Task 'upstream_task': Starting task run...
finished upstream 10 seconds later
[2021-11-24 13:48:58-0500] INFO - prefect.TaskRunner | Task 'upstream_task': Finished task run for task with final state: 'Success'
[2021-11-24 13:48:58-0500] INFO - prefect.TaskRunner | Task 'create_flow_run': Starting task run...
[2021-11-24 13:48:58-0500] INFO - prefect.create_flow_run | Creating flow run '6b023120-3f28-4cf5-99b2-70a621ecc19a-hello-flow' for flow 'hello-flow'...
[2021-11-24 13:48:59-0500] INFO - prefect.create_flow_run | Created flow run '6b023120-3f28-4cf5-99b2-70a621ecc19a-hello-flow': <https://cloud.prefect.io/jake-prefect-io-s-account/flow-run/4850eb55-2c7d-494f-ab4a-55e32aa12608>
[2021-11-24 13:48:59-0500] INFO - prefect.TaskRunner | Task 'create_flow_run': Finished task run for task with final state: 'Success'
[2021-11-24 13:48:59-0500] INFO - prefect.TaskRunner | Task 'wait_for_flow_run': Starting task run...
[2021-11-24 13:49:00-0500] INFO - prefect.wait_for_flow_run | Flow '6b023120-3f28-4cf5-99b2-70a621ecc19a-hello-flow': Entered state <Scheduled>: Flow run scheduled.
[2021-11-24 13:49:22-0500] INFO - prefect.wait_for_flow_run | Flow '6b023120-3f28-4cf5-99b2-70a621ecc19a-hello-flow': Entered state <Submitted>: Submitted for execution
[2021-11-24 13:49:22-0500] INFO - prefect.wait_for_flow_run | Flow '6b023120-3f28-4cf5-99b2-70a621ecc19a-hello-flow': Entered state <Running>: Running flow.
[2021-11-24 13:49:24-0500] INFO - prefect.wait_for_flow_run | Flow '6b023120-3f28-4cf5-99b2-70a621ecc19a-hello-flow': Entered state <Success>: All reference tasks succeeded.
[2021-11-24 13:49:26-0500] INFO - prefect.TaskRunner | Task 'wait_for_flow_run': Finished task run for task with final state: 'Success'
[2021-11-24 13:49:26-0500] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
`wait_for_flow_run` is not required, but it should let you see whether what you want to happen is actually occurring. Are you able to see the created flow run in the UI at all, or something similar to the logs that I pasted?
Actually, I believe I have found the problem: if you are registering the flow, `scheduled_start_time` appears to be based on registration time, not run time. Let me look into whether there is a different way to do this and I will get back to you shortly!
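The registration-time behaviour can be reproduced without Prefect. The following is a minimal sketch in plain Python, using only the standard library; `build_eager` and `build_deferred` are hypothetical stand-ins for building a flow and are not Prefect APIs. An expression such as "now plus a delay" is evaluated once, at the point where it is written, whereas a callable is evaluated each time it is invoked.

```python
from datetime import datetime, timedelta, timezone


def build_eager(delay: timedelta):
    """Compute the start time once, at build time (like a value frozen at registration)."""
    frozen_start = datetime.now(timezone.utc) + delay  # evaluated right now
    return lambda: frozen_start  # every later "run" sees the same timestamp


def build_deferred(delay: timedelta):
    """Defer the computation so that it happens on every run."""
    return lambda: datetime.now(timezone.utc) + delay  # evaluated when called


eager_run = build_eager(timedelta(minutes=2))
deferred_run = build_deferred(timedelta(minutes=2))
```

Calling `eager_run()` repeatedly keeps returning the timestamp computed at build time, while `deferred_run()` returns a fresh "two minutes from now" on each call.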
p
Yes, I was about to say that `pendulum.now()` is picking up the registration time. Thanks!
a
@Prashob Nair you can add a task in between that does nothing but sleep for 2 minutes 🙂
p
@Anna Geller Yes, with that it works fine. I just wanted to see if there was a way to schedule it in the future and not have to wait for it.
a
in general, Jake was trying to explain that Prefect is able to know when your specific task or flow is finished so that adding arbitrary buffer time in between is not really needed, as long as you set dependencies between tasks
so since you do:
Copy code
upstream_tasks=[a]
your child flow will start only after task a is finished, so there is no reason to add 2 minutes in between here to respect the dependencies. Immediately after task a has finished, the dependency is met and the child flow can start. Does that make sense? Or do you still want to add this 2 min delay in between? If so, why?
p
@Anna Geller task_a for us writes files into AWS S3. These files get auto-ingested into Snowflake as they land, but there is a slight delay before auto-ingestion completes. That's why we want a 2 min buffer before kicking off the child flow.
a
Got it. Is it always 2 minutes? Maybe you want to add a parameter to make this variable, set it as default=2, and sleep for this number of minutes in between those 2 tasks?
p
Yes, we can make it a parameter. But instead of sleeping for, say, 2 mins, we just want to schedule the child_flow 2 mins in the future after task_a (the last task of the parent flow) and then finish/exit parent_flow.
a
Well, technically you can schedule this flow in the future. You would schedule this parent flow, say at 1 AM, and if you know that task_a takes 5 minutes, you could schedule the child flow for 1:07 AM (5 min + 2 min buffer), but it's not something I would recommend as a long-term solution. I would rather sleep for some time and then check whether the data has arrived in Snowflake. If not, sleep again, and so on. In effect, you poll until the data has arrived. You could accomplish it this way (here we check every minute):
Copy code
import pendulum
from prefect import task
from prefect.engine.signals import RETRY


@task
def check_if_data_arrived_in_snowflake(**kwargs):
    # check_if_available() is a placeholder for your own query against Snowflake
    bool_data_arrived = check_if_available()  # check in DB if data arrived
    if not bool_data_arrived:
        raise RETRY(
            "Data is not there yet, retrying in 1 minute.",
            start_time=pendulum.now().add(minutes=1),
        )
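The "sleep, check, sleep again" idea can also be sketched in plain Python. This is only an illustration of the polling logic under stated assumptions: `wait_until` and its parameters are hypothetical names, and unlike the `RETRY` signal above, this loop blocks in place instead of leaving the rescheduling to Prefect.

```python
import time
from typing import Callable


def wait_until(
    check: Callable[[], bool],
    interval_seconds: float,
    max_attempts: int,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Call `check` until it returns True, sleeping between attempts.

    Returns True as soon as `check` succeeds, or False once
    `max_attempts` checks have all failed.
    """
    for attempt in range(1, max_attempts + 1):
        if check():
            return True
        if attempt < max_attempts:
            sleep(interval_seconds)  # wait before the next check
    return False
```

The `sleep` argument is injectable so that the loop can be exercised without real waiting; in normal use the default `time.sleep` applies, and `check` would wrap whatever query confirms that the data has landed.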
@Prashob Nair just talked to the engineering team: this is something you could try as well:
Copy code
@task
def in_minutes(minutes: int):
    return pendulum.now().add(minutes=minutes)

with Flow("parent-flow") as flow:
    a = upstream_task()
    child_run_id = create_flow_run(project_name="hello world",
                                   flow_name="hello-flow",
                                   labels=["test-label"],
                                   scheduled_start_time=in_minutes(2, upstream_tasks=[a]),
                                   upstream_tasks=[a])
    wait_for_flow_run(child_run_id)
Additionally, we opened a GitHub issue for that. If you want to contribute, feel free to submit a PR: https://github.com/PrefectHQ/prefect/issues/5171
y
@Anna Geller @Jake Kaplan I am trying to work on the feature request mentioned above, but with the exact code proposed by Jake I am now getting
Copy code
ValueError: No results found while querying for flows where {'name': {'_eq': 'hello-flow'}, 'archived': {'_eq': False}, 'project': {'name': {'_eq': 'hello world'}}}
Complete logs are below
Copy code
[2021-12-20 00:14:20+0200] INFO - prefect.FlowRunner | Beginning Flow run for 'parent-flow'
[2021-12-20 00:14:20+0200] INFO - prefect.TaskRunner | Task 'upstream_task': Starting task run...
finished upstream 10 seconds later
[2021-12-20 00:14:30+0200] INFO - prefect.TaskRunner | Task 'upstream_task': Finished task run for task with final state: 'Success'
[2021-12-20 00:14:30+0200] INFO - prefect.TaskRunner | Task 'create_flow_run': Starting task run...
[2021-12-20 00:14:31+0200] ERROR - prefect.TaskRunner | Task 'create_flow_run': Exception encountered during task execution!
Traceback (most recent call last):
  File "/home/ysikachov/projects/prefect_proj/venv3/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 876, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/home/ysikachov/projects/prefect_proj/venv3/lib/python3.8/site-packages/prefect/utilities/executors.py", line 467, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/home/ysikachov/projects/prefect_proj/venv3/lib/python3.8/site-packages/prefect/tasks/prefect/flow_run.py", line 123, in create_flow_run
    flow = FlowView.from_flow_name(flow_name, project_name=project_name)
  File "/home/ysikachov/projects/prefect_proj/venv3/lib/python3.8/site-packages/prefect/backend/flow.py", line 204, in from_flow_name
    flows = cls._query_for_flows(
  File "/home/ysikachov/projects/prefect_proj/venv3/lib/python3.8/site-packages/prefect/backend/flow.py", line 299, in _query_for_flows
    raise ValueError(
ValueError: No results found while querying for flows where {'name': {'_eq': 'hello-flow'}, 'archived': {'_eq': False}, 'project': {'name': {'_eq': 'hello world'}}}
[2021-12-20 00:14:31+0200] INFO - prefect.TaskRunner | Task 'create_flow_run': Finished task run for task with final state: 'Failed'
[2021-12-20 00:14:31+0200] INFO - prefect.TaskRunner | Task 'wait_for_flow_run': Starting task run...
[2021-12-20 00:14:31+0200] INFO - prefect.TaskRunner | Task 'wait_for_flow_run': Finished task run for task with final state: 'TriggerFailed'
[2021-12-20 00:14:31+0200] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
I am wondering if I am missing some dependencies or did something wrong during installation.
a
@Yehor Sikachov did you register this child flow with the name "hello-flow" and under the project "hello world"? It looks like you didn't. What Jake shared was just a hello-world example. To replicate this, you would need to register the child flow before using it in a parent flow. Also, make sure to adjust it to your flow's and project's names. Additionally, note that if you are registering the parent flow, then `scheduled_start_time` will be based on the value of `pendulum.now().add(seconds=30)` frozen at registration time, rather than being evaluated at runtime. Therefore, we suggested the workaround of passing the callable, as shown here:
Copy code
@task
def in_minutes(minutes: int):
    return pendulum.now().add(minutes=minutes)

with Flow("parent-flow") as flow:
    a = upstream_task()
    child_run_id = create_flow_run(project_name="hello world",
                                   flow_name="hello-flow",
                                   labels=["test-label"],
                                   scheduled_start_time=in_minutes(2, upstream_tasks=[a]),
                                   upstream_tasks=[a])
    wait_for_flow_run(child_run_id)
If you use this pattern with a callable, there is no need to make any changes or submit a PR. But if you wish to add a delta to this, you can contribute, as described in the open issue. LMK if you have any other questions.
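One way the "delta" idea could look is sketched below in plain Python. This is an assumption for illustration only: `resolve_start_time` is a hypothetical helper, not part of Prefect, and the linked issue may settle on a different design. The point is that a relative delay is turned into an absolute time only when the helper is called, so the result reflects run time rather than build time.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional, Union


def resolve_start_time(
    when: Union[datetime, timedelta, None],
    now: Optional[datetime] = None,
) -> Optional[datetime]:
    """Turn an absolute time or a relative delay into an absolute start time.

    A timedelta is added to the current time at call time, so the result
    reflects when the run is created rather than when the flow was built.
    """
    if when is None:
        return None  # no explicit start time requested
    if isinstance(when, timedelta):
        current = now if now is not None else datetime.now(timezone.utc)
        return current + when
    return when  # already an absolute datetime
```

The optional `now` argument exists only to make the helper deterministic in tests; callers would normally omit it.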