# ask-community
j
Hi guys 👋 I'm struggling with a problem. Has anyone already solved it? How can I get a flow's result from a worker?
```python
from prefect.deployments import run_deployment

response = run_deployment(deploy_id)
```
I'm running the deployment with this code, and then my agent container runs the flow code. What I need is the return value of the flow after `response = run_deployment(deploy_id)`. Is there any genius who can be my lifesaver?
n
the response variable you've got there actually contains a FlowRun model. so as long as you've persisted results on the flow that you triggered / want the result from, then `response.state.result()` should give you the return value of that flow
j
@Nate wow thank you soooo much 👍
@Nate Sorry to bother you, but when I use `response.state.result()` I get this error message:
`ValueError: Path /root/.prefect/storage/0666775b472b4342b56c3448eb86aeac does not exist.`
it's really weird, because when I run the code without `response.state.result()` there's no error. is there any preliminary step I have to do before using `response.state.result()`?
n
> so as long as you've persisted results
ah so i would guess you're running this in a container where you don't have external `result_storage` (like S3, GCS, etc.) that lives beyond your container. so you're probably persisting results onto the container, which is torn down at the end of the run, and the storage dies with it. then when you call `response.state.result()`, prefect thinks you're referring to local (on-your-machine) storage. so i would update the flow that you're triggering like
```python
@flow(..., result_storage=S3.load("my-s3-block"))  # or a GCS block
```
or you can set
```shell
export PREFECT_DEFAULT_RESULT_STORAGE_BLOCK=gcs/my-gcs-block  # again, same for s3 or azure
```
j
@Nate now it's working so well! I really appreciate it
n
catjam
j
hello @Nate, thanks for the input. do you have any recommendations on how to manage the lifecycle of the persisted results? as far as i can tell, flow results stay around indefinitely and accumulate in the cloud storage. when using it like `res = run_deployment(...).state.result()`, there's no use for the result immediately after it is retrieved. how could this be cleared? removing it directly from the cloud storage would leave a dead link in the results tab of the flow run, but i couldn't find a way to remove a result without removing the flow run.
n
i'd say there are a lot of ways to go about it:
• lifecycle rules on the bucket via the storage provider
• schedule a script to clear things in a more conditional / custom way
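The second option above can be sketched as a small scheduled cleanup script. This is a minimal sketch using only the standard library, assuming results are persisted to a local directory (for S3/GCS you'd swap in the provider's SDK, e.g. bucket listing plus delete calls); `RESULTS_DIR` and the retention window are made-up values:

```python
import time
from pathlib import Path

# Hypothetical location and retention window; adjust for your storage.
RESULTS_DIR = Path("/data/prefect-results")
MAX_AGE_SECONDS = 7 * 24 * 60 * 60  # keep results for one week


def clear_stale_results(results_dir: Path, max_age: float) -> list[Path]:
    """Delete result files older than max_age seconds; return what was removed."""
    now = time.time()
    removed = []
    for path in results_dir.iterdir():
        if path.is_file() and now - path.stat().st_mtime > max_age:
            path.unlink()
            removed.append(path)
    return removed


if __name__ == "__main__":
    if RESULTS_DIR.is_dir():
        for path in clear_stale_results(RESULTS_DIR, MAX_AGE_SECONDS):
            print(f"removed {path}")
```

Run it from cron or as its own scheduled flow; anything younger than the threshold is left alone, so in-flight runs can still resolve their results.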
> there would be no use for it anymore immediately after it is retrieved
if you use storage keys (the `result_storage_key` kwarg on tasks) then you can overwrite the same key to avoid bloat, or if you have more complex cleanup then you can do an `on_completion` hook on the parent that accesses the result, to delete whatever blobs you no longer need
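The two ideas above could be combined roughly like this. This is a hypothetical configuration sketch, not a tested implementation: it assumes a Prefect 2.x setup with an S3 block named `my-s3-block`, the fixed key `daily-etl/latest` is made up, and the actual blob deletion is left to your storage provider's SDK:

```python
from prefect import flow, task
from prefect.filesystems import S3

# Hypothetical fixed key: each run overwrites the previous blob instead of accumulating.
RESULT_KEY = "daily-etl/latest"


@task(result_storage_key=RESULT_KEY)
def compute():
    return 42


def delete_result_blob(flow, flow_run, state):
    # on_completion hook on the parent flow: once the run is done and the
    # result has been consumed, delete the blob with your storage SDK
    # (e.g. boto3's delete_object with Key=RESULT_KEY).
    ...


@flow(result_storage=S3.load("my-s3-block"), on_completion=[delete_result_blob])
def parent():
    return compute()
```

One ordering caveat: the hook fires when the flow reaches a terminal state, so only delete blobs that nothing downstream still needs, e.g. a caller that resolves the parent's own return value via `state.result()` afterwards.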
j
ok, so we have to live with the fact that there will be some results referenced in prefect that no longer have a counterpart in the cloud storage?
n
i think with a combo of lifecycle rules and custom result storage keys, then no, you wouldn't have to - feel free to let me know if you have trouble doing that