#prefect-community

Massimiliano Fanciulli

06/12/2022, 10:21 AM
Hello, I'm stuck on the following issue while setting up a Prefect + Dask cluster. When the flow is executed, multiple tasks are run on the Dask worker, and when they run I get an exception (see thread).
This happens on a Docker-based architecture (one container for Prefect, i.e. the agent plus the Dask scheduler, and another for the Dask worker only).
I get the error on the worker nodes.
If I set up EC2 machines and run the same simple example, I get no errors.
Any clue?

Anna Geller

06/12/2022, 11:16 AM
Could you move code blocks to the thread?

Massimiliano Fanciulli

06/12/2022, 11:53 AM
Sure.
This is the deployment file:
```python
from prefect.deployments import DeploymentSpec
from prefect.orion.schemas.schedules import IntervalSchedule
from prefect.flow_runners import SubprocessFlowRunner
from prefect.blocks.storage import S3StorageBlock
from datetime import timedelta

DeploymentSpec(
    schedule=IntervalSchedule(interval=timedelta(minutes=1)),
    flow_location="info.py",
    flow_storage=S3StorageBlock(bucket="newpreproc-test"),
    name="Processor",
)
```
And this is info.py:
```python
from prefect import flow, task
from prefect.task_runners import DaskTaskRunner


@task()
def calculate_score_group(index):
    # index is an int, so format it instead of concatenating it with str
    print(f"SCORE FOR INDEX {index} IS 100")


# Submit tasks to the existing Dask scheduler at localhost:8786
@flow(name="GW Score", task_runner=DaskTaskRunner(address="localhost:8786"))
def calculate_score():
    for i in range(10):
        calculate_score_group(i)
```
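In info.py, `index` comes from `range(10)` and is therefore an `int`, so building the message with `+` against string literals raises `TypeError` as soon as the task body runs, independently of the Docker issue. The stdlib-only sketch below contrasts the two constructions; `concat_message` and `format_message` are hypothetical helper names used only for illustration.

```python
def concat_message(index):
    # "+"-based construction: raises TypeError when index is an int
    return "SCORE FOR INDEX " + index + " IS 100"


def format_message(index):
    # f-string formats any value, so int and str indices both work
    return f"SCORE FOR INDEX {index} IS 100"


if __name__ == "__main__":
    try:
        concat_message(7)
    except TypeError as exc:
        print(f"concatenation failed: {exc}")
    print(format_message(7))  # prints "SCORE FOR INDEX 7 IS 100"
```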

Anna Geller

06/12/2022, 2:33 PM
I meant moving your code block to this thread instead of having the code and exception tracebacks in your original message. This helps us keep the main channel clean. Could you move that part here? Thanks so much in advance.

Massimiliano Fanciulli

06/12/2022, 2:36 PM
The exception follows:
```
10:14:42.021 | ERROR   | prefect.orion - Encountered exception in request:
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/engine/base.py", line 3280, in _wrap_pool_connect
    return fn()
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/pool/base.py", line 310, in connect
    return _ConnectionFairy._checkout(self)
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/pool/base.py", line 868, in _checkout
    fairy = _ConnectionRecord.checkout(pool)
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/pool/base.py", line 476, in checkout
    rec = pool._do_get()
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/pool/impl.py", line 256, in _do_get
    return self._create_connection()
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/pool/base.py", line 256, in _create_connection
    return _ConnectionRecord(self)
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/pool/base.py", line 371, in __init__
    self.__connect()
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/pool/base.py", line 666, in __connect
    pool.logger.debug("Error on connect(): %s", e)
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
    compat.raise_(
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/util/compat.py", line 207, in raise_
    raise exception
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/pool/base.py", line 661, in __connect
    self.dbapi_connection = connection = pool._invoke_creator(self)
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/engine/create.py", line 590, in connect
    return dialect.connect(*cargs, **cparams)
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/engine/default.py", line 597, in connect
    return self.dbapi.connect(*cargs, **cparams)
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 292, in connect
    await_only(connection),
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/util/_concurrency_py3k.py", line 68, in await_only
    return current.driver.switch(awaitable)
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/util/_concurrency_py3k.py", line 121, in greenlet_spawn
    value = await result
  File "/usr/local/lib/python3.8/dist-packages/aiosqlite/core.py", line 137, in _connect
    self._connection = await future
  File "/usr/local/lib/python3.8/dist-packages/aiosqlite/core.py", line 102, in run
    result = function()
  File "/usr/local/lib/python3.8/dist-packages/aiosqlite/core.py", line 397, in connector
    return sqlite3.connect(loc, **kwargs)
sqlite3.OperationalError: unable to open database file

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/starlette/middleware/errors.py", line 162, in __call__
    await self.app(scope, receive, _send)
  File "/usr/local/lib/python3.8/dist-packages/starlette/exceptions.py", line 93, in __call__
    raise exc
  File "/usr/local/lib/python3.8/dist-packages/starlette/exceptions.py", line 82, in __call__
    await self.app(scope, receive, sender)
  File "/usr/local/lib/python3.8/dist-packages/fastapi/middleware/asyncexitstack.py", line 21, in __call__
    raise e
  File "/usr/local/lib/python3.8/dist-packages/fastapi/middleware/asyncexitstack.py", line 18, in __call__
    await self.app(scope, receive, send)
  File "/usr/local/lib/python3.8/dist-packages/starlette/routing.py", line 670, in __call__
    await route.handle(scope, receive, send)
  File "/usr/local/lib/python3.8/dist-packages/starlette/routing.py", line 266, in handle
    await self.app(scope, receive, send)
  File "/usr/local/lib/python3.8/dist-packages/starlette/routing.py", line 65, in app
    response = await func(request)
  File "/usr/local/lib/python3.8/dist-packages/prefect/orion/utilities/server.py", line 87, in handle_response_scoped_depends
    response = await default_handler(request)
  File "/usr/local/lib/python3.8/dist-packages/fastapi/routing.py", line 227, in app
    raw_response = await run_endpoint_function(
  File "/usr/local/lib/python3.8/dist-packages/fastapi/routing.py", line 160, in run_endpoint_function
    return await dependant.call(**values)
  File "/usr/local/lib/python3.8/dist-packages/prefect/orion/api/task_runs.py", line 188, in set_task_run_state
    orchestration_result = await models.task_runs.set_task_run_state(
  File "/usr/local/lib/python3.8/dist-packages/prefect/orion/models/task_runs.py", line 279, in set_task_run_state
    run = await models.task_runs.read_task_run(session=session, task_run_id=task_run_id)
  File "/usr/local/lib/python3.8/dist-packages/prefect/orion/database/dependencies.py", line 112, in async_wrapper
    return await fn(*args, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/prefect/orion/models/task_runs.py", line 97, in read_task_run
    model = await session.get(db.TaskRun, task_run_id)
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/ext/asyncio/session.py", line 296, in get
    return await greenlet_spawn(
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/util/_concurrency_py3k.py", line 126, in greenlet_spawn
    result = context.throw(*sys.exc_info())
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/session.py", line 2789, in get
    return self._get_impl(
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/session.py", line 2896, in _get_impl
    return db_load_fn(
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/loading.py", line 530, in load_on_pk_identity
    session.execute(
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/session.py", line 1695, in execute
    conn = self._connection_for_bind(bind)
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/session.py", line 1536, in _connection_for_bind
    return self._transaction._connection_for_bind(
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/session.py", line 747, in _connection_for_bind
    conn = bind.connect()
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/future/engine.py", line 406, in connect
    return super(Engine, self).connect()
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/engine/base.py", line 3234, in connect
    return self._connection_cls(self, close_with_result=close_with_result)
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/engine/base.py", line 96, in __init__
    else engine.raw_connection()
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/engine/base.py", line 3313, in raw_connection
    return self._wrap_pool_connect(self.pool.connect, _connection)
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/engine/base.py", line 3283, in _wrap_pool_connect
    Connection._handle_dbapi_exception_noconnection(
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/engine/base.py", line 2117, in _handle_dbapi_exception_noconnection
    util.raise_(
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/util/compat.py", line 207, in raise_
    raise exception
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/engine/base.py", line 3280, in _wrap_pool_connect
    return fn()
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/pool/base.py", line 310, in connect
    return _ConnectionFairy._checkout(self)
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/pool/base.py", line 868, in _checkout
    fairy = _ConnectionRecord.checkout(pool)
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/pool/base.py", line 476, in checkout
    rec = pool._do_get()
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/pool/impl.py", line 256, in _do_get
    return self._create_connection()
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/pool/base.py", line 256, in _create_connection
    return _ConnectionRecord(self)
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/pool/base.py", line 371, in __init__
    self.__connect()
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/pool/base.py", line 666, in __connect
    pool.logger.debug("Error on connect(): %s", e)
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
    compat.raise_(
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/util/compat.py", line 207, in raise_
    raise exception
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/pool/base.py", line 661, in __connect
    self.dbapi_connection = connection = pool._invoke_creator(self)
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/engine/create.py", line 590, in connect
    return dialect.connect(*cargs, **cparams)
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/engine/default.py", line 597, in connect
    return self.dbapi.connect(*cargs, **cparams)
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 292, in connect
    await_only(connection),
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/util/_concurrency_py3k.py", line 68, in await_only
    return current.driver.switch(awaitable)
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/util/_concurrency_py3k.py", line 121, in greenlet_spawn
    value = await result
  File "/usr/local/lib/python3.8/dist-packages/aiosqlite/core.py", line 137, in _connect
    self._connection = await future
  File "/usr/local/lib/python3.8/dist-packages/aiosqlite/core.py", line 102, in run
    result = function()
  File "/usr/local/lib/python3.8/dist-packages/aiosqlite/core.py", line 397, in connector
    return sqlite3.connect(loc, **kwargs)
sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) unable to open database file
(Background on this error at: https://sqlalche.me/e/14/e3q8)
2022-06-12 10:14:42,045 - distributed.worker - WARNING - Compute Failed
Key:       calculate_score_group-63e95d0e-764-3ba520ff831e460cb1a3d0d81044ea9b
Function:  begin_task_run
args:      ()
kwargs:    {'task': <prefect.tasks.Task object at 0x7f8af03338b0>, 'task_run': TaskRun(id=UUID('3ba520ff-831e-460c-b1a3-d0d81044ea9b'), name='calculate_score_group-63e95d0e-764', flow_run_id=UUID('14f976c7-545f-4837-a0c2-75d76c26f82c'), task_key='63e95d0ee90b584403f98497c3a130bf', dynamic_key='764', cache_key=None, cache_expiration=None, task_version=None, empirical_policy=TaskRunPolicy(max_retries=0, retry_delay_seconds=0.0), tags=[], state_id=UUID('c65a9c67-9641-49a3-80be-fb6f90327637'), task_inputs={'index': []}, state_type=StateType.PENDING, run_count=0, expected_start_time=datetime.datetime(2022, 6, 12, 10, 14, 41, 573413, tzinfo=datetime.timezone.utc), next_scheduled_start_time=None, start_time=None, end_time=None, total_run_time=datetime.timedelta(0), estimated_run_time=datetime.timedelta(0), estimated_start_time_delta=datetime.timedelta(microseconds=295338), state=Pending(message=None, type=PENDING, result=None, task_run_id=3ba520ff-831e-460c-b1a3-d0d81044ea9b)), 'parameters': {'index': 7
Exception: "OperationalError('(sqlite3.OperationalError) unable to open database file')"
```
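Both chained tracebacks bottom out in the same call, `sqlite3.connect(loc, **kwargs)` inside aiosqlite, failing with `unable to open database file`. SQLite creates a missing database file on demand, but it does not create missing directories, and it needs write access to the containing directory to create the file. The minimal stdlib sketch below reproduces the message by pointing `sqlite3.connect` at a path whose parent directory does not exist. This is only one plausible cause; an unwritable directory (for example a restrictive bind mount) produces the same error. `try_open` is a hypothetical helper, and the file name `orion.db` is illustrative.

```python
import os
import sqlite3
import tempfile


def try_open(db_path):
    """Return None if SQLite can open db_path, else the error message."""
    try:
        conn = sqlite3.connect(db_path)
    except sqlite3.OperationalError as exc:
        return str(exc)
    conn.close()
    return None


with tempfile.TemporaryDirectory() as tmp:
    # Parent directory exists: SQLite creates the file on demand
    ok_result = try_open(os.path.join(tmp, "orion.db"))
    # Parent directory missing: SQLite does not create directories
    bad_result = try_open(os.path.join(tmp, "missing-dir", "orion.db"))

print("existing parent dir:", ok_result)
print("missing parent dir:", bad_result)
```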

Anna Geller

06/12/2022, 2:36 PM
Re your issue: it seems like you are running Orion on a bind-mount volume. Can you try running it in a virtual environment instead of Docker? That would make debugging and permissions easier.
Thanks for moving the traceback.

Massimiliano Fanciulli

06/12/2022, 2:38 PM
I tried it on a virtualized environment (EC2 on AWS) and had no issues.
I'm wondering which SQLite file it is looking for on the Dask worker.
Maybe my Docker image needs some adjustment for this.
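One assumption worth checking on the worker container: in the Prefect 2.0 beta, a process with no `PREFECT_API_URL` set talks to an ephemeral, in-process Orion API backed by a local SQLite file, by default `orion.db` under `PREFECT_HOME` (which defaults to `~/.prefect`). The `prefect.orion - Encountered exception in request` line in the worker log is consistent with that. The stdlib-only sketch below reports those environment variables and whether the assumed directory exists and is writable; the setting names and default path are assumptions about that Prefect version, and `describe_prefect_env` is a hypothetical helper.

```python
import os
from pathlib import Path


def describe_prefect_env(environ=None):
    """Report assumed Prefect settings that decide where task-state API calls go."""
    env = os.environ if environ is None else environ
    # Assumption: PREFECT_HOME defaults to ~/.prefect in the Prefect 2.0 beta
    home = Path(env.get("PREFECT_HOME", str(Path.home() / ".prefect"))).expanduser()
    # Assumption: without PREFECT_API_URL, an ephemeral API uses <PREFECT_HOME>/orion.db
    api_url = env.get("PREFECT_API_URL")
    return {
        "api_url": api_url,
        "uses_ephemeral_api": api_url is None,
        "assumed_db_path": str(home / "orion.db"),
        "home_exists": home.is_dir(),
        # SQLite needs a writable directory to create the database file
        "home_writable": home.is_dir() and os.access(home, os.W_OK),
    }


if __name__ == "__main__":
    for key, value in describe_prefect_env().items():
        print(f"{key}: {value}")
```

If this reports `uses_ephemeral_api: True` inside the worker container, the worker is likely not talking to the Orion instance in the Prefect container at all.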

Anna Geller

06/12/2022, 3:49 PM
Exactly as I thought: Docker makes things harder because of networking. I'd recommend a virtual environment until we have an official Dockerized recipe.
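With Docker's default bridge networking, each container has its own network namespace, so `localhost` inside the worker container refers to the worker itself, not to the container running the agent and scheduler. A minimal stdlib sketch of a TCP reachability check to run from inside the worker container; `can_reach` is a hypothetical helper, and the hosts and ports you pass it are assumptions about your own setup.

```python
import socket


def can_reach(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within timeout seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and DNS resolution failures
        return False
```

For example, `can_reach("prefect", 4200)` from the worker container would test a hypothetical service named `prefect` on Orion's usual default port 4200; a `False` there points at networking rather than Prefect itself.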