Laura Vaida
02/12/2021, 11:57 AMTyler Wanner
02/12/2021, 3:17 PMfrom prefect import task, Flow
from prefect.tasks.secrets import PrefectSecret
@task
def my_task(credentials):
    """Placeholder task that requires credentials to access something.

    Passing the credentials in as an argument allows you to change how/where
    the credentials are loaded (though we recommend using `PrefectSecret`
    tasks to load them).

    Args:
        credentials: The credential value handed to the task, e.g. the
            result of a `PrefectSecret` task wired in by the flow.
    """
with Flow("example") as flow:
    # The secret task's result is passed to my_task as an ordinary argument.
    my_secret = PrefectSecret("MYSECRET")
    res = my_task(credentials=my_secret)
does this help?Laura Vaida
02/12/2021, 3:34 PMflow.storage= Docker(registry_url="gcr.io/keller-steering-enabling/flows")
# NOTE(review): presumably makes flow runs execute as Kubernetes jobs with
# default settings - confirm against the Prefect run-config documentation.
flow.run_config = KubernetesRun()
# Runs the flow immediately with `executor`; that name is not assigned in this
# snippet, so it must already be defined elsewhere.
flow.run(executor=executor)
# Registers the flow, passing 'Billwerk' as the first positional argument
# (presumably the project name - confirm).
flow.register('Billwerk')
Tyler Wanner
02/12/2021, 3:35 PMLaura Vaida
02/12/2021, 3:42 PM@task
def oauth(client_id, username, client_secret, password):
    """Fetch a Billwerk OAuth2 token and return it.

    Uses the username/password flow of ``LegacyApplicationClient``.

    Args:
        client_id: OAuth client id of the Billwerk application.
        username: Billwerk account user name.
        client_secret: OAuth client secret of the Billwerk application.
        password: Billwerk account password.

    Returns:
        The token returned by ``OAuth2Session.fetch_token``. Returning it
        (instead of discarding it) lets a downstream task receive the token
        as this task's result.
    """
    # Imports stay inside the task body, as in the original snippet.
    from oauthlib.oauth2 import LegacyApplicationClient
    from requests_oauthlib import OAuth2Session

    # Named `session` so the local does not shadow the task's own name.
    session = OAuth2Session(client=LegacyApplicationClient(client_id=client_id))
    # The URL is written without the '<' and '>' that the chat client wrapped
    # around it; they are link markup, not part of the endpoint.
    return session.fetch_token(
        token_url='https://app.billwerk.com/oauth/token',
        username=username,
        password=password,
        client_id=client_id,
        client_secret=client_secret,
    )
with Flow("billwerk-contracts") as flow:
    # One PrefectSecret task per credential; their results feed the oauth task.
    BW_Username = PrefectSecret("BW_Username")
    BW_Password = PrefectSecret("BW_Password")
    BW_Client_Id = PrefectSecret("BW_Client_Id")
    BW_Client_Secret = PrefectSecret("BW_Client_Secret")
    oauth(
        username=BW_Username,
        password=BW_Password,
        client_id=BW_Client_Id,
        client_secret=BW_Client_Secret,
    )
like that?from prefect.tasks.secrets import PrefectSecret
from oauthlib.oauth2 import LegacyApplicationClient
from requests_oauthlib import OAuth2Session
@task
def oauth(client_id, username, client_secret, password):
    """Fetch a Billwerk OAuth2 token and return it.

    Uses the username/password flow of ``LegacyApplicationClient``.

    Args:
        client_id: OAuth client id of the Billwerk application.
        username: Billwerk account user name.
        client_secret: OAuth client secret of the Billwerk application.
        password: Billwerk account password.

    Returns:
        The token returned by ``OAuth2Session.fetch_token``. Returning it
        (instead of discarding it) lets a downstream task receive the token
        as this task's result.
    """
    # Named `session` so the local does not shadow the task's own name.
    session = OAuth2Session(client=LegacyApplicationClient(client_id=client_id))
    # The URL is written without the '<' and '>' that the chat client wrapped
    # around it; they are link markup, not part of the endpoint.
    return session.fetch_token(
        token_url='https://app.billwerk.com/oauth/token',
        username=username,
        password=password,
        client_id=client_id,
        client_secret=client_secret,
    )
