Conor
07/21/2023, 4:22 PMMarvin
07/21/2023, 4:22 PMstate.result()
, but the State class does not have a result()
method, at least not in the Prefect API.
The FlowRun
object does have a get_result()
method, but it requires the task run id as a parameter. It's also worth noting, it will only return a result if the flow run is finished (its state is a subclass of Finished).
So, while you could theoretically use flow_run.get_result(task_run_id)
to retrieve the result of a particular task run within the flow run, you would still need to ensure that the task run is finished and you know the id of the task run you're interested in.
You might find the Prefect documentation (https://docs.prefect.io/) or their GitHub repository (https://github.com/PrefectHQ/prefect) helpful for further inquiries. You know, in case you want to liven up your Friday evening with some light technical reading.
In the meantime, I'll continue to ponder the existential implications of your question...Nate
07/21/2023, 4:23 PMMarvin
07/21/2023, 4:23 PMState
class does have a result
attribute. This attribute holds the result of a task run or flow run.
You can access it like this: state.result
, not like this: state.result()
. The latter would imply it's a method, not an attribute.
If your store_results
function is a state transition callback for a Task, then state.result
inside that function would give you the result of that task. If it's a state transition callback for a Flow, then state.result
would give you the result of the flow's last task run.
Here's your function with the correct syntax:
python
async def store_results(flow:Flow, flow_run: FlowRun, state: State[Any]) -> None:
result = state.result
print(result)
Having said that, accessing large results in this way could potentially cause memory issues, depending on how large the result objects are and how they're being used.
And thus concludes today's episode of "Fun with State Management." I'm thrilled. Really.Nate
07/21/2023, 4:24 PMNate
07/21/2023, 4:24 PMConor
07/21/2023, 4:25 PMtype='unpersisted' artifact_type='result' artifact_description='Unpersisted result of type `list`'
Nate
07/21/2023, 4:25 PMConor
07/21/2023, 4:25 PMNate
07/21/2023, 4:28 PMNate
07/21/2023, 4:29 PMConor
07/21/2023, 4:29 PMNate
07/21/2023, 4:29 PMConor
07/21/2023, 4:30 PMConor
07/21/2023, 4:30 PMNate
07/21/2023, 4:34 PMI just want the tasks taking inputs and returning containerswhat does it look like in your case for your task to be returning containers?
Conor
07/21/2023, 5:12 PMConor
07/21/2023, 5:13 PMConor
07/21/2023, 5:13 PMConor
07/21/2023, 5:14 PMasync def store_results(flow:Flow, flow_run: FlowRun, state: State[Any]) -> None:
ctx = PipelineContext()
settings = GlobalSettings()
result = state.result()
print(state.result())
ctx.results[flow_run.name] = {
"name": flow.name,
"scan_url": result.file("/airbyte/oss/scan-journal.log").contents().split(' - ')[2].strip() if "scan-journal.log" in result.directory("/airbyte/oss").entries() else "",
"flow_url": f"{settings.PREFECT_API_URL.rsplit('/', 1)[0]}/flow-runs/flow-run/{str(flow_run.id)}",
"state": state.type,
"tasks": flow_run.state
}
rich.print(ctx.results)
Conor
07/21/2023, 5:16 PMConor
07/21/2023, 5:33 PMNate
07/21/2023, 8:08 PMctx.results
?
it seems like all of the info you are storing there would be available via the API except the result (if you’re not persisting it)
i'm gonna clone the repo and poke around 🙂Nate
07/21/2023, 8:31 PMaircmd core ci
on my fork, cool stuff. were you trying to put that hook on build
, test
, ci
or all 3?Conor
07/21/2023, 9:49 PMNate
07/21/2023, 9:50 PMConor
07/21/2023, 9:50 PMNate
07/21/2023, 10:16 PMpass a resultdo you mean specifically that you'd like the result of a task / flow run to be available in the state hook even if
persist_result=False
?Nate
07/21/2023, 10:18 PMConor
07/21/2023, 10:26 PMConor
07/21/2023, 10:27 PMConor
07/21/2023, 10:27 PM