Thread
#prefect-community

    Jai P

    4 months ago
    👋 hi! have a question about a particular case where a flow gets stuck running forever in prefect 2.0, example in thread
    from prefect import flow, task, get_run_logger

    @task
    def task_one():
        return 1


    @task
    def task_two(results):
        logger = get_run_logger()
        logger.info(results)
        return 2
    
    
    @flow
    def test_flow():
        results = {}
    
        t1_result = task_one()
        results["t1_result"] = t1_result
        t2_result = task_two(results, wait_for=[t1_result])
        results["t2_result"] = t2_result
    appears to cause the flow to run indefinitely and i'm unclear why..
    Anna Geller

    4 months ago
    to retrieve the results of a task, you would need to use .result()

    Jai P

    4 months ago
    i would've expected that it would just print out something like {"t1_result": PrefectFuture} in task_two though?
    Anna Geller

    4 months ago
    check this section: "Using results from tasks" https://orion-docs.prefect.io/concepts/tasks/

    Jai P

    4 months ago
    so that makes sense, but it looks like you need to pass the result future directly in as an arg? if you wrap it in a dictionary, should it cause the flow to run indefinitely until .result() is called?
    since i'm not actually using t1_result in task_two in the above example, i wouldn't expect it to be stuck. fully agree that if i planned to use it, i would have to do something like:
    def task_two(results):
        do_something_with(results["t1_result"].result())
    raising it up to a more generic level, the case i'm trying to solve for is:
    given a list of prefect tasks, execute each one while passing the results of all previously completed tasks to the next
    so given that, an example would be:
    @task def t1
    @task def t2
    @task def t3
    ...
    
    @flow
    def my_flow():
        tasks = [t1, t2, t3, t4]
    
        results = {}
        for task in tasks:
            task_result = task(results)
            results[task.name] = task_result
    or, is the expectation that you can't do that, and you have to instead do
    task_result = task(**results)
    because prior task results (futures) need to be passed in as args/kwargs
    Michael Adkins

    4 months ago
    You should be able to pass futures wrapped in Python collections, we’ll autoresolve them still.
    Kevin Kho

    4 months ago
    I can replicate that the first code snippet does hang. I think something is wrong?
    Michael Adkins

    4 months ago
    Yeah this is definitely a bug, a flow should never hang like this. I’ve also reproduced.
    You might have made t2 depend on itself by adding its future to the dictionary you passed to it
    Since the dictionary is mutable
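The aliasing Michael describes can be shown in plain Python, with no Prefect involved; here `task_two` and `captured` are illustrative stand-ins for Prefect storing a task's call arguments for later future resolution:

```python
# Plain-Python illustration of the aliasing: the callee keeps a reference
# to the same dict object that the caller later mutates.
captured = {}

def task_two(results):
    # stand-in for Prefect recording the call arguments to resolve later
    captured["args"] = results
    return 2

results = {}
task_two(results)
results["t2_result"] = "future-for-task_two"  # mutation *after* the call

# The stored arguments now contain task_two's own "future":
print(captured["args"])  # {'t2_result': 'future-for-task_two'}
```

Because `captured["args"]` and `results` are the same object, the task effectively ends up depending on its own future, which is the cycle that hangs the flow.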

    Jai P

    4 months ago
    YIKES
    Michael Adkins

    4 months ago
    I’m not 100% sure but I’ll poke around

    Jai P

    4 months ago
    i think that's what is happening:
    @flow
    def test_flow():
        results = {}
    
        t1_result = task_one()
        results["t1_result"] = t1_result
        t2_result = task_two(results, wait_for=[t1_result])
        new_results = {**results}
        new_results["t2_result"] = t2_result
    successfully runs
    Michael Adkins

    4 months ago
    task_two(results.copy(), …
    also works yeah
    I’m not entirely sure we can guard against this
    You could nest a mutable object arbitrarily deeply in there
    We can exclude the simple cycle of self-dependency though

    Jai P

    4 months ago
    yeah, it definitely seems either (a) impossible or (b) very difficult to guard against this without significantly affecting performance
    unless there's a point where you ask the question "have all futures that need to resolve to run this task resolved?" in which case...you could validate that the future isn't related to the currently in progress task, and if it is then throw an error and exit? would need you to pre-process and pull out all futures arbitrarily deep though i'd guess
    Michael Adkins

    4 months ago
    We do pull out futures arbitrarily deep
    We can easily exclude the case where a task requires its own future, but we’d have to set the value in your object to None or something. I’d probably just throw an exception if we find that.
    The issue is that the cycle could be deeper, e.g. task C relies on task B, which relies on task A, which relies on C.
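For reference, the deeper cycle Michael mentions would be detectable with an ordinary depth-first search if futures recorded their upstream dependencies. A hedged sketch, where `deps` and `has_cycle` are illustrative names rather than Prefect APIs:

```python
def has_cycle(deps, node, visiting=None):
    """Return True if `node` sits on a dependency cycle.

    `deps` maps each task name to the task names it depends on.
    """
    if visiting is None:
        visiting = set()
    if node in visiting:
        return True  # we came back to a node already on the current path
    visiting.add(node)
    for upstream in deps.get(node, ()):
        if has_cycle(deps, upstream, visiting):
            return True
    visiting.remove(node)
    return False

# C relies on B, which relies on A, which relies on C:
print(has_cycle({"C": ["B"], "B": ["A"], "A": ["C"]}, "C"))  # True
```

The complication Michael raises is precisely that this needs the dependency edges to exist somewhere, which futures don't currently record.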

    Jai P

    4 months ago
    ahh yes yes
    Michael Adkins

    4 months ago
    Futures don’t store what they depend on right now, so we can’t detect that even if we wanted to.
    We could add that to futures though it starts getting complicated. All very interesting 🙂

    Jai P

    4 months ago
    definitely very interesting! in regards to our problem, i think the .copy() is a totally reasonable workaround. it won't affect execution time significantly because we don't expect the list of tasks to be extremely long
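The generic loop from earlier with the `.copy()` workaround applied looks roughly like this; plain functions stand in for Prefect tasks here so the pattern runs anywhere, and `t1`/`t2`/`t3` are illustrative:

```python
# Each "task" takes the dict of prior results. Plain functions stand in
# for Prefect @task-decorated callables.
def t1(results):
    return 1

def t2(results):
    return results["t1"] + 1

def t3(results):
    return results["t2"] * 2

results = {}
for task in (t1, t2, t3):
    # pass a snapshot, not the live dict, so no task can end up
    # "containing" its own result/future after the later mutation
    task_result = task(results.copy())
    results[task.__name__] = task_result

print(results)  # {'t1': 1, 't2': 2, 't3': 4}
```

The shallow copy is what breaks the self-dependency: mutating `results` after the call can't reach into the snapshot each task was given.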
    Michael Adkins

    4 months ago
    Are you going to pass the results of every upstream task to every downstream task like that?
    You might as well switch to the SequentialTaskRunner if you’re not using concurrency.

    Jai P

    4 months ago
    hmm, that's a good point. i don't know if we necessarily can know at execution time which tasks rely on the results of which other upstream tasks, which is why we were going to use the dict approach. i guess if the consequence of that is that each subsequent task relies on the completion of every task before it, it is essentially sequential