# ask-community
h
Hey! I was wondering if there's a simple way to do windowed mapping. Imagine the first task is a mapping with outputs [1, 2, 3, 4]. At different points I want to do overlapping and non-overlapping windows: 3 mapped tasks with inputs [(1, 2), (2, 3), (3, 4)], and then (separately) another group of 2 mapped tasks with inputs [(1, 2), (3, 4)]. Is this possible?
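To pin down what I mean, the windows in plain Python (outside of Prefect) would just be:
Copy code
vals = [1, 2, 3, 4]
overlapping = list(zip(vals, vals[1:]))             # [(1, 2), (2, 3), (3, 4)]
non_overlapping = list(zip(vals[::2], vals[1::2]))  # [(1, 2), (3, 4)]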
k
It is not. You would need an intermediate task to construct those tuples. But I think this specific case can be handled if you did something like:
Copy code
from prefect import Flow, task

@task
def task1(x):
    return x

@task
def task2(x):
    return x + 1

@task
def windowed(val1, val2):
    return val1 + val2

with Flow(...) as flow:
    start = [1, 2, 3, 4]
    t = task1.map(start)
    u = task2.map(t)
    windowed.map(t, u)
but that would double the memory, of course. I suspect your window is a lot bigger.
h
Hmm... Actually my window size really is 2, and the tasks don't pass the data to each other, just pointers to where it's stored. So maybe it could work...
Although if I understand correctly, this would be running each task twice? I mean, there are 8 starting tasks instead of 4. Maybe we can make the tasks check if they are already running and if so just wait.
Copy code
from prefect import Flow, task

@task
def initial_task(x):
    # is_running_in_cluster / wait_for / run_in_cluster are our own helpers;
    # the data itself stays in the cluster
    if is_running_in_cluster(x):
        wait_for(x)
    else:
        run_in_cluster(x)
    return x

@task
def combine_results(x, y):
    return x + y

with Flow(...) as flow:
    start = [1, 2, 3, 4]
    collected_tasks = initial_task.map(start)
    duplicate_tasks = initial_task.map(start[1:] + [None])
    combine_results.map(collected_tasks, duplicate_tasks)
(we aren't using prefect to pass the data, just the dependencies among tasks)
I'm not sure if the + [None] is really necessary, but we actually do want to include (4, None), so it makes sense.
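i.e. in plain Python terms:
Copy code
start = [1, 2, 3, 4]
list(zip(start, start[1:] + [None]))
# [(1, 2), (2, 3), (3, 4), (4, None)]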
k
It would re-run, yeah, so if it's a long task, it would definitely be better to use an intermediate task to reshape the output from the first map.
h
what do you mean reshape the output?
k
Making an intermediate task to form the windows
Copy code
from copy import copy

from prefect import Flow, task

@task
def initial_task(x):
    return x

@task
def reshape(list_x):
    mylist = copy(list_x)
    mylist.append(None)
    result = []
    for i in range(len(list_x)):
        # pairs up (x[i], x[i+1]), ending with (last, None)
        result.append((mylist[i], mylist[i + 1]))
    return result

with Flow(...) as flow:
    initial = [1, 2, 3, 4]
    x = initial_task.map(initial)
    y = reshape(x)
    # pass y to other tasks
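For the windowed step itself, you could then map over the reshaped list, roughly like this (a sketch; process_window is a made-up downstream task):
Copy code
@task
def process_window(pair):
    # each mapped child receives one (a, b) window; the last one is (last, None)
    a, b = pair
    return a, b

with Flow(...) as flow:
    initial = [1, 2, 3, 4]
    x = initial_task.map(initial)
    y = reshape(x)
    z = process_window.map(y)  # one mapped task per window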
h
Am I right that reshape wouldn't run until all the initial tasks finish?
k
Yeah because it’s a reduce task automatically
h
Hmm... that's a problem for us. I didn't want to go that route, but how would you feel about creating all the tasks individually, up to some preset maximum? For a little more context, the initial tasks are data ingest, and we don't know how much data we'll get (typically around 240, but let's say a safe ceiling is 500).
Copy code
from itertools import zip_longest

from prefect import Flow, task

@task
def wait_for_data_to_arrive(item_ix):
    # wait
    return item_ix

@task
def process_two_items(item_a_ix, item_b_ix):
    # do something
    return item_a_ix, item_b_ix

with Flow(...) as flow:
    x = [wait_for_data_to_arrive(ix) for ix in range(500)]
    overlapping_combine = [
        process_two_items(a, b)
        for a, b in zip_longest(x, x[1:])
    ]
    non_overlapping_combine = [
        process_two_items(a, b)
        for a, b in zip_longest(x[::2], x[1::2])
    ]
Having written it out, if that allows us to use itertools-like idioms to construct the flow, it's perfect. A bunch of extra tasks could be worth it. To deal with all those dummy tasks, imagine we get a signal that data collection has finished after n items. Could we hit the API from another process to manually change the state of those ingestion tasks, or is there a better way?
You can't change the flow from inside a task, right?
k
There's a bunch here. First is that the list comprehension syntax will not work for the Flow because these are instances of the Task class, so you can't use a for loop with them. If you map wait_for_data_to_arrive to your inputs like:
Copy code
wait_for_data_to_arrive.map(list(range(500)))
Each one of those would finish before the reduce step. I am not 100% sure what the question is with creating tasks individually. Each of the mapped tasks is a first-class task that has observability and respects retries. So if you are not sure what is coming in, you can put that logic inside the task to return None. You can also maybe use the task timeout to fail tasks that are just waiting too long. There are hooks pre and post task run and you can put any callable in them, but it sounds like what you are asking for is a hook mid-run. You can indeed hit the API and mark tasks as successful, but I think this is overcomplicating things and will likely lead to unreliable results downstream, because you will be marking tasks successful, which will stop them from executing, I think.
I think in general, think of using map instead of for in the Flow block. The map creates independent tasks.
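Roughly, a minimal sketch of what I mean (poll_for_item and the flow name here are just placeholders for however you check whether data has landed):
Copy code
from prefect import Flow, task

@task(timeout=600)  # seconds; tasks still waiting after this fail instead of hanging forever
def wait_for_data_to_arrive(item_ix):
    ref = poll_for_item(item_ix)  # placeholder helper: returns None if nothing arrived
    return ref

@task
def process_two_items(item_a, item_b):
    if item_a is None and item_b is None:
        return None  # dummy slot past the real data, nothing to do
    return item_a, item_b

with Flow('mapped_ingest') as flow:
    items = wait_for_data_to_arrive.map(list(range(500)))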
h
The reason I moved from map to for is that it seems mapping is not flexible enough if we can't do windowed slicing or itertools-like reshaping. At least not without reducing, which is a dealbreaker. We need the pipelines to stay parallel as much as possible.
And are you sure I can't create tasks in a list comprehension like that? I'm new to prefect but from my knowledge of Python I don't think there's any difference between
Copy code
with Flow(...):
    x = [wait_for_data_to_arrive(ix) for ix in range(500)]
and
Copy code
with Flow(...):
    x_0 = wait_for_data_to_arrive(0)
    x_1 = wait_for_data_to_arrive(1)
    ...
    x_499 = wait_for_data_to_arrive(499)
Obviously we're not going to do the latter, but my point is, all the looping is happening outside of any prefect objects in this proposal.
To try to explain without the code, my idea is to build the DAG in advance for a fixed number of items, and then, assuming the actual number is fewer, the extra tasks can time out or pass None through the pipeline as you suggest.
works great
Copy code
from time import sleep

from prefect import Flow, task
from prefect.executors import LocalDaskExecutor

@task
def get_data(x):
    sleep(2 * x)
    print(f'got data {x}')
    return x

@task
def combine_results(a, b):
    print(f'combined {a} + {b}')
    return a, b

with Flow('test') as flow:
    datas = [get_data(ix) for ix in range(5)]
    combined = [combine_results(a, b) for a, b in zip(datas, datas[1:])]

flow.run(executor=LocalDaskExecutor())
outputs
Copy code
[2021-11-11 00:24:04+0100] INFO - prefect.FlowRunner | Beginning Flow run for 'test'
[2021-11-11 00:24:04+0100] INFO - prefect.TaskRunner | Task 'get_data': Starting task run...
[2021-11-11 00:24:04+0100] INFO - prefect.TaskRunner | Task 'get_data': Starting task run...
[2021-11-11 00:24:04+0100] INFO - prefect.TaskRunner | Task 'get_data': Starting task run...
got data 0
[2021-11-11 00:24:04+0100] INFO - prefect.TaskRunner | Task 'get_data': Starting task run...
[2021-11-11 00:24:04+0100] INFO - prefect.TaskRunner | Task 'get_data': Finished task run for task with final state: 'Success'
[2021-11-11 00:24:04+0100] INFO - prefect.TaskRunner | Task 'get_data': Starting task run...
got data 1
[2021-11-11 00:24:06+0100] INFO - prefect.TaskRunner | Task 'get_data': Finished task run for task with final state: 'Success'
[2021-11-11 00:24:06+0100] INFO - prefect.TaskRunner | Task 'combine_results': Starting task run...
combined 0 + 1
[2021-11-11 00:24:06+0100] INFO - prefect.TaskRunner | Task 'combine_results': Finished task run for task with final state: 'Success'
got data 2
[2021-11-11 00:24:08+0100] INFO - prefect.TaskRunner | Task 'get_data': Finished task run for task with final state: 'Success'
[2021-11-11 00:24:08+0100] INFO - prefect.TaskRunner | Task 'combine_results': Starting task run...
combined 1 + 2
[2021-11-11 00:24:08+0100] INFO - prefect.TaskRunner | Task 'combine_results': Finished task run for task with final state: 'Success'
got data 3
[2021-11-11 00:24:10+0100] INFO - prefect.TaskRunner | Task 'get_data': Finished task run for task with final state: 'Success'
[2021-11-11 00:24:10+0100] INFO - prefect.TaskRunner | Task 'combine_results': Starting task run...
combined 2 + 3
[2021-11-11 00:24:10+0100] INFO - prefect.TaskRunner | Task 'combine_results': Finished task run for task with final state: 'Success'
got data 4
[2021-11-11 00:24:12+0100] INFO - prefect.TaskRunner | Task 'get_data': Finished task run for task with final state: 'Success'
[2021-11-11 00:24:13+0100] INFO - prefect.TaskRunner | Task 'combine_results': Starting task run...
combined 3 + 4
[2021-11-11 00:24:13+0100] INFO - prefect.TaskRunner | Task 'combine_results': Finished task run for task with final state: 'Success'
[2021-11-11 00:24:13+0100] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
<Success: "All reference tasks succeeded.">
If I can do that with mapping even better but if not I think this will do. Appreciate your thoughts, thanks for your time!
k
Ah sorry, I wasn't specific. You can use the for loop if the first list is known ahead of time. But this will fail, for example:
Copy code
from prefect import Flow, task

@task
def task1(x):
    return x+1

@task
def task2(x):
    return x+2

@task
def create_input():
    return [1,2,3,4]

with Flow('short_checker') as flow:
    start = create_input()
    a = [task1(x) for x in start]
    b = [task2(x) for x in a]
    
flow.run()
But this will succeed:
Copy code
with Flow('short_checker') as flow:
    start = [1,2,3,4]
    a = [task1(x) for x in start]
    b = [task2(x) for x in a]
    
flow.run()
What you have looks good, and if it’s working, that should be good!
h
One question: will this be scalable to 500 initial tasks? With this design we will need tons of workers, but the tasks don't actually do anything, just either wait or run k8s jobs. Do you think we will need a super beefy Dask cluster? Just worrying a bit that we are stretching the intended use case too much, which seems to be that the tasks actually do the work themselves. With Orion we could do coroutines instead of threads, right? In the abstract that seems much better suited.
k
Oh man sorry I forgot to respond to this. Feel free to ping me next time. I believe it should be scalable by design, but because it’s not async, the waiting tasks will occupy a process. You may need a beefy cluster to really execute in parallel. With Orion this will indeed be async, so I imagine the experience should be a lot better.
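Just to illustrate the coroutines-vs-processes point with plain asyncio (nothing Prefect-specific, only a sketch):
Copy code
import asyncio

async def wait_for_data(ix):
    # while "waiting", a coroutine yields control, so hundreds of these
    # can share a single thread instead of each occupying a worker process
    await asyncio.sleep(1)
    return ix

async def main():
    results = await asyncio.gather(*(wait_for_data(ix) for ix in range(500)))
    print(len(results))  # 500, after roughly one second total

asyncio.run(main())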