Ji-Oh Yoo
03/03/2025, 11:49 PM
I'm getting TypeError: cannot pickle '_thread.lock' object when my flow function uses a library that internally references a lock instance. The failure happens when the Prefect context is passed to the Ray task. The Prefect context contains both the flow and the task code. Is having the flow code in the context necessary? I understand it needs some metadata about the current flow run (like the flow run ID), but I'm wondering if there's a way to pass only the task code and avoid serializing the flow code. Here's an example flow:
```python
from ddtrace import tracer
from prefect import flow, task
from prefect_ray import RayTaskRunner

@task(log_prints=True)
def my_task():
    print("hi from task")
    return 924

@flow(task_runner=RayTaskRunner(), log_prints=True)
@tracer.wrap(name="my_flow")  # this line introduces a reference to some lock
def my_flow():
    v = my_task.submit()
    print("result from the flow:", v.result())

if __name__ == "__main__":
    my_flow()
```
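For reference, the underlying error can be reproduced without Prefect, Ray, or ddtrace. This is a minimal sketch assuming a hypothetical FakeTracer class as a stand-in for any library object that keeps a threading.Lock as an attribute: pickling the object fails as soon as the pickler reaches the lock.

```python
import pickle
import threading


class FakeTracer:
    """Hypothetical stand-in for a library object that guards its state with a lock."""

    def __init__(self):
        self._lock = threading.Lock()
        self.spans = []


error_message = None
try:
    # pickle walks the instance __dict__ and raises when it reaches the lock.
    pickle.dumps(FakeTracer())
except TypeError as exc:
    error_message = str(exc)

print(error_message)
```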
Marvin
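03/03/2025, 11:49 PM
The error you're seeing with RayTaskRunner is due to the serialization of the Prefect context, which includes flow code that references a lock object. This serialization is necessary for executing tasks in a distributed environment like Ray, because the context has to be shipped to the worker process that runs the task, and everything it references must be picklable. However, there are some strategies you can consider to mitigate this issue: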
1. Avoid Serializing Locks: The root cause is the serialization of objects that include locks. When using libraries that introduce locks, try to ensure that these objects are not part of the serialized context. This might involve restructuring your code or using alternative libraries that don't introduce such locks.
2. Custom Serialization: You could explore implementing custom serialization logic for your tasks, ensuring that only necessary parts of the context are serialized. However, this requires a deep understanding of both Prefect and the library causing the issue.
3. Use of Task Isolation: Consider isolating the task logic that involves locks from the Prefect flow context. This can be achieved by wrapping such logic in a separate function or module that is not directly part of the Prefect task.
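Strategy 2 can be sketched for a class you control with the standard __getstate__/__setstate__ pickle hooks: the lock is dropped when the object is serialized, and a fresh lock is created when it is restored. Counter is a hypothetical example class; for a third-party object such as the ddtrace tracer, adding these hooks is not straightforward, which is why strategies 1 and 3 are often more practical.

```python
import pickle
import threading


class Counter:
    """Hypothetical object that guards a counter with a lock but can still be pickled."""

    def __init__(self):
        self._lock = threading.Lock()
        self.value = 0

    def increment(self):
        with self._lock:
            self.value += 1

    def __getstate__(self):
        # Copy the instance dict and drop the unpicklable lock.
        state = self.__dict__.copy()
        del state["_lock"]
        return state

    def __setstate__(self, state):
        # Restore the plain attributes and give the copy its own fresh lock.
        self.__dict__.update(state)
        self._lock = threading.Lock()


c = Counter()
c.increment()
clone = pickle.loads(pickle.dumps(c))
clone.increment()
print(clone.value)  # 2
```

The clone carries over value but not the original lock, so the two objects never share synchronization state.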
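Strategies 1 and 3 share one idea: keep the lock-owning object out of the serialized payload and build it where it is used. The sketch below uses only the standard library, not Prefect or Ray APIs; TracingClient, get_client, and task_body are hypothetical names. Only plain arguments are pickled, and each process creates its client lazily on first use. Ray serializes with cloudpickle, which can capture more than stdlib pickle (for example, closures and globals), so treat this as an illustration of the idea rather than a guarantee.

```python
import pickle
import threading
from functools import lru_cache


class TracingClient:
    """Hypothetical library object that owns a lock and therefore cannot be pickled."""

    def __init__(self, service):
        self.service = service
        self._lock = threading.Lock()


@lru_cache(maxsize=None)
def get_client(service):
    # Created lazily (once per process, per service) wherever it is first needed,
    # so it never has to travel through pickle.
    return TracingClient(service)


def task_body(service, x):
    # Only plain values arrive as arguments; the client is looked up locally.
    client = get_client(service)
    return f"{client.service}:{x * 2}"


# Simulate shipping work to another process: only plain arguments are pickled.
payload = pickle.dumps(("my_flow", 21))
result = task_body(*pickle.loads(payload))
print(result)  # my_flow:42
```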
Unfortunately, Prefect's current architecture doesn't make it easy to exclude flow code from the context. The flow context is integral to maintaining metadata and execution state across distributed systems.
For more detailed guidance, you can refer to the Prefect documentation on flow code storage and explore Prefect's GitHub repository for context serialization details.
If you have further questions or need specific examples, feel free to ask.