Ian Andres Etnyre Mercader
08/23/2022, 9:45 PM
`ContextModel`.
I'm running into issues with the asynchronous section of my code when using `task_runner=DaskTaskRunner`: the context is not defined inside the tasks.
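```python
from prefect import flow, tags
from prefect_dask import DaskTaskRunner

# `biorxiv`, `get_docs`, and `MyContextModel` are defined elsewhere (not shown)
@flow(name="Biorxiv", task_runner=DaskTaskRunner())
def biorxiv_main_flow(block_name):
    # pydantic-based models such as ContextModel take keyword arguments
    with MyContextModel(block_name=block_name):
        prepare_destination = biorxiv.prepare_destination()
        items = get_docs()
        items_futures = []
        with tags("converters"):
            for item in items:
                items_futures.append(biorxiv.convert_xml_to_json.submit(item))
        item_results = [i_futures.result() for i_futures in items_futures]
```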
```
AttributeError: 'NoneType' object has no attribute 'data_fs'
```
I get this error on the line `MyContextModel.get().data_fs`, because `get()` returns `None`.
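Python context variables are stored per context, so code running in a different thread or process does not automatically see a value set by the code that submitted it. The sketch below illustrates that with a stdlib thread pool as an analogy only. It does not show how Prefect or Dask move work to their workers, and Dask workers are often separate processes, which are even further removed. The names `config_var`, `read_block_name`, and `run_in_worker` are hypothetical stand-ins for `MyContextModel.__var__`, `MyContextModel.get()`, and task submission.

```python
import contextvars
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in for the ContextVar behind MyContextModel.__var__
config_var = contextvars.ContextVar("config_context", default=None)


def read_block_name():
    # Like MyContextModel.get(): reads whatever the *current* context holds
    return config_var.get()


def run_in_worker(copy_context):
    # Analogous to entering `with MyContextModel(block_name=...)`
    token = config_var.set("new_block")
    try:
        with ThreadPoolExecutor(max_workers=1) as pool:
            if copy_context:
                # Snapshot the caller's context and run the function inside it
                ctx = contextvars.copy_context()
                return pool.submit(ctx.run, read_block_name).result()
            # The worker thread has its own context, so the value set above is not visible
            return pool.submit(read_block_name).result()
    finally:
        config_var.reset(token)


print(run_in_worker(copy_context=False))
print(run_in_worker(copy_context=True))
```

On a standard CPython build this prints `None` and then `new_block`: the value only reaches the worker when the context is copied explicitly, which mirrors `get()` returning `None` inside the tasks.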
Is there something I need to configure for the context to be defined in the Dask tasks?
Matt Conger
08/23/2022, 10:56 PM
Ian Andres Etnyre Mercader
08/23/2022, 11:04 PM
prefect 2.0.3
prefect-dask 0.1.2
Yes, it runs correctly when I'm not using Dask.
Matt Conger
08/23/2022, 11:07 PM
Ian Andres Etnyre Mercader
08/24/2022, 8:29 PM
```python
from prefect import task, flow
from prefect.context import ContextModel, ContextVar
from prefect_dask import DaskTaskRunner


class MyContextModel(ContextModel):
    block_name: str
    __var__ = ContextVar("config_context")


@task
def show_work(item):
    print(f"block:{MyContextModel.get().block_name} item:{item}")


@flow  # (task_runner=DaskTaskRunner())
def context_flow(block_name):
    with MyContextModel(block_name=block_name):
        items = [1, 2, 3, 4, 5]
        item_futures = []
        for item in items:
            item_futures.append(show_work.submit(item))
        item_results = [i_futures.result() for i_futures in item_futures]


if __name__ == "__main__":
    context_flow("new_block")
```
As is, it works as intended, but if you uncomment the flow line to get `@flow(task_runner=DaskTaskRunner())`, you get a `_pickle.PicklingError`.
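One way to narrow down which object triggers a `PicklingError` is to serialize each candidate up front. The helper below is a hypothetical sketch (`find_unpicklable` is not a Prefect or Dask API). It uses cloudpickle, the serializer named in the reply that follows, when it is installed, and otherwise falls back to the stdlib `pickle`, which is stricter about functions. Since the inputs here are `int` and the return value is `None`, it may also be worth passing the wrapped task function itself.

```python
import pickle
import threading

try:
    import cloudpickle as serializer  # the serializer mentioned in this thread
except ImportError:
    serializer = pickle  # stdlib fallback; stricter than cloudpickle for functions


def find_unpicklable(**objects):
    """Return {name: error text} for each object the serializer rejects."""
    failures = {}
    for name, obj in objects.items():
        try:
            serializer.dumps(obj)
        except (pickle.PicklingError, TypeError, AttributeError) as exc:
            failures[name] = f"{type(exc).__name__}: {exc}"
    return failures


# Usage: check a task's input, its return value, and anything else it touches
report = find_unpicklable(item=3, result=None, lock=threading.Lock())
for name, error in report.items():
    print(f"{name}: {error}")
```

Here only the lock is reported; a plain `int` and `None` serialize without trouble.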
Matt Conger
08/24/2022, 11:03 PM
`if __name__ == "__main__"` guard in there, which is good, but I think it comes down to a task input or output that is not serializable by cloudpickle. https://discourse.prefect.io/t/why-do-i-get-the-error-typeerror-cannot-pickle-object/74
Ian Andres Etnyre Mercader
08/25/2022, 8:55 PM
`int`, which is a serializable type, and the return value is `None`.
The error `_pickle.PicklingError: Can't pickle <cyfunction str_validator at 0x7fde94f23040>: attribute lookup lambda12 on pydantic.validators failed` comes from `prefect.context.ContextModel`, which uses `pydantic.validators`.
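The phrase "attribute lookup ... failed" is how `pickle` reports that it tried to serialize a function by reference (module plus qualified name) and could not find that name in the module. The stdlib sketch below reproduces that failure mode by giving a function a recorded name that does not exist in its module. It is an assumption that this mirrors what happens to pydantic's compiled `str_validator`; the sketch does not involve pydantic or Prefect, and `try_pickle` is a hypothetical helper.

```python
import pickle


def str_validator(value):
    return str(value)


# Hypothetical: mimic a compiled validator whose recorded name ("lambda12")
# cannot be found as an attribute of its module
str_validator.__qualname__ = "lambda12"


def try_pickle(obj):
    """Return None on success, or the error text if pickling fails."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError) as exc:
        return f"{type(exc).__name__}: {exc}"
    return None


print(try_pickle(str_validator))  # fails: pickle looks the function up by name
print(try_pickle("new_block"))    # a plain string such as block_name pickles fine
```

Because a plain string pickles without trouble, one possible workaround (not confirmed in this thread) is to pass `block_name` to the task as an ordinary argument so the task body no longer needs `MyContextModel`.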