    Dekel R

    6 months ago
    Hey, I’m trying to run a flow with LocalDaskExecutor. This is my code (simplified; I have multiple tasks after these two):
    from prefect import Flow
    from prefect.executors import LocalDaskExecutor
    from prefect.storage import Docker
    from prefect.tasks.secrets import PrefectSecret

    CREDENTIALS_SECRET_NAME = 'PREFECT_SERVICE_ACCOUNT_CREDENTIALS'

    with Flow(
        'comparable-products-data-extraction-development',
        storage=Docker(registry_url="us-central1-docker.pkg.dev/xxx/",
                       dockerfile="./Dockerfile"),
        executor=LocalDaskExecutor(scheduler="processes"),
    ) as flow:  # , schedule=daily_schedule
        raw_credentials = PrefectSecret(CREDENTIALS_SECRET_NAME)
        # raw_credentials = get_local_credentials()
        google_credentials = parse_credentials(credentials=raw_credentials)
    And here is the parse_credentials task:
    from prefect import task
    from google.oauth2 import service_account
    
    
    @task()
    def parse_credentials(credentials):
        return service_account.Credentials.from_service_account_info(credentials)
    This works fine in multiple flows that don't use the Dask executor, and it also runs without any problem locally when using Dask. When I run it in Prefect Cloud (PREFECT_SERVICE_ACCOUNT_CREDENTIALS is saved as a JSON secret, and it works in other flows), I get a Dask error (see my first comment in the thread). Is there a solution for this? What am I missing? Thanks
    Anna Geller

    6 months ago
    Can you move the traceback to the thread to keep the main channel clean? You should create the GCP service account credentials inside each task that uses them, rather than building them once in the flow and passing the object between tasks. Your flow, including the data passed between tasks, must be cloudpickle-serializable to be used with Dask, since this is how Dask passes data between workers. If you want an example of how to combine a Prefect Secret with a GCP service account, here is one:
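    # Assumed imports; `gcs` is taken to be google.cloud.storage.
    from google.cloud import storage as gcs
    from google.oauth2 import service_account

    # PROJECT_NAME and BUCKET_NAME are expected to be defined elsewhere.
    def upload_local_file_to_gcs(
        creds: dict, source_file_name: str, destination_file_name: str
    ):
        # Build the credentials here so only the plain dict is passed between tasks.
        credentials = service_account.Credentials.from_service_account_info(creds)
        gcs_client = gcs.Client(project=PROJECT_NAME, credentials=credentials)
        bucket = gcs_client.bucket(BUCKET_NAME)
        blob = bucket.blob(blob_name=destination_file_name)
        blob.upload_from_filename(source_file_name)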
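To make the serialization point concrete, here is a minimal sketch using only the standard library. `FakeCredentials` is a hypothetical stand-in, not the real google-auth class: it holds a `threading.Lock`, which cannot be pickled, much as the real credentials object appears to hold a native handle. Plain `pickle` is used instead of cloudpickle to keep the sketch dependency-free; the idea is the same.

```python
import pickle
import threading


class FakeCredentials:
    """Hypothetical stand-in for an object that wraps a native handle."""

    def __init__(self, info: dict) -> None:
        self.info = info
        self._handle = threading.Lock()  # locks cannot be pickled


def can_pickle(obj) -> bool:
    """Return True if obj survives pickle.dumps, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (TypeError, pickle.PicklingError):
        return False


# Placeholder data, not a real service account.
raw_credentials = {"client_email": "svc@example.invalid"}

print(can_pickle(raw_credentials))                   # the plain dict
print(can_pickle(FakeCredentials(raw_credentials)))  # the wrapping object
```

The dict from the secret pickles without trouble, so it can be a task result; the wrapping object cannot, so it has to be built inside the task that uses it.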
    Dekel R

    6 months ago
    Sure
    Unexpected error: TypeError("cannot pickle '_cffi_backend.FFI' object")
    Traceback (most recent call last):
      File "/usr/local/lib/python3.9/site-packages/prefect/engine/runner.py", line 48, in inner
        new_state = method(self, state, *args, **kwargs)
      File "/usr/local/lib/python3.9/site-packages/prefect/engine/flow_runner.py", line 643, in get_flow_run_state
        final_states = executor.wait(
      File "/usr/local/lib/python3.9/site-packages/prefect/executors/dask.py", line 685, in wait
        return dask.compute(
      File "/usr/local/lib/python3.9/site-packages/dask/base.py", line 570, in compute
        results = schedule(dsk, keys, **kwargs)
      File "/usr/local/lib/python3.9/site-packages/dask/multiprocessing.py", line 219, in get
        result = get_async(
      File "/usr/local/lib/python3.9/site-packages/dask/local.py", line 507, in get_async
        raise_exception(exc, tb)
      File "/usr/local/lib/python3.9/site-packages/dask/local.py", line 315, in reraise
        raise exc
      File "/usr/local/lib/python3.9/site-packages/dask/local.py", line 222, in execute_task
        result = dumps((result, id))
      File "/usr/local/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
        cp.dump(obj)
      File "/usr/local/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 602, in dump
        return Pickler.dump(self, obj)
    TypeError: cannot pickle '_cffi_backend.FFI' object
    The only problem here is that I have 10+ tasks, all using the same set of credentials. So I have to pass the raw credentials to each of them and call “service_account.Credentials.from_service_account_info(creds)” inside each task?
    Anna Geller

    6 months ago
    Yes, that's the easiest approach; it's a single line of code. If you prefer, you can move it into a helper function and call that, which is still one line per task. But passing an HTTP client (or the credentials object) between tasks in Dask won't work because it cannot be serialized.
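A minimal sketch of that helper approach, under some assumptions: `build_gcp_credentials` and `credentials_email` are hypothetical names, and in a real flow the second function would carry Prefect's `@task` decorator (left out here so the sketch does not depend on Prefect). Each task receives the raw dict from the secret and returns only plain data.

```python
from typing import Any, Dict


def build_gcp_credentials(raw_credentials: Dict[str, Any]):
    """Hypothetical helper: turn the secret's dict into a Credentials object."""
    # Local import keeps this sketch importable without google-auth installed.
    from google.oauth2 import service_account

    return service_account.Credentials.from_service_account_info(raw_credentials)


def credentials_email(raw_credentials: Dict[str, Any]) -> str:
    """Example task body; in a flow this would be decorated with @task."""
    credentials = build_gcp_credentials(raw_credentials)  # the one line per task
    # Return only plain, picklable data (a string), never the object itself.
    return credentials.service_account_email
```

In the flow, `raw_credentials = PrefectSecret(CREDENTIALS_SECRET_NAME)` would then be passed directly to every task that needs GCP access.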
    Dekel R

    6 months ago
    Understood. Thanks!