R Zo
03/25/2022, 8:09 AM
Anna Geller
03/25/2022, 9:05 AM
import pandas as pd
import prefect
from prefect import task, Flow
from prefect.executors import LocalDaskExecutor, LocalExecutor
from prefect.run_configs import LocalRun

FLOW_NAME = "01_extract_load"


@task
def extract_and_load(dataset: str) -> None:
    logger = prefect.context.get("logger")
    file = f"https://raw.githubusercontent.com/anna-geller/jaffle_shop/main/data/{dataset}.csv"
    df = pd.read_csv(file)
    # load_df_to_snowflake(df, dataset)  # placeholder for the actual load step
    logger.info("Dataset %s with %d rows loaded to DB", dataset, len(df))
    del df  # drop the reference early so the DataFrame can be garbage-collected


with Flow(
    FLOW_NAME, executor=LocalDaskExecutor(), run_config=LocalRun(labels=["dev"]),
) as flow:
    datasets = ["raw_customers", "raw_orders", "raw_payments"]
    extract_and_load.map(datasets)  # one mapped child task run per dataset

if __name__ == "__main__":
    flow.run()  # LocalDaskExecutor: runs the mapped tasks in parallel
    print("=== Now running sequentially: ===")
    flow.executor = LocalExecutor()  # swap in an executor that runs one task at a time
    flow.run()
You should see the following logs:
[2022-03-25 14:51:54+0100] INFO - prefect.FlowRunner | Beginning Flow run for '01_extract_load'
[2022-03-25 14:51:54+0100] INFO - prefect.TaskRunner | Task 'extract_and_load': Starting task run...
[2022-03-25 14:51:54+0100] INFO - prefect.TaskRunner | Task 'extract_and_load': Finished task run for task with final state: 'Mapped'
[2022-03-25 14:51:54+0100] INFO - prefect.TaskRunner | Task 'extract_and_load[2]': Starting task run...
[2022-03-25 14:51:54+0100] INFO - prefect.TaskRunner | Task 'extract_and_load[1]': Starting task run...
[2022-03-25 14:51:54+0100] INFO - prefect.TaskRunner | Task 'extract_and_load[0]': Starting task run...
[2022-03-25 14:51:54+0100] INFO - prefect.extract_and_load[2] | Dataset raw_payments with 113 rows loaded to DB
[2022-03-25 14:51:54+0100] INFO - prefect.TaskRunner | Task 'extract_and_load[2]': Finished task run for task with final state: 'Success'
[2022-03-25 14:51:54+0100] INFO - prefect.extract_and_load[0] | Dataset raw_customers with 100 rows loaded to DB
[2022-03-25 14:51:54+0100] INFO - prefect.TaskRunner | Task 'extract_and_load[0]': Finished task run for task with final state: 'Success'
[2022-03-25 14:51:54+0100] INFO - prefect.extract_and_load[1] | Dataset raw_orders with 99 rows loaded to DB
[2022-03-25 14:51:54+0100] INFO - prefect.TaskRunner | Task 'extract_and_load[1]': Finished task run for task with final state: 'Success'
[2022-03-25 14:51:54+0100] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
=== Now running sequentially: ===
[2022-03-25 14:51:54+0100] INFO - prefect.FlowRunner | Beginning Flow run for '01_extract_load'
[2022-03-25 14:51:54+0100] INFO - prefect.TaskRunner | Task 'extract_and_load': Starting task run...
[2022-03-25 14:51:54+0100] INFO - prefect.TaskRunner | Task 'extract_and_load': Finished task run for task with final state: 'Mapped'
[2022-03-25 14:51:54+0100] INFO - prefect.TaskRunner | Task 'extract_and_load[0]': Starting task run...
[2022-03-25 14:51:54+0100] INFO - prefect.extract_and_load[0] | Dataset raw_customers with 100 rows loaded to DB
[2022-03-25 14:51:54+0100] INFO - prefect.TaskRunner | Task 'extract_and_load[0]': Finished task run for task with final state: 'Success'
[2022-03-25 14:51:54+0100] INFO - prefect.TaskRunner | Task 'extract_and_load[1]': Starting task run...
[2022-03-25 14:51:54+0100] INFO - prefect.extract_and_load[1] | Dataset raw_orders with 99 rows loaded to DB
[2022-03-25 14:51:55+0100] INFO - prefect.TaskRunner | Task 'extract_and_load[1]': Finished task run for task with final state: 'Success'
[2022-03-25 14:51:55+0100] INFO - prefect.TaskRunner | Task 'extract_and_load[2]': Starting task run...
[2022-03-25 14:51:55+0100] INFO - prefect.extract_and_load[2] | Dataset raw_payments with 113 rows loaded to DB
[2022-03-25 14:51:55+0100] INFO - prefect.TaskRunner | Task 'extract_and_load[2]': Finished task run for task with final state: 'Success'
[2022-03-25 14:51:55+0100] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
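The two runs above differ only in scheduling: with LocalDaskExecutor (which in Prefect 1.x defaults to a thread-based scheduler) the mapped children `extract_and_load[0..2]` start together and finish in whatever order they complete ([2], [0], [1] here), while LocalExecutor runs them one after another in index order. A rough stdlib-only analogy, not Prefect's implementation: `extract_and_load_stub` and `ROW_COUNTS` are hypothetical stand-ins that reuse the row counts from the logs instead of downloading the CSVs, and `ThreadPoolExecutor` plays the role of the Dask thread pool.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

# Hypothetical stand-in data: row counts taken from the log output above,
# so the sketch runs without pandas or network access.
ROW_COUNTS = {"raw_customers": 100, "raw_orders": 99, "raw_payments": 113}


def extract_and_load_stub(dataset: str) -> int:
    """Hypothetical stand-in for the real task: return the dataset's row count."""
    return ROW_COUNTS[dataset]


def run_parallel(datasets: list[str]) -> list[int]:
    # Analogy for LocalDaskExecutor with threads: submit every mapped element at once.
    results = [0] * len(datasets)
    with ThreadPoolExecutor() as pool:
        futures = {pool.submit(extract_and_load_stub, d): i for i, d in enumerate(datasets)}
        # Futures are yielded in completion order, which may differ from input order
        # (like [2], [0], [1] in the parallel log); store each result at its map index.
        for fut in as_completed(futures):
            results[futures[fut]] = fut.result()
    return results


def run_sequential(datasets: list[str]) -> list[int]:
    # Analogy for LocalExecutor: one mapped element after another, in index order.
    return [extract_and_load_stub(d) for d in datasets]


if __name__ == "__main__":
    datasets = ["raw_customers", "raw_orders", "raw_payments"]
    print(run_parallel(datasets))    # [100, 99, 113]
    print(run_sequential(datasets))  # [100, 99, 113]
```

Either way the per-index results are the same; only the order in which the work happens (and therefore the log lines appear) changes.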
R Zo
03/27/2022, 6:50 AM