Lana Dann

04/05/2022, 11:02 PM
is it possible to set a context variable on the flow level? for context, i want to set a flow owner to the flow and then use that value in my slack state handler. but even when i set
with prefect.context(dict(flow_owner="test")):
before defining the flow, i still get an error
Traceback (most recent call last):
  File "/Users/lanadann/.pyenv/versions/data-prefect-3.9.7/lib/python3.9/site-packages/prefect/engine/runner.py", line 161, in handle_state_change
    new_state = self.call_runner_target_handlers(old_state, new_state)
  File "/Users/lanadann/.pyenv/versions/data-prefect-3.9.7/lib/python3.9/site-packages/prefect/engine/task_runner.py", line 113, in call_runner_target_handlers
    new_state = handler(self.task, old_state, new_state) or new_state
  File "/Users/lanadann/prefect/data_prefect/lib/notifiers.py", line 13, in post_to_slack_on_failure
    f"@{prefect.context.flow_owner} "
AttributeError: 'Context' object has no attribute 'flow_owner'

Anna Geller

04/05/2022, 11:08 PM
perhaps you can set it in the README in the UI?

Lana Dann

04/05/2022, 11:10 PM
i don’t necessarily need the variable in the context, but i need a way to pass flow_owner into the slack state handler so that the automated slack messages can tag the owner in the alerts channel. can i do that through the readme?

Anna Geller

04/05/2022, 11:11 PM
are you on Prefect Cloud? is the flow owner also the person who registered the flow?
You could get the info about who created the flow automatically via a GraphQL query - it's described here

Lana Dann

04/05/2022, 11:13 PM
i am on prefect cloud! but we register flows in a CI script that runs in a repo so there’s nothing tying flows back to an individual

Kevin Kho

04/05/2022, 11:14 PM
You can’t add stuff to the context like this for Cloud/Server runs as a “default” context. You can only add context for runs you trigger through the UI

Anna Geller

04/05/2022, 11:15 PM
agree with Kevin, I think your best bet is setting this info about flow owner as environment variable on your run config and retrieving this in your state handler this way
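
A minimal sketch of the environment-variable approach suggested above. The variable name FLOW_OWNER, the helper owner_mention, and the "here" fallback are assumptions made for illustration, not Prefect APIs. In Prefect 1.x the variable would typically be supplied through the run config's env mapping.

```python
import os
from typing import Mapping, Optional


def owner_mention(env: Optional[Mapping[str, str]] = None, default: str = "here") -> str:
    """Return a Slack-style mention for the flow owner.

    FLOW_OWNER is a hypothetical variable name; use whatever key
    you set on the run config.
    """
    # Fall back to the real process environment when no mapping is given.
    env = os.environ if env is None else env
    owner = env.get("FLOW_OWNER", "").strip()
    # An unset or blank variable falls back to the default mention.
    return f"@{owner or default}"
```

Inside a state handler this could be called as owner_mention() with no arguments, so the value is read from the environment of the process running the flow.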

Kevin Kho

04/05/2022, 11:15 PM
But I don't think this is meant for stuff like that. It’s more like you can change the value of today to backfill

Lana Dann

04/05/2022, 11:15 PM
hmm i assume you can’t add extra kwargs in the state handler either, right?
i think i can use the env variables set in the run config then… thanks for the help!

Anna Geller

04/05/2022, 11:21 PM
actually, I have a better idea! why not set your flow owner as a Parameter task? This would allow you to easily override it at runtime when needed and you could retrieve it in your state handler this way:
prefect.context["parameters"].get("flow_owner")
assuming:
with Flow("yourflow") as flow:
    owner = Parameter("flow_owner", default="Lana")
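
A rough sketch of how a state handler might read the parameter and build the alert text. The helpers flow_owner_from_context and failure_message are hypothetical names, and the context is passed in as a plain mapping so the logic can be tried without a flow run. It assumes the context exposes "parameters" and "task_name" keys, as referenced in this thread.

```python
from typing import Any, Mapping


def flow_owner_from_context(context: Mapping[str, Any], default: str = "unknown") -> str:
    """Look up the flow_owner parameter in a context-like mapping."""
    # "parameters" may be missing or None outside of a flow run.
    parameters = context.get("parameters") or {}
    return parameters.get("flow_owner", default)


def failure_message(context: Mapping[str, Any], error: Any) -> str:
    """Build the Slack text that tags the owner and names the failed task."""
    owner = flow_owner_from_context(context)
    task_name = context.get("task_name", "unknown task")
    return f"@{owner} task {task_name} failed: {error}"
```

In a real handler you would presumably pass prefect.context as the mapping and the failed state's message or result as the error.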

Lana Dann

04/05/2022, 11:22 PM
that’s a great idea! thank you 😄

Anna Geller

04/05/2022, 11:27 PM
Lana, one caveat though: if you don't use this owner object in downstream tasks, you will need to manually add it to your flow like so:
flow.add_task(owner)
(just to save you some trouble)

Lana Dann

04/05/2022, 11:28 PM
thank you again! that’s actually exactly what i did just now. we have a BaseECSFlow method that returns a flow with all of the metadata set up to run in our default ECS cluster. i updated the logic to:
    ecs_flow = Flow(
        name=name,
        schedule=schedule,
        run_config=run_config,
        storage=get_default_storage(file_path),
    )
    ecs_flow.add_task(Parameter("flow_owner", default=owner))

    return ecs_flow
and now folks can use the flow like this:
with BaseECSFlow(
    name=FLOW_NAME,
    file_path=FILE_PATH,
    start_date=START_DATE,
    interval=INTERVAL,
    owner=OWNER
) as flow:
    ...

Kevin Kho

04/05/2022, 11:29 PM
You can also call it
with Flow(..) as flow:
    flow_owner = Parameter("flow_owner", "default")()
Notice the second (). Calling the Parameter inside the flow block should add it to the flow even if no task uses it, I think

Lana Dann

04/06/2022, 5:19 PM
is there a way to grab or build the link to the current flow run based on the prefect context?

Kevin Kho

04/06/2022, 5:25 PM
I think not exactly because the link is:
https://cloud.prefect.io/[tenant_name]/flow-run/[flow_run_id]
The flow_run_id is easy to grab but the tenant name/slug is not. Try using Client().get_default_tenant_slug() and then constructing it?
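
A small sketch of constructing the link from the pattern above. The function name flow_run_url and the example slug are made up for illustration. The slug could be hard-coded or fetched from the client, and the run id would come from the context.

```python
from urllib.parse import quote

CLOUD_BASE_URL = "https://cloud.prefect.io"


def flow_run_url(tenant_slug: str, flow_run_id: str, base_url: str = CLOUD_BASE_URL) -> str:
    """Build <base>/<tenant_slug>/flow-run/<flow_run_id>."""
    # Percent-encode both parts so an unexpected character cannot break the path.
    slug = quote(tenant_slug, safe="")
    run_id = quote(flow_run_id, safe="")
    return f"{base_url.rstrip('/')}/{slug}/flow-run/{run_id}"
```

For example, flow_run_url("my-team", prefect.context.get("flow_run_id")) inside a handler, where "my-team" stands in for your own tenant slug.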

Lana Dann

04/06/2022, 5:30 PM
i can hardcode the tenant slug i think
thank you!!

Kevin Kho

04/06/2022, 5:32 PM
Ah ok yeah then just get the flow_run_id from context

Lana Dann

04/06/2022, 7:37 PM
can you pass state handlers into the flow instead of the task?

Kevin Kho

04/06/2022, 7:41 PM
yes.
with Flow(..., state_handlers=[..]) as flow:
will work and the state handler can be of signature
def myhandler(obj: Union[Flow, Task], old_state, new_state)
meaning the same handler can be used at the task level and flow level
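
A sketch of one handler shared between flow and task level, under the signature described above. It is duck-typed so it runs without Prefect installed. describe_target is a hypothetical helper, and the handler only assumes the object has a name attribute and the state has an is_failed() method.

```python
from typing import Any


def describe_target(obj: Any) -> str:
    """Describe whatever the handler was attached to, e.g. "Flow 'etl'"."""
    kind = type(obj).__name__
    name = getattr(obj, "name", "<unnamed>")
    return f"{kind} '{name}'"


def my_handler(obj: Any, old_state: Any, new_state: Any) -> Any:
    """Works for a flow or a task because it only relies on shared attributes."""
    is_failed = getattr(new_state, "is_failed", None)
    if callable(is_failed) and is_failed():
        # Placeholder for the real Slack call.
        print(f"{describe_target(obj)} failed")
    # Hand the state back unchanged so the run continues with it.
    return new_state
```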

Lana Dann

04/06/2022, 7:43 PM
cool! can i still reference prefect.context.task_name if i use the state handler in the flow?

Kevin Kho

04/06/2022, 7:45 PM
Wait. No
I think it may be the last task, but that wouldn’t be accurate. What are you trying to do?

Lana Dann

04/06/2022, 7:47 PM
i originally attached the state handler to all tasks but since i have auto retries on it’ll send the slack message up to three times. so i think it’s better to attach the state handler to the flow but ideally it would still describe the exception that caused the flow to fail

Kevin Kho

04/06/2022, 8:00 PM
Ah you can fix that by using task_run_count in the state handler with an if/else so it doesn't fire repeatedly.
Compare task_run_count to max_retries (which is not in context) before firing the Slack alert.
Actually I just remembered you have the task object in the state handler, so you can use task.max_retries inside the state handler if the signature is
def myhandler(task, old_state, new_state):
and compare it to task_run_count
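
A minimal sketch of that comparison. is_final_failure is a hypothetical helper. It assumes task_run_count is 1 on the first attempt and that a task with max_retries=N runs at most N+1 times, which is worth verifying against your Prefect version before relying on it.

```python
def is_final_failure(task_run_count: int, max_retries: int) -> bool:
    """Return True only on the attempt after which no retry is left.

    Assumption: attempts are counted from 1, so retries remain while
    task_run_count <= max_retries.
    """
    return task_run_count > max_retries


# Intended use inside a task-level state handler (not executed here):
#
#   def myhandler(task, old_state, new_state):
#       run_count = prefect.context.get("task_run_count", 1)
#       if new_state.is_failed() and is_final_failure(run_count, task.max_retries):
#           ...  # send the Slack alert once
#       return new_state
```

With max_retries=3 this stays quiet on attempts 1 through 3 and only alerts on attempt 4.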