Derek Heyman
09/22/2023, 3:57 AMJake Kaplan
09/22/2023, 1:24 PMwait_for
if you're passing data between your tasks. wait_for
is only to indicate that taskB should wait for taskA to finish ONLY if data from taskA is not being passed to to taskB. If you're passing data it happens automatically.Jake Kaplan
09/22/2023, 1:24 PMfrom prefect import flow, task
@task
def upstream_task(n):
return n + 1
@task
def downstream_task(name, n):
print(f"Downstream {name}: {n}")
@flow
def my_flow():
results = []
for i in range(3):
results.append(upstream_task(i))
downstream_task("1", results[0])
downstream_task("2", results[1])
downstream_task("3", results[2])
if __name__ == '__main__':
my_flow()
Jake Kaplan
09/22/2023, 1:24 PM.map
or .submit
Jake Kaplan
09/22/2023, 1:25 PM@flow
def my_flow():
futures = upstream_task.map([i for i in range(3)])
downstream_task("1", futures[0])
...
with submit:
@flow
def my_flow():
futures = []
for i in range(3):
futures.append(upstream_task.submit(i))
downstream_task("1", futures[0])
...
Derek Heyman
09/22/2023, 2:36 PMJake Kaplan
09/22/2023, 2:50 PM@flow
def my_flow():
values = [1, 2, 3]
futures = {v: upstream_task.submit(v) for v in values}
print(futures)
Jake Kaplan
09/22/2023, 2:51 PM@flow
def my_flow():
values = [1, 2, 3]
results = {v: upstream_task(v) for v in values}
print(results)
is a valid thing to doDerek Heyman
09/22/2023, 2:53 PMJake Kaplan
09/22/2023, 2:54 PMJake Kaplan
09/22/2023, 2:54 PMDerek Heyman
09/22/2023, 2:55 PMDerek Heyman
09/23/2023, 12:53 AM