Marco Petrazzoli
01/28/2021, 10:03 AM
emre
01/28/2021, 10:26 AM
CreateContainer(name="Do X")(command="ls"). This will show as task “Do X” in the UI.
Marco Petrazzoli
01/28/2021, 10:37 AM
emre
01/28/2021, 1:27 PM
CreateContainer gets really close. It can create a container, run the provided command, and wait for its execution without detaching on its own. This leaves fetching the logs as an extra task.
Is there a reason why you want to reduce your workflow to fewer, bigger tasks? People usually try to do the inverse and have multiple small tasks.
CreateContainer and add whatever GetContainerLogs does to the mix.
But if your concern is to keep tightly related pieces bundled together, I usually just leave them as separate tasks and wrap them in a function, something like:
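from prefect import Flow
from prefect.tasks.docker import CreateContainer, GetContainerLogs

def run_container_get_logs(img, cmd):
    # Keyword arguments keep cmd from landing in the image_name slot.
    container_id = CreateContainer(image_name=img, name="Create")(command=cmd)
    container_logs = GetContainerLogs(name="GetLog")(container_id=container_id)
    return container_logs

# myimg, myimg2, cmd_a and cmd_b are placeholders for your own images and commands.
with Flow('F') as flow:
    cmd_a_logs = run_container_get_logs(myimg, cmd_a)
    cmd_b_logs = run_container_get_logs(myimg2, cmd_b)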
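One thing the helper above leaves open: each call reuses the names "Create" and "GetLog", so the two pairs may be hard to tell apart in the UI. Below is a minimal sketch of one way around that, passing a label that prefixes each name. It is a toy model in plain Python, not Prefect's API: `add_task`, `registered`, `create_container`, `get_logs` and the `label` parameter are all hypothetical stand-ins, and the steps run immediately instead of being deferred to a flow run.

```python
# Toy stand-in for a flow's list of task names; this is NOT Prefect's API.
registered = []


def add_task(name, fn, *args):
    """Record a named step, then run it right away (a real flow would defer it)."""
    registered.append(name)
    return fn(*args)


def create_container(img, cmd):
    # Hypothetical placeholder: pretend to create a container and return an id.
    return f"{img}-{cmd}"


def get_logs(container_id):
    # Hypothetical placeholder: pretend to fetch the container's logs.
    return f"logs for {container_id}"


def run_container_get_logs(label, img, cmd):
    """Bundle two small steps, prefixing each name so repeated calls stay distinct."""
    container_id = add_task(f"{label}: create", create_container, img, cmd)
    return add_task(f"{label}: get logs", get_logs, container_id)


logs_a = run_container_get_logs("A", "img-a", "ls")
logs_b = run_container_get_logs("B", "img-b", "pwd")
print(registered)
```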
This leverages Prefect's philosophy of small units of tasks while keeping the structure of the flow build step clean.
Zanie
task.run() explicitly, for example:
from prefect import task, Flow
@task
def foo(x):
    return x * 2

@task
def bar(y):
    return y * 2

# Two separate tasks, each added to the flow on its own.
with Flow("example-no-rollup") as flow:
    y = foo(5)
    result = bar(y)
flow.run()

# One task that calls the other tasks' run() methods directly.
@task
def one_big_task(x):
    y = foo.run(x)
    return bar.run(y)

with Flow("example-rollup-task") as flow:
    result = one_big_task(5)
flow.run()
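To make the difference between the two flows concrete, here is a rough sketch in plain Python of why calling run() rolls things up. It is a toy model, not Prefect's implementation: `MiniTask` and `tracked` are hypothetical names, and the assumption is simply that calling a task object is what gets tracked as a step, while run() only executes the wrapped function.

```python
tracked = []  # names of the steps that were tracked as separate tasks


class MiniTask:
    """Toy stand-in for a task object; not Prefect's implementation."""

    def __init__(self, fn):
        self.fn = fn
        self.name = fn.__name__

    def run(self, *args):
        # run() just executes the wrapped function, with no tracking.
        return self.fn(*args)

    def __call__(self, *args):
        # Calling the task itself is what records a separate step.
        tracked.append(self.name)
        return self.run(*args)


@MiniTask
def foo(x):
    return x * 2


@MiniTask
def bar(y):
    return y * 2


@MiniTask
def one_big_task(x):
    y = foo.run(x)
    return bar.run(y)


separate = bar(foo(5))
steps_separate = list(tracked)

tracked.clear()
rolled = one_big_task(5)
steps_rolled = list(tracked)

print(separate, steps_separate)
print(rolled, steps_rolled)
```

Both versions compute the same value, but the rolled-up one is tracked as a single step. In a real flow that likely means the inner calls get no state, retries, or task-level logging of their own, which is the trade-off to weigh before bundling.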