Can we write a task that could return the list of ...
# ask-community
d
Can we write a task that could return the list of files/folder from the result storage that prefect is using. The idea is that the flow is setup with S3 storage this would do an s3 list_object and a local would a recursive scan. This task could take a prefix to Supports filtering A speudo code could look like that
Copy code
@task
def list_file_from_storate(prefix: str) -> List[str]:
    result = prefect.context["result"]
    return result.list_files(prefix)
Looking at the context doc I cannot see the result been available here https://docs.prefect.io/api/latest/utilities/context.html We we want is to be able to interact with the result using task from the flow setup.
a
in general, due to the hybrid execution model, Prefect doesn’t store your data, only the result location, so I think it may be a bit hard to do. But you can use e.g. a state handler to get the result and do something with it either by leveraging the state.result or entirely querying it from the API using the FlowRunView:
Copy code
import json
import requests
import prefect
from prefect import task, Flow
from prefect.backend.flow_run import FlowRunView


def send_report_on_success(task, old_state, new_state):
    if new_state.is_successful():
        flow_run = FlowRunView.from_flow_run_id(prefect.context.get("flow_run_id"))
        task_run = flow_run.get_task_run(task_run_id=prefect.context.get("task_run_id"))
        result = task_run.get_result()
        <http://requests.post|requests.post>(url="webhook_url", data=json.dumps({"text": result}))
    return new_state


@task(state_handlers=[send_report_on_success])
def return_some_data():
    return "Some result"


with Flow(name="state-handler-demo-flow") as flow:
    result = return_some_data()
the FlowRunVew can also be used in your tasks to access task run results and act on it in some way and of course, as long as you know the result location, you can just list files using boto3:
Copy code
s3.list_objects(Bucket='', MaxKeys=10, Prefix='optional_file_prefix_')
d
Hi @Anna Geller, I know that prefect doesn't store our data. What I want is to have a task that is able to query the result which have bee store usually using the target parameter. We have different environment and I don't want to manage retrieving result from different result storage base on it. In local we are using usually the prefect local storage and on prod the S3 result storage. We have flow that do external source -> result storage using target and other which are doing result storage -> analytic db or ML processing. We leveraging all the capacity of prefect that allow us to abstract the result storage avoid from the business logic. However we have a new requirement for which we want to be a able to reload all the data persist in the result storage under a given prefix and for that I was thinking about something like that
Copy code
from prefect import task
import pandas as dataframe


@task(target="{path}")
def get_cached_df(path:str) -> pd,dataframe:
    raise Exception(f"File {path} is not cached")

with flow("load_data") as f:
     /// This is the task I'm trying to write
    files = get_files_list_from_result()
    cached_df = get_cached_df.map(files)
Does this speudo code make sense ?
We already wrote another task decorator that allow to separate the serializer from the result and we are generating the result of the task on the registration or run base on our setup and the serializer setup like that
Copy code
from _typeshed import SupportsAllComparisons
from typing import Any, Callable, Optional, Union
import prefect
from prefect.engine.serializers import Serializer

def custom_serializer_task(
    fn: Callable = None,
    serializer: Optional[Serializer] = None,
    **task_init_kwargs: Any,
) -> Union[
    "prefect.tasks.core.function.FunctionTask",
    Callable[[Callable], "prefect.tasks.core.function.FunctionTask"],
]:
    def get_task(fn, serializer, **task_init_kwargs):
        t = prefect.tasks.core.function.FunctionTask(fn=fn, **task_init_kwargs,)
        t.serializer = serializer  # type: ignore [attr-defined]
        return t
 
    if fn is None:
        return lambda fn: get_task(fn, serializer, **task_init_kwargs)
 
    t = prefect.tasks.core.function.FunctionTask(fn=fn, **task_init_kwargs)
    t.serializer = serializer  # type: ignore [attr-defined]
    return t
a
The easiest way would be to just store this path as a global variable:
Copy code
from prefect import task
import pandas as dataframe

PATH = ""

@task(target="{PATH}")
def get_cached_df(path:str) -> pd,dataframe:
    raise Exception(f"File {path} is not cached")

with flow("load_data") as f:
     /// This is the task I'm trying to write
    files = get_files_list_from_result(PATH)
    cached_df = get_cached_df.map(files)
Definitely this path is not stored in context and retrieving it from API would be cumbersome. As an alternative to a global variable, you could push the result location to a KV Store and retrieve it from there to make it easier. If you are on Server, you could store this information e.g. is some database or Redis, or even in a separate S3 location and look it up in your flow.
d
This is not what I'm looking for, maybe this is not realistic with the current version of prefect. Like we have a concept of run_config and executor, I want the result storage to be setup on the flow level (Which we are doing alreay) and available in the context to query. It already provide access exist for example to check if the current result if cached already. Here we want to be able to access this information from inside a task in the context. I know today result doesn't have list_object implementation but this would not be hard to do.
a
I understand why having this info in the context would be beneficial for you. Can you explain why using a global variable as I described wouldn’t work for your use case? Because for targets or results you need to specify the path yourself, so you have access to this information in your flow already. Not sure what I’m missing. I think listing objects for all task run results on a flow level may be tricky since afaik it’s bound to a task run rather than flow run, and there are so many edge cases depending on what Result class you use, whether you use mapping/target-based caching etc. Would you be willing to open a Github issue discussing the feature request you would like to see? This way, other engineers could look at it and validate whether this is a feature we could add.
d
Sure let me open an issue. Imagine I save data everyday using a custom target using today Now I want to load all the history files that we persisted to perfect result storage to do another processing with them however I don't know which find exist. And I don't want in my flow to have to have custom talks for local, S3 or azure blob storage. I want the result storage setup on my flow to be able to return the path but only the part that prefect manage. I don't want s3://mybucket/market_data/20220103/spy.parquet but only market_data/20220103/spy.parquet so that the dummy task I wrote above can retrieve it from the cache
a
That would be convenient for sure. But given that you template your targets to s3://mybucket/market_data/ retrieving all files persisted by Prefect is easy:
Copy code
aws s3 ls <s3://mybucket/market_data/>
# or even:
aws s3 ls <s3://mybucket/market_data/20220103/>
anyway, feel free to open a Github issue, perhaps it’s doable
d
This is my problem some environment are using local as base and as we are looking to move to azure we will use azure:/) with different bases path that are setup on the flow at registration
And this infrastructure detail should be be the responsibility of the person who write the flow but runtime detail like the task executor