Peter Roelants
06/02/2021, 2:57 PMflow_a = FlowRunTask(flow_name='flow_a', wait=True)
@task
def task_b(param_b):
...
with Flow('flow_c') as flow:
param_a = Parameter('param_a')
result_a = flow_a(parameters={'param_a': param_a})
result_b = task_b(result_a)
However I noticed people creating some workarounds. However, I want to avoid setting a persisted result, and want to share a serializable result directly to the next task.
I was wondering: Is there a canonical way to add a result to a flow, get that result from a StartFlowRun, and use that result in a following task?Kevin Kho
Peter Roelants
06/02/2021, 3:07 PMPeter Roelants
06/02/2021, 3:14 PMKevin Kho
Peter Roelants
06/07/2021, 12:59 PMPrefectResult
to share results from a StartFlowRun
?Kevin Kho
StartFlowRun
?Peter Roelants
06/07/2021, 1:06 PMStartFlowRun
, sorry for the confusion (edited the mistake above)Kevin Kho
Peter Roelants
06/07/2021, 1:16 PMKevin Kho
Peter Roelants
06/07/2021, 1:56 PMKevin Kho
from prefect import client, task, Flow
from prefect.engine.results.prefect_result import PrefectResult
from prefect.tasks.prefect.flow_run import StartFlowRun
from prefect.client.client import Client
import prefect
client = Client()
@task
def abc():
return 1
@task
def bcd(x):
logger = prefect.context.get("logger")
<http://logger.info|logger.info>(f"Got the value {x}")
return x
@task
def get_result(flow_id):
logger = prefect.context.get("logger")
flow_id = str(flow_id).split(" ")[0]
<http://logger.info|logger.info>(f"Got the value {str(flow_id)}")
query = """
query {
flow_run(where: { id: {_eq: \"""" + flow_id + """\"} }) {
id
task_runs {
id
name
serialized_state
}
}
}
"""
results = client.graphql(query)
# Get the result here
out = results['data']["flow_run"][0]["task_runs"][0]["serialized_state"]["_result"]["location"]
return out
with Flow("flow_a") as flow1:
abc()
flow1.result = PrefectResult()
flow1.register("omlds")
test = StartFlowRun("flow_a", "omlds", wait=True)
with Flow("flow_b") as flow2:
result1 = test()
result1_out = get_result(result1)
bcd(result1_out)
flow2.run()
Peter Roelants
06/07/2021, 5:28 PMPeter Roelants
06/08/2021, 9:08 AMclient = prefect.Client()
flow_run_info = client.get_flow_run_info(flow_run_id)
• It is also possible to get the result via:
target_task_run: TaskRunInfoResult
result = target_task_run.state.load_result(target_task_run.state._result).result
• The order of tasks in flow_run_info.task_runs
doesn't seem to be guaranteed. I'm using task slugs to identify the task where I want to get the result from together with this helper function to get the right `TaskRunInfoResult`:
def get_target_task_run_from_slug(target_slug: str, task_runs: List[TaskRunInfoResult]) -> TaskRunInfoResult:
"""Get a given task based on the task's slug from a list of task runs."""
target_task_runs = list(filter(lambda t: t.task_slug and target_slug in t.task_slug, task_runs))
return target_task_runs[0]
Additional, I also found this GitHub comment to be relevant.Kevin Kho
Kevin Kho