Ken Nguyen
05/18/2022, 10:31 PM
TypeError: Task is not iterable. If your task returns multiple results, pass `nout` to the task decorator/constructor, or provide a `Tuple` return-type annotation to your task.
@task
def prep_best_skus_records(
    df,
    start_date,
    end_date,
    best,
    removals,
    removal_deltas,
    top20=None,
    baseline_top20=None,
):
    """Format the BEST and TOP20 SKU records to be pushed to Snowflake.

    Builds two single-row DataFrames that share one column layout (a "BEST"
    row and a "TOP20" row), logs each of them, and returns them concatenated
    row-wise.

    Args:
        df: DataFrame of orders; only ``df.shape[0]`` is used (TOTAL_ORDERS).
        start_date: Value written to the START_DATE column.
        end_date: Value written to the END_DATE column.
        best: Indexable pair; ``best[0]`` is unpacked into the 20 ``ITEM_n``
            columns and ``best[1]`` is written to COVERAGE.
        removals: Sequence unpacked into the ``REMOVE_n`` columns; its length
            decides how many removal columns exist.
        removal_deltas: Sequence unpacked into the
            ``REMOVE_n_LOST_COVERAGE`` columns.
        top20: Sequence unpacked into the 20 ``ITEM_n`` columns of the TOP20
            row.
        baseline_top20: Value written to COVERAGE for the TOP20 row.

    Returns:
        A two-row DataFrame: the BEST row followed by the TOP20 row.

    Raises:
        ValueError: If ``top20`` or ``baseline_top20`` is not supplied.
    """
    # These two used to be read from the enclosing scope. When the task runs
    # inside a flow, those outer names are Task objects rather than resolved
    # results, and unpacking one raises "TypeError: Task is not iterable".
    # Taking them as real task inputs makes Prefect resolve them first.
    if top20 is None or baseline_top20 is None:
        raise ValueError(
            "prep_best_skus_records requires `top20` and `baseline_top20` to be "
            "passed as task inputs, e.g. prep_best_skus_records(..., top20=top20, "
            "baseline_top20=baseline_top20)."
        )

    n_removals = len(removals)
    coverage_names = [f"ITEM_{i}" for i in range(1, 21)]
    removal_names = [f"REMOVE_{i}" for i in range(1, n_removals + 1)]
    removal_deltas_names = [f"{name}_LOST_COVERAGE" for name in removal_names]

    # Both rows share exactly the same column layout, so build it once.
    columns = [
        "METHOD",
        "TOTAL_ORDERS",
        "START_DATE",
        "END_DATE",
        *coverage_names,
        "COVERAGE",
        *removal_names,
        *removal_deltas_names,
    ]
    total_orders = df.shape[0]

    best_res = pd.DataFrame(
        [[
            "BEST",
            total_orders,
            start_date,
            end_date,
            *best[0],
            best[1],
            *removals,
            *removal_deltas,
        ]],
        columns=columns,
    )

    # The TOP20 row has no removal data; pad those columns with None.
    no_removal = [None] * n_removals
    top20_res = pd.DataFrame(
        [[
            "TOP20",
            total_orders,
            start_date,
            end_date,
            *top20,
            baseline_top20,
            *no_removal,
            *no_removal,
        ]],
        columns=columns,
    )

    logger.info(best_res)
    logger.info(top20_res)
    output = pd.concat([best_res, top20_res], axis=0)
    logger.info(output)
    return output
...
# Call get_dates on df_raw and unpack its result into two names. The
# "nout": 2 entry in task_args is what is meant to make the result splittable.
# NOTE(review): confirm that nout supplied through task_args behaves the same
# as @task(nout=2) on the task definition.
# NOTE(review): df_raw is passed both as an argument and in upstream_tasks;
# presumably the second is redundant -- confirm.
start_date, end_date = get_dates(
df_raw,
task_args={"name": "Get Dates", "nout": 2},
upstream_tasks=[df_raw]
)
# Call clean_df on df_raw; task_args only overrides the task's display name.
df = clean_df(
df_raw,
task_args={"name": "Clean Dataframe"},
upstream_tasks=[df_raw]
)
...
# Call get_best_and_removals and unpack its result into three names; the
# "nout": 3 entry in task_args is what is meant to make that unpacking legal.
# NOTE(review): every positional input is repeated in upstream_tasks;
# presumably redundant -- confirm.
best, removals, removal_deltas = get_best_and_removals(
baseline_top20,
top20,
iterable,
all_skus,
task_args={"name": "Get Best and Removals", "nout": 3},
upstream_tasks=[baseline_top20, top20, iterable, all_skus]
)
# Call prep_best_skus_records with the six values it declares as parameters.
# NOTE(review): the body of prep_best_skus_records also reads `top20` and
# `baseline_top20`, which are NOT passed here. Inside the task those names
# would resolve to the outer objects used above as task inputs, which looks
# like the source of "TypeError: Task is not iterable" -- confirm, and pass
# them in as task inputs.
best_skus_records = prep_best_skus_records(
df,
start_date,
end_date,
best,
removals,
removal_deltas,
task_args={"name": "Prep Records"},
upstream_tasks=[df, start_date, end_date, best, removals, removal_deltas]
)
Kevin Kho
05/18/2022, 10:46 PM
best, removals, removal_deltas = get_best_and_removals(
baseline_top20,
top20,
iterable,
all_skus,
task_args={"name": "Get Best and Removals", "nout": 3},
upstream_tasks=[baseline_top20, top20, iterable, all_skus]
)
You need to annotate `get_best_and_removals` with `nout=3`.
Ken Nguyen
05/18/2022, 10:48 PM
`task_args`, right? Or are you saying I have the wrong syntax?
Kevin Kho
05/18/2022, 10:49 PM
Ken Nguyen
05/18/2022, 10:51 PM
Task 'Prep Records': Exception encountered during task execution!
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
value = prefect.utilities.executors.run_task_with_timeout(
File "/usr/local/lib/python3.8/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
return task.run(*args, **kwargs) # type: ignore
File "<string>", line 314, in prep_best_skus_records
File "/usr/local/lib/python3.8/site-packages/prefect/core/task.py", line 1034, in __iter__
raise TypeError(
TypeError: Task is not iterable. If your task returns multiple results, pass `nout` to the task decorator/constructor, or provide a `Tuple` return-type annotation to your task.
Kevin Kho
05/18/2022, 10:59 PM
Ken Nguyen
05/18/2022, 11:00 PM
Kevin Kho
05/18/2022, 11:01 PM
Ken Nguyen
05/18/2022, 11:05 PM
Kevin Kho
05/18/2022, 11:06 PM
from prefect import Flow, task
@task
def abc():
    """Return a single value; no `nout` is needed."""
    return 1


@task(nout=2)
def bcd():
    """Return two values; `nout=2` declares that the result has two parts."""
    return 2, 3


@task()
def cde():
    """Return a single value."""
    return 2


with Flow("..") as flow:
    a = abc()
    # bcd is declared with nout=2, so its result can be unpacked here.
    b, c = bcd(upstream_tasks=[a])
    # Both halves of bcd's result are listed as upstream dependencies of cde.
    d = cde(upstream_tasks=[b, c])
Ken Nguyen
05/18/2022, 11:09 PM
Kevin Kho
05/18/2022, 11:40 PM