# prefect-community
Hey Folks, I'm hopeful that someone can give me some tips as to what I am doing wrong with the BigQueryLoadGoogleCloudStorage task. I have built a flow to export data from a MySQL database and then upload it to GCS (all works fine). When the flow attempts to run the BigQueryLoadGoogleCloudStorage task, I get the following error:
```
Unexpected error: PicklingError('Pickling client objects is explicitly not supported.\nClients have non-trivial state that is local and unpickleable.',)
```
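For context, this class of error comes from trying to serialize an object that holds live connection state. A minimal sketch of the same failure, using a plain socket as a stand-in for a cloud client (`FakeClient` is hypothetical, not part of Prefect or the Google libraries):

```python
import pickle
import socket

class FakeClient:
    """Hypothetical stand-in for a cloud client holding a live connection."""
    def __init__(self):
        # Live OS resources (sockets, auth sessions, HTTP connections)
        # cannot be pickled; google-cloud clients fail for the same reason.
        self._conn = socket.socket()

client = FakeClient()
try:
    pickle.dumps(client)
except TypeError as exc:
    # e.g. "cannot pickle 'socket' object"
    print("pickling failed:", exc)
client._conn.close()
```

The google-cloud clients raise their own, more explicit `PicklingError` instead of the generic `TypeError`, but the underlying cause is the same: unpicklable local state.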
I'm configuring the security credentials for GCP in `.prefect/config.toml` under the `[context.secrets]` section using the `GCP_CREDENTIALS` field.
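For reference, that secrets section looks roughly like this (a hedged sketch; the value is the full service-account key JSON, shown here with placeholder fields only):

```toml
# ~/.prefect/config.toml -- placeholder values, not a real key
[context.secrets]
GCP_CREDENTIALS = """
{
  "type": "service_account",
  "project_id": "data-platform"
}
"""
```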
Doing `flow.run()`, the flow completes successfully. I then do `flow.register(...)` and then start the agent with `flow.run_agent()` and manually kick off a run in the UI. The following is an example of how I am adding the `BigQueryLoadGoogleCloudStorage` task to the flow:
```python
from prefect import Flow
from prefect.tasks.gcp.bigquery import BigQueryLoadGoogleCloudStorage
from google.cloud.bigquery import SchemaField

load_to_bigquery = BigQueryLoadGoogleCloudStorage(
    dataset_id="data_lake_dev",
    project="data-platform"
)

with Flow("Move Data From DB to BQ") as flow:
    load_to_bigquery(
        uri="https://storage.cloud.google.com/data-platform-dev/a-csv-file.csv",
        table="website_data",
        schema=[
            SchemaField(name="id", field_type="INT64", mode="REQUIRED"),
            SchemaField(name="uuid", field_type="STRING"),
            SchemaField(name="site", field_type="STRING"),
            SchemaField(name="path", field_type="STRING"),
            SchemaField(name="type", field_type="STRING", mode="REQUIRED"),
            SchemaField(name="reported_at", field_type="TIMESTAMP", mode="REQUIRED"),
            SchemaField(name="helpful", field_type="BOOL", mode="REQUIRED"),
            SchemaField(name="message", field_type="STRING")
        ]
    )
```
I've solved my problem. I tried saving and then loading the flow to see if I could reproduce the pickle error, but the flow executed successfully. The part that I missed from my example above was this:
```python
flow.register(...)
flow.run_agent()
```
Doing this caused the issue. Removing the `flow.register()` call (as the flow had already been registered) and simply running `flow.run_agent()` resulted in a successful execution.
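One way to avoid tripping over this again is to keep registration and the agent behind explicit modes, so a deploy step registers once and the agent process never re-registers. A hedged sketch (my own layout, not an official Prefect pattern; `dispatch` and its arguments are hypothetical):

```python
def dispatch(mode, flow, project):
    """Run exactly one lifecycle step, so an agent start never re-registers."""
    if mode == "register":
        # One-time (per deploy): push the flow definition to the backend.
        flow.register(project_name=project)
    elif mode == "agent":
        # Long-running: poll for runs of the already-registered flow.
        flow.run_agent()
    else:
        raise ValueError(f"unknown mode: {mode!r}")

# Usage, e.g. from the flow module's entry point:
#   dispatch("register", flow, "my-project")   # once, at deploy time
#   dispatch("agent", flow, "my-project")      # start the agent
```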