# ask-community
h
Hey, anyone fetching data from Salesforce using Prefect and care to share their approach for doing that?
k
I think there is no Salesforce task because the schema changes a lot. Maybe you can look into using something like Airbyte to fetch the data and then orchestrate it with Prefect. We’ll see what the community says
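For reference, a rough sketch of the Airbyte route, assuming a recent Prefect 0.x with the built-in Airbyte task and a locally running Airbyte server (the host, port, and connection_id below are placeholders):

from prefect import Flow
from prefect.tasks.airbyte import AirbyteConnectionTask

# Placeholder Airbyte server details and connection id -- adjust for your deployment
sync_salesforce = AirbyteConnectionTask(
    airbyte_server_host="localhost",
    airbyte_server_port=8000,
    airbyte_api_version="v1",
    connection_id="REPLACE-WITH-YOUR-CONNECTION-ID",
)

with Flow("airbyte-salesforce-sync") as flow:
    # Trigger the Airbyte connection sync as a Prefect task
    sync_salesforce()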
h
there is also an API we could use and just make it a custom task
k
Yes you could
d
@Henrietta Salonen I’ve got a flow I can clean up and share with you
Goes from the Salesforce API -> BigQuery, but it would be easy to adapt for Snowflake/Redshift
🙏 1
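For the Snowflake adaptation mentioned above, a minimal sketch of what the upload task could look like instead, assuming snowflake-connector-python with the pandas extra and placeholder connection details:

import snowflake.connector
from snowflake.connector.pandas_tools import write_pandas
from prefect import task

@task(name="Upload to Snowflake")
def upload_to_snowflake(table_meta_data, account, user, password):
    table_name, config = table_meta_data
    # Placeholder warehouse/database/schema -- swap in your own
    conn = snowflake.connector.connect(
        account=account,
        user=user,
        password=password,
        warehouse="LOAD_WH",
        database="RAW",
        schema="SALESFORCE",
    )
    try:
        # write_pandas bulk-loads the records DataFrame into an existing table
        success, _, nrows, _ = write_pandas(conn, config["records"], table_name.upper())
        print(f"Loaded {nrows} rows into {table_name} (success={success})")
    finally:
        conn.close()
    return table_meta_data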
h
thank you, we’ll probably use S3 as the first point of storage
I appreciate that you can share it 🙂
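If S3 is the first landing zone, the BigQuery upload task further down could be swapped for something like this boto3 sketch (the bucket name and key layout are placeholders):

import boto3
import pendulum
from prefect import task

@task(name="Upload to S3")
def upload_to_s3(table_meta_data, bucket_name):
    table_name, config = table_meta_data
    # Serialize the records DataFrame to newline-delimited JSON, same as the BigQuery version below
    json_records = config["records"].to_json(orient="records", lines=True)
    # Placeholder key layout: partition by table name and extraction date
    key = f"salesforce/{table_name}/{pendulum.now().to_date_string()}.json"
    boto3.client("s3").put_object(Bucket=bucket_name, Key=key, Body=json_records.encode("utf-8"))
    return table_meta_data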
d
Of course!
import json
from datetime import timedelta
from io import StringIO

import prefect
from prefect import task, Flow, Parameter, unmapped
from prefect.tasks.secrets import PrefectSecret
from prefect.engine.signals import SKIP, FAIL
from prefect.executors import LocalDaskExecutor

import pendulum
from google.cloud import bigquery
from google.cloud import firestore
import pandas_gbq
from pandas import DataFrame
from simple_salesforce import Salesforce
from google.oauth2 import service_account


# Utility Functions
def create_firestore_client(gcp_credentials):
    credentials = service_account.Credentials.from_service_account_info(gcp_credentials)
    return firestore.Client(project="", credentials=credentials)  # project name here


def create_bigquery_client(gcp_credentials):
    credentials = service_account.Credentials.from_service_account_info(gcp_credentials)
    return bigquery.Client(project="", credentials=credentials)  # project name here


def format_datetime(datetime_string):
    if not datetime_string:
        return None

    datetime = pendulum.parse(datetime_string)

    return datetime.format("YYYY-MM-DD HH:mm:ss")


@task(
    name="Load Table Metadata",
    max_retries=5,
    retry_delay=timedelta(seconds=5),
    log_stdout=True,
)
def load_table_metadata(table_names, gcp_credentials):
    firestore_client = create_firestore_client(gcp_credentials)

    tables = list(
        {
            t.id: t.to_dict()
            for t in firestore_client.collection("salesforce_tables").get()
        }.items()
    )
    print("tables retrieved")

    if table_names:
        tables = [x for x in tables if x[0] in table_names]
        print("filtered tables")
        print(tables)

    return tables


@task(
    name="Login to SalesForce",
    max_retries=5,
    retry_delay=timedelta(seconds=5),
    log_stdout=True,
)
def login_to_sf(un, pw, tk):
    sf = Salesforce(username=un, password=pw, security_token=tk)
    return sf


@task(
    name="Get Field Names",
    max_retries=5,
    retry_delay=timedelta(seconds=5),
    log_stdout=True,
)
def get_field_names(client, table_meta_data):
    # Get data i.e. client.Opportunity.describe()
    table_name, config = table_meta_data
    sf_object = getattr(client, table_name)
    data = sf_object.describe()
    # Get columns/field names
    config["column_names"] = [field["name"] for field in data["fields"]]
    return table_meta_data


@task(
    name="Create Dataframe",
    max_retries=5,
    retry_delay=timedelta(seconds=5),
    log_stdout=True,
)
def create_dataframe(client, table_meta_data):
    print(table_meta_data)
    table_name, config = table_meta_data

    query = f"SELECT {','.join(config['column_names'])} FROM {table_name}"

    last_updated_timestamp = config.get("last_updated")
    if last_updated_timestamp:
        query = f"{query} WHERE SystemModstamp > {pendulum.parse(last_updated_timestamp).to_iso8601_string()}"

    query = query.replace("\n", "")

    print(f"Query: {query}")

    data = client.query_all(query, include_deleted=True)

    # Check data entries in case there's an empty list
    if not data["records"]:
        print(data)
        raise SKIP(message=f"No new records for {table_name}")

    print(f"New Records Cout: {len(data['records'])}")

    keys = data["records"][0].keys()
    print(keys)
    records = DataFrame(data["records"], columns=keys)
    config["records"] = records

    return table_meta_data


# Upload to BigQuery
@task(
    name="Upload to BigQuery",
    max_retries=5,
    retry_delay=timedelta(seconds=5),
    log_stdout=True,
)
def upload_to_bq(table_meta_data, dataset_name, gcp_credentials):
    logger = prefect.context.get("logger")
    table_name, config = table_meta_data
    bq_client = create_bigquery_client(gcp_credentials=gcp_credentials)
    print(f"Table Name: {table_name}")

    table_ref = bq_client.dataset(dataset_name).table(table_name)

    timestamp_columns = config.get("timestamp_columns")

    if timestamp_columns:
        transform_config = {}

        for column in config["records"].columns:
            if column in timestamp_columns:
                transform_config[column] = format_datetime
            else:
                transform_config[column] = lambda value: value

        config["records"] = config["records"].transform(func=transform_config)

    json_records = config["records"].to_json(
        compression=None, orient="records", default_handler=json.dumps, lines=True
    )
    json_file = StringIO(json_records)
    write_disposition = (
        bigquery.WriteDisposition.WRITE_APPEND
        if config["last_updated"]
        else bigquery.WriteDisposition.WRITE_TRUNCATE
    )
    print(f"write_disposition: {write_disposition}")

    try:
        load_job = bq_client.load_table_from_file(
            file_obj=json_file,
            destination=table_ref,
            rewind=True,  # rewind is an argument of load_table_from_file, not LoadJobConfig
            job_config=bigquery.LoadJobConfig(
                autodetect=not config["last_updated"],
                write_disposition=write_disposition,
                source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
            ),
        )
        errors = load_job.result()
        logger.debug(errors)
        logger.info("transfer complete")
    except Exception as exception:
        logger.critical(exception)
        for error in load_job.errors:
            logger.critical(error)
        raise FAIL(exception)

    return table_meta_data


@task(
    name="Update Table Metadata",
    max_retries=5,
    retry_delay=timedelta(seconds=5),
    log_stdout=True,
    skip_on_upstream_skip=True,
)
def update_table_metadata(table_meta_data, gcp_credentials):
    firestore_client = create_firestore_client(gcp_credentials)
    table_name, config = table_meta_data

    print(f"Table name: {table_name}")

    records = config.get("records")
    if records.empty:
        raise SKIP("These aren't the records you're looking for.")

    new_last_updated = str(records["SystemModstamp"].max())

    firestore_client.collection("salesforce_tables").document(table_name).update(
        {"last_updated": new_last_updated}
    )
    print(f"""Document {table_name} last_updated {new_last_updated}""")


executor = LocalDaskExecutor(num_workers=4)

with Flow(
    "Salesforce Extraction Flow",
    executor=executor,
) as flow:
    TABLE_NAMES = Parameter(name="table_names", default=None, required=False)
    GCP_CREDENTIALS = PrefectSecret("GCP_CREDENTIALS")
    username, password, token = (
        PrefectSecret("SALESFORCE_USERNAME"),
        PrefectSecret("SALESFORCE_PASSWORD"),
        PrefectSecret("SALESFORCE_SECURITY_TOKEN"),
    )
    DESTINATION_DATASET_NAME = Parameter(name="destination_dataset_name", default="salesforce")  # Parameter requires a name

    table_meta_data = load_table_metadata(
        table_names=TABLE_NAMES, gcp_credentials=GCP_CREDENTIALS
    )
    client = login_to_sf(username, password, token)
    table_meta_data_with_columns = get_field_names.map(
        client=unmapped(client), table_meta_data=table_meta_data
    )
    table_meta_data_with_columns_records = create_dataframe.map(
        client=unmapped(client), table_meta_data=table_meta_data_with_columns
    )
    final_config = upload_to_bq.map(
        table_meta_data=table_meta_data_with_columns_records,
        dataset_name=unmapped(DESTINATION_DATASET_NAME),
        gcp_credentials=unmapped(GCP_CREDENTIALS),
    )
    update_table_metadata.map(
        table_meta_data=final_config, gcp_credentials=unmapped(GCP_CREDENTIALS)
    )
requirements.txt
aiohttp==3.6.2
appdirs==1.4.4
appnope==0.1.0
async-timeout==3.0.1
attrs==20.2.0
Authlib==0.14.3
backcall==0.2.0
black==20.8b1
cachetools==4.1.1
certifi==2020.6.20
cffi==1.14.3
chardet==3.0.4
click==7.1.2
cloudpickle==1.6.0
croniter==0.3.34
cryptography==3.1.1
dask==2.30.0
dask-kubernetes==0.11.0
decorator==4.4.2
Deprecated==1.2.12
distributed==2.30.0
docker==4.3.1
google-api-core==1.22.4
google-auth==1.22.1
google-auth-oauthlib==0.4.1
google-cloud-bigquery==1.28.0
google-cloud-bigquery-storage==1.1.0
google-cloud-core==1.4.3
google-cloud-firestore==1.9.0
google-cloud-storage==1.31.2
google-crc32c==1.0.0
google-resumable-media==1.1.0
googleapis-common-protos==1.52.0
graphviz==0.14.2
grpcio==1.32.0
HeapDict==1.0.1
idna==2.10
ipython==7.18.1
ipython-genutils==0.2.0
jedi==0.17.2
kubernetes==11.0.0b2
kubernetes-asyncio==11.3.0
marshmallow==3.8.0
marshmallow-oneofschema==2.1.0
msgpack==1.0.0
multidict==4.7.6
mypy-extensions==0.4.3
natsort==7.0.1
numpy==1.19.2
oauthlib==3.1.0
pandas==1.1.3
pandas-gbq==0.14.0
parso==0.7.1
pathspec==0.8.0
pendulum==2.1.2
pexpect==4.8.0
pickleshare==0.7.5
prefect==0.15.4
prompt-toolkit==3.0.7
protobuf==3.13.0
psutil==5.7.2
ptyprocess==0.6.0
pyarrow==1.0.1
pyasn1==0.4.8
pyasn1-modules==0.2.8
pycparser==2.20
pydata-google-auth==1.1.0
PyGithub==1.55
Pygments==2.7.1
PyJWT==2.1.0
PyNaCl==1.4.0
python-box==5.1.1
python-dateutil==2.8.1
python-slugify==4.0.1
pytz==2020.1
pytzdata==2020.1
PyYAML==5.3.1
regex==2020.9.27
requests==2.24.0
requests-oauthlib==1.3.0
rsa==4.6
simple-salesforce==1.10.1
six==1.15.0
sortedcontainers==2.2.2
tabulate==0.8.7
tblib==1.7.0
text-unidecode==1.3
toml==0.10.1
toolz==0.11.1
tornado==6.0.4
traitlets==5.0.4
typed-ast==1.4.1
typing-extensions==3.7.4.3
urllib3==1.25.10
wcwidth==0.2.5
websocket-client==0.57.0
wrapt==1.12.1
yarl==1.6.0
zict==2.0.0
I’m using firestore here to maintain metadata about the different tables, but you could convert this to use the kv store: https://docs.prefect.io/orchestration/concepts/kv_store.html
👍 1
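A minimal sketch of that KV Store variant, assuming Prefect 0.15+ with a backend that supports the KV Store and a single "salesforce_tables" key holding the per-table metadata dict:

from prefect.backend import get_key_value, set_key_value

def load_table_metadata_kv():
    # Expects a dict like {"Opportunity": {"last_updated": "...", "timestamp_columns": [...]}, ...}
    return list(get_key_value("salesforce_tables").items())

def update_table_metadata_kv(table_name, new_last_updated):
    tables = get_key_value("salesforce_tables")
    tables[table_name]["last_updated"] = new_last_updated
    set_key_value(key="salesforce_tables", value=tables)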
The SFDC download auto-detects new fields on SFDC objects but doesn’t auto-update the BigQuery schema. Since you’re going to S3 first anyway, I think you can probably handle things a little more gracefully
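If you do stay on BigQuery, one way to handle new fields a little more gracefully is to allow schema additions on append jobs; a sketch of the LoadJobConfig change, assuming the same load-from-file setup as above:

from google.cloud import bigquery

# Allow append loads to add newly detected Salesforce fields to the destination table
job_config = bigquery.LoadJobConfig(
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
    source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
    autodetect=True,
    schema_update_options=[bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION],
)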
I also dropped all of the flow run related config since it contained keys to things (storage, result, run config)
Let me know how it goes! 😄
h
wonderful, thank you for sharing! I’ll keep you posted. Have a lovely weekend
d
You too!