Michael Law
08/04/2021, 6:51 PMMark McDonald
08/04/2021, 7:13 PMstage2_pad = flow_stage2_pad().set_dependencies(upstream_tasks=[stage1_odo, stage1_ordo])
Kevin Kho
wait
is not being respected. Was this ever working?Kevin Kho
upstream_tasks
in the run calls. If the schematic looks good, I suppose this won’t be a dependency issue. Can you post the schematic @Michael Law?Michael Law
08/04/2021, 7:18 PMMichael Law
08/04/2021, 7:18 PMMichael Law
08/04/2021, 7:19 PMMichael Law
08/04/2021, 7:20 PMrunner = FlowRunner()
# Generate the JSON payload that is posted to Databricks for this job script.
# It is bound to `job_spec` rather than `json` so the standard-library `json`
# module is not shadowed for the rest of the module.
job_spec = runner.job_json("jobs/logic/enovia/plant_attribute_dim.py")

with Flow('plant_attribute_dim') as flow:
    # Configure the DatabricksSubmitRun task with the runner's connection
    # details and the payload built above, then add one call of it to the flow.
    plant_attribute_run = DatabricksSubmitRun(
        databricks_conn_secret=runner.databricks_connection,
        json=job_spec,
    )
    plant_attribute_task = plant_attribute_run()

# Hand the finished flow to the shared runner.
runner.run(flow)
Kevin Kho
DatabricksSubmitRun
is just marking as complete after submission causing the flow to be labelled as success?Michael Law
08/04/2021, 7:23 PMMichael Law
08/04/2021, 7:27 PMKevin Kho
Kevin Kho
Michael Law
08/04/2021, 7:30 PMMichael Law
08/04/2021, 7:31 PMMichael Law
08/04/2021, 7:31 PMMichael Law
08/04/2021, 7:31 PMKevin Kho
create_flow_run
and wait_for_flow_run
that came out in 0.15 ? Hopefully the behavior of those will be better?Michael Law
08/04/2021, 7:32 PMMichael Law
08/04/2021, 7:33 PMMichael Law
08/04/2021, 7:34 PMKevin Kho
Michael Law
08/04/2021, 7:50 PMMichael Law
08/04/2021, 8:40 PMpr_name = "fdppr2568"
runner = FlowRunner()

with Flow("enovia_flow") as flow:
    # Stage 1: two child flow runs with no upstream dependencies.
    odo_run = create_flow_run(
        flow_name="object_domain_object",
        project_name=pr_name,
    )
    ordo_run = create_flow_run(
        flow_name="object_relation_domain_object",
        project_name=pr_name,
    )
    odo_wait = wait_for_flow_run(odo_run)
    ordo_wait = wait_for_flow_run(ordo_run)

    # Stage 2: both runs are created downstream of both stage 1 waits.
    pad_run = create_flow_run(
        flow_name="plant_attribute_dim",
        project_name=pr_name,
        upstream_tasks=[odo_wait, ordo_wait],
    )
    pcd_run = create_flow_run(
        flow_name="plant_children_dim",
        project_name=pr_name,
        upstream_tasks=[odo_wait, ordo_wait],
    )
    pad_wait = wait_for_flow_run(pad_run)
    pcd_wait = wait_for_flow_run(pcd_run)

    # Stage 3: created downstream of both stage 2 waits.
    pdo_run = create_flow_run(
        flow_name="plant_domain_object",
        project_name=pr_name,
        upstream_tasks=[pad_wait, pcd_wait],
    )
    pdo_wait = wait_for_flow_run(pdo_run)

    # Stage 4: created downstream of the stage 3 wait.
    pf_run = create_flow_run(
        flow_name="plant_fact",
        project_name=pr_name,
        upstream_tasks=[pdo_wait],
    )
    pf_wait = wait_for_flow_run(pf_run)

# Hand the assembled flow to the shared runner.
runner.run(flow)
Michael Law
08/04/2021, 8:41 PMKevin Kho
create_flow_run(flow_name="object_domain_object", project_name=pr_name, name="xxx")
. The task takes the kwargs of the Task
class.Kevin Kho
wait=True
already specified.Kevin Kho
Michael Law
08/05/2021, 10:58 AMMichael Law
08/05/2021, 10:58 AMclass FlowRunner:
def __init__(self):
    """Load the runner's settings from environment variables.

    Every variable is read with a plain mapping lookup, so a missing one
    raises ``KeyError``. The Databricks connection string is decoded from
    JSON before being stored.
    """
    env = os.environ
    self.cluster_id = env["DATABRICKS_CLUSTER_ID"]
    self.mount = env["MOUNT"]
    self.release_version = env["RELEASE_VERSION"]
    self.kubernetes_job_image = env["APP_IMAGE"]
    self.storage_connection = env["AZURE_STORAGE_CONNECTION_STRING"]
    # Stored as the parsed JSON value rather than the raw string.
    raw_connection = env["PREFECT__CONTEXT__SECRETS__DATABRICKS_CONNECTION_STRING"]
    self.databricks_connection = json.loads(raw_connection)
