davzucky
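01/13/2022, 12:32 PM
from typing import List

import prefect
from prefect import task


@task
def list_file_from_storage(prefix: str) -> List[str]:
    # What I'd like to do: read the flow's configured result from context
    # and ask it for the files under `prefix` (list_files is hypothetical)
    result = prefect.context["result"]
    return result.list_files(prefix)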
Looking at the context docs, I can't see the result being available there: https://docs.prefect.io/api/latest/utilities/context.html
What we want is to be able to interact with the result configured in the flow setup from within a task.
Anna Geller
import json

import prefect
import requests
from prefect import Flow, task
from prefect.backend.flow_run import FlowRunView


def send_report_on_success(task, old_state, new_state):
    if new_state.is_successful():
        # Look up this task run through the backend API and fetch its result
        flow_run = FlowRunView.from_flow_run_id(prefect.context.get("flow_run_id"))
        task_run = flow_run.get_task_run(task_run_id=prefect.context.get("task_run_id"))
        result = task_run.get_result()
        requests.post(url="webhook_url", data=json.dumps({"text": result}))
    return new_state


@task(state_handlers=[send_report_on_success])
def return_some_data():
    return "Some result"


with Flow(name="state-handler-demo-flow") as flow:
    result = return_some_data()
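If the state handler only needs the task's own return value, a lighter variant may be to read it from `new_state.result` instead of calling the backend API. The sketch below assumes Prefect 1.x semantics, where a successful state carries the task's return value in `.result` during the run; `make_report_handler` is a hypothetical helper, and the webhook call is injected as `post` so the handler can be exercised without a network.

```python
import json
from typing import Any, Callable


def make_report_handler(post: Callable[[str], Any]) -> Callable:
    """Build a Prefect-1-style state handler that reports a task's return value.

    `post` is any callable that ships a JSON string somewhere, for example
    lambda body: requests.post(url="webhook_url", data=body)  (assumed webhook).
    """

    def send_report_on_success(task, old_state, new_state):
        # Only report once the task has finished successfully
        if new_state.is_successful():
            # Assumption: the return value is held in `.result` and is JSON-serializable
            post(json.dumps({"text": new_state.result}))
        # State handlers must return the (possibly modified) new state
        return new_state

    return send_report_on_success
```

It could be attached as `@task(state_handlers=[make_report_handler(lambda body: requests.post(url="webhook_url", data=body))])`. Reading the state avoids an API round trip; `FlowRunView` remains the option when you need the results of other task runs.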
The FlowRunView can also be used in your tasks to access task run results and act on them in some way.
And of course, as long as you know the result location, you can just list the files using boto3:
import boto3

s3 = boto3.client("s3")
# set Bucket to the bucket your result writes to
s3.list_objects(Bucket='', MaxKeys=10, Prefix='optional_file_prefix_')
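`list_objects` returns at most 1,000 keys per call, so a result location holding many files needs pagination. A minimal sketch using boto3's `list_objects_v2` paginator; `list_result_keys` and `keys_from_pages` are hypothetical names, and the bucket and prefix are whatever your result writes to.

```python
from typing import Iterable, List


def keys_from_pages(pages: Iterable[dict]) -> List[str]:
    """Collect object keys from list_objects_v2 response pages."""
    keys: List[str] = []
    for page in pages:
        # "Contents" is absent when a page (or the whole prefix) is empty
        for obj in page.get("Contents", []):
            keys.append(obj["Key"])
    return keys


def list_result_keys(bucket: str, prefix: str = "", client=None) -> List[str]:
    """List every key under `prefix`, following pagination past 1,000 keys."""
    if client is None:
        import boto3  # imported lazily so keys_from_pages works without boto3

        client = boto3.client("s3")
    paginator = client.get_paginator("list_objects_v2")
    return keys_from_pages(paginator.paginate(Bucket=bucket, Prefix=prefix))
```

Splitting out `keys_from_pages` keeps the response handling testable without AWS credentials.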
davzucky
01/13/2022, 1:20 PM
import pandas as pd
from prefect import Flow, task


# target="{path}" is templated at runtime (here with the `path` input); if a
# result already exists there, the task is cached and its stored value is loaded
@task(target="{path}")
def get_cached_df(path: str) -> pd.DataFrame:
    raise Exception(f"File {path} is not cached")


with Flow("load_data") as f:
    # This is the task I'm trying to write
    files = get_files_list_from_result()
    cached_df = get_cached_df.map(files)
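A possible shape for the missing `get_files_list_from_result`, assuming the flow writes to a Prefect 1.x `LocalResult`, which exposes its directory as `.dir`. It returns names relative to that directory, since a `LocalResult` resolves a target such as `"{path}"` against its own `dir`. `list_result_files` is a hypothetical name, and an S3-backed result would need an S3 listing instead.

```python
import os
from typing import List


def list_result_files(result, prefix: str = "") -> List[str]:
    """List file names in a local result's directory that start with `prefix`.

    Names are relative to `result.dir`, matching how a LocalResult resolves
    a templated target like "{path}".
    """
    # Assumption: `result` behaves like Prefect 1.x LocalResult and exposes
    # the directory it reads from and writes to as `.dir`
    base = result.dir
    return sorted(
        name
        for name in os.listdir(base)
        # skip sub-directories; only files can be loaded as cached results
        if name.startswith(prefix) and os.path.isfile(os.path.join(base, name))
    )
```

Wrapped with `@task`, its output could feed `get_cached_df.map(files)` directly, so each mapped run checks its own cached file.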
davzucky
01/13/2022, 1:24 PM
davzucky
01/13/2022, 1:26 PM
from typing import Any, Callable, Optional, Union

import prefect
from prefect.engine.serializers import Serializer


def custom_serializer_task(
    fn: Callable = None,
    serializer: Optional[Serializer] = None,
    **task_init_kwargs: Any,
) -> Union[
    "prefect.tasks.core.function.FunctionTask",
    Callable[[Callable], "prefect.tasks.core.function.FunctionTask"],
]:
    """Like @task, but also attaches a `serializer` attribute to the task."""

    def get_task(fn, serializer, **task_init_kwargs):
        t = prefect.tasks.core.function.FunctionTask(fn=fn, **task_init_kwargs)
        t.serializer = serializer  # type: ignore [attr-defined]
        return t

    # Called with arguments, e.g. @custom_serializer_task(serializer=...):
    # return a decorator that builds the task once it receives the function
    if fn is None:
        return lambda fn: get_task(fn, serializer, **task_init_kwargs)
    # Used bare, e.g. @custom_serializer_task: build the task right away
    return get_task(fn, serializer, **task_init_kwargs)
Anna Geller
import pandas as pd
from prefect import Flow, task

PATH = ""  # the known result location, kept in a global


@task(target="{path}")
def get_cached_df(path: str) -> pd.DataFrame:
    raise Exception(f"File {path} is not cached")


with Flow("load_data") as f:
    # This is the task I'm trying to write; pass it the known location
    files = get_files_list_from_result(PATH)
    cached_df = get_cached_df.map(files)
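A minimal sketch of keeping the result location somewhere the flow can look it up, rather than hardcoding a `PATH` global. It uses a small JSON file as a stand-in store; `save_result_location`, `load_result_location`, and the registry file are hypothetical. On Prefect Cloud, `prefect.backend.set_key_value` and `get_key_value` could play the same role.

```python
import json
import os
from typing import Optional


def save_result_location(registry_path: str, flow_name: str, location: str) -> None:
    """Record where `flow_name` writes its results (hypothetical JSON registry)."""
    data = {}
    if os.path.exists(registry_path):
        with open(registry_path) as fh:
            data = json.load(fh)
    data[flow_name] = location
    with open(registry_path, "w") as fh:
        json.dump(data, fh, indent=2)


def load_result_location(registry_path: str, flow_name: str) -> Optional[str]:
    """Look up the result location recorded for `flow_name`, or None if unknown."""
    if not os.path.exists(registry_path):
        return None
    with open(registry_path) as fh:
        return json.load(fh).get(flow_name)
```

The flow that writes results would call `save_result_location` once, and the reading flow would pass `load_result_location(...)` to its listing task instead of `PATH`.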
This path is definitely not stored in context, and retrieving it from the API would be cumbersome. As an alternative to a global variable, you could push the result location to the KV Store and retrieve it from there. If you are on Server, which has no KV Store, you could store this information in a database or Redis, or even in a separate S3 location, and look it up in your flow.
davzucky
01/13/2022, 1:40 PM
Anna Geller
davzucky
01/13/2022, 2:01 PM
Anna Geller
aws s3 ls s3://mybucket/market_data/
# or even:
aws s3 ls s3://mybucket/market_data/20220103/
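When results are laid out by date, as in the `market_data/20220103/` example, the prefix for one day can be built in code and passed as `Prefix=` to an S3 listing, mirroring the second `aws s3 ls` command. A small sketch; the `<root>/<YYYYMMDD>/` layout and the helper names are assumptions.

```python
from datetime import date
from typing import Iterable, List


def daily_prefix(root: str, day: date) -> str:
    """Build a date-partitioned prefix such as 'market_data/20220103/'."""
    # Assumed layout: <root>/<YYYYMMDD>/<file>, as in the `aws s3 ls` example
    return f"{root.rstrip('/')}/{day:%Y%m%d}/"


def keys_for_day(keys: Iterable[str], root: str, day: date) -> List[str]:
    """Keep only the keys that live under the given day's partition."""
    prefix = daily_prefix(root, day)
    return [k for k in keys if k.startswith(prefix)]
```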
Anyway, feel free to open a GitHub issue; perhaps it's doable.
davzucky
01/13/2022, 2:19 PM
davzucky
01/13/2022, 2:22 PM