Thread
#prefect-community
    Tony Liberato

    6 months ago
    Does anyone have a complete working example of how to use PrefectResult? It's simple: scrape a website for a link; if the link is different from the stored Prefect result, download it and save the new link as the result. Otherwise, exit with "success: no work to be done". I don't want this to be a local result because we plan to move these flows off a Windows agent and onto a Docker agent. Thanks in advance!
    Kevin Kho

    6 months ago
    For the first question on how to use the result, did you try @task(result=PrefectResult())? That should be it.
    I am not sure it fits this use case, though, because it's not intended for inspecting the result of a previous run. The KV Store seems like a much better fit.
    Ah, maybe you actually can, with cache_for and cache_validator combined with PrefectResult.
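The compare-then-download logic from the original question does not depend on where the previous link is stored. A minimal sketch, assuming hypothetical `load_previous`, `save_current`, and `download` callables (none of these names come from Prefect); in Prefect 1.x Cloud, the loader and saver could plausibly wrap `get_key_value` and `set_key_value` from `prefect.backend`:

```python
from typing import Callable, Optional


def sync_link(
    scraped_link: str,
    load_previous: Callable[[], Optional[str]],  # hypothetical: reads the stored link
    save_current: Callable[[str], None],         # hypothetical: writes the new link
    download: Callable[[str], None],             # hypothetical: fetches the file
) -> str:
    """Download only when the scraped link differs from the stored one."""
    if load_previous() == scraped_link:
        return "success: no work to be done"
    download(scraped_link)
    # Persist only after the download call returns, so a failure is retried next run.
    save_current(scraped_link)
    return f"downloaded: {scraped_link}"


# Usage with an in-memory dict standing in for the real store.
store = {}
downloads = []
first = sync_link(
    "https://example.com/a.zip",
    lambda: store.get("link"),
    lambda value: store.update(link=value),
    downloads.append,
)
second = sync_link(
    "https://example.com/a.zip",
    lambda: store.get("link"),
    lambda value: store.update(link=value),
    downloads.append,
)
```

Keeping the storage behind two small callables means the same function works unchanged when the flow moves from a Windows agent to a Docker agent; only the store implementation has to be reachable from both.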
    Alex Rogozhnikov

    5 months ago
    Hi @Kevin Kho, I'm stuck on the same question, and it did not work for me. To be more specific, after adding result=PrefectResult() I expect there to be some way to retrieve that data. I run the service locally (i.e. on the same node) with Docker workers. I've set up caching, and it seems to work on local runs. Here is what I do:
    from prefect import Client
    
    client = Client()
    client_flow_output_id = client.create_flow_run(
        parameters=dict(people=['one', 'two', 'three']),
        flow_id=flow_id,
    )
    I see that results are cached in the Prefect UI. However, I don't understand how I am supposed to retrieve the cached results. Here is what I've found:
    remote_flow_state.load_cached_results().__dict__
    
    {'message': 'All reference tasks succeeded.',
     '_result': <Result: None>,
     'context': {},
     'cached_inputs': {}}
    So my problem: the cache seems to work, but I don't understand how I can retrieve it manually or check its existence. If there are other tools to retrieve results of stages in Python (not caching) when working with Docker runs and cluster runs, I'd also love to learn about them.
    Kevin Kho

    5 months ago
    I am not sure you can fetch the results of PrefectResult specifically. Maybe you can try the get_task_run_result task and invoke it using get_task_run_result.run(…).
    What are you trying to do? Are you trying to access it for just one task? Because persisting stuff in the KV Store might be easier to work with.
    Alex Rogozhnikov

    5 months ago
    The KV store isn't an option: it does not scale, and the 10 KB limit is too tight for me. I am testing whether Prefect can work in my scenario. (I expect that before running a batch of jobs, in many cases I'll need to tune the job parameters, then check that the job works in the Docker image created for the task, then scale up.)
    • I'd like to be able to run things locally (same env, different process) and return the result for visualization / debugging. I can retrieve .result here, and that seems to work. Results are mixtures of images and tabular data.
    • I need to run two or three tasks on a local Docker executor and confirm they produce the same result.
    • Then I need to scale it up. I'd like to cache results for some time so that, e.g., if something goes wrong, I can check cached outputs during the next two weeks.
    I thought that a cached result would be a nice option, but it is unclear how one can retrieve a result from a spawned process. If I can do that with S3Result, I can use S3Result, but it's still unclear whether I can get the task result without knowing beforehand which key the task used for storing it.
    I can give more details if it is helpful
    Kevin Kho

    5 months ago
    Whoops, I gave the wrong link for the KV store, lol. Reading the response.
    Yes, the get_task_run_result task is what you need. You need a flow run ID and a task slug to do that. For S3Result, though, you can specify the location if that helps you retrieve it later on. For PrefectResult it's kinda weird because the location is equal to the value.
    Alex Rogozhnikov

    5 months ago
    I can't make it work. Can you provide a minimal example? Caching, as I mentioned, works. I've been checking the GraphQL interface, but it appears that cache_key is empty, so it's unclear to me how one can start from, say, the flow run and collect intermediate computations:
    "flow_runs": [
              {
                "parameters": {
                  "people": [
                    "one",
                    "two",
                    "three"
                  ]
                },
                "flow_id": "e78727e4-ac66-416b-bca1-65427daf58a6",
                "task_runs": [
                  {
                    "task": {
                      "name": "people"
                    },
                    "state": "Success",
                    "cache_key": null,
                    "state_result": null,
                    "name": null,
                    "tenant_id": "a7371ee2-0e8a-4309-9375-a017f502bb31"
                  },
                  {
    I don't even see keys being saved in the database, so it seems the database will not be able to connect those.
    I think get_task_run_result can retrieve parameters from the local cache, but not from the Prefect cache, though I am not sure at all. The way I run it:
    get_task_run_result.run(flow_run_id=client_flow_output_id, task_slug=result.slug)
    It seems not to require a client, so I don't expect it to be able to retrieve anything from the database. Yet it seems to return output without knowing where to connect to. (I use a local client.)
    Kevin Kho

    5 months ago
    I think you are conflating the cache_key and the result. The result is for checkpointing. Caching is more about holding state in Prefect Cloud/Server so that a task is not re-run. You should be looking for the result under the task run. get_task_run_result builds a FlowRunView from the ID that you give; that calls a query method, which creates a Client to get the details for you. That Client instantiation pulls your stored API key and uses it.
    In Prefect, the term "caching" refers to something different, like "cache this task for 1 hour".
    As for get_task_run_result, it queries the API for that specific task run (using the task slug), pulls the result's location, and then uses that information to load the result, which is quite a bit of work to recreate. So for all results (except PrefectResult), you specify where the result is stored, like Local/S3/GCS, and the task writes it there. Then Prefect uses the location info to retrieve it with get_task_run_result. I am still unsure whether PrefectResult can be used for retrieval beyond checkpointing.
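The retrieval path described above can be sketched roughly as follows. This is an approximation assuming Prefect 1.x, where `FlowRunView` lives in `prefect.backend`; `fetch_task_result` and its `view_cls` parameter are hypothetical names added here so the steps can be exercised without a backend. It also assumes the flow run has already finished.

```python
def fetch_task_result(flow_run_id: str, task_slug: str, view_cls=None):
    """Load one task run's result, given a flow run ID and a task slug."""
    if view_cls is None:
        # Imported lazily: only needed when talking to a real Prefect 1.x backend.
        from prefect.backend import FlowRunView

        view_cls = FlowRunView
    # 1. Query the API for the flow run (this is where a Client gets created).
    flow_run = view_cls.from_flow_run_id(flow_run_id)
    # 2. Locate the task run by its slug, e.g. "abc-1".
    task_run = flow_run.get_task_run(task_slug=task_slug)
    # 3. Use the stored result location to load the value.
    return task_run.get_result()
```

Against a real backend, the call would look like `fetch_task_result(flow_run_id, "abc-1")`, using the same stored API key as the Client.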
    Let me make an example one sec
    Actually there is one here
    Ok from my test I think PrefectResult works. Here is a full code example:
    from prefect import Flow, task
    from prefect.engine.results import PrefectResult
    from prefect.tasks.prefect import create_flow_run, wait_for_flow_run, get_task_run_result
    
    # The result type is configured on the decorator, not in the function signature
    @task(result=PrefectResult())
    def abc():
        return "testing"
    
    with Flow("sub") as flow:
        abc()
    
    print(flow.serialize())  # this is how I get the task slug ("abc-1")
    flow.register("databricks")
    
    # create_flow_run returns a flow run ID, which the other two tasks take as input
    flow_run_id = create_flow_run.run(project_name="databricks", flow_name="sub")
    wait_for_flow_run.run(flow_run_id)
    res = get_task_run_result.run(flow_run_id, "abc-1")
    
    print(res)
    Alex Rogozhnikov

    5 months ago
    Thank you so much, that shed light on it for me! I've found that queries also retrieve serialized_state for the flow run, which in turn contains the result location field. Still digging into how the client knows which endpoint to connect to.
    And to confirm: yes, I can retrieve the run result now.
    Kevin Kho

    5 months ago
    Nice! The Client just pulls the stored API key by default, but you can point it somewhere explicitly.
    Luan Tiburcio

    4 months ago
    Good morning, @Kevin Kho @Alex Rogozhnikov, can I retrieve this result through the GraphQL API? The code really does what I need, which is to retrieve a JSON.
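Based on what was reported earlier in the thread (queries return serialized_state, which contains the result location, and for PrefectResult the location is the value itself), a GraphQL lookup might look roughly like the sketch below. The field names `task_run`, `flow_run_id`, `task.slug`, and `serialized_state`, the `_result`/`location` layout, and the sample state are assumptions for illustration and should be checked against the schema in the interactive API.

```python
import json

# Assumed query shape; verify the field names against your backend's schema.
QUERY_TEMPLATE = """
query {
  task_run(where: {flow_run_id: {_eq: %s}, task: {slug: {_eq: %s}}}) {
    id
    state
    serialized_state
  }
}
"""


def build_task_run_query(flow_run_id: str, task_slug: str) -> str:
    """Fill in the query; json.dumps quotes and escapes the string arguments."""
    return QUERY_TEMPLATE % (json.dumps(flow_run_id), json.dumps(task_slug))


def extract_prefect_result(serialized_state: dict):
    """Decode a PrefectResult value, assuming its location holds the JSON-encoded value."""
    result = serialized_state.get("_result") or {}
    location = result.get("location")
    return None if location is None else json.loads(location)


# Hypothetical serialized_state for the "abc" task from the example above.
sample_state = {
    "type": "Success",
    "_result": {"type": "PrefectResult", "location": '"testing"'},
}
query = build_task_run_query("run-123", "abc-1")
value = extract_prefect_result(sample_state)
```

The query string could then be sent with `Client().graphql(query)`, the same Client used earlier in the thread. For other result types, such as S3Result, the location would be a storage key rather than the value, so the object would still need to be read from storage.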