# ask-community
I am running a self-hosted Prefect server and am noticing strange cases where I am 'losing' logs and tasks in the UI. I am mapping over an iterable of length ~6200. I can see the main flow start up, and once the tasks start mapping I see task logs in the terminal, but not in the UI. Strangely, when I limit the size of the mapped iterable to ~10 to 100 items, everything works as expected. Are there any suggestions around how best to use the `.map` of a task-decorated function? I can easily batch up my large iterable, but I am more curious as to why the `.map` might be falling over.
Some more context: the iterable comes from a `pandas` dataframe, via `dataframe.iterrows()`.
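One thing worth noting about that choice: `.map` has to consume the whole iterable up front to know how many child task runs to create, and `iterrows()` is a lazy generator, so the full materialization happens in one shot at submission time. A stdlib-only sketch of the same effect (the `iterrows_like` helper is our stand-in, not pandas):

```python
def iterrows_like(n):
    """Stand-in for dataframe.iterrows(): lazily yields (index, row) pairs."""
    for i in range(n):
        yield i, {"name": f"row-{i}"}

rows = iterrows_like(6_200)
# A generator has no length; mapping over it forces full materialization,
# so ~6,200 (index, row) pairs are realized before any task run starts
materialized = list(rows)
print(len(materialized))  # 6200
```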
I can provide a little more context as well. Our initial implementation looked something like this:
```python
all_things = []
for thing_type in ("foo", "bar", "baz"):
    things_one = task_one.map(
        row=dataframe.iterrows(),
        thing_type=unmapped(thing_type),
    )
    things_two = task_two.map(
        thing_one=things_one,
        thing_type=unmapped(thing_type),
    )
    all_things.extend(things_two)

results = task_three.map(
    thing_two=all_things,
)
concrete_results = [r.result() for r in results]
```
This caused the flow run to halt / freeze almost immediately (with eventual timeouts).
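One knob worth trying before batching by hand is Prefect's tag-based concurrency limits, which cap how many runs of a tagged task execute at once rather than letting all mapped runs pile up. A sketch, assuming the tasks are tagged accordingly (the tag name `row-work` and the limit of 50 are ours, not from the thread):

```shell
# Cap concurrent runs of any task tagged "row-work" at 50;
# pair this with @task(tags=["row-work"]) on the task definitions
prefect concurrency-limit create row-work 50
```

This bounds execution concurrency, though it does not reduce the number of task runs submitted to the API.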
Our first attempt at batching looked like this:
```python
# Attempted fix 1
_batch_size = 100
results = []
for i in range(0, len(dataframe), _batch_size):
    sub_df = dataframe[i : i + _batch_size]
    all_things = []
    for thing_type in ("foo", "bar", "baz"):
        things_one = task_one.map(
            row=sub_df.iterrows(),
            thing_type=unmapped(thing_type),
        )
        things_two = task_two.map(
            thing_one=things_one,
            thing_type=unmapped(thing_type),
        )
        all_things.extend(things_two)

    sub_results = task_three.map(
        thing_two=all_things,
    )
    results.extend(sub_results)

concrete_results = [r.result() for r in results]
```
This got more tasks submitted, but it eventually still froze up.
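The slicing loop in both attempts can be factored into a small helper that is independent of Prefect and easy to test on its own; a minimal sketch (the `chunks` name is ours, not a Prefect API):

```python
def chunks(seq, size):
    """Yield successive slices of `seq` with at most `size` elements each."""
    for i in range(0, len(seq), size):
        yield seq[i : i + size]

# Plain-list demonstration; in the flow, `seq` would be the dataframe
batches = list(chunks(list(range(10)), 4))
print(batches)  # [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]
```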
Our second attempt looks like:
```python
# Attempted fix 2
_batch_size = 100
concrete_results = []
for i in range(0, len(dataframe), _batch_size):
    sub_df = dataframe[i : i + _batch_size]
    all_things = []
    for thing_type in ("foo", "bar", "baz"):
        things_one = task_one.map(
            row=sub_df.iterrows(),
            thing_type=unmapped(thing_type),
        )
        things_two = task_two.map(
            thing_one=things_one,
            thing_type=unmapped(thing_type),
        )
        all_things.extend(things_two)

    sub_results = task_three.map(
        thing_two=all_things,
    )
    concrete_results.extend([f.result() for f in sub_results])
```
This version is running, but it seems much less efficient 😕
For reference I'm working with @Tim Galvin on this
As I'm thinking about this, I can see how the task number blows up quickly. With an iterable of about 6k rows: task_one runs ~6k tasks per thing_type, and so does task_two, which across three thing_types is ~36k tasks. task_three then maps over the ~18k collected task_two results, so the flow submits ~54k task runs in total.
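Spelling that arithmetic out (using 6k as a round figure for the ~6,200 rows):

```python
rows = 6_000   # approximate number of dataframe rows
types = 3      # ("foo", "bar", "baz")

task_one_runs = rows * types      # mapped once per thing_type
task_two_runs = rows * types      # one run per task_one result
task_three_runs = rows * types    # maps over the collected task_two results
total = task_one_runs + task_two_runs + task_three_runs

print(task_one_runs, task_two_runs, task_three_runs, total)
# 18000 18000 18000 54000
```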
Ok - another interesting update. We switched from using `.map` to `.submit` within a for loop, like:
```python
results = []
for row in dataframe.iterrows():
    for thing_type in ("foo", "bar", "baz"):
        thing_one = task_one.submit(
            row=row,
            thing_type=thing_type,
        )
        thing_two = task_two.submit(
            thing_one=thing_one,
            thing_type=thing_type,
        )
        result = task_three.submit(
            thing_two=thing_two,
        )
        results.append(result)
concrete_results = [r.result() for r in results]
```
And this seems to be progressing much better. It'd be great to understand why the `.map` pattern was causing things to freeze up.
And it has successfully completed with the `.submit` version. The `.map` version has completely borked itself. Any ideas?