Hey again! Quick question about My Prefect worker...
# prefect-cloud
d
Hey again! Quick question. My Prefect worker shares the same codebase with FastAPI and SQLAlchemy. The worker opens an async DB connection on start. Any @flow works fine, but if I call a @task that makes DB calls from inside a flow, I get this error:
RuntimeError: Task <Task pending name='Task-31' coro=<Call._run_async() running at /usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py:355> cb=[_run_until_complete_cb() at /usr/local/lib/python3.10/asyncio/base_events.py:184]> got Future <Future pending cb=[Protocol._on_waiter_completed()]> attached to a different loop
So as I understand it, the problem is that I’m trying to use a DB connection that is attached to the Prefect worker's event loop, but the Prefect @task creates a separate loop for execution. Question: is it possible to execute an async function in the main loop of the Prefect worker? I don’t want to open a new DB connection in each @task if I can avoid it.
Looks like I’ve chosen the wrong Slack channel, but I’m a Prefect Cloud user, so I hope that's OK.
Also, my @task downloads files by URL and saves metadata results to Postgres. The quick fix is to change @task to @flow, but I’m not sure that’s best practice.
n
hi @Daniil Okhlopkov
> My Prefect worker shares the same codebase with FastAPI and SQLAlchemy. Prefect worker opens async connection to DB on start.
can you give an example of what you mean by this? one thing to note is that tasks are run in their own thread, which may be related to this
> attached to a different loop
for example, if you created your connection objects in the flow and passed them to tasks (which run in their own threads), then I could see how you'd end up with this error
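To illustrate the thread point with plain asyncio (no Prefect or SQLAlchemy involved), here is a minimal sketch. It assumes the connection's internal waiter behaves like an ordinary `asyncio.Future`. The names `demo_cross_loop_error`, `waiter`, and `use_connection` are hypothetical stand-ins, not anything from the original code.

```python
import asyncio
import threading


def demo_cross_loop_error() -> str:
    """Create a Future on one loop, then await it from a loop in another thread."""
    # Stand-in for the loop that opened the DB connection.
    owner_loop = asyncio.new_event_loop()
    # Stand-in for a connection's internal waiter; it is bound to owner_loop.
    waiter = owner_loop.create_future()

    captured = {}

    async def use_connection() -> None:
        # Awaiting a Future that belongs to another loop raises RuntimeError.
        await waiter

    def run_in_thread() -> None:
        try:
            # asyncio.run creates a fresh event loop for this thread.
            asyncio.run(use_connection())
        except RuntimeError as exc:
            captured["error"] = str(exc)

    thread = threading.Thread(target=run_in_thread)
    thread.start()
    thread.join()
    owner_loop.close()
    return captured.get("error", "")


if __name__ == "__main__":
    print(demo_cross_loop_error())
```

The captured message ends with "attached to a different loop", the same wording as in the traceback above. This suggests that any loop-bound object has to be created on the loop that later awaits it.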
> is it possible to execute async function in the main loop of Prefect worker?
generally speaking, the worker's job is to submit flow code for execution (not run it); only in the case of the Process Worker does the worker actually run user code.
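On the asyncio side of that question, the standard library offers `asyncio.run_coroutine_threadsafe` for handing a coroutine to a loop that is running in another thread. The sketch below shows only that generic mechanism. Whether a task can actually reach the loop that owns the connection in a given Prefect deployment is not established in this thread. `start_owner_loop`, `query_db`, and `demo` are hypothetical names.

```python
import asyncio
import threading


def start_owner_loop() -> asyncio.AbstractEventLoop:
    """Run an event loop in a daemon thread (stand-in for the loop owning the DB connection)."""
    loop = asyncio.new_event_loop()
    threading.Thread(target=loop.run_forever, daemon=True).start()
    return loop


async def query_db(value: int) -> int:
    # Hypothetical placeholder for a coroutine that uses the loop-bound connection.
    await asyncio.sleep(0)
    return value * 2


def demo() -> int:
    owner_loop = start_owner_loop()
    try:
        # Schedule the coroutine on the owning loop from this (different) thread,
        # then block here until the result is available.
        future = asyncio.run_coroutine_threadsafe(query_db(21), owner_loop)
        return future.result(timeout=5)
    finally:
        # Ask the owning loop to stop from outside its thread.
        owner_loop.call_soon_threadsafe(owner_loop.stop)


if __name__ == "__main__":
    print(demo())
```

Note that `future.result()` blocks the calling thread, so this pattern suits synchronous callers; calling it from inside a coroutine would stall that coroutine's own loop.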