
    RyanB

    2 years ago
    It appears I can't use the context-supplied logger with the DaskExecutor: I get an exception saying that the context cannot be pickled.

    Joe Schmid

    2 years ago
    Hi @RyanB, we do lots of logging from tasks when running with DaskExecutor. Try doing this:
    @task
    def my_task():
        import prefect
        logger = prefect.context.get("logger")
        logger.info("Blah blah blah")

    RyanB

    2 years ago
    👍🏾

    josh

    2 years ago

    RyanB

    2 years ago
    @josh I did. It's the import prefect within the task that I was missing; the doc is not explicit in this regard.

    josh

    2 years ago
    Good call, I’ll update the doc with a note about importing prefect
    Actually this is a great motivator for maybe looking into making context pickleable so this could have been avoided in the first place 🙂 https://github.com/PrefectHQ/prefect/issues/1710

    Arnaud Legendre

    2 years ago
    Hello, same issue here: I can get my logs with the default executor, but not with DaskExecutor. Any idea? Thanks.
    from dask.distributed import Client, progress
    dask_client = Client()
    
    from prefect import context, Flow, task
    from prefect.engine.executors import DaskExecutor
    
    
    @task
    def dummy():
        import prefect
        logger = prefect.context.get("logger")
        logger.info("Here")
    
    with Flow("ETL") as flow:
        l = dummy()
    
    executor = DaskExecutor(address=dask_client.scheduler_info()['address'])
    flow.run(executor=executor)

    josh

    2 years ago
    @Arnaud Legendre Do you see the logs on the dask worker?

    Arnaud Legendre

    2 years ago
    Sure, with the Dask executor:
    [2019-11-21 08:09:45,215] INFO - prefect.FlowRunner | Beginning Flow run for 'ETL'
    INFO:prefect.FlowRunner:Beginning Flow run for 'ETL'
    [2019-11-21 08:09:45,218] INFO - prefect.FlowRunner | Starting flow run.
    INFO:prefect.FlowRunner:Starting flow run.
    DEBUG:asyncio:Using selector: SelectSelector
    DEBUG:asyncio:Using selector: SelectSelector
    [2019-11-21 08:09:46,211] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
    INFO:prefect.FlowRunner:Flow run SUCCESS: all reference tasks succeeded
    whereas with the default flow.run():
    [2019-11-21 08:09:41,849] INFO - prefect.FlowRunner | Beginning Flow run for 'ETL'
    INFO:prefect.FlowRunner:Beginning Flow run for 'ETL'
    [2019-11-21 08:09:41,853] INFO - prefect.FlowRunner | Starting flow run.
    INFO:prefect.FlowRunner:Starting flow run.
    [2019-11-21 08:09:41,861] INFO - prefect.TaskRunner | Task 'dummy': Starting task run...
    INFO:prefect.TaskRunner:Task 'dummy': Starting task run...
    [2019-11-21 08:09:41,863] INFO - prefect.Task: dummy | Here
    INFO:prefect.Task: dummy:Here
    [2019-11-21 08:09:41,867] INFO - prefect.TaskRunner | Task 'dummy': finished task run for task with final state: 'Success'
    INFO:prefect.TaskRunner:Task 'dummy': finished task run for task with final state: 'Success'
    [2019-11-21 08:09:41,870] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
    INFO:prefect.FlowRunner:Flow run SUCCESS: all reference tasks succeeded

    josh

    2 years ago
    @Arnaud Legendre Are you using this in conjunction with Prefect Cloud or local only? I believe for local flow runs the logger just uses the standard Python logging mechanism, which would explain why you can’t see the task logs from the place you ran the flow. You would need to implement something that collects the logs. This is something that Prefect Cloud provides!
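As a rough illustration of "something which collects the logs", here is a minimal sketch that uses only the standard library. It assumes Prefect's loggers live under the `prefect` name, which the log lines in this thread suggest (`prefect.FlowRunner`, `prefect.Task: dummy`). `ListHandler` and `attach_collector` are hypothetical names, not part of Prefect or Dask.

```python
import logging


class ListHandler(logging.Handler):
    """Hypothetical collector that keeps formatted records in memory."""

    def __init__(self):
        super().__init__()
        self.lines = []

    def emit(self, record):
        # Store the formatted message instead of writing it to a stream.
        self.lines.append(self.format(record))


def attach_collector(logger_name="prefect"):
    """Attach a ListHandler to the named logger in the *current* process."""
    handler = ListHandler()
    handler.setFormatter(
        logging.Formatter("%(levelname)s - %(name)s | %(message)s")
    )
    logger = logging.getLogger(logger_name)
    # Needed for this standalone demo; a configured Prefect logger
    # may already have its own level set.
    logger.setLevel(logging.INFO)
    logger.addHandler(handler)
    return handler


collector = attach_collector()

# Child loggers ("prefect.<anything>") propagate their records
# up to the "prefect" logger, where the collector sees them.
logging.getLogger("prefect.Task: dummy").info("Here")
print(collector.lines)
```

A handler only sees records emitted in the process it is attached to. In a distributed run it would have to be attached inside each worker process, and it would have to ship records somewhere shared (a file, a socket, a queue) rather than to an in-memory list; that part is left out here.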

    Arnaud Legendre

    2 years ago
    Hello. I am using Prefect Core, locally. This is what I am realizing: the logs seem to be collected in the process where the Dask cluster is instantiated, not where my flow runs. Thank you!