Hey all, we currently have some reusable tasks that are getting repeated a lot in the code. I was wondering if it's possible to do something like this (it currently doesn't work, but I'm wondering if there's a way round it). We're using Prefect 1:
def build_full_refresh_base(name, s3_dir_prefix, **flow_kwargs):
    with Flow(name, **flow_kwargs) as flow:
        repeat_task_output = repeat_task()
    return flow, repeat_task_output

@contextmanager
def FullRefreshFlow(name, s3_dir_prefix, **flow_kwargs):
    flow = build_full_refresh_base(name, s3_dir_prefix, **flow_kwargs)
    try:
        yield flow
    finally:
        print("do something")

if __name__ == "__main__":
    @task(log_stdout=True)
    def some_task(repeat_task_output):
        print(repeat_task_output)

    with FullRefreshFlow("hello", "some_dir") as (flow, repeat_task_output):
        some_task(repeat_task_output)
    flow.run()
Kevin Kho
06/16/2022, 2:52 PM
So if you do:
with Flow() as flow:
    task_one()
you can add to it with:
with flow:
    task_two()
but it becomes hard to set upstream dependencies defined in a different block.
I think the tricky part here is the s3_dir_prefix or repeat_task_output. Not sure I understand what info you're trying to retain. Could you elaborate more on that?
Ed Burroughes
06/16/2022, 3:04 PM
Ahh cool, sorry, I didn't make that very clear. So it's the output of the task in the base flow, i.e. task_one in your example, that would then need to be supplied to task_two. Hopefully that's clearer. No worries if it's not possible.
Ed Burroughes
06/16/2022, 3:12 PM
OK, I got it behaving how I want. Thank you for the pointer in the right direction, much appreciated.
Kevin Kho
06/16/2022, 3:13 PM
Ah I know what you are saying. You want:
with Flow() as flow:
    a = task_one()
with flow:
    task_two(a)
Kevin Kho
06/16/2022, 3:13 PM
You got that working? O.o
Kevin Kho
06/16/2022, 3:13 PM
Can I see?
Ed Burroughes
06/16/2022, 3:15 PM
Sure, classic case of overcomplicating it:
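from prefect import Flow, task

@task()
def some_task():
    print("hello world")
    return "hello"

@task()
def some_task_two(var):
    print(var)

def build_base_flow():
    # Build the shared part of the flow and return the task output with it
    with Flow("hello") as flow:
        var = some_task()
    return flow, var

flow, var = build_base_flow()

# Re-enter the same flow to add the downstream task
with flow:
    some_task_two(var)

if __name__ == "__main__":
    flow.run()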