Hey all, currently we have some reusable tasks th...
# prefect-community
e
Hey all, currently we have some reusable tasks that at the minute are getting repeated alot in the code. I was wondering if it's possible to do something like this (currently doesn't work, but wondering if there is way round it), we're using prefect 1:
Copy code
def build_full_refresh_base(name, s3_dir_prefix, **flow_kwargs):
    with Flow(name, **flow_kwargs) as flow:

        repeat_task_output = repeat_task()
    return flow, repeat_task_output


@contextmanager
def FullRefreshFlow(name, s3_dir_prefix, **flow_kwargs):
    flow = build_full_refresh_base(name, s3_dir_prefix, **flow_kwargs)
    try:
        yield flow
    finally:
        print("do something")


if __name__ == "__main__":
    @task(log_stdout=True)
    def some_task(repeat_task_output):
        print(repeat_task_output)

    with FullRefreshFlow("hello", "some_dir") as (flow, repeat_task_output):
        some_task(repeat_task_output)

    flow.run()
k
So if you do:
Copy code
with Flow() as flow:
    task_one()
you can add to it with:
Copy code
with flow:
    task_two()
but it becomes hard to set upstream dependencies defined in a different block. I think the tricky part here is the s3_dir_prefix or repeat_task_output. Not sure I understand what info youโ€™re trying to retain. Could you elaborate more on that?
e
Ahh cool, sorry didn't make that very clear. So it's the output of the task in the base flow i.e. task_one in your example, that would then need to be supplied to task_two. Hopefully that's clearer. No worries if it's not possible ๐Ÿ™‚
Ok I got it behaving how I want thank you for the pointer in the right direction, much appreciated ๐Ÿ™‚
k
Ah I know what you are saying. You want:
Copy code
with Flow() as flow:
    a = task_one()

with flow:
    task_two(a)
You got that working? O.o
Can I see?
e
Sure classic case of over complicating it ๐Ÿคฆโ€โ™‚๏ธ
Copy code
@task()
def some_task():
    print("hello world")
    return "hello"


@task()
def some_task_two(var):
    print(var)


def build_base_flow():
    with Flow("hello") as flow:
        var = some_task()
    return flow, var


flow, var = build_base_flow()
with flow:
    some_task_two(var)

if __name__ == "__main__":
    flow.run()
Thanks again ๐Ÿ™‚
k
Ah gotcha. Thanks!