# ask-community
s
We have somewhat of an edge case where an external library is generating Prefect Flows (https://github.com/pangeo-data/rechunker/blob/master/rechunker/executors/prefect.py#L42) that we register. Prior to registration we need to apply a decorator to each of the Flow’s tasks (in this case, to register Dask WorkerPlugins). To test this initially, I applied our decorator to a test task:
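from functools import wraps

import distributed
import prefect
from distributed.diagnostics.plugin import PipInstall
from prefect import task


def register_plugin(func):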
    @wraps(func)
    def wrapper(*args, **kwargs):
        client = distributed.get_client()
        plugin = PipInstall(packages=["xarray"])
        client.register_worker_plugin(plugin)
        result = func(*args, **kwargs)
        return result
    return wrapper


@task
@register_plugin
def say_hello():
    logger = prefect.context.get("logger")
    logger.info("Hello, Cloud")
    return "hello result"
This works as expected and the Dask worker logs show the plugin use.
[2021-04-11 23:11:49+0000] INFO - prefect.CloudTaskRunner | Task 'say_hello': Starting task run...
distributed.worker - INFO - Starting Worker plugin pip
distributed.diagnostics.plugin - INFO - Pip installing the following packages: ['xarray']
[2021-04-11 23:11:50+0000] INFO - prefect.say_hello | Hello, Cloud
[2021-04-11 23:11:50+0000] INFO - prefect.CloudTaskRunner | Task 'say_hello': Finished task run for task with final state: 'Success'
However, we don’t have access to the underlying tasks so instead we need to access them from an existing flow, wrap them and replace them within the flow prior to flow registration with
for flow_task in flow.tasks:
    wrapped_task = register_plugin(flow_task)
    flow.replace(flow_task, wrapped_task)
But using this approach seems to alter the task’s execution, as the worker logs report neither the plugin use nor the Prefect logger statements.
distributed.core - INFO - Starting established connection
[2021-04-11 22:43:26+0000] INFO - prefect.CloudTaskRunner | Task 'Constant[function]': Starting task run...
[2021-04-11 22:43:27+0000] INFO - prefect.CloudTaskRunner | Task 'Constant[function]': Finished task run for task with final state: 'Success'
Am I missing something obvious in my task wrapping approach and replacement in the flow? Is there a better approach for accomplishing this?
z
Hey @Sean Harkins, could you explain a bit more about what you mean by this?
"However, we don’t have access to the underlying tasks so instead we need to access them from an existing flow, wrap them and replace them within the flow prior to flow registration with"
Can you also explain why this is your preferred way to install Python libraries instead of using some sort of containerized execution?
s
Sorry for the delay in responding, @Zanie. For your first question, we are working to build infrastructure where scientific developers can write vanilla Python “recipes” which they can develop independent of any prefect dependencies. You can read more about the project approach here: https://github.com/pangeo-forge/roadmap. Because scientists will develop vanilla Python without prefect deps, there is a later step where “recipes” can be executed with different orchestrators (Prefect, in this case). So part of our CI pipeline transforms this generic Python code into Prefect Tasks in a Flow using https://github.com/pangeo-data/rechunker/blob/master/rechunker/executors/prefect.py#L42. Ideally, this library can operate independently and not need any specialized decorators for managing Dask worker dependencies.
For your second question, the eventual goal is that all final, production “recipes” will use a version tagged base image. However, there is currently a lot of flux in some of the core libraries and the “recipe” developers would like the flexibility to override dependencies in our existing worker base image with bleeding edge versions during development and testing. Again, specifying these worker dependency overrides is something we would like to accomplish via CI after the flow has already been created. My question is why the above ☝️ task decorator and wrapping approaches are behaving differently since they should be equivalent.
z
Ah thanks for all the explanation. It seems likely this is because the decorator order is reversed in the second example which means the task is no longer a task object. I can debug further in ~1hr
s
@Zanie Just wanted to check in and see if I could give you more details on this if needed.
z
from prefect import Flow, task
from functools import wraps


def show_flow_tasks(flow):
    print("Flow contains the following tasks:")
    for flow_task in flow.tasks:
        print(f"{flow_task}")


def register_plugin(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        print("Look! We're inside the wrapper.")
        # Return the wrapped function's result so it is not silently dropped.
        return func(*args, **kwargs)

    return wrapper


@task
@register_plugin
def foo(x):
    print(x)


@task
def bar(y):
    print(y)


with Flow("ex") as flow:
    foo(1)
    bar(2)

show_flow_tasks(flow)

flow.run()

print("--------------------------")
print("Now with replacement")

for flow_task in flow.tasks:
    wrapped_task = register_plugin(flow_task)
    flow.replace(flow_task, wrapped_task)

show_flow_tasks(flow)

flow.run()
❯ python example-decorated-tasks.py
Flow contains the following tasks:
<Task: foo>
<Task: bar>
[2021-04-13 20:52:33-0500] INFO - prefect.FlowRunner | Beginning Flow run for 'ex'
[2021-04-13 20:52:33-0500] INFO - prefect.TaskRunner | Task 'foo': Starting task run...
Look! We're inside the wrapper.
1
[2021-04-13 20:52:34-0500] INFO - prefect.TaskRunner | Task 'foo': Finished task run for task with final state: 'Success'
[2021-04-13 20:52:34-0500] INFO - prefect.TaskRunner | Task 'bar': Starting task run...
2
[2021-04-13 20:52:34-0500] INFO - prefect.TaskRunner | Task 'bar': Finished task run for task with final state: 'Success'
[2021-04-13 20:52:34-0500] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
--------------------------
Now with replacement
Flow contains the following tasks:
<Task: Constant[function]>
<Task: Constant[function]>
[2021-04-13 20:52:34-0500] INFO - prefect.FlowRunner | Beginning Flow run for 'ex'
[2021-04-13 20:52:34-0500] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
In the first case, your decorator is applied to the function first and the task decorator is applied afterwards, so the result is still a task.
In the second, the task decorator has already been applied and your decorator then wraps the resulting task object. What comes back is a plain function rather than a task, so Prefect auto-converts it to a "Constant" task and the original task logic no longer runs.
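The difference can be reproduced without Prefect. The sketch below uses a hypothetical stand-in class, `MiniTask`, in place of Prefect's `Task` (it is an assumption for illustration only, not Prefect's API). It shows that a `functools.wraps`-style decorator applied to a task object hands back a plain function, which would explain why the flow no longer sees a real task.

```python
from functools import wraps


class MiniTask:
    """Hypothetical stand-in for a task object; not Prefect's Task class."""

    def __init__(self, fn):
        self.fn = fn
        self.name = fn.__name__

    def run(self, *args, **kwargs):
        return self.fn(*args, **kwargs)


def register_plugin(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)

    return wrapper


def say_hello():
    return "hello result"


# Decorator applied first, then turned into a task: still a task object.
decorated_then_task = MiniTask(register_plugin(say_hello))

# Task created first, then decorated: the decorator returns a plain function.
task_then_decorated = register_plugin(MiniTask(say_hello))

print(type(decorated_then_task).__name__)
print(type(task_then_decorated).__name__)
```

The second object is only a function that closes over the task, so anything that checks for a task instance will treat it as an ordinary value.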
Here's a working example for what you want to do
from prefect import Flow, task
from functools import wraps


def show_flow_tasks(flow):
    print("Flow contains the following tasks:")
    for flow_task in flow.tasks:
        print(f"{flow_task}")


def register_plugin(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        print("Look! We're inside the wrapper.")
        # Return the task's result so downstream tasks still receive it.
        return func(*args, **kwargs)

    return wrapper


@task
def foo(x):
    print(x)


@task
def bar(y):
    print(y)


with Flow("ex") as flow:
    foo(1)
    bar(2)


print("----- Flow tasks before replacement")
show_flow_tasks(flow)
print("-----")

for flow_task in flow.tasks:
    flow_task.run = register_plugin(flow_task.run)

print("----- Flow tasks after replacement")
show_flow_tasks(flow)
print("-----")

flow.run()
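This variant appears to work because each task object stays in the flow and only its `run` method is swapped. A minimal sketch of that idea without Prefect or Dask follows. `MiniTask` and the `calls` list are hypothetical stand-ins, and in the real decorator the wrapper body would register the worker plugin instead of appending to a list.

```python
from functools import wraps

calls = []  # records each time the wrapper runs


class MiniTask:
    """Hypothetical stand-in for a task object; not Prefect's Task class."""

    def __init__(self, name):
        self.name = name

    def run(self, x):
        return x * 2


def register_plugin(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        # Placeholder for the side effect (e.g. registering a worker plugin).
        calls.append(func.__name__)
        # Pass the original result through unchanged.
        return func(*args, **kwargs)

    return wrapper


tasks = [MiniTask("foo"), MiniTask("bar")]
original_ids = [id(t) for t in tasks]

for t in tasks:
    # t.run is a bound method, so the wrapper does not need to pass self.
    t.run = register_plugin(t.run)

results = [t.run(n) for t, n in zip(tasks, (1, 2))]
print(results)
```

Because the objects themselves are never replaced, their identity and type are unchanged, and the wrapper simply runs around the original `run` logic.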
s
Thank you @Zanie that approach works perfectly.