# prefect-community
s
Hi All, Prefect newbie here. I am currently evaluating Prefect for a switch over from Airflow. A particular use case I am struggling with is accessing the result of upstream tasks in the state handler of the current task. In Airflow, I could achieve this by querying XCom for the upstream task instance. Also, is there a way to access a task by its ID or name inside a state handler? Any help would be really appreciated. Thanks.
l
Hi @Sandeep Aggarwal, welcome to Prefect! In Prefect, the result of a task is attached to the task's state object, so you can access it directly in the handler as `new_state.result` (for just the value) or `new_state._result` (for the whole Result object, which itself has a `value` attribute). You don't need to query for it, as it is already on that object! A couple of useful docs links on the subject: https://docs.prefect.io/core/concepts/states.html#state-objects, https://docs.prefect.io/core/advanced_tutorials/task-guide.html#state-handlers

To your second question, IIRC state handlers should have access to `prefect.context`, which has a number of auto-populated attributes during a flow run, including `task_name`; the whole list is here: https://docs.prefect.io/api/latest/utilities/context.html#context
s
@Laura Lorenz (she/her) Thanks for the quick turnaround. Yes, I am able to access the current task and its associated result in the state handler. However, I need a way to access an upstream task and its result. To make it clearer, let me explain the exact use case: the initial task in my workflow calls a GRPC API. This API returns an object which I need to have access to for the complete run of the workflow. The only way I can think of to achieve this is to somehow access the result of the initial task in the state handlers of the other downstream tasks. I know I could pass the result directly as a parameter to downstream tasks, but the second task is a Docker CreateContainer task, which doesn't really accept that parameter. The other option I thought of is to use the flow ID/name (via context) to get a reference to the actual flow object and use it to retrieve the result of the init task. But I can't find any method that does that from inside the state handler.
l
Gotcha, I misunderstood then! Yeah, the official Prefect way would be to pass it along as a data dependency, like you're describing with the parameter. In the case of Docker CreateContainer, it looks like it accepts arbitrary `**kwargs` in its signature, so I think you could pass it through if you want. Do you think this would work for you? There are a couple of other, potentially hacking-the-internals ways I can think of, like accessing `prefect.context.caches` directly in your state handlers after you cache the output from your GRPC task (https://docs.prefect.io/core/concepts/persistence.html#output-caching). Regarding getting the flow state via context, I think the context, if you import it in the logic of the handler, might have the flow object on it, though I'm not sure it is fully hydrated with the states you need. I can poke around a bit more if the data dependency route is still a no-go!
s
I created a quick snippet to explain the issue:
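from prefect import Flow, task
from prefect.tasks.docker import CreateContainer


def container_task_state_handler(task, old_state, new_state):
    # Do something with obj (output of create_object) here.
    # The handler only receives this task's own states, not upstream results.
    return new_state


@task
def create_object():
    obj = ...  # Hit GRPC API to get the object.

    return obj


@task
def get_command(obj):
    # Inside the Flow block, create_object() returns a Task, not the object,
    # so `.command` has to be read inside a task at run time.
    return obj.command


container_task = CreateContainer(
    image_name="python:3.6",
    state_handlers=[
        container_task_state_handler
    ]
)


with Flow("Sample flow") as flow:
    command_object = create_object()

    container = container_task(command=get_command(command_object))


flow.run()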
I checked the `CreateContainer` code, and it looks like the `**kwargs` are passed on to the parent `Task` class, which unfortunately accepts a fixed set of arguments. I can try the `context.caches` hack and see if that works. That makes me wonder: is it possible to attach values to `prefect.context` that are available for the whole flow run? The documentation doesn't say that context contains the flow object, but I can check whether it is actually available at runtime.
l
Yeah, the catch with the context hack is that context is read-only inside a task. You can add context to your whole flow run when you call `flow.run`, but not in a persistent way at runtime, and in your case the thing you want to attach doesn't exist until your first task runs.
s
Hey @Laura Lorenz (she/her), sorry for the slow reply. I was trying to make things work with the cache hack you suggested, and I did manage to get some success there while running the flow locally using `flow.run()`. However, it seems cache values are not persisted when running on the server backend with `RemoteDaskEnvironment`. Also, it looks like the task result is pushed to the cache only after the flow run and not after each task run, so I had to push values to the cache manually to make it work. Can you think of any other way to get access to upstream task results inside state handlers? What about getting the results via the GraphQL API (not sure if that is even possible)? The only other way I can think of is to move the state handler logic into the task, but that would require significant refactoring, so I'd like to keep it as a last option.
OK, I think I managed to resolve this using `PrefectResult` and the GraphQL API. Thanks for looking into it. Keep up the good work. 👍
l
Yay, glad you got something working! Does your current solution query the upstream result from the database with the GraphQL API directly in the state handler?
s
Yes, it's in the state handler itself. 🙂