Thread
#prefect-community

    Viktor Svekolkin

    2 years ago
    I have the following question. My pipeline works like this: the first task checks an S3 bucket for user files. There are two kinds of files, which must be processed differently. The first task generates a list of dictionaries with metadata, e.g. {'file_type': 'a', 'file_uri':'s3://something'}, over which I plan to map a branch selector task. Its output will be fed to a switch operator, which selects the appropriate downstream postprocessing task. But I'm stuck on how to pass the metadata past the switch operator to the selected downstream task. Can you help me with this use case?

    nicholas

    2 years ago
    Hi @Viktor Svekolkin! You can directly pass the data from your upstream metadata task to the tasks in your switch operator as well as to your switch condition task:
    import prefect
    from prefect import Flow, task
    from prefect.tasks.control_flow.conditional import switch
    import random
    
    
    @task
    def get_data():
        return random.random()
    
    
    @task
    def selector(x):
        print(x)
        return "branch1" if x > 0.5 else "branch2"
    
    
    @task
    def branch1(x):
        # do something with metadata
        print(x)
    
    
    @task
    def branch2(x):
        # do something with metadata
        print(x)
    
    
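    with Flow("Switch Flow") as flow:
        # call the upstream task once and reuse its result everywhere
        d = get_data()
        s = selector(x=d)
        # each branch receives the same upstream data as the selector
        switch(s, dict(branch1=branch1(d), branch2=branch2(d)))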
    
    
    flow.run()
    Hopefully that helps answer your question. For more insight, https://docs.prefect.io/core/task_library/control_flow.html#switch describes switch tasks and links to further resources. If you haven't seen it already, I'd encourage you to take a look at Chris' post about event-driven workflows using AWS Lambda: https://medium.com/the-prefect-blog/event-driven-workflows-with-aws-lambda-2ef9d8cc8f1a
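To connect the example above to the metadata dictionaries from the original question, here is a minimal plain-Python sketch (no Prefect involved) of the same idea: the selector and the chosen branch both receive the same metadata. The names `select_branch`, `route`, `BRANCHES`, the file type 'b', and the `s3://example-bucket/...` URIs are hypothetical and only serve the illustration.

```python
# Plain-Python sketch: selector and branches share the same metadata dict.
# All names and URIs below are hypothetical, not taken from the thread.

def select_branch(meta):
    """Return the name of the branch that should handle this file."""
    return "branch1" if meta["file_type"] == "a" else "branch2"


def branch1(meta):
    """Handle files of type 'a'; receives the full metadata dict."""
    return f"type-a handler got {meta['file_uri']}"


def branch2(meta):
    """Handle every other file type; receives the full metadata dict."""
    return f"other handler got {meta['file_uri']}"


# Map branch names to handlers, mirroring the dict passed to switch().
BRANCHES = {"branch1": branch1, "branch2": branch2}


def route(meta):
    """Pick a branch by name, then hand it the same metadata."""
    return BRANCHES[select_branch(meta)](meta)


meta_a = {"file_type": "a", "file_uri": "s3://example-bucket/file_a"}
meta_b = {"file_type": "b", "file_uri": "s3://example-bucket/file_b"}
result_a = route(meta_a)
result_b = route(meta_b)
```

In a flow, each of these functions would presumably become a task, with the upstream metadata passed to both the selector and the branch tasks as in the example above.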

    Viktor Svekolkin

    2 years ago
    Thanks for the reply! Is this behavior compatible with mapping over a list of objects? E.g. get_data returns a list and I use selector.map(get_data)?
    Typically, we receive data in bulk, so I figured that mapping over individual entries would be a more appropriate strategy, in order to parallelize execution.
    When I try to modify your code to be mappable like this:
    import prefect
    from prefect import Flow, task
    from prefect.tasks.control_flow.conditional import switch
    import random
    
    @task
    def get_data():
        return [random.random() for i in range(5)]
    
    @task
    def selector(x):
        print(x)
        return "branch1" if x > 0.5 else "branch2"
    
    @task
    def branch1(x):
        # do something with metadata
        print(x)
        return x + 1
        
    @task
    def branch2(x):
        # do something with metadata
        print(x)
        return x + 2
        
    with Flow("Switch Flow") as flow:
        
        d = get_data()
        
        s = selector.map(d)
        
        b1 = branch1.map(d)
        b2 = branch2.map(d)
        
        switch(s, dict(branch1=b1, branch2=b2))
        
        b3 = branch2.map(b1)
        
    flow.run()
    For some reason, the downstream tasks are set to 'Skipped'.
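One possible explanation for the 'Skipped' states, stated as an assumption rather than confirmed Prefect behavior: if the switch compares the selector's whole result against each case key, then a mapped selector hands it a list, and a list never equals a single string. The plain-Python sketch below only illustrates that comparison; `selector_results` and `case_keys` are hypothetical stand-ins and nothing here calls Prefect.

```python
# Assumption: the switch compares the selector's *whole* result to each case key.
# A mapped selector produces a list of branch names, one per input item.
selector_results = ["branch1", "branch2", "branch1"]
case_keys = ["branch1", "branch2"]

# Comparing the whole list to each key: a list is never equal to a string,
# so under this assumption no case matches and every branch is skipped.
whole_list_matches = {key: selector_results == key for key in case_keys}

# Comparing item by item is what the mapped use case actually needs.
per_item_matches = [
    {key: item == key for key in case_keys} for item in selector_results
]

print(whole_list_matches)
print(per_item_matches[0])
```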

    nicholas

    2 years ago
    Hm, I think the switch isn't ideal here, then, since it may not properly map those results. Instead, I'd suggest filtering the data upstream and passing the filtered results to the appropriate branches. Something like this:
    import prefect
    from prefect import Flow, task
    import random
    
    
    @task
    def get_data():
        return [random.random() for i in range(5)]
    
    
    @task
    def filter1(x):
        return list(filter(lambda n: n > 0.5, x))
    
    
    @task
    def filter2(x):
        return list(filter(lambda n: n <= 0.5, x))
    
    
    @task
    def branch1(x):
        # do something with metadata
        print(x)
        return x + 1
    
    
    @task
    def branch2(x):
        # do something with metadata
        print(x)
        return x + 2
    
    
    with Flow("Switch Flow") as flow:
        d = get_data()
    
        b1 = filter1(d)
        b2 = filter2(d)
    
        branch1.map(b1)
        branch2.map(b2)
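    Which would give you this schematic: get_data feeds both filter1 and filter2, filter1 feeds the mapped branch1, and filter2 feeds the mapped branch2.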
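The filter-then-map suggestion can also be applied to the metadata dictionaries from the original question. The sketch below is plain Python under stated assumptions: `partition_by_type`, the file type 'b', and the `s3://example-bucket/...` URIs are hypothetical. It groups records by `file_type` in a single pass, so each group could then be handed to its own mapped branch.

```python
from collections import defaultdict


def partition_by_type(records):
    """Group metadata dicts by their 'file_type' value in one pass."""
    groups = defaultdict(list)
    for record in records:
        groups[record["file_type"]].append(record)
    return dict(groups)


# Hypothetical output of the first task: one dict per file found in the bucket.
records = [
    {"file_type": "a", "file_uri": "s3://example-bucket/one"},
    {"file_type": "b", "file_uri": "s3://example-bucket/two"},
    {"file_type": "a", "file_uri": "s3://example-bucket/three"},
]

groups = partition_by_type(records)

# Each group keeps the full metadata, ready for its own processing branch.
type_a_uris = [r["file_uri"] for r in groups.get("a", [])]
type_b_uris = [r["file_uri"] for r in groups.get("b", [])]
```

Compared with one filter function per file type, a single partition step reads the list once and needs no change when a new file type appears, although separate filter tasks keep each branch's input explicit in the flow.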

    Viktor Svekolkin

    2 years ago
    Ok, thanks, got it!