# ask-community
r
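Hey all, if I have a flow with the following:
```python
import pendulum
from prefect import Flow
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

with Flow("Parent Flow") as flow:  # parent flow name is illustrative
    # Create a run of "Flow A", scheduled 60 minutes after pendulum.now()
    Flow_A = create_flow_run(
        flow_name="Flow A",
        project_name="Project A",
        task_args=dict(name="Flow A (Execution)"),
        scheduled_start_time=pendulum.now().add(minutes=60),
        parameters={"Parameter1": "Flow A Parameter 1"},
    )
    # Block until that run finishes, streaming its logs
    Flow_A_Flag = wait_for_flow_run(
        Flow_A,
        raise_final_state=True,
        stream_logs=True,
    )
```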
I would expect the parent flow to create the flow run, and then `wait_for_flow_run` to wait at least an hour before it succeeds. But it appears that Flow A actually executes immediately, without waiting the hour, so the wait succeeds after only a few minutes. Is my misunderstanding in `scheduled_start_time` or in `wait_for_flow_run`?
b
Hey Roger, thanks for reaching out. We'll take a look at this for you and get back to you with a response asap
Hello Roger, could you tell us a little bit more about your use case? What is the goal that you would like to accomplish?
Would registering a flow, and then creating a schedule through the Cloud 1.0 UI for the flow be sufficient?
Hi Roger, I was able to get the following to work. In one file, I have a flow that I want to create a child run for:
```python
import prefect
from prefect import task, Flow, Parameter

@task
def hello_task(name):
    logger = prefect.context.get("logger")
    logger.info(f"Hello {name}!")


with Flow("hello-flow") as flow:
    # An optional parameter "people", with a default list of names
    people = Parameter("people", default=["Arthur", "Ford", "Marvin"])
    # Map `hello_task` across the list of names
    hello_task.map(people)

# Register the flow
flow.register(project_name="FirstProject")
```
In another file, I have a script that creates a run of the flow above:
```python
# Before running/registering, make sure an agent is running in a separate
# terminal/machine so that it can pick up the created flow run.
from prefect import Flow
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
import pendulum

with Flow("parent_flow") as flow:
    child_flow_run_id = create_flow_run(
        flow_name="hello-flow",
        project_name="FirstProject",
        scheduled_start_time=pendulum.now().add(minutes=2),  # Replace with 60 minutes.
        parameters={"people": ["Bianca", "Roger"]},
    )
    child_flowrunview = wait_for_flow_run(
        child_flow_run_id,
        raise_final_state=True,
        stream_logs=True,
    )

# Register the flow first:
# flow.register(project_name="FirstProject")

# Then, run the flow:
# flow.run()
```
r
Well, the idea is that we have a lag between Step 1 and Step 3 of a process. Step 2 is not something we control: it takes the output of Step 1 and modifies it into the input for Step 3 (hence it is Step 2). Since we have no control over it, we can't make it part of our DAG, but we know it takes about 45 minutes, so we want Step 3 to start an hour after Step 1 ends.
Yeah, I think what you made is pretty much exactly what I made. Does your child flow start executing after the two-minute lag? And does your parent flow therefore run for more than 2 minutes?
In the use case above, our Prefect parent flow creates the flow run for Step 1, and `wait_for_flow_run` waits for it to finish. Then it creates the flow run for Step 3, scheduled to run in 60 minutes, and waits for that one so we can kick off Step 4.
b
Yes, that is correct. I think I understand what you're trying to do here as well. In this case, I believe it would be beneficial to do a flow of flows (Flow 1 -> Flow 2 -> Flow 3 -> Flow 4). `wait_for_flow_run()` can be called so that Flow 2 waits for Flow 1 to finish, and so on. You won't need to set a 60-minute interval manually; `wait_for_flow_run()` will just fire off as soon as the upstream flow is completed.
Check out the section that starts with 'Option 1) My preferred choice'...
r
Yeah, that's the basics of what I have. The problem is that Flow 2 is not something we can run, hence the timing.
I love the flow of flows; that is how we have a large ETL process working. It's wonderful.
Our issue is this: let's say Flow 1 creates a file on the file system in Location A. We have to read that file, but from Location B, so Step 3 is the flow that reads the file from the second location. The file is not moved from Location A to Location B by our flow; a process run by something else moves it, so we assume it will be in Location B within the hour.
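Since the copy from Location A to Location B is outside your control, a fixed 60-minute delay is only an estimate. One alternative is for the Step 3 flow to poll Location B for the file before reading it. A minimal stdlib sketch, assuming a plain filesystem path; `wait_for_file` and its parameters are hypothetical names (not part of Prefect), and in a real flow you would call it from inside a task:

```python
import time
from pathlib import Path


def wait_for_file(path, timeout_s=3600.0, poll_interval_s=30.0):
    """Hypothetical helper: poll until `path` exists or `timeout_s` elapses.

    Returns True if the file appeared in time, False on timeout.
    """
    target = Path(path)
    deadline = time.monotonic() + timeout_s
    while True:
        if target.exists():
            return True
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return False
        # Sleep for the poll interval, but never past the deadline.
        time.sleep(min(poll_interval_s, remaining))
```

Compared with a fixed delay, this lets Step 3 start as soon as the file lands and gives a clear failure (`False`) if it never arrives within the hour.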
b
Huh. Step 2 is intriguing. Is there a reason that it can't be a task, or its own flow? Is the process of reading the file manual?
r
It's not our task, so we have no control over what it does or when.
In that example, copying the file from A to B is the task that we don't control.
One thing I thought of that I can't find in the docs anywhere: when is `pendulum.now().add(minutes=2)` actually calculated? At registration? At parent execution? Or when the flow run is created? If it's at registration, that would be bad, because we register it on Monday and it's supposed to execute every day; it's all run off schedules.
b
That is calculated at execution.
For my example, I can run `parent_flow`, and it creates a flow run for `hello-flow`, which runs 2 minutes afterwards.
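On the "when is it calculated" question, a hedged note: as far as I understand Prefect 1.x, the body of the `with Flow(...)` block runs once, when the flow object is built, so an argument like `pendulum.now().add(minutes=60)` becomes a constant captured at that moment. Whether it is recomputed for each run depends on storage: a flow stored as a pickle keeps the value from registration, while a flow stored as a script is rebuilt (and `now()` re-evaluated) when each run loads it. In the pickled case, a run created on Tuesday would get a start time that is already in the past. The plain-Python analogy below involves no Prefect at all, and the function names are made up; it only shows the difference between a value frozen at build time and one computed at run time:

```python
import time
from datetime import datetime, timedelta


def build_flow_frozen():
    # Analogous to writing `pendulum.now().add(...)` directly in the
    # `with Flow(...)` block: the timestamp is computed once, at build time.
    start_at = datetime.now() + timedelta(seconds=1)

    def run():
        return start_at

    return run


def build_flow_deferred():
    # Analogous to computing the timestamp inside a task: it is
    # recomputed every time the flow actually runs.
    def run():
        return datetime.now() + timedelta(seconds=1)

    return run


frozen = build_flow_frozen()      # "registration"
deferred = build_flow_deferred()  # "registration"
time.sleep(1.5)                   # time passes before the first scheduled run

now = datetime.now()
print(frozen() < now)    # True: the frozen start time is already in the past
print(deferred() > now)  # True: the deferred start time is still ahead
```

In Prefect 1.x, one way to get the deferred behavior is to compute the timestamp inside a `@task` and pass that task's result to `create_flow_run(scheduled_start_time=...)`.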
r
And you would expect `parent_flow` not to finish in under 2 minutes then, because it waits at least 2 minutes for `hello-flow` to start. Our entire `parent_flow` takes 37 minutes, which would indicate it's not waiting an hour for the child to start up, right? The screenshot I attached (if it's visible) shows the equivalent of your `child_flowrunview` in your `parent_flow`. Its duration (for me) should be more than 60 minutes (delay + execution time), but it was actually 2 minutes and 36 seconds.
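The arithmetic behind this expectation, as a hedged sketch: if the child run's scheduled start is still in the future when the parent creates it, the wait should last at least the delay plus the child's runtime; if the scheduled start is already in the past (for example because it was computed at registration), the run can start right away and the wait only covers the runtime. This assumes an agent picks up overdue runs immediately; the helper names, dates, and runtime below are made up for illustration:

```python
from datetime import datetime, timedelta


def effective_start(scheduled_start, created_at):
    """Earliest moment a run can start: its scheduled time, or its creation
    time if the scheduled time has already passed (assumption: an agent
    picks up overdue runs immediately)."""
    return max(scheduled_start, created_at)


def min_wait(scheduled_start, created_at, runtime):
    """Lower bound on how long wait_for_flow_run should block."""
    return effective_start(scheduled_start, created_at) - created_at + runtime


runtime = timedelta(minutes=2, seconds=36)  # illustrative child runtime
registered = datetime(2022, 1, 3, 9, 0)     # hypothetical Monday registration
created = datetime(2022, 1, 4, 9, 0)        # hypothetical Tuesday parent run

# Start time computed when the parent runs: wait >= 60 min delay + runtime.
print(min_wait(created + timedelta(minutes=60), created, runtime))     # 1:02:36
# Start time frozen at registration: already in the past, so only the runtime.
print(min_wait(registered + timedelta(minutes=60), created, runtime))  # 0:02:36
```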