# ask-community
t
Hi all -- this is not necessarily a Prefect-specific question, but it might be. I am not sure... I am thinking about implementing something akin to a context in my flows, which gets updated with key data statistics as they are derived at intermediate stages of the workflow: a simple NamedTuple or pydantic model with a set of attributes. For example, a use case could look something like this:
from prefect import task, flow
import numpy as np

# Proposed (not yet implemented) context helpers
from flint.context import FieldImageProperties, get_field_image_properties

@task
def add_image_rms(image_data):
    field_image_properties: FieldImageProperties = get_field_image_properties()

    field_image_properties.image_rms = np.std(image_data)

@task
def add_frequency(image_header):
    field_image_properties: FieldImageProperties = get_field_image_properties()

    field_image_properties.image_frequency = image_header['FREQ']

@flow
def expensive_image_job():
    # super_expensive_thing() is a placeholder for the real processing step
    image_data, image_header = super_expensive_thing()
    add_image_rms.submit(image_data)
    add_frequency.submit(image_header)
What would be the normal way of implementing something like this, and how would this interact in the prefect flow environment?
m
well, do you mean something like this:
from typing import Any, Dict, List, NamedTuple

from prefect import flow, task
from prefect.blocks.system import String  # import path as in Prefect 2.x

class User(NamedTuple):
    name: str
    email: str

@task
def get_users() -> List[User]:
    # ...
    return [
        User(
            name="test_user",
            email=String.load("test-email").value,
        )
    ]

@flow(retries=3, retry_delay_seconds=30)
def send_newsletters(data: Dict[str, Any], region_code: str) -> None:
    # ...
    users: List[User] = get_users()
... this works for me 👍
j
Hey @Tim Galvin, I love this idea and it’s been on my wishlist for a long time. We’re making some concrete progress toward it now, hopefully we’ll have something to talk about soon! @mira is right that if you can express the object as the return value of a task for use within the same flow, you can achieve something similar today, but I’m also interested in cases where the data is collected as a side effect, possibly across many runs.
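For what it's worth, the return-value approach could be sketched roughly like this. It is only an illustration under some assumptions: plain functions and a ThreadPoolExecutor stand in for Prefect tasks and .submit, statistics.pstdev stands in for np.std, and the FieldImageProperties fields and helper names are made up to mirror the example in the question rather than taken from any existing API.

```python
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from statistics import pstdev
from typing import Dict, List, Optional


@dataclass(frozen=True)
class FieldImageProperties:
    # Hypothetical container; the fields mirror the question's example.
    image_rms: Optional[float] = None
    image_frequency: Optional[float] = None


def compute_image_rms(image_data: List[float]) -> Dict[str, float]:
    # Each step returns only the statistics it derived.
    return {"image_rms": pstdev(image_data)}


def compute_frequency(image_header: Dict[str, float]) -> Dict[str, float]:
    return {"image_frequency": image_header["FREQ"]}


def collect_properties(
    image_data: List[float], image_header: Dict[str, float]
) -> FieldImageProperties:
    # The two steps are independent, so they can run concurrently;
    # their results are merged only once both have finished.
    with ThreadPoolExecutor() as pool:
        rms_future = pool.submit(compute_image_rms, image_data)
        freq_future = pool.submit(compute_frequency, image_header)
        merged = {**rms_future.result(), **freq_future.result()}
    return FieldImageProperties(**merged)
```

Because each step hands back only its own piece, no single object has to be threaded through every task, so the steps stay independent. The trade-off is that the merge has to happen somewhere in the flow itself.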
t
Hi Mira - something like that, but a little bit more primitive. A lot of the values I would be collecting are derived from existing tasks that have already been executed. I didn't express this part too well, but if I have to pass a single object through all tasks to capture these values, some (or a lot) of the concurrency that Prefect offers would be lost. I was curious whether something like a context manager could help keep me sane here.
Hi @Jeremiah - I am glad I am not too crazy 😄 Awesome to hear that it is on your mind as well. If I can help at all please let me know!
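To make the context-manager idea a bit more concrete, here is a minimal sketch in plain Python (no Prefect involved). Everything in it is hypothetical: field_image_context, the record/snapshot methods, and this version of get_field_image_properties are illustrative names, not an existing API. It also assumes all tasks run as threads in a single process; with a process-based or distributed task runner, each worker would see its own copy of the module-level state.

```python
import threading
from contextlib import contextmanager
from typing import Any, Dict, Iterator, List


class FieldImageProperties:
    """Hypothetical thread-safe container for derived statistics."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._values: Dict[str, Any] = {}

    def record(self, name: str, value: Any) -> None:
        # The lock keeps concurrent writers from interleaving updates.
        with self._lock:
            self._values[name] = value

    def snapshot(self) -> Dict[str, Any]:
        # Return a copy so callers cannot mutate the shared state.
        with self._lock:
            return dict(self._values)


# Stack of active contexts, shared by all threads in this process.
_active_contexts: List[FieldImageProperties] = []


@contextmanager
def field_image_context() -> Iterator[FieldImageProperties]:
    properties = FieldImageProperties()
    _active_contexts.append(properties)
    try:
        yield properties
    finally:
        _active_contexts.pop()


def get_field_image_properties() -> FieldImageProperties:
    if not _active_contexts:
        raise RuntimeError("no active field image context")
    return _active_contexts[-1]
```

The flow body would then sit inside a `with field_image_context() as props:` block, and any step could call get_field_image_properties().record(...) without the object being passed around explicitly.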
m
Yeah, that would be a cool feature 👍