    Henrietta Salonen

    8 months ago
    Hey, is anyone fetching data from Salesforce using Prefect who'd care to share their approach?
    Kevin Kho

    8 months ago
    I think there is no Salesforce task because the schema changes a lot. Maybe you can look into using something like Airbyte to fetch the data and then orchestrate it with Prefect. We’ll see what the community says
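    If you go the Airbyte route, a rough sketch of triggering an existing Airbyte connection sync from a flow could look like this (assuming your Prefect version ships the Airbyte task in its task library; the host, port, and connection_id are placeholders for your own deployment):
    # Rough sketch: trigger an existing Airbyte connection sync from a Prefect flow.
    # The host/port/connection_id values are placeholders.
    from prefect import Flow
    from prefect.tasks.airbyte import AirbyteConnectionTask

    sync_salesforce = AirbyteConnectionTask(
        airbyte_server_host="localhost",
        airbyte_server_port=8000,
        connection_id="<your-airbyte-connection-id>",
    )

    with Flow("airbyte-salesforce-sync") as flow:
        sync_salesforce()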
    Henrietta Salonen

    8 months ago
    there is also an API we could use and just make it a custom task
    Kevin Kho

    8 months ago
    Yes you could
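    Something like this would be a minimal starting point for a custom task built on simple-salesforce (the object and fields in the SOQL query are just placeholders):
    # Minimal sketch of a custom Prefect task that pulls records with simple-salesforce.
    from prefect import task, Flow
    from prefect.tasks.secrets import PrefectSecret
    from simple_salesforce import Salesforce


    @task
    def fetch_records(username, password, token):
        sf = Salesforce(username=username, password=password, security_token=token)
        # query_all pages through the results and returns a dict with a "records" key
        result = sf.query_all("SELECT Id, Name, StageName FROM Opportunity")
        return result["records"]


    with Flow("salesforce-custom-task") as flow:
        records = fetch_records(
            PrefectSecret("SALESFORCE_USERNAME"),
            PrefectSecret("SALESFORCE_PASSWORD"),
            PrefectSecret("SALESFORCE_SECURITY_TOKEN"),
        )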
    Dylan

    8 months ago
    @Henrietta Salonen I’ve got a flow I can clean up and share with you
    Goes from the Salesforce API -> BigQuery, but it would be easy to adapt for Snowflake/Redshift (there’s a rough Snowflake sketch after the flow code below)
    Henrietta Salonen

    8 months ago
    thank you, we’ll probably use S3 as the first point of storage
    I appreciate that you can share it 🙂
    Dylan

    8 months ago
    Of course!
    import json
    from datetime import timedelta
    from io import StringIO
    
    import prefect
    from prefect import task, Flow, Parameter, unmapped
    from prefect.tasks.secrets import PrefectSecret
    from prefect.engine.signals import SKIP, FAIL
    from prefect.executors import LocalDaskExecutor
    
    import pendulum
    from google.cloud import bigquery
    from google.cloud import firestore
    import pandas_gbq
    from pandas import DataFrame
    from simple_salesforce import Salesforce
    from google.oauth2 import service_account
    
    
    # Utility Functions
    def create_firestore_client(gcp_credentials):
        credentials = service_account.Credentials.from_service_account_info(gcp_credentials)
        return firestore.Client(project="", credentials=credentials)  # project name here
    
    
    def create_bigquery_client(gcp_credentials):
        credentials = service_account.Credentials.from_service_account_info(gcp_credentials)
        return bigquery.Client(project="", credentials=credentials)  # project name here
    
    
    def format_datetime(datetime_string):
        if not datetime_string:
            return None
    
        datetime = pendulum.parse(datetime_string)
    
        return datetime.format("YYYY-MM-DD HH:mm:ss")
    
    
    @task(
        name="Load Table Metadata",
        max_retries=5,
        retry_delay=timedelta(seconds=5),
        log_stdout=True,
    )
    def load_table_metadata(table_names, gcp_credentials):
        firestore_client = create_firestore_client(gcp_credentials)
    
        tables = list(
            {
                t.id: t.to_dict()
                for t in firestore_client.collection("salesforce_tables").get()
            }.items()
        )
        print("tables retrieved")
    
        if table_names:
            tables = [x for x in tables if x[0] in table_names]
            print("filtered tables")
            print(tables)
    
        return tables
    
    
    @task(
        name="Login to SalesForce",
        max_retries=5,
        retry_delay=timedelta(seconds=5),
        log_stdout=True,
    )
    def login_to_sf(un, pw, tk):
        sf = Salesforce(username=un, password=pw, security_token=tk)
        return sf
    
    
    @task(
        name="Get Field Names",
        max_retries=5,
        retry_delay=timedelta(seconds=5),
        log_stdout=True,
    )
    def get_field_names(client, table_meta_data):
        # Describe the object, e.g. client.Opportunity.describe()
        table_name, config = table_meta_data
        sf_object = getattr(client, table_name)
        data = sf_object.describe()
        # Get columns/field names
        config["column_names"] = [field["name"] for field in data["fields"]]
        return table_meta_data
    
    
    @task(
        name="Create Dataframe",
        max_retries=5,
        retry_delay=timedelta(seconds=5),
        log_stdout=True,
    )
    def create_dataframe(client, table_meta_data):
        print(table_meta_data)
        table_name, config = table_meta_data
    
        query = f"SELECT {','.join(config['column_names'])} FROM {table_name}"
    
        last_updated_timestamp = config.get("last_updated")
        if last_updated_timestamp:
            query = f"{query} WHERE SystemModstamp > {pendulum.parse(last_updated_timestamp).to_iso8601_string()}"
    
        query = query.replace("\n", "")
    
        print(f"Query: {query}")
    
        data = client.query_all(query, include_deleted=True)
    
        # Check data entries in case there's an empty list
        if not data["records"]:
            print(data)
            raise SKIP(message=f"No new records for {table_name}")
    
        print(f"New Records Cout: {len(data['records'])}")
    
        keys = data["records"][0].keys()
        print(keys)
        records = DataFrame(data["records"], columns=keys)
        config["records"] = records
    
        return table_meta_data
    
    
    # Upload to BigQuery
    @task(
        name="Upload to BigQuery",
        max_retries=5,
        retry_delay=timedelta(seconds=5),
        log_stdout=True,
    )
    def upload_to_bq(table_meta_data, dataset_name, gcp_credentials):
        logger = prefect.context.get("logger")
        table_name, config = table_meta_data
        bq_client = create_bigquery_client(gcp_credentials=gcp_credentials)
        print(f"Table Name: {table_name}")
    
        table_ref = bq_client.dataset(dataset_name).table(table_name)
    
        timestamp_columns = config.get("timestamp_columns")
    
        if timestamp_columns:
            transform_config = {}
    
            for column in config["records"].columns:
                if column in timestamp_columns:
                    transform_config[column] = format_datetime
                else:
                    transform_config[column] = lambda value: value
    
            config["records"] = config["records"].transform(func=transform_config)
    
        json_records = config["records"].to_json(
            compression=None, orient="records", default_handler=json.dumps, lines=True
        )
        json_file = StringIO(json_records)
        write_disposition = (
            bigquery.WriteDisposition.WRITE_APPEND
            if config["last_updated"]
            else bigquery.WriteDisposition.WRITE_TRUNCATE
        )
        print(f"write_disposition: {write_disposition}")
    
        try:
            load_job = bq_client.load_table_from_file(
                file_obj=json_file,
                destination=table_ref,
                rewind=True,  # rewind is an argument of load_table_from_file, not LoadJobConfig
                job_config=bigquery.LoadJobConfig(
                    autodetect=not config["last_updated"],
                    write_disposition=write_disposition,
                    source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
                ),
            )
            load_job.result()  # block until the load job completes
            logger.debug(load_job.errors)
            logger.info("transfer complete")
        except Exception as exception:
            logger.critical(exception)
            for error in load_job.errors:
                logger.critical(error)
            raise FAIL(exception)
    
        return table_meta_data
    
    
    @task(
        name="Update Table Metadata",
        max_retries=5,
        retry_delay=timedelta(seconds=5),
        log_stdout=True,
        skip_on_upstream_skip=True,
    )
    def update_table_metadata(table_meta_data, gcp_credentials):
        firestore_client = create_firestore_client(gcp_credentials)
        table_name, config = table_meta_data
    
        print(f"Table name: {table_name}")
    
        records = config.get("records")
        if records.empty:
            raise SKIP("These aren't the records you're looking for.")
    
        new_last_updated = str(records["SystemModstamp"].max())
    
        firestore_client.collection("salesforce_tables").document(table_name).update(
            {"last_updated": new_last_updated}
        )
        print(f"""Document {table_name} last_updated {new_last_updated}""")
    
    
    executor = LocalDaskExecutor(num_workers=4)
    
    with Flow(
        "Salesforce Extraction Flow",
        executor=executor,
    ) as flow:
        TABLE_NAMES = Parameter(name="table_names", default=None, required=False)
        GCP_CREDENTIALS = PrefectSecret("GCP_CREDENTIALS")
        username, password, token = (
            PrefectSecret("SALESFORCE_USERNAME"),
            PrefectSecret("SALESFORCE_PASSWORD"),
            PrefectSecret("SALESFORCE_SECURITY_TOKEN"),
        )
        DESTINATION_DATASET_NAME = Parameter(name="destination_dataset_name", default="salesforce")  # Parameter requires a name
    
        table_meta_data = load_table_metadata(
            table_names=TABLE_NAMES, gcp_credentials=GCP_CREDENTIALS
        )
        client = login_to_sf(username, password, token)
        table_meta_data_with_columns = get_field_names.map(
            client=unmapped(client), table_meta_data=table_meta_data
        )
        table_meta_data_with_columns_records = create_dataframe.map(
            client=unmapped(client), table_meta_data=table_meta_data_with_columns
        )
        final_config = upload_to_bq.map(
            table_meta_data=table_meta_data_with_columns_records,
            dataset_name=unmapped(DESTINATION_DATASET_NAME),
            gcp_credentials=unmapped(GCP_CREDENTIALS),
        )
        update_table_metadata.map(
            table_meta_data=final_config, gcp_credentials=unmapped(GCP_CREDENTIALS)
        )
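    Re: adapting this for Snowflake, a rough sketch of what the load task could look like (assuming snowflake-connector-python with its write_pandas helper and a target table that already exists; the account/database/schema values are placeholders):
    # Rough sketch of a Snowflake variant of the upload step; connection values are placeholders
    # and write_pandas assumes the destination table already exists.
    import snowflake.connector
    from snowflake.connector.pandas_tools import write_pandas
    from prefect import task


    @task(name="Upload to Snowflake", log_stdout=True)
    def upload_to_snowflake(table_meta_data, account, user, password):
        table_name, config = table_meta_data
        conn = snowflake.connector.connect(
            account=account, user=user, password=password,
            database="ANALYTICS", schema="SALESFORCE",  # placeholders
        )
        try:
            success, _, nrows, _ = write_pandas(conn, config["records"], table_name.upper())
            print(f"Loaded {nrows} rows into {table_name} (success={success})")
        finally:
            conn.close()
        return table_meta_data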
    requirements.txt
    aiohttp==3.6.2
    appdirs==1.4.4
    appnope==0.1.0
    async-timeout==3.0.1
    attrs==20.2.0
    Authlib==0.14.3
    backcall==0.2.0
    black==20.8b1
    cachetools==4.1.1
    certifi==2020.6.20
    cffi==1.14.3
    chardet==3.0.4
    click==7.1.2
    cloudpickle==1.6.0
    croniter==0.3.34
    cryptography==3.1.1
    dask==2.30.0
    dask-kubernetes==0.11.0
    decorator==4.4.2
    Deprecated==1.2.12
    distributed==2.30.0
    docker==4.3.1
    google-api-core==1.22.4
    google-auth==1.22.1
    google-auth-oauthlib==0.4.1
    google-cloud-bigquery==1.28.0
    google-cloud-bigquery-storage==1.1.0
    google-cloud-core==1.4.3
    google-cloud-firestore==1.9.0
    google-cloud-storage==1.31.2
    google-crc32c==1.0.0
    google-resumable-media==1.1.0
    googleapis-common-protos==1.52.0
    graphviz==0.14.2
    grpcio==1.32.0
    HeapDict==1.0.1
    idna==2.10
    ipython==7.18.1
    ipython-genutils==0.2.0
    jedi==0.17.2
    kubernetes==11.0.0b2
    kubernetes-asyncio==11.3.0
    marshmallow==3.8.0
    marshmallow-oneofschema==2.1.0
    msgpack==1.0.0
    multidict==4.7.6
    mypy-extensions==0.4.3
    natsort==7.0.1
    numpy==1.19.2
    oauthlib==3.1.0
    pandas==1.1.3
    pandas-gbq==0.14.0
    parso==0.7.1
    pathspec==0.8.0
    pendulum==2.1.2
    pexpect==4.8.0
    pickleshare==0.7.5
    prefect==0.15.4
    prompt-toolkit==3.0.7
    protobuf==3.13.0
    psutil==5.7.2
    ptyprocess==0.6.0
    pyarrow==1.0.1
    pyasn1==0.4.8
    pyasn1-modules==0.2.8
    pycparser==2.20
    pydata-google-auth==1.1.0
    PyGithub==1.55
    Pygments==2.7.1
    PyJWT==2.1.0
    PyNaCl==1.4.0
    python-box==5.1.1
    python-dateutil==2.8.1
    python-slugify==4.0.1
    pytz==2020.1
    pytzdata==2020.1
    PyYAML==5.3.1
    regex==2020.9.27
    requests==2.24.0
    requests-oauthlib==1.3.0
    rsa==4.6
    simple-salesforce==1.10.1
    six==1.15.0
    sortedcontainers==2.2.2
    tabulate==0.8.7
    tblib==1.7.0
    text-unidecode==1.3
    toml==0.10.1
    toolz==0.11.1
    tornado==6.0.4
    traitlets==5.0.4
    typed-ast==1.4.1
    typing-extensions==3.7.4.3
    urllib3==1.25.10
    wcwidth==0.2.5
    websocket-client==0.57.0
    wrapt==1.12.1
    yarl==1.6.0
    zict==2.0.0
    I’m using Firestore here to maintain metadata about the different tables, but you could convert this to use the KV store: https://docs.prefect.io/orchestration/concepts/kv_store.html
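    For reference, a rough sketch of that swap (assuming a Prefect backend with the KV store enabled; the key naming is just an illustration):
    # Rough sketch: replace the Firestore metadata lookups with Prefect's KV store.
    # Key names are placeholders; get_key_value raises if the key doesn't exist yet,
    # so you'd want to handle the first run for each table.
    from prefect.backend import get_key_value, set_key_value


    def get_last_updated(table_name):
        return get_key_value(key=f"salesforce_last_updated_{table_name}")


    def set_last_updated(table_name, timestamp):
        set_key_value(key=f"salesforce_last_updated_{table_name}", value=timestamp)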
    The SFDC download auto-detects new fields on SFDC objects but doesn’t auto-update the BigQuery schema. Since you’re going to S3 first anyway, I think you can probably handle things a little more gracefully
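    For the S3-first approach, a minimal sketch of swapping the BigQuery load for an S3 write with boto3 might look like this (the bucket name and key layout are placeholders):
    # Minimal sketch: land each table's records in S3 as newline-delimited JSON.
    # Bucket name and key layout are placeholders.
    import boto3
    from prefect import task


    @task(name="Upload to S3", log_stdout=True)
    def upload_to_s3(table_meta_data, bucket_name):
        table_name, config = table_meta_data
        body = config["records"].to_json(orient="records", lines=True, default_handler=str)
        boto3.client("s3").put_object(
            Bucket=bucket_name,
            Key=f"salesforce/{table_name}.json",
            Body=body.encode("utf-8"),
        )
        return table_meta_data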
    I also dropped all of the flow-run-related config since it contained keys to things (storage, result, run config)
    Let me know how it goes! 😄
    Henrietta Salonen

    8 months ago
    wonderful, thank you for sharing! I’ll keep you posted. Have a lovely weekend
    Dylan

    8 months ago
    You too!