Ed Burroughes
06/16/2022, 2:36 PMdef build_full_refresh_base(name, s3_dir_prefix, **flow_kwargs):
with Flow(name, **flow_kwargs) as flow:
repeat_task_output = repeat_task()
return flow, repeat_task_output
@contextmanager
def FullRefreshFlow(name, s3_dir_prefix, **flow_kwargs):
flow = build_full_refresh_base(name, s3_dir_prefix, **flow_kwargs)
try:
yield flow
finally:
print("do something")
if __name__ == "__main__":
@task(log_stdout=True)
def some_task(repeat_task_output):
print(repeat_task_output)
with FullRefreshFlow("hello", "some_dir") as (flow, repeat_task_output):
some_task(repeat_task_output)
flow.run()
Kevin Kho
06/16/2022, 2:52 PMwith Flow() as flow:
task_one()
you can add to it with:
with flow:
task_two()
but it becomes hard to set upstream dependencies defined in a different block.
I think the tricky part here is the s3_dir_prefix or repeat_task_output. Not sure I understand what info youโre trying to retain. Could you elaborate more on that?Ed Burroughes
06/16/2022, 3:04 PMKevin Kho
06/16/2022, 3:13 PMwith Flow() as flow:
a = task_one()
with flow:
task_two(a)
Ed Burroughes
06/16/2022, 3:15 PM@task()
def some_task():
print("hello world")
return "hello"
@task()
def some_task_two(var):
print(var)
def build_base_flow():
with Flow("hello") as flow:
var = some_task()
return flow, var
flow, var = build_base_flow()
with flow:
some_task_two(var)
if __name__ == "__main__":
flow.run()
Kevin Kho
06/16/2022, 3:17 PM