Eric Feldman
10/26/2021, 8:56 AMget_flow_run_info
and until the state is finished?Eric Feldman
10/26/2021, 8:59 AMresult
even if its successfullAnna Geller
wait_for_flow_run
- does exactly what you’re looking for - waits for a child FlowRun to finish,
• get_task_run_result
- gets result of a task from a specific FlowRunViewEric Feldman
10/26/2021, 9:22 AMclient
instead 🙃Eric Feldman
10/26/2021, 9:22 AMEric Feldman
10/26/2021, 9:42 AMwait_for_flow_run.run(flow_run).get_all_task_runs()
?Eric Feldman
10/26/2021, 9:47 AMget_all_task_runs()
are ordered it wrong 😞
any other way to get the reference task from wait_for_flow_run
?Anna Geller
task_run = flow_run.get_task_run(task_slug='say_hello-1')
as long as your task is not mapped, the task slug should be {task_name}-1
I thinkAnna Geller
from prefect.backend import FlowRunView
flow_run = FlowRunView.from_flow_run_id("your-flow-run-uuid")
Eric Feldman
10/26/2021, 10:08 AMreference
before registering it
i see that name in the ui, but when flow_result.get_all_task_runs()
it still returns the old nameAnna Geller
create_flow_run
, unfortunatelyEric Feldman
10/26/2021, 10:39 AMAnna Geller
Anna Geller
Anna Geller
even after I change the name and the slug of the reference task, when asking for the flow run result tasks, the name I assigned doesn’t existDid you add
-1
to the task name in your slug? The task slug is usually task_slug='{task_name}-1'
Anna Geller
flow_run.get_all_task_runs()[0]
from prefect.backend import FlowRunView
flow_run_id = "your-flow-run-uuid"
flow_run = FlowRunView.from_flow_run_id(flow_run_id)
default_reftask_task_run = flow_run.get_all_task_runs()[0]
result = default_reftask_task_run.get_result()
Eric Feldman
10/26/2021, 11:32 AMreference_task()
So I was thinking that before registering the flow, I’ll change the task name and slug, so I’ll know what task to get the result of
Did you addno, just called itto the task name in your slug? The task slug is usually-1
reference
so it will be quicker to fetch it
the default reference task is typically the last tasktesting it out, thats not right
Anna Geller
Eric Feldman
10/26/2021, 11:37 AMAnna Geller
flow.set_reference_tasks([your_task_reference])
Eric Feldman
10/26/2021, 11:39 AMEric Feldman
10/26/2021, 11:39 AMAnna Geller
Eric Feldman
10/26/2021, 11:42 AMAnna Geller
get_task_run_result
, but not results of a FlowRun. Does it make sense?Anna Geller
Eric Feldman
10/26/2021, 11:46 AMEric Feldman
10/26/2021, 11:46 AMreference
taskEric Feldman
10/26/2021, 11:47 AMflow_run_view.get_all_task_runs()
returns the tasks in a random order, so I can’t fetch the result of reference
from thereAnna Geller
task_ref = flow.get_tasks()[index_of_your_reference_task]
state = flow.run()
state.result[task_ref]._result
Eric Feldman
10/26/2021, 11:58 AMindex_of_your_reference_task
Eric Feldman
10/26/2021, 12:00 PMdef run_and_block(flow_id: str):
flow_run_id = client.create_flow_run(flow_id='<flow_id>')
flow_view = wait_for_flow_run.run(flow_run_id)
flow_result = ?
Anna Geller
flow.get_tasks()
?Eric Feldman
10/26/2021, 12:02 PMflow
object
the only thing i have is the flow idAnna Geller
client.create_flow_run(flow_id='<flow_id>')
Eric Feldman
10/26/2021, 12:03 PMEric Feldman
10/26/2021, 12:03 PMEric Feldman
10/26/2021, 12:03 PMAnna Geller
Eric Feldman
10/26/2021, 12:06 PMEric Feldman
10/26/2021, 12:07 PMdef run_and_block(flow_id: str):
flow_run_id = client.create_flow_run(flow_id='<flow_id>')
flow_view = wait_for_flow_run.run(flow_run_id)
flow_result = ?
i only have the flow id, and it want to get its resultAnna Geller
Eric Feldman
10/26/2021, 12:21 PMref_task = list(f.reference_tasks())[0]
ref_task.slug = 'reference'
ref_task.name = 'reference'
so i would only need to do
return_value = [v.result for k,v in flow_run_view.result.items() if k.name == 'reference'][0]
Anna Geller
task_ref = list(f.reference_tasks())[0]
state = flow.run()
state.result[task_ref]._result
Eric Feldman
10/26/2021, 12:26 PMEric Feldman
10/26/2021, 12:27 PMAnna Geller
Eric Feldman
10/26/2021, 12:30 PMf
objectEric Feldman
10/26/2021, 12:30 PMregister
it changed the names i specifiedAnna Geller
flow.run()
directly. Did you lose the python code of this flow? Sorry, but I’m really trying to help but don’t know how I can help here 😕Eric Feldman
10/26/2021, 12:34 PMflow.run
blocks the thread and I don’t want to to be blockedAnna Geller
Eric Feldman
10/26/2021, 12:38 PMdef run_and_block(flow_id: str):
flow_run_id = client.create_flow_run(flow_id='<flow_id>')
flow_view = wait_for_flow_run.run(flow_run_id)
flow_result = ?
Eric Feldman
10/26/2021, 12:38 PMEric Feldman
10/26/2021, 12:49 PMquery{flow(where: {
name: {_eq: "batch"},
archived: {_eq: false},
project: {name: {_eq: "eric-test-123"}}
}){
name, id, tasks {
is_terminal_task,
is_root_task,
is_reference_task,
slug,
name
}
}}
🤔Anna Geller
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run, get_task_run_result
Eric Feldman
10/26/2021, 12:50 PMget_task_run_result
Anna Geller
flow_run.get_all_task_runs()
but I don’t know which of them is your reference_taskEric Feldman
10/26/2021, 12:52 PMAnna Geller
Eric Feldman
10/26/2021, 1:03 PMdownstream_edges
for now
query{flow(where: {
name: {_eq: "batch"},
archived: {_eq: false},
project: {name: {_eq: "eric-test-123"}}
}){
name, id, tasks {
slug,
downstream_edges(limit: 1) {
id
}
}
}}
but i’m not sure how to filter for empty arraysAnna Geller
query {
flow (where: {id: {_eq: "d5b600f9-7fb1-4953-86e5-4b2de1fae798"}}){
name
tasks {
id
is_reference_task
is_terminal_task
}
}
}
Eric Feldman
10/26/2021, 1:17 PMis_reference_task
and is_terminal_task
returns false 😞
I even tried calling flow.set_reference_tasks
before registering the flow so flow._reference_tasks
wont be empty
but it is still not workingAnna Geller
Eric Feldman
10/26/2021, 1:30 PMAnna Geller
Anna Geller
Eric Feldman
10/26/2021, 1:36 PMEric Feldman
10/26/2021, 1:36 PMflow.run()
isn’t making things easier because it blocks the main threadAnna Geller
flow_run.get_task_run(task_slug="YOUR_TASK_NAME-1")
Kevin Kho
reference_tasks
are all false. But I’m not sure it’s reliable either because a Flow can have multiple reference tasks and get_task_run_result
only pulls the result of one task so this has to be explicit.
Yes you can specify the slug of your task before registration.
@task(slug=...)
def test
...
This would make it easier to pick up. And then you can just do get_task_run_result(flow_run_id, "slug")
. If you use the task inside the Flow block, I think Prefect will likely add a number like "slug-1"
to distinguish between calls of the task.Eric Feldman
10/26/2021, 2:34 PMflow.reference_tasks()
and change their name and slug, and i see in the UI the name I speicified
but when calling get_task_run_result
i get the old name and not the one i specifiedKevin Kho
Eric Feldman
10/26/2021, 3:41 PMreference
that gets flow.reference_task()
as input and won’t do anything
that way I’ll know the reference task slug name
pretty ugly but changing flow.reference_task()
is not workingKevin Kho
task_args
to change the slug in your flow definition like this:
with Flow("s3-ingest-azure-load") as flow:
s3_keys = ven_next_keys(prefix='prefix') # [list, of, keys]
s3_obj = ven_next_dl.map(key=s3_keys, as_bytes=unmapped(True)) # [list, of, objs]
reduce_map = reduce_map(s3_obj, task_args={"name": "testing_name","slug": "testing"})
Eric Feldman
10/26/2021, 3:45 PMKevin Kho
Kevin Kho
Eric Feldman
10/26/2021, 3:52 PMKevin Kho
Kevin Kho
reference_task
is not doable so you have to do it with your workaroundEric Feldman
10/26/2021, 4:11 PMreference_method
but the slug i get calling flow_run.get_all_task_runs()
is reference_method-copy
Kevin Kho
Flow
block will add the copy
to distinguish between multiple callsEric Feldman
10/26/2021, 4:13 PMKevin Kho
Eric Feldman
10/26/2021, 4:19 PM