Joe Schmid
05/15/2020, 3:05 AM
@task
def times_two(x):
    return x * 2

@task
def add(items):
    return sum(items)

class SimpleFlow(SRMPrefectFlow):
    @property
    def flow(self) -> Flow:
        with Flow("SimpleFlow", environment=env) as flow:
            x = Parameter("x", default=[1, 2, 3])
            times_two_task_result = times_two.map(x)
            flow_result = add(times_two_task_result)
        return flow
And a simple test to run the flow & check the last task's result:
def test_flow_run_result():
    flow = SimpleFlow().flow
    fr = flow.run()
    assert list(fr.result.values())[2].result == 12
list(fr.result.values())[2].result works, but is fragile since it relies on the task's position in the results. We'd rather use fr.result[flow_result].result, but flow_result isn't available outside of the function that defines the flow. Is there a better approach that people have used?
SimpleFlow extends SRMPrefectFlow to handle common CI/CD aspects for flows, e.g. save(), register(), etc. (We want all Flows to conform to this so that we can add them to Docker images and register them with Cloud in the same way.)
Jeremiah
05/15/2020, 3:24 AM
Recover flow_result according to a convention (you may know these), e.g. flow_result = list(flow.terminal_tasks())[0]
Set it as a reference_task (if appropriate for your workflow), and use flow.reference_tasks() instead
Give it a name and use flow.get_tasks(name='key task name') to retrieve it
None of these make flow_result available in local scope, but perhaps they let you recreate it programmatically, or by convention?
You could also set flow.flow_result = flow_result inside the function, so you could access it easily on the returned flow object, but that feels a bit hackier
Joe Schmid
05/15/2020, 3:29 AM
flow.get_tasks(name='key task name') should work well, or I like flow.flow_result = flow_result also. Much appreciated!
Jeremiah
05/15/2020, 3:34 AM
get_tasks(name=...) will return a list, so as long as you choose a good name, you’ll know what the one item is (which is why this depends slightly on adopting a convention)
Joe Schmid
05/15/2020, 3:37 AM
Jeremiah
05/15/2020, 3:40 AM
Joe Schmid
05/15/2020, 3:44 AM
itay livni
05/15/2020, 4:54 AM
res = my_special_task.get_tasks(tags=["special_tag"])[0]
df = flow_state.result[res].result
pprint(f"Number of Definitions from secondary sources: {df.shape}")
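
For anyone wanting to try the lookup-by-name and lookup-by-tag ideas from this thread without a Prefect install, here is a rough plain-Python sketch. Everything in it (MiniTask, MiniState, MiniFlow, result_by_name) is a hypothetical stand-in that only mimics the shape of the objects discussed above; it is not Prefect's API. The point it tries to show is that a name or tag lookup returns a list, so the helper checks that exactly one task matched before reading its result, instead of indexing by position.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass(frozen=True)
class MiniTask:
    """Hypothetical stand-in for a task: a name plus optional tags."""
    name: str
    tags: frozenset = frozenset()


@dataclass
class MiniState:
    """Hypothetical stand-in for a state object that carries a result."""
    result: object


@dataclass
class MiniFlow:
    """Hypothetical stand-in for a flow: an ordered collection of tasks."""
    tasks: List[MiniTask] = field(default_factory=list)

    def get_tasks(self, name=None, tags=None):
        """Return all tasks matching the name and/or tags, always as a list."""
        wanted = frozenset(tags or [])
        return [
            t for t in self.tasks
            if (name is None or t.name == name) and wanted <= t.tags
        ]


def result_by_name(flow, task_states, name):
    """Read one task's result by name, failing loudly if the name is not unique."""
    matches = flow.get_tasks(name=name)
    if len(matches) != 1:
        raise LookupError(
            f"expected exactly one task named {name!r}, found {len(matches)}"
        )
    return task_states[matches[0]].result


# Mimic the SimpleFlow example: x -> times_two -> add
x = MiniTask("x")
times_two = MiniTask("times_two")
add = MiniTask("add", tags=frozenset({"final"}))
flow = MiniFlow([x, times_two, add])

# Stand-in for the task -> state mapping a flow run would return
task_states = {
    x: MiniState([1, 2, 3]),
    times_two: MiniState([2, 4, 6]),
    add: MiniState(12),
}

final_result = result_by_name(flow, task_states, "add")
tagged = flow.get_tasks(tags=["final"])[0]
```

The single-match check is the "convention" part: it only helps if the key task is given a name (or tag) that nothing else in the flow shares.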