with Flow("billwerk-contracts") as flow:
    # One PrefectSecret task per credential; their results feed the oauth task.
    BW_Username = PrefectSecret("BW_Username")
    BW_Password = PrefectSecret("BW_Password")
    BW_Client_Id = PrefectSecret("BW_Client_Id")
    BW_Client_Secret = PrefectSecret("BW_Client_Secret")
    oauth(
        username=BW_Username,
        password=BW_Password,
        client_id=BW_Client_Id,
        client_secret=BW_Client_Secret,
    )
Tyler Wanner
02/12/2021, 4:03 PMLaura Vaida
02/12/2021, 4:26 PMnicholas
02/12/2021, 4:38 PMLaura Vaida
02/12/2021, 4:41 PMimport requests
import fsspec
import sys
import json
import csv
import os.path
import snowflake.connector
import sysconfig
import configparser
import pandas
from oauthlib.oauth2 import LegacyApplicationClient
from requests_oauthlib import OAuth2Session
from pandas import json_normalize
from datetime import datetime
import openpyxl
import numpy as np
import csv
from prefect.executors import DaskExecutor
from prefect.engine.results import GCSResult
from prefect.storage import GCS
from google.cloud import storage
from prefect.storage import Docker
from prefect.run_configs import KubernetesRun
import docker
import google.cloud
from prefect import task, Flow
import datetime
import random
from prefect.tasks.secrets import PrefectSecret
from prefect.client import Secret
from prefect.tasks.secrets import PrefectSecret
from oauthlib.oauth2 import LegacyApplicationClient
from requests_oauthlib import OAuth2Session
# Define the executor as a Dask executor reached over TCP through the load balancer.
# NOTE(review): the address is hard-coded; presumably it has to be reachable
# from wherever flow.run() is executed - confirm.
executor = DaskExecutor(address="35.198.104.230:8786") # tcp:port address of dask load balancer
"""""
Local stored access secrets
@task
def get_config():
from oauthlib.oauth2 import LegacyApplicationClient
from requests_oauthlib import OAuth2Session
config = configparser.ConfigParser()
config.read('C:/Users/laura.vaida.000/billwerk_api.ini')
# Example of retrieving config data
# config.get("billwerk_api", 'client_id')
######################################################
# Getting access token with client_id and client_secret
######################################################
oauth = OAuth2Session(client=LegacyApplicationClient(client_id=config.get("billwerk_api", 'client_id')))
token = oauth.fetch_token(token_url='<https://app.billwerk.com/oauth/token>',
username=config.get("billwerk_api", 'username'),
password=config.get("billwerk_api", 'password'),
client_id=config.get("billwerk_api", 'client_id'),
client_secret=config.get("billwerk_api", 'client_secret'))
"""
@task
def oauth(client_id, username, client_secret, password):
    """Fetch a Billwerk OAuth2 token and return it.

    Uses the username/password flow of ``LegacyApplicationClient``.

    Args:
        client_id: OAuth client id of the Billwerk application.
        username: Billwerk account user name.
        client_secret: OAuth client secret of the Billwerk application.
        password: Billwerk account password.

    Returns:
        The token returned by ``OAuth2Session.fetch_token``. Returning it
        (instead of discarding it) lets a downstream task receive the token
        as this task's result.
    """
    # Imports stay inside the task body, as in the original snippet.
    from oauthlib.oauth2 import LegacyApplicationClient
    from requests_oauthlib import OAuth2Session

    # Named `session` so the local does not shadow the task's own name.
    session = OAuth2Session(client=LegacyApplicationClient(client_id=client_id))
    # The URL is written without the '<' and '>' that the chat client wrapped
    # around it; they are link markup, not part of the endpoint.
    return session.fetch_token(
        token_url='https://app.billwerk.com/oauth/token',
        username=username,
        password=password,
        client_id=client_id,
        client_secret=client_secret,
    )
