Menekse Kuyu
10/01/2024, 10:16 AME File "/Users/.../site-packages/prefect/_internal/concurrency/cancellation.py", line 536, in cancel_sync_at
E with cancel_sync_after(timeout, name=name) as ctx:
E ResourceWarning: unclosed <socket.socket fd=27, family=2, type=1, proto=6, laddr=('127.0.0.1', 64935), raddr=('127.0.0.1', 8651)>
Marvin
10/01/2024, 10:16 AMSequentialTaskRunner
has been deprecated and the task execution system has undergone some changes. The error you are encountering is likely related to these changes.
In Prefect 3, task runners like ThreadPoolTaskRunner
, DaskTaskRunner
, and RayTaskRunner
are recommended for running tasks concurrently or in parallel. The main change is that task runners need to be explicitly configured and used with the .submit()
method to enable concurrency or parallelism.
Here's how you might adapt your unit test to use Prefect 3.x's `ThreadPoolTaskRunner`:
python
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
@task
def example_task():
# Your task implementation
pass
@flow(task_runner=ThreadPoolTaskRunner())
def example_flow():
example_task.submit()
# or simply call example_task() if you don't require concurrency
# Now you can run your flow directly in your test
example_flow()
For more information on task runners in Prefect 3, you can refer to the Prefect 3 task runners documentation.
If the issue persists, consider examining sockets management in your tasks, as the error message indicates potentially unclosed sockets. Ensure that any network connections or file handles are properly closed after use.
Feel free to reach out if you have more questions!Marvin
10/01/2024, 12:07 PMbash
PYTHONTRACEMALLOC=25 pytest path_to_your_test.py
4. Task Runner Update:
Use Prefect’s recommended ThreadPoolTaskRunner
for flows which do not require specific concurrency limitations.
Here is how you can structure your flow with `ThreadPoolTaskRunner`:
python
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
@task
def example_task():
# Your task implementation
pass
@flow(task_runner=ThreadPoolTaskRunner())
def example_flow():
example_task.submit()
# Run your flow directly in your test
example_flow()
For further reading, you might find these resources useful:
1. Prefect’s global concurrency limits
2. Prefect’s troubleshooting guide
These steps and resources should help identify and mitigate the ResourceWarning
related to unclosed sockets in your unit tests with Prefect 3. If the issue persists, checking for proper resource clean-up in your tasks will be essential.