Josh
01/14/2021, 4:39 PM
1. List S3 files
2. Filter to S3 files I want to copy
3. Download S3 files
4. Upload to GCP
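The four steps above can be sketched as plain Python functions chained behind one parameterized helper, so the ingestion logic lives in a single place. This is a minimal sketch under stated assumptions: dicts stand in for the S3 source and GCS destination buckets, and `list_s3_files`, `filter_files`, `download_files`, `upload_to_gcs`, and `ingest` are hypothetical names. A real version would call the boto3 and google-cloud-storage clients, and in a Prefect flow each step would presumably become a `@task`.

```python
import fnmatch
from typing import Dict, List

# Hypothetical stand-in: a "bucket" is a dict mapping object keys to contents.
# A real version would use boto3 (S3) and google-cloud-storage (GCS) clients.
Bucket = Dict[str, bytes]


def list_s3_files(bucket: Bucket, prefix: str) -> List[str]:
    """Step 1: list the object keys under a prefix."""
    return sorted(key for key in bucket if key.startswith(prefix))


def filter_files(keys: List[str], pattern: str) -> List[str]:
    """Step 2: keep only the keys matching a glob pattern."""
    return [key for key in keys if fnmatch.fnmatch(key, pattern)]


def download_files(bucket: Bucket, keys: List[str]) -> Dict[str, bytes]:
    """Step 3: fetch the contents of each selected key."""
    return {key: bucket[key] for key in keys}


def upload_to_gcs(dest: Bucket, files: Dict[str, bytes]) -> List[str]:
    """Step 4: write each file to the destination and return the uploaded keys."""
    dest.update(files)
    return sorted(files)


def ingest(source: Bucket, dest: Bucket, prefix: str, pattern: str) -> List[str]:
    """All four steps behind one parameterized entry point.

    If the ingestion source changes, only this module needs editing.
    """
    keys = list_s3_files(source, prefix)
    selected = filter_files(keys, pattern)
    files = download_files(source, selected)
    return upload_to_gcs(dest, files)


s3 = {"raw/a.csv": b"1", "raw/b.json": b"2", "other/c.csv": b"3"}
gcs: Bucket = {}
print(ingest(s3, gcs, prefix="raw/", pattern="*.csv"))  # ['raw/a.csv']
```

Keeping the steps as separate functions (rather than one monolithic function) mirrors how they would map onto individual tasks in a DAG.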
I’m wondering if I can make a compound task that can parameterize the steps above. Or is the Prefect idiom to make a dependent flow?
Part of the motivation is laziness, but if we ever change where/how we start our data ingestion, it’d be great not to have to change it in multiple places.
Zanie
```python
from prefect import Flow, task, Parameter

@task
def add_task(x, add):
    return x + add

@task
def multiply_task(y, multiply):
    return y * multiply

@task
def divide_task(y, divide):
    return y / divide

@task(log_stdout=True)
def display(x):
    print(x)

# A plain function, not a task: calling it inside a `with Flow(...)` block
# adds add_task and multiply_task to that flow.
def common_tasks(start_param, add_param, multiply_param):
    intermediate = add_task(start_param, add_param)
    return multiply_task(intermediate, multiply_param)

start = Parameter("start", default=0)
add = Parameter("add", default=10)
multiply = Parameter("multiply", default=2)
divide = Parameter("divide", default=2)

with Flow("just-common") as flow_a:
    result = common_tasks(start, add, multiply)
    display(result)

with Flow("common-with-division") as flow_b:
    intermediate = common_tasks(start, add, multiply)
    result = divide_task(intermediate, divide)
    display(result)

# Passes the constant 100 instead of the `start` parameter
with Flow("ignores-a-parameter") as flow_c:
    result = common_tasks(100, add, multiply)
    display(result)

flow_a.run()
flow_b.run()
flow_c.run()
```
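Because `common_tasks` only chains `add_task` and `multiply_task`, the value each flow displays with the default parameters can be checked by hand: flow_a computes (0 + 10) × 2 = 20, flow_b divides that by 2 to get 10.0, and flow_c fixes `start` at 100 to get (100 + 10) × 2 = 220. A plain-Python mirror of the three flows (no Prefect involved; `common` and `DEFAULTS` are illustrative names, not part of the example above):

```python
def common(start, add, multiply):
    # Same arithmetic as common_tasks: (start + add) * multiply
    return (start + add) * multiply

# Default values of the four Parameters in the flow example
DEFAULTS = {"start": 0, "add": 10, "multiply": 2, "divide": 2}

flow_a_value = common(DEFAULTS["start"], DEFAULTS["add"], DEFAULTS["multiply"])
flow_b_value = flow_a_value / DEFAULTS["divide"]  # true division gives a float
flow_c_value = common(100, DEFAULTS["add"], DEFAULTS["multiply"])

print(flow_a_value, flow_b_value, flow_c_value)  # 20 10.0 220
```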
Josh
01/14/2021, 7:51 PM
If I made `common_tasks` a standalone class, would it subclass `Task` as well?
Zanie
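A toy model of the build-time versus run-time distinction explained in this reply. It is a sketch, not Prefect's implementation: `ToyFlow`, `ToyTask`, `NaiveCommonTasks`, and `CommonTasks` are hypothetical, and a class attribute stands in for the context Prefect uses to track the open flow. Calling a task inside the `with` block registers it; a compound task's `run()` executes later, when no flow is open, so it has to call each subtask's `run()` directly, and only the compound task appears in the graph.

```python
from typing import List, Optional


class ToyFlow:
    """Toy stand-in for a flow: records tasks created while it is active."""

    active: Optional["ToyFlow"] = None  # the flow whose `with` block is open

    def __init__(self, name: str):
        self.name = name
        self.tasks: List[str] = []

    def __enter__(self) -> "ToyFlow":
        ToyFlow.active = self
        return self

    def __exit__(self, *exc) -> bool:
        ToyFlow.active = None
        return False


class ToyTask:
    def __init__(self, name: str):
        self.name = name

    def __call__(self) -> "ToyTask":
        # Calling a task *builds* the graph, so it needs an open flow.
        if ToyFlow.active is None:
            raise RuntimeError(f"no active flow to register {self.name!r}")
        ToyFlow.active.tasks.append(self.name)
        return self

    def run(self) -> str:
        # Executing a task does no graph building.
        return f"{self.name} ran"


add_task, multiply_task = ToyTask("add"), ToyTask("multiply")


def common_tasks() -> None:
    # Plain helper: executes at build time, inside the `with` block.
    add_task()
    multiply_task()


class NaiveCommonTasks(ToyTask):
    def run(self) -> List[str]:
        # Executes at flow runtime, after the `with` block has closed.
        add_task()  # raises: there is no open flow to register into
        multiply_task()
        return []


class CommonTasks(ToyTask):
    def run(self) -> List[str]:
        # Works: call each subtask's run() instead of building graph nodes.
        return [add_task.run(), multiply_task.run()]


with ToyFlow("helper") as helper_flow:
    common_tasks()

with ToyFlow("compound") as compound_flow:
    CommonTasks("common_tasks")()

print(helper_flow.tasks)    # ['add', 'multiply']
print(compound_flow.tasks)  # ['common_tasks']
print(CommonTasks("common_tasks").run())  # ['add ran', 'multiply ran']
```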
You call `common_tasks` within a flow context block, and it calls each task while that context is still active, so they’re registered into your flow. If `common_tasks` were a `Task`, you would not be able to call each subtask like that, because `Task.run()` isn’t called until flow runtime, and by then there’s no context to register the subtasks. If you want to do that, you need to call each subtask’s run function, e.g. `divide_task.run(args)`, and then you’ll have `common_tasks` displayed in your DAG instead of your subtasks. (`common_tasks` is just a utility that creates tasks for your DAG; it is not itself a part of the DAG.)
Josh
01/14/2021, 10:18 PM
Zanie
You can use the `StartFlowRun` task or use a function (like I showed) to define a sub-flow within your flow context.
Josh
01/15/2021, 5:34 PM
Zanie
Josh
01/15/2021, 8:54 PM
Zanie
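A minimal sketch of the pass-a-location pattern recommended in this reply: the parent chooses where the result should go, passes that location to the sub-flow as a parameter, and reads the result back afterwards. Assumptions: local JSON files stand in for real storage (in practice the location might be an S3 or GCS path), a direct function call stands in for triggering the sub-flow with `StartFlowRun`, and `child_flow`, `parent_flow`, and `result_path` are hypothetical names.

```python
import json
import tempfile
from pathlib import Path


def child_flow(x: int, result_path: str) -> None:
    """Hypothetical sub-flow: computes a value and writes it to the given location."""
    value = {"doubled": x * 2}
    Path(result_path).write_text(json.dumps(value))


def parent_flow(x: int, workdir: str) -> dict:
    """Hypothetical parent: picks the location, triggers the child, reads it back."""
    result_path = str(Path(workdir) / f"child-result-{x}.json")
    # With StartFlowRun the child would run as a separate flow run;
    # here it is a direct call so the sketch is self-contained.
    child_flow(x=x, result_path=result_path)
    return json.loads(Path(result_path).read_text())


with tempfile.TemporaryDirectory() as tmp:
    print(parent_flow(21, tmp))  # {'doubled': 42}
```

Because the parent owns the location, it never needs to reach into the child run's internal results; it only needs to know where to look.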
You can’t get results back from `StartFlowRun` super easily. I would recommend passing a location to the subflow as a parameter and having it write the result to that location.
Jay Sundaram
03/16/2021, 10:54 PM
Zanie
Jay Sundaram
03/17/2021, 3:21 PM
Zanie
Jay Sundaram
03/18/2021, 9:06 PM