Daniel
05/29/2024, 11:41 PMMarvin
05/29/2024, 11:41 PMpython
import time
from prefect import flow, task
from pydantic import BaseModel, Field
class Data(BaseModel):
key: str
value: dict = Field(default_factory=dict)
@task
def fetch_data_for_key(key: str) -> Data:
"""A task that simulates fetching data from some source"""
return Data(key=key, value={"data": "some data"})
@task
def process_data(data: Data) -> Data:
"""A task that simulates processing data"""
print(f"Processing {data!r} (I swear)")
time.sleep(3)
return data
@flow
def work_i_need_to_do_sometimes(keys: list[str]):
"""Work you might need to do ever so often"""
raw_data_futures = fetch_data_for_key.map(keys)
process_data.map(raw_data_futures)
if __name__ == "__main__":
work_i_need_to_do_sometimes.serve(
name="sporradic-deployment",
parameters={"keys": ["foo", "bar", "baz"]},
)
This code utilizes fetch_data_for_key
and process_data
tasks that can be adapted for incremental data fetch based on specific keys. You can customize it to implement your incremental fetching and processing logic.
For more detailed information, please also refer to the Prefect Automations documentation.
Feel free to integrate this example into your specific use case and adapt it according to your requirements.