Is there a way to easily get the result location f...
# ask-community
h
Is there a way to easily get the result location for a task so that we can re-run failed tasks? I can see the result location when I mouse over the field in the Prefect Cloud UI, but I can't figure out how to copy and paste it.
k
Hey @Hugo Shi, yeah, it seems like you can’t easily copy it if it’s long. I can raise that feedback with the UI team. I’m not sure why you would need to copy it manually to re-run failed tasks, though. Could you elaborate on that?
h
Yep, not sure if I need to or not. But I'm using checkpointing, so to re-run the failed tasks I need to load the task's inputs and call `task.run`, right?
I should clarify - the objective here is to re-run the failed task so that I can troubleshoot the failure
It seems the methodology here is to:
• grab parameters from the Prefect Cloud UI, since those are not persisted to the checkpoint
• grab the S3 locations for the task's other inputs
• load the inputs via S3/pickle and call `task.run`
Is there a better way?
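Here is a minimal, stdlib-only sketch of that manual workflow. It assumes each checkpointed input has already been downloaded from S3 to a local file (for example with boto3's `get_object`) and that the payload is a pickle. Prefect 1.x's default pickle serializer writes cloudpickle output, which `pickle.loads` can usually read as long as cloudpickle and the objects' classes are importable. `load_checkpoint` and `rerun_task_locally` are hypothetical helpers, not Prefect API.

```python
import pickle
from pathlib import Path
from typing import Any, Callable, Dict, Mapping


def load_checkpoint(location: str) -> Any:
    """Unpickle one checkpointed task result from a local file.

    Assumption: the object was already downloaded from S3 to `location`.
    """
    return pickle.loads(Path(location).read_bytes())


def rerun_task_locally(
    fn: Callable[..., Any],
    parameters: Mapping[str, Any],
    input_locations: Mapping[str, str],
) -> Any:
    """Rebuild a failed task's keyword arguments and call it directly.

    `parameters` are values copied from the UI (they are not checkpointed);
    `input_locations` maps each upstream-fed argument name to its checkpoint file.
    Put a breakpoint inside `fn` to step through the failure.
    """
    kwargs: Dict[str, Any] = dict(parameters)
    for name, location in input_locations.items():
        kwargs[name] = load_checkpoint(location)
    return fn(**kwargs)
```

With a Prefect 1.x task you would pass the task's `run` method as `fn`, e.g. `rerun_task_locally(my_task.run, {"param1": "XXX"}, {"df": "/tmp/df.pkl"})`. All names and paths there are illustrative.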
k
You only want to re-run the specific task, and not the rest of the flow, so hitting Restart would not work for you, right?
h
Correct. I also need to run it locally so that I can step through it in a debugger, etc., so running anything from the UI probably isn't sufficient.
k
Oh ok, I see what you are saying. I think the more common approach would be logging from the Flow side. Some users create their own error class and attach the things they need to keep track of (inputs, parameters, locations). Then, when the task fails and the state handler is hit, you can send this info through Slack or write it out somewhere. What do you think of this?
This should also work for a local `flow.run()` call.
h
You mean log the result locations of the inputs? In many cases the inputs could be large-ish DataFrames, right? Logging the data itself isn't really feasible.
k
I said inputs because a lot of other people have just been using smaller inputs, but yeah, if it’s a DataFrame, logging the location makes a lot more sense.
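Here is a rough sketch of the error-class idea, adapted to store only the *locations* of large inputs (such as S3 keys of checkpointed DataFrames) instead of the data itself. `TaskFailureWithContext` and `report_failure` are hypothetical names. The JSON string is what a failure handler might post to Slack or write to a file.

```python
import json
from typing import Any, Dict, Optional


class TaskFailureWithContext(Exception):
    """Hypothetical error carrying what is needed to reproduce a failure."""

    def __init__(
        self,
        message: str,
        parameters: Optional[Dict[str, Any]] = None,
        input_locations: Optional[Dict[str, str]] = None,
    ) -> None:
        super().__init__(message)
        self.message = message
        self.parameters = dict(parameters or {})
        # Only locations of large inputs are kept, never the data itself
        self.input_locations = dict(input_locations or {})

    def to_json(self) -> str:
        # default=str keeps non-JSON parameter values (e.g. datetimes) printable
        return json.dumps(
            {
                "message": self.message,
                "parameters": self.parameters,
                "input_locations": self.input_locations,
            },
            default=str,
            sort_keys=True,
        )


def report_failure(exc: BaseException) -> str:
    """Build the text a failure handler would send or write out."""
    if isinstance(exc, TaskFailureWithContext):
        return exc.to_json()
    # Any other exception: fall back to its message only
    return json.dumps({"message": str(exc)})
```

In a Prefect 1.x `on_failure` callback, `state.result` is the raised exception, so the callback could log `report_failure(state.result)`.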
I have a code snippet that might give an idea of this, do you want to see it?
h
yes that would be great
k
# Imports
import prefect
from prefect import task, Flow, Parameter
from typing import Any, Dict
from prefect.engine.results import GCSResult


class AnotherError(Exception):
    """Custom error that carries the task inputs needed to reproduce a failure."""

    def __init__(self, message: str, inputs: Dict[str, Any]):
        self.message = message
        self.inputs = inputs
        super().__init__(self.message)


def myStateHandler(task, old_state, new_state):
    """Log the task's configured Result (and its location) once the task finishes."""
    logger = prefect.context.get("logger")
    if new_state.is_finished():
        logger.info("Task Result")
        logger.info(task.result)
        logger.info(task.result.location)
    # State handlers should return the new state
    return new_state


def onFailureTask(task, state):
    """on_failure callback: for a Failed state, state.result is the raised exception."""
    logger = prefect.context.get("logger")

    # Task variables (the inputs attached to AnotherError)
    logger.info("Task inputs")
    logger.info(state)
    logger.info(state.result.inputs)

    # Flow parameters
    logger.info("Parameters")
    logger.info(prefect.context.get("parameters", {}))

    logger.info("Task Result")
    logger.info(task.result)


@task(state_handlers=[myStateHandler], result=GCSResult(bucket="prefect-kvnkho", location="1.txt"))
def abc(x):
    return x


# Task that always fails
@task(on_failure=onFailureTask, result=GCSResult(bucket="prefect-kvnkho"))
def myTask(str_param):
    logger = prefect.context.get("logger")
    logger.info(str(str_param))
    raise AnotherError("Error Message Is Here", inputs={"str_param": str_param})


with Flow("flow_must_fail") as flow:
    param1 = Parameter("param1", default="XXX")
    a = abc(param1)
    b = myTask(a)

# For a local flow.run(), results are only written if checkpointing is enabled,
# e.g. by setting the environment variable PREFECT__FLOWS__CHECKPOINTING=true
flow.run()
h
@Kevin Kho so I ended up doing this
def state_handler(task, old_state, new_state):
    logger = prefect.context.get("logger")
    if new_state.is_failed() or new_state.is_finished():
        logger.info(f"State: {new_state}")
        logger.info(new_state)
        logger.info(new_state.result.inputs)
        logger.info("Parameters")
        logger.info(prefect.context.get("parameters", {}))
        logger.info("Task Result")
        logger.info(task.result)
        logger.info(task.result.location)
Is that expected to work? I have one task that returns a datetime object, and this is the error I'm getting back:
AttributeError: 'datetime.datetime' object has no attribute 'inputs'
k
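Ah, I see. That's because `result.inputs` is the `inputs` attribute of the `AnotherError` class I used, so it only exists when that error was raised. I suppose you could use `inputs` if `new_state.is_failed()`, and use `new_state.result` directly (it is already the task's return value) if `new_state.is_successful()`?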
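Below is a sketch of that branching as a single handler that runs on both success and failure. It assumes Prefect 1.x semantics. On a failed state, `new_state.result` is the raised exception. On a successful state, it is the task's return value itself, so asking a `datetime` for `.inputs` fails. `is_finished()` is also true for failed states, which is why the original condition matched both.

The sketch checks `is_failed()` first and reads `inputs` with `getattr`, so errors without that attribute do not raise. `log_task_state` is a hypothetical name, and the stdlib logger fallback is only there so the function also runs outside a flow.

```python
import logging
from typing import Any, Optional


def _get_logger() -> logging.Logger:
    """Prefer the Prefect 1.x run logger; fall back to stdlib logging otherwise."""
    logger: Optional[logging.Logger] = None
    try:
        import prefect  # only available when Prefect is installed

        logger = prefect.context.get("logger")
    except (ImportError, AttributeError):
        # No Prefect, or a Prefect version whose `context` has no `.get`
        logger = None
    return logger or logging.getLogger("state_handler")


def log_task_state(task: Any, old_state: Any, new_state: Any) -> Any:
    """Hypothetical state handler: log inputs on failure, the value on success."""
    logger = _get_logger()
    if new_state.is_failed():
        # Failed state: `result` is the raised exception. Only errors that
        # carry an `inputs` attribute (like AnotherError) have inputs to log.
        inputs = getattr(new_state.result, "inputs", None)
        logger.info("Task failed (%s); inputs: %s", new_state, inputs)
    elif new_state.is_successful():
        # Successful state: `result` is already the return value (e.g. a datetime)
        logger.info("Task succeeded; result: %r", new_state.result)
    # Prefect 1.x state handlers return the (possibly modified) new state
    return new_state
```

It would be attached the same way as in the snippet above, e.g. `@task(state_handlers=[log_task_state])`.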
h
I'd like to run your `onFailureTask` logic on both success and failure.
Is there a way to get the result locations of a task's inputs inside the state handler?
k
Let me try with the GraphQL API
I'm syncing up with an engineer about this tomorrow and will get back to you.
h
thanks!
k
Chatted with the team. The issue here is that an upstream task's result location is tracked by the orchestration layer, which is decoupled from the code logic, so the downstream task has no way of knowing where its upstream results were written. The best options are to put a state handler on the upstream task to log the location, or, if the location is an important concept in your flow, to pass it in as an input or return it directly from the upstream task. So nothing new, and we already have working variations of this in this thread. Sorry about that, but do let me know if there’s anything I can help with.
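Here is a minimal sketch of the "return the location from the upstream task" option. The upstream step persists its own output and returns the location string. The downstream step then receives that location as an ordinary input, which can be logged or attached to an error and is all you need to reload the data for a local re-run.

A temporary directory stands in for S3 here; a real version would upload with something like boto3. `produce_data`, `consume_data` and `STORAGE_DIR` are hypothetical names. In Prefect 1.x, both functions would be `@task`s, with `consume_data` taking `produce_data`'s return value.

```python
import pickle
import tempfile
from pathlib import Path
from typing import Any

# Hypothetical scratch directory standing in for an S3 bucket/prefix
STORAGE_DIR = Path(tempfile.mkdtemp(prefix="flow-results-"))


def produce_data(name: str, data: Any) -> str:
    """Upstream step: persist its own output and return where it was written."""
    location = STORAGE_DIR / f"{name}.pkl"
    location.write_bytes(pickle.dumps(data))
    return str(location)


def consume_data(location: str) -> int:
    """Downstream step: receives the location as a normal input and loads it.

    If this step fails, `location` is among its inputs, so reproducing the
    failure locally only needs this one string.
    """
    data = pickle.loads(Path(location).read_bytes())
    return len(data)
```

The trade-off is that the upstream task now manages its own storage instead of relying on checkpointing to track where its output lives.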
h
no problem, thanks!