
Michał

12/29/2022, 7:20 PM
How to achieve starting 201 subflows for ETL? Something like mapped flows. Are there good practices for this? I don't want to pack extract and load into single tasks and map pages.
from typing import Any, Dict, List

from prefect import flow, task

@task
def get_pages() -> List[int]:
	# Get pages for subflows
	...

@task
def extract(page: int) -> List[Dict[str, Any]]:
	...

@task
def load(records: List[Dict[str, Any]]):
	...

@flow 
def subflow_etl(page: int): 
	"""
	Extracts data from single page, and inserts it into DB
	"""
	data = extract(page)
	load(data)
	
@flow 
def main_flow(): 
	pages = get_pages() # returns [1, 2, 3, 4, ..., 200, 201]
	# How do I now start 201 subflow_etl runs?
	# I have several agents which would like to consume subflow_etl
	# and process it on several machines.
	# Pseudo code
	for page in pages:
		subflow_etl(page)
	
	
if __name__ == "__main__": 
	main_flow()

Nate

12/29/2022, 7:43 PM
Hi @Michał One way you could go about this is:
• create 1 deployment for your worker (`subflow_etl`) and another for your orchestrator / parent (`main_flow`)
• create a task, e.g. `submit_subflow`, in `main_flow` that triggers runs of your worker deployment (using the `run_deployment` utility from `prefect.deployments`)
• call `submit_subflow.map(pages)` (or make a chunked iterable out of your `pages` and submit a chunk of pages to each subflow)

this setup would allow you to control the infrastructure / resources for your worker and orchestrator flows independently, and if failures occur in a given worker, you can prevent that from tanking your entire orchestrator run (if desired)

Michał

12/29/2022, 7:49 PM
Thanks! That's what I was looking for!