Bouke Krom
08/02/2021, 3:18 PMArtifact
(just a link in our case) in a custom Slack notification. The State Handler gets a Flow
and State
. I'm having trouble finding the Artifact
somewhere in the Flow
objects. I guess I should try and find a flow_run
object of some sort?Kevin Kho
artifact
is attached to a Flow. It’s attached to the task_run
so I think the only way to do this is the use the Client
to query the API.
You need to query the task_run_artifact
endpoint where task_run_id
is taken from the prefect contextBouke Krom
08/03/2021, 6:46 AMtask_run
and not the definition of the task.Bouke Krom
08/03/2021, 2:58 PMprefect.context
, set some values there during the flow run and access those in the notifier code? Or would that count as context misuse?Kevin Kho
Bouke Krom
08/03/2021, 3:09 PM# Code creating a resource somewhere
prefect.artifacts.create_link(link)
prefect.context.artifact_link = link
And then in the notifier retrieve prefect.context.artifact_link
again? Or are additions to the context at runtime not persisted?Kevin Kho
result
. Code sample below:
# Import Library
import prefect
from prefect import task, Flow, Parameter
from typing import Dict
from prefect.engine.signals import SUCCESS
class SuccessSubclass(SUCCESS):
def __init__(self, message, inputs: Dict):
self.message = message
self.inputs = inputs
super().__init__(self.message)
def myStateHandler(task, old_state, new_state):
logger = prefect.context.get("logger")
if new_state.is_finished():
<http://logger.info|logger.info>("Task Result")
<http://logger.info|logger.info>(task.result)
# Task Variables
<http://logger.info|logger.info>("Task inputs")
<http://logger.info|logger.info>(new_state)
<http://logger.info|logger.info>(new_state.result.inputs)
# Parameters
<http://logger.info|logger.info>("Parameters")
<http://logger.info|logger.info>(prefect.context.get("parameters", {}))
<http://logger.info|logger.info>("Task Result")
<http://logger.info|logger.info>(task.result)
# Error Task
@task(state_handlers=[myStateHandler])
def myTask(str_param):
logger = prefect.context.get("logger")
<http://logger.info|logger.info>(str(str_param))
raise(SuccessSubclass(f"Success", inputs={'str_param': str_param}))
with Flow("flow_must_fail") as flow:
param1 = Parameter('param1',default="XXX")
a = myTask(param1)
flow.run()
Kevin Kho
Bouke Krom
08/03/2021, 3:26 PMKevin Kho
Bouke Krom
08/03/2021, 3:28 PMBouke Krom
08/03/2021, 3:28 PMBouke Krom
08/04/2021, 2:09 PMstates
and results
though 🙂Kevin Kho
Bouke Krom
08/05/2021, 12:24 PM# Import Library
import prefect
from prefect import task, Flow, Parameter
from prefect.utilities.notifications import callback_factory
def checker(new_state):
"""
Decides which states should lead to a notification.
"""
return new_state.is_finished()
def myStateHandler(flow: Flow, new_state):
logger = prefect.context.get("logger")
# Flow result
<http://logger.info|logger.info>(f"Flow Result: {flow.result}")
# Task results
for task, state in new_state.result.items():
<http://logger.info|logger.info>(f"{task} - {state} {state.result}")
@task
def task_noreport(str_param):
logger = prefect.context.get("logger")
<http://logger.info|logger.info>("Running noreport task")
return f"A wonderful result based on {str_param}"
@task
def task_reporting(prevresult):
logger = prefect.context.get("logger")
<http://logger.info|logger.info>("Running task that should be reported")
return "The reporting task has received: " + prevresult
with Flow("Test") as flow:
param1 = Parameter('param1',default="XXX")
a = task_noreport(param1)
task_reporting(a)
flow.state_handlers.append(callback_factory(myStateHandler, checker))
flow.run()
And that works when testing locally with flow.run()
, but somehow registering that flow and running it on Prefect Server the new_state.result.items()
is empty 😕Kevin Kho
Kevin Kho
client = Client()
response = client.graphql(
f"""
query {{
flow_run_by_pk ( id: "{flow_run_id}" ),
{{
id,
flow {{
id,
name,
version
}}
state,
start_time,
end_time,
state_message
task_runs {{
id,
state,
state_message,
start_time,
end_time,
task {{
id,
name,
slug,
type
}}
}}
}}
}}"""
)
return