hi, is there a way to wait for a flow run to end o...
# ask-community
e
hi, is there a way to wait for a flow run to end other than polling
get_flow_run_info
and until the state is finished?
and when doing it, the state doesn’t have
result
even if it’s successful
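For context, the polling approach described in the question can be sketched as a small loop. This is only an illustration under stated assumptions: `get_info` stands in for something like `client.get_flow_run_info`, and the returned object is assumed to carry a `state` with an `is_finished()` method, as Prefect 1.x states do. The helper name, the interval, and the timeout are hypothetical.

```python
import time
from typing import Any, Callable


def wait_until_finished(
    get_info: Callable[[str], Any],
    flow_run_id: str,
    interval: float = 5.0,
    timeout: float = 3600.0,
) -> Any:
    """Poll get_info(flow_run_id) until its state reports finished.

    Assumption: the returned object has a .state with .is_finished().
    """
    deadline = time.monotonic() + timeout
    while True:
        info = get_info(flow_run_id)
        if info.state.is_finished():
            return info.state
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"flow run {flow_run_id} did not finish within {timeout}s"
            )
        # Sleep between requests so the API is not hammered.
        time.sleep(interval)
```

The callable is injected rather than imported so the loop can be tried out with a stub before pointing it at a real client.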
a
@Eric Feldman there are two other tasks that you can combine with `create_flow_run`:
• `wait_for_flow_run` - does exactly what you’re looking for: waits for a child FlowRun to finish
• `get_task_run_result` - gets the result of a task from a specific FlowRunView
e
amazing! i was looking in
client
instead 🙃
thanks!
can i get the reference task somehow from the result? since I have the results of all tasks, I need a way to identify which one is the reference task. maybe i can assume that it will be the last task in
wait_for_flow_run.run(flow_run).get_all_task_runs()
?
assuming that
get_all_task_runs()
is ordered is wrong 😞 any other way to get the reference task from
wait_for_flow_run
?
a
I will look into how to identify reference tasks. wait_for_flow_run returns a FlowRunView. From there, you can also get a TaskRunView:
task_run = flow_run.get_task_run(task_slug='say_hello-1')
as long as your task is not mapped, the task slug should be
{task_name}-1
I think
To test FlowRunView functionality, you can do:
from prefect.backend import FlowRunView

flow_run = FlowRunView.from_flow_run_id("your-flow-run-uuid")
e
i tried to change the reference task slug and name to
reference
before registering it. i see that name in the ui, but when calling
flow_result.get_all_task_runs()
it still returns the old name
a
all child flows need to be registered before you can trigger them from a parent flow using
create_flow_run
, unfortunately
e
I don't have any child flows. I'm just saying that even after I change the name and the slug of the reference task, when asking for the flow run result tasks, the name I assigned doesn't exist
a
Could you describe your use case a bit more? I’m confused because the original question was asking to wait for a flow run to finish, and this indicates that you trigger a child FlowRun from another flow
Perhaps you could share a minimal reproducible example so that we can look at it together?
@Eric Feldman Regarding this:
even after I change the name and the slug of the reference task, when asking for the flow run result tasks, the name I assigned doesn’t exist
Did you add
-1
to the task name in your slug? The task slug is usually
task_slug='{task_name}-1'
@Eric Feldman also, the default reference task is typically the last task, which is ordered as the first element in
flow_run.get_all_task_runs()[0]
from prefect.backend import FlowRunView

flow_run_id = "your-flow-run-uuid"
flow_run = FlowRunView.from_flow_run_id(flow_run_id)
default_reftask_task_run = flow_run.get_all_task_runs()[0]
result = default_reftask_task_run.get_result()
e
Now that you told me how to wait for the flow run to end, I want the result of the flow. But I can’t get the flow’s result, only task results, and I don’t have the flow object to call
reference_task()
So I was thinking that before registering the flow, I’ll change the task name and slug, so I’ll know what task to get the result of
Did you add 
-1
 to the task name in your slug? The task slug is usually
no, just called it
reference
so it will be quicker to fetch it
the default reference task is typically the last task
testing it out, that’s not right
a
maybe this helps to avoid confusion: “By default, a flow’s reference tasks are its terminal tasks, which includes any task that has no downstream tasks.” https://docs.prefect.io/core/concepts/flows.html#reference-tasks
e
I understand, but having only the flow id and the flow run id, i don’t understand how i can know which task is the reference task. and there isn’t a way to get a flow result, only task results
a
You can determine this yourself in your Flow definition:
flow.set_reference_tasks([your_task_reference])
e
but i don’t have the flow object when i run it
how can I run a flow using the client and get the final result?
a
so you want to dynamically determine what is a reference task of a child flow? My understanding was, you would know it because you defined both flows yourself 🙂 but perhaps we can use GraphQL and client somehow to determine this
e
I don’t have any child flow. I have a flow id, and I want to run it, wait for it to end, and fetch the result
a
In the current Prefect implementation, afaik, flows don’t return any values, the tasks do. That’s why you can retrieve results of a TaskRun using
get_task_run_result
, but not results of a FlowRun. Does it make sense?
the result of a FlowRun in that vein is the state of a flow run and the result of its corresponding tasks
e
but i want the result of the final task, the reference task, and i don’t understand how to fetch it
this is my flow, I want to have the result of
reference
task
flow_run_view.get_all_task_runs()
returns the tasks in a random order, so I can’t fetch the result of
reference
from there
a
Oh, so you run it locally from a notebook. Then you don’t necessarily need to retrieve it from the API. You can do:
task_ref = flow.get_tasks()[index_of_your_reference_task]
state = flow.run()
state.result[task_ref].result  # return value of the reference task
e
i don’t run it locally, and i don’t have
index_of_your_reference_task
def run_and_block(flow_id: str):
    flow_run_id = client.create_flow_run(flow_id='<flow_id>')
    flow_view = wait_for_flow_run.run(flow_run_id)
    flow_result = ?
a
oh, how do you run it then? I saw a Jupyter notebook cell. What do you get as the result of:
flow.get_tasks()
?
e
it will get all tasks, but i don’t know which one is the reference task. i don’t care what the result of the second task is, only the last one. and i don’t have
flow
object. the only thing i have is the flow id
a
just to clarify terminology and confusion I had - this is a child flow run because you are creating a flow run from other (parent) flow:
client.create_flow_run(flow_id='<flow_id>')
e
but I don’t have another parent flow
I have only 1 flow that I registered a while ago
now I only have its ID and I want to run it
a
Could you show your entire flow definition? I would very much appreciate if you could share a short minimal reproducible example to understand the problem better
e
the flow is already registered, i don’t know what its definition is. I have the flow id, and i want to run it on the agent and get the final result
this is my method
def run_and_block(flow_id: str):
    flow_run_id = client.create_flow_run(flow_id='<flow_id>')
    flow_view = wait_for_flow_run.run(flow_run_id)
    flow_result = ?
i only have the flow id, and I want to get its result
a
to clarify:
• you want to get the result of a reference task without knowing what is the reference task, correct?
• so this would involve writing some GraphQL query to find out what is the reference task for this flow ID, then trying to get the result of that TASK
Did I understand what you try to do?
e
yes. is using GraphQL the only way for me to get the reference task? can i change the reference task name/slug before registering it?
ref_task = list(f.reference_tasks())[0]
ref_task.slug = 'reference'
ref_task.name = 'reference'
so i would only need to do
return_value = [v.result for k,v in flow_run_view.result.items() if k.name == 'reference'][0]
a
Regarding #1 - I don’t know. #2 - Yes, I think you can. So you do have access to the flow definition. You can solve it this way:
task_ref = list(f.reference_tasks())[0]
state = f.run()  # run the same flow object `f` used on the line above
state.result[task_ref].result  # return value of the reference task
e
it isn’t really working 😞
although prefect has the name I specified
a
But you do something else, it’s not the code I posted
e
as i mentioned before, i don’t have
f
object
when i call
register
it changed the names i specified
a
I still don’t understand why you can’t call
flow.run()
directly. Did you lose the python code of this flow? Sorry, but I’m really trying to help but don’t know how I can help here 😕
e
I don’t have the flow object, the only thing I have is the flow ID, that’s it. I don’t know what tasks are going to run. and even if I had the flow object,
flow.run
blocks the thread and I don’t want to be blocked
a
Is there any chance you could build an example I could use to reproduce the issue? e.g. I have no way of knowing what is the object “e”
e
this is my example
def run_and_block(flow_id: str):
    flow_run_id = client.create_flow_run(flow_id='<flow_id>')
    flow_view = wait_for_flow_run.run(flow_run_id)
    flow_result = ?
i have a flow id, I run it using a client, and I want to get its result
i’m trying to query the reference task in graphql, but it isn’t working: all of them are false
query{flow(where: {
  name: {_eq: "batch"},
  archived: {_eq: false},
  project: {name: {_eq: "eric-test-123"}}
}){
  name, id, tasks {
    is_terminal_task,
    is_root_task,
    is_reference_task,
    slug,
    name
  }
}}
🤔
a
btw, I was referring to those tasks, when I wrote create_flow_run:
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run, get_task_run_result
e
the issue is that I don’t know what task slug to use when calling
get_task_run_result
a
one of the task slugs available here:
flow_run.get_all_task_runs()
but I don’t know which of them is your reference_task
e
this is my whole problem, I don’t know what my reference task is
a
me neither 😄 maybe @Kevin Kho can help you once he’s available
e
i’m trying to write a graphql query that filters tasks with no
downstream_edges
for now
query{flow(where: {
  name: {_eq: "batch"},
  archived: {_eq: false},
  project: {name: {_eq: "eric-test-123"}}
}){
  name, id, tasks {
    slug, 
    downstream_edges(limit: 1) {
      id
    }
  }
}}
but i’m not sure how to filter for empty arrays
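One way around filtering for empty arrays inside the query is to fetch `downstream_edges` for every task and filter on the client side. The sketch below is a hedged illustration, not a confirmed Prefect recipe: it assumes the response is (or has been converted to) a plain dict shaped like the query above, and `terminal_task_slugs` plus the sample data are hypothetical.

```python
from typing import Any, Dict, List


def terminal_task_slugs(response: Dict[str, Any]) -> List[str]:
    """Return slugs of tasks whose downstream_edges list is empty.

    Assumption: `response` mirrors the query above, i.e.
    {"data": {"flow": [{"tasks": [{"slug": ..., "downstream_edges": [...]}]}]}}
    """
    slugs = []
    for flow in response["data"]["flow"]:
        for task in flow["tasks"]:
            # An empty list means no task depends on this one.
            if not task["downstream_edges"]:
                slugs.append(task["slug"])
    return slugs


# Made-up response for a three-task flow, for illustration only.
example = {
    "data": {
        "flow": [
            {
                "name": "batch",
                "id": "hypothetical-flow-id",
                "tasks": [
                    {"slug": "extract-1", "downstream_edges": [{"id": "edge-1"}]},
                    {"slug": "transform-1", "downstream_edges": [{"id": "edge-2"}]},
                    {"slug": "reference-1", "downstream_edges": []},
                ],
            }
        ]
    }
}

print(terminal_task_slugs(example))  # ['reference-1']
```

If the flow has several terminal tasks, the list will contain more than one slug, so the caller still has to decide which one counts as the result.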
a
you could try this:
query {
  flow (where: {id: {_eq: "d5b600f9-7fb1-4953-86e5-4b2de1fae798"}}){
    name
    tasks {
      id
      is_reference_task
      is_terminal_task
    }
  }
}
e
both
is_reference_task
and
is_terminal_task
return false 😞 I even tried calling
flow.set_reference_tasks
before registering the flow so
flow._reference_tasks
won’t be empty, but it is still not working
a
How? 🙂 you said many times, you have no access to the flow object that you’re trying to get results of a reference task from
e
i do need to register it once, then I can consume it without having it
a
but then you do know what is a reference task
and since you have access to the Flow object, you can run the flow with flow.run() which makes everything that much easier.
e
i do know the reference task when i build it, but only then. when I come back tomorrow i won’t have the object any more
flow.run()
isn’t making things easier because it blocks the main thread
a
but then you also know the name of it:
flow_run.get_task_run(task_slug="YOUR_TASK_NAME-1")
k
I see the same issue that the
reference_tasks
are all false. But I’m not sure it’s reliable either because a Flow can have multiple reference tasks and
get_task_run_result
only pulls the result of one task so this has to be explicit. Yes you can specify the slug of your task before registration.
@task(slug=...)
def test():
    ...
This would make it easier to pick up. And then you can just do
get_task_run_result(flow_run_id, "slug")
. If you use the task inside the Flow block, I think Prefect will likely add a number like
"slug-1"
to distinguish between calls of the task.
e
I don’t know what the reference task will be while building the flow. I tried to take the reference tasks using
flow.reference_tasks()
and change their name and slug, and i see in the UI the name I specified, but when calling
get_task_run_result
i get the old name and not the one i specified
k
Ah I see ok will try this
e
I think that another way to do it is, before registering the flow, to add a new task called
reference
that gets
flow.reference_tasks()
as input and won’t do anything. that way I’ll know the reference task slug name. pretty ugly, but changing
flow.reference_tasks()
is not working
k
I think you can use
task_args
to change the slug in your flow definition like this:
with Flow("s3-ingest-azure-load") as flow:
    s3_keys = ven_next_keys(prefix='prefix') # [list, of, keys]
    s3_obj = ven_next_dl.map(key=s3_keys, as_bytes=unmapped(True)) # [list, of, objs]
    reduce_map = reduce_map(s3_obj, task_args={"name": "testing_name","slug": "testing"})
e
i don’t know the reference task while building it, only before registering it
k
Ah ok I remember. You have a flow builder for users right? Ok will explore the terminal task more
But you can have multiple reference tasks though, or are you guaranteed to only have one?
e
i can guarantee there will be only 1
k
Ok will look into it
Chatted with the team and doing it from
reference_task
is not doable so you have to do it with your workaround
e
tried to set the slug for the dummy task. called it
reference_method
but the slug i get calling
flow_run.get_all_task_runs()
is
reference_method-copy
k
Using a task inside the
Flow
block will add the
copy
to distinguish between multiple calls
e
but i have only one of those
k
That’s just the way the code that creates slugs works.
e
cool, the workaround worked 🦜
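To summarize the workaround from this thread: before registration, a pass-through task with a known name is attached downstream of the flow's single reference task; afterwards, given only the flow run, the task run whose slug starts with that name is looked up (Prefect appended `-copy` to the slug here). Below is a minimal sketch of the lookup half only. It is written against duck-typed objects so it does not import Prefect, and it assumes task runs expose `task_slug` and `get_result()` the way Prefect 1.x's `TaskRunView` does. The helper name `get_reference_result` is hypothetical.

```python
from typing import Any


def get_reference_result(
    flow_run_view: Any, slug_prefix: str = "reference_method"
) -> Any:
    """Return the result of the single task run whose slug starts with slug_prefix.

    Matching on a prefix tolerates suffixes such as "-1" or "-copy"
    that are added when slugs are generated.
    """
    matches = [
        task_run
        for task_run in flow_run_view.get_all_task_runs()
        if task_run.task_slug.startswith(slug_prefix)
    ]
    if len(matches) != 1:
        raise LookupError(
            f"expected exactly one task run with slug prefix {slug_prefix!r}, "
            f"found {len(matches)}"
        )
    return matches[0].get_result()
```

With a real `FlowRunView`, for example the value returned by `wait_for_flow_run.run(flow_run_id)` in the snippets above, this would be called as `get_reference_result(flow_view)`. The exactly-one check relies on the guarantee given in the thread that the flow has a single reference task.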