Viktor Svekolkin
03/25/2020, 9:21 AMnicholas
import prefect
from prefect import Flow, task
from prefect.tasks.control_flow.conditional import switch
import random
@task
def get_data():
return random.random()
@task
def selector(x):
print(x)
return "branch1" if x > 0.5 else "branch2"
@task
def branch1(x):
# do something with metadata
print(x)
@task
def branch2(x):
# do something with metadata
print(x)
with Flow("Switch Flow") as flow:
s = selector(x=get_data)
switch(s, dict(branch1=branch1(get_data), branch2=branch2(get_data)))
flow.run()
Hopefully that helps answer your question. For some more insight, https://docs.prefect.io/core/task_library/control_flow.html#switch describes switch tasks and links to some more resources.
If you haven't seen it already, I'd encourage you to take a look at Chris' post about event driven workflows using AWS Lambda: https://medium.com/the-prefect-blog/event-driven-workflows-with-aws-lambda-2ef9d8cc8f1a!Viktor Svekolkin
03/25/2020, 3:23 PMimport prefect
from prefect import Flow, task
from prefect.tasks.control_flow.conditional import switch
import random
@task
def get_data():
return [random.random() for i in range(5)]
@task
def selector(x):
print(x)
return "branch1" if x > 0.5 else "branch2"
@task
def branch1(x):
# do something with metadata
print(x)
return x + 1
@task
def branch2(x):
# do something with metadata
print(x)
return x + 2
with Flow("Switch Flow") as flow:
d = get_data()
s = selector.map(d)
b1 = branch1.map(d)
b2 = branch2.map(d)
switch(s, dict(branch1=b1, branch2=b2))
b3 = branch2.map(b1)
flow.run()
nicholas
import prefect
from prefect import Flow, task
import random
@task
def get_data():
return [random.random() for i in range(5)]
@task
def filter1(x):
return list(filter(lambda n: n > 0.5, x))
@task
def filter2(x):
return list(filter(lambda n: n <= 0.5, x))
@task
def branch1(x):
# do something with metadata
print(x)
return x + 1
@task
def branch2(x):
# do something with metadata
print(x)
return x + 2
with Flow("Switch Flow") as flow:
d = get_data()
b1 = filter1(d)
b2 = filter2(d)
branch1.map(b1)
branch2.map(b2)
Viktor Svekolkin
03/26/2020, 9:18 AM