Frederick Thomas
02/22/2022, 1:02 PM
Anna Geller
Frederick Thomas
02/22/2022, 1:21 PM
@task
def named(context, limit):
    pp(context.today)
    pp(context.flow_run_id)
    name = context.flow_name
    pp(context.flow_name)
    client = prefect.Client()  # type: ignore
    q = client.graphql(f"""
        query {{
          flow_run(limit:3, where:{{flow:{{name:{{_eq:"{name}"}}project:{{name:{{_eq:"CDNY"}}}}}}}}order_by:{{start_time:desc}}){{
            name
            id
            start_time
            state
          }}
        }}
    """).to_dict()
    pp(q)

def register_run_flow():
    with Flow("Honky Tonk") as flow:
        u = unnamed()
        n = named(prefect.context, 3)  # type: ignore
    flow.run()

if __name__ == "__main__":
    register_run_flow()
I'm unsure about certain terms, such as the difference between flow_run and flow_runs, and the various flow_*_id fields are hard to tell apart, so I can't work out which should be used... Thanks again
Anna Geller
{
  flow_run(
    limit: 3
    where: {flow: {name: {_eq: "dwh_flow_of_flows"}, project: {name: {_eq: "dwh"}}}}
    order_by: {start_time: desc}
  ) {
    name
    id
    start_time
    state
  }
}
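The same query can also be sent with GraphQL variables instead of building the text with an f-string, which avoids the doubled braces and quoting problems. The following is only a sketch under assumptions: the helper name `build_flow_run_request`, the operation name `RecentFlowRuns`, and the variable names are hypothetical, and the declared variable types (`String!`, `Int!`) are an assumption about the backend schema that should be checked in the Interactive API tab.

```python
from typing import Any, Dict, Tuple

# GraphQL document using variables, so the flow and project names are
# passed separately instead of being spliced into the query text.
# The variable types below are an assumption about the backend schema.
FLOW_RUN_QUERY = """
query RecentFlowRuns($flow_name: String!, $project_name: String!, $limit: Int!) {
  flow_run(
    limit: $limit
    where: {flow: {name: {_eq: $flow_name}, project: {name: {_eq: $project_name}}}}
    order_by: {start_time: desc}
  ) {
    name
    id
    start_time
    state
  }
}
"""


def build_flow_run_request(
    flow_name: str, project_name: str, limit: int = 3
) -> Tuple[str, Dict[str, Any]]:
    """Return the query text and the variables mapping for recent flow runs.

    Hypothetical helper for illustration; it only assembles the request.
    """
    if limit < 1:
        raise ValueError("limit must be a positive integer")
    variables = {
        "flow_name": flow_name,
        "project_name": project_name,
        "limit": limit,
    }
    return FLOW_RUN_QUERY, variables


query, variables = build_flow_run_request("dwh_flow_of_flows", "dwh")
print(variables)
```

With Prefect 1.x, the pair would presumably be sent as `prefect.Client().graphql(query, variables=variables)`. An empty `flow_run` list in the response would then mean that no run matched the flow and project names.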
Stéphan Taljaard
02/22/2022, 2:34 PM
Interactive API tab in the Prefect UI?
Frederick Thomas
02/22/2022, 3:47 PM
context.flow_name and the code is returning data:
'2022-02-22'
'20e9365c-b11b-40d8-913d-2510591b227d'
'Honky Tonk'
{'data': {'flow_run': []}}
My question is: can you introspect a currently running flow? Does that necessarily require the flow to be registered? I am in meetings, so I am free for a little while. Thanks again.
Kevin Kho
def register_run_flow():
    with Flow("Honky Tonk") as flow:
        u = unnamed()
        n = named(prefect.context, 3)  # type: ignore
    flow.run()
and if you use prefect.context here, it gets evaluated at registration time, when the flow is built, because accessing prefect.context is not deferred until the run. You should just pull the context from inside a task instead of passing it in.
Frederick Thomas
02/22/2022, 5:02 PM
Kevin Kho
flow_run_id is supplied by the backend (Cloud/Server), so it won’t exist with just flow.run(). So it depends on what info from context you need.
Frederick Thomas
02/22/2022, 5:07 PM
Kevin Kho
flow.run() but not from the backend, because the backend doesn’t hold all of the states (they can be large). I would suggest you handle that logic inside the Flow. For example:
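from prefect import Flow, task
from prefect.triggers import always_run

@task
def some_task(x):
    # Fail for inputs above 3 so that some mapped children end up Failed
    if x > 3:
        raise ValueError
    return x

@task(trigger=always_run)
def collecting_task(x):
    # Runs even when upstream mapped children failed
    print(x)
    return x

with Flow("mapped-example") as flow:  # Flow needs a name; this one is a placeholder
    a = some_task.map([1, 2, 3, 4, 5])
    collecting_task(a)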
The always_run trigger makes collecting_task run even though some of the mapped tasks fail, so you’ll have access to all of the mapped states, and if you print them you will see the ValueErrors.
Anna Geller
Frederick Thomas
02/22/2022, 7:32 PM
Anna Geller
Frederick Thomas
02/22/2022, 7:36 PM
Anna Geller
Frederick Thomas
02/22/2022, 7:38 PM
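Two points from Kevin Kho's replies earlier in this thread (read the context inside the task rather than passing prefect.context in, and do not rely on flow_run_id when there is no backend) could be combined roughly as follows. This is only a sketch: `describe_run` is a hypothetical helper, and the plain dict stands in for the context of a local run so that the snippet executes without Prefect installed.

```python
from typing import Any, Dict, Mapping


def describe_run(ctx: Mapping[str, Any]) -> Dict[str, Any]:
    """Collect run metadata from a context-like mapping at call time.

    Hypothetical helper. Using .get() means a missing key such as
    flow_run_id yields None instead of raising.
    """
    return {
        "today": ctx.get("today"),
        "flow_name": ctx.get("flow_name"),
        "flow_run_id": ctx.get("flow_run_id"),
    }


# Stand-in for the context of a local run with no backend-supplied run id
# (an assumption for illustration, not a real Prefect context object).
local_ctx = {"today": "2022-02-22", "flow_name": "Honky Tonk"}
info = describe_run(local_ctx)
print(info)

# In a Prefect 1.x flow the helper would be called from inside the task,
# so the lookup happens while the task runs, for example:
#
#   @task
#   def named(limit):
#       info = describe_run(prefect.context)
#       ...
#
#   with Flow("Honky Tonk") as flow:
#       named(3)  # no context argument is passed at build time
```

The task can then skip the GraphQL lookup, or fall back to filtering by flow name only, whenever `info["flow_run_id"]` is None.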