Fabrice Toussaint
05/31/2021, 8:46 AMKevin Kho
Fabrice Toussaint
05/31/2021, 4:08 PM@task
def offset_for_number_of_pages(pages: int = 1):
if pages < 1:
raise ValueError('Cannot be lower than 1!')
else:
return [100 * n for n in range(pages)]
@task
def get_items(offset, url):
"""Retrieves the orders response of the API request."""
request = requests.get(url, offset=offset)
return request.json
@task
def do_something(item):
return item + 1
def execute_for_item(item):
return do_something(item)
with Flow as flow:
pages = Parameter('pages', default=1)
offsets = offset_for_number_of_pages(pages=pages) #[0, 100, 200, 300, 500]
items = get_items.map(offset=offsets, url=unmapped(url)) #[[0..100], [100..200], [200..300], [300..400]
results = apply_map(execute_for_item, item=flatten(items)) #[0..400]
Fabrice Toussaint
05/31/2021, 4:09 PMFabrice Toussaint
05/31/2021, 4:28 PMfrom prefect import task, Flow, apply_map, flatten, unmapped, Parameter
@task
def generate_list(amount: int = 1):
if amount < 1:
raise ValueError('Cannot be lower than 1!')
else:
return [100 * n for n in range(amount)]
@task
def get_items(item):
"""Retrieves the orders response of the API request."""
return [0, 1, 2, 3, item]
@task
def add_one(item):
print(item)
return item + 1
@task
def add_two(item):
return item + 2
def execute_for_item(item):
item_plus_one = add_one(item)
return add_two(item_plus_one)
with Flow("Test") as flow:
amount = Parameter('amount', default=1)
l = generate_list(amount=amount) # [0, 100, 200, 300, 500]
items = get_items.map(item=l) # [[0..100], [100..200], [200..300], [300..400]
results = apply_map(execute_for_item, item=flatten(items)) # [0..400]
flow.run()
Fabrice Toussaint
05/31/2021, 4:28 PMFabrice Toussaint
05/31/2021, 4:29 PMFabrice Toussaint
05/31/2021, 4:35 PMKevin Kho
Fabrice Toussaint
05/31/2021, 6:17 PMFabrice Toussaint
05/31/2021, 6:17 PMFabrice Toussaint
06/01/2021, 12:07 PMKevin Kho
Kevin Kho
Kevin Kho
from prefect import Flow, task, flatten, case, apply_map
import prefect
@task
def generate_numbers():
return [[1],[2,3],[4,5,6]]
@task
def up(x):
return x + 1
@task
def is_even(x):
return x % 2 ==0
def up_or_down(x):
cond = is_even(x)
with case(cond, True):
res = up(x)
return res
@task
def simple_flatten(x):
return [num for elem in x for num in elem]
with Flow('test') as flow:
x = generate_numbers()
y = simple_flatten(x)
apply_map(up_or_down, y)
flow.run()
Fabrice Toussaint
06/01/2021, 2:01 PMFabrice Toussaint
06/01/2021, 2:01 PMKevin Kho