    Joël Luijmes

    1 year ago
    Hey, I want to persist my results on GCS for a specific task. This works when I execute the task from an agent, but it fails when I run the flow locally. It just doesn’t write to the bucket, and there are no logs showing that it even tries to access GCS.
    from prefect import task
    from prefect import Flow
    from prefect.tasks.postgres.postgres import PostgresFetch
    from prefect.engine.results import GCSResult
    
    result = GCSResult(bucket='airflow-test-gynzy-net', location='{task_name}.txt')
    
    max_product_id = PostgresFetch(
        'manager', 
        'postgres',
        'localhost',
        15432,
        # query='SELECT MAX(id) from products'
        query='SELECT 188000',
        name='Test 😅',
        result=result
    )
    
    query_products = PostgresFetch(
        'manager',
        'postgres',
        'localhost',
        15432,
        fetch='all',
    )
    
    @task
    def query_formatter(max_id):
        (id, ) = max_id
        return f'SELECT * FROM products WHERE id > {id} LIMIT 10'
    
    
    with Flow("PostgreSQL -> BigQuery Sync") as flow:
        max_id = max_product_id()
        query = query_formatter(max_id)
        products = query_products(query=query)
    
    
    # flow.register('')
    state = flow.run()
    When using storage, it’s not actually using GCS when running locally. Or is the whole storage thing only used when the flow is run by an agent?
    storage = GCS(bucket='airflow-test-gynzy-net')
    
    with Flow("PostgreSQL -> BigQuery Sync", storage=storage) as flow:

    Marwan Sarieddine

    1 year ago
    you probably need to enable checkpointing when running the flow locally; it is enabled by default when "running from an agent"
    you'll need this environment variable set to true ...
    export PREFECT__FLOWS__CHECKPOINTING=true
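    For reference, a minimal sketch of setting this from the script itself (assuming the variable only needs to be set before prefect is imported, since the config is read at import time):
    import os

    # Checkpointing is on by default for agent runs but off for local flow.run();
    # set it before importing prefect so the loaded config picks it up.
    os.environ["PREFECT__FLOWS__CHECKPOINTING"] = "true"

    import prefect  # noqa: E402  (imported after setting the env var on purpose)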

    Joël Luijmes

    1 year ago
    Hmm, I already had that set, but it still doesn’t work for GCS.
    It did work for LocalResult, so could there be a bug maybe?
    Oh wait, I think I actually hadn’t set that. But it’s still kind of broken:
    Traceback (most recent call last):
      File "/Users/joell/.pyenv/versions/3.8.6/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
        new_state = method(self, state, *args, **kwargs)
      File "/Users/joell/.pyenv/versions/3.8.6/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 881, in get_task_run_state
        result = self.result.write(value, **formatting_kwargs)
      File "/Users/joell/.pyenv/versions/3.8.6/lib/python3.8/site-packages/prefect/engine/results/gcs_result.py", line 77, in write
        self.gcs_bucket.blob(new.location).upload_from_string(binary_data)
      File "/Users/joell/.pyenv/versions/3.8.6/lib/python3.8/site-packages/prefect/engine/results/gcs_result.py", line 41, in gcs_bucket
        client = get_storage_client()
      File "/Users/joell/.pyenv/versions/3.8.6/lib/python3.8/site-packages/prefect/utilities/gcp.py", line 53, in get_storage_client
        return get_google_client(storage, credentials=credentials, project=project)
      File "/Users/joell/.pyenv/versions/3.8.6/lib/python3.8/site-packages/prefect/utilities/gcp.py", line 29, in get_google_client
        credentials = Credentials.from_service_account_info(credentials)
      File "/Users/joell/.pyenv/versions/3.8.6/lib/python3.8/site-packages/google/oauth2/service_account.py", line 210, in from_service_account_info
        signer = _service_account_info.from_dict(
      File "/Users/joell/.pyenv/versions/3.8.6/lib/python3.8/site-packages/google/auth/_service_account_info.py", line 46, in from_dict
        missing = keys_needed.difference(six.iterkeys(data))
      File "/Users/joell/.pyenv/versions/3.8.6/lib/python3.8/site-packages/six.py", line 583, in iterkeys
        return iter(d.keys(**kw))
    AttributeError: 'str' object has no attribute 'keys'
    Seems like it parses the keyfile incorrectly from the environment variable?

    Marwan Sarieddine

    1 year ago
    seems like the error is a failure to fetch the GCS credentials, right?
    one way to resolve this is to set the proper GCS env variable locally, I believe - but I haven't used GCS in a while (more familiar with AWS ....)
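    For GCS specifically, the standard variable is GOOGLE_APPLICATION_CREDENTIALS, which points at a keyfile path. A sketch of that approach, assuming the storage client falls back to application default credentials when no Prefect GCP_CREDENTIALS secret is set (the keyfile path is just an example):
    import os

    # Point the Google client libraries at the service-account keyfile.
    # With PREFECT__CONTEXT__SECRETS__GCP_CREDENTIALS unset, the GCS client
    # should pick up these application default credentials instead.
    os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = os.path.expanduser(
        "~/secrets/airflow-test-gcp-keyfile.json"
    )

    state = flow.run()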

    Joël Luijmes

    1 year ago
    Yes, I’m probably passing the keyfile in wrong. If I unset PREFECT__CONTEXT__SECRETS__GCP_CREDENTIALS, everything works as expected 🙂 But for production I need to be able to provide that keyfile.
    It’s probably due to the fact that the keyfile is a JSON object. Any suggestions on how to properly set that as an environment variable?
    I used
    export PREFECT__CONTEXT__SECRETS__GCP_CREDENTIALS=$(cat ~/secrets/airflow-test-gcp-keyfile.json)
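    A way to sidestep the environment-variable parsing for local runs is to load the keyfile in the script and pass it as a local secret through prefect.context; a sketch, assuming the keyfile path above:
    import json
    import os

    import prefect

    # Load the service-account keyfile as a dict instead of exporting the raw
    # JSON through an environment variable.
    keyfile_path = os.path.expanduser("~/secrets/airflow-test-gcp-keyfile.json")
    with open(keyfile_path) as f:
        gcp_credentials = json.load(f)

    # Local secret lookups read from prefect.context["secrets"], so GCSResult
    # should receive a dict rather than a string.
    with prefect.context(secrets={"GCP_CREDENTIALS": gcp_credentials}):
        state = flow.run()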

    Marwan Sarieddine

    1 year ago
    hmm - not sure ... FWIW - I have seen AWS_CREDENTIALS being set like this (i.e. writing out the json string ...)
    export AWS_CREDENTIALS='{"ACCESS_KEY": "${AWS_ACCESS_KEY_ID}", "SECRET_ACCESS_KEY": "${AWS_SECRET_ACCESS_KEY}"}'

    Joël Luijmes

    1 year ago
    Hm, usually applications require a path to a GCP keyfile (probably due to the newlines in the private key). Does anyone know how to properly supply PREFECT__CONTEXT__SECRETS__GCP_CREDENTIALS as an environment variable?
    x = prefect.context.get("secrets", {}).get(
            "GCP_CREDENTIALS"
        )
    
    print(type(x))
    Shows that x is a str when I pass in the keyfile as the env var. But if I pass {"x":"y"} as the env var, the type is prefect.configuration.Config.
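    Given that behaviour, one workaround is to normalize the secret to a dict before running the flow, since Credentials.from_service_account_info() expects a mapping; a sketch, not an official Prefect API:
    import json

    import prefect

    # The secret may arrive as a raw JSON string (from the env var) or as a
    # dict-like Config; convert strings to a plain dict up front.
    secrets = prefect.context.setdefault("secrets", {})
    raw = secrets.get("GCP_CREDENTIALS")
    if isinstance(raw, str):
        secrets["GCP_CREDENTIALS"] = json.loads(raw)

    state = flow.run()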