Hi, I'm rewriting some ETL jobs from bonobo based ...
# ask-community
v
Hi, I'm rewriting some ETL jobs from a bonobo-based implementation to Prefect. Question: does Prefect support functionality like FixedWindow, to limit the number of input elements?
d
Hi @Vitaly Shulgin, I don’t believe we have a way to limit the number of input elements in the way that bonobo does. Can you describe what you’re trying to achieve in a bit more detail? I’ll see if I can help you find a “Prefect” solution.
z
For reference for anyone else, this is packing elements of a list into tuples of a fixed size e.g. https://github.com/python-bonobo/bonobo/blob/develop/bonobo/nodes/basics.py#L173
I’ll try to do an example of this
import time
import random

import prefect
from prefect import Flow, task, Parameter

MAX_SIZE = 30


@task
def dynamic_list_of_tasks(window_size):
    size = random.randint(window_size, MAX_SIZE)
    size -= size % window_size  # Round down so the size is divisible by the window size
    return list(range(size))


@task
def fixed_window(tasks: list, window_size: int):
    windowed = []
    while tasks:
        windowed.append(tasks[:window_size])
        tasks = tasks[window_size:]
    return windowed


@task
def display(i):
    prefect.context.logger.info(f"Processing {i}")


window_size = Parameter("window_size", default=2)

with Flow("windowed-tasks") as flow:
    tasks = dynamic_list_of_tasks(window_size)
    windowed_tasks = fixed_window(tasks, window_size)
    display.map(windowed_tasks)

flow.run()
# flow.register("default")
This may be interesting functionality to include directly in `map`, as this currently requires a function that takes your whole list and repacks it. I’m not sure how common it would be though.
Where I refer to `tasks`, it may be clearer as `items`.
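For anyone who wants the windowing step on its own, here is a minimal sketch in plain Python with no Prefect dependency. It is one possible variant, not the bonobo implementation: it uses `itertools.islice` rather than list slicing, accepts any iterable, and lets the last window come out shorter when the input does not divide evenly. The type hints and the `ValueError` check are my own additions.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def fixed_window(items: Iterable[T], window_size: int) -> Iterator[List[T]]:
    """Yield consecutive lists of at most `window_size` items."""
    if window_size < 1:
        raise ValueError("window_size must be at least 1")
    iterator = iter(items)
    while True:
        # Pull the next `window_size` items; fewer are returned at the end.
        window = list(islice(iterator, window_size))
        if not window:
            return
        yield window


windows = list(fixed_window(range(7), 3))
print(windows)  # [[0, 1, 2], [3, 4, 5], [6]]
```

Inside a Prefect task you would return `list(fixed_window(...))` so the result can be passed to `display.map` as in the flow above.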
v
🙇 Thanks
Hello @Dylan, I need to pump JSON data from one REST API to another, and the receiving API has a batch size limit that I must not exceed. Needing a batch size is actually a very common scenario, so it would be nice to have this in Prefect's `task.map`. The above solution will work, thanks @Zanie
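To make the batch-limit scenario concrete, here is a rough sketch of splitting records into batches no larger than the limit and handing each batch to a sender. The names `split_into_batches`, `send_in_batches`, and `post_batch` are hypothetical, and `post_batch` stands in for whatever call actually pushes one batch to the receiving API. No real HTTP request is made here.

```python
from typing import Callable, List, Sequence


def split_into_batches(records: Sequence[dict], batch_size: int) -> List[List[dict]]:
    """Split records into lists of at most `batch_size` items."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    return [
        list(records[start:start + batch_size])
        for start in range(0, len(records), batch_size)
    ]


def send_in_batches(
    records: Sequence[dict],
    batch_size: int,
    post_batch: Callable[[List[dict]], None],
) -> int:
    """Send every batch through `post_batch` and return the batch count."""
    sent = 0
    for batch in split_into_batches(records, batch_size):
        post_batch(batch)  # hypothetical sender, e.g. one POST per batch
        sent += 1
    return sent


# Stand-in sender that only collects batches, so the sketch runs offline.
received: List[List[dict]] = []
records = [{"id": i} for i in range(5)]
count = send_in_batches(records, 2, received.append)
print(count, [len(batch) for batch in received])  # 3 [2, 2, 1]
```

In a flow, `split_into_batches` would play the role of the `fixed_window` task above, and the sender would be the task that gets mapped over the batches.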
z
Feel free to open an issue in the `prefect` repo to add this. Mapping is complicated and there’s a lot to consider.