Hi! I’m trying to map over 1k urls (that will even...
# ask-community
d
Hi! I’m trying to map over 1k urls (that will eventually be 1 million) but a simple task takes too much time.
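import time

from prefect import Flow, task


@task
def transform_url(url):
    # Strip the query string (everything from the first '?' onward).
    try:
        if '?' in url:
            return url[:url.find('?')]
    except Exception:
        pass
    return url


urls = []  # placeholder: replace with the list of 1k urls

with Flow("flow") as flow:
    transformed_urls = transform_url.map(urls)

# Time a single local flow run.
start = time.time()
flow.run()
print(time.time() - start)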
Execution time: 31.85s
Is there a way to reduce the overhead of a task?
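For scale, here is a rough back-of-the-envelope sketch of what that timing implies per mapped task and for the eventual 1 million URLs. It assumes the cost grows linearly with the number of URLs, which is only an assumption and was not measured in this thread.

```python
# Numbers taken from the run reported above.
total_seconds = 31.85
n_urls = 1_000

# Average wall-clock cost of one mapped task run.
per_task_seconds = total_seconds / n_urls

# Naive linear projection to 1 million URLs (assumption: cost scales linearly).
projected_seconds = per_task_seconds * 1_000_000
projected_hours = projected_seconds / 3600

print(f"{per_task_seconds * 1000:.2f} ms per mapped task")
print(f"{projected_hours:.1f} hours for 1 million URLs at the same rate")
```

At roughly 32 ms per task, almost all of the time is per-task bookkeeping rather than the string slicing itself.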
a
@Daniel Siles Where is the overhead in your flow coming from - from having too many child tasks? There is some minimal overhead in a task as compared to a normal function execution because the task needs to communicate its states to the API, but it should be rather small. I see you use mapping, but you don’t have any Dask executor attached - perhaps adding LocalDaskExecutor can help you parallelize work to make it run faster?
from prefect.executors import LocalDaskExecutor
with Flow("flow", executor=LocalDaskExecutor()) as flow:
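One way to check where the overhead comes from is to time the same logic as a plain function, with no task decorator and no flow, and compare it with the flow run. This is a minimal sketch: `strip_query` is a hypothetical plain-function stand-in for `transform_url`, and the URL list is made-up sample data.

```python
import time


def strip_query(url):
    """Plain-function version of transform_url: drop everything from '?' on."""
    return url.split("?", 1)[0]


# Hypothetical sample data standing in for the real list of 1k URLs.
urls = [f"https://example.com/page/{i}?ref={i}" for i in range(1000)]

start = time.perf_counter()
cleaned = [strip_query(u) for u in urls]
elapsed = time.perf_counter() - start

print(f"plain loop over {len(urls)} URLs: {elapsed:.6f}s")
```

If the plain loop takes a tiny fraction of the flow's runtime, the difference is the cost of running each URL as its own task.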
Not sure if this is related, but I’ve seen one person from the community split a flow of 8k child tasks into 8 flow runs (each running 1000 child tasks in parallel) based on various parameter values (for you it would be URLs):
from prefect import Flow, task, unmapped
from prefect.tasks.prefect import create_flow_run
from prefect.executors import LocalDaskExecutor


@task
def generate_thousand_numbers(start, stop, step):
    nrs = range(start, stop, step)
    return list(nrs)


with Flow("mapped_flows", executor=LocalDaskExecutor()) as flow:
    flow_run_1 = generate_thousand_numbers(1, 1000, 1)
    flow_run_2 = generate_thousand_numbers(1000, 2000, 1)
    flow_run_3 = generate_thousand_numbers(2000, 3000, 1)
    flow_run_4 = generate_thousand_numbers(3000, 4000, 1)
    flow_run_5 = generate_thousand_numbers(4000, 5000, 1)
    # ... until 8
    parameters = [
        dict(list_of_numbers=flow_run_1),
        dict(list_of_numbers=flow_run_2),
        dict(list_of_numbers=flow_run_3),
        dict(list_of_numbers=flow_run_4),
        dict(list_of_numbers=flow_run_5),
        # ... until 8
    ]
    mapped_flows = create_flow_run.map(
        parameters=parameters,
        flow_name=unmapped("dummy-child-flow"),
        project_name=unmapped("community"),
    )
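The same batching idea can be applied to the URL case: instead of one task per URL, each task (or child flow run) handles a chunk of URLs, so the per-task overhead is paid once per chunk. Below is a minimal plain-Python sketch of the chunking step. The helper names `chunked`, `strip_query`, and `transform_batch` are hypothetical, the URLs are made-up sample data, and the batch size of 1000 simply mirrors the example above.

```python
def strip_query(url):
    """Drop everything from the first '?' onward."""
    return url.split("?", 1)[0]


def chunked(items, size):
    """Split a list into consecutive chunks of at most `size` items."""
    return [items[i:i + size] for i in range(0, len(items), size)]


def transform_batch(batch):
    """Process a whole batch in one call, i.e. one task run per batch."""
    return [strip_query(u) for u in batch]


# Hypothetical sample data: 8000 URLs split into 8 batches of 1000.
urls = [f"https://example.com/page/{i}?ref={i}" for i in range(8000)]
batches = chunked(urls, 1000)

# In a flow, transform_batch would presumably be a task mapped over `batches`,
# or each batch would be passed as a parameter to a child flow run.
results = [transform_batch(b) for b in batches]
print(len(batches), "batches,", sum(len(r) for r in results), "URLs processed")
```

With this layout, 8000 URLs cost 8 task runs instead of 8000; the right batch size depends on how much work each URL really needs.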