#prefect-community

Viktor Svekolkin

03/25/2020, 9:21 AM
I have the following question about my pipeline. The first task checks an S3 bucket for user files. There are two kinds of files, and they must be processed differently. The first task generates a list of dictionaries with metadata, e.g. {'file_type': 'a', 'file_uri':'s3://something'}. I plan to map a branch selector task over that list and feed its output to a switch operator, which selects the appropriate downstream postprocessing task. But I'm stuck on how to pass the metadata through the switch operator to the selected downstream task. Can you help me with this use case?

nicholas

03/25/2020, 3:02 PM
Hi @Viktor Svekolkin! You can directly pass the data from your upstream metadata task to the tasks in your switch operator as well as to your switch condition task:
import prefect
from prefect import Flow, task
from prefect.tasks.control_flow.conditional import switch
import random


@task
def get_data():
    return random.random()


@task
def selector(x):
    print(x)
    return "branch1" if x > 0.5 else "branch2"


@task
def branch1(x):
    # do something with metadata
    print(x)


@task
def branch2(x):
    # do something with metadata
    print(x)


with Flow("Switch Flow") as flow:
    # Call get_data once and reuse its result for the selector and both branches
    data = get_data()
    s = selector(x=data)
    switch(s, dict(branch1=branch1(data), branch2=branch2(data)))


flow.run()
Hopefully that helps answer your question. For some more insight, https://docs.prefect.io/core/task_library/control_flow.html#switch describes switch tasks and links to some more resources. If you haven't seen it already, I'd encourage you to take a look at Chris' post about event driven workflows using AWS Lambda: https://medium.com/the-prefect-blog/event-driven-workflows-with-aws-lambda-2ef9d8cc8f1a!
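The switch example above can be read as the following plain-Python sketch (no Prefect imports): the selector picks one key, only the branch with the matching key runs, and the others are treated as skipped. This only illustrates the idea and is not how Prefect implements switch; `run_switch` is a hypothetical helper, and `None` stands in for Prefect's Skipped state.

```python
from typing import Callable, Dict, Optional


def selector(x: float) -> str:
    # Same routing rule as the selector task in the flow above
    return "branch1" if x > 0.5 else "branch2"


def branch1(x: float) -> float:
    return x + 1


def branch2(x: float) -> float:
    return x + 2


def run_switch(
    condition: str, cases: Dict[str, Callable[[float], float]], data: float
) -> Dict[str, Optional[float]]:
    # Hypothetical helper: run only the case whose key equals the condition;
    # every other case gets None, standing in for a Skipped state.
    return {name: (fn(data) if name == condition else None) for name, fn in cases.items()}


data = 0.75
results = run_switch(selector(data), {"branch1": branch1, "branch2": branch2}, data)
print(results)  # {'branch1': 1.75, 'branch2': None}
```

Both branches receive the same `data`, which mirrors passing the upstream result directly to every case task.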

Viktor Svekolkin

03/25/2020, 3:23 PM
Thanks for the reply! Is this behavior compatible with mapping over a list of objects? E.g. get_data returns a list and I use selector.map(get_data)?
Typically, we receive data in bulk, so I figured that mapping over individual entries would be a more appropriate strategy for parallelizing execution.
When I try to modify your code to be mappable, like this:
import prefect
from prefect import Flow, task
from prefect.tasks.control_flow.conditional import switch
import random

@task
def get_data():
    return [random.random() for i in range(5)]

@task
def selector(x):
    print(x)
    return "branch1" if x > 0.5 else "branch2"

@task
def branch1(x):
    # do something with metadata
    print(x)
    return x + 1
    
@task
def branch2(x):
    # do something with metadata
    print(x)
    return x + 2
    
with Flow("Switch Flow") as flow:
    
    d = get_data()
    
    s = selector.map(d)
    
    b1 = branch1.map(d)
    b2 = branch2.map(d)
    
    switch(s, dict(branch1=b1, branch2=b2))
    
    b3 = branch2.map(b1)
    
flow.run()
For some reason, the downstream tasks are set to 'Skipped'.
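One plausible explanation for the Skipped states, assuming switch compares the selector's entire result against each branch name rather than comparing element by element: after `selector.map(d)`, that result is a list of branch names, so it never equals "branch1" or "branch2". Both branches would then be skipped, and b3, which depends on b1, would be skipped too. The plain-Python sketch below shows only that comparison; `compare_value` is a hypothetical stand-in, not a Prefect function, and the fixed data replaces `random.random()` so the output is reproducible.

```python
from typing import List


def selector(x: float) -> str:
    return "branch1" if x > 0.5 else "branch2"


def compare_value(expected: str, condition: object) -> bool:
    # Hypothetical stand-in for the per-case check: a branch runs only
    # if the condition equals that branch's key.
    return condition == expected


data: List[float] = [0.1, 0.9, 0.4, 0.7, 0.2]

# With selector.map, the condition is a list of branch names, one per element.
mapped_condition = [selector(x) for x in data]

for key in ("branch1", "branch2"):
    # A list never equals a single string, so neither branch matches.
    print(key, compare_value(key, mapped_condition))

# Element-wise routing would need one comparison per element instead.
per_element = [compare_value("branch1", c) for c in mapped_condition]
print(per_element)
```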

nicholas

03/25/2020, 6:07 PM
Hm, I think switch isn't ideal here, since it may not properly map those results. Instead, I'd suggest filtering the data upstream and passing the filtered results to the appropriate branches, something like this:
import prefect
from prefect import Flow, task
import random


@task
def get_data():
    return [random.random() for i in range(5)]


@task
def filter1(x):
    return list(filter(lambda n: n > 0.5, x))


@task
def filter2(x):
    return list(filter(lambda n: n <= 0.5, x))


@task
def branch1(x):
    # do something with metadata
    print(x)
    return x + 1


@task
def branch2(x):
    # do something with metadata
    print(x)
    return x + 2


with Flow("Switch Flow") as flow:
    d = get_data()

    b1 = filter1(d)
    b2 = filter2(d)

    branch1.map(b1)
    branch2.map(b2)

flow.run()
This would give you the following schematic:

Viktor Svekolkin

03/26/2020, 9:18 AM
Ok, thanks, got it!
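Applied to the metadata records from the original question, the filter-upstream approach can be sketched in plain Python: split the records by `file_type`, then map each handler over its own subset. This sketch rests on several assumptions: `partition_by_type`, `process_a`, and `process_b` are hypothetical names; the `s3://bucket/...` URIs are made-up examples following the `{'file_type': ..., 'file_uri': ...}` shape from the question; and the list comprehensions stand in for Prefect's `.map`.

```python
from typing import Dict, List

Record = Dict[str, str]


def partition_by_type(records: List[Record]) -> Dict[str, List[Record]]:
    # Group records by their 'file_type' key, preserving input order.
    groups: Dict[str, List[Record]] = {}
    for record in records:
        groups.setdefault(record["file_type"], []).append(record)
    return groups


def process_a(record: Record) -> str:
    # Hypothetical postprocessing for type-'a' files.
    return "a:" + record["file_uri"]


def process_b(record: Record) -> str:
    # Hypothetical postprocessing for type-'b' files.
    return "b:" + record["file_uri"]


# Made-up example records in the shape described in the question.
records = [
    {"file_type": "a", "file_uri": "s3://bucket/one"},
    {"file_type": "b", "file_uri": "s3://bucket/two"},
    {"file_type": "a", "file_uri": "s3://bucket/three"},
]

groups = partition_by_type(records)
# Each comprehension plays the role of handler.map(subset) in the flow.
results_a = [process_a(r) for r in groups.get("a", [])]
results_b = [process_b(r) for r in groups.get("b", [])]
print(results_a)  # ['a:s3://bucket/one', 'a:s3://bucket/three']
print(results_b)  # ['b:s3://bucket/two']
```

A single partition step scans the records once instead of once per filter. With only two file types, the two separate filter tasks from the thread are just as valid and arguably easier to read.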