Sean Harkins
04/11/2021, 11:17 PM
WorkerPlugins
). To test this initially, I applied our decorator to a test task:
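from functools import wraps

import distributed
import prefect
from distributed.diagnostics.plugin import PipInstall
from prefect import task


def register_plugin(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        # Register the pip-install plugin with the workers before running the task body.
        client = distributed.get_client()
        plugin = PipInstall(packages=["xarray"])
        client.register_worker_plugin(plugin)
        result = func(*args, **kwargs)
        return result
    return wrapper


@task
@register_plugin
def say_hello():
    logger = prefect.context.get("logger")
    logger.info("Hello, Cloud")
    return "hello result"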
This works as expected and the Dask worker logs show the plugin use.
[2021-04-11 23:11:49+0000] INFO - prefect.CloudTaskRunner | Task 'say_hello': Starting task run...
distributed.worker - INFO - Starting Worker plugin pip
distributed.diagnostics.plugin - INFO - Pip installing the following packages: ['xarray']
[2021-04-11 23:11:50+0000] INFO - prefect.say_hello | Hello, Cloud
[2021-04-11 23:11:50+0000] INFO - prefect.CloudTaskRunner | Task 'say_hello': Finished task run for task with final state: 'Success'
However, we don’t have access to the underlying tasks, so instead we need to access them from an existing flow, wrap them, and replace them within the flow prior to flow registration with:
for flow_task in flow.tasks:
    wrapped_task = register_plugin(flow_task)
    flow.replace(flow_task, wrapped_task)
But this approach seems to alter the task’s execution: the worker logs show neither the plugin use nor the Prefect logger statements.
distributed.core - INFO - Starting established connection
[2021-04-11 22:43:26+0000] INFO - prefect.CloudTaskRunner | Task 'Constant[function]': Starting task run...
[2021-04-11 22:43:27+0000] INFO - prefect.CloudTaskRunner | Task 'Constant[function]': Finished task run for task with final state: 'Success'
Am I missing something obvious in my task wrapping approach and replacement in the flow? Is there a better approach for accomplishing this?
Zanie
However, we don’t have access to the underlying tasks, so instead we need to access them from an existing flow, wrap them, and replace them within the flow prior to flow registration with
Zanie
Sean Harkins
04/12/2021, 7:57 PM
prefect
dependencies. You can read more about the project approach here: https://github.com/pangeo-forge/roadmap. Because scientists will develop vanilla Python without prefect deps, there is a later step where “recipes” can be executed with different orchestrators (Prefect, in this case). So part of our CI pipeline transforms this generic Python code into Prefect Tasks in a Flow using https://github.com/pangeo-data/rechunker/blob/master/rechunker/executors/prefect.py#L42. Ideally, this library can operate independently and not need any specialized decorators for managing Dask worker dependencies.
Sean Harkins
04/12/2021, 8:02 PM
Zanie
Sean Harkins
04/13/2021, 5:41 PM
Zanie
from prefect import Flow, task
from functools import wraps


def show_flow_tasks(flow):
    print("Flow contains the following tasks:")
    for flow_task in flow.tasks:
        print(f"{flow_task}")


def register_plugin(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        print("Look! We're inside the wrapper.")
        # Pass the wrapped function's return value through.
        return func(*args, **kwargs)
    return wrapper


@task
@register_plugin
def foo(x):
    print(x)


@task
def bar(y):
    print(y)


with Flow("ex") as flow:
    foo(1)
    bar(2)

show_flow_tasks(flow)
flow.run()

print("--------------------------")
print("Now with replacement")

# register_plugin returns a plain function rather than a Task, and the
# replaced entries show up as Constant[function] in the output that follows.
for flow_task in flow.tasks:
    wrapped_task = register_plugin(flow_task)
    flow.replace(flow_task, wrapped_task)

show_flow_tasks(flow)
flow.run()
Zanie
❯ python example-decorated-tasks.py
Flow contains the following tasks:
<Task: foo>
<Task: bar>
[2021-04-13 20:52:33-0500] INFO - prefect.FlowRunner | Beginning Flow run for 'ex'
[2021-04-13 20:52:33-0500] INFO - prefect.TaskRunner | Task 'foo': Starting task run...
Look! We're inside the wrapper.
1
[2021-04-13 20:52:34-0500] INFO - prefect.TaskRunner | Task 'foo': Finished task run for task with final state: 'Success'
[2021-04-13 20:52:34-0500] INFO - prefect.TaskRunner | Task 'bar': Starting task run...
2
[2021-04-13 20:52:34-0500] INFO - prefect.TaskRunner | Task 'bar': Finished task run for task with final state: 'Success'
[2021-04-13 20:52:34-0500] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
--------------------------
Now with replacement
Flow contains the following tasks:
<Task: Constant[function]>
<Task: Constant[function]>
[2021-04-13 20:52:34-0500] INFO - prefect.FlowRunner | Beginning Flow run for 'ex'
[2021-04-13 20:52:34-0500] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
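(Editor's sketch) The `Constant[function]` entries in the output above are consistent with the decorator handing back a plain function instead of a task object. The snippet below reproduces only that effect in pure Python. `StubTask` is a hypothetical stand-in, not Prefect's `Task` class, and nothing here calls Prefect.

```python
from functools import wraps


class StubTask:
    """Hypothetical stand-in for a task object; NOT Prefect's Task class."""

    def __init__(self, name):
        self.name = name

    def run(self, x):
        return x * 2

    def __call__(self, x):
        return self.run(x)


def register_plugin(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    return wrapper


original = StubTask("foo")
wrapped = register_plugin(original)

# The wrapper is an ordinary function: it copies attributes such as `name`
# from the wrapped object, but it is no longer an instance of the task class.
wrapped_is_task = isinstance(wrapped, StubTask)
wrapped_is_function = type(wrapped).__name__ == "function"
wrapped_result = wrapped(3)
```

Under these assumptions, anything that checks "is this a task?" will treat `wrapped` as an arbitrary Python value, even though calling it still works.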
Zanie
from prefect import Flow, task
from functools import wraps


def show_flow_tasks(flow):
    print("Flow contains the following tasks:")
    for flow_task in flow.tasks:
        print(f"{flow_task}")


def register_plugin(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        print("Look! We're inside the wrapper.")
        # Pass the wrapped run method's return value through.
        return func(*args, **kwargs)
    return wrapper


@task
def foo(x):
    print(x)


@task
def bar(y):
    print(y)


with Flow("ex") as flow:
    foo(1)
    bar(2)

print("----- Flow tasks before replacement")
show_flow_tasks(flow)
print("-----")

# Wrap each task's run method in place instead of replacing the task object.
for flow_task in flow.tasks:
    flow_task.run = register_plugin(flow_task.run)

print("----- Flow tasks after replacement")
show_flow_tasks(flow)
print("-----")
flow.run()
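(Editor's sketch) The in-place `run` replacement above can be illustrated without Prefect. In the snippet below, `StubTask`, `with_setup` and `fake_setup` are hypothetical names introduced only for this illustration. The setup hook is where a call such as `client.register_worker_plugin(plugin)` from the first message would presumably go. The wrapper returns the wrapped method's result, so task return values are preserved.

```python
from functools import wraps


class StubTask:
    """Hypothetical stand-in for a task object; NOT Prefect's Task class."""

    def __init__(self, name):
        self.name = name

    def run(self, x):
        return x + 1


def with_setup(run_method, setup):
    """Wrap a bound run method so `setup` is called before each run."""
    @wraps(run_method)
    def wrapper(*args, **kwargs):
        setup()
        return run_method(*args, **kwargs)
    return wrapper


setup_calls = []


def fake_setup():
    # Placeholder for real setup work (for example, plugin registration).
    setup_calls.append("setup")


tasks = [StubTask("foo"), StubTask("bar")]
for t in tasks:
    # The task object stays the same; only its run attribute is swapped.
    t.run = with_setup(t.run, fake_setup)

results = [t.run(1) for t in tasks]
still_tasks = all(isinstance(t, StubTask) for t in tasks)
```

Because only the `run` attribute changes, each object keeps its type and name, which is the difference from wrapping the task object itself.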
Sean Harkins
04/14/2021, 3:15 PM