Dylan — 01/14/2022, 5:33 PM
import json
from datetime import timedelta
from io import StringIO
import prefect
from prefect import task, Flow, Parameter, unmapped
from prefect.tasks.secrets import PrefectSecret
from prefect.engine.signals import SKIP, FAIL
from prefect.executors import LocalDaskExecutor
import pendulum
from google.cloud import bigquery
from google.cloud import firestore
import pandas_gbq
from pandas import DataFrame
from simple_salesforce import Salesforce
from google.oauth2 import service_account
# Utility Functions
def create_firestore_client(gcp_credentials):
    credentials = service_account.Credentials.from_service_account_info(gcp_credentials)
    return firestore.Client(project="", credentials=credentials)  # project name here


def create_bigquery_client(gcp_credentials):
    credentials = service_account.Credentials.from_service_account_info(gcp_credentials)
    return bigquery.Client(project="", credentials=credentials)  # project name here


def format_datetime(datetime_string):
    # Convert an ISO 8601 string into the "YYYY-MM-DD HH:mm:ss" layout BigQuery expects,
    # e.g. "2022-01-14T17:06:00Z" -> "2022-01-14 17:06:00"
    if not datetime_string:
        return None
    parsed = pendulum.parse(datetime_string)
    return parsed.format("YYYY-MM-DD HH:mm:ss")
@task(
    name="Load Table Metadata",
    max_retries=5,
    retry_delay=timedelta(seconds=5),
    log_stdout=True,
)
def load_table_metadata(table_names, gcp_credentials):
    firestore_client = create_firestore_client(gcp_credentials)
    # Each item is a (table_name, config) tuple built from the Firestore documents
    tables = list(
        {
            t.id: t.to_dict()
            for t in firestore_client.collection("salesforce_tables").get()
        }.items()
    )
    print("tables retrieved")
    if table_names:
        tables = [x for x in tables if x[0] in table_names]
        print("filtered tables")
    print(tables)
    return tables
@task(
    name="Login to SalesForce",
    max_retries=5,
    retry_delay=timedelta(seconds=5),
    log_stdout=True,
)
def login_to_sf(un, pw, tk):
    sf = Salesforce(username=un, password=pw, security_token=tk)
    return sf
@task(
    name="Get Field Names",
    max_retries=5,
    retry_delay=timedelta(seconds=5),
    log_stdout=True,
)
def get_field_names(client, table_meta_data):
    # Describe the object, e.g. client.Opportunity.describe()
    table_name, config = table_meta_data
    sf_object = getattr(client, table_name)
    data = sf_object.describe()
    # Get column/field names
    config["column_names"] = [field["name"] for field in data["fields"]]
    return table_meta_data
@task(
    name="Create Dataframe",
    max_retries=5,
    retry_delay=timedelta(seconds=5),
    log_stdout=True,
)
def create_dataframe(client, table_meta_data):
    print(table_meta_data)
    table_name, config = table_meta_data
    query = f"SELECT {','.join(config['column_names'])} FROM {table_name}"
    # Incremental load: only pull rows modified since the last successful run
    last_updated_timestamp = config.get("last_updated")
    if last_updated_timestamp:
        query = f"{query} WHERE SystemModstamp > {pendulum.parse(last_updated_timestamp).to_iso8601_string()}"
    query = query.replace("\n", "")
    print(f"Query: {query}")
    data = client.query_all(query, include_deleted=True)
    # Check the records in case the query returned an empty list
    if not data["records"]:
        print(data)
        raise SKIP(message=f"No new records for {table_name}")
    print(f"New Records Count: {len(data['records'])}")
    keys = data["records"][0].keys()
    print(keys)
    records = DataFrame(data["records"], columns=keys)
    config["records"] = records
    return table_meta_data
# Upload to BigQuery
@task(
    name="Upload to BigQuery",
    max_retries=5,
    retry_delay=timedelta(seconds=5),
    log_stdout=True,
)
def upload_to_bq(table_meta_data, dataset_name, gcp_credentials):
    logger = prefect.context.get("logger")
    table_name, config = table_meta_data
    bq_client = create_bigquery_client(gcp_credentials=gcp_credentials)
    print(f"Table Name: {table_name}")
    table_ref = bq_client.dataset(dataset_name).table(table_name)
    # Normalize Salesforce timestamp columns into BigQuery-friendly strings
    timestamp_columns = config.get("timestamp_columns")
    if timestamp_columns:
        for column in timestamp_columns:
            if column in config["records"].columns:
                config["records"][column] = config["records"][column].map(format_datetime)
    json_records = config["records"].to_json(
        compression=None, orient="records", default_handler=json.dumps, lines=True
    )
    json_file = StringIO(json_records)
    # Full refresh on the first load, append on incremental loads
    write_disposition = (
        bigquery.WriteDisposition.WRITE_APPEND
        if config.get("last_updated")
        else bigquery.WriteDisposition.WRITE_TRUNCATE
    )
    print(f"write_disposition: {write_disposition}")
    load_job = None
    try:
        load_job = bq_client.load_table_from_file(
            file_obj=json_file,
            destination=table_ref,
            rewind=True,  # seek back to the start of the buffer before uploading
            job_config=bigquery.LoadJobConfig(
                autodetect=not config.get("last_updated"),
                write_disposition=write_disposition,
                source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
            ),
        )
        load_job.result()  # block until the load job completes
        logger.info("transfer complete")
    except Exception as exception:
        logger.critical(exception)
        if load_job is not None and load_job.errors:
            for error in load_job.errors:
                logger.critical(error)
        raise FAIL(message=str(exception))
    return table_meta_data
@task(
    name="Update Table Metadata",
    max_retries=5,
    retry_delay=timedelta(seconds=5),
    log_stdout=True,
    skip_on_upstream_skip=True,
)
def update_table_metadata(table_meta_data, gcp_credentials):
    firestore_client = create_firestore_client(gcp_credentials)
    table_name, config = table_meta_data
    print(f"Table name: {table_name}")
    records = config.get("records")
    if records is None or records.empty:
        raise SKIP("These aren't the records you're looking for.")
    # Persist the high-water mark so the next run only pulls newer rows
    new_last_updated = str(records["SystemModstamp"].max())
    firestore_client.collection("salesforce_tables").document(table_name).update(
        {"last_updated": new_last_updated}
    )
    print(f"Document {table_name} last_updated {new_last_updated}")
executor = LocalDaskExecutor(num_workers=4)

with Flow(
    "Salesforce Extraction Flow",
    executor=executor,
) as flow:
    TABLE_NAMES = Parameter(name="table_names", default=None, required=False)
    GCP_CREDENTIALS = PrefectSecret("GCP_CREDENTIALS")
    username, password, token = (
        PrefectSecret("SALESFORCE_USERNAME"),
        PrefectSecret("SALESFORCE_PASSWORD"),
        PrefectSecret("SALESFORCE_SECURITY_TOKEN"),
    )
    # Parameter needs an explicit name; "dataset_name" is assumed here
    DESTINATION_DATASET_NAME = Parameter(name="dataset_name", default="salesforce")
    table_meta_data = load_table_metadata(
        table_names=TABLE_NAMES, gcp_credentials=GCP_CREDENTIALS
    )
    client = login_to_sf(username, password, token)
    table_meta_data_with_columns = get_field_names.map(
        client=unmapped(client), table_meta_data=table_meta_data
    )
    table_meta_data_with_columns_records = create_dataframe.map(
        client=unmapped(client), table_meta_data=table_meta_data_with_columns
    )
    final_config = upload_to_bq.map(
        table_meta_data=table_meta_data_with_columns_records,
        dataset_name=unmapped(DESTINATION_DATASET_NAME),
        gcp_credentials=unmapped(GCP_CREDENTIALS),
    )
    update_table_metadata.map(
        table_meta_data=final_config, gcp_credentials=unmapped(GCP_CREDENTIALS)
    )
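
For a quick local test, something like the following could be appended (a minimal sketch: flow.run comes from Prefect 0.x, "Opportunity" is just an example table name, and the Salesforce and GCP secrets are assumed to be resolvable by the local secret backend):

if __name__ == "__main__":
    # Limit the run to one table to validate the Firestore metadata,
    # the Salesforce login, and the BigQuery load end to end
    state = flow.run(parameters={"table_names": ["Opportunity"]})
    print(state)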
Dylan
aiohttp==3.6.2
appdirs==1.4.4
appnope==0.1.0
async-timeout==3.0.1
attrs==20.2.0
Authlib==0.14.3
backcall==0.2.0
black==20.8b1
cachetools==4.1.1
certifi==2020.6.20
cffi==1.14.3
chardet==3.0.4
click==7.1.2
cloudpickle==1.6.0
croniter==0.3.34
cryptography==3.1.1
dask==2.30.0
dask-kubernetes==0.11.0
decorator==4.4.2
Deprecated==1.2.12
distributed==2.30.0
docker==4.3.1
google-api-core==1.22.4
google-auth==1.22.1
google-auth-oauthlib==0.4.1
google-cloud-bigquery==1.28.0
google-cloud-bigquery-storage==1.1.0
google-cloud-core==1.4.3
google-cloud-firestore==1.9.0
google-cloud-storage==1.31.2
google-crc32c==1.0.0
google-resumable-media==1.1.0
googleapis-common-protos==1.52.0
graphviz==0.14.2
grpcio==1.32.0
HeapDict==1.0.1
idna==2.10
ipython==7.18.1
ipython-genutils==0.2.0
jedi==0.17.2
kubernetes==11.0.0b2
kubernetes-asyncio==11.3.0
marshmallow==3.8.0
marshmallow-oneofschema==2.1.0
msgpack==1.0.0
multidict==4.7.6
mypy-extensions==0.4.3
natsort==7.0.1
numpy==1.19.2
oauthlib==3.1.0
pandas==1.1.3
pandas-gbq==0.14.0
parso==0.7.1
pathspec==0.8.0
pendulum==2.1.2
pexpect==4.8.0
pickleshare==0.7.5
prefect==0.15.4
prompt-toolkit==3.0.7
protobuf==3.13.0
psutil==5.7.2
ptyprocess==0.6.0
pyarrow==1.0.1
pyasn1==0.4.8
pyasn1-modules==0.2.8
pycparser==2.20
pydata-google-auth==1.1.0
PyGithub==1.55
Pygments==2.7.1
PyJWT==2.1.0
PyNaCl==1.4.0
python-box==5.1.1
python-dateutil==2.8.1
python-slugify==4.0.1
pytz==2020.1
pytzdata==2020.1
PyYAML==5.3.1
regex==2020.9.27
requests==2.24.0
requests-oauthlib==1.3.0
rsa==4.6
simple-salesforce==1.10.1
six==1.15.0
sortedcontainers==2.2.2
tabulate==0.8.7
tblib==1.7.0
text-unidecode==1.3
toml==0.10.1
toolz==0.11.1
tornado==6.0.4
traitlets==5.0.4
typed-ast==1.4.1
typing-extensions==3.7.4.3
urllib3==1.25.10
wcwidth==0.2.5
websocket-client==0.57.0
wrapt==1.12.1
yarl==1.6.0
zict==2.0.0