# ask-community
s
I’m trying to keep a large project organized and I’m wondering if I’m thinking about it the wrong way. I have two series of tasks, currently defined as flows in separate .py files. I was hoping to create a third flow that executes the first two and then performs a few more tasks. Is it better to think of those two upstream files as flows, or should I just organize each of those files so it ends in a final task that has all of the file’s other tasks as “upstream tasks”, and then import that terminal task into the master flow? (Hopefully that’s clear.)
k
Hey @saml, I was following until the last sentence. I think the way to think about this is as subflows that you invoke with the `StartFlowRun` task or the `create_flow_run` task in the third flow, with your additional tasks placed after that. Importing the terminal task into the master flow won’t work, I think.
s
Just so we’re speaking the same language:
- flow_a: a1, a2, etc.
- flow_b: b1, b2, etc.
- flow_c: flow_a, flow_b, c1, c2, etc.
I have flows a and b defined like:
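```python
from prefect import Flow

# flows a and b, each in its own file; `exe` is the executor configured elsewhere in that file (not shown)
with Flow("Cases Analysis", executor=exe) as flow:
    ...  # a1, a2, etc.
```

```python
# flow c as currently defined
from prefect import Flow
from prefect.tasks.prefect import StartFlowRun

# Starts a run of the *registered* flow "Cases Analysis" in the "Analysis" project
flow_run = StartFlowRun(flow_name="Cases Analysis", project_name="Analysis")

with Flow("parent-flow") as flow:
    flow_run()

flow.run()
```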
When I try to execute parent-flow, I start getting HTTP connection errors, which look to me like Prefect is trying to contact a Prefect server or something.
k
`StartFlowRun` runs a registered flow, so flow_a and flow_b need to be registered with the backend (Cloud or Server) for this to work.
s
ok that helps, so I can’t run it locally from a single file like I would when testing one of my example flows (flow_a)?
k
For the `StartFlowRun` task, no, you can’t. I have an idea that I’ll mention, but I’m not sure it’ll work. Let me show you:
```python
from prefect import task, Flow

@task
def abc():
    return 1

@task
def bcd():
    return 2

with Flow("example-flow") as flow:  # hypothetical flow name
    abc()

# notice flow is already defined; re-entering its context adds another task to it
with flow:
    bcd()
```
So I’m thinking you could maybe import the Flow from another file and then add tasks to it, but it’s hard to set upstream tasks that way, which is why I don’t think it’s a good solution.
s
so in that example, I would only define the “with flow” clause on the master flow and then import tasks from the other files and add them to the context?
perhaps more clearly: in your example, the upstream flows (as currently defined) would import the flow context that’s defined in the master flow file?
k
To keep appending to a flow, you’d need to, like, define it in flow A, and then flow B imports `flow` and adds more tasks, and then flow C imports the updated `flow` and adds more tasks. So it’s still pretty ugly.
s
yeah that seems like it would be hard to get the order right.
so I guess before we get too deep in the weeds, my goal is to not end up with one super-flow-file that’s >1000 lines long. I’d like portions of the flow to be broken out into their own files that are easy to understand and edit. Do you have any thoughts on the best way to achieve that?
k
Or you can skip having flow objects in `flow_a` and `flow_b` altogether. Just import their tasks into `flow_c` and construct the whole flow there. Really, though, I guess the standard way is creating subflows. The experience is rough for sure, so in Prefect 2.0 subflows will be a first-class part of the main flow.
Importing tasks and creating the Flow in a main file is also pretty common, though.
s
ok, maybe I’ll try that.
This appears to be working as I’d hoped, but maybe I’m setting myself up for issues down the line?
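```python
from time import sleep

from prefect import task, Flow

from case_flow import flow as case_flow
from case_flow import task_save_case_final_data

@task
def test_final_task():
    print("final task sleep")
    sleep(10)
    print("done sleeping")

final_flow = Flow("parent-flow")
# copy every task and edge from case_flow into the new flow
final_flow.update(case_flow)
# run the new task after case_flow's reference tasks (by default, its terminal tasks)
final_flow.add_task(test_final_task(flow=final_flow, upstream_tasks=[case_flow.reference_tasks()]))
final_flow.visualize()
```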
k
Oh wow, if that works, that’s great! I haven’t seen it myself, but as long as the visualization looks right, I think this would work.
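The `update` snippet above leans on two Prefect 1.x behaviors: `Flow.update` copies another flow’s tasks and edges into the current flow, and `reference_tasks()` defaults to the flow’s terminal tasks (tasks with nothing downstream). As a library-free sketch of the resulting graph, assuming a toy dict-of-sets model (not Prefect’s API) and hypothetical task names for case_flow, the final task ends up depending on every terminal task and therefore runs last. It needs Python 3.9+ for `graphlib`.

```python
from graphlib import TopologicalSorter

# Toy model of a flow: each task name maps to the set of its upstream task names.
# These task names are hypothetical stand-ins for whatever case_flow.py defines.
case_flow = {
    "load_cases": set(),
    "clean_cases": {"load_cases"},
    "summarize_cases": {"clean_cases"},
    "task_save_case_final_data": {"clean_cases"},
}


def terminal_tasks(flow: dict[str, set[str]]) -> set[str]:
    """Tasks nothing else depends on; Prefect 1.x uses these as default reference tasks."""
    has_downstream = set().union(*flow.values())
    return set(flow) - has_downstream


def update(target: dict[str, set[str]], other: dict[str, set[str]]) -> None:
    """Copy every task and edge from `other` into `target` (the idea behind Flow.update)."""
    for name, upstream in other.items():
        target.setdefault(name, set()).update(upstream)


final_flow: dict[str, set[str]] = {}
update(final_flow, case_flow)
# The new task waits on all of case_flow's terminal tasks, not one hand-picked task.
final_flow["test_final_task"] = terminal_tasks(case_flow)

run_order = list(TopologicalSorter(final_flow).static_order())
print(run_order)  # test_final_task always comes last
```

Because the dependency points at the terminal tasks, the final task should still wait for all of case_flow even if new tasks are later appended to the end of case_flow.py, which makes this less fragile than wiring it to a single hand-picked task.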
s
wow so apparently this works:
```python
from prefect import task, Flow
from time import sleep

from case_flow import flow as case_flow


@task
def test_final_task(other_task_str: str):
    print("final task sleep")
    print(other_task_str)
    sleep(10)
    print("done sleeping")

@task
def other_task():
    return "I'm another task"

with Flow("other flow") as other_flow:
    other_task()

final_flow = Flow("parent-flow")
final_flow.update(case_flow)
# get_tasks() returns a list, so test_final_task should receive a one-element list holding other_task's result
final_flow.add_task(test_final_task(other_task_str=other_flow.get_tasks(name='other_task'), flow=final_flow, upstream_tasks=[case_flow.reference_tasks()]))
final_flow.visualize()
final_flow.run()
```
note that it can do the in-memory handoff of an object from one flow to another
k
Nicely done! Have not seen anyone do it like this.
s
Assuming I’m not missing something significant, this seems to open up a lot of options for flexible execution. Each subflow could easily be run on its own if needed, or run from a parent flow without changing the subflow’s code.
k
So a lot of users already get flexible execution from `StartFlowRun`, since you can just register the subflow and then invoke it from the main flow. I think the flexibility your approach adds is mostly on the development side, for sure.
s
Does `StartFlowRun` allow handing objects from one flow to another, though? I thought it just returned the flow state, or could be set to wait for the flow to finish.
k
Ah, I see what you mean. That’s right: not in a first-class way. Prefect 0.15.0 introduced three tasks: `create_flow_run`, `wait_for_flow_run`, and `get_task_run_result`. `get_task_run_result` loads the result from the result location. With your current setup, though, the handoff will indeed happen in memory, since everything gets registered together as one flow.
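For contrast with the in-memory case: when flows are registered and run separately, the only bridge between them is a persisted result, which is what `get_task_run_result` reads from the result location. The stdlib-only sketch below assumes a JSON file in a temporary directory as a stand-in for that location; the helpers `save_result`/`load_result` and the slug `"other_task-1"` are hypothetical and do not reflect Prefect’s storage format.

```python
import json
import tempfile
from pathlib import Path
from typing import Any


def save_result(location: Path, task_slug: str, value: Any) -> Path:
    """Child flow side: persist a task's return value under its slug."""
    path = location / f"{task_slug}.json"
    path.write_text(json.dumps(value))
    return path


def load_result(location: Path, task_slug: str) -> Any:
    """Parent flow side: read the value back instead of receiving it in memory."""
    return json.loads((location / f"{task_slug}.json").read_text())


with tempfile.TemporaryDirectory() as tmp:
    result_location = Path(tmp)
    # In a real deployment these two steps happen in different flow runs,
    # possibly on different machines; only the stored result connects them.
    save_result(result_location, "other_task-1", "I'm another task")
    handed_off = load_result(result_location, "other_task-1")

print(handed_off)  # I'm another task
```

The practical consequence is that anything handed across separately registered flows has to survive serialization and live somewhere both runs can reach, whereas the single merged flow can pass live Python objects directly.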
s
interesting, I feel like I’m kind of getting away with something, which is usually a sign that I’m getting a little too creative for my own good, but hopefully this works OK.