Jai P
05/06/2022, 2:25 PM
@task
def task_one():
    return 1
@task
def task_two(results):
    logger = get_run_logger()
    logger.info(results)
    return 2
@flow
def test_flow():
    results = {}
    t1_result = task_one()
    results["t1_result"] = t1_result
    t2_result = task_two(results, wait_for=[t1_result])
    results["t2_result"] = t2_result
appears to cause the flow to run indefinitely and i'm unclear why.
Anna Geller
Jai P
05/06/2022, 2:27 PM
{"t1_result": PrefectFuture} in task_two though?
Anna Geller
Jai P
05/06/2022, 2:31 PM
.result() is called?
Jai P
05/06/2022, 2:32 PM
t1_result in task_two in the above example, i wouldn't expect it to be stuck. fully agree if i planned to use it, i would have to do something like:
def task_two(results):
    do_something_with(results["t1_result"].result())
Jai P
05/06/2022, 2:35 PM
given a list of prefect tasks, execute each one while passing the results of all previously completed tasks to the next
so given that, an example would be:
@task def t1
@task def t2
@task def t3
...
@flow
def my_flow():
    tasks = [t1, t2, t3, t4]
    results = {}
    for task in tasks:
        task_result = task(results)
        results[task.name] = task_result
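The loop described above can be sketched without Prefect to show what each step is meant to receive. This is only an illustration under stated assumptions: `t1`, `t2`, `t3` and `run_chain` are hypothetical plain functions standing in for the tasks and the flow, and each step is handed a snapshot of the dict rather than the live dict, so later writes by the loop are not visible to a step that has already been called.

```python
from typing import Any, Callable, Dict, List

Step = Callable[[Dict[str, Any]], Any]


# Hypothetical stand-ins for the tasks; plain functions, not Prefect tasks.
def t1(results: Dict[str, Any]) -> int:
    return 1


def t2(results: Dict[str, Any]) -> int:
    return results["t1"] + 1


def t3(results: Dict[str, Any]) -> int:
    return results["t1"] + results["t2"]


def run_chain(steps: List[Step]) -> Dict[str, Any]:
    """Call each step with a snapshot of everything produced so far."""
    results: Dict[str, Any] = {}
    for step in steps:
        # dict(results) is a shallow copy: the step sees earlier outputs only.
        output = step(dict(results))
        results[step.__name__] = output
    return results
```

Calling `run_chain([t1, t2, t3])` gives every step the outputs of all the steps before it and nothing else.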
Jai P
05/06/2022, 2:36 PM
task_result = task(**results)
because prior task results (futures) need to be passed in as args/kwargs
Zanie
Kevin Kho
Zanie
Zanie
Zanie
Zanie
Jai P
05/06/2022, 2:41 PM
Zanie
Jai P
05/06/2022, 2:46 PM
@flow
def test_flow():
    results = {}
    t1_result = task_one()
    results["t1_result"] = t1_result
    t2_result = task_two(results, wait_for=[t1_result])
    new_results = {**results}
    new_results["t2_result"] = t2_result
successfully runs
Zanie
task_two(results.copy(), … also works yeah
Zanie
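One way to picture why a new dict or `.copy()` avoids the hang is sketched below with the standard library's `concurrent.futures` instead of Prefect. It rests on an assumption the thread does not spell out: that futures inside an argument are resolved only when the task actually starts, by which point the caller has already written the task's own future into the same dict. `resolve_inputs` and `run` are hypothetical names, and a short timeout stands in for the indefinite wait.

```python
import threading
from concurrent.futures import Future, ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeout


def resolve_inputs(inputs, go, timeout=0.5):
    """Stand-in for a task: resolve every future found in `inputs`."""
    go.wait()  # start only after the caller has finished wiring results
    resolved = {}
    for key, value in inputs.items():
        if isinstance(value, Future):
            value = value.result(timeout=timeout)
        resolved[key] = value
    return resolved


def run(copy_inputs):
    results = {}
    go = threading.Event()
    with ThreadPoolExecutor(max_workers=2) as pool:
        t1 = pool.submit(lambda: 1)
        results["t1_result"] = t1
        # Either hand over a snapshot or the live dict (assumed failure mode).
        passed = results.copy() if copy_inputs else results
        t2 = pool.submit(resolve_inputs, passed, go)
        results["t2_result"] = t2  # with the live dict, t2 now holds itself
        go.set()
        try:
            return t2.result(timeout=5)
        except FutureTimeout:
            # The worker timed out waiting on its own future.
            return "stuck"
```

With `copy_inputs=True` the worker only ever sees `t1_result`; with `copy_inputs=False` it finds its own future among its inputs and waits on it, which is reported here as `"stuck"`.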
Zanie
Zanie
Jai P
05/06/2022, 2:53 PM
Jai P
05/06/2022, 2:55 PM
Zanie
Zanie
None or something. I’d probably just throw an exception if we find that.
Zanie
Jai P
05/06/2022, 2:58 PM
Zanie
Zanie
Jai P
05/06/2022, 3:01 PM
.copy() is a totally reasonable workaround. it won't affect execution time significantly because we don't expect the list of tasks to be extremely long
Zanie
Zanie
SequentialTaskRunner if you’re not using concurrency.
Jai P
05/06/2022, 3:34 PM
dict approach. i guess if the consequence of that is that each subsequent task relies on the completion of every task before it, it is essentially sequential
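The point about the dict approach being essentially sequential can be checked with a small standard-library sketch. It assumes nothing about Prefect's scheduler: `make_step` and `run_chain` are hypothetical helpers, and `concurrent.futures` futures stand in for Prefect futures. Every step is submitted to a pool with one thread per step, so nothing but the data dependency forces an order.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def make_step(name, log, lock):
    def step(previous):
        # Block until every earlier step has produced its value.
        inputs = {key: fut.result() for key, fut in previous.items()}
        with lock:
            log.append(name)  # record the order in which steps do their work
        return len(inputs) + 1

    return step


def run_chain(names):
    log, lock, futures = [], threading.Lock(), {}
    # One worker per step: the pool itself never forces steps to queue.
    with ThreadPoolExecutor(max_workers=len(names)) as pool:
        for name in names:
            # Pass a snapshot of all futures created so far.
            futures[name] = pool.submit(make_step(name, log, lock), dict(futures))
    return log, {key: fut.result() for key, fut in futures.items()}
```

Even though all steps are submitted up front, each one waits on every earlier future, so the recorded order always matches the submission order. That is consistent with the suggestion that a sequential runner loses nothing for this pattern.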