Michał
12/29/2022, 7:20 PM

from typing import Any, Dict, List

from prefect import flow, task

@task
def get_pages() -> List[int]:
    # Get pages for subflows
    ...

@task
def extract(page: int) -> List[Dict[str, Any]]:
    ...

@task
def load(records: List[Dict[str, Any]]) -> None:
    ...

@flow
def subflow_etl(page: int):
    """
    Extracts data from a single page and inserts it into the DB.
    """
    data = extract(page)
    load(data)

@flow
def main_flow():
    pages = get_pages()  # returns [1, 2, 3, 4, ..., 200, 201]
    # How do I now start 201 subflow_etl runs?
    # I have several agents that should consume subflow_etl
    # and process it on several machines.
    # Pseudocode:
    for page in pages:
        subflow_etl(page)

if __name__ == "__main__":
    main_flow()
Nate
12/29/2022, 7:43 PM

• create a deployment for your worker flow (subflow_etl) and another for your orchestrator / parent (main_flow)
• create a task e.g. submit_subflow in main_flow that triggers runs of your worker deployment (using the run_deployment utility from prefect.deployments)
• submit_subflow.map(pages) (or make a chunked iterable out of your pages and submit a chunk of pages to each subflow, as in the sketch below)

this setup would allow you to control the infrastructure / resources for your worker and orchestrator flows independently, and if failures occur in a given worker, you can prevent that from tanking your entire orchestrator run (if desired)
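A minimal sketch of the setup Nate describes, assuming Prefect 2.x; the deployment name "subflow-etl/worker" and the timeout value are assumptions for illustration, not something from the thread:

from prefect import flow, task
from prefect.deployments import run_deployment

@task
def submit_subflow(page: int):
    # Trigger one run of the worker deployment for this page.
    # "subflow-etl/worker" is a hypothetical deployment name -- use
    # whatever name you gave your subflow_etl deployment.
    return run_deployment(
        name="subflow-etl/worker",
        parameters={"page": page},
        timeout=0,  # return immediately rather than waiting for the run to finish
    )

@flow
def main_flow():
    pages = get_pages()  # task defined in the question above
    # Fan out one deployment run per page; any available agents
    # can pick these runs up and process them on separate machines.
    submit_subflow.map(pages)

For the chunked variant Nate mentions, subflow_etl would accept a list of pages instead of a single int, and submit_subflow would be mapped over the chunks (e.g. pages split into lists of 10) rather than over individual pages.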