Jean
09/28/2022, 11:10 PM
23:04:54.057 | INFO | prefect.engine - Created flow run 'optimal-snail' for flow 'test1'
23:04:54.057 | INFO | prefect.task_runner.dask - Creating a new Dask cluster with `distributed.deploy.local.LocalCluster`
23:04:55.980 | INFO | prefect.task_runner.dask - The Dask dashboard is available at <http://127.0.0.1:8787/status>
23:05:00.371 | INFO | Flow run 'optimal-snail' - Created task run 'Execute values of the query-8165e3c8-0' for task 'Execute values of the query'
23:05:00.372 | INFO | Flow run 'optimal-snail' - Executing 'Execute values of the query-8165e3c8-0' immediately...
23:05:24.612 | INFO | Task run 'Execute values of the query-8165e3c8-0' - Finished in state Completed()
23:05:26.370 | INFO | Flow run 'optimal-snail' - Created task run 'Execute values of the query-8165e3c8-1' for task 'Execute values of the query'
23:05:26.370 | INFO | Flow run 'optimal-snail' - Executing 'Execute values of the query-8165e3c8-1' immediately...
23:05:42.533 | INFO | Task run 'Execute values of the query-8165e3c8-1' - Finished in state Completed()
23:05:44.295 | INFO | Flow run 'optimal-snail' - Created task run 'Execute values of the query-8165e3c8-2' for task 'Execute values of the query'
23:05:44.296 | INFO | Flow run 'optimal-snail' - Executing 'Execute values of the query-8165e3c8-2' immediately...
23:06:09.538 | INFO | Task run 'Execute values of the query-8165e3c8-2' - Finished in state Completed()
As you can see in these logs, it’s not really spawning more tasks. I was under the impression that calling a function decorated with `@task` would be non-blocking if made within a `@flow` that uses `DaskTaskRunner`.
Zanie
Use `.submit` for those task calls to run them in the background.
Jean
09/28/2022, 11:18 PM
Jean
09/28/2022, 11:19 PM
Jean
09/28/2022, 11:20 PM
Zanie
Jean
09/28/2022, 11:41 PM
Jean
09/28/2022, 11:43 PM
Zanie
Zanie
Jean
09/28/2022, 11:48 PM
Jean
09/28/2022, 11:53 PM
Jean
09/28/2022, 11:54 PM
23:52:10.536 | INFO | Flow run 'obedient-seal' - Created task run 'Execute values of the query-8165e3c8-0' for task 'Execute values of the query'
/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/distributed/worker.py:2936: UserWarning: Large object of size 2.75 MiB detected in task graph:
{'task': <prefect.tasks.Task object at 0x7fb7a4f1a ... ENABLED=True))}
Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and
keep data on workers
future = client.submit(func, big_data) # bad
big_future = client.scatter(big_data) # good
future = client.submit(func, big_future) # good
warnings.warn(
23:52:11.735 | INFO | Flow run 'obedient-seal' - Submitted task run 'Execute values of the query-8165e3c8-0' for execution.
23:52:15.432 | INFO | Flow run 'obedient-seal' - Created task run 'Execute values of the query-8165e3c8-1' for task 'Execute values of the query'
23:52:16.169 | INFO | Flow run 'obedient-seal' - Submitted task run 'Execute values of the query-8165e3c8-1' for execution.
23:52:21.062 | INFO | Flow run 'obedient-seal' - Created task run 'Execute values of the query-8165e3c8-2' for task 'Execute values of the query'
23:52:22.476 | INFO | Flow run 'obedient-seal' - Submitted task run 'Execute values of the query-8165e3c8-2' for execution.
23:52:24.879 | INFO | Flow run 'obedient-seal' - Created task run 'Execute values of the query-8165e3c8-4' for task 'Execute values of the query'
23:52:26.272 | INFO | Flow run 'obedient-seal' - Submitted task run 'Execute values of the query-8165e3c8-4' for execution.
23:52:27.541 | ERROR | prefect.orion - Encountered exception in request:
Traceback (most recent call last):
File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/starlette/middleware/errors.py", line 162, in __call__
await self.app(scope, receive, _send)
File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/starlette/middleware/exceptions.py", line 75, in __call__
raise exc
File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/starlette/middleware/exceptions.py", line 64, in __call__
await self.app(scope, receive, sender)
File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/fastapi/middleware/asyncexitstack.py", line 21, in __call__
raise e
File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/fastapi/middleware/asyncexitstack.py", line 18, in __call__
await self.app(scope, receive, send)
File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/starlette/routing.py", line 680, in __call__
await route.handle(scope, receive, send)
File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/starlette/routing.py", line 275, in handle
await self.app(scope, receive, send)
File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/starlette/routing.py", line 65, in app
response = await func(request)
File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/prefect/orion/utilities/server.py", line 101, in handle_response_scoped_depends
response = await default_handler(request)
File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/fastapi/routing.py", line 231, in app
raw_response = await run_endpoint_function(
File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/fastapi/routing.py", line 160, in run_endpoint_function
return await dependant.call(**values)
File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/prefect/orion/api/task_runs.py", line 49, in create_task_run
model = await models.task_runs.create_task_run(
File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/prefect/orion/database/dependencies.py", line 117, in async_wrapper
return await fn(*args, **kwargs)
File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/prefect/orion/models/task_runs.py", line 61, in create_task_run
await session.execute(insert_stmt)
File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/sqlalchemy/ext/asyncio/session.py", line 215, in execute
result = await greenlet_spawn(
File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 126, in greenlet_spawn
result = context.throw(*sys.exc_info())
File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 1711, in execute
conn = self._connection_for_bind(bind)
File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 1552, in _connection_for_bind
return self._transaction._connection_for_bind(
File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 747, in _connection_for_bind
conn = bind.connect()
File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/sqlalchemy/future/engine.py", line 406, in connect
return super(Engine, self).connect()
File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 3315, in connect
return self._connection_cls(self, close_with_result=close_with_result)
File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 96, in __init__
else engine.raw_connection()
File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 3394, in raw_connection
return self._wrap_pool_connect(self.pool.connect, _connection)
File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 3361, in _wrap_pool_connect
return fn()
File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 320, in connect
return _ConnectionFairy._checkout(self)
File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 884, in _checkout
fairy = _ConnectionRecord.checkout(pool)
File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 486, in checkout
rec = pool._do_get()
File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/sqlalchemy/pool/impl.py", line 145, in _do_get
with util.safe_reraise():
File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
compat.raise_(
File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/sqlalchemy/util/compat.py", line 208, in raise_
raise exception
File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/sqlalchemy/pool/impl.py", line 143, in _do_get
return self._create_connection()
File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 266, in _create_connection
return _ConnectionRecord(self)
File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 381, in __init__
self.__connect()
File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 691, in __connect
)._exec_w_sync_on_first_run(self.dbapi_connection, self)
File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/sqlalchemy/event/attr.py", line 326, in _exec_w_sync_on_first_run
self(*args, **kw)
File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/sqlalchemy/event/attr.py", line 334, in __call__
fn(*args, **kw)
File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/sqlalchemy/engine/create.py", line 658, in on_connect
do_on_connect(dbapi_connection)
File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 1109, in connect
conn.await_(self.setup_asyncpg_jsonb_codec(conn))
File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 68, in await_only
return current.driver.switch(awaitable)
File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 121, in greenlet_spawn
value = await result
File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 1086, in setup_asyncpg_jsonb_codec
await asyncpg_connection.set_type_codec(
File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/asyncpg/connection.py", line 1222, in set_type_codec
typeinfo = await self._introspect_type(typename, schema)
File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/asyncpg/connection.py", line 473, in _introspect_type
rows = await self._execute(
File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/asyncpg/connection.py", line 1659, in _execute
result, _ = await self.__execute(
File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/asyncpg/connection.py", line 1684, in __execute
return await self._do_execute(
File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/asyncpg/connection.py", line 1734, in _do_execute
result = await executor(stmt, timeout)
File "asyncpg/protocol/protocol.pyx", line 182, in bind_execute
File "asyncpg/protocol/protocol.pyx", line 696, in asyncpg.protocol.protocol.BaseProtocol._get_timeout_impl
asyncio.exceptions.TimeoutError
2022-09-28 23:52:29,034 - distributed.client - ERROR -
Traceback (most recent call last):
File "/home/ubuntu/.cache/pypoetry/virtualenvs/fDAH7s3U-py3.10/lib/python3.10/site-packages/distributed/client.py", line 1579, in _close
await self.scheduler_comm.close()
asyncio.exceptions.CancelledError
23:52:29.314 | ERROR | Flow run 'obedient-seal' - Crash detected! Execution was interrupted by an unexpected exception.
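The two logs differ in one line per task: the first run logs "Executing ... immediately..." and waits for each task to finish before creating the next one, while the second logs "Submitted task run ... for execution" and moves straight on. A minimal sketch of that difference, using the standard library's `concurrent.futures` as a stand-in for the task runner. `ToyTask` and `slow_insert` are hypothetical names and this is not Prefect's API; `time.sleep` stands in for the Redshift insert.

```python
import time
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable


class ToyTask:
    """Hypothetical stand-in for a task object; this is NOT Prefect's API."""

    def __init__(self, fn: Callable[..., Any], executor: ThreadPoolExecutor) -> None:
        self.fn = fn
        self.executor = executor

    def __call__(self, *args: Any) -> Any:
        # Direct call: runs in the caller and blocks until the work is done.
        return self.fn(*args)

    def submit(self, *args: Any) -> Future:
        # Submit: hands the work to the executor and returns a future at once.
        return self.executor.submit(self.fn, *args)


def slow_insert(batch_id: int) -> int:
    time.sleep(0.2)  # pretend database round-trip
    return batch_id


with ThreadPoolExecutor(max_workers=3) as pool:
    insert = ToyTask(slow_insert, pool)

    start = time.perf_counter()
    sequential_results = [insert(i) for i in range(3)]  # one after another
    sequential_seconds = time.perf_counter() - start

    start = time.perf_counter()
    futures = [insert.submit(i) for i in range(3)]  # all three start right away
    submitted_results = [f.result() for f in futures]  # wait only when collecting
    submitted_seconds = time.perf_counter() - start

print(f"direct calls: {sequential_seconds:.2f}s, submit: {submitted_seconds:.2f}s")
```

In the flow itself, the corresponding change is calling `_batch_insert_into_redshift.submit(b_batch)` instead of `_batch_insert_into_redshift(b_batch)`, as the repro script later in the thread does.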
Zanie
Zanie
Jean
09/28/2022, 11:59 PM
Zanie
`prefect orion start`
Jean
09/29/2022, 12:00 AM
Zanie
Did you set `PREFECT_API_URL` before running the flow? I can see a call directly to the database in your flow run’s traceback, which indicates it’s not making requests to your separate server.
Jean
09/29/2022, 12:01 AM
Jean
09/29/2022, 12:01 AM
Jean
09/29/2022, 12:02 AM
Zanie
`PREFECT_API_URL` auto-detect a locally running API to improve this kind of UX. Otherwise, what’s happening here is that the flow run and your standalone server are sharing a database, so it looks like it’s working, but really each task run is standing up a copy of the entire Orion API in memory and connecting to the database directly.
Zanie
Set `PREFECT_API_URL` to have the task runs send API requests to your centralized server, which then holds (roughly) a single connection to the database.
Jean
09/29/2022, 12:04 AM
`prefect config set PREFECT_API_URL=http://127.0.0.1:4200/api`
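Zanie’s explanation comes down to one decision each run makes at startup: if `PREFECT_API_URL` is set, send API calls over HTTP to that server; if not, stand up an in-memory copy of the Orion API that connects to the database itself. A toy model of that decision, purely illustrative: `describe_api_target` is a hypothetical helper, not part of Prefect, and it only inspects an environment mapping, whereas Prefect also reads the active profile (which is what `prefect config set` writes to).

```python
import os
from collections.abc import Mapping


def describe_api_target(env: Mapping[str, str]) -> str:
    """Hypothetical helper: summarize where a run would send its API calls.

    Simplification: only environment variables are checked here; Prefect
    also reads settings from the active profile.
    """
    api_url = env.get("PREFECT_API_URL", "").strip()
    if api_url:
        # Every task run talks HTTP to one shared server,
        # and that server owns the database connection(s).
        return f"remote API at {api_url}"
    # No URL: each run builds an in-memory API that hits the database directly.
    return "ephemeral in-process API (direct database connections)"


print(describe_api_target(os.environ))
```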
Jean
09/29/2022, 12:04 AM
Zanie
Jean
09/29/2022, 12:05 AM
Zanie
`prefect orion start` starts the Orion API and connects to the database. The UI is hosted as well and asks the API for data to display.
Zanie
Zanie
Jean
09/29/2022, 12:07 AM
Jean
09/29/2022, 12:07 AM
Zanie
Zanie
Jean
09/29/2022, 12:08 AM
Zanie
With `PREFECT_DEBUG_MODE=1`, we’ll get the traceback for the crash.
Jean
09/29/2022, 12:08 AM
Jean
09/29/2022, 12:09 AM
Jean
09/29/2022, 12:09 AM
Zanie
Zanie
Jean
09/29/2022, 12:10 AM
Jean
09/29/2022, 12:11 AM
Zanie
`httpx.ReadError`s when running tasks at scale, and we’re putting together a team to fix them.
Zanie
Zanie
Jean
09/29/2022, 1:19 AM
```python
#!/usr/bin/env python3
from random import choice
from typing import Any, Union

import psycopg2
from psycopg2.extras import execute_values
from prefect import flow, task
from prefect_dask.task_runners import DaskTaskRunner

# Redshift connection settings (left blank in this repro).
redshift_host = ""
redshift_database = ""
redshift_user = ""
redshift_pass = ""


def _connect_to_redshift(
    host: str = redshift_host,
    db: str = redshift_database,
    user: str = redshift_user,
    pw: str = redshift_pass,
):
    conn = psycopg2.connect(host=host, database=db, user=user, password=pw, port=5439)
    return conn


@task(name="Execute values of the query", retries=5, retry_delay_seconds=10)
def _batch_insert_into_redshift(batch: dict[str, Union[str, list[Any]]]):
    # Each task run opens its own connection, inserts one batch, and closes it.
    conn = _connect_to_redshift()
    try:
        cursor = conn.cursor()
        execute_values(
            cursor,
            # Fields are ints in this repro; str.join needs strings.
            f"INSERT INTO {batch['table']} ({', '.join(map(str, batch['fields']))}) VALUES %s",
            batch["values"],
        )
        conn.commit()
    finally:
        conn.close()


@flow(name="Sending batch of data to Redshift", task_runner=DaskTaskRunner())
def batch_insert_records():
    batch_size = 3_500
    total_b = 1_918_573
    b_fields = [i for i in range(20)]
    b_possible_values = [100, "12312312-sdsdsds-12312321-sssss", "2022-05-03T04:02:02"]
    b = 0
    while b < total_b:
        # Full batch unless fewer rows remain.
        chunk = batch_size
        if b + batch_size > total_b:
            chunk = total_b - b
        b += chunk
        b_batch: dict[str, Union[str, list[Any]]] = {
            "table": "b",
            "fields": list(b_fields),
            "values": [],
        }
        for _ in range(chunk):
            b_batch["values"].append([choice(b_possible_values) for _ in range(20)])
        # .submit hands the task run to the DaskTaskRunner instead of running it inline.
        _batch_insert_into_redshift.submit(b_batch)


if __name__ == "__main__":
    batch_insert_records()
```
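A quick check of how many task runs the loop above submits, reusing its chunking rule in a standalone helper (`batch_sizes` is a hypothetical name, not part of the original script). With 1,918,573 rows in batches of 3,500, the flow submits 549 task runs: 548 full batches plus a final batch of 573 rows. Each of those runs opens its own Redshift connection and, per Zanie’s explanation above, without `PREFECT_API_URL` set it also stands up its own in-memory API talking to the Orion database.

```python
def batch_sizes(total: int, batch_size: int) -> list[int]:
    """Hypothetical helper mirroring the while-loop in batch_insert_records."""
    sizes = []
    done = 0
    while done < total:
        # Same rule as the flow: a full batch unless fewer rows remain.
        chunk = min(batch_size, total - done)
        sizes.append(chunk)
        done += chunk
    return sizes


sizes = batch_sizes(1_918_573, 3_500)
print(len(sizes), sizes[-1])  # 549 573
```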
Jean
09/29/2022, 1:21 AM
Jean
09/29/2022, 1:33 AM
Anna Geller
Could you try using `ConcurrentTaskRunner` instead of `DaskTaskRunner`? You may end up getting better performance and fixing this issue at the same time.
Zanie
Jean
09/29/2022, 2:50 PM
Jean
09/29/2022, 2:52 PM
Jean
09/29/2022, 2:56 PM
And how concurrent the `ConcurrentTaskRunner` actually is?
Jean
09/29/2022, 2:58 PM
Jean
09/29/2022, 2:58 PM
Zanie
> And how concurrent the `ConcurrentTaskRunner` actually is?
It uses threading instead of multiprocessing.
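Because threads share the flow’s memory, a thread-based runner can hand each task the batch object itself, while a process-based runner has to serialize it first. `DaskTaskRunner` starts a `LocalCluster`, which uses worker processes by default, and the batch also travels through the Dask scheduler inside the task graph, which is what the "Large object of size 2.75 MiB detected in task graph" warning earlier flags. A small stdlib sketch of the thread side, assuming `ThreadPoolExecutor` as a stand-in for `ConcurrentTaskRunner` (not its actual implementation); `pickle.dumps` only approximates what a process-based runner would ship per task.

```python
import pickle
from concurrent.futures import ThreadPoolExecutor
from random import choice

# A batch shaped like the repro's: 3,500 rows of 20 values.
possible_values = [100, "12312312-sdsdsds-12312321-sssss", "2022-05-03T04:02:02"]
batch = {
    "table": "b",
    "fields": list(range(20)),
    "values": [[choice(possible_values) for _ in range(20)] for _ in range(3_500)],
}


def receives_same_object(arg: dict) -> bool:
    # In a worker thread the argument is the caller's object, not a copy.
    return arg is batch


with ThreadPoolExecutor(max_workers=4) as pool:
    same_object = pool.submit(receives_same_object, batch).result()

# Roughly what a process-based runner would need to serialize for each task.
payload_bytes = len(pickle.dumps(batch))
print(same_object, payload_bytes)
```

The trade-off is the GIL: threads pay off here because each task spends most of its time waiting on network I/O to Redshift, during which other threads can run; CPU-heavy tasks would not speed up the same way.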
Zanie
Zanie
Jean
09/29/2022, 3:05 PM
Jean
09/29/2022, 3:05 PM
Zanie
Zanie
Jean
09/30/2022, 10:22 PM