# ask-community
j
👋 hi! have a question about a particular case where a flow gets stuck running forever in prefect 2.0, example in thread
👋 3
```python
from prefect import flow, task, get_run_logger


@task
def task_one():
    return 1


@task
def task_two(results):
    logger = get_run_logger()
    logger.info(results)
    return 2


@flow
def test_flow():
    results = {}

    t1_result = task_one()
    results["t1_result"] = t1_result
    t2_result = task_two(results, wait_for=[t1_result])
    results["t2_result"] = t2_result
```
appears to cause the flow to run indefinitely and i'm unclear why..
a
to retrieve the results of a task, you would need to use .result()
j
i would've expected that it would just print out something like:
```python
{"t1_result": PrefectFuture}
```
in
task_two
though?
a
check this section: "Using results from tasks" https://orion-docs.prefect.io/concepts/tasks/
j
so that makes sense, but it looks like you need to pass the result future directly in as an arg? if you wrap it in a dictionary should it cause the flow to run indefinitely until
.result()
is called?
since i'm not actually using
t1_result
in
task_two
in the above example, i wouldn't expect it to be stuck. fully agree if i planned to use it, i would have to do something like:
```python
def task_two(results):
    do_something_with(results["t1_result"].result())
```
raising it up to a more generic level, the case i'm trying to solve for is:
given a list of prefect tasks, execute each one while passing the results of all previously completed tasks to the next
so given that, an example would be:
```python
@task
def t1(results): ...

@task
def t2(results): ...

@task
def t3(results): ...

# ...


@flow
def my_flow():
    tasks = [t1, t2, t3, t4]

    results = {}
    for task in tasks:
        task_result = task(results)
        results[task.name] = task_result
```
or, is the expectation that you can't do that, and you have to instead do
```python
task_result = task(**results)
```
because prior task results (futures) need to be passed in as args/kwargs
z
You should be able to pass futures wrapped in Python collections, we’ll autoresolve them still.
k
I can replicate that the first code snippet does hang. I think something is wrong?
z
Yeah this is definitely a bug, a flow should never hang like this. I’ve also reproduced.
Oh gosh
😓 1
😨 2
You might have made t2 depend on itself by adding its future to the dictionary you passed to it
🙏 1
Since the dictionary is mutable
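to make the self-reference concrete, here's a plain-Python sketch (not Prefect's actual submission code; `submit` is a hypothetical stand-in whose "future" just records the arguments it was called with). because the dict is mutable and passed by reference, adding the task's own future to it afterwards makes the task's own input contain that future:

```python
# Plain-Python illustration of the self-dependency: task_two keeps a
# reference to `results`, so mutating `results` later is visible to it.
results = {}

def submit(name, args):
    # hypothetical stand-in for task submission: the "future" just
    # records the arguments the task was called with
    return {"task": name, "args": args}

t2_future = submit("task_two", results)   # task_two holds a reference to results
results["t2_result"] = t2_future          # mutation flows through that reference
print(t2_future["args"]["t2_result"] is t2_future)  # True: t2 now "depends" on itself
```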
j
YIKES
z
I’m not 100% sure but I’ll poke around
j
i think that's what is happening:
```python
@flow
def test_flow():
    results = {}

    t1_result = task_one()
    results["t1_result"] = t1_result
    t2_result = task_two(results, wait_for=[t1_result])
    new_results = {**results}
    new_results["t2_result"] = t2_result
```
successfully runs
z
task_two(results.copy(), …
also works yeah
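both workarounds lean on the same shallow-copy semantics; a minimal sketch with plain strings standing in for futures:

```python
# Why results.copy() / {**results} fixes it: the task receives a snapshot,
# so later mutations to the original dict never reach the task's input.
results = {"t1_result": "future-1"}

snapshot = {**results}             # same effect as results.copy(): shallow copy
results["t2_result"] = "future-2"  # mutate the original afterwards

print("t2_result" in snapshot)     # False: the snapshot is unaffected
```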
I’m not entirely sure we can guard against this
You could nest a mutable object arbitrarily deeply in there
We can exclude the simple cycle of self-dependency though
j
yeah definitely seems either (a) impossible or (b) very difficult without possibly significantly affecting performance to guard against this
unless there's a point where you ask "have all the futures this task needs to run resolved?" — at that point you could check that none of those futures belong to the currently in-progress task, and throw an error and exit if one does. that would require pre-processing to pull out all the futures, arbitrarily deep, though i'd guess
z
We do pull out futures arbitrarily deep
🔥 2
We can easily exclude the case where a task requires its own future, but we’d have to set the value in your object to
None
or something. I’d probably just throw an exception if we find that.
The issue is that the cycle could be deeper, e.g. task C relies on task B, which relies on task A, which relies on task C.
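since futures don't carry their dependencies today, detecting a deeper cycle like that is hypothetical — but assuming an explicit deps mapping `{task: [upstream tasks]}` existed, a DFS sketch would look like:

```python
# Hypothetical cycle detection over an assumed dependency mapping
# (Prefect futures do not actually expose this information today).
def has_cycle(deps):
    visiting, done = set(), set()

    def visit(node):
        if node in done:
            return False
        if node in visiting:          # back-edge: we looped onto ourselves
            return True
        visiting.add(node)
        if any(visit(up) for up in deps.get(node, [])):
            return True
        visiting.remove(node)
        done.add(node)
        return False

    return any(visit(node) for node in deps)

print(has_cycle({"C": ["B"], "B": ["A"], "A": ["C"]}))  # True: C -> B -> A -> C
print(has_cycle({"C": ["B"], "B": ["A"], "A": []}))     # False: a plain chain
```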
j
ahh yes yes
z
Futures don’t store what they depend on right now, so we can’t detect that cycle even if we wanted to.
We could add that to futures though it starts getting complicated. All very interesting 🙂
j
definitely very interesting! in regards to our problem, i think the
.copy()
is a totally reasonable workaround. it won't affect execution time significantly because we don't expect the list of tasks to be extremely long
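the generic pattern with the copy workaround applied would look roughly like this — plain-Python stand-ins (`run_pipeline` and the lambdas are illustrative, not Prefect APIs):

```python
# Sketch of "pass all prior results to each next task" with the .copy()
# workaround: each task gets a snapshot, so its own result is never
# added to the dict it was handed.
def run_pipeline(tasks):
    results = {}
    for name, fn in tasks:
        results[name] = fn(results.copy())  # snapshot avoids self-reference
    return results

out = run_pipeline([
    ("t1", lambda prior: len(prior)),   # sees {}                 -> 0
    ("t2", lambda prior: len(prior)),   # sees {"t1": 0}          -> 1
    ("t3", lambda prior: len(prior)),   # sees {"t1": 0, "t2": 1} -> 2
])
print(out)  # {'t1': 0, 't2': 1, 't3': 2}
```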
z
Are you going to pass the results of every upstream task to every downstream task like that?
You might as well switch to the
SequentialTaskRunner
if you’re not using concurrency.
upvote 1
j
hmm, that's a good point. i don't know if we necessarily can know at execution time which tasks rely on the results of which other upstream tasks, which is why we were going to use the
dict
approach. i guess if the consequence of that is that each subsequent task relies on the completion of every task before it, it is essentially sequential
👍 1