Thread
#prefect-community
    Ken Nguyen

    Ken Nguyen

    4 months ago
    I have a task that I think returns only 1 result, but I keep getting the below error. Is there a reason why this task thinks it’s returning multiple results? (code in thread)
    TypeError: Task is not iterable. If your task returns multiple results, pass `nout` to the task decorator/constructor, or provide a `Tuple` return-type annotation to your task.
    @task
    def prep_best_skus_records(df, start_date, end_date, best, removals, removal_deltas):
        # Function to format and return records with best SKUs to be pushed to Snowflake
        coverage_names = ['ITEM_' + str(i + 1) for i in range(20)]
        removal_names = ['REMOVE_' + str(i + 1) for i in range(len(removals))]
        removal_deltas_names = ['REMOVE_' + str(i + 1) + '_LOST_COVERAGE' for i in range(len(removals))]
    
        best_res = pd.DataFrame([['BEST', df.shape[0], start_date, end_date, *best[0], best[1], *removals, *removal_deltas]], columns= ['METHOD', 'TOTAL_ORDERS', 'START_DATE', 'END_DATE', *coverage_names, 'COVERAGE', *removal_names, *removal_deltas_names])
        top20_res = pd.DataFrame([['TOP20', df.shape[0], start_date, end_date, *top20, baseline_top20, *([None] * len(removals)), *([None] * len(removals))]], columns= ['METHOD', 'TOTAL_ORDERS', 'START_DATE', 'END_DATE', *coverage_names, 'COVERAGE', *removal_names, *removal_deltas_names])
    
        <http://logger.info|logger.info>(best_res)
        <http://logger.info|logger.info>(top20_res)
        output = pd.concat([best_res, top20_res], axis=0)
        <http://logger.info|logger.info>(output)
    
        return output
    For some more context, this is what the task looks like in my flow, including directly upstream tasks:
    ...
        start_date, end_date = get_dates(
            df_raw,
            task_args={"name": "Get Dates", "nout": 2},
            upstream_tasks=[df_raw]
        )
    
        df = clean_df(
            df_raw,
            task_args={"name": "Clean Dataframe"},
            upstream_tasks=[df_raw]
        )
        
    ...
        best, removals, removal_deltas = get_best_and_removals(
            baseline_top20,
            top20,
            iterable,
            all_skus,
            task_args={"name": "Get Best and Removals", "nout": 3},
            upstream_tasks=[baseline_top20, top20, iterable, all_skus]
        )
    
        best_skus_records = prep_best_skus_records(
            df,
            start_date,
            end_date,
            best,
            removals,
            removal_deltas,
            task_args={"name": "Prep Records"},
            upstream_tasks=[df, start_date, end_date, best, removals, removal_deltas]
        )
    Kevin Kho

    Kevin Kho

    4 months ago
    I think if you want to do this:
    best, removals, removal_deltas = get_best_and_removals(
            baseline_top20,
            top20,
            iterable,
            all_skus,
            task_args={"name": "Get Best and Removals", "nout": 3},
            upstream_tasks=[baseline_top20, top20, iterable, all_skus]
        )
    You need to annotate
    get_best_and_removals
    with
    nout=3
    Ken Nguyen

    Ken Nguyen

    4 months ago
    I’ve done that already in the
    task_args
    right? Or are you saying I have the wrong syntax?
    Kevin Kho

    Kevin Kho

    4 months ago
    Ahh I see
    What happens if you add it to the decorator?
    Ken Nguyen

    Ken Nguyen

    4 months ago
    Lemme try
    It’s running, but I wanna add that I did the nout within task_args, then printed the 3 arguments and they came out correct
    Finished running, same error:
    Task 'Prep Records': Exception encountered during task execution!
    Traceback (most recent call last):
      File "/usr/local/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
        value = prefect.utilities.executors.run_task_with_timeout(
      File "/usr/local/lib/python3.8/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
        return task.run(*args, **kwargs)  # type: ignore
      File "<string>", line 314, in prep_best_skus_records
      File "/usr/local/lib/python3.8/site-packages/prefect/core/task.py", line 1034, in __iter__
        raise TypeError(
    TypeError: Task is not iterable. If your task returns multiple results, pass `nout` to the task decorator/constructor, or provide a `Tuple` return-type annotation to your task.
    Kevin Kho

    Kevin Kho

    4 months ago
    I suspect the upstream tasks with multiple output is messing up
    Ken Nguyen

    Ken Nguyen

    4 months ago
    Even tho each of the multiple output is printed fine?
    Kevin Kho

    Kevin Kho

    4 months ago
    I tried it and it was fine, so I can’t identify immediately what is off here 😅
    Ken Nguyen

    Ken Nguyen

    4 months ago
    So strange, could you send me what you tried so I can compare the syntax?
    Kevin Kho

    Kevin Kho

    4 months ago
    from prefect import Flow, task
    
    @task
    def abc():
        return 1
    
    @task(nout=2)
    def bcd():
        return 2, 3
    
    @task()
    def cde():
        return 2
    
    with Flow("..") as flow:
        a=abc()
        b,c = bcd(upstream_tasks=[a])
        d = cde(upstream_tasks=[b,c])
    Ken Nguyen

    Ken Nguyen

    4 months ago
    Truly baffling, I’ll just do a workaround because I feel like I’m chasing a ghost. Thank you!
    I figured out the root cause! It was me being an idiot 🙃🙃
    I was using variables within the task that I didn’t include as an argument, but the error was just different from usual
    Kevin Kho

    Kevin Kho

    4 months ago
    Ahh I see
    Nice work figuring that out!