Hello everyone, I have a task which keeps failing ...
# prefect-community
i
Hello everyone, I have a task which keeps failing because the
.map()
is trying to run the task more times than necessary and the 4th time, the index is a string. .
product_categories
is a list containing 3 elements,
flow_config
is a dictionary and
product_category_variables
is also a dictionary. When the task runs in prefect cloud there are three successful task runs indexed 0, 1, 2 but then it tries to do another run with the index as a string and it fails saying
Task 'set_dynamic_config_settings['dataset_bucket_path']': Starting task run...
TypeError: list indices must be integers or slices, not str
The task
set_dynamic_config_settings
looks like this within the flow definition
Copy code
with Flow(name="flow-name") as: 

   product_category_variables = get_run_variables(
        is_zero_nyp=is_zero_nyp_param,
        bucket_base=flow_config["bucket_base"],
        ltv_product_categories=product_categories,
        return_type="vars",
       )

   final_config = set_dynamic_config_settings.map(
        cfg=unmapped(flow_config),
        product_category_variables=unmapped(product_category_variables),
        product_category=product_categories,
        upstream_tasks=[unmapped(product_category_variables)],
       )
1
k
This looks right. Does
set_dynamic_config_settings
potentially have a default argument? Were you able to log
product_categories
to verify?
i
There's no default argument in
set_dynanmic_config_settings
. I'm not sure what you mean by log
product_categories
but this is the list
Copy code
product_categories = ["Processing", "Capital", "SaaS"]
Also the runs using each of those inputs were completed successfuly.
I don't know why it tries a new run afterwards
k
Does this have another upstream by chance that is mapped?
i
No it doesn't. I've looked at it again and i have an idea what the issue is. A down stream task utilizes
final_config
. However, this task uses
final_config["dataset_bucket_path"]
but due to the map, final_config I guess requires an index like 0, 1 and 2. How do I select
["dataset_bucket_path"]
in this downstream task.
Copy code
dataset = make_features_task.map(
        database=unmapped(sf_seller_ds_db),
        schema=unmapped(sf_ltv_schema),
        sf_gcs_bucket=sf_buckets,
        gcs_bucket=final_config["dataset_bucket_path"],
        ltv_product_category=ltv_product_categories,
        train=unmapped(True),
        zero_nyp=unmapped(is_zero_nyp_param),
        upstream_tasks=[sampled_spine],
    )
k
You need to make a task that outputs a list of those