Michael Law
08/04/2021, 6:51 PMMark McDonald
08/04/2021, 7:13 PMstage2_pad = flow_stage2_pad().set_dependencies(upstream_tasks=[stage1_odo, stage1_ordo])Kevin Kho
waitKevin Kho
upstream_tasksMichael Law
08/04/2021, 7:18 PMMichael Law
08/04/2021, 7:18 PMMichael Law
08/04/2021, 7:19 PMMichael Law
08/04/2021, 7:20 PMrunner = FlowRunner()
# This basically just generates json to post to databricks
json = runner.job_json("jobs/logic/enovia/plant_attribute_dim.py")
with Flow('plant_attribute_dim') as flow:
    plant_attribute_run = DatabricksSubmitRun(databricks_conn_secret=runner.databricks_connection, json=json)
    plant_attribute_task = plant_attribute_run()
runner.run(flow)Kevin Kho
DatabricksSubmitRunMichael Law
08/04/2021, 7:23 PMMichael Law
08/04/2021, 7:27 PMKevin Kho
Kevin Kho
Michael Law
08/04/2021, 7:30 PMMichael Law
08/04/2021, 7:31 PMMichael Law
08/04/2021, 7:31 PMMichael Law
08/04/2021, 7:31 PMKevin Kho
create_flow_runwait_for_flow_runMichael Law
08/04/2021, 7:32 PMMichael Law
08/04/2021, 7:33 PMMichael Law
08/04/2021, 7:34 PMKevin Kho
Michael Law
08/04/2021, 7:50 PMMichael Law
08/04/2021, 8:40 PMpr_name = "fdppr2568"
runner = FlowRunner()
with Flow("enovia_flow") as flow:
    odo_run = create_flow_run(flow_name="object_domain_object", project_name=pr_name)
    ordo_run = create_flow_run(flow_name="object_relation_domain_object", project_name=pr_name)
    odo_wait = wait_for_flow_run(odo_run)
    ordo_wait = wait_for_flow_run(ordo_run)
    pad_run = create_flow_run(flow_name="plant_attribute_dim", project_name=pr_name, upstream_tasks=[odo_wait, ordo_wait])
    pcd_run = create_flow_run(flow_name="plant_children_dim", project_name=pr_name, upstream_tasks=[odo_wait, ordo_wait])
    pad_wait = wait_for_flow_run(pad_run)
    pcd_wait = wait_for_flow_run(pcd_run)
    pdo_run = create_flow_run(flow_name="plant_domain_object", project_name=pr_name, upstream_tasks=[pad_wait, pcd_wait])
    pdo_wait = wait_for_flow_run(pdo_run)
    pf_run = create_flow_run(flow_name="plant_fact", project_name=pr_name, upstream_tasks=[pdo_wait])
    pf_wait = wait_for_flow_run(pf_run)
runner.run(flow)Michael Law
08/04/2021, 8:41 PMKevin Kho
create_flow_run(flow_name="object_domain_object", project_name=pr_name, name="xxx")TaskKevin Kho
wait=TrueKevin Kho
Michael Law
08/05/2021, 10:58 AMMichael Law
08/05/2021, 10:58 AMclass FlowRunner:
	def __init__(self):
		self.cluster_id = os.environ["DATABRICKS_CLUSTER_ID"]
		self.mount = os.environ["MOUNT"]
		self.release_version = os.environ["RELEASE_VERSION"]
		self.kubernetes_job_image = os.environ["APP_IMAGE"]
		self.storage_connection = os.environ["AZURE_STORAGE_CONNECTION_STRING"]
		self.databricks_connection = json.loads(os.environ["PREFECT__CONTEXT__SECRETS__DATABRICKS_CONNECTION_STRING"])
        
	def run(self, flow: Flow):
		env = {
			"DATABRICKS_CLUSTER_ID": self.cluster_id,
			"MOUNT": self.mount,
			"RELEASE_VERSION": self.release_version,
			"APP_IMAGE": self.kubernetes_job_image,
			"AZURE_STORAGE_CONNECTION_STRING": self.storage_connection,
			"PREFECT__CONTEXT__SECRETS__DATABRICKS_CONNECTION_STRING": f"'{self.databricks_connection}'"
		}
      
		if (os.environ["DEBUG"] == "1"):
			flow.run(executor=LocalExecutor())
		else:
			flow.run_config = KubernetesRun(image=self.kubernetes_job_image,env=env)
			flow.storage = Azure(container="fdpflows", connection_string=self.storage_connection)Michael Law
08/05/2021, 10:58 AMflow_stage1_odo = StartFlowRun(flow_name="object_domain_object", project_name=pr_name, wait=True)
flow_stage1_ordo = StartFlowRun(flow_name="object_relation_domain_object", project_name=pr_name, wait=True)
flow_stage2_pad = StartFlowRun(flow_name="plant_attribute_dim", project_name=pr_name, wait=True)
flow_stage2_pcd = StartFlowRun(flow_name="plant_children_dim", project_name=pr_name, wait=True)
flow_stage3_pdo = StartFlowRun(flow_name="plant_do_flow", project_name=pr_name, wait=True)
flow_stage4_pf = StartFlowRun(flow_name="plant_fact", project_name=pr_name, wait=True)
runner = FlowRunner()
with Flow("master_flow") as flow:
    stage1_odo = flow_stage1_odo()
    stage1_ordo = flow_stage1_ordo()
    stage2_pad = flow_stage2_pad(upstream_tasks=[stage1_odo, stage1_ordo])
    stage2_pcd = flow_stage2_pcd(upstream_tasks=[stage1_odo, stage1_ordo])
    stage3_pdo = flow_stage3_pdo(upstream_tasks=[stage2_pad, stage2_pcd])
    stage4_pf = flow_stage4_pf(upstream_tasks=[stage3_pdo])
runner.run(flow)Michael Law
08/05/2021, 11:02 AMcreate_flow_run(flow_name="object_domain_object", project_name=pr_name, name="xxx")TaskKevin Kho
# Copy the task and give the copy its own name in one step; keyword arguments
# passed to copy() are set as attributes on the new task.
my_create_flow_run = create_flow_run.copy(name="foo")