https://prefect.io logo
#prefect-community
Title
# prefect-community
k

Ken Nguyen

05/18/2022, 10:31 PM
I have a task that I think returns only 1 result, but I keep getting the below error. Is there a reason why this task thinks it’s returning multiple results? (code in thread)
Copy code
TypeError: Task is not iterable. If your task returns multiple results, pass `nout` to the task decorator/constructor, or provide a `Tuple` return-type annotation to your task.
Copy code
@task
def prep_best_skus_records(df, start_date, end_date, best, removals, removal_deltas):
    # Function to format and return records with best SKUs to be pushed to Snowflake
    coverage_names = ['ITEM_' + str(i + 1) for i in range(20)]
    removal_names = ['REMOVE_' + str(i + 1) for i in range(len(removals))]
    removal_deltas_names = ['REMOVE_' + str(i + 1) + '_LOST_COVERAGE' for i in range(len(removals))]

    best_res = pd.DataFrame([['BEST', df.shape[0], start_date, end_date, *best[0], best[1], *removals, *removal_deltas]], columns= ['METHOD', 'TOTAL_ORDERS', 'START_DATE', 'END_DATE', *coverage_names, 'COVERAGE', *removal_names, *removal_deltas_names])
    top20_res = pd.DataFrame([['TOP20', df.shape[0], start_date, end_date, *top20, baseline_top20, *([None] * len(removals)), *([None] * len(removals))]], columns= ['METHOD', 'TOTAL_ORDERS', 'START_DATE', 'END_DATE', *coverage_names, 'COVERAGE', *removal_names, *removal_deltas_names])

    <http://logger.info|logger.info>(best_res)
    <http://logger.info|logger.info>(top20_res)
    output = pd.concat([best_res, top20_res], axis=0)
    <http://logger.info|logger.info>(output)

    return output
For some more context, this is what the task looks like in my flow, including directly upstream tasks:
Copy code
...
    start_date, end_date = get_dates(
        df_raw,
        task_args={"name": "Get Dates", "nout": 2},
        upstream_tasks=[df_raw]
    )

    df = clean_df(
        df_raw,
        task_args={"name": "Clean Dataframe"},
        upstream_tasks=[df_raw]
    )
    
...
    best, removals, removal_deltas = get_best_and_removals(
        baseline_top20,
        top20,
        iterable,
        all_skus,
        task_args={"name": "Get Best and Removals", "nout": 3},
        upstream_tasks=[baseline_top20, top20, iterable, all_skus]
    )

    best_skus_records = prep_best_skus_records(
        df,
        start_date,
        end_date,
        best,
        removals,
        removal_deltas,
        task_args={"name": "Prep Records"},
        upstream_tasks=[df, start_date, end_date, best, removals, removal_deltas]
    )
k

Kevin Kho

05/18/2022, 10:46 PM
I think if you want to do this:
Copy code
best, removals, removal_deltas = get_best_and_removals(
        baseline_top20,
        top20,
        iterable,
        all_skus,
        task_args={"name": "Get Best and Removals", "nout": 3},
        upstream_tasks=[baseline_top20, top20, iterable, all_skus]
    )
You need to annotate
get_best_and_removals
with
nout=3
k

Ken Nguyen

05/18/2022, 10:48 PM
I’ve done that already in the
task_args
right? Or are you saying I have the wrong syntax?
k

Kevin Kho

05/18/2022, 10:49 PM
Ahh I see
What happens if you add it to the decorator?
k

Ken Nguyen

05/18/2022, 10:51 PM
Lemme try
It’s running, but I wanna add that I did the nout within task_args, then printed the 3 arguments and they came out correct
Finished running, same error:
Copy code
Task 'Prep Records': Exception encountered during task execution!
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/usr/local/lib/python3.8/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "<string>", line 314, in prep_best_skus_records
  File "/usr/local/lib/python3.8/site-packages/prefect/core/task.py", line 1034, in __iter__
    raise TypeError(
TypeError: Task is not iterable. If your task returns multiple results, pass `nout` to the task decorator/constructor, or provide a `Tuple` return-type annotation to your task.
k

Kevin Kho

05/18/2022, 10:59 PM
I suspect the upstream tasks with multiple output is messing up
k

Ken Nguyen

05/18/2022, 11:00 PM
Even tho each of the multiple output is printed fine?
k

Kevin Kho

05/18/2022, 11:01 PM
I tried it and it was fine, so I can’t identify immediately what is off here 😅
🥲 1
k

Ken Nguyen

05/18/2022, 11:05 PM
So strange, could you send me what you tried so I can compare the syntax?
k

Kevin Kho

05/18/2022, 11:06 PM
Copy code
from prefect import Flow, task

@task
def abc():
    return 1

@task(nout=2)
def bcd():
    return 2, 3

@task()
def cde():
    return 2

with Flow("..") as flow:
    a=abc()
    b,c = bcd(upstream_tasks=[a])
    d = cde(upstream_tasks=[b,c])
k

Ken Nguyen

05/18/2022, 11:09 PM
Truly baffling, I’ll just do a workaround because I feel like I’m chasing a ghost. Thank you!
I figured out the root cause! It was me being an idiot 🙃🙃
I was using variables within the task that I didn’t include as an argument, but the error was just different from usual
k

Kevin Kho

05/18/2022, 11:40 PM
Ahh I see
Nice work figuring that out!
🙌 1
57 Views