
Joël Luijmes

10/31/2020, 4:30 PM
Hey, I want to persist my results on GCS for a specific task. This works if I execute the task from an agent, but it fails when I run the flow locally. It just doesn’t write to the bucket, nor does it log anything about trying to access GCS.
from prefect import task, Flow
from prefect.tasks.postgres.postgres import PostgresFetch
from prefect.engine.results import GCSResult

result = GCSResult(bucket='airflow-test-gynzy-net', location='{task_name}.txt')

max_product_id = PostgresFetch(
    'manager', 
    'postgres',
    'localhost',
    15432,
    # query='SELECT MAX(id) from products'
    query='SELECT 188000',
    name='Test 😅',
    result=result
)

query_products = PostgresFetch(
    'manager',
    'postgres',
    'localhost',
    15432,
    fetch='all',
)

@task
def query_formatter(max_id):
    (id, ) = max_id
    return f'SELECT * FROM products WHERE id > {id} LIMIT 10'


with Flow("PostgreSQL -> BigQuery Sync") as flow:
    max_id = max_product_id()
    query = query_formatter(max_id)
    products = query_products(query=query)


# flow.register('')
state = flow.run()
When using the storage below, it’s not actually using GCS when running locally. Or is the whole storage thing only used when the flow is run by an agent?
storage = GCS(bucket='airflow-test-gynzy-net')

with Flow("PostgreSQL -> BigQuery Sync", storage=storage) as flow:

Marwan Sarieddine

10/31/2020, 4:42 PM
you probably need to enable checkpointing when running the flow locally; it is enabled by default when "running from an agent"
you'll need this environment variable set to true ...
export PREFECT__FLOWS__CHECKPOINTING=true
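(As a minimal local sketch of the same setting in Python, assuming the variable has to be in the environment before Prefect is imported so the config picks it up:)

import os

# Enable checkpointing before Prefect loads its configuration from the environment.
os.environ["PREFECT__FLOWS__CHECKPOINTING"] = "true"

import prefect  # flows.checkpointing is now true for this process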

Joël Luijmes

10/31/2020, 4:46 PM
Hmm, I already had that set, but it still doesn’t work for GCS.
It did work for the LocalResult; could there be a bug maybe?
Oh wait, I think I actually hadn’t set that. But it’s still kind of broken?
Traceback (most recent call last):
  File "/Users/joell/.pyenv/versions/3.8.6/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/Users/joell/.pyenv/versions/3.8.6/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 881, in get_task_run_state
    result = self.result.write(value, **formatting_kwargs)
  File "/Users/joell/.pyenv/versions/3.8.6/lib/python3.8/site-packages/prefect/engine/results/gcs_result.py", line 77, in write
    self.gcs_bucket.blob(new.location).upload_from_string(binary_data)
  File "/Users/joell/.pyenv/versions/3.8.6/lib/python3.8/site-packages/prefect/engine/results/gcs_result.py", line 41, in gcs_bucket
    client = get_storage_client()
  File "/Users/joell/.pyenv/versions/3.8.6/lib/python3.8/site-packages/prefect/utilities/gcp.py", line 53, in get_storage_client
    return get_google_client(storage, credentials=credentials, project=project)
  File "/Users/joell/.pyenv/versions/3.8.6/lib/python3.8/site-packages/prefect/utilities/gcp.py", line 29, in get_google_client
    credentials = Credentials.from_service_account_info(credentials)
  File "/Users/joell/.pyenv/versions/3.8.6/lib/python3.8/site-packages/google/oauth2/service_account.py", line 210, in from_service_account_info
    signer = _service_account_info.from_dict(
  File "/Users/joell/.pyenv/versions/3.8.6/lib/python3.8/site-packages/google/auth/_service_account_info.py", line 46, in from_dict
    missing = keys_needed.difference(six.iterkeys(data))
  File "/Users/joell/.pyenv/versions/3.8.6/lib/python3.8/site-packages/six.py", line 583, in iterkeys
    return iter(d.keys(**kw))
AttributeError: 'str' object has no attribute 'keys'
Seems like it parses the keyfile incorrectly from the environment variable?

Marwan Sarieddine

10/31/2020, 5:10 PM
seems like the error is a failure to fetch GCS credentials, right?
one way to resolve this is to set the proper GCS env variable locally I believe - but I haven't used GCS in a while (more familiar with AWS .... )

Joël Luijmes

10/31/2020, 5:14 PM
Yes, I probably am passing the keyfile wrong. If I unset the
PREFECT__CONTEXT__SECRETS__GCP_CREDENTIALS
everything works as expected 🙂 But if I want to go to production, I need to be able to provide that keyfile.
It’s probably due to the fact that the keyfile is a JSON object. Any suggestions on how to properly set that as an environment variable?
I used
export PREFECT__CONTEXT__SECRETS__GCP_CREDENTIALS=$(cat ~/secrets/airflow-test-gcp-keyfile.json)
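(A sketch of an alternative for local runs, assuming Prefect 0.x’s prefect.context(secrets=...) override and the flow object from the snippet above: parse the keyfile into a dict yourself instead of exporting the raw JSON string.)

import json
import os

import prefect

# Read the same keyfile, but hand Prefect a parsed dict rather than a string.
keyfile_path = os.path.expanduser("~/secrets/airflow-test-gcp-keyfile.json")
with open(keyfile_path) as f:
    gcp_creds = json.load(f)

# Override the secret for this run only; the GCS client helpers expect a mapping.
with prefect.context(secrets={"GCP_CREDENTIALS": gcp_creds}):
    state = flow.run()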

Marwan Sarieddine

10/31/2020, 5:19 PM
hmm - not sure ... FWIW - I have seen AWS_CREDENTIALS being set like this (i.e. writing out the json string ...)
export AWS_CREDENTIALS='{"ACCESS_KEY": "${AWS_ACCESS_KEY_ID}", "SECRET_ACCESS_KEY": "${AWS_SECRET_ACCESS_KEY}"}'

Joël Luijmes

10/31/2020, 5:26 PM
Hm, usually applications require a path to the GCP keyfile (probably due to the newlines in the private key). Does anyone know how to properly supply
PREFECT__CONTEXT__SECRETS__GCP_CREDENTIALS
as an environment variable?
x = prefect.context.get("secrets", {}).get("GCP_CREDENTIALS")

print(type(x))
This shows that x is str when passing in the keyfile as the env var. But if I pass {"x":"y"} as the env var, the type is prefect.configuration.Config.
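(A defensive sketch for that case, assuming the secret may arrive either as a raw JSON string or as an already-parsed mapping depending on how the env var was set:)

import json

import prefect

raw = prefect.context.get("secrets", {}).get("GCP_CREDENTIALS")

# Parse the JSON string if that is what came through the environment,
# otherwise copy the mapping into a plain dict.
creds = json.loads(raw) if isinstance(raw, str) else dict(raw)
print(type(creds))  # <class 'dict'> in both cases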