# ask-community
m
Hey folks, looking for some assistance here if possible, as I'm a little confused. Given the flow below, which basically orchestrates a bunch of other flows, I'd expect it to run the dependent flows as per the dependencies defined, but it does not: it kicks them all off at the same time. * see thread It was my understanding, given the schematic that is drawn, that it would wait for the upstream tasks to be completed (maybe my upstream vs. downstream is confused here). The 'FlowRunner' class here is just a wrapper around an environment variable to run locally or via Kubernetes. * see thread Can anyone see any obvious mistakes here?
m
I think you want to use the set_dependencies method after the task is invoked, so like:
Copy code
stage2_pad = flow_stage2_pad()
# set_dependencies returns None, so keep the task handle from the call itself
stage2_pad.set_dependencies(upstream_tasks=[stage1_odo, stage1_ordo])
k
@Michael Law I have been staring at this a bit and I am not seeing anything immediate. It looks right to me. Your schematic shows them as connected, so the only thing I can think of is that the wait=True is not being respected. Was this ever working?
Hey @Mark McDonald, I think he has that covered with the upstream_tasks in the run calls. If the schematic looks good, I suppose this won't be a dependency issue. Can you post the schematic, @Michael Law?
👍 1
m
It looks fine but never appears to wait; it just kicks all of the tasks off at the same time.
Sorry, I should be clearer: it kicks them off in order as per the schematic, but does not wait for the result from the sub flows.
This is a sample of one of the sub flows.
Copy code
from prefect import Flow
from prefect.tasks.databricks import DatabricksSubmitRun

runner = FlowRunner()  # env-var wrapper class, posted later in this thread

# This basically just generates the JSON payload to post to Databricks
run_json = runner.job_json("jobs/logic/enovia/plant_attribute_dim.py")

with Flow('plant_attribute_dim') as flow:
    plant_attribute_run = DatabricksSubmitRun(databricks_conn_secret=runner.databricks_connection, json=run_json)
    plant_attribute_task = plant_attribute_run()

runner.run(flow)
k
Can you add a time.sleep of like a minute to those subflows to see if that is respected (see the sketch below)? I suspect DatabricksSubmitRun is just marking itself complete after submission, causing the flow to be labelled a success?
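A minimal sketch of that debug change (assuming a Prefect 1.x flow; submit_job here is a hypothetical stand-in for the DatabricksSubmitRun task from the earlier snippet, and debug_sleep is a made-up task name):
Copy code
import time

from prefect import task, Flow

@task
def submit_job():
    # Hypothetical stand-in for the DatabricksSubmitRun task above
    pass

@task
def debug_sleep():
    # Hold the subflow open for a minute so we can see whether the
    # parent flow actually waits for this run to finish
    time.sleep(60)

with Flow("plant_attribute_dim") as flow:
    submitted = submit_job()
    # Force the sleep to run only after the submission task finishes
    sleep_task = debug_sleep(upstream_tasks=[submitted])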
m
I was thinking this too, but I can see in the portal all of the sub flows running simultaneously after the parent completes. Kind of like they are being kicked off asynchronously without an await.
This shows them all kicking off within the minute.
k
What version are you on for Prefect?
Just confirming you re-registered them?
m
We are using the cloud version and the latest Docker image with Python 3.7.
I registered all of the flows via the command line too.
4 August 2021 04:18:59 agent INFO Submitted for execution: Job prefect-job-f0583609
4 August 2021 04:19:00 prefect.Azure INFO Downloading plant-children-dim/2021-08-04t14-56-49-138960-00-00 from fdpflows
4 August 2021 04:19:00 prefect.CloudFlowRunner INFO Beginning Flow run for 'plant_children_dim'
4 August 2021 04:19:01 prefect.CloudTaskRunner INFO Task 'DatabricksSubmitRun': Starting task run...
4 August 2021 04:19:02 prefect.DatabricksSubmitRun INFO Run submitted with run_id: 2454
4 August 2021 04:19:02 prefect.DatabricksSubmitRun INFO Run submitted with config : { [redacted] }
4 August 2021 04:19:02 prefect.DatabricksSubmitRun INFO View run status, Spark UI, and logs at https://adb-[redacted].azuredatabricks.net/?o=[redacted]#job/1744/run/1
4 August 2021 04:19:02 prefect.DatabricksSubmitRun INFO DatabricksSubmitRun in run state: {'life_cycle_state': 'PENDING', 'result_state': None, 'state_message': ''}
4 August 2021 04:19:02 prefect.DatabricksSubmitRun INFO View run status, Spark UI, and logs at https://adb-[redacted].azuredatabricks.net/?o=[redacted]#job/1744/run/1
4 August 2021 04:19:02 prefect.DatabricksSubmitRun INFO Sleeping for 30 seconds.
4 August 2021 04:19:32 prefect.DatabricksSubmitRun INFO DatabricksSubmitRun in run state: {'life_cycle_state': 'RUNNING', 'result_state': None, 'state_message': 'In run'}
4 August 2021 04:19:32 prefect.DatabricksSubmitRun INFO View run status, Spark UI, and logs at https://adb-[redacted].azuredatabricks.net/?o=[redacted]#job/1744/run/1
4 August 2021 04:19:32 prefect.DatabricksSubmitRun INFO Sleeping for 30 seconds.
4 August 2021 04:20:02 prefect.DatabricksSubmitRun INFO DatabricksSubmitRun in run state: {'life_cycle_state': 'RUNNING', 'result_state': None, 'state_message': 'In run'}
4 August 2021 04:20:02 prefect.DatabricksSubmitRun INFO View run status, Spark UI, and logs at https://adb-[redacted].azuredatabricks.net/?o=[redacted]#job/1744/run/1
4 August 2021 04:20:02 prefect.DatabricksSubmitRun INFO Sleeping for 30 seconds.
4 August 2021 04:20:32 prefect.DatabricksSubmitRun INFO DatabricksSubmitRun in run state: {'life_cycle_state': 'RUNNING', 'result_state': None, 'state_message': 'In run'}
4 August 2021 04:20:32 prefect.DatabricksSubmitRun INFO View run status, Spark UI, and logs at https://adb-[redacted].azuredatabricks.net/?o=[redacted]#job/1744/run/1
4 August 2021 04:20:32 prefect.DatabricksSubmitRun INFO Sleeping for 30 seconds.
4 August 2021 04:21:02 prefect.DatabricksSubmitRun INFO DatabricksSubmitRun completed successfully.
4 August 2021 04:21:02 prefect.DatabricksSubmitRun INFO View run status, Spark UI, and logs at https://adb-[redacted].azuredatabricks.net/?o=[redacted]#job/1744/run/1
4 August 2021 04:21:03 prefect.CloudTaskRunner INFO Task 'DatabricksSubmitRun': Finished task run for task with final state: 'Success'
4 August 2021 04:21:03 prefect.CloudFlowRunner INFO Flow run SUCCESS: all reference tasks succeeded
That is one of the logs which shows the sub flow took longer than the minute the parent suggested
k
Would you like to try the new tasks create_flow_run and wait_for_flow_run that came out in 0.15? Hopefully the behavior of those will be better?
👍 1
m
I understand that DatabricksSubmitRun is simply submitting a job to the cluster, but when I run the flow individually, as you can see, it does wait.
I’ll try that now @Kevin Kho
Will it build out the schematic for me or is there some specific syntax I have to use?
k
The syntax would be something like this, and yes, it builds out the schematic as well since these are tasks:
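A minimal sketch of the pattern (assuming Prefect 0.15+; the parent and child flow names and the project name here are placeholders):
Copy code
from prefect import Flow
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

with Flow("parent_flow") as flow:
    # create_flow_run returns the child flow-run id as its task result
    child_run_id = create_flow_run(flow_name="child_flow", project_name="my_project")
    # wait_for_flow_run blocks until that child flow run finishes,
    # so downstream tasks can depend on the child's completion
    child_done = wait_for_flow_run(child_run_id)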
m
Progress: the parent job now appears to be waiting for the Databricks jobs to complete. Is there more detailed documentation around create_flow_run and wait_for_flow_run?
@Kevin Kho this works, thanks for your help.
Copy code
from prefect import Flow
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

pr_name = "fdppr2568"

runner = FlowRunner()

with Flow("enovia_flow") as flow:
    odo_run = create_flow_run(flow_name="object_domain_object", project_name=pr_name)
    ordo_run = create_flow_run(flow_name="object_relation_domain_object", project_name=pr_name)
    odo_wait = wait_for_flow_run(odo_run)
    ordo_wait = wait_for_flow_run(ordo_run)

    pad_run = create_flow_run(flow_name="plant_attribute_dim", project_name=pr_name, upstream_tasks=[odo_wait, ordo_wait])
    pcd_run = create_flow_run(flow_name="plant_children_dim", project_name=pr_name, upstream_tasks=[odo_wait, ordo_wait])
    pad_wait = wait_for_flow_run(pad_run)
    pcd_wait = wait_for_flow_run(pcd_run)

    pdo_run = create_flow_run(flow_name="plant_domain_object", project_name=pr_name, upstream_tasks=[pad_wait, pcd_wait])
    pdo_wait = wait_for_flow_run(pdo_run)

    pf_run = create_flow_run(flow_name="plant_fact", project_name=pr_name, upstream_tasks=[pdo_wait])
    pf_wait = wait_for_flow_run(pf_run)

runner.run(flow)
Can I give them friendly names for my schematic?
k
Yeah, create_flow_run(flow_name="object_domain_object", project_name=pr_name, name="xxx"). The task takes the kwargs of the Task class.
And I have no clue what's up with StartFlowRun; first time I've seen that with wait=True already specified.
👍 1
Could you move the code to the thread when you get a chance, btw, so that we can free up space in the main channel?
m
Yeah no worries
Copy code
import json
import os

from prefect import Flow
from prefect.executors import LocalExecutor
from prefect.run_configs import KubernetesRun
from prefect.storage import Azure


class FlowRunner:
    def __init__(self):
        # Pull all deployment configuration from environment variables
        self.cluster_id = os.environ["DATABRICKS_CLUSTER_ID"]
        self.mount = os.environ["MOUNT"]
        self.release_version = os.environ["RELEASE_VERSION"]
        self.kubernetes_job_image = os.environ["APP_IMAGE"]
        self.storage_connection = os.environ["AZURE_STORAGE_CONNECTION_STRING"]
        self.databricks_connection = json.loads(os.environ["PREFECT__CONTEXT__SECRETS__DATABRICKS_CONNECTION_STRING"])

    def run(self, flow: Flow):
        # Forward the configuration into the Kubernetes job's environment
        env = {
            "DATABRICKS_CLUSTER_ID": self.cluster_id,
            "MOUNT": self.mount,
            "RELEASE_VERSION": self.release_version,
            "APP_IMAGE": self.kubernetes_job_image,
            "AZURE_STORAGE_CONNECTION_STRING": self.storage_connection,
            # Re-serialise the dict back to JSON so the child process can json.loads it
            "PREFECT__CONTEXT__SECRETS__DATABRICKS_CONNECTION_STRING": json.dumps(self.databricks_connection),
        }

        if os.environ["DEBUG"] == "1":
            # Local debugging: execute the flow in-process
            flow.run(executor=LocalExecutor())
        else:
            # Cloud path: attach run config and storage; the flow itself
            # is registered separately via the CLI
            flow.run_config = KubernetesRun(image=self.kubernetes_job_image, env=env)
            flow.storage = Azure(container="fdpflows", connection_string=self.storage_connection)
Copy code
from prefect.tasks.prefect import StartFlowRun

pr_name = "fdppr2568"

flow_stage1_odo = StartFlowRun(flow_name="object_domain_object", project_name=pr_name, wait=True)
flow_stage1_ordo = StartFlowRun(flow_name="object_relation_domain_object", project_name=pr_name, wait=True)
flow_stage2_pad = StartFlowRun(flow_name="plant_attribute_dim", project_name=pr_name, wait=True)
flow_stage2_pcd = StartFlowRun(flow_name="plant_children_dim", project_name=pr_name, wait=True)
flow_stage3_pdo = StartFlowRun(flow_name="plant_do_flow", project_name=pr_name, wait=True)
flow_stage4_pf = StartFlowRun(flow_name="plant_fact", project_name=pr_name, wait=True)

runner = FlowRunner()

with Flow("master_flow") as flow:
    stage1_odo = flow_stage1_odo()
    stage1_ordo = flow_stage1_ordo()
    stage2_pad = flow_stage2_pad(upstream_tasks=[stage1_odo, stage1_ordo])
    stage2_pcd = flow_stage2_pcd(upstream_tasks=[stage1_odo, stage1_ordo])
    stage3_pdo = flow_stage3_pdo(upstream_tasks=[stage2_pad, stage2_pcd])
    stage4_pf = flow_stage4_pf(upstream_tasks=[stage3_pdo])

runner.run(flow)
Re: "create_flow_run(flow_name="object_domain_object", project_name=pr_name, name="xxx"). The task takes the kwargs of the Task class." @Kevin Kho just a heads up, but on 0.15.3 name doesn't appear to be a parameter on that task.
k
That's right. Will make a note of that. You can do:
Copy code
# Copy the task, then rename the copy; the copy's name is what
# shows up in the schematic
my_create_flow_run = create_flow_run.copy()
my_create_flow_run.name = "foo"
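For example (a sketch reusing my_create_flow_run from the snippet just above and pr_name from the earlier snippets):
Copy code
with Flow("enovia_flow") as flow:
    # The renamed copy is called exactly like the original task
    odo_run = my_create_flow_run(flow_name="object_domain_object", project_name=pr_name)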