# prefect-community

Dekel R

03/13/2022, 4:50 PM
Hey, I’m trying to run a flow with LocalDaskExecutor. Here is my code (simplified; I have multiple tasks after these two):
```python
from prefect import Flow
from prefect.executors import LocalDaskExecutor
from prefect.storage import Docker
from prefect.tasks.secrets import PrefectSecret

CREDENTIALS_SECRET_NAME = 'PREFECT_SERVICE_ACCOUNT_CREDENTIALS'


with Flow('comparable-products-data-extraction-development',
          storage=Docker(registry_url="us-central1-docker.pkg.dev/xxx/",
                         dockerfile="./Dockerfile"),
          executor=LocalDaskExecutor(scheduler="processes")) as flow:  # , schedule=daily_schedule
    raw_credentials = PrefectSecret(CREDENTIALS_SECRET_NAME)
    # raw_credentials = get_local_credentials()
    google_credentials = parse_credentials(credentials=raw_credentials)
```
And here is the parse_credentials task:
```python
from prefect import task
from google.oauth2 import service_account


@task()
def parse_credentials(credentials):
    # Build a google-auth credentials object from the raw service account dict
    return service_account.Credentials.from_service_account_info(credentials)
```
This works fine in multiple flows when not using the Dask executor, and it also runs without any problem locally when using Dask. But when running it in Prefect Cloud (PREFECT_SERVICE_ACCOUNT_CREDENTIALS is saved as a JSON secret, and it works in other flows) I get a Dask error (see my first comment in the thread). Any solution for this? What am I missing? Thanks

Anna Geller

03/13/2022, 4:54 PM
Can you move the traceback to the thread to keep the main channel clean? You should instantiate the GCP service account credentials within your task rather than within your flow. Your flow must be cloudpickle-serializable to be used with Dask, since that is how Dask passes data between workers. If you want an example of how you can combine a Prefect Secret with a GCP service account, here is one:
```python
from google.cloud import storage as gcs
from google.oauth2 import service_account


def upload_local_file_to_gcs(
    creds: dict, source_file_name: str, destination_file_name: str
):
    # Credentials are built inside the function, so the unpicklable
    # credentials object never crosses a task boundary
    credentials = service_account.Credentials.from_service_account_info(creds)
    gcs_client = gcs.Client(project=PROJECT_NAME, credentials=credentials)
    bucket = gcs_client.bucket(BUCKET_NAME)
    blob = bucket.blob(blob_name=destination_file_name)
    blob.upload_from_filename(source_file_name)
```
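[Editor’s note: a minimal sketch of how this function might be wired into a flow, assuming Prefect 1.x APIs; the task name, file names, and flow name below are illustrative, not from the thread.]
```python
from prefect import Flow, task
from prefect.tasks.secrets import PrefectSecret


@task
def upload_report(creds: dict):
    # The raw secret dict is passed in; credentials are built inside the task
    upload_local_file_to_gcs(creds, "report.csv", "reports/report.csv")


with Flow("gcs-upload-example") as flow:
    raw_credentials = PrefectSecret("PREFECT_SERVICE_ACCOUNT_CREDENTIALS")
    upload_report(raw_credentials)
```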
👀 1

Dekel R

03/13/2022, 4:56 PM
Sure
```
Unexpected error: TypeError("cannot pickle '_cffi_backend.FFI' object")
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/prefect/engine/flow_runner.py", line 643, in get_flow_run_state
    final_states = executor.wait(
  File "/usr/local/lib/python3.9/site-packages/prefect/executors/dask.py", line 685, in wait
    return dask.compute(
  File "/usr/local/lib/python3.9/site-packages/dask/base.py", line 570, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/dask/multiprocessing.py", line 219, in get
    result = get_async(
  File "/usr/local/lib/python3.9/site-packages/dask/local.py", line 507, in get_async
    raise_exception(exc, tb)
  File "/usr/local/lib/python3.9/site-packages/dask/local.py", line 315, in reraise
    raise exc
  File "/usr/local/lib/python3.9/site-packages/dask/local.py", line 222, in execute_task
    result = dumps((result, id))
  File "/usr/local/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "/usr/local/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 602, in dump
    return Pickler.dump(self, obj)
TypeError: cannot pickle '_cffi_backend.FFI' object
```
The only problem here is that I have 10+ tasks, all using the same set of credentials. So I have to pass the raw credentials to each one of them and call service_account.Credentials.from_service_account_info(creds) inside each task?
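[Editor’s note: the unpicklable FFI handle lives inside the credentials object itself, which is why the flow only fails once Dask tries to ship results between processes. A minimal way to reproduce this outside Prefect, assuming `info` holds a service account info dict, might be:]
```python
import cloudpickle
from google.oauth2 import service_account

creds = service_account.Credentials.from_service_account_info(info)
cloudpickle.dumps(creds)  # raises TypeError: cannot pickle '_cffi_backend.FFI' object
```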

Anna Geller

03/13/2022, 5:58 PM
Yeah, that's the easiest option, and it's a single line of code. If you prefer, you can move it into a helper function and call that from each task; still one line of code per task. But passing an HTTP client between tasks in Dask won't work because it cannot be serialized.
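[Editor’s note: a minimal sketch of the helper-function approach described above, assuming Prefect 1.x; the names get_gcp_credentials, task_a, and task_b are hypothetical.]
```python
from prefect import task
from google.oauth2 import service_account


def get_gcp_credentials(raw_creds: dict):
    # One-liner shared by every task; called inside each task so the
    # credentials object is never pickled by Dask
    return service_account.Credentials.from_service_account_info(raw_creds)


@task
def task_a(raw_creds: dict):
    credentials = get_gcp_credentials(raw_creds)
    ...  # use credentials here


@task
def task_b(raw_creds: dict):
    credentials = get_gcp_credentials(raw_creds)
    ...  # use credentials here
```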

Dekel R

03/13/2022, 6:06 PM
Understood. Thanks!
👍 1