# ask-community
b
Hey all, I'm trying to include my `Artifact` (just a link in our case) in a custom Slack notification. The state handler gets a `Flow` and a `State`. I'm having trouble finding the `Artifact` anywhere in the `Flow` objects. I guess I should try to find a `flow_run` object of some sort?
k
Hey @Bouke Krom, I don’t think the `artifact` is attached to a Flow. It’s attached to the `task_run`, so I think the only way to do this is to use the `Client` to query the API. You need to query the `task_run_artifact` endpoint, filtering on the `task_run_id`, which you can take from the Prefect context.
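A minimal sketch of the query Kevin describes, assuming a Prefect 1.x backend whose GraphQL schema exposes the `task_run_artifact` table with `kind` and `data` columns, and that a link artifact is stored as `kind == "link"` with `data == {"link": url}`. Those column names and the payload shape are assumptions, not something confirmed in this thread. `build_artifact_query` and `extract_links` are hypothetical helpers; only the commented-out `Client().graphql(...)` call touches the API.

```python
from typing import Any, Dict, List


def build_artifact_query(task_run_id: str) -> str:
    """Build a GraphQL query for all artifacts attached to one task run.

    Assumes the Hasura-style where/_eq filter syntax used by Prefect Server.
    """
    return (
        "query {\n"
        f'  task_run_artifact(where: {{task_run_id: {{_eq: "{task_run_id}"}}}}) {{\n'
        "    id\n"
        "    kind\n"
        "    data\n"
        "  }\n"
        "}"
    )


def extract_links(response_data: Dict[str, Any]) -> List[str]:
    """Pull link URLs out of the query result (assumes data == {"link": url})."""
    artifacts = response_data.get("task_run_artifact", [])
    links = []
    for artifact in artifacts:
        data = artifact.get("data") or {}
        if artifact.get("kind") == "link" and "link" in data:
            links.append(data["link"])
    return links


# Hypothetical usage inside a task state handler (Prefect 1.x, not run here):
#   from prefect import Client
#   query = build_artifact_query(prefect.context.get("task_run_id"))
#   result = Client().graphql(query)
#   links = extract_links(result["data"])
```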
b
Thanks for the pointers, Kevin! I was hoping to keep it server-independent (make it work with any flow anywhere), but I guess that's difficult since the artifact is indeed part of a specific `task_run` and not of the task definition.
Actually, setting up a client and using GraphQL queries to get the results of a flow run that the notifier code is still part of feels like a weird loop. All I want is to post a Slack message with an overview of the flow run (flow state, artifacts, some details about specific tasks). Can't I use `prefect.context`, set some values there during the flow run, and access those in the notifier code? Or would that count as context misuse?
k
This is a good use of context, but you’re limited to what context provides, and the artifact is not in there. I have some sample code for this to get you started, if you want?
b
Yeah, I see, but since the artifact is just a link, maybe I can do this in the task:
```python
# Code creating a resource somewhere
prefect.artifacts.create_link(link)
prefect.context.artifact_link = link
```
And then retrieve `prefect.context.artifact_link` again in the notifier? Or are additions to the context at runtime not persisted?
k
Ahh, no, you can’t modify context like that and expect it to persist across the whole flow run. Editing context like you suggested does work within the task, though. I will still leave this code snippet here as an alternative, but I think your solution is better. You can also get at a value through the `result` if you return it from the task. Code sample below:
```python
# Import Library
import prefect
from prefect import task, Flow, Parameter
from typing import Dict
from prefect.engine.signals import SUCCESS

class SuccessSubclass(SUCCESS):
    # A SUCCESS signal that also carries the task's inputs for the state handler
    def __init__(self, message, inputs: Dict):
        self.message = message
        self.inputs = inputs
        super().__init__(self.message)

def myStateHandler(task, old_state, new_state):
    logger = prefect.context.get("logger")
    if new_state.is_finished():
        # new_state.result holds the run's result; task.result is only the
        # task's Result configuration, not the returned value
        logger.info("Task Result")
        logger.info(new_state.result)

        # Task Variables
        logger.info("Task inputs")
        logger.info(new_state)
        logger.info(new_state.result.inputs)

        # Parameters
        logger.info("Parameters")
        logger.info(prefect.context.get("parameters", {}))

# Error Task
@task(state_handlers=[myStateHandler])
def myTask(str_param):
    logger = prefect.context.get("logger")
    logger.info(str(str_param))
    raise SuccessSubclass("Success", inputs={'str_param': str_param})

with Flow("flow_must_fail") as flow:
    param1 = Parameter('param1', default="XXX")
    a = myTask(param1)

flow.run()
```
I tested what you suggested with adding it to context and it works! The only thing to note is that it won’t persist to the next task; it only works for that task’s state handler.
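To see why a value set on the context is visible to the task's own state handler but gone by the next task, here is a toy model (not Prefect's actual code) of a scoped context that, like `with prefect.context(...)` in Prefect 1.x, snapshots its contents on entry and restores them on exit. The key assumption is that each task run, including its state handlers, executes inside one such scope; `ToyContext` is a hypothetical name used only for illustration.

```python
import contextlib
from typing import Any, Dict, Iterator


class ToyContext:
    """Toy stand-in for a scoped, attribute-style context (not Prefect's implementation)."""

    def __init__(self) -> None:
        self.__dict__["_data"] = {}

    def __getattr__(self, name: str) -> Any:
        try:
            return self._data[name]
        except KeyError:
            raise AttributeError(name) from None

    def __setattr__(self, name: str, value: Any) -> None:
        self._data[name] = value

    def get(self, name: str, default: Any = None) -> Any:
        return self._data.get(name, default)

    @contextlib.contextmanager
    def __call__(self, **values: Any) -> Iterator["ToyContext"]:
        previous: Dict[str, Any] = dict(self._data)  # snapshot on entry
        self._data.update(values)
        try:
            yield self
        finally:
            self._data.clear()  # restore on exit: anything set inside is dropped
            self._data.update(previous)


context = ToyContext()
seen_by_handler = []
seen_by_next_task = []

# Task 1: the task body sets a value (like `prefect.context.artifact_link = link`),
# and, by assumption, its state handler runs inside the same scope.
with context(task_name="task_1"):
    context.artifact_link = "https://example.com/report"
    seen_by_handler.append(context.get("artifact_link"))

# Task 2 runs in a fresh scope: the value set during task 1 is gone.
with context(task_name="task_2"):
    seen_by_next_task.append(context.get("artifact_link"))
```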
b
Ah right! I was just discovering the same through prototyping 😅
k
Context manipulation can lead to strange errors, though, so we don’t really recommend it. I would say just don’t depend on it too much, but it seems like it’ll work for you here 👍
b
Going to try your solution as well because I am actually handling the state of the whole flow, not individual tasks.
Thanks for the help!
Reporting back: in the end I could make some modifications to the task results (the values that are returned) and use those in the eventual reporting. I learned a lot about `states` and `results`, though 🙂
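A minimal sketch of that idea, assuming a hypothetical convention where reporting tasks return a dict with a `report` key. A flow-level handler can then walk the `{task: state}` mapping in `new_state.result`; note that this mapping is filled in for a local `flow.run()` but, as the rest of the thread shows, it comes back empty on Prefect Server. `collect_reports` and `FakeTask` are illustrative names, and the stand-in objects let the sketch run without Prefect installed.

```python
from dataclasses import dataclass
from types import SimpleNamespace
from typing import Any, Dict, List


@dataclass(frozen=True)
class FakeTask:
    """Hashable stand-in for a Prefect Task (only .name is used)."""
    name: str


def collect_reports(task_states: Dict[Any, Any]) -> List[str]:
    """Collect lines from task results that follow the hypothetical
    {"report": ...} return convention; other results are ignored."""
    lines = []
    for task, state in task_states.items():
        value = state.result
        if isinstance(value, dict) and "report" in value:
            lines.append(f"{task.name}: {value['report']}")
    return lines


# Stand-ins for the {task: state} mapping a local flow.run() produces
task_states = {
    FakeTask("task_noreport"): SimpleNamespace(result="A wonderful result based on XXX"),
    FakeTask("task_reporting"): SimpleNamespace(result={"report": "https://example.com/report"}),
}
report_lines = collect_reports(task_states)

# Roughly how it could plug into a Prefect 1.x flow handler (not run here):
#   def report_handler(flow, new_state):
#       for line in collect_reports(new_state.result):
#           prefect.context.get("logger").info(line)
```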
k
Can I see what you ended up with?
b
The essence of what I'm doing now is this:
```python
# Import Library
import prefect
from prefect import task, Flow, Parameter
from prefect.utilities.notifications import callback_factory

def checker(new_state):
    """
    Decides which states should lead to a notification.
    """
    return new_state.is_finished()

def myStateHandler(flow: Flow, new_state):
    logger = prefect.context.get("logger")

    # Flow result
    logger.info(f"Flow Result: {flow.result}")

    # Task results
    for task, state in new_state.result.items():
        logger.info(f"{task} - {state} {state.result}")


@task
def task_noreport(str_param):
    logger = prefect.context.get("logger")
    logger.info("Running noreport task")
    return f"A wonderful result based on {str_param}"


@task
def task_reporting(prevresult):
    logger = prefect.context.get("logger")
    logger.info("Running task that should be reported")
    return "The reporting task has received: " + prevresult


with Flow("Test") as flow:
    param1 = Parameter('param1', default="XXX")
    a = task_noreport(param1)
    task_reporting(a)

flow.state_handlers.append(callback_factory(myStateHandler, checker))
flow.run()
```
And that works when testing locally with `flow.run()`, but when I register the flow and run it on Prefect Server, `new_state.result.items()` is empty 😕
k
Yeah, unfortunately flow results are not populated when the run is handled by a backend (Cloud or Server), because large mapped tasks can cause too much memory usage. If you need this at the flow level, you’d have to query the GraphQL API for this info and work with that data.
The query would look like this:
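```python
import prefect
from prefect import Client


def get_flow_run_summary(flow_run_id: str):
    """Fetch the flow run's state and all of its task runs from the GraphQL API."""
    client = Client()
    response = client.graphql(
        f"""
        query {{
          flow_run_by_pk(id: "{flow_run_id}") {{
            id
            flow {{
              id
              name
              version
            }}
            state
            start_time
            end_time
            state_message
            task_runs {{
              id
              state
              state_message
              start_time
              end_time
              task {{
                id
                name
                slug
                type
              }}
            }}
          }}
        }}"""
    )
    return response


# Inside a flow-level state handler, the current run's ID is in the context:
# summary = get_flow_run_summary(prefect.context.get("flow_run_id"))
```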
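Once the query returns, the notifier still has to turn the data into a Slack message. A minimal sketch, assuming the response shape requested in the query (`flow_run_by_pk` with nested `flow` and `task_runs`) and a Slack incoming-webhook URL that you supply; `format_summary` and `post_to_slack` are hypothetical helpers, and the webhook URL in the usage comment is a placeholder. Only the standard library is used, so nothing beyond the JSON `{"text": ...}` payload that incoming webhooks accept is assumed about Slack.

```python
import json
import urllib.request
from typing import Any, Dict


def format_summary(flow_run: Dict[str, Any]) -> str:
    """Render a flow_run_by_pk result as plain Slack message text."""
    lines = [f"*{flow_run['flow']['name']}*: {flow_run['state']}"]
    if flow_run.get("state_message"):
        lines.append(flow_run["state_message"])
    # Sort task runs by task name so the message is stable between runs
    for tr in sorted(flow_run.get("task_runs", []), key=lambda t: t["task"]["name"]):
        lines.append(f"- {tr['task']['name']}: {tr['state']}")
    return "\n".join(lines)


def post_to_slack(webhook_url: str, text: str) -> None:
    """POST the text to a Slack incoming webhook as a JSON {"text": ...} payload."""
    request = urllib.request.Request(
        webhook_url,
        data=json.dumps({"text": text}).encode("utf-8"),
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(request) as response:
        response.read()


# Hypothetical usage inside the flow state handler (not run here):
#   response = client.graphql(...)  # the flow_run_by_pk query
#   flow_run = response["data"]["flow_run_by_pk"]
#   post_to_slack("https://hooks.slack.com/services/...", format_summary(flow_run))
```

Keeping formatting separate from posting means the message layout can be checked locally against a saved query response before wiring up the webhook.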