# prefect-community
j
Is there a concept of a task of tasks? I have a few ETL flows that all start with similar steps of
1. List S3 files
2. Filter to S3 files I want to copy
3. Download S3 files
4. Upload to GCP
I’m wondering if I can make a compound task that parameterizes those steps. Or is the Prefect idiom to make a dependent flow? Part of the motivation is laziness, but if we ever change where/how we start our data ingestion, it’d be great not to have to change it in multiple places.
z
Example incoming 🙂
There are several ways to go about this — curious if this is what you’re looking for
```python
from prefect import Flow, task, Parameter


@task
def add_task(x, add):
    return x + add


@task
def multiply_task(y, multiply):
    return y * multiply


@task
def divide_task(y, divide):
    return y / divide


@task(log_stdout=True)
def display(x):
    print(x)


def common_tasks(start_param, add_param, multiply_param):
    intermediate = add_task(start_param, add_param)
    return multiply_task(intermediate, multiply_param)


start = Parameter("start", default=0)
add = Parameter("add", default=10)
multiply = Parameter("multiply", default=2)
divide = Parameter("divide", default=2)

with Flow("just-common") as flow_a:
    result = common_tasks(start, add, multiply)
    display(result)

with Flow("common-with-division") as flow_b:
    intermediate = common_tasks(start, add, multiply)
    result = divide_task(intermediate, divide)
    display(result)

with Flow("ignores-a-parameter") as flow_c:
    result = common_tasks(100, add, multiply)
    display(result)

flow_a.run()
flow_b.run()
flow_c.run()
```
j
That’s very helpful. Thank you.
If I were to make `common_tasks` a standalone class, would it subclass `Task` as well?
z
That’s a bit of a different idea. Here, I’m calling `common_tasks` within a flow context block, and it calls each task while the context is still active, so they’re registered into your flow.
If `common_tasks` were a `Task`, you would not be able to call each subtask like that, because `Task.run()` isn’t called until flow runtime, and by then there’s no context to register the subtasks. If you want to do that, you need to call each subtask’s run function, e.g. `divide_task.run(args)`, and then you’ll have `common_tasks` displayed in your DAG instead of your subtasks.
(With my example, `common_tasks` is just a utility that creates tasks for your DAG; it is not itself part of the DAG.)
j
Is there an idiom of a sub-flow?
z
Not right now. You can use the `StartFlowRun` task, or use a function (like I showed) to define a sub-flow within your flow context.
j
How do you handle the return values of the sub-flow from a task to the parent flow?
z
Using the example I gave?
j
The second example, where you have a task kick off a sub-flow.
z
You won’t be able to get the results from `StartFlowRun` super easily. I would recommend passing a location to the subflow as a parameter and having it write the result to that location.
Otherwise, you can query the API afterwards to get information about result locations, but that seems harder to me.
j
@Zanie: can you please point me to documentation and an example that outlines the following use case? Where:
• the job in B executes in one container,
• the job in C executes in a different container (different image), and
• the job in D executes in yet another different container (using yet another different image).
Thank you in advance
z
Have you considered just using the container tasks in the task library within a single flow? E.g. https://docs.prefect.io/api/latest/tasks/docker.html#startcontainer and https://docs.prefect.io/api/latest/tasks/kubernetes.html#createnamespacedjob. There are no examples of multi-flow data passing in the documentation right now because we do not have an official pattern.
j
@Zanie, thank you. Can you show me with a mocked example (code) how the single flow would accomplish this? Please note that the outputs of A should be provided as inputs for B, C, and D. Here is that flow diagram (corrected):
Or is it best practice to just have job/task A write its output (calculated/generated results) to a storage location, e.g. an S3 bucket, and then have downstream jobs/tasks B, C, and D read from that bucket? We would still need a way for the four tasks to coordinate the bucket location, though.
z
You can pass a shared parameter to the tasks with the location that data should be written to and then pulled from. I'm on vacation so I can't give a code example, but we often will not fulfill such a request in Slack support anyway. If you want to open an issue in the Prefect repo to add such an example to the docs, perhaps someone else will appreciate it as well!
j
Thank you for the follow-up even while you're on vacation! 🙂 I've submitted the new issue per your suggestion.