Tony Liberato
03/18/2022, 7:44 PM
Kevin Kho
@task(result=PrefectResult())? This should be it.
cache_for and cache_validator actually combined with PrefectResult.
Alex Rogozhnikov
04/06/2022, 6:49 AM
from prefect import Client

client = Client()
client_flow_output_id = client.create_flow_run(
    parameters=dict(people=['one', 'two', 'three']),
    flow_id=flow_id,
)
remote_flow_state.load_cached_results().__dict__
{'message': 'All reference tasks succeeded.',
'_result': <Result: None>,
'context': {},
'cached_inputs': {}}
Kevin Kho
PrefectResult specifically. Maybe you can try the get_task_run_result task and invoke it using get_task_run_result.run(…)
What are you trying to do? Are you trying to access it for just one task? Because persisting stuff in the KV Store might be easier to work with.
Alex Rogozhnikov
04/06/2022, 3:57 PM
Kevin Kho
The get_task_run_result task is what you need. You need a flow run ID and a task slug to do that. For S3Result, though, you can specify the location if that helps you retrieve it later on. For PrefectResult it’s kind of weird because the location is equal to the value.
Alex Rogozhnikov
04/06/2022, 11:28 PM
"flow_runs": [
  {
    "parameters": {
      "people": [
        "one",
        "two",
        "three"
      ]
    },
    "flow_id": "e78727e4-ac66-416b-bca1-65427daf58a6",
    "task_runs": [
      {
        "task": {
          "name": "people"
        },
        "state": "Success",
        "cache_key": null,
        "state_result": null,
        "name": null,
        "tenant_id": "a7371ee2-0e8a-4309-9375-a017f502bb31"
      },
      {
I don't even see keys being saved in the database, so it seems the database will not be able to connect those.
get_task_run_result can retrieve parameters from the local cache, but not the Prefect cache. Though I am not sure at all.
The way I get it running:
get_task_run_result.run(flow_run_id=client_flow_output_id, task_slug=result.slug)
seems to not require a client, and thus I don't expect it to be able to retrieve anything from the database.
Yet it seems to return output without knowing where to connect to. (I use a local client.)
Kevin Kho
cache_key and the result. The result is for checkpointing. Caching is more about holding state in Prefect Cloud/Server so that a task is not re-run. You should be looking for the result under the task run.
get_task_run_result uses the FlowRunView from the id that you give, which then goes to a method to query, which then creates a Client to get the details for you. That Client instantiation pulls your stored API key and uses it.
With get_task_run_result, it retrieves the result by querying the API for that specific task (using the task slug), pulls the result information, and then uses that information to load the result, which would be quite a bit of work to build yourself.
So for all results (except PrefectResult), you specify where it’s stored, like Local/S3/GCS, and then the task writes it there. Then Prefect uses the location info to retrieve it with get_task_run_result.
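A rough mental model of that location-based retrieval, as a sketch only: this is not Prefect's code, and write_result and read_result are hypothetical helpers that use the standard library's pickle in place of a real Local/S3/GCS backend. The point is that the writer keeps only the location, and the reader needs nothing else.

```python
import os
import pickle
import tempfile


def write_result(value, directory):
    """Persist the value and hand back only its location (hypothetical helper)."""
    fd, location = tempfile.mkstemp(dir=directory, suffix=".pkl")
    with os.fdopen(fd, "wb") as f:
        pickle.dump(value, f)
    return location


def read_result(location):
    """Load a value back using nothing but the stored location."""
    with open(location, "rb") as f:
        return pickle.load(f)


with tempfile.TemporaryDirectory() as tmp:
    # The "task" writes its output; only the location would be recorded.
    location = write_result({"people": ["one", "two", "three"]}, tmp)
    # Later, a separate reader restores the value from that location alone.
    restored = read_result(location)

print(restored)
```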
I am still unsure about whether or not PrefectResult can be used for retrieval beyond checkpointing.
from prefect import Flow, task
from prefect.engine.results import PrefectResult

# Attach the PrefectResult to the task through the decorator
@task(result=PrefectResult())
def abc():
    return "testing"

with Flow("sub") as flow:
    abc()

print(flow.serialize())  # this is how i get the slug
flow.register("databricks")

from prefect.tasks.prefect import create_flow_run, wait_for_flow_run, get_task_run_result

# create_flow_run returns the ID of the flow run it starts
flow_id = create_flow_run.run(project_name="databricks", flow_name="sub")
wait_for_flow_run.run(flow_id)
res = get_task_run_result.run(flow_id, "abc-1")
print(res)
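On the earlier point that a PrefectResult's location is equal to the value: PrefectResult stores JSON-serializable values, so the recorded location is the JSON text of the value itself. A minimal sketch of what that means for reading it back, assuming you already have the location string in hand (the helper name is hypothetical and Prefect is not imported here):

```python
import json


def value_from_prefect_result_location(location: str):
    """Decode a PrefectResult-style location, which is the JSON text of the value."""
    return json.loads(location)


# What would be recorded for a task returning "testing", as abc() does above.
location = json.dumps("testing")
print(value_from_prefect_result_location(location))

# The same holds for any JSON-serializable value, such as the people parameter.
people_location = json.dumps(["one", "two", "three"])
print(value_from_prefect_result_location(people_location))
```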
Alex Rogozhnikov
04/07/2022, 4:50 PM
Kevin Kho
Luan Tiburcio
05/26/2022, 1:35 PM
Good morning, @Kevin Kho @Alex Rogozhnikov, can I retrieve this result through the GraphQL API? The code really does what I need, which is to retrieve a JSON.
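One possible shape for such a GraphQL request, as a hedged sketch rather than a confirmed answer from the thread. The field names task_run, flow_run_id, task.slug and serialized_state are assumptions about the Prefect 1.x schema and should be checked in the Interactive API before use; "<flow-run-id>" is a placeholder, and "abc-1" is the slug from the example above. The code only builds the request body and does not contact any server.

```python
import json

# Assumed field names (task_run, flow_run_id, task.slug, serialized_state);
# verify them against your server's schema before relying on this query.
TASK_RUN_QUERY = """
query ($flow_run_id: uuid!, $task_slug: String!) {
  task_run(
    where: {
      flow_run_id: {_eq: $flow_run_id},
      task: {slug: {_eq: $task_slug}}
    }
  ) {
    id
    state
    serialized_state
  }
}
"""


def build_payload(flow_run_id, task_slug):
    """Build the JSON-serializable body for a GraphQL POST request."""
    return {
        "query": TASK_RUN_QUERY,
        "variables": {"flow_run_id": flow_run_id, "task_slug": task_slug},
    }


payload = build_payload("<flow-run-id>", "abc-1")
body = json.dumps(payload)  # the text that would be sent to the GraphQL endpoint
print(body)
```

If the schema matches these assumptions, the same query string and variables could be passed to the Prefect Client's graphql method instead of being posted by hand.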