Julio Venegas
04/15/2021, 7:36 PMnout
to the task decorator/constructor, or provide a Tuple
return-type annotation to your task.` when I instantiate the Task with nout=2. Class in the thread. Any suggestions?Julio Venegas
04/15/2021, 7:39 PMget_product = GetProduct(nout=2, checkpoint=False)
Kevin Kho
Julio Venegas
04/15/2021, 7:50 PMKevin Kho
Julio Venegas
04/15/2021, 7:54 PMKevin Kho
Julio Venegas
04/15/2021, 7:56 PMJulio Venegas
04/15/2021, 7:57 PMget_product
gets used in the flow @Kevin KhoJulio Venegas
04/15/2021, 7:58 PMget_product = GetProduct(nout=2, checkpoint=False)
Kevin Kho
Kevin Kho
Julio Venegas
04/15/2021, 8:05 PMKevin Kho
from typing import Tuple
from prefect import Task, Flow, task, flatten
@task
def get_collection():
return [0,1]
@task
def get_collection_details(n):
return {"name": f"name{n}"}
@task
def get_collection_item_list(n):
return {"id": n, "created": "2019-01-01"}
@task
def pair_item_collection(collection_item, collection_detail):
return (collection_item, collection_detail)
class GetProduct(Task):
def run(self, item_collection_pair: tuple) -> Tuple[str, dict]:
collection_item, collection_details = item_collection_pair
if not collection_item:
return None, {}
else:
product = {'collection_item_id': collection_item['id'],
'collection_item_created_at': collection_item['created'],
}
return collection_item['id'], {**collection_details, **product}
get_product = GetProduct(nout=2, checkpoint=False)
with Flow('test') as flow:
collection_response = get_collection()
collection_details = get_collection_details.map(collection_response)
collection_item = get_collection_item_list.map(collection_response)
item_collection_details_pairs = pair_item_collection.map(collection_item, collection_details)
product_id, collection_product = get_product.map(flatten(item_collection_details_pairs))
flow.run()
Kevin Kho
Julio Venegas
04/15/2021, 8:16 PMKevin Kho
Julio Venegas
04/15/2021, 8:22 PMJulio Venegas
04/15/2021, 8:33 PMKevin Kho
class GetProduct(Task):
def run(self, item_collection_pair: tuple) -> Tuple[str, dict]:
collection_item, collection_details = item_collection_pair
if not collection_item:
return None, {}
else:
product = {'collection_item_id': collection_item['id'],
'collection_item_created_at': collection_item['created']}
return collection_item['id'], product
get_product = GetProduct(nout=2)
with Flow('test') as flow0:
collection_response = get_collection()
collection_details = get_collection_details.map(collection_response)
collection_item = get_collection_item_list.map(collection_response)
item_collection_details_pairs = pair_item_collection.map(collection_item, collection_details)
test = get_product.map(item_collection_details_pairs)
product_ids = task(lambda x: x[0]).map(test)
collection_product = task(lambda x: x[1]).map(test)
flow0.run()
Kevin Kho
Iterable
so it can be Iterable[Tuple]
but the way you were calling it is Tuple[Iterable]
where you would be expecting two lists.Kevin Kho
a, b = ([1,2,3], [2,3,4])
in Python works but a,b = [(1,2),(2,3),(3,4)]
will not 🙂Julio Venegas
04/15/2021, 9:39 PMJulio Venegas
04/15/2021, 9:41 PMKevin Kho
map
takes in an Iterable and outputs an Iterable so you need something to reshapeJulio Venegas
04/15/2021, 9:43 PMKevin Kho