Daniel Siles
11/01/2021, 9:15 AM
@task()
def transform_url(url):
    """Return ``url`` with its query string removed.

    Everything from the first ``?`` onward is dropped. A URL that contains
    no ``?`` is returned unchanged.

    Args:
        url: The URL to trim. Values that are not ``str`` are passed through
            untouched, which keeps the best-effort behaviour of the earlier
            version (it swallowed the resulting errors and returned the
            input) without hiding unrelated exceptions.

    Returns:
        The part of ``url`` before the first ``?``, or ``url`` itself when
        there is nothing to strip.
    """
    # Explicit guard instead of a blanket ``except Exception: pass``.
    if not isinstance(url, str):
        return url
    # partition() splits on the first "?" only; when no "?" is present the
    # head is the whole string, so no separate membership test is needed.
    head, _sep, _query = url.partition("?")
    return head
import time

# Placeholder from the original message: substitute the real list of 1k URLs.
# An empty list keeps the snippet syntactically valid.
urls = []  # list with 1k urls

# Build a flow that maps transform_url over the URL list.
with Flow("flow") as flow:
    transformed_urls = transform_url.map(urls)

# Time the whole local run. perf_counter() is a monotonic, high-resolution
# clock intended for measuring intervals, so the result is not affected by
# system clock adjustments the way time.time() can be.
start = time.perf_counter()
flow.run()
print(time.perf_counter() - start)
Execution time: 31.85s
Is there a way to reduce the overhead of a task?
Anna Geller
# Same flow header as in the question, but with an executor passed explicitly
# (a LocalDaskExecutor instance); the flow body is not repeated in this message.
with Flow("flow", executor=LocalDaskExecutor()) as flow:
Not sure if this is related, but I’ve seen one person from the community split a flow of 8k child tasks into 8 flow runs (each running 1000 child tasks in parallel) based on various parameter values (for you it would be URLs):
from prefect import Flow, task, unmapped
from prefect.tasks.prefect import create_flow_run
from prefect.executors import LocalDaskExecutor
@task
def generate_thousand_numbers(start, stop, step):
    """Return the integers of ``range(start, stop, step)`` as a list.

    Args:
        start: First value of the range (inclusive).
        stop: End of the range (exclusive).
        step: Increment between consecutive values.

    Returns:
        A list holding every value produced by the range.
    """
    return list(range(start, stop, step))
with Flow("mapped_flows", executor=LocalDaskExecutor()) as flow:
    # One number batch per child flow run.
    # NOTE(review): range(1, 1000, 1) yields 999 values (1..999), whereas the
    # following batches yield 1000 each - confirm this is intended.
    number_batches = [
        generate_thousand_numbers(1, 1000, 1),
        generate_thousand_numbers(1000, 2000, 1),
        generate_thousand_numbers(2000, 3000, 1),
        generate_thousand_numbers(3000, 4000, 1),
        generate_thousand_numbers(4000, 5000, 1),
        # ... until 8
    ]

    # Each child flow run receives its batch under the "list_of_numbers" key.
    parameters = [{"list_of_numbers": batch} for batch in number_batches]

    # Map create_flow_run over the parameter dicts; the flow and project
    # names are wrapped in unmapped() so the same value is used for each run.
    mapped_flows = create_flow_run.map(
        parameters=parameters,
        flow_name=unmapped("dummy-child-flow"),
        project_name=unmapped("community"),
    )