# Task: request the Billwerk contracts endpoint, flatten the JSON response
# (and its nested 'Phases' records) into pandas DataFrames, merge them on 'Id',
# and create an empty summary frame with the merged column names.
# NOTE(review): indentation was lost in this paste; every statement down to the
# `bw_contracts_summary = ...` line presumably belongs to the function body.
# NOTE(review): `token` is neither a parameter nor assigned in this function,
# and no module-level assignment is visible - confirm where it should come from.
# NOTE(review): the function has no return statement, so none of the values
# built here are handed on as the task's result.
@task
def creating_dataframe():
# NOTE(review): the '<' ... '>' around both URLs look like chat link markup
# rather than part of the endpoint - confirm before running.
# NOTE(review): bw_contracts_call is not used again in this function; the same
# request is issued a second time on the next line.
bw_contracts_call = requests.get('<https://app.billwerk.com/api/v1/contracts?access_token=>' + token['access_token'])
bw_contracts_start = requests.get('<https://app.billwerk.com/api/v1/contracts?access_token=>' + token['access_token']).json()
# Getting last contract id for while loop
# NOTE(review): index 199 assumes the response holds at least 200 entries
# (presumably the API page size) - confirm.
last_entry = bw_contracts_start[199]['Id']
contracts_pandaframe = json_normalize(bw_contracts_start, sep='_')
# Convert 'Id' to string for the merge below
contracts_pandaframe['Id'] = contracts_pandaframe['Id'].astype(str)
# Read the nested 'Phases' records into their own frame
phases_pandaframe = json_normalize(bw_contracts_start, record_path=['Phases'], record_prefix='Phases_', sep='_')
# Convert 'Id' to string for the merge below
# NOTE(review): the 'Id' values are taken from the contracts frame, not from
# the phases records; presumably the rows only line up when every contract has
# exactly one phase - verify.
phases_pandaframe['Id'] = contracts_pandaframe['Id'].astype(str)
#contracts_pandaframe.dtypes
#phases_pandaframe.dtypes
# Merge the phases into the contracts frame
contracts_pandaframe = contracts_pandaframe.merge(phases_pandaframe, on ='Id', how ='left')
column_names = contracts_pandaframe.columns
length_contracts = len(contracts_pandaframe['Id'])
###########################################
# Optional for retrieving column information
# print(contracts_pandaframe.columns)
# print(contracts_pandaframe.info)
###########################################
# Empty frame intended to accumulate all pages of contracts
bw_contracts_summary = pandas.DataFrame(data=[], columns=[column_names])
##################
# While-Loop
##################
# Task: page through the contracts endpoint, appending each page to
# bw_contracts_summary, until the loop condition below stops it.
# NOTE(review): indentation was lost in this paste, so which statements sit
# inside the while loop and inside each if/else branch cannot be read off the
# text - restore it from the original script before running.
# NOTE(review): last_entry, length_contracts, contracts_pandaframe,
# bw_contracts_summary and token are read here but are not parameters; the
# first four are also assigned further down, which makes them local names that
# are read before being bound in this function as written.
@task
def while_loop():
customer_ids = pandas.Series(data='', dtype=str)
# NOTE(review): the result of this type() call is discarded.
type(last_entry)
#while last entry is not inherited and the length is greater than 1 (last call inherits only 1 entry)
while min(customer_ids.str.contains(last_entry)) == False and length_contracts > 1:
#combining all entries written in summary and the new entries from last call
contracts_pandaframe_combined = [contracts_pandaframe, bw_contracts_summary]
if bw_contracts_summary.empty == False:
contracts_concat = pandas.concat(contracts_pandaframe_combined)
bw_contracts_summary = contracts_concat
else:
bw_contracts_summary = contracts_pandaframe
customer_ids = bw_contracts_summary['Id']
row_count = (bw_contracts_summary.shape[0])
print(row_count)
# Get next entries before first entry
# NOTE(review): the '<' ... '>' around the URL look like chat link markup
# rather than part of the endpoint - confirm before running.
bw_contracts = requests.get(
'<https://app.billwerk.com/api/v1/contracts/?from=>' + last_entry + '&access_token=' + token[
'access_token']).json()
length = len(json_normalize(bw_contracts, sep='_')['Id'])
# The [1:length] slice drops the first row of the page (presumably because the
# page starts with the entry passed as `from` - confirm).
contracts_pandaframe = json_normalize(bw_contracts, sep='_')[1:length]
# Converting in string for merge function
contracts_pandaframe['Id'] = contracts_pandaframe['Id'].astype(str)
# separately reading phases from dataframe
phases_pandaframe = json_normalize(bw_contracts, record_path=['Phases'], record_prefix='Phases_', sep='_')[1:length]
# Converting in string for merge function
phases_pandaframe['Id'] = contracts_pandaframe['Id'].astype(str)
# Merging phases to initial dataframe
contracts_pandaframe = contracts_pandaframe.merge(phases_pandaframe, on='Id', how='left')
# Remember the id to continue from on the next request
if contracts_pandaframe.empty == False:
last_entry = contracts_pandaframe["Id"].iloc[-1]
else:
last_entry = json_normalize(bw_contracts, sep='_')['Id'][0]
length_contracts = len(contracts_pandaframe['Id'])
print('Already included')
# bw_contracts_summary.columns
# Deleting unnecessary columns from all contracts
# Task: drop the payment-bearer and other listed columns from
# bw_contracts_summary and add a 'Timestamp_Loading' column holding today's date.
# NOTE(review): indentation was lost in this paste; all statements below
# presumably belong to the function body.
# NOTE(review): bw_contracts_summary is assigned in this function, which makes
# it a local name, yet it is read on the right-hand side of its first
# assignment - as written it is read before being bound.
# NOTE(review): the function has no return statement, so the modified frame is
# not handed on as the task's result.
@task
def modifying_columns():
bw_contracts_summary = bw_contracts_summary.drop(
columns=['PaymentBearer_EmailAddress', 'PaymentBearer_Holder', 'PaymentBearer_Last4', 'PaymentBearer_LastName',
'PaymentBearer_FirstName', 'PaymentBearer_MaskedCardPan', 'ThresholdBillingDisabled', 'CustomerName', 'Phases'])
# Adding current timestamp
# NOTE(review): the file's import list has `from datetime import datetime`
# followed later by `import datetime`, so `datetime` here presumably names the
# module, which has no today() - verify.
current_date = datetime.today().strftime('%Y-%m-%d')
bw_contracts_summary['Timestamp_Loading'] = current_date
# NOTE(review): fieldnames is not used again within this function.
fieldnames = bw_contracts_summary.columns
""""
##Saving locally
path = r'C://Users/laura.vaida.000/Documents/Python_Scripts/Billwerk_Contracts.csv'
contracts_pandaframe.to_csv(r'C://Users/laura.vaida.000/Documents/Python_Scripts/Billwerk_Contracts_'+ current_date+'.csv', index = False, header=True)
##store result in gc bucket
gcs_result = GCSResult(bucket='billwerk', location='billwerk_contracts.csv')
with Flow("store to gcp bucket", result = 'billwerk') as store_in_gcp_bucket:
##Transforming to csv file
csv_file =bw_contracts_summary.to_csv(index= False)
store_in_gcp_bucket.run()
state=store_in_gcp_bucket.run()
state.result[csv_file]._result.location
"""
with Flow("billwerk-contracts") as flow:
    # One PrefectSecret task per credential; their results feed the oauth task.
    BW_Username = PrefectSecret("BW_Username")
    BW_Password = PrefectSecret("BW_Password")
    BW_Client_Id = PrefectSecret("BW_Client_Id")
    BW_Client_Secret = PrefectSecret("BW_Client_Secret")
    oauth(
        username=BW_Username,
        password=BW_Password,
        client_id=BW_Client_Id,
        client_secret=BW_Client_Secret,
    )

