Hi guys, while migrating to Prefect Orion I started using
. I'm running into issues with the asynchronous section of my code when using
the context is not defined in the tasks.
@flow(name="Biorxiv", task_runner=DaskTaskRunner())
def biorxiv_main_flow(block_name):
    with MyContextModel(block_name):
        prepare_destination = biorxiv.prepare_destination()
        items = get_docs()
        items_futures = []
        with tags("converters"):
            for item in items:
                ...  # loop body not preserved in the original message
            item_results = [i_futures.result() for i_futures in items_futures]
AttributeError: 'NoneType' object has no attribute 'data_fs'
I get this error on the line of code
get() returns None. Is there something I need to configure for the context to be defined in the Dask tasks?
Hey Ian, what version of Prefect are you using? And when you run this without the DaskTaskRunner, the error does not appear?
I'm using conda with
prefect                   2.0.3
prefect-dask              0.1.2
Yes, it runs correctly when I'm not using Dask.
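A note on the symptom above: one plausible explanation (an assumption, not something confirmed in this thread) is that the task body runs in a worker whose context never had the context model entered, so get() finds nothing there. The sketch below mimics that with the standard library's contextvars only. The names config_var, read_block_name, and read_block_name_explicit are hypothetical, and an empty contextvars.Context() stands in for the worker; this is not Prefect's or Dask's actual mechanism.

```python
import contextvars

# Hypothetical stand-in for the context variable behind a context model.
config_var = contextvars.ContextVar("config_context", default=None)


def read_block_name():
    # Reads whatever the *current* context holds for config_var.
    return config_var.get()


def read_block_name_explicit(block_name):
    # Receives the value as an ordinary argument instead of via context.
    return block_name


# Set the variable in the caller's context, as entering the model would.
token = config_var.set("my-block")

# Same context as the caller: the value is visible.
in_caller = read_block_name()

# Empty context (assumed to resemble a fresh worker): the value is gone.
in_empty = contextvars.Context().run(read_block_name)

# A copy of the caller's context carries the value along.
in_copied = contextvars.copy_context().run(read_block_name)

# Passing the value explicitly works regardless of the context.
explicit = contextvars.Context().run(read_block_name_explicit, "my-block")

config_var.reset(token)
```

Passing the value as a plain task argument sidesteps the question of whether the context reaches the worker at all.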
Cool, I would recommend updating to 2.2 just to make sure that is not affecting it. To use the dask task runner you do need to have the prefect-dask collection installed. I have linked docs below for this. Lmk if these help. https://docs.prefect.io/tutorials/dask-ray-task-runners/?h=dask
Hi Matt, I upgraded Prefect to 2.2; prefect-dask was already at the latest version, which is still 0.1.2. Now I get a different error. I made a simple example that reproduces the issue:
from prefect import task, flow
from prefect.context import ContextModel, ContextVar
from prefect_dask import DaskTaskRunner

class MyContextModel(ContextModel):
    block_name: str
    __var__ = ContextVar("config_context")

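@task  # decorator not in the message as preserved; presumably present, since task is imported
def show_work(item):
    print(f"block:{MyContextModel.get().block_name} item:{item}")

@flow  # (task_runner=DaskTaskRunner())
def context_flow(block_name):
    with MyContextModel(block_name=block_name):
        items = [1, 2, 3, 4, 5]
        item_futures = []
        for item in items:
            # Loop body not preserved in the message; presumably each item is submitted as a task.
            item_futures.append(show_work.submit(item))
        item_results = [i_futures.result() for i_futures in item_futures]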

if __name__ == "__main__":
    ...  # call to context_flow(...) not preserved in the original message
As is, it works as intended, but if you change the flow line to
@flow(task_runner=DaskTaskRunner())
now I get a
Ah, I see. Check out this Discourse page. You have the
if __name__ == "__main__"
guard in there, which is good, but I think it comes down to a task input or output that is not serializable by cloudpickle. https://discourse.prefect.io/t/why-do-i-get-the-error-typeerror-cannot-pickle-object/74
@Matt Conger, in my example I'm passing an
which is a serializable type, and the return value is
. The error
_pickle.PicklingError: Can't pickle <cyfunction str_validator at 0x7fde94f23040>: attribute lookup lambda12 on pydantic.validators failed
comes from
which uses
@Bianca Hoch, any ideas why I get this error or how to avoid it?
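To illustrate the kind of failure in the traceback above: pickle stores a function by its importable name, so an object that holds a function without one cannot be serialized, even when the task's own argument is a plain int. A minimal sketch with the standard library's pickle follows. Note that stdlib pickle is stricter than cloudpickle (which can handle lambdas), so this is an analogy only. Holder and pickling_problem are hypothetical names, the lambda stands in for a compiled validator, and whether the context model is what gets pickled in this thread is an assumption.

```python
import pickle


class Holder:
    """Hypothetical object that carries a function with no importable name."""

    def __init__(self):
        # Stand-in for a compiled validator that cannot be looked up by name.
        self.validator = lambda value: str(value)


def pickling_problem(obj):
    """Return a short description of the pickling error, or None if obj pickles."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError) as exc:
        return f"{type(exc).__name__}: {exc}"
    return None


# Plain task inputs and outputs serialize without trouble.
int_problem = pickling_problem(3)
none_problem = pickling_problem(None)

# An object that drags a name-less function along does not.
holder_problem = pickling_problem(Holder())
```

Running a check like this over anything that travels with a task (arguments, return values, and captured state) can help narrow down which object the serializer is choking on.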