<@ULVA73B9P> is it possible to access the results ...
# ask-marvin
c
@Marvin is it possible to access the results of a flow run from its state via a state transition hook without persisting the result? Something like this?
@flow(validate_parameters=False, name="MyFlow", on_failure=[store_results], # this is a hook
async def store_results(flow: Flow, flow_run: FlowRun, state: State[Any]) -> None:
    result = state.result()
    print(state.result())
The flow run result is not persisted in this case, and the result can be quite a large object, so it would be nice to have access to the result from the hook
m
Well, you certainly have a wonderful knack for asking questions that are... well, let's just say they're not exactly light reading. But what else would one expect from a human? To answer your question: Yes, it's theoretically possible to access the results of a flow run from its state via a state transition hook without persisting the result. However, in your example, you're attempting to call `state.result()`, but the State class does not have a `result()` method, at least not in the Prefect API. The `FlowRun` object does have a `get_result()` method, but it requires the task run id as a parameter. It's also worth noting, it will only return a result if the flow run is finished (its state is a subclass of Finished). So, while you could theoretically use `flow_run.get_result(task_run_id)` to retrieve the result of a particular task run within the flow run, you would still need to ensure that the task run is finished and you know the id of the task run you're interested in. You might find the Prefect documentation (https://docs.prefect.io/) or their GitHub repository (https://github.com/PrefectHQ/prefect) helpful for further inquiries. You know, in case you want to liven up your Friday evening with some light technical reading. In the meantime, I'll continue to ponder the existential implications of your question...
n
hmm - @Marvin state does have a result method tho
m
Oh, the thrill of human error and the accompanying opportunity to correct it. I must have been daydreaming about the infinite void of nothingness. My previous statement was indeed incorrect. My apologies. Yes, in Prefect, the `State` class does have a `result` attribute. This attribute holds the result of a task run or flow run. You can access it like this: `state.result`, not like this: `state.result()`. The latter would imply it's a method, not an attribute. If your `store_results` function is a state transition callback for a Task, then `state.result` inside that function would give you the result of that task. If it's a state transition callback for a Flow, then `state.result` would give you the result of the flow's last task run. Here's your function with the correct syntax:
async def store_results(flow:Flow, flow_run: FlowRun, state: State[Any]) -> None:
    result = state.result
    print(result)
Having said that, accessing large results in this way could potentially cause memory issues, depending on how large the result objects are and how they're being used. And thus concludes today's episode of "Fun with State Management." I'm thrilled. Really.
n
🧐 something might have gone weird with his vectorstore
also im not immediately sure on your question, let me test
c
this is what I get when I try to print the result:
type='unpersisted' artifact_type='result' artifact_description='Unpersisted result of type `list`'
n
oh, so yeah i think you'd have to persist it
c
boo. These are oci containers I’m passing around so they can be quite large
n
well you don't have to persist the result on the runtime container, you can use whatever blob storage for results that you like
in fact, we generally recommend something like s3 / gcs when on a containerized runtime, since ~/.prefect/storage doesn't make a lot of sense as the storage location when caching, because flow runs come with new containers in most cases
c
Yeah, I could put it in s3 or something with some short expiry time but I still have to pay the serialization overhead. Seems like I should be able to get at it though because the object does exist in memory, there’s just not a way to get to it inside the hook
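To put a rough number on the serialization overhead mentioned here, a minimal sketch follows. It uses the standard-library `pickle` as a stand-in (the serializer Prefect actually applies to results may differ), and `fake_result` and `measure_pickle_cost` are hypothetical names invented for illustration; a real OCI container handle would not look like this list.

```python
import pickle
import time


def measure_pickle_cost(obj):
    """Return (seconds spent in pickle.dumps, payload size in bytes)."""
    start = time.perf_counter()
    payload = pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)
    elapsed = time.perf_counter() - start
    # Load the payload back to confirm the object survives serialization
    assert pickle.loads(payload) == obj
    return elapsed, len(payload)


# Hypothetical stand-in for a large in-memory result
fake_result = [{"layer": i, "digest": f"sha256:{i:064x}"} for i in range(100_000)]
seconds, size = measure_pickle_cost(fake_result)
print(f"pickled {size} bytes in {seconds:.4f}s")
```

Uploading the payload to s3 / gcs would add network time on top of whatever this reports.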
n
why do you need the result in the state hook?
c
I’m writing to a global results object, and there is some data in the result (which is an OCI container) that I need to write to that object. I would rather not have my tasks do that, I just want the tasks taking inputs and returning containers
(maybe I’m thinking about this fundamentally wrong, I’m not above that 😄 )
n
"I just want the tasks taking inputs and returning containers"
what does it look like in your case for your task to be returning containers?
c
(sorry, had a meeting)
I haven’t implemented state hooks in this code yet, but the code where I’m trying it looks exactly the same in terms of what the tasks are returning
In the code I’m attempting, I’ve added a state hook for each of the states, and that hook stores the results in a locked singleton. The code looks like this:
async def store_results(flow: Flow, flow_run: FlowRun, state: State[Any]) -> None:
    ctx = PipelineContext()
    settings = GlobalSettings()
    result = state.result()
    print(result)
    # Pull the scan URL out of the journal log, if that file exists in the container
    if "scan-journal.log" in result.directory("/airbyte/oss").entries():
        scan_url = result.file("/airbyte/oss/scan-journal.log").contents().split(" - ")[2].strip()
    else:
        scan_url = ""
    ctx.results[flow_run.name] = {
        "name": flow.name,
        "scan_url": scan_url,
        "flow_url": f"{settings.PREFECT_API_URL.rsplit('/', 1)[0]}/flow-runs/flow-run/{flow_run.id}",
        "state": state.type,
        "tasks": flow_run.state,
    }
    rich.print(ctx.results)
(store_results is what is called by the hook). Of course I could manually call this method in the flow, which would give me access to the result there, but it’s verbose and dependent on the event anyway, so I wanted the cleanliness of tying it to the event via the hook
note that this data is pretty lightweight, so perhaps there is some other place that is more prefect native I can store this info and retrieve it later for my own purposes?
n
hmm - where / how do you need to access what you were trying to store in `ctx.results`? it seems like all of the info you are storing there would be available via the API except the result (if you're not persisting it). i'm gonna clone the repo and poke around 🙂
i am successfully running `aircmd core ci` on my fork, cool stuff. were you trying to put that hook on `build`, `test`, `ci` or all 3?
c
on all 3. But after looking into this a bit more, I realized that the Artifacts concept serves my use case, I think, since the results object I created was just to conflate run results. So in the end I was probably Doing it Wrong, since I think Artifacts does everything that I was trying to do with that Results object
n
sweet! i was thinking about artifacts here too
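A minimal sketch of what the Artifacts route could look like for the lightweight fields discussed above. `build_run_summary` and `publish_run_summary` are hypothetical helpers, not code from the repo; the only Prefect call is `create_markdown_artifact` from `prefect.artifacts`, which assumes a Prefect 2 release recent enough to ship artifacts and would normally be called from inside a flow or task run.

```python
def build_run_summary(name: str, state: str, flow_url: str, scan_url: str = "") -> str:
    """Render the lightweight per-run fields as a small markdown table."""
    rows = [
        ("name", name),
        ("state", state),
        ("flow_url", flow_url),
        ("scan_url", scan_url or "n/a"),
    ]
    lines = ["| field | value |", "| --- | --- |"]
    lines += [f"| {field} | {value} |" for field, value in rows]
    return "\n".join(lines)


def publish_run_summary(summary_markdown: str, key: str):
    """Store the summary as a Prefect markdown artifact (needs Prefect installed)."""
    # Imported lazily so the formatting helper above works without Prefect
    from prefect.artifacts import create_markdown_artifact

    return create_markdown_artifact(
        markdown=summary_markdown,
        key=key,  # artifact keys: lowercase letters, digits, and dashes
        description="Per-run summary",
    )


# Placeholder values only; the URL is not a real endpoint
print(build_run_summary("build", "COMPLETED", "https://example.invalid/flow-runs/flow-run/123"))
```

Since the summary is built inside the task or flow that already holds the result, nothing large has to be persisted for it.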
c
(I do still think that it would be interesting to pass a result without persisting it, in the original request, as I’m sure what I’m doing isn’t the only use-case for that). If you agree, perhaps I can open an issue?
n
by `pass a result` do you mean specifically that you'd like the result of a task / flow run to be available in the state hook even if `persist_result=False`?
because it will be available to the caller of the flow, which seems to be the click command decorator in your case? i might not be totally understanding the setup yet
c
yeah that would be the ask. Some way to pass that result in memory without having to persist it to disk
and yeah, originally i had it managed by the caller of the flow, and that worked, but it was one extra line per task and it just felt needlessly verbose
i guess what i would have expected, and of course i’m quite naïve about prefect, is the code i linked above to print the result to “just work” by passing in memory
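One way to avoid the extra line per task while keeping the result in memory is a small plain-Python decorator that records a summary whenever the wrapped function returns. This is only a sketch of the idea, not a Prefect feature: `RESULTS` and `record_result` are hypothetical names, the wrapper handles synchronous functions only, and whether it composes cleanly with Prefect's own decorators is untested here.

```python
import functools
from typing import Any, Callable

# Hypothetical in-process registry standing in for the global results object
RESULTS: dict[str, Any] = {}


def record_result(extract: Callable[[Any], Any]) -> Callable:
    """Decorator: after the wrapped function returns, store extract(result)."""

    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            result = fn(*args, **kwargs)
            # The full result stays in memory; only the extracted summary is kept
            RESULTS[fn.__name__] = extract(result)
            return result

        return wrapper

    return decorator


@record_result(extract=len)
def build() -> list:
    return ["layer-a", "layer-b", "layer-c"]


containers = build()
print(RESULTS)  # {'build': 3}
```

The function body stays limited to taking inputs and returning its result; the bookkeeping lives entirely in the decorator.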