Diego Escobedo
07/12/2023, 6:28 PM@flow(name="start_pipeline")
async def start_pipeline_test(
csv_path: str = "shopify/enroute/enroute_products.csv",
) -> None:
path = Path(csv_path)
products = await load_shopify_products_from_csv_task(path)
for product in products[:10]:
print(product)
@task(name="load_shopify_products_from_csv")
async def load_shopify_products_from_csv_task(
path: Path,
) -> list[IShopifyProductCreateFromScrape]:
products = await load_shopify_products_from_csv(file_path=path)
return products
In the await line I get that Incompatible types in "await" (actual type "None", expected type "Awaitable[Any]")
If i make the function sync instead:
@task(name="load_shopify_products_from_csv")
def load_shopify_products_from_csv_task(
path: Path,
) -> list[IShopifyProductCreateFromScrape]:
products = syncify(load_shopify_products_from_csv)(file_path=path)
return products
then when looping over the products I get Value of type "None" is not indexable [index]mypy(error)
(variable) products: list[IShopifyProductCreateFromScrape]
Which is interesting, since mypy can't infer the type but looks like pylance correctly sees that its supposed to be list[IShopifyProductCreateFromScrape]
. I have a ton more issues like this in a relatively involved flow with a lot of steps and tasks, so I'm not keen on doing type: ignore on every line or a similar workaround.
Please advise on how to make this work with mypy! async/await seems to be messing it up and the task decorator isn't transmitting the types correctly.Nate
07/17/2023, 2:00 PMnot indexable
error, is mypy happy if you add a check that its not None
?
@task(name="load_shopify_products_from_csv")
def load_shopify_products_from_csv_task(
path: Path,
) -> list[IShopifyProductCreateFromScrape]:
products = syncify(load_shopify_products_from_csv)(file_path=path)
if products is None:
raise ValueError("Products should be a `list[IShopifyProductCreateFromScrape]`, got `None`")
return products
Diego Escobedo
07/17/2023, 2:27 PMSophia Ponte
09/29/2023, 11:20 PM