Marc Lipoff
01/28/2021, 2:54 PM Zanie
import time
import random
import prefect
from prefect import Flow, task, Parameter
# Upper bound handed to random.randint when picking the generated list's size.
MAX_SIZE = 30
@task
def dynamic_list_of_tasks(window_size):
    """Return ``[0, 1, ..., size - 1]`` for a random ``size`` divisible by ``window_size``.

    A size is drawn uniformly from ``window_size`` up to ``MAX_SIZE`` and then
    rounded *up* to the next multiple of ``window_size``, so the returned list
    may be up to ``window_size - 1`` items longer than ``MAX_SIZE``.

    Args:
        window_size: Positive integer length of each fixed window.

    Returns:
        A list of consecutive integers starting at 0 whose length is a
        non-zero multiple of ``window_size``.

    Raises:
        ValueError: If ``window_size`` is smaller than 1.
    """
    if window_size < 1:
        raise ValueError(
            f"window_size must be a positive integer, got {window_size!r}"
        )
    # Keep the randint range non-empty even when window_size exceeds MAX_SIZE.
    size = random.randint(window_size, max(window_size, MAX_SIZE))
    # Round up to the next multiple: ``-size % window_size`` is the distance to
    # it (0 when already aligned). Adding ``size % window_size`` would not
    # guarantee divisibility (e.g. 7 with a window of 3 would become 8).
    size += -size % window_size
    return list(range(size))
@task
def fixed_window(tasks: list, window_size: int):
    """Split ``tasks`` into consecutive chunks of ``window_size`` items.

    Args:
        tasks: The list to partition; it is not modified.
        window_size: Positive number of items per chunk.

    Returns:
        A list of lists covering ``tasks`` in order. Every chunk has
        ``window_size`` items except possibly the last, which holds the
        remainder. An empty ``tasks`` yields an empty list.

    Raises:
        ValueError: If ``window_size`` is smaller than 1 (such a value would
            otherwise never consume the input and loop forever).
    """
    if window_size < 1:
        raise ValueError(
            f"window_size must be a positive integer, got {window_size!r}"
        )
    # Step through the start offsets instead of repeatedly re-slicing the
    # remainder, which copied the tail of the list on every iteration.
    return [
        tasks[start:start + window_size]
        for start in range(0, len(tasks), window_size)
    ]
@task
def display(i):
    """Log ``Processing <i>`` at INFO level through the Prefect context logger.

    Args:
        i: The value to report; it is interpolated into the log message.
    """
    # The pasted source had this call wrapped in chat link markup
    # (``<http://...|...>``), which is not valid Python; this is the plain call.
    prefect.context.logger.info(f"Processing {i}")
# Window length is exposed as a flow parameter (default 2).
window_size = Parameter("window_size", default=2)

with Flow("windowed-tasks") as flow:
    # Build a list, chunk it into fixed windows, then map ``display`` over the
    # chunks so each window is handled by its own mapped task run.
    tasks = dynamic_list_of_tasks(window_size)
    windowed_tasks = fixed_window(tasks, window_size)
    display.map(windowed_tasks)

flow.run()
# flow.register("default")
Marc Lipoff
01/28/2021, 5:08 PM