Stephen Lloyd
02/28/2023, 5:46 AM
@task
def some_task():
    ...
    return name, list
@task
def mapped_process():
    ...
    return ??
@flow
def my_flow():
    names = ['test1', 'test2']
    x = some_task.map(names)
    # x = [('test1', [1,2,3,4,5]), ('test2', [6,7,8,9,10])]
    lots_of_data = mapped_process.map()
jpuris
02/28/2023, 8:14 AM
my_flow
4. Task needs to also return the enum (ID) as part of the return
5. Save the task run returns into a list
6. zip up the list of tuples by the returned ID
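A minimal sketch of steps 4 to 6, written in plain Python with `concurrent.futures` standing in for Prefect's task runner so it runs without a Prefect install. The function bodies, the `task_id` argument, and the `combined` name are assumptions made for illustration and are not from the thread.

```python
from concurrent.futures import ThreadPoolExecutor


def some_task(task_id: int, name: str) -> tuple[int, str, list[int]]:
    # Hypothetical stand-in for the first task: it returns its ID with the payload.
    start = task_id * 5
    return task_id, name, list(range(start, start + 5))


def mapped_process(task_id: int, data: list[int]) -> tuple[int, int]:
    # Hypothetical stand-in for the second task: it passes the ID through.
    return task_id, sum(data)


names = ["test1", "test2"]
with ThreadPoolExecutor() as pool:
    # Steps 4 and 5: every run returns its ID, and the returns are saved in lists.
    first = list(pool.map(some_task, range(len(names)), names))
    second = list(
        pool.map(mapped_process, [r[0] for r in first], [r[2] for r in first])
    )

# Simulate results arriving in a different order than they were submitted.
second = list(reversed(second))

# Step 6: join the two result sets by ID rather than by list position.
sums_by_id = dict(second)
combined = [(name, data, sums_by_id[task_id]) for task_id, name, data in first]
```

Because the join uses the ID, `combined` comes out the same whichever order the second batch of results is collected in.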
That said, this is just what comes to mind on the spot. It can probably be done in a much more Pythonic way, or maybe Prefect has a mechanism for this 🤷
Ryan Peden
02/28/2023, 3:02 PM
from prefect import task, flow, get_run_logger
@task
def some_task(name: str):
    # read the int from the last character of the name
    current = int(name[-1])
    start = (current - 1) * 5
    nums = [i for i in range(start, start + 5)]
    return name, nums
@task
def mapped_process(input: tuple[str, list[int]]):
    name, data = input
    # sum of data
    data_sum = sum(data)
    return f"sum for {name} is {data_sum}"
@flow
def my_flow():
    names = ['test1', 'test2', 'test3', 'test4', 'test5']
    x = some_task.map(names)
    lots_of_data = mapped_process.map(x)
    results = [d.result() for d in lots_of_data]
    tuples = [t.result() for t in x]
    # zip the results with the original tuples
    output = list(zip(tuples, results))
    logger = get_run_logger()
    for o in output:
        logger.info(o)
if __name__ == "__main__":
    my_flow()
This results in output like:
10:00:07.185 | INFO | Flow run 'zippy-chihuahua' - (('test1', [0, 1, 2, 3, 4]), 'sum for test1 is 10')
10:00:07.186 | INFO | Flow run 'zippy-chihuahua' - (('test2', [5, 6, 7, 8, 9]), 'sum for test2 is 35')
10:00:07.186 | INFO | Flow run 'zippy-chihuahua' - (('test3', [10, 11, 12, 13, 14]), 'sum for test3 is 60')
10:00:07.186 | INFO | Flow run 'zippy-chihuahua' - (('test4', [15, 16, 17, 18, 19]), 'sum for test4 is 85')
10:00:07.186 | INFO | Flow run 'zippy-chihuahua' - (('test5', [20, 21, 22, 23, 24]), 'sum for test5 is 110')
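As a sanity check, the logged values can be reproduced without Prefect by applying the same arithmetic as `some_task` and `mapped_process` directly. The helper name `expected_row` is made up for this sketch.

```python
def expected_row(name: str) -> tuple[tuple[str, list[int]], str]:
    # Same arithmetic as some_task: the trailing digit picks a block of five ints.
    current = int(name[-1])
    start = (current - 1) * 5
    nums = list(range(start, start + 5))
    # Same formatting as mapped_process.
    return (name, nums), f"sum for {name} is {sum(nums)}"


names = ["test1", "test2", "test3", "test4", "test5"]
rows = [expected_row(name) for name in names]
for row in rows:
    print(row)
```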
Stephen Lloyd
03/01/2023, 8:52 AM
mapped_process() for each element in x[1]. I can do:
from prefect import flow, task, unmapped
@task
def mapped_process(val: str, num: int):
    square = num * num
    return val, square
@flow
def my_flow():
    ### stuff
    lots_of_data = mapped_process.map(unmapped(x[0]), x[1])
but I was unsure of how to tie the results back together correctly.
Is the order of returned results in x and lots_of_data guaranteed?
jpuris
03/01/2023, 8:53 AM
"Are the order of returned results in x and lots_of_data guaranteed?"
With sequential runner, I'd say yes. With dask, concurrent etc runner - I don't think so
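One way to avoid depending on result order at all is to return the key with every result, as `mapped_process` above already does with `val`, and regroup by that key afterwards. The sketch below is plain Python with `concurrent.futures` standing in for a concurrent task runner. The sample data and the `pairs` and `squares_by_name` names are assumptions, and how Prefect orders the futures returned by `.map` under each task runner is worth checking in its documentation rather than taking from this sketch.

```python
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor


def mapped_process(val: str, num: int) -> tuple[str, int]:
    # Same shape as the task in the thread: the key travels with the result.
    return val, num * num


# Hypothetical resolved results of the first mapped task.
x = [("test1", [1, 2, 3]), ("test2", [4, 5])]

# Flatten to one (name, num) pair per element so each element gets its own call.
pairs = [(name, num) for name, nums in x for num in nums]

with ThreadPoolExecutor() as pool:
    results = list(
        pool.map(mapped_process, [p[0] for p in pairs], [p[1] for p in pairs])
    )

# Regroup by the returned key; this does not rely on the order across names.
squares_by_name: dict[str, list[int]] = defaultdict(list)
for name, square in results:
    squares_by_name[name].append(square)
```

If the order within each name also matters, the element's index could be returned alongside the key and used to sort each group.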
Stephen Lloyd
03/01/2023, 9:01 AM
jpuris
03/01/2023, 9:05 AM
Stephen Lloyd
03/01/2023, 9:13 AM