
Dekel R

03/06/2022, 8:43 PM
Hey everyone, I have a pretty complex flow with multiple tasks that reads data from multiple sources, extracts the relevant parts, and saves them to Google Storage. I have two questions:
1. This flow was originally written outside of Prefect and used pool.map to extract data from the objects I have (HTML pages) - are there any known issues with using pool.map inside a Prefect task context?
2. In another task that isn't using pool.map, I'm running this (valid) pandas line -
comparable_items_df['tag_rank'] = comparable_items_df.groupby(['id', 'tag_name']).cumcount()
and I get the following error (it works when run outside of a task context):
TypeError: unhashable type: 'ResultSet'
For now I'm running this locally on my Mac, but it will eventually run on Vertex AI. I'd appreciate any help since I'm currently stuck. Thanks.
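[Editor's note: for readers following along, groupby(...).cumcount() numbers the rows within each group, which is the ranking the line above computes. A minimal illustration with made-up data (column names borrowed from the thread):]

```python
import pandas as pd

# Toy stand-in for comparable_items_df
df = pd.DataFrame({
    "id":       [1, 1, 1, 2],
    "tag_name": ["a", "a", "b", "a"],
})

# cumcount() numbers rows 0, 1, 2, ... within each (id, tag_name) group
df["tag_rank"] = df.groupby(["id", "tag_name"]).cumcount()
print(df["tag_rank"].tolist())  # [0, 1, 0, 0]
```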

Kevin Kho

03/07/2022, 12:49 AM
I believe pool.map is fine, but you could also move to LocalDaskExecutor. The issues I have seen are about logging, where the logging configuration does not stick when using
pool.map
I think that TypeError comes from something in the Flow block, not the task. What does that task return, and how do you use it inside the Flow?

Dekel R

03/07/2022, 7:27 AM
The task returns a pandas DataFrame. I use it like this -
with Flow('comparable-products-data-extraction',
          storage=Docker(registry_url="us-central1-docker.pkg.dev/xxx",
                         dockerfile="./Dockerfile")) as flow:  # , schedule=daily_schedule
   
    comparable_items_df = create_comparable_items_df(full_res=raw_x_data, extraction_time=extraction_time)
The task looks something like this -
@task()
def create_comparable_items_df(full_res: List[AsinTuple], extraction_time: datetime) -> pd.DataFrame:
    logger = prefect.context.get(LOGGER_NAME)
    comparable_items_res, asins_without_data, unavailable_asins = [], [], []
I'm not posting the full code - but after these lines I'm running
comparable_items_df['tag_rank'] = comparable_items_df.groupby(['id', 'tag_name']).cumcount()
Which gives the error above.
Regarding pool.map - let me explain. Assuming I have tasks A, B, and C, and task C uses pool.map, the behavior I'm seeing is really strange: Prefect executes tasks A, B, then C - and then it looks like, for each process, all of the tasks get executed again from A (there are dependencies between A, B, and C, so they run synchronously). Have you ever encountered this?
Hey - I think I found the root cause here. Since I'm developing on my local machine and debugging the code, I have the following line in the same file as my Flow object -
flow.run()
Now, looking at this guide - https://docs.python.org/3/library/multiprocessing.html#multiprocessing-programming - and specifically this passage:
Safe importing of main module
Make sure that the main module can be safely imported by a new Python interpreter without causing unintended side effects (such as starting a new process).
It seems like each process actually imports the main module (which in my case is the module in which I'm building the flow and running it), and since I have the flow.run() line, each process actually runs the flow from the beginning. Any idea how to overcome this?
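[Editor's note: the standard fix for this is the one the linked guidelines imply - put every side-effecting call, including flow.run(), under an if __name__ == "__main__" guard, so the re-import that "spawn"-based workers perform (the macOS default since Python 3.8) only executes definitions. A minimal sketch, with extract() as a made-up stand-in for the per-page work:]

```python
import multiprocessing as mp

def extract(page):
    # Stand-in for the per-page extraction done via pool.map in the thread.
    return len(page)

def main():
    # Under the "spawn" start method, each worker re-imports this module;
    # only the definitions above run on import, not the guarded block below.
    with mp.Pool(processes=2) as pool:
        return pool.map(extract, ["<html>a</html>", "<html>bb</html>"])

if __name__ == "__main__":
    # In the Prefect case, flow.run() belongs here for the same reason.
    print(main())
```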
@Kevin Kho can you please take a look? To overcome this I changed my approach and I'm now using Prefect mapping instead of pool - and I'm getting the same error. Adding some more code in the next message.
All the variables I'm using inside the flow are results of previous tasks that I deleted from here (all run and work well). batches is a list of lists, and so is raw_data.
with Flow('comparable-products-data-extraction-development',
          storage=Docker(registry_url="us-central1-docker.pkg.dev/xxx/",
                         dockerfile="./Dockerfile"), executor=LocalDaskExecutor()) as flow:  # , schedule=daily_schedule
    # raw_credentials = PrefectSecret(CREDENTIALS_SECRET_NAME)
    raw_credentials = get_local_credentials()
    google_credentials = parse_credentials(credentials=raw_credentials)

    raw_data = extract_batch_data.map(batches, unmapped(google_credentials))
    comparable_items_df = create_comparable_items_df(full_res=raw_data, extraction_time=extraction_time)
Adding the code of the failing task, create_comparable_items_df (non-relevant code deleted) -
# Imports needed by this snippet (SomethingTuple and LOGGER_NAME are defined
# elsewhere in the module):
from datetime import datetime
from itertools import chain
from typing import List

import pandas as pd
import prefect
from prefect import task

@task()
def create_comparable_items_df(full_res: List[SomethingTuple], extraction_time: datetime) -> pd.DataFrame:
    logger = prefect.context.get(LOGGER_NAME)
    comparable_items_res, something_without_data, unavailable_something = [], [], []
    flat_list = [item for sublist in full_res for item in sublist]
    for something in flat_list:
        if something.comparable_items:
            for tag in something.comparable_items:
                temp = [[*something.something_info, tag[0], item] for item in tag[1]]
                comparable_items_res.append(temp)
        else:
            something_without_data.append(something.something_info)
        if something[3]:
            unavailable_something.append(something.something_info)
    comparable_items_res = list(chain(*comparable_items_res))
    comparable_items_df = pd.DataFrame(comparable_items_res, columns=['something', 'web_page_id', 'XXX_date', 'tag_name',
                                                                      'tag_value'])
    # TODO: FIX THIS ISSUE - 
    comparable_items_df['tag_rank'] = comparable_items_df.groupby(['something', 'tag_name']).cumcount()
    return comparable_items_df
The line that fails is:
comparable_items_df['tag_rank'] = comparable_items_df.groupby(['something', 'tag_name']).cumcount()
with this error:
TypeError: unhashable type: 'ResultSet'

Kevin Kho

03/08/2022, 2:15 PM
Oops, I let this slip - I'll take a look.
Not seeing anything wrong immediately. Will look a bit more.

Dekel R

03/08/2022, 2:27 PM
Hey Kevin, I'm pretty sure it's just an issue in my data - I just figured it out. The error is not as explicit as I'd expect, but it seems like it's not Prefect related.

Kevin Kho

03/08/2022, 2:29 PM
I think so too. Maybe you can print the head of
comparable_items_df
? It seems like it's not what you expect.

Dekel R

03/08/2022, 2:29 PM
I do find it pretty hard, though, to debug Prefect locally while using pool. I managed to work around this issue by using Prefect's map - but when using Python's pool together with flow.run(), each process spawned a copy of the flow, which is pretty bad. I think it's a pretty unique case - I couldn't find any Prefect reference for debugging locally when using pool.

Kevin Kho

03/08/2022, 2:31 PM
Ah, I think LocalDaskExecutor would really be preferred over Pool - LocalDask is a multiprocessing pool anyway, if you can use it.
But I think you already moved towards it? I'd like to know what happened here even if it's not Prefect related. I have not seen this error before.

Dekel R

03/08/2022, 2:33 PM
Yeah, that's actually what I did and it works great - but this way I have to force the whole team not to use pool.
Are you talking about the "TypeError: unhashable type: 'ResultSet'" error? If so, it seems like one of the columns had mixed types - both strings and lists. When I tried grouping on it and doing cumcount inside the Prefect flow, it failed - but after writing this df (which forced a type cast to string) it worked… that's what confused me here.
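[Editor's note: for anyone hitting this later, here is a minimal reproduction of the mixed-type failure mode described above, with a plain list standing in for the unhashable ResultSet objects:]

```python
import pandas as pd

# A grouping column that mixes strings with unhashable objects (a list here,
# BeautifulSoup ResultSets in the thread) breaks groupby's hashing step.
df = pd.DataFrame({
    "id":       ["a", ["oops"], "a"],
    "tag_name": ["x", "y", "x"],
})

try:
    df.groupby(["id", "tag_name"]).cumcount()
except TypeError as exc:
    print(exc)  # e.g. "unhashable type: 'list'"

# Casting the column to string (which writing the df out did implicitly)
# makes every grouping key hashable again:
df["id"] = df["id"].astype(str)
print(df.groupby(["id", "tag_name"]).cumcount().tolist())  # [0, 0, 1]
```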

Kevin Kho

03/08/2022, 2:36 PM
Yeah ah I see. Thanks for the info!