Dan Kerrigan
06/15/2020, 9:43 PM
Dan Kerrigan
06/15/2020, 9:44 PM
import time
from prefect import Client, task, Flow

client = Client()

@task
def dummy_task():
    return 42

with Flow("meaning of life") as flow:
    life_meaning = dummy_task()

flow_id = flow.register()
flow_run_id = client.create_flow_run(flow_id)
state = client.get_flow_run_info(flow_run_id).state
while not state.is_finished():
    print(f"{state.message} Sleeping 5.")
    time.sleep(5)
    state = client.get_flow_run_info(flow_run_id).state
print(f"Flow finished. {state.message}")
assert state.is_successful()
print(f"Result is None? {state.result is None}")
Dan Kerrigan
06/15/2020, 9:44 PM
Result check: OK
Flow: <http://localhost:8080/flow/015ce980-762a-48cf-a864-1f9c5bafb2a8>
Flow run scheduled. Sleeping 5.
Flow run scheduled. Sleeping 5.
Flow finished. All reference tasks succeeded.
Result is None? True
Kyle Moon-Wright
06/15/2020, 10:25 PM
task_run of your flow. In this case, we can do something like this:
from prefect import Client
c = Client()
# Getting task runs
flow_run = c.get_flow_run_info("YOUR_FLOW_RUN_ID")
task_runs = flow_run.task_runs
# Grabbing the location of a task's result
# (in this case the first task in the list)
location = task_runs[0].state._result.location
# Grabbing the result of a specific task (based on location)
task_runs[0].state._result.read(location)
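Putting the polling loop from the earlier message together with the task-run lookup above, a minimal sketch might look like the following. It only assumes the calls already used in this thread (`get_flow_run_info`, `state.is_finished()`, `task_runs`, `state._result.location`, `state._result.read`). The helper names `wait_for_flow_run` and `read_task_results` and the timeout parameter are hypothetical, and `_result` is a private attribute, so this may break between Prefect versions.

```python
import time


def wait_for_flow_run(client, flow_run_id, poll_seconds=5.0, timeout_seconds=300.0):
    """Poll the flow run until its state is finished, then return its info.

    `client` is expected to behave like prefect.Client (only
    get_flow_run_info is called). Helper name and timeout are hypothetical.
    """
    deadline = time.monotonic() + timeout_seconds
    info = client.get_flow_run_info(flow_run_id)
    while not info.state.is_finished():
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Flow run {flow_run_id} did not finish in time")
        time.sleep(poll_seconds)
        info = client.get_flow_run_info(flow_run_id)
    return info


def read_task_results(flow_run_info):
    """Return whatever each task run's result object reads from its location.

    Task runs with no recorded location yield None instead of being read.
    """
    results = []
    for task_run in flow_run_info.task_runs:
        result = task_run.state._result  # private attribute, as used above
        location = getattr(result, "location", None)
        if location is None:
            results.append(None)
            continue
        results.append(result.read(location))
    return results
```

With a real client this would be called as `info = wait_for_flow_run(Client(), "YOUR_FLOW_RUN_ID")` followed by `read_task_results(info)`. Whether `read` hands back the raw value or a result object wrapping it depends on the Prefect version, so it is worth inspecting what comes back before relying on it.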
I hope that makes sense, let me know if we can clarify this to suit your provided flow.
Dan Kerrigan
06/16/2020, 12:36 PM