Marco Petrazzoli

01/28/2021, 10:03 AM
Hi, I would like to launch a flow where each task is a Docker image (with a CMD already set; it just needs some volume mounts). It's important to me that each task shows up in the UI under a function name rather than under each Docker task's name. (https://docs.prefect.io/api/latest/tasks/docker.html)

emre

01/28/2021, 10:26 AM
You can give a custom name to any task, including Docker tasks: `CreateContainer(name="Do X")(command="ls")`. This will show up as task “Do X” in the UI.

Marco Petrazzoli

01/28/2021, 10:37 AM
Thanks. Going to test with that new info!
Is there a way to group these Docker tasks under one name? Like start -> wait for status -> get logs under the name "ImageExecutor". I want to handle the whole Docker image as one task.

emre

01/28/2021, 1:27 PM
It doesn’t seem like it, but `CreateContainer` gets really close. It can create a container, run the provided command, and wait for its execution without detaching on its own. This leaves getting the logs as an extra task. Is there a reason why you want to reduce your workflow to fewer, bigger tasks? People usually try to do the inverse and have multiple small tasks.
So you can either subclass `CreateContainer` and add whatever `GetContainerLogs` does to the mix, or, if your concern is to keep tightly related pieces bundled together, do what I usually do: leave them as separate tasks and wrap them in a function, something like:
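```python
from prefect import Flow
from prefect.tasks.docker import CreateContainer, GetContainerLogs


def run_container_get_logs(img, cmd):
    # Called inside the Flow context, so both tasks are added to the flow.
    container_id = CreateContainer(img, name="Create")(command=cmd)
    container_logs = GetContainerLogs(name="GetLog")(container_id)
    return container_logs


# myimg, myimg2, cmd_a and cmd_b are placeholders for your images and commands.
with Flow('F') as flow:
    cmd_a_logs = run_container_get_logs(myimg, cmd_a)
    cmd_b_logs = run_container_get_logs(myimg2, cmd_b)
```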
This leverages Prefect's philosophy of having small units of work as tasks, while keeping the flow-building step clean.
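For the single "ImageExecutor" task asked about earlier, one possible route is a plain function that starts the image, waits for it to exit, and collects the logs in one go. The sketch below assumes the Docker SDK for Python (the `docker` package), which nobody in the thread mentions; `run_image` and its parameters are hypothetical names, and the client is passed in so the function can be tried without a running Docker daemon.

```python
def run_image(client, image, volumes=None):
    """Run `image` with its built-in CMD, wait for it to exit, return (exit_code, logs).

    `client` is assumed to behave like a Docker SDK client (docker.from_env()).
    `volumes` uses the SDK's dict form, for example
    {"/host/data": {"bind": "/data", "mode": "rw"}}.
    """
    # detach=True returns a container handle right away instead of blocking.
    container = client.containers.run(image, volumes=volumes or {}, detach=True)
    try:
        # wait() blocks until the container stops and reports its exit status.
        exit_code = container.wait()["StatusCode"]
        # logs() returns bytes; decode them for easier display.
        logs = container.logs().decode("utf-8", errors="replace")
    finally:
        # Clean up the stopped container even if waiting or log retrieval fails.
        container.remove()
    return exit_code, logs
```

With a real daemon the client would come from `docker.from_env()`, and decorating the function with `@task(name="ImageExecutor")` should make the whole start, wait, and get-logs sequence appear as one task in the UI.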

Zanie

01/28/2021, 3:47 PM
Just to note, you can group tasks into a single one by calling `task.run()` explicitly. For example:
```python
from prefect import task, Flow


@task
def foo(x):
    return x * 2


@task
def bar(y):
    return y * 2


with Flow("example-no-rollup") as flow:
    y = foo(5)
    result = bar(y)

flow.run()


@task
def one_big_task(x):
    y = foo.run(x)
    return bar.run(y)


with Flow("example-rollup-task") as flow:
    result = one_big_task(5)

flow.run()
```
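To see why calling `.run()` rolls tasks up, here is a toy model in plain Python. It is not Prefect's implementation: `ToyFlow` and `ToyTask` are made-up stand-ins, and unlike Prefect (where calling a task inside a flow only builds the graph and execution waits for `flow.run()`), this toy executes the function immediately. It only illustrates that calling a task registers it with the active flow, while `.run()` is just the underlying function call.

```python
class ToyFlow:
    """Stand-in for a flow: records the name of every task called inside it."""
    active = None

    def __init__(self, name):
        self.name = name
        self.tasks = []

    def __enter__(self):
        ToyFlow.active = self
        return self

    def __exit__(self, *exc):
        ToyFlow.active = None
        return False


class ToyTask:
    """Stand-in for a task created with a decorator."""

    def __init__(self, fn):
        self.fn = fn
        self.name = fn.__name__

    def run(self, *args, **kwargs):
        # Plain function call: the active flow never hears about it.
        return self.fn(*args, **kwargs)

    def __call__(self, *args, **kwargs):
        # Calling the task registers it with the active flow, if there is one.
        # (Unlike Prefect, this toy also runs the function right away.)
        if ToyFlow.active is not None:
            ToyFlow.active.tasks.append(self.name)
        return self.run(*args, **kwargs)


@ToyTask
def foo(x):
    return x * 2


@ToyTask
def bar(y):
    return y * 2


@ToyTask
def one_big_task(x):
    return bar.run(foo.run(x))


with ToyFlow("no-rollup") as separate:
    separate_result = bar(foo(5))

with ToyFlow("rollup") as rolled_up:
    rolled_result = one_big_task(5)

print(separate.tasks)   # ['foo', 'bar']
print(rolled_up.tasks)  # ['one_big_task']
```

Both versions compute the same value; the only difference is how many entries the flow knows about, which is what shows up as separate tasks in the UI.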