RyanB

11/06/2019, 4:51 PM
It appears I can't use the context supplied logger when using the DaskExecutor as I get an exception that the context cannot be pickled?

Joe Schmid

11/06/2019, 4:56 PM
Hi @RyanB, we do lots of logging from tasks when running with DaskExecutor. Try doing this:
Copy code
@task
def my_task():
    import prefect
    logger = prefect.context.get("logger")
    logger.info("Blah blah blah")
👏 1

RyanB

11/06/2019, 4:59 PM
👍🏾

josh

11/06/2019, 5:07 PM

RyanB

11/06/2019, 5:12 PM
@josh I did. It's the `import prefect` within the task that I was missing; the docs are not explicit in this regard.

josh

11/06/2019, 5:13 PM
Good call, I’ll update the doc with a note about importing prefect
Actually this is a great motivator for maybe looking into making context pickleable so this could have been avoided in the first place 🙂 https://github.com/PrefectHQ/prefect/issues/1710
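A rough illustration of the pickling point, as a minimal sketch only: `fake_context` is a hypothetical stand-in, not Prefect's real context object, and the lock inside it is just an assumed example of something `pickle` rejects. The idea is that a payload carrying the whole context cannot be serialized for a remote worker, while a payload that only names what it needs can, with the lookup deferred until the task runs on the worker.

```python
import pickle
import threading

# Hypothetical stand-in for a runtime context; NOT Prefect's real object.
# The lock is just a convenient example of something pickle rejects.
fake_context = {"logger_name": "demo", "_lock": threading.Lock()}


def can_pickle(obj):
    """Return True if obj can be serialized with pickle, else False."""
    try:
        pickle.dumps(obj)
        return True
    except (TypeError, pickle.PicklingError):
        return False


# A payload that drags the whole context along cannot be shipped to a worker.
payload_with_context = {"task": "my_task", "context": fake_context}

# A payload that only names what it needs can be shipped; the worker
# looks the value up itself when the task actually runs.
payload_without_context = {"task": "my_task", "needs": "logger_name"}

holds_context_ok = can_pickle(payload_with_context)
lookup_later_ok = can_pickle(payload_without_context)
print(holds_context_ok, lookup_later_ok)
```

This mirrors the workaround in the thread: fetching the logger with `prefect.context.get("logger")` inside the task body means nothing from the context has to travel with the task.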

Arnaud Legendre

11/20/2019, 4:33 PM
Hello, same issue here: I can see my task logs with the default executor, but not with DaskExecutor. Any idea? Thanks.
Copy code
from dask.distributed import Client, progress
dask_client = Client()

from prefect import context, Flow, task
from prefect.engine.executors import DaskExecutor


@task
def dummy():
    import prefect
    logger = prefect.context.get("logger")
    logger.info("Here")

with Flow("ETL") as flow:
    l = dummy()

executor = DaskExecutor(address=dask_client.scheduler_info()['address'])
flow.run(executor=executor)

josh

11/20/2019, 4:58 PM
@Arnaud Legendre Do you see the logs on the dask worker?

Arnaud Legendre

11/21/2019, 8:10 AM
Sure, with the Dask executor:
Copy code
[2019-11-21 08:09:45,215] INFO - prefect.FlowRunner | Beginning Flow run for 'ETL'
INFO:prefect.FlowRunner:Beginning Flow run for 'ETL'
[2019-11-21 08:09:45,218] INFO - prefect.FlowRunner | Starting flow run.
INFO:prefect.FlowRunner:Starting flow run.
DEBUG:asyncio:Using selector: SelectSelector
DEBUG:asyncio:Using selector: SelectSelector
[2019-11-21 08:09:46,211] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
INFO:prefect.FlowRunner:Flow run SUCCESS: all reference tasks succeeded
whereas with the default `flow.run()`:
Copy code
[2019-11-21 08:09:41,849] INFO - prefect.FlowRunner | Beginning Flow run for 'ETL'
INFO:prefect.FlowRunner:Beginning Flow run for 'ETL'
[2019-11-21 08:09:41,853] INFO - prefect.FlowRunner | Starting flow run.
INFO:prefect.FlowRunner:Starting flow run.
[2019-11-21 08:09:41,861] INFO - prefect.TaskRunner | Task 'dummy': Starting task run...
INFO:prefect.TaskRunner:Task 'dummy': Starting task run...
[2019-11-21 08:09:41,863] INFO - prefect.Task: dummy | Here
INFO:prefect.Task: dummy:Here
[2019-11-21 08:09:41,867] INFO - prefect.TaskRunner | Task 'dummy': finished task run for task with final state: 'Success'
INFO:prefect.TaskRunner:Task 'dummy': finished task run for task with final state: 'Success'
[2019-11-21 08:09:41,870] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
INFO:prefect.FlowRunner:Flow run SUCCESS: all reference tasks succeeded

josh

11/21/2019, 2:41 PM
@Arnaud Legendre Are you using this in conjunction with Prefect Cloud or local only? I believe that for local flow runs the logger just uses the standard Python logging mechanism, which would explain why you can’t see the task logs from the place where you ran the flow. You would need to implement something that collects the logs. This is something that Prefect Cloud provides!

Arnaud Legendre

11/21/2019, 2:42 PM
Hello. I am using Prefect Core, locally. This is what I am realizing: the logs seem to be collected in the process where the Dask cluster is instantiated, not where my flow runs. Thank you!
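
For anyone wondering what "something which collects the logs" could look like, here is a minimal sketch using only the standard library's `QueueHandler` and `QueueListener`. It is not Prefect's or Dask's mechanism. As assumptions for illustration: a thread stands in for the Dask worker, `demo.worker` is a made-up logger name, and `ListHandler` is a hypothetical collector. With real worker processes, a `multiprocessing.Queue` or a network handler would have to take the place of `queue.Queue`.

```python
import logging
import logging.handlers
import queue
import threading

# Records emitted by "workers" travel through this queue to one collector.
# For real worker processes a multiprocessing.Queue would take its place.
log_queue = queue.Queue()

collected = []  # messages received on the collecting side


class ListHandler(logging.Handler):
    """Hypothetical collector-side handler that stores formatted messages."""

    def emit(self, record):
        collected.append(self.format(record))


def worker_task():
    # Worker side: the logger only knows how to put records on the queue.
    logger = logging.getLogger("demo.worker")  # made-up logger name
    logger.setLevel(logging.INFO)
    logger.propagate = False
    logger.handlers = [logging.handlers.QueueHandler(log_queue)]
    logger.info("Here")


# Collector side: drain the queue and hand records to the real handlers.
listener = logging.handlers.QueueListener(log_queue, ListHandler())
listener.start()

worker = threading.Thread(target=worker_task)
worker.start()
worker.join()

listener.stop()  # processes any queued records before returning
print(collected)
```

The design choice is that the worker never writes logs anywhere itself; it only forwards records, and the place where the flow was launched decides how to display or store them.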