def run(self, flow: Flow):
    """Run ``flow`` locally in debug mode, otherwise configure it for Kubernetes.

    When the ``DEBUG`` environment variable equals ``"1"`` the flow is
    executed in-process with a ``LocalExecutor``. In every other case
    (including ``DEBUG`` being unset) the flow is given a ``KubernetesRun``
    run config, carrying the job image and the runner's settings as
    environment variables, plus Azure storage.

    Args:
        flow: The flow to execute or configure.
    """
    # .get() so an unset DEBUG means "not debugging" instead of a KeyError.
    if os.environ.get("DEBUG") == "1":
        flow.run(executor=LocalExecutor())
        return

    env = {
        "DATABRICKS_CLUSTER_ID": self.cluster_id,
        "MOUNT": self.mount,
        "RELEASE_VERSION": self.release_version,
        "APP_IMAGE": self.kubernetes_job_image,
        "AZURE_STORAGE_CONNECTION_STRING": self.storage_connection,
        # Re-serialise to JSON so the value round-trips through the
        # json.loads() call in __init__. Formatting the parsed value with an
        # f-string produced a quoted Python repr, which is not valid JSON.
        "PREFECT__CONTEXT__SECRETS__DATABRICKS_CONNECTION_STRING": json.dumps(
            self.databricks_connection
        ),
    }
    flow.run_config = KubernetesRun(image=self.kubernetes_job_image, env=env)
    flow.storage = Azure(container="fdpflows", connection_string=self.storage_connection)
    # NOTE(review): this branch only attaches run config and storage; no
    # register/run call is visible in this snippet - confirm it happens elsewhere.
Michael Law
08/05/2021, 10:58 AMflow_stage1_odo = StartFlowRun(flow_name="object_domain_object", project_name=pr_name, wait=True)
# Remaining StartFlowRun tasks, one per child flow, each created with wait=True.
flow_stage1_ordo = StartFlowRun(
    flow_name="object_relation_domain_object", project_name=pr_name, wait=True
)
flow_stage2_pad = StartFlowRun(
    flow_name="plant_attribute_dim", project_name=pr_name, wait=True
)
flow_stage2_pcd = StartFlowRun(
    flow_name="plant_children_dim", project_name=pr_name, wait=True
)
# NOTE(review): "plant_do_flow" differs from the "plant_domain_object" name
# used with create_flow_run elsewhere in this thread - confirm which flow
# name is actually registered.
flow_stage3_pdo = StartFlowRun(
    flow_name="plant_do_flow", project_name=pr_name, wait=True
)
flow_stage4_pf = StartFlowRun(
    flow_name="plant_fact", project_name=pr_name, wait=True
)

runner = FlowRunner()

with Flow("master_flow") as flow:
    # Stage 1 has no upstream tasks.
    stage1_odo = flow_stage1_odo()
    stage1_ordo = flow_stage1_ordo()

    # Stage 2 is downstream of both stage 1 tasks.
    stage2_pad = flow_stage2_pad(upstream_tasks=[stage1_odo, stage1_ordo])
    stage2_pcd = flow_stage2_pcd(upstream_tasks=[stage1_odo, stage1_ordo])

    # Stage 3 is downstream of both stage 2 tasks, stage 4 of stage 3.
    stage3_pdo = flow_stage3_pdo(upstream_tasks=[stage2_pad, stage2_pcd])
    stage4_pf = flow_stage4_pf(upstream_tasks=[stage3_pdo])

# Hand the assembled flow to the shared runner.
runner.run(flow)
Michael Law
08/05/2021, 11:02 AMcreate_flow_run(flow_name="object_domain_object", project_name=pr_name, name="xxx")
. The task takes the kwargs of the Task
class.
@Kevin Kho just a heads up, but for 0.15.3 name doesn't appear to be a property on that methodKevin Kho
# Take a copy of the create_flow_run task and set the name on the copy.
my_create_flow_run = create_flow_run.copy()
my_create_flow_run.name = "foo"