Tony Liberato
03/18/2022, 7:44 PM

Kevin Kho
@task(result=PrefectResult())
? this should be it

Kevin Kho
cache_for and cache_validator, actually combined with PrefectResult.

Alex Rogozhnikov
04/06/2022, 6:49 AM
from prefect import Client

client = Client()
client_flow_output_id = client.create_flow_run(
    parameters=dict(people=['one', 'two', 'three']),
    flow_id=flow_id,
)
Alex Rogozhnikov
04/06/2022, 6:52 AM
remote_flow_state.load_cached_results().__dict__

{'message': 'All reference tasks succeeded.',
 '_result': <Result: None>,
 'context': {},
 'cached_inputs': {}}
Alex Rogozhnikov
04/06/2022, 6:58 AM

Kevin Kho
PrefectResult specifically. Maybe you can try the get_task_run_result task and invoke it using get_task_run_result.run(…). What are you trying to do? Are you trying to access it for just one task? Because persisting stuff in the KV Store might be easier to work with.

Alex Rogozhnikov
04/06/2022, 3:57 PM

Alex Rogozhnikov
04/06/2022, 3:58 PM

Kevin Kho
The get_task_run_result task is what you need. You need a flow_id and a task slug to do that. For S3Result, though, you can specify the location, if that helps you retrieve it later on. For PrefectResult it's kinda weird because the location is equal to the value.

Alex Rogozhnikov
04/06/2022, 11:28 PM"flow_runs": [
{
"parameters": {
"people": [
"one",
"two",
"three"
]
},
"flow_id": "e78727e4-ac66-416b-bca1-65427daf58a6",
"task_runs": [
{
"task": {
"name": "people"
},
"state": "Success",
"cache_key": null,
"state_result": null,
"name": null,
"tenant_id": "a7371ee2-0e8a-4309-9375-a017f502bb31"
},
{
I don't see even see keys being saved in the database, so it seems database will not be able to connect thoseAlex Rogozhnikov
04/07/2022, 1:11 AM
get_task_run_result can retrieve parameters from the local cache, but not the Prefect cache. Though I am not sure at all.
The way I get it running:
get_task_run_result.run(flow_run_id=client_flow_output_id, task_slug=result.slug)
seems not to require a client, and thus I don't expect it to be able to retrieve anything from the database. Yet it seems to return output without knowing where to connect to. (I use a local client.)

Kevin Kho
cache_key and the result. The result is for checkpointing. Caching is more about holding state in Prefect Cloud/Server to not re-run a task. You should be looking for the result under the task run.
get_task_run_result uses the FlowRunView here from the id that you give, which then goes here to a method to query, which then creates a Client to get the details for you. And then that Client instantiation pulls your stored API key and uses it.

Kevin Kho
get_task_run_result retrieves the result by querying the API for that specific task (using the task slug), then pulls the result, and then uses that information to load it in, which is quite a bit of work to create.
So for all results (except PrefectResult), you specify where it's stored, like Local/S3/GCS, and then the task writes it there. Then Prefect uses the location info to retrieve it with get_task_run_result.
I am still unsure about whether or not PrefectResult can be used for retrieval beyond checkpointing.
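Kevin's description of location-based results can be sketched in plain Python. This is a toy model only, with a dict standing in for an S3 bucket; none of the names below are real Prefect classes or functions:

```python
import json

# Toy model: a dict stands in for an S3 bucket (assumption, for illustration).
fake_s3 = {}

def run_task_with_s3_result(value, location):
    # The task writes its return value to storage; Prefect records only
    # the location string in the task run's state.
    fake_s3[location] = json.dumps(value)
    return location

def get_task_run_result_sketch(location):
    # Retrieval later reads the value back from the recorded location.
    return json.loads(fake_s3[location])

location = run_task_with_s3_result(["one", "two", "three"], "results/people-1.json")
print(get_task_run_result_sketch(location))  # ['one', 'two', 'three']
```

The key design point this illustrates: Prefect's backend never holds the value itself for Local/S3/GCS results, only the location, which is why get_task_run_result needs the storage to still be reachable.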
Kevin Kho
from prefect import Flow, task
from prefect.engine.results import PrefectResult

@task(result=PrefectResult())
def abc():
    return "testing"

with Flow("sub") as flow:
    abc()

print(flow.serialize())  # this is how I get the slug
flow.register("databricks")

from prefect.tasks.prefect import create_flow_run, wait_for_flow_run, get_task_run_result

flow_run_id = create_flow_run.run(project_name="databricks", flow_name="sub")
wait_for_flow_run.run(flow_run_id)
res = get_task_run_result.run(flow_run_id, "abc-1")
print(res)
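Kevin's earlier remark that for PrefectResult "the location is equal to the value" can be illustrated with a minimal pure-Python sketch. This is a simplified model of the Prefect 1.x behavior, not the real class:

```python
import json

class PrefectResultSketch:
    """Toy stand-in for prefect.engine.results.PrefectResult (assumption)."""

    def __init__(self, value=None, location=None):
        self.value = value
        self.location = location

    def write(self, value):
        # The JSON-serialized value itself becomes the "location", so the
        # result lives in the Prefect backend rather than in S3/GCS/local disk.
        return PrefectResultSketch(value=value, location=json.dumps(value))

    def read(self, location):
        # Reading just deserializes the location string back into the value.
        return PrefectResultSketch(value=json.loads(location), location=location)

res = PrefectResultSketch().write("testing")
print(res.location)  # '"testing"' — the serialized value doubles as the location
print(PrefectResultSketch().read(res.location).value)  # 'testing'
```

This is why no storage keys show up in the database for other result types while PrefectResult needs none: the backend record itself carries the value.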
Alex Rogozhnikov
04/07/2022, 4:50 PM

Alex Rogozhnikov
04/07/2022, 4:51 PM

Kevin Kho

Luan Tiburcio
05/26/2022, 1:35 PM
Good morning, @Kevin Kho @Alex Rogozhnikov, can I retrieve this result through the GraphQL API? The code really does what I need, which is to retrieve a JSON.