Vitaly Shulgin
12/17/2020, 7:25 PMDylan
Zanie
Zanie
Zanie
import time
import random
import prefect
from prefect import Flow, task, Parameter
MAX_SIZE = 30
@task
def dynamic_list_of_tasks(window_size):
size = random.randint(window_size, MAX_SIZE)
size += size % window_size # Ensure it is divisable for the fixed window
return [i for i in range(size)]
@task
def fixed_window(tasks: list, window_size: int):
windowed = []
while tasks:
windowed.append(tasks[:window_size])
tasks = tasks[window_size:]
return windowed
@task
def display(i):
<http://prefect.context.logger.info|prefect.context.logger.info>(f"Processing {i}")
window_size = Parameter("window_size", default=2)
with Flow("windowed-tasks") as flow:
tasks = dynamic_list_of_tasks(window_size)
windowed_tasks = fixed_window(tasks, window_size)
display.map(windowed_tasks)
flow.run()
# flow.register("default")
Zanie
map
as this requires having a function take all of your list and repack it, I’m not sure how common it would be thoughZanie
tasks
may be clearer as items
Vitaly Shulgin
12/18/2020, 6:10 AMVitaly Shulgin
12/18/2020, 6:18 AMtask.map
, the above solution will work, thanks @ZanieZanie
prefect
repo to add this, mapping is complicated and there’s a lot to consider