liren zhang
04/01/2021, 10:28 PM
Zanie
liren zhang
04/01/2021, 10:41 PM
import prefect
from prefect import Flow, task, Parameter, context
from prefect.storage import Docker, S3
from prefect.engine import signals
from prefect.run_configs import LocalRun
from prefect.tasks.prefect import StartFlowRun
from prefect import Client
from botocore.client import ClientError
import os
import boto3
import hashlib
import datetime
from datetime import timedelta
import time
import json
@task
def get_parameters(date_to_process, flow_run_id) -> dict:
    """Resolve and validate the parameters handed to the child flows.

    Args:
        date_to_process: Date string in ``YYYY-MM-DD`` form, or ``None`` to
            fall back to the ``yesterday`` entry of the Prefect context.
        flow_run_id: Accepted so the flow can wire a Parameter to this task,
            but not read here; the returned ID comes from the Prefect context.

    Returns:
        A dict with the keys ``date_to_process``, ``flow_run_id`` and
        ``other_keys``.

    Raises:
        signals.FAIL: If ``date_to_process`` is supplied but cannot be parsed
            as ``YYYY-MM-DD``.
    """
    logger = prefect.context.get("logger")

    if date_to_process is None:
        date_to_process = prefect.context.get("yesterday")
    else:
        # Validate only; the original string is what gets passed downstream.
        try:
            datetime.datetime.strptime(date_to_process, "%Y-%m-%d")
        except (TypeError, ValueError):
            # TypeError covers non-string values (e.g. a number supplied as
            # the parameter), which strptime rejects before parsing.
            message = "Incorrect data format, should be YYYY-MM-DD"
            logger.error(message)
            raise signals.FAIL(message)

    logger.info(f"Processing data from {date_to_process}")
    return {
        'date_to_process': date_to_process,
        'flow_run_id': prefect.context.get('flow_run_id'),
        'other_keys': 'test',
    }
# Project in which every child flow below is assumed to be registered already.
PROJECT_NAME = "TEL-DE-Dev"

# One StartFlowRun task per child flow. All are created with wait=True; the
# parent flow below wires them into a dependency graph.
dim_ubi_v_trigger = StartFlowRun(flow_name="dim_ubi_v_trigger", project_name=PROJECT_NAME, wait=True)
dim_ubi_v = StartFlowRun(flow_name="dim_ubi_v", project_name=PROJECT_NAME, wait=True)
dim_ubi_tel_src_trigger = StartFlowRun(flow_name="dim_ubi_tel_src_trigger", project_name=PROJECT_NAME, wait=True)
dim_ubi_tel_src = StartFlowRun(flow_name="dim_ubi_tel_src", project_name=PROJECT_NAME, wait=True)
# NOTE(review): the flow name is assumed to match the variable name, as it does
# for every other task here -- confirm a flow is registered under this name.
dim_ubi_pt_source = StartFlowRun(flow_name="dim_ubi_pt_source", project_name=PROJECT_NAME, wait=True)
fact_ubi_initial_connection_event = StartFlowRun(flow_name="fact_ubi_initial_connection_event", project_name=PROJECT_NAME, wait=True)
with Flow("parent-flow") as flow:
    # Run-time inputs; both have defaults, so the flow can start without them.
    date_to_process = Parameter(name="date_to_process", default=None)
    flow_run_id = Parameter(name="flow_run_id", default = 'xxxxxx')

    # Shared parameter dict passed to the child flows that take parameters.
    vp = get_parameters(date_to_process, flow_run_id)

    # dim_ubi_v branch: the trigger flow runs first, then the dimension flow.
    dim_ubi_v_trigger = dim_ubi_v_trigger(
        upstream_tasks=[vp],
        parameters=vp,
    )
    dim_ubi_v = dim_ubi_v(
        upstream_tasks=[dim_ubi_v_trigger],
        parameters=vp,
    )

    # dim_ubi_tel_src branch: same trigger-then-flow shape.
    dim_ubi_tel_src_trigger = dim_ubi_tel_src_trigger(
        upstream_tasks=[vp],
        parameters=vp,
    )
    dim_ubi_tel_src = dim_ubi_tel_src(
        upstream_tasks=[dim_ubi_tel_src_trigger],
        parameters=vp,
    )

    # Waits for both dimension branches; started without a parameters dict.
    dim_ubi_pt_source = dim_ubi_pt_source(
        upstream_tasks=[dim_ubi_v, dim_ubi_tel_src],
    )

    # Depends on the dim_ubi_v branch only.
    fact_ubi_initial_connection_event = fact_ubi_initial_connection_event(
        upstream_tasks=[dim_ubi_v],
        parameters=vp,
    )

if __name__ == "__main__":
    # Running the file as a script only visualizes the graph; nothing is run.
    flow.visualize()
Zanie
liren zhang
04/01/2021, 10:59 PM