Thread
#prefect-community
    Sean Harkins

    1 year ago
    We have a somewhat edge case where an external library generates Prefect Flows (https://github.com/pangeo-data/rechunker/blob/master/rechunker/executors/prefect.py#L42) that we then register. Before registration we need to apply a decorator to each of the Flow’s tasks (in this case, to register Dask WorkerPlugins). To test this, I first applied our decorator to a test task:
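    from functools import wraps
    
    import distributed
    import prefect
    from distributed.diagnostics.plugin import PipInstall
    from prefect import task
    
    
    def register_plugin(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Register a pip-install plugin on the Dask workers, then run the task body
            client = distributed.get_client()
            plugin = PipInstall(packages=["xarray"])
            client.register_worker_plugin(plugin)
            result = func(*args, **kwargs)
            return result
        return wrapper
    
    
    @task
    @register_plugin
    def say_hello():
        logger = prefect.context.get("logger")
        logger.info("Hello, Cloud")
        return "hello result"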
    This works as expected, and the Dask worker logs show the plugin being used:
    [2021-04-11 23:11:49+0000] INFO - prefect.CloudTaskRunner | Task 'say_hello': Starting task run...
    distributed.worker - INFO - Starting Worker plugin pip
    distributed.diagnostics.plugin - INFO - Pip installing the following packages: ['xarray']
    [2021-04-11 23:11:50+0000] INFO - prefect.say_hello | Hello, Cloud
    [2021-04-11 23:11:50+0000] INFO - prefect.CloudTaskRunner | Task 'say_hello': Finished task run for task with final state: 'Success'
    However, we don’t have direct access to the underlying task functions, so instead we need to pull the tasks out of an existing flow, wrap them, and replace them within the flow before registration:
    for flow_task in flow.tasks:
        wrapped_task = register_plugin(flow_task)
        flow.replace(flow_task, wrapped_task)
    But this approach seems to change how the tasks execute: the worker logs show neither the plugin use nor the Prefect logger statements.
    distributed.core - INFO - Starting established connection
    [2021-04-11 22:43:26+0000] INFO - prefect.CloudTaskRunner | Task 'Constant[function]': Starting task run...
    [2021-04-11 22:43:27+0000] INFO - prefect.CloudTaskRunner | Task 'Constant[function]': Finished task run for task with final state: 'Success'
    Am I missing something obvious in my task wrapping approach and replacement in the flow? Is there a better approach for accomplishing this?
    Michael Adkins

    1 year ago
    Hey @Sean Harkins, could you explain a bit more about what you mean by:
    However, we don’t have access to the underlying tasks so instead we need to access them from an existing flow, wrap them and replace them within the flow prior to flow registration with
    Can you also explain why this is your preferred way to install Python libraries, rather than using some sort of containerized execution?
    Sean Harkins

    1 year ago
    Sorry for the delay in responding, @Michael Adkins. For your first question: we are building infrastructure where scientific developers can write vanilla Python “recipes” that they can develop independently of any prefect dependencies. You can read more about the project approach here: https://github.com/pangeo-forge/roadmap. Because scientists develop vanilla Python without prefect dependencies, there is a later step where “recipes” can be executed with different orchestrators (in this case, Prefect). So part of our CI pipeline transforms this generic Python code into Prefect Tasks in a Flow using https://github.com/pangeo-data/rechunker/blob/master/rechunker/executors/prefect.py#L42. Ideally, this library can operate independently and not need any specialized decorators for managing Dask worker dependencies.
    For your second question: the eventual goal is that all final, production “recipes” will use a version-tagged base image. However, some of the core libraries are currently in a lot of flux, and the “recipe” developers would like the flexibility to override dependencies in our existing worker base image with bleeding-edge versions during development and testing. Again, we would like to specify these worker dependency overrides via CI after the flow has already been created. My question is why the decorator approach and the wrapping approach above behave differently, since they should be equivalent.
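Since the package list is meant to be chosen in CI after the flow already exists, one option is to parametrize the register_plugin decorator from the first message. This is a minimal sketch, not part of the original thread: with_pip_packages is a hypothetical name, it assumes distributed is installed where the wrapped task runs (the import happens inside the wrapper so the module loads without it), and it reuses the same PipInstall and register_worker_plugin calls shown above.

```python
from functools import wraps


def with_pip_packages(packages):
    """Hypothetical decorator factory: register a Dask PipInstall plugin.

    The package list is a parameter so a CI step could choose the
    dependency overrides after the flow has been built.
    """
    packages = list(packages)

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Imported lazily so this module can be imported where
            # `distributed` is not installed (assumed present on workers).
            from distributed import get_client
            from distributed.diagnostics.plugin import PipInstall

            client = get_client()
            client.register_worker_plugin(PipInstall(packages=packages))
            # Return the wrapped function's result so downstream tasks get it.
            return func(*args, **kwargs)

        return wrapper

    return decorator


# Usage: build the decorator once from a (hypothetical) CI-provided list.
register_plugin = with_pip_packages(["xarray"])
```

Because the factory only closes over the package list, the same recipe code can be decorated with different overrides per CI run without touching the task bodies.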
    Michael Adkins

    1 year ago
    Ah, thanks for the explanation. This is likely because the decorator order is reversed in the second example, which means the task is no longer a Task object. I can debug further in ~1 hr.
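The decorator-order hypothesis can be checked without Prefect at all. The sketch below uses hypothetical stand-ins (ToyTask and toy_task, not Prefect's real Task class or @task decorator) to show that stacking @task over @register_plugin computes task(register_plugin(fn)), while the replacement loop computes register_plugin(task(fn)), whose outermost object is a plain function.

```python
from functools import wraps


class ToyTask:
    """Hypothetical stand-in for a Prefect Task: it just stores a function."""

    def __init__(self, fn):
        self.fn = fn

    def run(self, *args, **kwargs):
        return self.fn(*args, **kwargs)


def toy_task(fn):
    """Hypothetical stand-in for the @task decorator."""
    return ToyTask(fn)


def register_plugin(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)

    return wrapper


def say_hello():
    return "hello result"


# Stacked decorators apply bottom-up: the plain function is wrapped first,
# then turned into a task, so the outer object is still a task.
stacked = toy_task(register_plugin(say_hello))

# The replacement loop does the reverse: it wraps an existing task object,
# so the outer object is a plain function, not a task.
replaced = register_plugin(toy_task(say_hello))

print(type(stacked).__name__)   # ToyTask
print(type(replaced).__name__)  # function
```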
    Sean Harkins

    1 year ago
    @Michael Adkins Just wanted to check in and see if I could give you more details on this if needed.
    Michael Adkins

    1 year ago
    from prefect import Flow, task
    from functools import wraps
    
    
    def show_flow_tasks(flow):
        print("Flow contains the following tasks:")
        for flow_task in flow.tasks:
            print(f"{flow_task}")
    
    
    def register_plugin(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            print("Look! We're inside the wrapper.")
            return func(*args, **kwargs)  # pass the task's result through
    
        return wrapper
    
    
    @task
    @register_plugin
    def foo(x):
        print(x)
    
    
    @task
    def bar(y):
        print(y)
    
    
    with Flow("ex") as flow:
        foo(1)
        bar(2)
    
    show_flow_tasks(flow)
    
    flow.run()
    
    print("--------------------------")
    print("Now with replacement")
    
    for flow_task in flow.tasks:
        wrapped_task = register_plugin(flow_task)
        flow.replace(flow_task, wrapped_task)
    
    show_flow_tasks(flow)
    
    flow.run()
    ❯ python example-decorated-tasks.py
    Flow contains the following tasks:
    <Task: foo>
    <Task: bar>
    [2021-04-13 20:52:33-0500] INFO - prefect.FlowRunner | Beginning Flow run for 'ex'
    [2021-04-13 20:52:33-0500] INFO - prefect.TaskRunner | Task 'foo': Starting task run...
    Look! We're inside the wrapper.
    1
    [2021-04-13 20:52:34-0500] INFO - prefect.TaskRunner | Task 'foo': Finished task run for task with final state: 'Success'
    [2021-04-13 20:52:34-0500] INFO - prefect.TaskRunner | Task 'bar': Starting task run...
    2
    [2021-04-13 20:52:34-0500] INFO - prefect.TaskRunner | Task 'bar': Finished task run for task with final state: 'Success'
    [2021-04-13 20:52:34-0500] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
    --------------------------
    Now with replacement
    Flow contains the following tasks:
    <Task: Constant[function]>
    <Task: Constant[function]>
    [2021-04-13 20:52:34-0500] INFO - prefect.FlowRunner | Beginning Flow run for 'ex'
    [2021-04-13 20:52:34-0500] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
    In the first run, your decorator is applied to the plain function first and the task decorator is applied on top, so the result is still a Task.
    In the second, the task decorator has already been applied and your decorator then wraps the Task, so the results are plain functions instead of tasks. Prefect automatically converts them into "Constant" tasks, which no longer run the wrapped code.
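The auto-conversion can be sketched the same way. This assumes, based on the Constant[function] task names in the logs, that Prefect wraps any non-Task value in a constant task that simply returns that value; ToyTask, ToyConstant, and toy_as_task are hypothetical stand-ins, not Prefect's API.

```python
class ToyTask:
    """Hypothetical stand-in for a Prefect Task that calls its function."""

    def __init__(self, fn):
        self.fn = fn

    def run(self):
        return self.fn()


class ToyConstant(ToyTask):
    """Hypothetical stand-in for Prefect's Constant task."""

    def __init__(self, value):
        self.value = value

    def run(self):
        # The stored value is returned as-is; a function value is NOT called.
        return self.value


def toy_as_task(obj):
    """Hypothetical auto-conversion: non-task values become constants."""
    return obj if isinstance(obj, ToyTask) else ToyConstant(obj)


calls = []


def wrapped_foo():
    """Plays the role of register_plugin(foo): a plain function, not a task."""
    calls.append("called")
    return 1


converted = toy_as_task(wrapped_foo)
result = converted.run()

print(type(converted).__name__)  # ToyConstant
print(result is wrapped_foo)     # True
print(calls)                     # []
```

The "result" of the constant task is the function object itself, and the wrapped code (plugin registration, logging) never executes, which matches the silent runs in the second log.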
    Here's a working example of what you want to do, wrapping each task's run method rather than the task itself:
    from prefect import Flow, task
    from functools import wraps
    
    
    def show_flow_tasks(flow):
        print("Flow contains the following tasks:")
        for flow_task in flow.tasks:
            print(f"{flow_task}")
    
    
    def register_plugin(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            print("Look! We're inside the wrapper.")
            return func(*args, **kwargs)  # pass the task's result through
    
        return wrapper
    
    
    @task
    def foo(x):
        print(x)
    
    
    @task
    def bar(y):
        print(y)
    
    
    with Flow("ex") as flow:
        foo(1)
        bar(2)
    
    
    print("----- Flow tasks before replacement")
    show_flow_tasks(flow)
    print("-----")
    
    for flow_task in flow.tasks:
        flow_task.run = register_plugin(flow_task.run)
    
    print("----- Flow tasks after replacement")
    show_flow_tasks(flow)
    print("-----")
    
    flow.run()
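The fix works because the Task objects stay in the flow unchanged and only their run method is swapped for a wrapped version. Below is a minimal stdlib sketch of that instance-level patching, using a hypothetical ToyTask class and a hypothetical wrap_task_runs helper rather than Prefect's real classes.

```python
from functools import wraps


class ToyTask:
    """Hypothetical stand-in for a Prefect Task with a `run` method."""

    def __init__(self, name, fn):
        self.name = name
        self.fn = fn

    def run(self, *args, **kwargs):
        return self.fn(*args, **kwargs)


log = []


def register_plugin(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        log.append("inside wrapper")
        return func(*args, **kwargs)  # keep the task's return value

    return wrapper


def wrap_task_runs(tasks, decorator):
    """Hypothetical helper: patch each task's bound `run` method in place."""
    for t in tasks:
        # Assigning to the instance attribute shadows the class method for
        # this object only; the task object itself is not replaced.
        t.run = decorator(t.run)
    return tasks


tasks = [ToyTask("foo", lambda x: x + 1), ToyTask("bar", lambda y: y * 2)]
wrap_task_runs(tasks, register_plugin)

print([t.run(3) for t in tasks])  # [4, 6]
print(log)                        # ['inside wrapper', 'inside wrapper']
```

Because each task keeps its identity, nothing in the flow's dependency graph changes and no auto-conversion is triggered; only the behavior of run on those particular objects differs.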
    Sean Harkins

    1 year ago
    Thank you @Michael Adkins that approach works perfectly.