# prefect-getting-started
I have a basic question about how to handle submitting large quantities of tasks to ACI. I have a limited number of instances, say 100, and a very large number of tasks to assign to them (e.g. 10'000). Every task should get its own instance, so I deploy a worker flow first and then use run_deployment to run it on its own instance. I tested this on 100s of tasks and 10s of instances and it seems to work fine, but when I try to scale it up I run into issues: when submitting a large number of tasks, only some of them actually get submitted before the flow gives up and either hangs or starts with just that subset of tasks.

• When submitting 10'000 tasks, only ~1'000 (randomly more or less) actually show up in the Task Runs tab. The submit_tasks flow either hangs during task submission or starts running as normal, just with fewer tasks.
• I tried adding rate_limit and it doesn't solve the issue; even fewer tasks get submitted. With the rate limit on I got a lot of "too many open files" errors from asyncio, plus "Crash detected! Execution was cancelled by the runtime environment.".

There are also secondary issues related to the UI:

• The UI shows all the tasks as soon as they get submitted, even though I can only run e.g. 100 at a time. This generates a lot of clutter.
• When the deployment starts, the UI assigns it to the submit_tasks flow (the parent flow) and shows it parallel to run_deployment_task. run_deployment_task effectively has no "children" and no ties to the deployment, which clutters the UI even more. I guess this could be avoided by defining it as a flow, but then I can't use retries or .submit on it.

My goal is to be able to submit tasks to be executed on 1'000s of instances in parallel, with a large queue of up to 100'000 individual tasks. Another complication is that these 1'000s of instances have to run across multiple locations, but I'll leave that for later.

Here's a minimal example of my implementation so far. Overall I have a feeling that I am not doing it The Prefect Way.
from prefect import flow, task
from prefect.concurrency.sync import concurrency
from prefect.deployments import run_deployment
from prefect.states import raise_state_exception


@flow(log_prints=True)
def worker(a: int):
    print(f"Run a worker with a={a}")


# retries for handling infrastructure issues
@task(retries=10)
def run_deployment_task(deployment_name, parameters):
    with concurrency("aci-max-instances"):
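        # run_deployment blocks here until the deployed flow run finishes (default timeout=None)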
        flow_run = run_deployment(name=deployment_name, parameters=parameters)
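        # re-raise failures from the worker flow run so this task's retries kick in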
        raise_state_exception(flow_run.state)


# A flow that submits multiple tasks
@flow
def submit_tasks(n_tasks: int):
    # actual parameters go here
    parameters = [{"a": i} for i in range(n_tasks)]
    for task_parameters in parameters:
        run_deployment_task.submit(
            deployment_name="worker/test-deployment",
            parameters=task_parameters,
        )


if __name__ == "__main__":
    worker.deploy(
        name="test-deployment",
        work_pool_name="aci-pool",
        image="worker:test",
    )
    submit_tasks.deploy(
        name="test-submit-deployment",
        work_pool_name="aci-pool",
        image="submit-tasks:test",
    )
    run_deployment(
        "submit-tasks/test-submit-deployment",
        parameters={"n_tasks": 10_000},
    )
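For reference, the rate-limited variant I tried looked roughly like this (a sketch; "aci-submit-rate" is a placeholder for whatever rate limit is configured on the server):

from prefect import flow
from prefect.concurrency.sync import rate_limit


# same run_deployment_task as above
@flow
def submit_tasks(n_tasks: int):
    parameters = [{"a": i} for i in range(n_tasks)]
    for task_parameters in parameters:
        # block until the configured rate limit grants a slot, then submit
        rate_limit("aci-submit-rate")
        run_deployment_task.submit(
            deployment_name="worker/test-deployment",
            parameters=task_parameters,
        )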
I should add that we are using a self-hosted server.
Should I add some sort of batching of task submissions? Or rework the approach entirely?
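If batching is the way to go, I'm picturing something roughly like this (untested sketch; BATCH_SIZE is a placeholder matching the number of available instances):

from prefect import flow

BATCH_SIZE = 100  # placeholder: number of ACI instances available


# same run_deployment_task as above
@flow
def submit_tasks(n_tasks: int):
    parameters = [{"a": i} for i in range(n_tasks)]
    for start in range(0, len(parameters), BATCH_SIZE):
        # submit one batch and wait for it to finish before submitting the next,
        # so only BATCH_SIZE run_deployment_task runs are in flight at a time
        futures = [
            run_deployment_task.submit(
                deployment_name="worker/test-deployment",
                parameters=p,
            )
            for p in parameters[start:start + BATCH_SIZE]
        ]
        for future in futures:
            future.wait()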
Here's what it looks like with the rate limit on: the submission rate seems constant and some tasks start executing in the containers. Then the submission just stops and I see no new containers being created with az container list. I looked into the main container running the submit-tasks flow, and there I'm seeing the "too many open files" error followed by
19:20:44.546 | ERROR   | Task run 'run_deployment_task-272' - Crash detected! Execution was cancelled by the runtime environment.
19:20:41.902 | ERROR   | Task run 'run_deployment_task-366' - Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/site-packages/prefect/engine.py", line 2103, in orchestrate_task_run
  File "/usr/local/lib/python3.12/site-packages/prefect/_internal/concurrency/calls.py", line 327, in aresult
  File "/usr/local/lib/python3.12/site-packages/prefect/_internal/concurrency/calls.py", line 352, in _run_sync
  File "/opt/prefect/protpardelle/prefect_pipeline/parallel-test.py", line 15, in run_deployment_task
  File "/usr/local/lib/python3.12/contextlib.py", line 137, in __enter__
  File "/usr/local/lib/python3.12/site-packages/prefect/concurrency/sync.py", line 61, in concurrency
  File "/usr/local/lib/python3.12/site-packages/prefect/concurrency/sync.py", line 103, in _call_async_function_from_sync
  File "/usr/local/lib/python3.12/site-packages/prefect/_internal/concurrency/calls.py", line 421, in __call__
  File "/usr/local/lib/python3.12/site-packages/prefect/_internal/concurrency/calls.py", line 308, in run
  File "/usr/local/lib/python3.12/asyncio/runners.py", line 193, in run
  File "/usr/local/lib/python3.12/asyncio/runners.py", line 58, in __enter__
  File "/usr/local/lib/python3.12/asyncio/runners.py", line 137, in _lazy_init
  File "/usr/local/lib/python3.12/asyncio/events.py", line 823, in new_event_loop
  File "/usr/local/lib/python3.12/asyncio/events.py", line 720, in new_event_loop
  File "/usr/local/lib/python3.12/asyncio/unix_events.py", line 64, in __init__
  File "/usr/local/lib/python3.12/asyncio/selector_events.py", line 63, in __init__
  File "/usr/local/lib/python3.12/selectors.py", line 349, in __init__
OSError: [Errno 24] Too many open files
It's also weird that I don't see this error in the UI logs; the flow just silently stops processing tasks... I have to manually cancel it and clear all the active slots from the concurrency settings, otherwise no new flows will start.
I can see the errors in the UI logs after all; I just had to scroll down a few times for them to load.
Each submitted task that is waiting on run_deployment (at the concurrency bottleneck) holds a file handle. I think I'm starting to get what's going on: there is a rather tight limit on open file handles in ACI instances, and the main flow that distributes the tasks runs into this limit. There seems to be no way to increase the limit in ACI, so I'll have to rewrite the flow around this limitation.
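For anyone hitting the same wall, this is roughly how the limit can be checked from inside the container (standard library only; the setrlimit call is just an illustration, since on ACI the hard limit itself seems fixed):

import resource

# soft and hard limits on open file descriptors for this process
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
print(f"open files: soft={soft}, hard={hard}")

# the soft limit can be raised up to the hard limit, which only buys
# a little headroom if the hard limit itself cannot be changed
resource.setrlimit(resource.RLIMIT_NOFILE, (hard, hard))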
So I switched to tag-based task concurrency limits:
# retries for handling infrastructure issues
@task(retries=10, tags=["aci-max-instances"])
def run_deployment_task(deployment_name, parameters):
    flow_run = run_deployment(name=deployment_name, parameters=parameters)
    raise_state_exception(flow_run.state)
and it still fails at 10k tasks, but differently. This time I get almost 10k tasks submitted (in Pending status), but the entire flow still stops working with a crashed state after executing ~50 tasks. I inspected all the logs and couldn't find what goes wrong this time. The same flow with 100 tasks total executes just fine (second image).
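For completeness, the tag limit itself has to exist on the server beforehand; a minimal sketch of creating it from Python (the UI or CLI work too), assuming the same cap of 100 instances:

import asyncio

from prefect.client.orchestration import get_client


async def create_limit():
    async with get_client() as client:
        # allow at most 100 concurrent task runs carrying the "aci-max-instances" tag
        await client.create_concurrency_limit(
            tag="aci-max-instances",
            concurrency_limit=100,
        )


if __name__ == "__main__":
    asyncio.run(create_limit())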
I had some success scaling up to almost 1k instances, but then I started running into some weird HTTPS connection issues:
Crash detected! Execution was interrupted by an unexpected exception: PrefectHTTPStatusError: Server error '500 Internal Server Error' for url 'https://10.0.1.4:4200/api/task_runs/'
For more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/500
06:01:37 PM | prefect.flow_runs
Server logs only show some warnings of this kind:
prefect-server_1  | 16:00:25.718 | WARNING | prefect.server.services.flowrunnotifications - FlowRunNotifications took 5.473042 seconds to run, which is longer than its loop interval of 4 seconds.
prefect-server_1  | 16:00:34.742 | WARNING | prefect.server.services.marklateruns - MarkLateRuns took 5.147579 seconds to run, which is longer than its loop interval of 5.0 seconds.
prefect-server_1  | 16:00:39.770 | WARNING | prefect.server.services.flowrunnotifications - FlowRunNotifications took 5.938708 seconds to run, which is longer than its loop interval of 4 seconds.
prefect-server_1  | 16:00:44.711 | WARNING | prefect.server.services.failexpiredpauses - FailExpiredPauses took 5.452976 seconds to run, which is longer than its loop interval of 5.0 seconds.
prefect-server_1  | 16:00:55.901 | WARNING | prefect.server.services.marklateruns - MarkLateRuns took 6.098662 seconds to run, which is longer than its loop interval of 5.0 seconds.
prefect-server_1  | 16:00:55.980 | WARNING | prefect.server.services.flowrunnotifications - FlowRunNotifications took 4.162087 seconds to run, which is longer than its loop interval of 4 seconds.
prefect-server_1  | 16:00:57.025 | WARNING | prefect.server.services.failexpiredpauses - FailExpiredPauses took 7.29833 seconds to run, which is longer than its loop interval of 5.0 seconds.
prefect-server_1  | 16:00:57.213 | WARNING | prefect.server.services.recentdeploymentsscheduler - RecentDeploymentsScheduler took 6.463927 seconds to run, which is longer than its loop interval of 5 seconds.
I switched to hosting workers on a VM because ACI instances were running into hard open file limits. My main flow has 1,844 task runs (164 pending, 825 running, 27 failed, 1371 crashed, 7 scheduled) and 550 flow runs.
The flow runs come from run_deployment, which starts an ACI instance with the actual work to do. I'm running out of ideas; any help would be appreciated.
I guess the server is overloaded with so many connections. Is there any way to either scale the server or decrease the number of connections?