Dekel R
03/13/2022, 4:50 PMCREDENTIALS_SECRET_NAME = 'PREFECT_SERVICE_ACCOUNT_CREDENTIALS'
with Flow('comparable-products-data-extraction-development',
storage=Docker(registry_url="us-central1-docker.pkg.dev/xxx/",
dockerfile="./Dockerfile"), executor=LocalDaskExecutor(scheduler="processes")) as flow: # , schedule=daily_schedule
raw_credentials = PrefectSecret(CREDENTIALS_SECRET_NAME)
# raw_credentials = get_local_credentials()
google_credentials = parse_credentials(credentials=raw_credentials)
And here is the parse_credentials task -
from prefect import task
from google.oauth2 import service_account
@task()
def parse_credentials(credentials):
return service_account.Credentials.from_service_account_info(credentials)
This works fine in multiple flows when not using Dask executor - and it also runs without any problem locally when using Dask….
When running this in Prefect cloud - (PREFECT_SERVICE_ACCOUNT_CREDENTIALS is saved as a json secret, and it works in other flows) I get this Dask error - (see at the my first comment)
Any solution for this? what am I missing?
ThanksAnna Geller
def upload_local_file_to_gcs(
creds: dict, source_file_name: str, destination_file_name: str
):
credentials = service_account.Credentials.from_service_account_info(creds)
gcs_client = gcs.Client(project=PROJECT_NAME, credentials=credentials)
bucket = gcs_client.bucket(BUCKET_NAME)
blob = bucket.blob(blob_name=destination_file_name)
blob.upload_from_filename(source_file_name)
Dekel R
03/13/2022, 4:56 PMUnexpected error: TypeError("cannot pickle '_cffi_backend.FFI' object")
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/prefect/engine/runner.py", line 48, in inner
new_state = method(self, state, *args, **kwargs)
File "/usr/local/lib/python3.9/site-packages/prefect/engine/flow_runner.py", line 643, in get_flow_run_state
final_states = executor.wait(
File "/usr/local/lib/python3.9/site-packages/prefect/executors/dask.py", line 685, in wait
return dask.compute(
File "/usr/local/lib/python3.9/site-packages/dask/base.py", line 570, in compute
results = schedule(dsk, keys, **kwargs)
File "/usr/local/lib/python3.9/site-packages/dask/multiprocessing.py", line 219, in get
result = get_async(
File "/usr/local/lib/python3.9/site-packages/dask/local.py", line 507, in get_async
raise_exception(exc, tb)
File "/usr/local/lib/python3.9/site-packages/dask/local.py", line 315, in reraise
raise exc
File "/usr/local/lib/python3.9/site-packages/dask/local.py", line 222, in execute_task
result = dumps((result, id))
File "/usr/local/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
cp.dump(obj)
File "/usr/local/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 602, in dump
return Pickler.dump(self, obj)
TypeError: cannot pickle '_cffi_backend.FFI' object
Anna Geller
Dekel R
03/13/2022, 6:06 PM