Fredrik Blomgren
03/10/2022, 9:47 AM

Anna Geller
03/10/2022, 10:35 AM
> I want to pass data from GCS/BQ to tasks running as external services (Dataflow jobs, BQ queries, DBT models)
Obviously, I'm not an unbiased person here, but given this requirement, you should strongly consider using Prefect rather than Airflow, since Prefect provides native functionality to pass data between tasks without any hacks. In Airflow, there are tons of hacks you would need just to pass data you extracted from your data warehouse to your downstream transformation tasks. I wrote about this here on Medium well before I joined Prefect, so that article is not biased.
If you are looking for examples of how you can orchestrate the data stack you described, I have an example repo (and also a workshop recording linked here) that uses part of the stack you mentioned:
• GCS & BigQuery for data storage - the actual data warehouse,
• dbt for in-warehouse transformations,
• Prefect for orchestration,
• Monte Carlo for data quality and lineage.
Regarding Dataflow, there is an API that allows you to create a Dataflow job via a POST request that could be triggered from a Prefect task. So that's definitely doable. If you need more support with migrating to Prefect and e.g. building the Dataflow integration, you can reach out to our Professional Services.
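For illustration, a minimal sketch of what such a Prefect task could look like, using Dataflow's templates.launch REST endpoint - this assumes a classic template already staged in GCS, and the task name, arguments and scopes here are placeholders rather than code from the example repo:

import requests
import google.auth
import google.auth.transport.requests
from datetime import timedelta
from prefect import task


@task(max_retries=3, retry_delay=timedelta(minutes=1), log_stdout=True)
def trigger_dataflow_job(
    project_id: str, region: str, template_gcs_path: str, job_name: str, parameters: dict
) -> str:
    # Get an OAuth2 access token from the default GCP credentials
    credentials, _ = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"]
    )
    credentials.refresh(google.auth.transport.requests.Request())

    # Launch a classic Dataflow template via a POST to templates.launch
    url = (
        f"https://dataflow.googleapis.com/v1b3/projects/{project_id}"
        f"/locations/{region}/templates:launch"
    )
    response = requests.post(
        url,
        params={"gcsPath": template_gcs_path},
        headers={"Authorization": f"Bearer {credentials.token}"},
        json={"jobName": job_name, "parameters": parameters},
    )
    response.raise_for_status()
    job = response.json()["job"]
    print(f"Started Dataflow job {job['id']}")
    return job["id"]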
Fredrik Blomgren
03/10/2022, 11:12 AM
extract_and_load_raw_data tasks and use that as an input to the load_to_bq tasks, instead of passing various paths around and explicitly declaring the upstream_tasks dependencies? I.e. when using Prefect you’re in some ways working on a lower level of abstraction than Luigi? Thanks again!

Anna Geller
03/10/2022, 11:27 AM
import os
import random
import tempfile
from datetime import datetime, timedelta
from uuid import uuid4

import pandas as pd
from google.cloud import bigquery
from google.oauth2 import service_account
from prefect import task

# BUCKET_NAME, PROJECT_NAME and the upload_local_file_to_gcs helper are defined elsewhere in the repo

#1 extract generates sample data, uploads it to GCS and returns the destination_file_name
@task(max_retries=3, retry_delay=timedelta(minutes=1), log_stdout=True)
def extract_and_load_raw_data(
creds: dict, source_file_name: str, destination_file_name: str
):
records = []
for _ in range(100):
single_row = dict(
id=str(uuid4()),
user_id=random.randint(1, 1001),
order_date=datetime.utcnow().date(),
status=random.choice(
["completed", "placed", "shipped", "return_pending", "returned"]
),
)
records.append(single_row)
df = pd.DataFrame(records)
df["order_date"] = pd.to_datetime(df["order_date"])
df["status"] = df["status"].astype("category")
with tempfile.TemporaryDirectory() as tmpdir:
local_file_path = os.path.join(tmpdir, source_file_name)
df.to_parquet(local_file_path, index=False)
upload_local_file_to_gcs(creds, local_file_path, destination_file_name)
print(f"File {source_file_name} saved to GCS")
return destination_file_name
#2 load accepts this GCS destination_file_name as input
@task(max_retries=3, retry_delay=timedelta(minutes=1), log_stdout=True)
def load_to_bq(creds: dict, bq_schema_name: str, bq_table_name: str, destination_file_name: str):
gcs_dest_uri = f"gs://{BUCKET_NAME}/{destination_file_name}"
credentials = service_account.Credentials.from_service_account_info(creds)
bq_client = bigquery.Client(credentials=credentials)
job_config = bigquery.LoadJobConfig(
source_format=bigquery.SourceFormat.PARQUET,
write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
schema=[
bigquery.SchemaField(name="id", field_type="STRING", mode="REQUIRED"),
bigquery.SchemaField(name="user_id", field_type="INTEGER", mode="REQUIRED"),
bigquery.SchemaField(
name="order_date", field_type="TIMESTAMP", mode="REQUIRED"
),
bigquery.SchemaField(name="status", field_type="STRING", mode="REQUIRED"),
],
)
table_id = f"{PROJECT_NAME}.{bq_schema_name}.{bq_table_name}"
load_job = bq_client.load_table_from_uri(gcs_dest_uri, table_id, job_config=job_config)
load_job.result() # Waits for the job to complete.
destination_table = bq_client.get_table(table_id)
print("The table has now {} rows.".format(destination_table.num_rows))
#3 flow - no upstream_tasks in between:
gcs_destination_uri = extract_and_load_raw_data(creds_dict, local_file_name, dest_file_name)
load = load_to_bq(creds_dict, bq_schema, bq_tbl, gcs_destination_uri)
This way you wouldn't need the upstream_tasks, because the GCS URI is passed to the BigQuery task as a data dependency, which is enough to establish the dependency between those tasks (as you correctly pointed out).
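For completeness, a minimal sketch of how those two calls sit inside a Prefect 1.x flow block - the flow name and the flow.run() call are placeholders for illustration, and creds_dict, local_file_name, dest_file_name, bq_schema and bq_tbl are assumed to be defined as in the repo:

from prefect import Flow

with Flow("gcs-to-bigquery") as flow:  # flow name is a placeholder
    # Calling tasks inside the Flow block only builds the DAG; nothing runs yet.
    gcs_destination_uri = extract_and_load_raw_data(
        creds_dict, local_file_name, dest_file_name
    )
    # Passing the returned GCS path as an argument creates a data dependency,
    # so load_to_bq is automatically scheduled downstream of extract_and_load_raw_data.
    load = load_to_bq(creds_dict, bq_schema, bq_tbl, gcs_destination_uri)

if __name__ == "__main__":
    flow.run()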
Fredrik Blomgren
03/10/2022, 11:30 AM

Anna Geller
03/10/2022, 11:35 AM

Fredrik Blomgren
03/10/2022, 11:38 AM

Kevin Kho
03/10/2022, 2:57 PM