Ed Burroughes

    3 months ago
    Hey all, we currently have some reusable tasks that are getting repeated a lot in the code. I was wondering if it's possible to do something like this (it currently doesn't work, but I'm wondering if there is a way round it). We're using Prefect 1:
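    from contextlib import contextmanager

    from prefect import Flow, task

    # repeat_task is assumed to be a @task defined elsewhere (not shown in this post)


    def build_full_refresh_base(name, s3_dir_prefix, **flow_kwargs):
        with Flow(name, **flow_kwargs) as flow:
            repeat_task_output = repeat_task()
        return flow, repeat_task_output


    @contextmanager
    def FullRefreshFlow(name, s3_dir_prefix, **flow_kwargs):
        # build_full_refresh_base returns a (flow, repeat_task_output) tuple
        flow_and_output = build_full_refresh_base(name, s3_dir_prefix, **flow_kwargs)
        try:
            yield flow_and_output
        finally:
            print("do something")


    if __name__ == "__main__":
        @task(log_stdout=True)
        def some_task(repeat_task_output):
            print(repeat_task_output)

        # This is the part that doesn't work: presumably because the flow's own
        # `with` block has already exited, so some_task is called outside it.
        with FullRefreshFlow("hello", "some_dir") as (flow, repeat_task_output):
            some_task(repeat_task_output)

        flow.run()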
    Kevin Kho

    3 months ago
    So if you do:
    with Flow() as flow:
        task_one()
    you can add to it with:
    with flow:
        task_two()
    but it becomes hard to set upstream dependencies defined in a different block. I think the tricky part here is the s3_dir_prefix or repeat_task_output. Not sure I understand what info you're trying to retain. Could you elaborate more on that?
    Ed Burroughes

    3 months ago
    Ahh cool, sorry, I didn't make that very clear. It's the output of the task in the base flow (i.e. task_one in your example) that would then need to be supplied to task_two. Hopefully that's clearer. No worries if it's not possible 🙂
    OK, I got it behaving how I want. Thank you for the pointer in the right direction, much appreciated 🙂
    Kevin Kho

    3 months ago
    Ah I know what you are saying. You want:
    with Flow() as flow:
        a = task_one()
    
    with flow:
        task_two(a)
    You got that working? O.o
    Can I see?
    Ed Burroughes

    3 months ago
    Sure, classic case of overcomplicating it 🤦‍♂️
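    from prefect import Flow, task


    @task()
    def some_task():
        print("hello world")
        return "hello"


    @task()
    def some_task_two(var):
        print(var)


    def build_base_flow():
        # Build the shared part of the flow and hand back the task output
        with Flow("hello") as flow:
            var = some_task()
        return flow, var


    flow, var = build_base_flow()
    # Re-enter the same flow to add tasks that depend on the base output
    with flow:
        some_task_two(var)

    if __name__ == "__main__":
        flow.run()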
    Thanks again 🙂
    Kevin Kho

    3 months ago
    Ah gotcha. Thanks!