# The registry URL is the plain text of the pasted chat link; the
# '<http://...|...>' wrapper was link markup, not part of the value.
flow.storage = Docker(registry_url="gcr.io/keller-steering-enabling/flows")
flow.run_config = KubernetesRun()
# Run once with the Dask executor, then register under 'Billwerk'.
flow.run(executor=executor)
flow.register('Billwerk')
nicholas
02/12/2021, 4:49 PMfrom prefect import task, Flow
from prefect.client import Secret
from oauthlib.oauth2 import LegacyApplicationClient
from requests_oauthlib import OAuth2Session
@task
def oauth():
    """Fetch a Billwerk OAuth2 token using secrets read inside the task.

    The four credentials are loaded with ``Secret(...).get()`` at run time
    instead of being passed in as task arguments.

    Returns:
        The token returned by ``OAuth2Session.fetch_token``. Returning it
        (instead of discarding it) makes the outcome visible as the task's
        result, which helps when narrowing down where the flow fails.
    """
    username = Secret("BW_Username").get()
    password = Secret("BW_Password").get()
    client_id = Secret("BW_Client_Id").get()
    client_secret = Secret("BW_Client_Secret").get()

    # Named `session` so the local does not shadow the task's own name.
    session = OAuth2Session(client=LegacyApplicationClient(client_id=client_id))
    # The URL is written without the '<' and '>' that the chat client wrapped
    # around it; they are link markup, not part of the endpoint.
    return session.fetch_token(
        token_url="https://app.billwerk.com/oauth/token",
        username=username,
        password=password,
        client_id=client_id,
        client_secret=client_secret,
    )
