Laura Vaida
03/08/2021, 2:46 PMnicholas
03/08/2021, 2:47 PMLaura Vaida
03/08/2021, 2:48 PMnicholas
03/08/2021, 2:52 PMLaura Vaida
03/08/2021, 2:53 PMnicholas
03/08/2021, 2:55 PMLaura Vaida
03/08/2021, 2:58 PMnicholas
03/08/2021, 3:01 PMLaura Vaida
03/08/2021, 3:06 PMgcs_result = GCSResult(bucket='billwerk')
@task
def oauth():
bw_username=Secret("BW_Username").get()
bw_client_secret=Secret("BW_Client_Secret").get()
bw_client_id=Secret("BW_Client_Id").get()
bw_password=Secret("BW_Password").get()
oauth = OAuth2Session(client=LegacyApplicationClient(client_id=Secret("BW_Client_Id").get()))
token = oauth.fetch_token(
token_url="<https://app.billwerk.com/oauth/token>",
username=bw_username,
password=bw_password,
client_id=bw_client_id,
client_secret=bw_client_secret
)
return token
@task(log_stdout=True)
def creating_json(token):
bw_contracts_call = requests.get('<https://app.billwerk.com/api/v1/contracts?access_token=>' + token['access_token'])
return requests.get('<https://app.billwerk.com/api/v1/contracts?access_token=>' + token['access_token']).json()
@task(cache_for=datetime.timedelta(hours=1), result=gcs_result, log_stdout=True)
def input_for_loop(json):
# Getting last contract id for while loop
last_entry = json[199]['Id']
contracts_pandaframe = json_normalize(json, sep='_')
column_names = contracts_pandaframe.columns
length_contracts = len(contracts_pandaframe['Id'])
return last_entry, contracts_pandaframe, column_names, length_contracts
##################
# While-Loop
##################
@task(log_stdout=True)
def while_loop(last_entry,length_contracts,bw_contracts_summary,customer_ids):
#while last entry is not inherited and the length is greater than 1 (last call inherits only 1 entry)
while min(customer_ids.str.contains(last_entry)) == False and length_contracts > 1:
#combining all entries written in summary and the new entries from last call
contracts_pandaframe_combined = [df, bw_contracts_summary]
if bw_contracts_summary.empty == False:
contracts_concat = pandas.concat(contracts_pandaframe_combined)
bw_contracts_summary = contracts_concat
else:
bw_contracts_summary = df
customer_ids = bw_contracts_summary['Id']
row_count = (bw_contracts_summary.shape[0])
print(row_count)
# Get next entries before first entry
bw_contracts = requests.get(
'<https://app.billwerk.com/api/v1/contracts/?from=>' + last_entry + '&access_token=' + token[
'access_token']).json()
length = len(json_normalize(bw_contracts, sep='_')['Id'])
contracts_pandaframe = json_normalize(bw_contracts, sep='_')[1:length]
if contracts_pandaframe.empty == False:
last_entry = contracts_pandaframe["Id"].iloc[-1]
else:
last_entry = json_normalize(bw_contracts, sep='_')['Id'][0]
length_contracts = len(contracts_pandaframe['Id'])
return bw_contracts_summary
print('Already included')
# bw_contracts_summary.columns
# Deleting unnecessary columns from all contracts
@task
def modifying_columns():
bw_contracts_summary = bw_contracts_summary.drop(
columns=['PaymentBearer_EmailAddress', 'PaymentBearer_Holder', 'PaymentBearer_Last4', 'PaymentBearer_LastName',
'PaymentBearer_FirstName', 'PaymentBearer_MaskedCardPan', 'ThresholdBillingDisabled', 'CustomerName'])
# Adding current timestamp
current_date = datetime.today().strftime("%Y-%m-%d %H:%M:%S")
bw_contracts_summary['Timestamp_Loading'] = current_date
fieldnames = bw_contracts_summary.columns
#####################################
#Writing data to snowflake
#####################################
#For writing data directly into snowflake table/stage without gcp
###Deleting old data
@task
def deleting_old_table():
from snowflake.connector import connect
ctx = connect(**config['snowflake'], use_openssl_only=True)
cur = ctx.cursor().execute('delete from "KELLERSPORTS_STAGE"."STAGE_BILLWERK_CONTRACTS"."CONTRACTS_PYTHON_SCRIPT"')
@task
# Create a write function with the input data and table_name
def write_to_snowflake(data, table_name):
from snowflake.connector.pandas_tools import write_pandas
from snowflake.connector import connect
ctx = connect(**config['snowflake'], use_openssl_only=True)
cur = ctx.cursor()
cur.execute('USE SCHEMA "STAGE_BILLWERK_CONTRACTS"')
write_pandas(ctx, data, table_name=table_name, quote_identifiers=False)
with Flow("billwerk-contracts") as flow:
token = oauth()
json = creating_json(token=token)
input_for_loop(json)
bw_contracts_summary = pandas.DataFrame(data=[], columns=[column_names])
customer_ids = pandas.Series(data='', dtype=str)
customer_ids = pandas.Series(data='', dtype=str)
while_loop = (last_entry, length_contracts, bw_contracts_summary, customer_ids)
#modifying_columns()
#deleting_old_table()
#write_to_snowflake(bw_contracts_summary, "CONTRACTS_PYTHON_SCRIPT")
flow.storage = Docker(registry_url="<http://gcr.io/keller-steering-enabling/flows|gcr.io/keller-steering-enabling/flows>", image_name="billwerk-contracts",
python_dependencies=["pandas", "oauthlib ", "requests", "requests_oauthlib", "datetime", "oauth2client", "snowflake"], secrets=["GCP_CREDENTIALS"])
flow.run_config = KubernetesRun()
flow.register('Billwerk')
nicholas
03/08/2021, 3:11 PMflow.visualize()
, assuming you have graphviz
installedLaura Vaida
03/08/2021, 3:12 PMnicholas
03/08/2021, 3:17 PMwhile_loop
task definition with this line:
while_loop = (last_entry, length_contracts, bw_contracts_summary, customer_ids)
If you want to explicitly create 4 tasks from input_for_loop
, you'll need to call that task 4 times(last_entry, length_contracts)
customer_ids = pandas.Series(data='', dtype=str)
customer_ids = pandas.Series(data='', dtype=str)
aren't Prefect tasks, they'll be evaluated as constants at the runtime of your script (when you register it) and not at flow run time as discreet tasksLaura Vaida
03/08/2021, 3:19 PMnicholas
03/08/2021, 3:27 PM@task
def get_customer_ids():
return pandas.Series(data='', dtype=str)
and call that task in your flow contextLaura Vaida
03/08/2021, 3:29 PMnicholas
03/08/2021, 3:38 PMLaura Vaida
03/08/2021, 4:39 PMgcs_result = GCSResult(bucket='billwerk')
@task
def oauth():
bw_username = Secret("BW_Username").get()
bw_client_secret = Secret("BW_Client_Secret").get()
bw_client_id = Secret("BW_Client_Id").get()
bw_password = Secret("BW_Password").get()
oauth = OAuth2Session(client=LegacyApplicationClient(client_id=Secret("BW_Client_Id").get()))
token = oauth.fetch_token(
token_url="<https://app.billwerk.com/oauth/token>",
username=bw_username,
password=bw_password,
client_id=bw_client_id,
client_secret=bw_client_secret
)
return token
@task(log_stdout=True)
def creating_json(token):
bw_contracts_call = requests.get('<https://app.billwerk.com/api/v1/contracts?access_token=>' + token['access_token'])
return requests.get('<https://app.billwerk.com/api/v1/contracts?access_token=>' + token['access_token']).json()
@task(log_stdout=True)
def get_last_entry(json):
# Getting last contract id for while loop
last_entry = json[199]['Id']
print(last_entry)
return last_entry
@task()
def creating_dataframe(json):
return json_normalize(json, sep='_')
@task()
def get_column_names(df):
return df.columns
@task()
def get_length_contracts(df):
return len(df['Id'])
@task()
def get_bw_contracts_summary():
return pandas.DataFrame(data=[], columns=[column_names])
@task()
def get_customer_ids():
return pandas.Series(data='', dtype=str)
###########################################
# Optional for retrieving column information
# print(contracts_pandaframe.columns)
# print(<http://contracts_pandaframe.info|contracts_pandaframe.info>)
###########################################
##################
# While-Loop
##################
@task(log_stdout=True)
def while_loop(token,contracts_pandaframe,last_entry,length_contracts,bw_contracts_summary,customer_ids):
print(min(customer_ids))
print(last_entry)
print(length_contracts)
print (min(customer_ids.str.contains(last_entry)))
#while last entry is not inherited and the length is greater than 1 (last call inherits only 1 entry)
while min(customer_ids.str.contains(last_entry)) == False and length_contracts > 1:
#combining all entries written in summary and the new entries from last call
contracts_pandaframe_combined = [contracts_pandaframe, bw_contracts_summary]
if bw_contracts_summary.empty == False:
contracts_concat = pandas.concat(contracts_pandaframe_combined)
bw_contracts_summary = contracts_concat
else:
bw_contracts_summary = contracts_pandaframe
customer_ids = bw_contracts_summary['Id']
row_count = (bw_contracts_summary.shape[0])
print(row_count)
# Get next entries before first entry
bw_contracts = requests.get(
'<https://app.billwerk.com/api/v1/contracts/?from=>' + last_entry + '&access_token=' + token[
'access_token']).json()
length = len(json_normalize(bw_contracts, sep='_')['Id'])
contracts_pandaframe = json_normalize(bw_contracts, sep='_')[1:length]
if contracts_pandaframe.empty == False:
last_entry = contracts_pandaframe["Id"].iloc[-1]
else:
last_entry = json_normalize(bw_contracts, sep='_')['Id'][0]
length_contracts = len(contracts_pandaframe['Id'])
return bw_contracts_summary
print('Already included')
with Flow("billwerk-contracts") as flow:
token = oauth()
json = creating_json(token=token)
last_entry = get_last_entry(json=json)
contracts_pandaframe = creating_dataframe(json)
column_names = get_column_names(df=contracts_pandaframe)
length_contracts = get_length_contracts(df=contracts_pandaframe)
bw_contracts_summary = get_bw_contracts_summary()
customer_ids = get_customer_ids()
all_contracts = while_loop(token, contracts_pandaframe, last_entry, length_contracts, bw_contracts_summary, customer_ids)
nicholas
03/08/2021, 4:50 PMLaura Vaida
03/08/2021, 4:50 PMnicholas
03/08/2021, 4:51 PMLaura Vaida
03/08/2021, 4:51 PM##################
# While-Loop
##################
@task(log_stdout=True)
def while_loop(token,contracts_pandaframe,last_entry,length_contracts,bw_contracts_summary,customer_ids):
print(min(customer_ids))
print(last_entry)
print(length_contracts)
print (min(customer_ids.str.contains(last_entry)))
#while last entry is not inherited and the length is greater than 1 (last call inherits only 1 entry)
while min(customer_ids.str.contains(last_entry)) == False and length_contracts > 1:
#combining all entries written in summary and the new entries from last call
contracts_pandaframe_combined = [contracts_pandaframe, bw_contracts_summary]
if bw_contracts_summary.empty == False:
contracts_concat = pandas.concat(contracts_pandaframe_combined)
bw_contracts_summary = contracts_concat
else:
bw_contracts_summary = contracts_pandaframe
customer_ids = bw_contracts_summary['Id']
row_count = (bw_contracts_summary.shape[0])
print(row_count)
# Get next entries before first entry
bw_contracts = requests.get(
'<https://app.billwerk.com/api/v1/contracts/?from=>' + last_entry + '&access_token=' + token[
'access_token']).json()
length = len(json_normalize(bw_contracts, sep='_')['Id'])
contracts_pandaframe = json_normalize(bw_contracts, sep='_')[1:length]
if contracts_pandaframe.empty == False:
last_entry = contracts_pandaframe["Id"].iloc[-1]
else:
last_entry = json_normalize(bw_contracts, sep='_')['Id'][0]
length_contracts = len(contracts_pandaframe['Id'])
return bw_contracts_summary
print('Already included')
nicholas
03/08/2021, 4:55 PMLaura Vaida
03/08/2021, 4:56 PM@task(log_stdout=True)
def while_loop(token,contracts_pandaframe,last_entry,length_contracts,bw_contracts_summary,customer_ids):
print(min(customer_ids))
print(last_entry)
print(length_contracts)
print (min(customer_ids.str.contains(last_entry)))
#while last entry is not inherited and the length is greater than 1 (last call inherits only 1 entry)
while min(customer_ids.str.contains(last_entry)) == False and length_contracts > 1:
#combining all entries written in summary and the new entries from last call
contracts_pandaframe_combined = [contracts_pandaframe, bw_contracts_summary]
if bw_contracts_summary.empty == False:
contracts_concat = pandas.concat(contracts_pandaframe_combined)
bw_contracts_summary = contracts_concat
else:
bw_contracts_summary = contracts_pandaframe
customer_ids = bw_contracts_summary['Id']
row_count = (bw_contracts_summary.shape[0])
print(row_count)
# Get next entries before first entry
bw_contracts = requests.get(
'<https://app.billwerk.com/api/v1/contracts/?from=>' + last_entry + '&access_token=' + token[
'access_token']).json()
length = len(json_normalize(bw_contracts, sep='_')['Id'])
contracts_pandaframe = json_normalize(bw_contracts, sep='_')[1:length]
if contracts_pandaframe.empty == False:
last_entry = contracts_pandaframe["Id"].iloc[-1]
else:
last_entry = json_normalize(bw_contracts, sep='_')['Id'][0]
length_contracts = len(contracts_pandaframe['Id'])
print(length_contracts)
print(last_entry)
print(customer_ids)
print(min(customer_ids.str.contains(last_entry)))
return bw_contracts_summary
print('Already included')
while min(customer_ids.str.contains(last_entry)) == False and length_contracts > 1:
print('Already included')
in the end when the loop is done which doesn't happen 🤔nicholas
03/08/2021, 8:21 PMmin(customer_ids.str.contains(last_entry)) == False
doesn't make a lot of sense to me since min
returns the number for the lowest character alphabetically in a string and .contains
isn't a normal python method afaik (there's the string class method __contains__
but that doesn't seem to be what you're using); maybe this is a pandas class method? min(bool)
should throw an exception, since the min
method takes an iterable.while last_entry in customer_ids.str and length_contracts > 1:
but I'm not totally sure.Laura Vaida
03/08/2021, 8:53 PM