https://prefect.io logo
f

Frederick Thomas

02/22/2022, 1:02 PM
Hello all I am attempting to get information from the current flow that I'm in using GraphQL, however, I'm only getting an empty response from the API. I'm almost dead certain that the query is correct as I've used it in the sandbox first. We're using Server for our flows. Any help would be appreciated. Thanks
👋 2
a

Anna Geller

02/22/2022, 1:14 PM
Can you share that query? hard to say what's wrong in the query without seeing it 😄
🙌 1
f

Frederick Thomas

02/22/2022, 1:21 PM
The last time I posted Kevin said I screwed it up somehow so I just waited til someone asked...
@task
def named(context,limit):
   
pp(context.today)
   
pp(context.flow_run_id)
   
name = context.flow_name
   
pp(context.flow_name)
   
client = prefect.Client()                                                                                       #type:ignore
   
q = client.graphql(f"""
   
query {{
       
flow_run(limit:3, where:{{flow:{{name:{{_eq:"{name}"}}project:{{name:{{_eq:"CDNY"}}}}}}}}order_by:{{start_time:desc}}){{
     
name
     
id
     
start_time
     
state
   
}}
   
}}
   
""").to_dict()
   
pp(q)
def register_run_flow():
   
with Flow("Honky Tonk") as flow:
       
u = unnamed()
       
n = named(prefect.context, 3)      #type:ignore
       
flow.run()
if __name__ == "__main__":
   
register_run_flow()
I'm unsure about certain terms like the differences between
flow_run
and
flow_runs
and the
flow_*_id
are hard to distinguish which should be used... Thanks again
a

Anna Geller

02/22/2022, 1:31 PM
The comma was missing and you had too many brackets - replace by your values but this should work:
Copy code
{
  flow_run(
    limit: 3
    where: {flow: {name: {_eq: "dwh_flow_of_flows"}, project: {name: {_eq: "dwh"}}}}
    order_by: {start_time: desc}
  ) {
    name
    id
    start_time
    state
  }
}
s

Stéphan Taljaard

02/22/2022, 2:34 PM
Frederick, did this query work for you as you had it, in the
Interactice API
tab in the Prefect UI?
f

Frederick Thomas

02/22/2022, 3:47 PM
@Anna Geller I had to escape the curly braces so I could use the f string to pass the
context.flow_name
and the code is returning data :
'2022-02-22'
'20e9365c-b11b-40d8-913d-2510591b227d'
'Honky Tonk'
{'data': {'flow_run': []}}
My question is can you introspect on a currently running flow? Does it necessarily require that the flow be registered? I am in meetings so I am free for a little while. Thanks again.
k

Kevin Kho

02/22/2022, 4:53 PM
The Flow definition looks wrong because you do:
Copy code
def register_run_flow():
    with Flow("Honky Tonk") as flow:
        u = unnamed()
        n = named(prefect.context, 3)      #type:ignore
        flow.run()
and if you use
prefect.context
here, it will get evaluated during registration time because calling
prefect.context
is not deferred. You should just pull the context from inside a task as opposed to passing it in
f

Frederick Thomas

02/22/2022, 5:02 PM
@Kevin Kho I see that, thanks. I would still like to know is it possible to metadata from a running flow and does it need to be registered or not??
k

Kevin Kho

02/22/2022, 5:04 PM
I don’t see how we would have metadata if it’s not registered? What metadata are you thinking?
For flow_name maybe, but.
flow_run_id
is supplied by the backend (Cloud/Server) so it won’t exist with just
flow.run()
. So it depends on what info from context you need
f

Frederick Thomas

02/22/2022, 5:07 PM
Thanks! What I need is the state of the tasks within the flow so we can determine some logic
Or do you think that using Context would be an easier lift?
k

Kevin Kho

02/22/2022, 5:12 PM
That…is the other way around lol. It will work in
flow.run()
but not the backend because the backend doesn’t hold all of the states because it can be big. I would suggest you handle that logic inside the Flow. For example:
Copy code
@task
def some_task(x):
    if x > 3:
        raise ValueError
    return x

@task(trigger=always_run)
def collecting_task(x):
    print(x)
    return x

with Flow() as flow:
    a = some_task.map([1,2,3,4,5])
    collecting_task(a)
and the trigger will make the collecting_task run and you’ll have access to all of the mapped states and if you print it you will see the ValueErrors
a

Anna Geller

02/22/2022, 6:32 PM
@Frederick Thomas It would be great if you could provide more context of what you try to do. So far you were asking about single features and we provided answers to those questions (like the query etc.) but it will be quite unproductive until we know what problem do you try to solve here (e.g. maybe you don't need any query or context to solve your issue). Can you explain your problem a bit more?
👍 1
f

Frederick Thomas

02/22/2022, 7:32 PM
@Kevin Kho @Anna Geller Hello again, I'm finally out of meetings one of which was with your two Kyle's, which was fun... What I'm trying to do is create a flow run based on whether a task fails or succeeds. Is this possible? Thanks
a

Anna Geller

02/22/2022, 7:34 PM
We have triggers that can be attached to determine whether you want to run task always, or only when it succeeds, or only when it fails etc.
👍 1
f

Frederick Thomas

02/22/2022, 7:36 PM
Thanks, I see it now. Never code sleepless...
a

Anna Geller

02/22/2022, 7:37 PM
that's true 😂
and always easier to start with the problem
f

Frederick Thomas

02/22/2022, 7:38 PM
I'm always solutioning when I shouldn't
4 Views