Henry Harrison
11/10/2021, 9:42 PM
Suppose I have tasks mapped over the inputs [1, 2, 3, 4]. At different points I want to do overlapping and non-overlapping windows. So: 3 mapped tasks with inputs [(1, 2), (2, 3), (3, 4)], and then (separately) another group of 2 mapped tasks with inputs [(1, 2), (3, 4)]. Is this possible?

Kevin Kho
@task
def task1(x):
    return x

@task
def task2(x):
    return x + 1

@task
def windowed(val1, val2):
    return val1 + val2

with Flow(...) as flow:
    start = [1, 2, 3, 4]
    t = task1.map(start)
    u = task2.map(t)
    windowed.map(t, u)
but that would double the memory, of course. I suspect your window is a lot bigger.
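For reference, outside of Prefect the two windowings from the original question are just shifted and strided zips in plain Python (an illustrative check, not Prefect code):

t = [1, 2, 3, 4]
overlapping = list(zip(t, t[1:]))             # [(1, 2), (2, 3), (3, 4)]
non_overlapping = list(zip(t[::2], t[1::2]))  # [(1, 2), (3, 4)]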
Henry Harrison
11/10/2021, 9:52 PM

Henry Harrison
11/10/2021, 9:59 PM
@task
def initial_task(x):
    if is_running_in_cluster(x):
        wait_for(x)
    else:
        run_in_cluster(x)
    return x

@task
def combine_results(x, y):
    return x + y

with Flow(...) as flow:
    start = [1, 2, 3, 4]
    collected_tasks = initial_task.map(start)
    duplicate_tasks = initial_task.map(start[1:] + [None])
    combine_results.map(collected_tasks, duplicate_tasks)
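A quick plain-Python check of what the shifted input lines up against (illustrative only):

start = [1, 2, 3, 4]
start[1:] + [None]                    # [2, 3, 4, None]
list(zip(start, start[1:] + [None]))  # [(1, 2), (2, 3), (3, 4), (4, None)]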
Henry Harrison
11/10/2021, 10:00 PM

Henry Harrison
11/10/2021, 10:01 PM
I'm not sure the + [None] is really necessary, but we actually do want to include (4, None), so it makes sense.

Kevin Kho
Henry Harrison
11/10/2021, 10:02 PM

Kevin Kho
@task
def initial_task(x):
    return x

@task
def reshape(list_x):
    mylist = list(list_x)  # copy so we don't mutate the input
    mylist.append(None)
    result = []
    for i in range(len(list_x)):
        result.append((mylist[i], mylist[i + 1]))
    return result
with Flow(...) as flow:
    initial = [1, 2, 3, 4]
    x = initial_task.map(initial)
    y = reshape(x)
    # pass y to other tasks
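Calling the fixed reshape on the toy input shows the windows it builds (Prefect 1.x tasks expose the wrapped function via .run; illustrative):

reshape.run([1, 2, 3, 4])  # [(1, 2), (2, 3), (3, 4), (4, None)]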
Henry Harrison
11/10/2021, 10:14 PM
But then reshape wouldn't run until all the initial tasks finish?

Kevin Kho
Henry Harrison
11/10/2021, 10:31 PM
from itertools import zip_longest

@task
def wait_for_data_to_arrive(item_ix):
    # wait
    return item_ix

@task
def process_two_items(item_a_ix, item_b_ix):
    # do something
    return item_a_ix, item_b_ix

with Flow(...):
    x = [wait_for_data_to_arrive(ix) for ix in range(500)]
    overlapping_combine = [
        process_two_items(a, b)
        for a, b in zip_longest(x, x[1:])
    ]
    non_overlapping_combine = [
        process_two_items(a, b)
        for a, b in zip_longest(x[::2], x[1::2])
    ]
Having written it out: if that lets us use itertools-like idioms to construct the flow, it's perfect. A bunch of extra tasks could be worth it.
To deal with all those dummy tasks, imagine we get a signal that data collection has finished after n items. Could we hit the API from another process to manually change the state of those ingestion tasks, or is there a better way?

Henry Harrison
11/10/2021, 10:32 PM

Kevin Kho
The task outputs are of the Task class, so you can't use a for loop with them. If you map wait_for_data_to_arrive to your inputs like:

wait_for_data_to_arrive.map(list(range(500)))

each one of those would finish before the reduce step. I am not 100% sure what the question is about creating tasks individually. Each of the mapped tasks is a first-class task that has observability and respects retries. So if you are not sure what is coming in, you can put that logic inside the task and return None. You could also maybe use the task timeout to fail tasks that are just waiting too long.
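A minimal sketch of those two suggestions in Prefect 1.x; data_available is a hypothetical placeholder, and the timeout/retry values are arbitrary:

from datetime import timedelta
from prefect import task

@task(timeout=600, max_retries=2, retry_delay=timedelta(seconds=30))
def wait_for_data_to_arrive(item_ix):
    if not data_available(item_ix):  # hypothetical availability check
        return None                  # downstream tasks can then skip this item
    return item_ix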
There are hooks pre and post task run, and you can put any callable in them, but what you are asking for sounds like a hook mid-run. You can indeed hit the API and mark tasks as successful, but I think this is overcomplicating things and will likely lead to unreliable results downstream, because marking tasks successful will stop them from executing, I think.
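The pre/post hooks mentioned above are state handlers in Prefect 1.x; a minimal sketch, with the print as an illustrative stand-in for real logic:

from prefect import task

def on_state_change(task, old_state, new_state):
    # called on every state transition of the task
    print(f'{task.name}: {old_state} -> {new_state}')
    return new_state

@task(state_handlers=[on_state_change])
def wait_for_data_to_arrive(item_ix):
    return item_ix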
Kevin Kho
Use map instead of for in the Flow block. The map creates independent tasks.

Henry Harrison
11/10/2021, 10:57 PM
The problem with preferring map to for is that mapping doesn't seem flexible enough if we can't do windowed slicing or itertools-like reshaping. At least not without reducing, which is a dealbreaker. We need the pipelines to stay parallel as much as possible.

Henry Harrison
11/10/2021, 10:59 PM
Put another way, there's no difference between

with Flow(...):
    x = [wait_for_data_to_arrive(ix) for ix in range(500)]

and

with Flow(...):
    x_0 = wait_for_data_to_arrive(0)
    x_1 = wait_for_data_to_arrive(1)
    ...
    x_499 = wait_for_data_to_arrive(499)

Obviously we're not going to do the latter, but my point is, all the looping is happening outside of any Prefect objects in this proposal.

Henry Harrison
11/10/2021, 11:01 PM
And for the unknown item count, we can pass None through the pipeline as you suggest.

Henry Harrison
11/10/2021, 11:25 PM
from time import sleep

from prefect import Flow, task
from prefect.executors import LocalDaskExecutor

@task
def get_data(x):
    sleep(2 * x)
    print(f'got data {x}')
    return x

@task
def combine_results(a, b):
    print(f'combined {a} + {b}')
    return a, b

with Flow('test') as flow:
    datas = [get_data(ix) for ix in range(5)]
    combined = [combine_results(a, b) for a, b in zip(datas, datas[1:])]

flow.run(executor=LocalDaskExecutor())
outputs:
[2021-11-11 00:24:04+0100] INFO - prefect.FlowRunner | Beginning Flow run for 'test'
[2021-11-11 00:24:04+0100] INFO - prefect.TaskRunner | Task 'get_data': Starting task run...
[2021-11-11 00:24:04+0100] INFO - prefect.TaskRunner | Task 'get_data': Starting task run...
[2021-11-11 00:24:04+0100] INFO - prefect.TaskRunner | Task 'get_data': Starting task run...
got data 0
[2021-11-11 00:24:04+0100] INFO - prefect.TaskRunner | Task 'get_data': Starting task run...
[2021-11-11 00:24:04+0100] INFO - prefect.TaskRunner | Task 'get_data': Finished task run for task with final state: 'Success'
[2021-11-11 00:24:04+0100] INFO - prefect.TaskRunner | Task 'get_data': Starting task run...
got data 1
[2021-11-11 00:24:06+0100] INFO - prefect.TaskRunner | Task 'get_data': Finished task run for task with final state: 'Success'
[2021-11-11 00:24:06+0100] INFO - prefect.TaskRunner | Task 'combine_results': Starting task run...
combined 0 + 1
[2021-11-11 00:24:06+0100] INFO - prefect.TaskRunner | Task 'combine_results': Finished task run for task with final state: 'Success'
got data 2
[2021-11-11 00:24:08+0100] INFO - prefect.TaskRunner | Task 'get_data': Finished task run for task with final state: 'Success'
[2021-11-11 00:24:08+0100] INFO - prefect.TaskRunner | Task 'combine_results': Starting task run...
combined 1 + 2
[2021-11-11 00:24:08+0100] INFO - prefect.TaskRunner | Task 'combine_results': Finished task run for task with final state: 'Success'
got data 3
[2021-11-11 00:24:10+0100] INFO - prefect.TaskRunner | Task 'get_data': Finished task run for task with final state: 'Success'
[2021-11-11 00:24:10+0100] INFO - prefect.TaskRunner | Task 'combine_results': Starting task run...
combined 2 + 3
[2021-11-11 00:24:10+0100] INFO - prefect.TaskRunner | Task 'combine_results': Finished task run for task with final state: 'Success'
got data 4
[2021-11-11 00:24:12+0100] INFO - prefect.TaskRunner | Task 'get_data': Finished task run for task with final state: 'Success'
[2021-11-11 00:24:13+0100] INFO - prefect.TaskRunner | Task 'combine_results': Starting task run...
combined 3 + 4
[2021-11-11 00:24:13+0100] INFO - prefect.TaskRunner | Task 'combine_results': Finished task run for task with final state: 'Success'
[2021-11-11 00:24:13+0100] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
<Success: "All reference tasks succeeded.">
Henry Harrison
11/10/2021, 11:26 PM

Kevin Kho
from prefect import Flow, task

@task
def task1(x):
    return x + 1

@task
def task2(x):
    return x + 2

@task
def create_input():
    return [1, 2, 3, 4]

with Flow('short_checker') as flow:
    start = create_input()
    # start is a Task here, not a list, so this for loop fails at
    # flow-build time (a Task is not iterable in Prefect 1.x)
    a = [task1(x) for x in start]
    b = [task2(x) for x in a]

flow.run()
Kevin Kho
with Flow('short_checker') as flow:
    start = [1, 2, 3, 4]
    # a plain Python list is known at build time, so the for loop works
    a = [task1(x) for x in start]
    b = [task2(x) for x in a]

flow.run()
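If the input list really does come from an upstream task, as in the first snippet, the for loop can't work because the list isn't known when the flow is built; map covers that case. A minimal sketch reusing the same tasks (the flow name is illustrative):

with Flow('short_checker_mapped') as flow:
    start = create_input()
    a = task1.map(start)
    b = task2.map(a)

flow.run()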
Kevin Kho

Henry Harrison
11/11/2021, 9:20 AM

Kevin Kho