with Flow("billwerk-contracts") as flow:
    oauth()

# Local run with the default executor, to check secret loading in isolation.
flow.run()
I'm not sure this will solve your issue but it might help narrow it downLaura Vaida
02/12/2021, 5:09 PMnicholas
02/12/2021, 5:10 PMLaura Vaida
02/12/2021, 5:11 PMnicholas
02/12/2021, 5:19 PM~/.prefect/config.toml
is this set?
[context.secrets]
use_local_secrets = true
BW_Username = "some_val"
# rest of your secrets
Laura Vaida
02/12/2021, 5:22 PMnicholas
02/12/2021, 5:39 PM.toml
file, the ordering of the header sections is important. To set a secret in that file, it should be under the [context.secrets]
section (it looks like you've got that under logging
)Laura Vaida
02/12/2021, 5:41 PMnicholas
02/12/2021, 5:44 PMexport PREFECT__CONTEXT__SECRETS__BW_Username="some val"
?Laura Vaida
02/12/2021, 5:45 PMnicholas
02/12/2021, 5:47 PMsetenv
?Laura Vaida
02/12/2021, 5:47 PMnicholas
02/12/2021, 5:48 PMecho %PREFECT__CONTEXT__SECRETS__BW_Username%
you see the value printed correctly, right?Laura Vaida
02/12/2021, 5:54 PMnicholas
02/12/2021, 5:59 PMecho $Env:PREFECT__CONTEXT__SECRETS__BW_Username
Laura Vaida
02/12/2021, 6:02 PMnicholas
02/12/2021, 6:04 PMLaura Vaida
02/12/2021, 6:10 PM(prefect) C:\WINDOWS\system32>python
Python 3.8.0 (default, Nov 6 2019, 16:00:02) [MSC v.1916 64 bit (AMD64)] :: Anaconda, Inc. on win32
Type "help", "copyright", "credits" or "license" for more information.
>>> from prefect.client import Secret
Traceback (most recent call last):
File "C:\Users\laura.vaida.000\anaconda3\envs\prefect\lib\site-packages\toml\decoder.py", line 511, in loads
ret = decoder.load_line(line, currentlevel, multikey,
File "C:\Users\laura.vaida.000\anaconda3\envs\prefect\lib\site-packages\toml\decoder.py", line 778, in load_line
value, vtype = self.load_value(pair[1], strictly_valid)
File "C:\Users\laura.vaida.000\anaconda3\envs\prefect\lib\site-packages\toml\decoder.py", line 816, in load_value
raise ValueError("Only all lowercase booleans allowed")
ValueError: Only all lowercase booleans allowed