#prefect-community

Tony Liberato

03/18/2022, 7:44 PM
Does anyone have a complete working example of how to use PrefectResult? It's simple: scrape a website for a link; if the link is different from the Prefect result, download it and save the new link as the result. Otherwise exit with "success: no work to be done". I don't want this to be a local result because we plan to move these flows off of a Windows agent and onto a Docker agent. Thanks in advance!

Kevin Kho

03/18/2022, 7:53 PM
For the first question on how to use the result, did you try:
@task(result=PrefectResult())
? This should be it.
I am not sure it can fit this use case because it’s not intended to inspect the result of a previous run. The KV Store seems like a much better fit
Ah, maybe you can with cache_for and cache_validator, actually, combined with PrefectResult.
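The compare-then-download logic from the original question can be sketched independently of where the last link ends up being stored. This is only a minimal sketch under stated assumptions: `sync_latest_link`, the `store` mapping, and the `last_link` key are hypothetical names, and the in-memory dict stands in for whatever persistent backend (KV Store, a result, a database) is eventually chosen.

```python
from typing import Callable, MutableMapping, Optional


def sync_latest_link(
    scrape_link: Callable[[], str],
    download: Callable[[str], None],
    store: MutableMapping[str, str],
    key: str = "last_link",  # hypothetical key name
) -> str:
    """Download the scraped link only if it differs from the stored one."""
    current = scrape_link()
    previous: Optional[str] = store.get(key)
    if current == previous:
        return "success: no work to be done"
    download(current)
    store[key] = current  # remember the link for the next run
    return f"downloaded: {current}"


# Usage with in-memory stand-ins (all hypothetical).
downloads = []
state = {}
first = sync_latest_link(lambda: "https://example.com/a.zip", downloads.append, state)
second = sync_latest_link(lambda: "https://example.com/a.zip", downloads.append, state)
print(first)
print(second)
```

Passing the scraper, downloader, and store in as arguments keeps the decision logic testable on its own, whichever persistence option the flow uses.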

Alex Rogozhnikov

04/06/2022, 6:49 AM
Hi @Kevin Kho, I'm stuck on the same question and it did not work for me. To be more specific, after adding result=PrefectResult() I expect there to be some way to retrieve that data. I run the service locally (i.e. on the same node) with Docker workers. I've set up caching, and it seems to work on local runs. Here's what I do:
from prefect import Client

client = Client()
client_flow_output_id = client.create_flow_run(
    parameters=dict(people=['one', 'two', 'three']),
    flow_id=flow_id,
)
I see that results are cached in the Prefect UI. However, I don't understand how I am supposed to retrieve cached results. Here's what I've found:
remote_flow_state.load_cached_results().__dict__

{'message': 'All reference tasks succeeded.',
 '_result': <Result: None>,
 'context': {},
 'cached_inputs': {}}
So my problem: the cache seems to work, but I don't understand how I can retrieve it manually or check its existence. If there are other tools to retrieve the results of stages in Python (not caching) when working with Docker runs and cluster runs, I'd also love to learn about them.

Kevin Kho

04/06/2022, 1:45 PM
I am not sure you can fetch the results of PrefectResult specifically. Maybe you can try the get_task_run_result task and invoke it using get_task_run_result.run(…)
What are you trying to do? Are you trying to access it for just one task? Because persisting stuff in the KV Store might be easier to work with

Alex Rogozhnikov

04/06/2022, 3:57 PM
KV store isn't an option: it does not scale, and the 10 KB limit is too tight for me. I am testing whether Prefect can work in my scenario. I expect that in many cases, before running a batch of jobs, I'll need to tune the job parameters, then check that the job works in the Docker image created for the task, then scale up:
• I'd like to be able to run things locally (same env, different process) and return the result for visualization / debugging. I can retrieve .result here, and that seems to work. Results are mixtures of images and tabular data.
• I need to run two or three tasks on a local Docker executor and confirm they produce the same result.
• Then I need to scale it up. I'd like to cache results for some time so that, e.g., if something went wrong, I could check cached outputs during the next two weeks.
I thought that a cached result would be a nice option, but it is unclear how one can retrieve a result from a spawned process. If I can do that for S3Result, I can use S3Result, but it's still unclear whether I can get the task result without knowing beforehand which key the task used for storing it.
I can give more details if it is helpful

Kevin Kho

04/06/2022, 4:03 PM
whoops i gave the wrong link for KV store lol. reading the response
Yes, the get_task_run_result task is what you need. You need a flow run ID and a task slug to do that. For S3Result, though, you can specify the location if that helps you retrieve it later on. For PrefectResult it's kinda weird because the location is equal to the value.
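The remark that the location "is equal to the value" can be pictured with a toy model. The sketch below is not Prefect's implementation: `InlineResult` is a hypothetical class, and JSON serialization is assumed here purely for illustration. It only shows why such a result needs no external storage to be read back, in contrast to a result whose location is a file path or object key.

```python
import json
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class InlineResult:
    """Toy stand-in for a result whose location *is* the serialized value."""

    location: Optional[str] = None

    def write(self, value: Any) -> "InlineResult":
        # Nothing goes to external storage: the JSON text itself becomes
        # the "location" that is recorded with the task run state.
        return InlineResult(location=json.dumps(value))

    def read(self, location: str) -> Any:
        # Reading back only requires decoding the location string.
        return json.loads(location)


stored = InlineResult().write("testing")
restored = InlineResult().read(stored.location)
print(stored.location)
print(restored)
```

Because the whole value travels inside the state metadata, this style only suits small, JSON-friendly values; images and large tables would still call for a storage-backed result such as the S3 option mentioned above.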

Alex Rogozhnikov

04/06/2022, 11:28 PM
I can't make it work. Can you provide a minimal example? Caching, as I mentioned, works. I've been checking the GraphQL interface, but it appears that cache_key is empty, so it's unclear to me how one can start from, say, the flow run and collect intermediate computations:
"flow_runs": [
          {
            "parameters": {
              "people": [
                "one",
                "two",
                "three"
              ]
            },
            "flow_id": "e78727e4-ac66-416b-bca1-65427daf58a6",
            "task_runs": [
              {
                "task": {
                  "name": "people"
                },
                "state": "Success",
                "cache_key": null,
                "state_result": null,
                "name": null,
                "tenant_id": "a7371ee2-0e8a-4309-9375-a017f502bb31"
              },
              {
I don't even see keys being saved in the database, so it seems the database will not be able to connect those.
I think get_task_run_result can retrieve parameters from the local cache, but not from the Prefect cache, though I am not sure at all. The way I get it running:
get_task_run_result.run(flow_run_id=client_flow_output_id, task_slug=result.slug)
seems to not require a client, and thus I don't expect it to be able to retrieve anything from the database. Yet it seems to return output without knowing where to connect to. (I use a local client.)

Kevin Kho

04/07/2022, 1:24 AM
I think you are conflating the cache_key and the result. The result is for checkpointing. Caching is more about holding state in Prefect Cloud/Server so that a task is not re-run. You should be looking for the result under the task run. get_task_run_result uses the FlowRunView, built from the ID that you give, which then goes to a query method, which then creates a Client to get the details for you. That Client instantiation then pulls your stored API key and uses it.
In Prefect, the terminology for Caching refers to something different which is like “cache this task for 1 hour”
And then get_task_run_result retrieves the result by querying the API for that specific task (by using the task slug), then pulls the result and uses that information to load it in, which is quite a bit of work to create. So for all results (except PrefectResult), you specify where it's stored, like Local/S3/GCS, and then the task writes it there. Then Prefect uses the location info to retrieve it with get_task_run_result. I am still unsure about whether or not PrefectResult can be used for retrieval beyond checkpointing.
Let me make an example one sec
Actually there is one here
Ok from my test I think PrefectResult works. Here is a full code example:
from prefect import Flow, task
from prefect.engine.results import PrefectResult
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run, get_task_run_result

# The result type is configured on the task decorator, not as a function argument
@task(result=PrefectResult())
def abc():
    return "testing"

with Flow("sub") as flow:
    abc()

print(flow.serialize())  # this is how i get the slug
flow.register("databricks")

# create_flow_run returns the ID of the newly created flow run
flow_run_id = create_flow_run.run(project_name="databricks", flow_name="sub")
wait_for_flow_run.run(flow_run_id)
res = get_task_run_result.run(flow_run_id, "abc-1")

print(res)

Alex Rogozhnikov

04/07/2022, 4:50 PM
Thank you so much, that shed light on it for me! I've found that queries also retrieve serialized_state for the flow run, which in turn contains the result location field. Still digging into how the client knows which endpoint to connect to.
and to confirm: yes, I can retrieve run result now
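The observation about serialized_state can be sketched as a small helper that pulls the result location out of a state payload. This is a hedged sketch only: the `_result` key mirrors the state dictionary printed earlier in the thread and `location` follows the description above, but the exact payload shape is an assumption, and `result_location` and `sample_state` are hypothetical names with made-up sample data.

```python
import json
from typing import Any, Mapping, Optional, Union


def result_location(
    serialized_state: Union[str, Mapping[str, Any]]
) -> Optional[str]:
    """Return the result location stored in a serialized state, if any."""
    if isinstance(serialized_state, str):
        state = json.loads(serialized_state)
    else:
        state = serialized_state
    # Assumed layout: {"_result": {"location": ...}}; missing parts yield None.
    result = state.get("_result") or {}
    return result.get("location")


# Made-up sample payload shaped like the assumption above.
sample_state = {
    "type": "Success",
    "message": "Task run succeeded.",
    "_result": {"type": "PrefectResult", "location": "\"testing\""},
}
location = result_location(sample_state)
print(location)
```

For a storage-backed result the extracted location would be a path or key to read from that storage; for the inline case discussed in this thread, decoding the location string would already give the value.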

Kevin Kho

04/07/2022, 4:53 PM
Nice! The Client just pulls the stored API key by default, but you can point it explicitly.

Luan Tiburcio

05/26/2022, 1:35 PM
Good morning, @Kevin Kho @Alex Rogozhnikov, can I retrieve this result through the GraphQL API? The code really does what I need, which is to retrieve a JSON.