# prefect-community
m
What is the best way to pass non-serializable objects into tasks? For example, I have a database connection or an SFTP connection that I want to reuse. The use case is that I have many files I need to write to an SFTP server, and I want to use the mapping functionality. But creating many connections is costly (and it seems that this SFTP server is limiting the number of connections).
z
Hi @Marc Lipoff, this is not a solved problem yet. There’s an open PR at https://github.com/PrefectHQ/prefect/pull/3139 to introduce caching for things like this. It seems like your SFTP server won’t be able to handle the number of simultaneous uploads you’re talking about, though, even if they shared connection objects. You could consider batching your tasks. First-class support for this is planned, but here’s an example that splits the work into batches of a fixed “window” size:
import time
import random

import prefect
from prefect import Flow, task, Parameter

MAX_SIZE = 30


@task
def dynamic_list_of_tasks(window_size):
    size = random.randint(window_size, MAX_SIZE)
    size += -size % window_size  # Round up so the size is divisible by the fixed window
    return [i for i in range(size)]


@task
def fixed_window(tasks: list, window_size: int):
    windowed = []
    while tasks:
        windowed.append(tasks[:window_size])
        tasks = tasks[window_size:]
    return windowed


@task
def display(i):
    prefect.context.logger.info(f"Processing {i}")


window_size = Parameter("window_size", default=2)

with Flow("windowed-tasks") as flow:
    tasks = dynamic_list_of_tasks(window_size)
    windowed_tasks = fixed_window(tasks, window_size)
    display.map(windowed_tasks)

flow.run()
# flow.register("default")
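To tie the batching back to the connection question: one possible pattern is to open the connection inside the task that handles a batch, so that each window reuses a single connection and nothing non-serializable is passed between tasks. The sketch below is plain Python with no Prefect or SFTP dependency. `FakeSFTPConnection`, `open_connection`, `chunk`, and `upload_batch` are hypothetical names invented for illustration, and the fake class only stands in for a real SFTP client.

```python
from contextlib import contextmanager
from typing import Iterator, List


class FakeSFTPConnection:
    """Hypothetical stand-in for a real SFTP client; it only records calls."""

    opened = 0  # class-level count of connections created so far

    def __init__(self) -> None:
        FakeSFTPConnection.opened += 1
        self.uploaded: List[str] = []
        self.closed = False

    def put(self, path: str) -> None:
        # A real client would transfer the file here.
        self.uploaded.append(path)

    def close(self) -> None:
        self.closed = True


@contextmanager
def open_connection() -> Iterator[FakeSFTPConnection]:
    """Open one connection and make sure it is closed afterwards."""
    conn = FakeSFTPConnection()
    try:
        yield conn
    finally:
        conn.close()


def chunk(items: List[str], window_size: int) -> List[List[str]]:
    """Split items into consecutive batches of at most window_size."""
    return [items[i:i + window_size] for i in range(0, len(items), window_size)]


def upload_batch(batch: List[str]) -> List[str]:
    """Upload every file in the batch over a single shared connection."""
    with open_connection() as conn:
        for path in batch:
            conn.put(path)
        return list(conn.uploaded)


files = [f"file_{i}.csv" for i in range(7)]
batches = chunk(files, 3)
results = [upload_batch(batch) for batch in batches]
```

In the flow above, a function like `upload_batch` would presumably carry the `@task` decorator and be mapped over `windowed_tasks` in place of `display`, so the number of connections opened equals the number of batches rather than the number of files.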
m
that makes sense. thanks
@Zanie for what it's worth, this would be a very useful feature for us. Just want to +1 the PR.