Dekel R
03/06/2022, 8:43 PM
comparable_items_df['tag_rank'] = comparable_items_df.groupby(['id', 'tag_name']).cumcount()
And I get the following error (it works when I run it outside of a task context):
TypeError: unhashable type: 'ResultSet'
For now I’m running this locally on my Mac, but eventually it will run on Vertex AI.
I’d appreciate any help since I’m currently stuck.
Thanks.
Kevin Kho
pool.map
I think that type error is from something in the Flow block, not the task. What is the return of that task and how do you use it inside the Flow?
Dekel R
03/07/2022, 7:27 AM
with Flow('comparable-products-data-extraction',
          storage=Docker(registry_url="us-central1-docker.pkg.dev/xxx",
                         dockerfile="./Dockerfile")) as flow:  # , schedule=daily_schedule
    comparable_items_df = create_comparable_items_df(full_res=raw_x_data, extraction_time=extraction_time)
The task looks something like this -
@task()
def create_comparable_items_df(full_res: List[AsinTuple], extraction_time: datetime) -> pd.DataFrame:
    logger = prefect.context.get(LOGGER_NAME)
    comparable_items_res, asins_without_data, unavailable_asins = [], [], []
I’m not posting the full code - but after these rows I’m running
comparable_items_df['tag_rank'] = comparable_items_df.groupby(['id', 'tag_name']).cumcount()
Which gives the error above.
flow.run()
Now looking at this guide -
https://docs.python.org/3/library/multiprocessing.html#multiprocessing-programming
And specifically this part -
Safe importing of main module
"Make sure that the main module can be safely imported by a new Python interpreter without causing unintended side effects (such as starting a new process)."
It seems like each process is actually importing the main module (which in my case is the module in which I’m building the flow and running it), and since I have the flow.run() line, each process actually runs the flow from the beginning. Any idea of how to overcome this?
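The usual fix for the import-safety issue the docs describe is to put the entry point behind a __main__ guard - a minimal sketch, not the thread's actual code:

# Sketch only: keep flow.run() (or flow.register()) behind a __main__ guard so
# worker processes spawned by the executor can import this module without
# re-running the flow from the beginning.
if __name__ == "__main__":
    flow.run()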
with Flow('comparable-products-data-extraction-development',
          storage=Docker(registry_url="us-central1-docker.pkg.dev/xxx/",
                         dockerfile="./Dockerfile"),
          executor=LocalDaskExecutor()) as flow:  # , schedule=daily_schedule
    # raw_credentials = PrefectSecret(CREDENTIALS_SECRET_NAME)
    raw_credentials = get_local_credentials()
    google_credentials = parse_credentials(credentials=raw_credentials)
    raw_data = extract_batch_data.map(batches, unmapped(google_credentials))
    comparable_items_df = create_comparable_items_df(full_res=raw_data, extraction_time=extraction_time)
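If worker processes re-importing the main module turns out to be the culprit, another option - a hedged sketch of the Prefect 1.x API, whether it suits this workload is an assumption - is to keep the local Dask executor on threads, which stay in the same interpreter and never re-import the module:

from prefect.executors import LocalDaskExecutor

# Threads share the interpreter, so the main module is not re-imported;
# this trades process isolation for simplicity and suits I/O-bound tasks.
executor = LocalDaskExecutor(scheduler="threads")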
Adding the code of the failing task - create_comparable_items_df - with the irrelevant code removed
@task()
def create_comparable_items_df(full_res: List[SomethingTuple], extraction_time: datetime) -> pd.DataFrame:
    logger = prefect.context.get(LOGGER_NAME)
    comparable_items_res, something_without_data, unavailable_something = [], [], []
    flat_list = [item for sublist in full_res for item in sublist]
    for something in flat_list:
        if something.comparable_items:
            for tag in something.comparable_items:
                temp = [[*something.something_info, tag[0], item] for item in tag[1]]
                comparable_items_res.append(temp)
        else:
            something_without_data.append(something.something_info)
        if something[3]:
            unavailable_something.append(something.something_info)
    comparable_items_res = list(chain(*comparable_items_res))
    comparable_items_df = pd.DataFrame(comparable_items_res,
                                       columns=['something', 'web_page_id', 'XXX_date', 'tag_name', 'tag_value'])
    # TODO: FIX THIS ISSUE -
    comparable_items_df['tag_rank'] = comparable_items_df.groupby(['something', 'tag_name']).cumcount()
    return comparable_items_df
The row that fails is -
comparable_items_df['tag_rank'] = comparable_items_df.groupby(['something', 'tag_name']).cumcount()
With this error -
TypeError: unhashable type: 'ResultSet'
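For context, groupby hashes the key values, so this error means one of the grouped columns holds unhashable objects rather than scalars (a ResultSet, e.g. BeautifulSoup's, is list-like). A generic pandas sketch, not the original data, reproduces the same failure:

import pandas as pd

# Generic repro: grouping on a column whose cells are unhashable objects
# (lists here, a ResultSet in the thread) fails with the same TypeError.
df = pd.DataFrame({"something": [["a"], ["a"]], "tag_name": ["x", "y"]})
df.groupby(["something", "tag_name"]).cumcount()  # TypeError: unhashable type: 'list'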
Kevin Kho
Dekel R
03/08/2022, 2:27 PM
Kevin Kho
comparable_items_df? Seems like it’s not what you expect
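A quick way to check what actually ended up in comparable_items_df before the groupby - a sketch; the column names follow the redacted code above:

# Inspect the value types in the grouping columns; any non-scalar type here
# (list, ResultSet, ...) explains the unhashable-type error.
print(comparable_items_df.dtypes)
for col in ["something", "tag_name"]:
    print(col, comparable_items_df[col].map(type).value_counts())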