
Fredrik Blomgren

03/10/2022, 9:47 AM
Hello! I’m new here and evaluating a switch from Airflow to Prefect. We’re not doing any processing in Python/DataFrames/etc. but are using GCP with Beam/Dataflow (I don’t see any specific Prefect integration, but I assume it can easily be done by packaging it in a container and running with a RunNamespacedJob task or similar), BigQuery, Avro on GCS and DBT. Almost all examples I see using cloud storage work with pickled dataframes that are pulled locally when executing the task. I’m looking to use a setup that references Avro datasets on GCS and/or BQ tables and passes them as inputs/outputs to tasks running as external services (Dataflow jobs, BQ queries, DBT models), similar to Luigi’s Target class. Given Prefect’s focus on dataflow I would expect that to be a supported pattern, but I haven’t yet been able to find any guides/tutorials/discussions about this. Does anyone have any tips? Preferably for Orion as we’re just getting started, but otherwise for 1.0 as a reference. Thanks a lot!

Anna Geller

03/10/2022, 10:35 AM
Hi and welcome to the community, @Fredrik Blomgren! The key element you mentioned was:
I want to pass data from GCS/BQ to tasks running as external services (Dataflow jobs, BQ queries, DBT models)
Obviously, I’m not an unbiased person here, but given this requirement, you should strongly consider using Prefect rather than Airflow, since Prefect provides native functionality to pass data between tasks without any hacks. In Airflow, there are tons of hacks you would need just to pass data you extracted from your data warehouse to your downstream transformation tasks. I wrote about this here on Medium way before I joined Prefect, so this article is not biased. If you are looking for examples of how you can orchestrate the data stack you described, I have an example repo (and also a workshop recording linked here) that uses part of the stack you mentioned:
• GCS & BigQuery for data storage - the actual data warehouse,
• dbt for in-warehouse transformations,
• Prefect for orchestration,
• Monte Carlo for data quality and lineage.
Regarding Dataflow, there is an API that allows you to create a Dataflow job via a POST request, which could be triggered from a Prefect task. So that’s definitely doable. If you need more support with migrating to Prefect and e.g. building the Dataflow integration, you can reach out to our Professional Services.
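For illustration, here is a rough sketch of what such a task could look like, using the classic Dataflow templates launch endpoint. The project, region, template path and job parameters are placeholders, and this is a sketch rather than a tested integration:
import google.auth
import requests
from google.auth.transport.requests import Request
from prefect import task

# Placeholder values - adjust to your GCP project and region
PROJECT = "my-gcp-project"
REGION = "europe-west1"


@task
def launch_dataflow_template(job_name: str, template_gcs_path: str, parameters: dict) -> str:
    """Launch a classic Dataflow template via a POST to the Dataflow REST API
    and return the ID of the created job."""
    # Get an OAuth2 token from the application default credentials
    credentials, _ = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"]
    )
    credentials.refresh(Request())

    url = (
        f"https://dataflow.googleapis.com/v1b3/projects/{PROJECT}"
        f"/locations/{REGION}/templates:launch"
    )
    response = requests.post(
        url,
        params={"gcsPath": template_gcs_path},  # GCS path of the template spec
        headers={"Authorization": f"Bearer {credentials.token}"},
        json={"jobName": job_name, "parameters": parameters},
    )
    response.raise_for_status()
    return response.json()["job"]["id"]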

Fredrik Blomgren

03/10/2022, 11:12 AM
Thanks a lot for that thorough info @Anna Geller, that’s exactly what I was after! Yeah, it’s those limitations in Airflow that have prompted us to look elsewhere 🙂 But to make sure I understand, there’s no supported/recommended way in Prefect to pass references to e.g. GCS data or BQ tables between tasks? Like in your example repo: to return data references from the extract_and_load_raw_data tasks and use those as inputs to the load_to_bq tasks, instead of passing various paths around and explicitly declaring the upstream_tasks dependencies? I.e. when using Prefect you’re in some ways working at a lower level of abstraction than Luigi? Thanks again!
More like what’s described in the ETL with Prefect example, where data is passed from one task to the next

Anna Geller

03/10/2022, 11:27 AM
I’m not that deeply familiar with Luigi 😄 Perhaps you could then specify those references as a Parameter task, as I did in this example with the table and schema name? There are plenty of ways to pass data between tasks in Prefect. And I think you are right that I could have done it this way (it’s actually easier that way, so good call!):
#1 extract returns GCS destination_file_name
import os
import random
import tempfile
from datetime import datetime, timedelta
from uuid import uuid4

import pandas as pd
from prefect import task

# upload_local_file_to_gcs is assumed to be a GCS upload helper defined elsewhere in the example

@task(max_retries=3, retry_delay=timedelta(minutes=1), log_stdout=True)
def extract_and_load_raw_data(
    creds: dict, source_file_name: str, destination_file_name: str
):
    records = []
    for _ in range(100):
        single_row = dict(
            id=str(uuid4()),
            user_id=random.randint(1, 1001),
            order_date=datetime.utcnow().date(),
            status=random.choice(
                ["completed", "placed", "shipped", "return_pending", "returned"]
            ),
        )
        records.append(single_row)
    df = pd.DataFrame(records)
    df["order_date"] = pd.to_datetime(df["order_date"])
    df["status"] = df["status"].astype("category")
    with tempfile.TemporaryDirectory() as tmpdir:
        local_file_path = os.path.join(tmpdir, source_file_name)
        df.to_parquet(local_file_path, index=False)
        upload_local_file_to_gcs(creds, local_file_path, destination_file_name)
        print(f"File {source_file_name} saved to GCS")
    return destination_file_name
#2 load accepts this GCS destination_file_name as input
from datetime import timedelta

from google.cloud import bigquery
from google.oauth2 import service_account
from prefect import task

# BUCKET_NAME and PROJECT_NAME are assumed to be constants defined elsewhere in the example

@task(max_retries=3, retry_delay=timedelta(minutes=1), log_stdout=True)
def load_to_bq(creds: dict, bq_schema_name: str, bq_table_name: str, destination_file_name: str):
    gcs_dest_uri = f"gs://{BUCKET_NAME}/{destination_file_name}"
    credentials = service_account.Credentials.from_service_account_info(creds)
    bq_client = bigquery.Client(credentials=credentials)
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.PARQUET,
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
        schema=[
            bigquery.SchemaField(name="id", field_type="STRING", mode="REQUIRED"),
            bigquery.SchemaField(name="user_id", field_type="INTEGER", mode="REQUIRED"),
            bigquery.SchemaField(
                name="order_date", field_type="TIMESTAMP", mode="REQUIRED"
            ),
            bigquery.SchemaField(name="status", field_type="STRING", mode="REQUIRED"),
        ],
    )
    table_id = f"{PROJECT_NAME}.{bq_schema_name}.{bq_table_name}"
    load_job = bq_client.load_table_from_uri(gcs_dest_uri, table_id, job_config=job_config)
    load_job.result()  # Waits for the job to complete.
    destination_table = bq_client.get_table(table_id)
    print(f"The table now has {destination_table.num_rows} rows.")
#3 flow - no upstream_tasks in between:
gcs_destination_uri = extract_and_load_raw_data(creds_dict, local_file_name, dest_file_name)
load = load_to_bq(creds_dict, bq_schema, bq_tbl, gcs_destination_uri)
This way you wouldn’t need the upstream_tasks, because the GCS URI is passed to the BigQuery task as a data dependency, which is enough to establish the dependency between those tasks (as you correctly pointed out)
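Putting those pieces together, a minimal sketch of how the flow block could be wired up, with Parameter tasks for the schema and table names and the service account JSON assumed to be stored as a Prefect Secret (the flow name, parameter defaults and secret name are placeholders):
from prefect import Flow, Parameter
from prefect.tasks.secrets import PrefectSecret

# extract_and_load_raw_data and load_to_bq are the tasks defined above
with Flow("gcs-to-bigquery") as flow:
    bq_schema = Parameter("bq_schema_name", default="raw")
    bq_tbl = Parameter("bq_table_name", default="orders")
    local_file_name = Parameter("source_file_name", default="orders.parquet")
    dest_file_name = Parameter("destination_file_name", default="raw/orders.parquet")

    # Service account JSON assumed to be stored as a Prefect Secret
    creds_dict = PrefectSecret("GCP_CREDENTIALS")

    # Passing the returned GCS URI downstream is what creates the dependency -
    # no explicit upstream_tasks needed
    gcs_destination_uri = extract_and_load_raw_data(creds_dict, local_file_name, dest_file_name)
    load = load_to_bq(creds_dict, bq_schema, bq_tbl, gcs_destination_uri)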

Fredrik Blomgren

03/10/2022, 11:30 AM
I’ve seen the Result objects, which I guess is what I’m after, but I haven’t found any end-to-end example or tutorial on how to use them in practice for data inputs/outputs to processing by external services (BQ or Dataflow in my case, but I guess a Spark cluster/Snowflake/etc. would be similar). Or if there’s something similar in Orion (maybe that’s Storage?). I see a GCS result but no BQ one…
Right, so it seems to be doable but not deeply supported in Prefect; I guess we’d just have to try it out and see if it’s ergonomic enough for our use case 🙂 Thanks again for the patience with all my questions! 🙏

Anna Geller

03/10/2022, 11:35 AM
Results are mostly meant for handling data pipeline failures - e.g. when a task fails and you want to restart the flow run from the failed task. For storing data in a data warehouse or on GCS, it’s better to do it as we discussed, since that way you have full control over the load logic and over what your data will look like in your end destination
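For reference, in Prefect 1.0 a result is just a keyword argument on the task. A minimal sketch (the bucket name is a placeholder) showing a GCSResult used purely for checkpointing and restarts, not as the mechanism for handing data to downstream services:
from prefect import task
from prefect.engine.results import GCSResult


# The result only checkpoints the task's return value (here: the GCS file name),
# so a failed flow run can be restarted from this task - downstream tasks still
# read the actual data from GCS/BigQuery themselves
@task(
    checkpoint=True,
    result=GCSResult(bucket="my-prefect-results"),  # placeholder bucket
    log_stdout=True,
)
def extract_and_load_raw_data(creds: dict, source_file_name: str, destination_file_name: str) -> str:
    ...  # same body as above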
Pretty much everything that is doable in Python is doable in Prefect

Fredrik Blomgren

03/10/2022, 11:38 AM
Alright, thanks for the note on Results! Haha, yeah, that’s what’s enticing: having code-as-workflows without too much cruft in between
💯 1
:upvote: 1

Kevin Kho

03/10/2022, 2:57 PM
+1 on not using Results for distributed dataframes. You will get more control over data movement if you handle it yourself.