Hi, I'm rewriting some ETL jobs from a bonobo-based implementation to Prefect. Question: does Prefect support functionality like FixedWindow, to limit the number of input elements?
Dylan
12/17/2020, 7:37 PM
Hi @Vitaly Shulgin,
I don’t believe we have a way to limit the number of input elements in the way that bonobo does. Can you describe what you’re trying to achieve in a bit more detail, and I’ll see if I can help you find a “Prefect” solution.
import random

import prefect
from prefect import Flow, task, Parameter

MAX_SIZE = 30


@task
def dynamic_list_of_tasks(window_size):
    size = random.randint(window_size, MAX_SIZE)
    size += -size % window_size  # Pad so size is divisible by the fixed window
    return [i for i in range(size)]


@task
def fixed_window(tasks: list, window_size: int):
    # Split the list into consecutive chunks of window_size elements
    windowed = []
    while tasks:
        windowed.append(tasks[:window_size])
        tasks = tasks[window_size:]
    return windowed


@task
def display(i):
    prefect.context.logger.info(f"Processing {i}")


window_size = Parameter("window_size", default=2)

with Flow("windowed-tasks") as flow:
    tasks = dynamic_list_of_tasks(window_size)
    windowed_tasks = fixed_window(tasks, window_size)
    display.map(windowed_tasks)

flow.run()
# flow.register("default")
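(For reference, the window size can also be overridden at runtime via the flow parameter; `flow.run(parameters=...)` is standard Prefect 1.x API, and the value shown here is just an example:)

# Run the same flow with 3-element windows instead of the default 2
flow.run(parameters={"window_size": 3})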
Zanie
12/17/2020, 7:53 PM
This may be interesting functionality to include directly into `map`, as this requires having a function take all of your list and repack it. I’m not sure how common it would be, though.
Zanie
12/17/2020, 7:54 PM
Where I refer to `tasks`, it may be clearer as `items`.
Vitaly Shulgin
12/18/2020, 6:10 AM
🙇 Thanks
Vitaly Shulgin
12/18/2020, 6:18 AM
Hello @Dylan, I need to pump JSON data from one REST API to another, and the second one has a batch size limit I have to stay under. This is actually a very common scenario where batching is needed, so it would be nice to have built into Prefect’s `task.map`. The above solution will work, thanks @Zanie.
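(For illustration, a minimal sketch of that REST-to-REST scenario using the same fixed-window pattern. The endpoint URLs, the `requests` calls, and the `fetch_records`/`push_batch` task names are hypothetical stand-ins, not anything from this thread:)

import requests
import prefect
from prefect import Flow, task, Parameter

BATCH_LIMIT = 100  # assumed batch-size cap of the target API


@task
def fetch_records():
    # Hypothetical source endpoint returning a JSON list of records
    return requests.get("https://source.example.com/records").json()


@task
def fixed_window(items: list, window_size: int):
    # Split the records into chunks of at most window_size elements
    return [items[i:i + window_size] for i in range(0, len(items), window_size)]


@task
def push_batch(batch):
    # Hypothetical target endpoint that rejects oversized batches
    requests.post("https://target.example.com/bulk", json=batch)


with Flow("rest-to-rest-batched") as flow:
    batch_size = Parameter("batch_size", default=BATCH_LIMIT)
    records = fetch_records()
    batches = fixed_window(records, batch_size)
    push_batch.map(batches)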
Zanie
12/18/2020, 7:13 PM
Feel free to open an issue in the `prefect` repo to add this. Mapping is complicated and there’s a lot to consider.