Marco Petrazzoli
01/28/2021, 10:03 AM
emre
01/28/2021, 10:26 AM
CreateContainer(name="Do X")(command="ls"). This will show up as the task “Do X” in the UI.
Marco Petrazzoli
01/28/2021, 10:37 AM
Marco Petrazzoli
01/28/2021, 11:02 AM
emre
01/28/2021, 1:27 PM
CreateContainer gets really close. On its own, it can create a container, run the provided command, and wait for execution to finish without detaching. This leaves getting the logs as an extra task.
Is there a reason why you want to reduce your workflow to fewer, bigger tasks? People usually try to do the inverse and have multiple small tasks.
emre
01/28/2021, 1:33 PM
CreateContainer and add whatever GetContainerLogs does to the mix.
But if your concern is to keep tightly related pieces bundled together, I usually just leave them as separate tasks and wrap them in a function, something like:
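from prefect import Flow
from prefect.tasks.docker import CreateContainer, GetContainerLogs

def run_container_get_logs(img, cmd):
    # Call inside a Flow context: each call adds two tasks to the flow.
    container_id = CreateContainer(image_name=img, name="Create")(command=cmd)
    container_logs = GetContainerLogs(name="GetLog")(container_id=container_id)
    return container_logs

# myimg, myimg2, cmd_a and cmd_b are placeholders defined elsewhere.
with Flow('F') as flow:
    cmd_a_logs = run_container_get_logs(myimg, cmd_a)
    cmd_b_logs = run_container_get_logs(myimg2, cmd_b)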
This follows Prefect's philosophy of keeping tasks small, while keeping the flow-building code clean.
Zanie
task.run() explicitly, for example:
Zanie
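from prefect import task, Flow

@task
def foo(x):
    return x * 2

@task
def bar(y):
    return y * 2

# Two separate tasks: each one is its own task in the flow.
with Flow("example-no-rollup") as flow:
    y = foo(5)
    result = bar(y)

flow.run()

# One task that calls the other tasks' run() methods directly,
# so the flow only contains one_big_task.
@task
def one_big_task(x):
    y = foo.run(x)
    return bar.run(y)

with Flow("example-rollup-task") as flow:
    result = one_big_task(5)

flow.run()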
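To make the difference between the two flows above concrete, here is a rough toy model in plain Python. ToyTask, ToyFlow, bind and task_names are hypothetical stand-ins invented for this sketch, not Prefect classes or methods, and the toy runs everything eagerly instead of deferring to flow.run(). It only illustrates the idea that calling run() directly is an ordinary function call, so the flow never sees the inner tasks.

```python
# Toy model only: ToyFlow and ToyTask are hypothetical stand-ins, not Prefect's API.
class ToyFlow:
    def __init__(self, name):
        self.name = name
        self.task_names = []  # names of the tasks registered on this flow


class ToyTask:
    def __init__(self, fn):
        self.fn = fn
        self.name = fn.__name__

    def run(self, *args):
        # Plain function call: nothing gets registered on any flow.
        return self.fn(*args)

    def bind(self, flow, *args):
        # Registering on the flow is what makes the task visible as its own unit.
        # (Evaluated eagerly here; a real flow would defer execution.)
        flow.task_names.append(self.name)
        return self.run(*args)


@ToyTask
def foo(x):
    return x * 2


@ToyTask
def bar(y):
    return y * 2


@ToyTask
def one_big_task(x):
    # Inner tasks are invoked through run(), so only one_big_task is registered.
    return bar.run(foo.run(x))


no_rollup = ToyFlow("example-no-rollup")
result_small = bar.bind(no_rollup, foo.bind(no_rollup, 5))

rollup = ToyFlow("example-rollup-task")
result_big = one_big_task.bind(rollup, 5)

print(no_rollup.task_names, result_small)
print(rollup.task_names, result_big)
```

Both variants compute the same value. The rolled-up version registers a single task, so foo and bar are not tracked individually.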