Joël Luijmes
10/31/2020, 4:30 PM
from prefect import task
from prefect import Flow
from prefect.tasks.postgres.postgres import PostgresFetch
from prefect.engine.results import GCSResult
# Result target for checkpointed task output: a GCS bucket, with the object
# name taken from the `{task_name}` template.
result = GCSResult(
    bucket='airflow-test-gynzy-net',
    location='{task_name}.txt',
)

# Positional arguments shared by both PostgresFetch tasks below.
# NOTE(review): presumably (database, user, host, port) -- confirm against the
# PostgresFetch signature of the installed Prefect version.
_PG_CONNECTION = ('manager', 'postgres', 'localhost', 15432)

# Task that yields the id to resume from. The real MAX(id) query is commented
# out and replaced by a constant SELECT while testing; this task's result is
# persisted through `result`.
max_product_id = PostgresFetch(
    *_PG_CONNECTION,
    # query='SELECT MAX(id) from products'
    query='SELECT 188000',
    name='Test 😅',
    result=result,
)

# Task that fetches all rows; no query is set here because it is passed in
# when the task is called inside the flow.
query_products = PostgresFetch(*_PG_CONNECTION, fetch='all')
@task
def query_formatter(max_id):
    """Build the SELECT statement for the next batch of products.

    Args:
        max_id: A single-element row (for example a 1-tuple) holding the
            highest product id seen so far. The element may be None.

    Returns:
        SQL text selecting at most 10 products whose id is greater than the
        given value.

    Raises:
        ValueError: If ``max_id`` does not contain exactly one element, or the
            element cannot be converted to an integer.
        TypeError: If ``max_id`` is not iterable, or the element is of a type
            ``int()`` does not accept.
    """
    # Unpack into a name that does not shadow the builtin `id`.
    (last_id,) = max_id
    # An aggregate such as MAX(id) over an empty table returns NULL (None);
    # interpolating that would produce the invalid SQL `id > None`.
    # Falling back to 0 assumes product ids are positive -- TODO confirm.
    # int() guarantees only a numeric literal ever reaches the SQL text.
    lower_bound = 0 if last_id is None else int(last_id)
    return f'SELECT * FROM products WHERE id > {lower_bound} LIMIT 10'
# Wire the tasks together: the fetched max id feeds the query formatter,
# whose output becomes the query for the product fetch.
with Flow("PostgreSQL -> BigQuery Sync") as flow:
    max_id = max_product_id()
    query = query_formatter(max_id=max_id)
    products = query_products(query=query)

# flow.register('')
# Run the flow and keep the resulting state for inspection.
state = flow.run()
storage = GCS(bucket='airflow-test-gynzy-net')
with Flow("PostgreSQL -> BigQuery Sync", storage=storage) as flow:
Marwan Sarieddine
10/31/2020, 4:42 PMexport PREFECT__FLOWS__CHECKPOINTING=true
Joël Luijmes
10/31/2020, 4:46 PMTraceback (most recent call last):
File "/Users/joell/.pyenv/versions/3.8.6/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
new_state = method(self, state, *args, **kwargs)
File "/Users/joell/.pyenv/versions/3.8.6/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 881, in get_task_run_state
result = self.result.write(value, **formatting_kwargs)
File "/Users/joell/.pyenv/versions/3.8.6/lib/python3.8/site-packages/prefect/engine/results/gcs_result.py", line 77, in write
self.gcs_bucket.blob(new.location).upload_from_string(binary_data)
File "/Users/joell/.pyenv/versions/3.8.6/lib/python3.8/site-packages/prefect/engine/results/gcs_result.py", line 41, in gcs_bucket
client = get_storage_client()
File "/Users/joell/.pyenv/versions/3.8.6/lib/python3.8/site-packages/prefect/utilities/gcp.py", line 53, in get_storage_client
return get_google_client(storage, credentials=credentials, project=project)
File "/Users/joell/.pyenv/versions/3.8.6/lib/python3.8/site-packages/prefect/utilities/gcp.py", line 29, in get_google_client
credentials = Credentials.from_service_account_info(credentials)
File "/Users/joell/.pyenv/versions/3.8.6/lib/python3.8/site-packages/google/oauth2/service_account.py", line 210, in from_service_account_info
signer = _service_account_info.from_dict(
File "/Users/joell/.pyenv/versions/3.8.6/lib/python3.8/site-packages/google/auth/_service_account_info.py", line 46, in from_dict
missing = keys_needed.difference(six.iterkeys(data))
File "/Users/joell/.pyenv/versions/3.8.6/lib/python3.8/site-packages/six.py", line 583, in iterkeys
return iter(d.keys(**kw))
AttributeError: 'str' object has no attribute 'keys'
Seems like it parses the keyfile incorrectly from the environment variable?
Marwan Sarieddine
10/31/2020, 5:10 PMJoël Luijmes
10/31/2020, 5:14 PMPREFECT__CONTEXT__SECRETS__GCP_CREDENTIALS
everything works as expected 🙂 But if I want to go to production, I need to be able to provide that keyfile.
export PREFECT__CONTEXT__SECRETS__GCP_CREDENTIALS=$(cat ~/secrets/airflow-test-gcp-keyfile.json)
Marwan Sarieddine
10/31/2020, 5:19 PMexport AWS_CREDENTIALS='{"ACCESS_KEY": "${AWS_ACCESS_KEY_ID}", "SECRET_ACCESS_KEY": "${AWS_SECRET_ACCESS_KEY}"}'
Joël Luijmes
10/31/2020, 5:26 PMPREFECT__CONTEXT__SECRETS__GCP_CREDENTIALS
as environment variable?
x = prefect.context.get("secrets", {}).get(
"GCP_CREDENTIALS"
)
print(type(x))
This shows that x is a str when the keyfile is passed in as an environment variable. But if I pass {"x":"y"} as the environment variable, the type is prefect.configuration.Config