Bob Colner

05/11/2022, 9:45 PM
So I’m migrating some 1.0 flows to 2.0. Loving the new @flow API: loops and native control flow are very nice to have. I’m hitting a strange PicklingError in a task with the Dask task runner. The same task works on 1.0 and (strangely) was working on 2.0 before I tried to set up a deployment. Any ideas on how to debug this task? (see thread)
error message:
Traceback (most recent call last):
  File "/opt/miniconda3/envs/orion/lib/python3.9/site-packages/distributed/protocol/pickle.py", line 40, in dumps
    result = pickle.dumps(x, **dump_kwargs)
_pickle.PicklingError: Can't pickle <function asa_get_campaigns at 0x1087c4160>: it's not the same object as __main__.asa_get_campaigns
task def:
@task(
    retries=3,
    retry_delay_seconds=60,
    tags=['asa_api'],
)
def asa_get_campaigns(org_id: str, date: str, access_token: str) -> pd.DataFrame:
    logger = get_run_logger()
    logger.info(f'fetching campaigns for: org: {org_id}, date: {date}')
    return asa_api.get_campaigns(org_id, date, access_token)
asa_api.get_campaigns def:
def get_campaigns(org_id: str, date: str, access_token: str) -> pd.DataFrame:
    requestBody = {
        'startTime': date,
        'endTime': date,
        'selector': {
            'orderBy': [{'field': 'campaignId', 'sortOrder': 'ASCENDING'}],
            'conditions': [{'field': 'campaignStatus', 'operator': 'EQUALS', 'values': ['ENABLED']}]
        },
        'returnRowTotals': 'true'
    }
    headers = {'Content-Type': 'application/json', 'Authorization': f'Bearer {access_token}', 'X-AP-Context': f'orgId={org_id}'}
    response = requests.post(CAMPAIGNS_URL, data=json.dumps(requestBody, ensure_ascii=False), headers=headers)

    jsonResponse = response.json()
    if jsonResponse["data"] is None or jsonResponse['pagination']['totalResults'] == 0:
        return pd.DataFrame()

    rows = jsonResponse['data']['reportingDataResponse']['row']
    format_row = lambda row : {
        'spend_date': date, 
        'org_id': org_id,
        'app_name': row['metadata']['app']['appName'],
        'campaign_id': row['metadata']['campaignId'],
        'campaign_name': row['metadata']['campaignName'],
        'modification_at': row['metadata']['modificationTime'],
        'inserted_at': datetime.utcnow().isoformat(),
        'spend_amount': float(row['total']['localSpend']['amount']),
        'impressions': row['total']['impressions'],
        'taps': row['total']['taps'],
        'installs': row['total']['installs'],
        'country': row['metadata']['countriesOrRegions'][0],
        'json_payload': json.dumps(row)
    }
    formatted_rows = [*map(format_row, rows)]
    df = pd.DataFrame(formatted_rows).convert_dtypes()
    df['spend_amount'] = df['spend_amount'].astype(float)
    return df
My task doesn’t hold any client objects that I’m aware of; it just makes an HTTP POST.
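For context on that error: the standard pickle module serializes a function by reference (its module plus qualified name) and then checks that looking the name up again returns the identical object; “it’s not the same object as __main__.asa_get_campaigns” means that check failed. A hypothetical sanity check, not from the thread, that exposes the same condition:

import sys

def pickles_by_reference(fn) -> bool:
    # pickle stores a function as "module + qualname" and re-imports it on load,
    # so serialization only works if this lookup returns the very same object
    module = sys.modules.get(fn.__module__)
    return getattr(module, fn.__qualname__, None) is fn

# pickles_by_reference(asa_get_campaigns) returning False would reproduce the
# "it's not the same object" failure when Dask tries to pickle the task.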

Zanie

05/11/2022, 10:33 PM
Hm, I’ve seen this before. Usually it happens when you’re lacking the if __name__ == "__main__" guard before calling the flow.
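A minimal sketch of the pattern Zanie is describing, assuming a process-based Dask cluster that re-imports the flow script in its worker processes (the flow and task names here are made up for illustration):

from prefect import flow, task, get_run_logger
from prefect.task_runners import DaskTaskRunner

@task
def say_hello(name: str):
    get_run_logger().info(f"hello {name}")

@flow(task_runner=DaskTaskRunner())
def my_flow():
    say_hello("world")

# Without this guard, any process that imports the module (e.g. a Dask worker)
# would try to run the flow again at import time.
if __name__ == "__main__":
    my_flow()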

Bob Colner

05/11/2022, 10:40 PM
Interesting, I have the main guard in place in the flow file. Any other debug ideas?

Zanie

05/11/2022, 10:42 PM
Does it happen with a minimal example?

Bob Colner

05/11/2022, 10:44 PM
It was working for me. I’ll try reverting to an earlier git commit. The issue started when I tried to create a local deployment for the first time, unless I somehow introduced a subtle bug at the same time.

Zanie

05/11/2022, 10:45 PM
How did you define your deployment? Is the error happening when you just call the flow or does it happen when the deployment runs?

Bob Colner

05/11/2022, 10:47 PM
Local flow, default universal flow runner, GCP storage. FYI, changing flow runners is not well documented IMO.
I get the same pickle issue when running the flow file or ‘executing’ the deployment

Zanie

05/11/2022, 10:49 PM
We’re designing some changes to flow runners atm so we’re hesitant to sink much time into the existing interface.
Can you share the full flow file?

Bob Colner

05/11/2022, 10:51 PM
Makes sense, yeah I’ll share the full flow file tomorrow when I’m in front of my computer. Thanks for your help
Here is my full task/flow .py file:

Zanie

05/12/2022, 3:38 PM
Hm. If I throw some mocks in there for the external items, this runs fine:
import time
from prefect import task, flow, get_run_logger
from prefect.task_runners import (
    SequentialTaskRunner,
    ConcurrentTaskRunner,
    DaskTaskRunner,
)

from typing import List
from unittest.mock import MagicMock

pd = MagicMock()
GcpCredentials = MagicMock()
bigquery_insert_stream = MagicMock()
asa_api = MagicMock()


@task(
    retries=3,
    retry_delay_seconds=60,
    tags=["asa_api"],
)
def asa_get_campaigns(org_id: str, date: str, access_token: str) -> pd.DataFrame:
    logger = get_run_logger()
    logger.info(f"fetching campaigns for: org: {org_id}, date: {date}")
    return asa_api.get_campaigns(org_id, date, access_token)


@task(
    retries=3,
    retry_delay_seconds=60,
    tags=["asa_api"],
)
def asa_get_adgroups(
    campaign_id: str, org_id: str, date: str, access_token: str
) -> pd.DataFrame:
    logger = get_run_logger()
    logger.info(
        f"fetching adgroups for: org: {org_id}, campaign: {campaign_id}, date: {date}"
    )
    return asa_api.get_adgroups(campaign_id, org_id, date, access_token)


@task(
    retries=3,
    retry_delay_seconds=60,
    tags=["asa_api"],
)
def asa_get_keywords(
    campaign_id: str, org_id: str, date: str, access_token: str
) -> pd.DataFrame:
    logger = get_run_logger()
    logger.info(
        f"fetching keywords for: {org_id}, campaign: {campaign_id}, date: {date}"
    )
    return asa_api.get_keywords(campaign_id, org_id, date, access_token)


dask_local = DaskTaskRunner(
    cluster_kwargs={
        "processes": True,
        "n_workers": 4,
        "threads_per_worker": 2,
        "dashboard_address": None,
    }
)


@flow(task_runner=dask_local)
def fetch_asa_data(
    start_date: str, end_date: str, orgs: List[str] = ["117960", "2657930", "3737470"]
) -> bool:
    start_at = time.time()
    logger = get_run_logger()
    asa_token = asa_api.get_access_token()
    gcp_credentials = GcpCredentials(project="emerald-skill-201716")
    date_range = [
        d.date().isoformat() for d in pd.date_range(start=start_date, end=end_date)
    ]
    for date in date_range:
        for org_id in orgs:
            dfc_future = asa_get_campaigns(org_id, date, asa_token)
            dfc = dfc_future.result()
            if dfc is None or len(dfc) == 0:
                logger.info(f"No campaign data for org: {org_id}, date: {date}")
                continue

            rows_c = dfc.to_dict("records")
            result = bigquery_insert_stream(
                dataset="apple_search_ads",
                table="campaigns_stream",
                records=rows_c,
                gcp_credentials=gcp_credentials,
            )
            for row in dfc.itertuples():
                dfa_future = asa_get_adgroups(row.campaign_id, org_id, date, asa_token)
                dfa = dfa_future.result()
                if dfa is None or len(dfa) == 0:
                    logger.info(
                        f"No adgroup data for org: {org_id}, date: {date}, campaign_id: {row.campaign_id}"
                    )
                    continue

                rows_a = dfa.to_dict("records")
                result = bigquery_insert_stream(
                    dataset="apple_search_ads",
                    table="adgroups_stream",
                    records=rows_a,
                    gcp_credentials=gcp_credentials,
                )
                dfk_future = asa_get_keywords(row.campaign_id, org_id, date, asa_token)
                dfk = dfk_future.result()
                if dfk is None or len(dfk) == 0:
                    logger.info(
                        f"No keyword data for org: {org_id}, date: {date}, campaign_id: {row.campaign_id}"
                    )
                    continue

                rows_k = dfk.to_dict("records")
                result = bigquery_insert_stream(
                    dataset="apple_search_ads",
                    table="keywords_stream",
                    records=rows_k,
                    gcp_credentials=gcp_credentials,
                )

    end_at = time.time()
    logger.info(f"flow run time (mins): {(end_at-start_at)/60}")
    return True


if __name__ == "__main__":
    fetch_asa_data(start_date="2022-05-06", end_date="2022-05-06")

Bob Colner

05/12/2022, 4:34 PM
Interesting, so something in my module must not be pickle-friendly? I’m using the same module in 1.0 and it works with Dask. In general, debugging this stuff is a huge mystery to me.
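One way to narrow that down, sketched here as a suggestion rather than something from the thread: round-trip the suspect objects through pickle and cloudpickle (the serializers Dask relies on) and see which one actually fails. The module name below is hypothetical; it stands in for wherever the flow file lives.

import pickle
import cloudpickle

from asa_flow import asa_get_campaigns, asa_api  # hypothetical module name

candidates = [("task", asa_get_campaigns), ("api function", asa_api.get_campaigns)]
for label, obj in candidates:
    for serializer in (pickle, cloudpickle):
        try:
            serializer.loads(serializer.dumps(obj))
            print(f"{label} via {serializer.__name__}: ok")
        except Exception as exc:
            print(f"{label} via {serializer.__name__}: {type(exc).__name__}: {exc}")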

Zanie

05/12/2022, 4:38 PM
The error you’re getting doesn’t entirely suggest that, but I’m not sure what’s happening.

Bob Colner

05/23/2022, 2:09 PM
So the problem went away after I switched the storage back to temporary local. Still no idea how this error relates to trying to use Google Cloud Storage.

Zanie

05/23/2022, 3:38 PM
We’re persisting results to GCS, which means we need to ship the GCS storage object to the tasks, which means it gets cloudpickled.
I still don’t understand the error though, or how it relates to GCS 😄 Usually here we’d see an error when the GCS client is pickled.
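For illustration, and assuming the google-cloud-storage client is the object in question: Google Cloud client objects typically refuse to be pickled at all, which is the error one would normally expect in this situation.

import pickle
from google.auth.credentials import AnonymousCredentials
from google.cloud import storage  # requires google-cloud-storage

# Anonymous credentials and a dummy project so the client can be constructed
# without real GCP credentials on the machine.
client = storage.Client(project="demo-project", credentials=AnonymousCredentials())
try:
    pickle.dumps(client)
except Exception as exc:
    # typically a PicklingError stating that pickling client objects is not supported
    print(type(exc).__name__, exc)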

Bob Colner

05/23/2022, 3:40 PM
Right, but my results were all pd.DataFrames, so I’m still not sure why the pickling fails.
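A quick sanity check supporting that point, assuming pandas and cloudpickle are installed: a plain DataFrame round-trips through cloudpickle cleanly, so the result values themselves are unlikely to be the unpicklable part.

import cloudpickle
import pandas as pd

# A stand-in result: DataFrames serialize fine, so the failure is more likely
# the function object or something captured alongside it.
df = pd.DataFrame({"spend_amount": [1.5, 2.0], "impressions": [10, 20]})
restored = cloudpickle.loads(cloudpickle.dumps(df))
assert restored.equals(df)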

Zanie

05/23/2022, 3:41 PM
Can you share the full traceback? And perhaps DEBUG level logs?