Gabe Grand
04/12/2021, 4:40 PM
flow_1 (preprocessing) -> flow_2 (training) -> flow_3 (deployment)
Gabe Grand
04/12/2021, 4:41 PM
# Flow-of-flows definition
from prefect import Flow
from prefect.tasks.prefect import StartFlowRun

flow_1 = StartFlowRun(flow_name="preprocessing", project_name="examples", wait=True)
flow_2 = StartFlowRun(flow_name="training", project_name="examples", wait=True)
flow_3 = StartFlowRun(flow_name="deployment", project_name="examples", wait=True)

with Flow("parent-flow") as flow:
    r1 = flow_1()
    r2 = flow_2(upstream_tasks=[r1])
    r3 = flow_3(upstream_tasks=[r2])
Gabe Grand
04/12/2021, 4:41 PM
Now I want to add a flow_0 that checks an external data source for new data on some routine schedule. I want flow_0 to trigger the rest of the pipeline, but only when there is new data. I’m considering the following two solutions:
SOLUTION 1
My first instinct was to add flow_0 to the flow-of-flows, but I haven’t been able to find a clean way to use the result of flow_0 to determine whether to run the rest of the pipeline. Digging further, it seems like passing data between flows in a flow-of-flows is not yet supported, but it’s possible that I’m not fully understanding the situation.
SOLUTION 2
An alternative would be to append a StartFlowRun(flow_name="parent-flow") as the final task of flow_0 and make it conditional on whether there is new data. I know this will work, but it feels less clean than having all of the flows inside a single parent-flow pipeline that runs on a schedule.

Kevin Kho
You could raise a SKIP condition if there is nothing. You could also use case, I think.
Gabe Grand
04/12/2021, 4:52 PM
Is there a way for flow_0 to pass a parameter to flow_1 (e.g., the S3 location of the data that needs to get preprocessed)?
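For reference, StartFlowRun does accept a parameters argument, so a parent flow can hand values into a child flow; the unsupported part discussed in this thread is feeding one child flow’s result into another. A minimal sketch, where the s3_key parameter name and default are made up for illustration:

from prefect import Flow, Parameter
from prefect.tasks.prefect import StartFlowRun

# Hypothetical: the "preprocessing" flow would need to declare a
# matching Parameter("s3_key") for this to have any effect.
preprocessing = StartFlowRun(flow_name="preprocessing", project_name="examples", wait=True)

with Flow("parent-flow") as flow:
    s3_key = Parameter("s3_key", default="s3://my-bucket/raw/latest.csv")
    preprocessing(parameters={"s3_key": s3_key})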
Kevin Kho
You can raise SKIP under an if clause to check if there is data: https://docs.prefect.io/core/concepts/states.html#skip-while-running
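A minimal sketch of that pattern, where the check-for-data logic itself is hypothetical:

from prefect import task
from prefect.engine import signals

@task
def check_for_new_data():
    new_files = []  # hypothetical: e.g., list new objects in the data source
    if not new_files:
        # SKIP marks this task as Skipped; downstream tasks in the same flow
        # are skipped too by default (skip_on_upstream_skip=True).
        raise signals.SKIP("No new data found.")
    return new_files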
John Urbanik
04/12/2021, 8:08 PM
Could you call prefect.client.client.Client.get_flow_run_state in a task and use that to retrieve the result of flow_0, so as to avoid having to modify flow_0 to persist the data elsewhere? I can try it out this evening, but was wondering if someone knew offhand.
John Urbanik
04/12/2021, 10:47 PM
flow_0 may be owned by another team, so changing the Result class / using templating isn’t acceptable, and you might not actually know the location it is stored in. I started playing around with it, and a few issues popped up:
1. StartFlowRun raises a signal on wait=True instead of returning a string as suggested in the documentation. I can get the run_id by doing a little string parsing, but it is messy.
2. I was wrong about the method; I need get_flow_run_info in order to actually get the state of the individual tasks.
3. This only gets the locations for given task slugs. You can…
Here is a minimal demo:
import prefect
from prefect import Flow, task, Parameter
from prefect.tasks.prefect import StartFlowRun
from prefect.engine.results import PrefectResult

@task
def add(x, y):
    return x + y

@task
def get_results_from_flow_run_for_slugs(flow_run_signal, slugs):
    client = prefect.Client()
    # The run id is embedded in the signal's message; parse it out (messy).
    flow_state = client.get_flow_run_info(flow_run_signal.state.message.split(' ')[0])
    flow_tasks = flow_state.task_runs
    # Read each requested task's result back from its stored location.
    params = {
        tr.task_slug: tr.state._result.read(tr.state._result.location)
        for tr in flow_tasks
        if tr.task_slug in slugs
    }
    return params

with Flow("flow_0", result=PrefectResult()) as flow:
    result = add(3, 5, task_args=dict(name="get_this"))
flow.register(project_name="ju-testing")

first_flow = StartFlowRun(flow_name="flow_0", project_name="ju-testing", wait=True)

with Flow("parent", result=PrefectResult()) as p_flow:
    flow_run_signal = first_flow()
    res = get_results_from_flow_run_for_slugs(flow_run_signal, slugs=[result.slug])
p_flow.register(project_name="ju-testing")
If there is interest, I can probably make a few PRs to make a task to this effect (I’d really rather not be relying on this string splitting, though). I feel like there is probably a more elegant solution, so I won’t be bothered if not!

John Urbanik
04/12/2021, 10:56 PM
It might make sense to extend StartFlowRun so that it wraps this logic inside.

Gabe Grand
04/13/2021, 2:47 PM
It’d be really helpful to have StartFlowRun(..., wait=True) return a dict of per-task return values that can be passed to subsequent flows.

Gabe Grand
04/14/2021, 5:48 PM
I took your suggestion to raise SKIP if there’s no new data. This works well for skipping the remaining tasks within step_1 (I also took your suggestion to merge steps 0 + 1 to avoid having to pass data between them). However, within the flow-of-flows, when step_1 raises SKIP, the subsequent step_2 and step_3 pipelines still get run. I’m assuming it’s because these are separate flows that get called with StartFlowRun. Is there any way to make these downstream flows respect the SKIP signal?

Kevin Kho
The SKIP is causing step_1 to be labelled a success. We had a recent change that will help us propagate the SKIP, but I need to test it out. Let me work on an example for you.

Gabe Grand
04/14/2021, 6:20 PM
Ideally, the StartFlowRun(…, wait=True) task could reflect the SKIP signal, though I’m obviously not sure how much complexity that will involve. If you have an example I can try out or a branch I can fetch, that’d be great.

Zanie
You could also have a case statement check for the existence of the file.
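A minimal sketch of the case approach (the file-existence task here is a made-up stand-in):

from prefect import Flow, case, task
from prefect.tasks.prefect import StartFlowRun

@task
def file_exists():
    return True  # hypothetical check; replace with a real lookup

start_step_2 = StartFlowRun(flow_name="step_2", project_name="examples", wait=True)

with Flow("parent-flow") as flow:
    has_file = file_exists()
    # Tasks inside the case block run only when has_file is True;
    # otherwise they are marked Skipped.
    with case(has_file, True):
        start_step_2()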
Jeremy Tee
04/15/2021, 6:54 AM
I’m getting ("'Success' object has no attribute 'read'"). Any ideas on why?
Zanie
> it’d be really helpful to have StartFlowRun(..., wait=True) return a dict of per-task return values that can be passed to subsequent flows
This is not directly feasible because many of our customers have flow runs with >10,000 tasks, which we could not feasibly collect and return like that.
John Urbanik
04/15/2021, 2:25 PM
Maybe you could pass StartFlowRun a list of tasks/slugs that you want to return the results for. Internally, maybe they’re mapped over with tasks so that you can parallelize the requests if it is still a relatively large number.
Another option would be to add another form of key task. At my company, we’re overloading reference_tasks as a way to get the set of tasks to pass to the next flow in our sequence.
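A hypothetical sketch of that first idea, reusing the signal parsing and result reading from the demo above (the class name and its slugs argument are invented here, and error handling is elided):

import prefect
from prefect.engine import signals
from prefect.tasks.prefect import StartFlowRun

class StartFlowRunWithResults(StartFlowRun):
    """Hypothetical variant that returns results for the requested slugs."""

    def run(self, slugs=None, **kwargs):
        try:
            super().run(**kwargs)
            return {}  # wait=False: nothing to collect
        except signals.SUCCESS as exc:
            # With wait=True the base task raises a signal whose message
            # begins with the flow run id (see the demo above).
            flow_run_id = exc.state.message.split(" ")[0]
        task_runs = prefect.Client().get_flow_run_info(flow_run_id).task_runs
        return {
            tr.task_slug: tr.state._result.read(tr.state._result.location)
            for tr in task_runs
            if tr.task_slug in (slugs or [])
        }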
Zanie
The FlowRunner has an idea of return_tasks that's basically unused when running flows with the API.
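For a local (non-API) illustration of return_tasks: flow.run() drives a FlowRunner with return_tasks set to all of the flow's tasks, so every task's final state is available on the returned flow state. Using John's flow_0 demo above:

# state.result maps each task to its final State, so the "get_this"
# task's return value (8) can be read back directly.
state = flow.run()
print(state.result[result].result)  # 8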