# prefect-community
Ian:
Hi guys, while migrating to Prefect Orion I started using ContextModel. I'm running into issues with the asynchronous section of my code: when using task_runner=DaskTaskRunner, the context is not defined in the tasks.
@flow(name="Biorxiv", task_runner=DaskTaskRunner())
def biorxiv_main_flow(block_name):
    with MyContextModel(block_name):
        prepare_destination = biorxiv.prepare_destination()
        items = get_docs()
        items_futures = []
        with tags("converters"):
            for item in items:
                items_futures.append(biorxiv.convert_xml_to_json.submit(item))
            item_results = [i_futures.result() for i_futures in items_futures]
AttributeError: 'NoneType' object has no attribute 'data_fs'
I get this error on the line of code MyContextModel.get().data_fs, because get() returns None. Is there something I need to configure for the context to be defined in the Dask tasks?
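A likely explanation (an assumption, not confirmed in this thread): Prefect's ContextModel is built on Python ContextVars, and a ContextVar set in one execution context is not automatically carried into another, which is why get() returns None inside remote Dask tasks. A stdlib-only sketch of the same effect, using a fresh contextvars.Context as a stand-in for a worker (the variable name is hypothetical):

```python
import contextvars

# Hypothetical stand-in for the ContextVar behind MyContextModel.
config_var = contextvars.ContextVar("config_context", default=None)

def read_config():
    return config_var.get()

config_var.set("my-block")
print(read_config())  # "my-block": visible in the context where it was set

# An empty Context, similar to what a remote worker starts with.
fresh = contextvars.Context()
print(fresh.run(read_config))  # None: the value never crossed over
```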
Matt Conger:
Hey Ian, what version of Prefect are you using? And when you run this without the DaskTaskRunner, the error does not appear?
Ian:
I'm using conda with
prefect                   2.0.3
prefect-dask              0.1.2
Yes, it runs correctly when I'm not using Dask.
m
Cool, I would recommend updating to 2.2 just to make sure that is not affecting it. To use the dask task runner you do need to have the prefect-dask collection installed. I have linked docs below for this. Lmk if these help. https://docs.prefect.io/tutorials/dask-ray-task-runners/?h=dask
Ian:
Hi Matt, I upgraded Prefect to 2.2; the prefect-dask module is already the latest, still version 0.1.2. Now I get a different error. I made a simple example reproducing the issue:
from prefect import task, flow
from prefect.context import ContextModel, ContextVar
from prefect_dask import DaskTaskRunner


class MyContextModel(ContextModel):
    block_name: str
    __var__ = ContextVar("config_context")


@task
def show_work(item):
    print(f"block:{MyContextModel.get().block_name} item:{item}")


@flow #(task_runner=DaskTaskRunner())
def context_flow(block_name):
    with MyContextModel(block_name=block_name):
        items = [1,2,3,4,5]
        item_futures = []
        for item in items:
            item_futures.append(show_work.submit(item))
        item_results = [i_futures.result() for i_futures in item_futures]


if __name__ == "__main__":
    context_flow("new_block")
As is, this works as intended, but if you uncomment the task runner on the flow line, @flow(task_runner=DaskTaskRunner()), I now get a _pickle.PicklingError.
Matt Conger:
Ah, I see. Check out this Discourse page. You have the if __name__ == "__main__" guard in there, which is good, but I think it comes down to having a task input or output that is not serializable by cloudpickle. https://discourse.prefect.io/t/why-do-i-get-the-error-typeerror-cannot-pickle-object/74
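That suggestion can be checked mechanically. A small diagnostic sketch (using stdlib pickle rather than cloudpickle, which is an assumption for illustration only — Prefect and Dask actually ship objects with cloudpickle, which handles more cases) to test whether a given task input or output serializes:

```python
import pickle

def is_picklable(obj) -> bool:
    """Round-trip obj through pickle; False suggests it may fail to ship to workers."""
    try:
        pickle.loads(pickle.dumps(obj))
        return True
    except Exception:
        return False

print(is_picklable(42))           # plain ints are fine
print(is_picklable(lambda x: x))  # stdlib pickle rejects lambdas (cloudpickle handles
                                  # more than pickle does, but still not everything)
```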
Ian:
@Matt Conger, in my example I'm passing an int, which is a serializable type, and the return value is None. The error
_pickle.PicklingError: Can't pickle <cyfunction str_validator at 0x7fde94f23040>: attribute lookup lambda12 on pydantic.validators failed
comes from prefect.context.ContextModel, which uses pydantic.validators.
@Bianca Hoch, any ideas why I get this error or how to avoid it?
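One possible workaround while this is open (a sketch of a general pattern, not a confirmed Prefect fix): read the context value in the flow, where the context is defined, and hand it to tasks as a plain argument, so only picklable data has to cross to the workers and the ContextModel itself is never serialized. A stdlib-only sketch mirroring the example above (the names block_var, show_work, and context_flow are stand-ins for the Prefect versions):

```python
import contextvars
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in for MyContextModel's data.
block_var = contextvars.ContextVar("block_name", default=None)

def show_work(block_name: str, item: int) -> str:
    # Receives the value as a plain argument instead of via the context var,
    # so nothing unpicklable needs to travel with the task.
    return f"block:{block_name} item:{item}"

def context_flow(name: str) -> list:
    token = block_var.set(name)
    try:
        captured = block_var.get()  # snapshot where the context is defined
        with ThreadPoolExecutor() as pool:
            futures = [pool.submit(show_work, captured, i) for i in (1, 2, 3)]
            return [f.result() for f in futures]
    finally:
        block_var.reset(token)

print(context_flow("new_block"))
```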