Issues when using DaskTaskRunner at scale - runnin...
# ask-community
j
Hey guys, I’m running a flow with a DaskTaskRunner that spawns tasks that take around 20 seconds each, and apparently they’re running sequentially?! My machine has 16 threads and I see in the UI each task only being run after the previous one finishes. Any inputs?
23:04:54.057 | INFO    | prefect.engine - Created flow run 'optimal-snail' for flow 'test1'
23:04:54.057 | INFO    | prefect.task_runner.dask - Creating a new Dask cluster with `distributed.deploy.local.LocalCluster`
23:04:55.980 | INFO    | prefect.task_runner.dask - The Dask dashboard is available at http://127.0.0.1:8787/status
23:05:00.371 | INFO    | Flow run 'optimal-snail' - Created task run 'Execute values of the query-8165e3c8-0' for task 'Execute values of the query'
23:05:00.372 | INFO    | Flow run 'optimal-snail' - Executing 'Execute values of the query-8165e3c8-0' immediately...
23:05:24.612 | INFO    | Task run 'Execute values of the query-8165e3c8-0' - Finished in state Completed()
23:05:26.370 | INFO    | Flow run 'optimal-snail' - Created task run 'Execute values of the query-8165e3c8-1' for task 'Execute values of the query'
23:05:26.370 | INFO    | Flow run 'optimal-snail' - Executing 'Execute values of the query-8165e3c8-1' immediately...
23:05:42.533 | INFO    | Task run 'Execute values of the query-8165e3c8-1' - Finished in state Completed()
23:05:44.295 | INFO    | Flow run 'optimal-snail' - Created task run 'Execute values of the query-8165e3c8-2' for task 'Execute values of the query'
23:05:44.296 | INFO    | Flow run 'optimal-snail' - Executing 'Execute values of the query-8165e3c8-2' immediately...
23:06:09.538 | INFO    | Task run 'Execute values of the query-8165e3c8-2' - Finished in state Completed()
As you can see in these logs, it’s not really spawning more tasks. I was under the impression that calling a function with the @task decorator would be non-blocking if made within a @flow which uses DaskTaskRunner
z
Hey Jean 👋 you’ll want to use .submit for those task calls to run them in the background
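Roughly something like this (a minimal sketch with placeholder task and flow names, not your actual code):
from prefect import flow, task
from prefect_dask.task_runners import DaskTaskRunner

@task
def do_work(item):
    ...  # stand-in for the ~20-second query task

@flow(task_runner=DaskTaskRunner())
def my_flow(items):
    futures = []
    for item in items:
        # do_work(item) would block until the task finishes;
        # do_work.submit(item) returns a future immediately so the runs
        # can execute in parallel on the Dask cluster
        futures.append(do_work.submit(item))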
j
Thanks @Zanie that’s working, got other issues now tho
Apparently the amount of data I’m sending to each task is too much, i.e., a list of 10k records (20 values each)
Getting a bunch of errors from SQLite: ValueError: no active connection and sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) database is locked
z
If you’re running with a lot of concurrency, we recommend switching to Postgres as the backing database.
j
@Zanie when I try to use Postgres I’m getting sqlalchemy.exc.InvalidRequestError: The asyncio extension requires an async driver to be used. The loaded 'psycopg2' is not async
I have both asyncpg and psycopg2 installed
z
There’s an issue with the rendering of settings in our documentation reference right now; it sounds like your connection string is wrong: https://github.com/PrefectHQ/prefect/blob/main/src/prefect/settings.py#L438-L456
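The setting you want is PREFECT_ORION_DATABASE_CONNECTION_URL, and it has to use the asyncpg dialect. Something along these lines (placeholder host, credentials, and database name):
import os

# Placeholder credentials, host, and database name -- substitute your own.
# The "+asyncpg" part tells SQLAlchemy to load the async driver instead of psycopg2.
os.environ["PREFECT_ORION_DATABASE_CONNECTION_URL"] = (
    "postgresql+asyncpg://postgres:yourpassword@localhost:5432/prefect"
)
The same value can also be set with prefect config set, and it needs to be in place before prefect orion start runs so the server picks it up.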
j
That did the trick thanks!
So Dask is complaining that I should use scatter, and workers are crashing
23:52:10.536 | INFO    | Flow run 'obedient-seal' - Created task run 'Execute values of the query-8165e3c8-0' for task 'Execute values of the query'
/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/distributed/worker.py:2936: UserWarning: Large object of size 2.75 MiB detected in task graph: 
  {'task': <prefect.tasks.Task object at 0x7fb7a4f1a ... ENABLED=True))}
Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and 
keep data on workers

    future = client.submit(func, big_data)    # bad

    big_future = client.scatter(big_data)     # good
    future = client.submit(func, big_future)  # good
  warnings.warn(
23:52:11.735 | INFO    | Flow run 'obedient-seal' - Submitted task run 'Execute values of the query-8165e3c8-0' for execution.
23:52:15.432 | INFO    | Flow run 'obedient-seal' - Created task run 'Execute values of the query-8165e3c8-1' for task 'Execute values of the query'
23:52:16.169 | INFO    | Flow run 'obedient-seal' - Submitted task run 'Execute values of the query-8165e3c8-1' for execution.
23:52:21.062 | INFO    | Flow run 'obedient-seal' - Created task run 'Execute values of the query-8165e3c8-2' for task 'Execute values of the query'
23:52:22.476 | INFO    | Flow run 'obedient-seal' - Submitted task run 'Execute values of the query-8165e3c8-2' for execution.
23:52:24.879 | INFO    | Flow run 'obedient-seal' - Created task run 'Execute values of the query-8165e3c8-4' for task 'Execute values of the query'
23:52:26.272 | INFO    | Flow run 'obedient-seal' - Submitted task run 'Execute values of the query-8165e3c8-4' for execution.
23:52:27.541 | ERROR   | prefect.orion - Encountered exception in request:
Traceback (most recent call last):
  File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/starlette/middleware/errors.py", line 162, in __call__
    await self.app(scope, receive, _send)
  File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/starlette/middleware/exceptions.py", line 75, in __call__
    raise exc
  File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/starlette/middleware/exceptions.py", line 64, in __call__
    await self.app(scope, receive, sender)
  File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/fastapi/middleware/asyncexitstack.py", line 21, in __call__
    raise e
  File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/fastapi/middleware/asyncexitstack.py", line 18, in __call__
    await self.app(scope, receive, send)
  File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/starlette/routing.py", line 680, in __call__
    await route.handle(scope, receive, send)
  File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/starlette/routing.py", line 275, in handle
    await self.app(scope, receive, send)
  File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/starlette/routing.py", line 65, in app
    response = await func(request)
  File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/prefect/orion/utilities/server.py", line 101, in handle_response_scoped_depends
    response = await default_handler(request)
  File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/fastapi/routing.py", line 231, in app
    raw_response = await run_endpoint_function(
  File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/fastapi/routing.py", line 160, in run_endpoint_function
    return await dependant.call(**values)
  File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/prefect/orion/api/task_runs.py", line 49, in create_task_run
    model = await models.task_runs.create_task_run(
  File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/prefect/orion/database/dependencies.py", line 117, in async_wrapper
    return await fn(*args, **kwargs)
  File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/prefect/orion/models/task_runs.py", line 61, in create_task_run
    await session.execute(insert_stmt)
  File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/sqlalchemy/ext/asyncio/session.py", line 215, in execute
    result = await greenlet_spawn(
  File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 126, in greenlet_spawn
    result = context.throw(*sys.exc_info())
  File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 1711, in execute
    conn = self._connection_for_bind(bind)
  File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 1552, in _connection_for_bind
    return self._transaction._connection_for_bind(
  File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 747, in _connection_for_bind
    conn = bind.connect()
  File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/sqlalchemy/future/engine.py", line 406, in connect
    return super(Engine, self).connect()
  File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 3315, in connect
    return self._connection_cls(self, close_with_result=close_with_result)
  File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 96, in __init__
    else engine.raw_connection()
  File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 3394, in raw_connection
    return self._wrap_pool_connect(self.pool.connect, _connection)
  File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 3361, in _wrap_pool_connect
    return fn()
  File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 320, in connect
    return _ConnectionFairy._checkout(self)
  File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 884, in _checkout
    fairy = _ConnectionRecord.checkout(pool)
  File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 486, in checkout
    rec = pool._do_get()
  File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/sqlalchemy/pool/impl.py", line 145, in _do_get
    with util.safe_reraise():
  File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
    compat.raise_(
  File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/sqlalchemy/util/compat.py", line 208, in raise_
    raise exception
  File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/sqlalchemy/pool/impl.py", line 143, in _do_get
    return self._create_connection()
  File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 266, in _create_connection
    return _ConnectionRecord(self)
  File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 381, in __init__
    self.__connect()
  File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 691, in __connect
    )._exec_w_sync_on_first_run(self.dbapi_connection, self)
  File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/sqlalchemy/event/attr.py", line 326, in _exec_w_sync_on_first_run
    self(*args, **kw)
  File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/sqlalchemy/event/attr.py", line 334, in __call__
    fn(*args, **kw)
  File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/create.py", line 658, in on_connect
    do_on_connect(dbapi_connection)
  File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 1109, in connect
    conn.await_(self.setup_asyncpg_jsonb_codec(conn))
  File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 68, in await_only
    return current.driver.switch(awaitable)
  File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 121, in greenlet_spawn
    value = await result
  File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 1086, in setup_asyncpg_jsonb_codec
    await asyncpg_connection.set_type_codec(
  File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/asyncpg/connection.py", line 1222, in set_type_codec
    typeinfo = await self._introspect_type(typename, schema)
  File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/asyncpg/connection.py", line 473, in _introspect_type
    rows = await self._execute(
  File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/asyncpg/connection.py", line 1659, in _execute
    result, _ = await self.__execute(
  File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/asyncpg/connection.py", line 1684, in __execute
    return await self._do_execute(
  File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/asyncpg/connection.py", line 1734, in _do_execute
    result = await executor(stmt, timeout)
  File "asyncpg/protocol/protocol.pyx", line 182, in bind_execute
  File "asyncpg/protocol/protocol.pyx", line 696, in asyncpg.protocol.protocol.BaseProtocol._get_timeout_impl
asyncio.exceptions.TimeoutError
2022-09-28 23:52:29,034 - distributed.client - ERROR - 
Traceback (most recent call last):
  File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/distributed/client.py", line 1579, in _close
    await self.scheduler_comm.close()
asyncio.exceptions.CancelledError
23:52:29.314 | ERROR   | Flow run 'obedient-seal' - Crash detected! Execution was interrupted by an unexpected exception.
z
I don’t have a great way to help with that warning yet, you may need to manually scatter large data types to the Dask cluster which would require something like https://github.com/PrefectHQ/prefect-dask/pull/33
The failure here though is due to the database running out of available connections
j
Which database? Postgres? The one Orion is using?
z
Yeah it looks like it. Basically, you’re beyond a simple flow and you need to run your server separately with prefect orion start
j
I’m running my server with prefect orion start
z
Did you set PREFECT_API_URL before running the flow? I can see a call directly to the database in your flow run’s traceback, which indicates it’s not making requests to your separate server.
j
It’s not even a web service, just a script with a flow that iterates over a 1M-item list and each iteration submits a ~20s task
Hmmmm let me check
Yeah I didn’t set PREFECT_API_URL in my script but I can see the flows/tasks running in localhost:4200
z
I’m tempted to make PREFECT_API_URL auto-detect a locally running API to improve this kind of UX. Otherwise, what’s happening here is the run and your standalone server are sharing a database, so it looks like it’s working, but really each task run is standing up a copy of the entire Orion API in memory and it connects to the database directly.
You’ll need to set PREFECT_API_URL to have the task runs send API requests to your centralized server, which then holds a single (roughly) connection to the database.
j
ok just did
prefect config set PREFECT_API_URL=http://127.0.0.1:4200/api
That should be it or should I do something else in the python script?
z
That should do it
j
It’s very confusing, since I can see the flows/tasks inside the UI but it’s not connected to it? Not sure I understand
z
Let’s see here… prefect orion start starts the Orion API and connects to the database. The UI is hosted as well and asks the API for data to display.
When you run a flow without an API URL set, it starts the Orion API in memory, which connects to the same database as your standalone server.
The UI asks the standalone server for data, and it reads the flow run from the database.
j
Ah I see okay
Well now it has run a bit but it’s still failing at one point
z
However, because your flow has tasks that are happening on Dask workers, we end up making a bunch of copies of the Orion API which each requires a connection to the database, etc.
What’s it fail with now? 😞
j
z
If you set PREFECT_DEBUG_MODE=1 we’ll get the traceback for the crash.
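For example, set it in the environment before the flow run starts (a small sketch; "1" or "True" both work):
import os

# Enable debug mode so crashes include the full traceback.
os.environ["PREFECT_DEBUG_MODE"] = "1"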
j
RuntimeError: The connection pool was closed while 2 HTTP requests/responses were still in-flight.
2022-09-29 00:07:06,316 - distributed.process - WARNING - [<AsyncProcess Dask Worker process (from Nanny)>] process 9283 exit status was already read, will report exitcode 255
sys:1: RuntimeWarning: coroutine 'get_task_call_return_value' was never awaited
    raise mapped_exc(message) from exc
httpx.ReadError
  File "/home/ubuntu/.cache/pypoetry/virtualenvs/data-fDAH7s3U-py3.10/lib/python3.10/site-packages/distributed/comm/tcp.py", line 225, in read
    frames_nbytes = await stream.read_bytes(fmt_size)
asyncio.exceptions.CancelledError
z
Ah okay now we’ve reached a real bug
It looks like Dask may have crashed. I’d need the full traceback to tell though, since once the flow crashes we shut everything down.
j
out.txt
I mean, maybe I’m sending too much data? If I do it manually it works fine, or if I reduce the batch size (currently at 7500; at 2500 it works fine I think)
z
I’ve got to head out for the night. Thanks for working through this with me though! We’re aware of these httpx.ReadErrors when running tasks at scale and we’re putting together a team to fix them.
If you can put together a minimal example that’d be really helpful?
How many task runs are you running? The data is being passed as a parameter to your tasks? Are you passing large amounts of data elsewhere?
j
@Zanie
#!/usr/bin/env python3
from random import choice
from typing import Any, Union

import psycopg2
from psycopg2.extras import execute_values
from prefect import flow, task
from prefect_dask.task_runners import DaskTaskRunner

redshift_host = ""
redshift_database = ""
redshift_user = ""
redshift_pass = ""

def _connect_to_redshift(host: str = redshift_host, db: str = redshift_database, user: str = redshift_user, pw: str = redshift_pass):
    conn = psycopg2.connect(host=host, database=db, user=user, password=pw, port=5439)
    return conn

@task(name="Execute values of the query", retries=5, retry_delay_seconds=10)
def _batch_insert_into_redshift(batch: dict[str, Union[str, list[Any]]]):
    conn = _connect_to_redshift()
    cursor = conn.cursor()
    execute_values(
        cursor,
        f"INSERT INTO {batch['table']} ({', '.join(batch['fields'])}) VALUES %s",
        batch["values"],
    )
    conn.commit()
    conn.close()

@flow(name="Sending batch of data to Redshift", task_runner=DaskTaskRunner())
def batch_insert_records():
    batch_size = 3_500
    total_b = 1_918_573
    b_fields = [f"col_{i}" for i in range(20)]  # field names must be strings for the join in the INSERT statement
    b_possible_values = [100, "12312312-sdsdsds-12312321-sssss", "2022-05-03T04:02:02"]

    b = 0
    while b < total_b:
        chunk = batch_size
        if b + batch_size > total_b:
            chunk = total_b - b

        b += chunk
        b_batch: dict[str, Union[str, list[Any]]] = {
            "table": "b",
            "fields": [field for field in b_fields],
            "values": [],
        }
        for _ in range(chunk):
            b_batch["values"].append([choice(b_possible_values) for _ in range(20)])

        _batch_insert_into_redshift.submit(b_batch)

if __name__ == "__main__":
    batch_insert_records()
How many task runs are you running? In the example above it would be 549 (1_918_573 / 3_500).
The data is being passed as a parameter to your tasks? Yes.
Are you passing large amounts of data elsewhere? I don’t think so, only to the task inside the flow.
a
@Jean, since your flow is doing a bunch of IO operations, have you tried using ConcurrentTaskRunner instead of DaskTaskRunner? You may end up getting better performance and fixing this issue at the same time.
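Something roughly like this (a sketch with placeholder names, not your exact script):
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner

@task
def insert_batch(batch):
    ...  # IO-bound work, e.g. the Redshift insert above

@flow(task_runner=ConcurrentTaskRunner())
def batch_insert_records(batches):
    for batch in batches:
        # .submit() still applies; tasks now run on a thread pool
        # in the flow-run process instead of on Dask workers
        insert_batch.submit(batch)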
z
Thanks for the additional details! Anna has a good point that we can run the tasks concurrently with threads. All this data will be passed in memory and we’ll also do a better job of sharing clients between tasks. You may see an improvement in performance there.
j
@Anna Geller not crashing anymore, but performance is not better at all. Tasks were being completed before crashing with Dask (around the 130-task mark); it’s been 1 hour and only like 10 tasks have finished
I’ll try running again… each task is supposed to finish in at most 30 seconds
And how concurrent is the ConcurrentTaskRunner actually?
Yeah it’s just getting stuck again somewhere @Zanie
z
“And how concurrent is the ConcurrentTaskRunner actually?”
It uses threading instead of multiprocessing.
This actually might be affected by https://github.com/PrefectHQ/prefect/pull/6948
(Which I believe we will be releasing today)
j
Ah okay let me know and I will test
As of now none of the approaches are working for me 😕 (Concurrent/Dask)
z
We have released that change now
Let me know if it works?
j
Sure