Thread
#prefect-community
    Michael Law

    1 year ago
    Hey folks, I’m looking for some assistance here if possible, as I’m a little confused. Given the flow below (see thread), which orchestrates a number of other flows, I’d expect it to run the dependent flows in the order defined by their dependencies, but it doesn’t: it kicks them all off at the same time. Given the schematic that is drawn, my understanding was that each flow would wait for its upstream tasks to complete (maybe I have upstream vs. downstream confused here). The FlowRunner class here is just a wrapper around an environment variable that switches between running locally and running via Kubernetes (see thread). Can anyone see any obvious mistakes?
    Mark McDonald

    1 year ago
    I think you want to use the set_dependencies method after the task is invoked, like:
    stage2_pad = flow_stage2_pad()
    stage2_pad.set_dependencies(upstream_tasks=[stage1_odo, stage1_ordo])
    Kevin Kho

    1 year ago
    @Michael Law I have been staring at this for a bit and I’m not seeing anything obvious. It looks right to me. Your schematic shows them as connected, so the only thing I can think of is that wait=True is not being respected. Was this ever working?
    Hey @Mark McDonald, I think he has that covered with the upstream_tasks in the run calls. If the schematic looks good, I suppose this isn’t a dependency issue. Can you post the schematic, @Michael Law?
    Michael Law

    1 year ago
    It looks fine, but it never appears to wait; it just kicks all of the tasks off at the same time.
    Sorry, I should be clearer. It kicks them off in the order shown in the schematic, but it does not wait for the results from the sub-flows.
    This is a sample of one of the sub-flows.
    from prefect import Flow
    from prefect.tasks.databricks import DatabricksSubmitRun

    # FlowRunner is the wrapper class posted later in this thread
    runner = FlowRunner()
    # This just generates the JSON payload to post to Databricks
    job_json = runner.job_json("jobs/logic/enovia/plant_attribute_dim.py")

    with Flow('plant_attribute_dim') as flow:
        plant_attribute_run = DatabricksSubmitRun(databricks_conn_secret=runner.databricks_connection, json=job_json)
        plant_attribute_task = plant_attribute_run()

    runner.run(flow)
    Kevin Kho

    1 year ago
    Can you add a time.sleep of about a minute to those sub-flows to see if that is respected? I suspect DatabricksSubmitRun is marking itself complete right after submission, causing the flow to be labelled a success?
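A minimal, Prefect-free sketch of Kevin's hypothesis, assuming the child work runs somewhere else (a background thread stands in for the Databricks job here; child_job and run_parent are hypothetical names). If each stage only submits its job and returns, the parent reaches "finished" before any child does, even though the stages were started in dependency order. Blocking on each job before moving on restores the expected ordering.

```python
import threading
import time
from typing import List


def child_job(name: str, log: List[str], duration: float) -> None:
    """Stand-in for a remote job (e.g. a Databricks run): sleeps, then records completion."""
    time.sleep(duration)
    log.append(f"{name} finished")


def run_parent(wait: bool, duration: float = 0.2) -> List[str]:
    """Start two stages in order; if wait is True, block on each job before the next stage."""
    log: List[str] = []
    threads = []
    for name in ["stage1", "stage2"]:
        log.append(f"{name} submitted")
        thread = threading.Thread(target=child_job, args=(name, log, duration))
        thread.start()
        if wait:
            thread.join()  # the stage only counts as done once its job is done
        threads.append(thread)
    log.append("parent finished")
    for thread in threads:
        thread.join()  # clean up so the demo leaves no threads running
    return log


print(run_parent(wait=False))
print(run_parent(wait=True))
```

With wait=False both stages are submitted and the parent finishes while the jobs are still sleeping, which matches the "all kicking off at once" symptom; with wait=True each stage finishes before the next is submitted.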
    Michael Law

    1 year ago
    I was thinking this too, but I can see in the portal all of the sub-flows running simultaneously after the parent completes, as if they were kicked off asynchronously without an await.
    This shows them all kicking off within the same minute.
    Kevin Kho

    1 year ago
    What version of Prefect are you on?
    Just confirming: you re-registered them?
    Michael Law

    1 year ago
    We are using Prefect Cloud and the latest Docker image with Python 3.7.
    I registered all of the flows via the command line too.
    4 August 2021,04:18:59 agent INFO Submitted for execution: Job prefect-job-f0583609
    4 August 2021,04:19:00 prefect.Azure INFO Downloading plant-children-dim/2021-08-04t14-56-49-138960-00-00 from fdpflows
    4 August 2021,04:19:00 prefect.CloudFlowRunner INFO Beginning Flow run for 'plant_children_dim'
    4 August 2021,04:19:01 prefect.CloudTaskRunner INFO Task 'DatabricksSubmitRun': Starting task run...
    4 August 2021,04:19:02 prefect.DatabricksSubmitRun INFO Run submitted with run_id: 2454
    4 August 2021,04:19:02 prefect.DatabricksSubmitRun INFO Run submitted with config : { [redacted] }
    4 August 2021,04:19:02 prefect.DatabricksSubmitRun INFO View run status, Spark UI, and logs at https://adb-[redacted].azuredatabricks.net/?o=[redacted]#job/1744/run/1
    4 August 2021,04:19:02 prefect.DatabricksSubmitRun INFO DatabricksSubmitRun in run state: {'life_cycle_state': 'PENDING', 'result_state': None, 'state_message': ''}
    4 August 2021,04:19:02 prefect.DatabricksSubmitRun INFO View run status, Spark UI, and logs at https://adb-[redacted].azuredatabricks.net/?o=[redacted]#job/1744/run/1
    4 August 2021,04:19:02 prefect.DatabricksSubmitRun INFO Sleeping for 30 seconds.
    4 August 2021,04:19:32 prefect.DatabricksSubmitRun INFO DatabricksSubmitRun in run state: {'life_cycle_state': 'RUNNING', 'result_state': None, 'state_message': 'In run'}
    4 August 2021,04:19:32 prefect.DatabricksSubmitRun INFO View run status, Spark UI, and logs at https://adb-[redacted].azuredatabricks.net/?o=[redacted]#job/1744/run/1
    4 August 2021,04:19:32 prefect.DatabricksSubmitRun INFO Sleeping for 30 seconds.
    4 August 2021,04:20:02 prefect.DatabricksSubmitRun INFO DatabricksSubmitRun in run state: {'life_cycle_state': 'RUNNING', 'result_state': None, 'state_message': 'In run'}
    4 August 2021,04:20:02 prefect.DatabricksSubmitRun INFO View run status, Spark UI, and logs at https://adb-[redacted].azuredatabricks.net/?o=[redacted]#job/1744/run/1
    4 August 2021,04:20:02 prefect.DatabricksSubmitRun INFO Sleeping for 30 seconds.
    4 August 2021,04:20:32 prefect.DatabricksSubmitRun INFO DatabricksSubmitRun in run state: {'life_cycle_state': 'RUNNING', 'result_state': None, 'state_message': 'In run'}
    4 August 2021,04:20:32 prefect.DatabricksSubmitRun INFO View run status, Spark UI, and logs at https://adb-[redacted].azuredatabricks.net/?o=[redacted]#job/1744/run/1
    4 August 2021,04:20:32 prefect.DatabricksSubmitRun INFO Sleeping for 30 seconds.
    4 August 2021,04:21:02 prefect.DatabricksSubmitRun INFO DatabricksSubmitRun completed successfully.
    4 August 2021,04:21:02 prefect.DatabricksSubmitRun INFO View run status, Spark UI, and logs at https://adb-[redacted].azuredatabricks.net/?o=[redacted]#job/1744/run/1
    4 August 2021,04:21:03 prefect.CloudTaskRunner INFO Task 'DatabricksSubmitRun': Finished task run for task with final state: 'Success'
    4 August 2021,04:21:03 prefect.CloudFlowRunner INFO Flow run SUCCESS: all reference tasks succeeded
    That is one of the sub-flow logs. It shows the sub-flow took about two minutes (04:18:59 to 04:21:03), longer than the single minute the parent run suggested.
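The log above shows a submit-then-poll pattern: the task submits a run, then checks its state every 30 seconds until the run reaches a terminal state. A minimal stdlib sketch of that loop, assuming a hypothetical get_run_state callable that returns a dict shaped like the logged states; this is not the DatabricksSubmitRun source. The set of terminal life-cycle states (TERMINATED, SKIPPED, INTERNAL_ERROR) is assumed from the Databricks Runs API.

```python
import time
from typing import Callable, Dict, Optional

RunState = Dict[str, Optional[str]]

# Life-cycle states after which a Databricks run no longer changes (assumed list)
TERMINAL_LIFE_CYCLE_STATES = {"TERMINATED", "SKIPPED", "INTERNAL_ERROR"}


def wait_for_run(
    get_run_state: Callable[[], RunState],
    polling_period_seconds: float = 30.0,
    max_polls: int = 1000,
) -> RunState:
    """Poll until the run is terminal; return its state on SUCCESS, raise otherwise."""
    for _ in range(max_polls):
        state = get_run_state()
        if state["life_cycle_state"] in TERMINAL_LIFE_CYCLE_STATES:
            if state.get("result_state") == "SUCCESS":
                return state
            raise RuntimeError(f"Run did not succeed: {state}")
        time.sleep(polling_period_seconds)  # the "Sleeping for 30 seconds." step in the log
    raise TimeoutError(f"Run not terminal after {max_polls} polls")


# Replay the states from the log above (the final TERMINATED/SUCCESS dict is assumed)
logged_states = iter([
    {"life_cycle_state": "PENDING", "result_state": None, "state_message": ""},
    {"life_cycle_state": "RUNNING", "result_state": None, "state_message": "In run"},
    {"life_cycle_state": "RUNNING", "result_state": None, "state_message": "In run"},
    {"life_cycle_state": "RUNNING", "result_state": None, "state_message": "In run"},
    {"life_cycle_state": "TERMINATED", "result_state": "SUCCESS", "state_message": ""},
])
final_state = wait_for_run(lambda: next(logged_states), polling_period_seconds=0)
print(final_state["result_state"])  # SUCCESS
```

A task built this way only reports success once the remote run has finished, which is why the standalone sub-flow run in the log takes about two minutes.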
    Kevin Kho

    1 year ago
    Would you like to try the new create_flow_run and wait_for_flow_run tasks that came out in 0.15? Hopefully they will behave better.
    Michael Law

    1 year ago
    I understand that DatabricksSubmitRun simply submits a job to the cluster, but when I run the flow on its own, as you can see, it does wait.
    I’ll try that now, @Kevin Kho.
    Will it build out the schematic for me, or is there some specific syntax I have to use?
    Kevin Kho

    1 year ago
    The syntax would be something like this, and yes, it builds out the schematic as well, since these are tasks.
    Michael Law

    1 year ago
    Progress: the parent job now appears to wait for the Databricks jobs to complete. Is there more detailed documentation around create_flow_run and wait_for_flow_run?
    @Kevin Kho this works, thanks for your help.
    from prefect import Flow
    from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

    # FlowRunner is the wrapper class posted later in this thread
    pr_name = "fdppr2568"

    runner = FlowRunner()

    with Flow("enovia_flow") as flow:
        odo_run = create_flow_run(flow_name="object_domain_object", project_name=pr_name)
        ordo_run = create_flow_run(flow_name="object_relation_domain_object", project_name=pr_name)
        odo_wait = wait_for_flow_run(odo_run)
        ordo_wait = wait_for_flow_run(ordo_run)

        pad_run = create_flow_run(flow_name="plant_attribute_dim", project_name=pr_name, upstream_tasks=[odo_wait, ordo_wait])
        pcd_run = create_flow_run(flow_name="plant_children_dim", project_name=pr_name, upstream_tasks=[odo_wait, ordo_wait])
        pad_wait = wait_for_flow_run(pad_run)
        pcd_wait = wait_for_flow_run(pcd_run)

        pdo_run = create_flow_run(flow_name="plant_domain_object", project_name=pr_name, upstream_tasks=[pad_wait, pcd_wait])
        pdo_wait = wait_for_flow_run(pdo_run)

        pf_run = create_flow_run(flow_name="plant_fact", project_name=pr_name, upstream_tasks=[pdo_wait])
        pf_wait = wait_for_flow_run(pf_run)

    runner.run(flow)
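Each create_flow_run after the first stage lists the previous wait_for_flow_run tasks in upstream_tasks, and each wait takes its create task as input, so the flow above runs in strict waves. A small stdlib sketch (graphlib, Python 3.9+) that computes those waves from a hand-transcribed copy of the dependency graph; the dictionary and the execution_waves helper are illustrative, not something extracted from Prefect.

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Set


def execution_waves(graph: Dict[str, Set[str]]) -> List[List[str]]:
    """Group tasks into waves: every task in a wave has all its upstream tasks in earlier waves."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        waves.append(ready)
        sorter.done(*ready)
    return waves


# task -> set of upstream tasks, transcribed from the enovia_flow above
enovia_graph = {
    "odo_wait": {"odo_run"},
    "ordo_wait": {"ordo_run"},
    "pad_run": {"odo_wait", "ordo_wait"},
    "pcd_run": {"odo_wait", "ordo_wait"},
    "pad_wait": {"pad_run"},
    "pcd_wait": {"pcd_run"},
    "pdo_run": {"pad_wait", "pcd_wait"},
    "pdo_wait": {"pdo_run"},
    "pf_run": {"pdo_wait"},
    "pf_wait": {"pf_run"},
}

for number, wave in enumerate(execution_waves(enovia_graph), start=1):
    print(number, wave)
```

Tasks in the same wave (for example pad_run and pcd_run) may start together; nothing in a later wave can start before every wait in the preceding waves has finished.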
    Can I give them friendly names for my schematic?
    Kevin Kho

    1 year ago
    Yeah: create_flow_run(flow_name="object_domain_object", project_name=pr_name, name="xxx"). The task takes the kwargs of the Task class.
    And I have no clue what’s up with StartFlowRun; it’s the first time I’ve seen that happen with wait=True already specified.
    Could you move the code into the thread when you get a chance, so that we can free up space in the main channel?
    Michael Law

    1 year ago
    Yeah, no worries.
    import json
    import os

    from prefect import Flow
    from prefect.executors import LocalExecutor
    from prefect.run_configs import KubernetesRun
    from prefect.storage import Azure


    class FlowRunner:
        """Runs a flow locally when DEBUG=1; otherwise configures it for Kubernetes with Azure storage."""

        def __init__(self):
            # All settings come from environment variables
            self.cluster_id = os.environ["DATABRICKS_CLUSTER_ID"]
            self.mount = os.environ["MOUNT"]
            self.release_version = os.environ["RELEASE_VERSION"]
            self.kubernetes_job_image = os.environ["APP_IMAGE"]
            self.storage_connection = os.environ["AZURE_STORAGE_CONNECTION_STRING"]
            self.databricks_connection = json.loads(os.environ["PREFECT__CONTEXT__SECRETS__DATABRICKS_CONNECTION_STRING"])

        def run(self, flow: Flow):
            # Environment forwarded to the Kubernetes job
            env = {
                "DATABRICKS_CLUSTER_ID": self.cluster_id,
                "MOUNT": self.mount,
                "RELEASE_VERSION": self.release_version,
                "APP_IMAGE": self.kubernetes_job_image,
                "AZURE_STORAGE_CONNECTION_STRING": self.storage_connection,
                "PREFECT__CONTEXT__SECRETS__DATABRICKS_CONNECTION_STRING": f"'{self.databricks_connection}'",
            }

            if os.environ.get("DEBUG") == "1":
                # Local debugging: run the flow in-process
                flow.run(executor=LocalExecutor())
            else:
                # Only attach run config and storage; the flow itself is registered via the CLI
                flow.run_config = KubernetesRun(image=self.kubernetes_job_image, env=env)
                flow.storage = Azure(container="fdpflows", connection_string=self.storage_connection)

    from prefect import Flow
    from prefect.tasks.prefect import StartFlowRun

    # FlowRunner is the wrapper class defined above
    pr_name = "fdppr2568"  # project name, same value as in the create_flow_run version

    # Each StartFlowRun task triggers a registered flow; wait=True is meant to block until that run finishes
    flow_stage1_odo = StartFlowRun(flow_name="object_domain_object", project_name=pr_name, wait=True)
    flow_stage1_ordo = StartFlowRun(flow_name="object_relation_domain_object", project_name=pr_name, wait=True)
    flow_stage2_pad = StartFlowRun(flow_name="plant_attribute_dim", project_name=pr_name, wait=True)
    flow_stage2_pcd = StartFlowRun(flow_name="plant_children_dim", project_name=pr_name, wait=True)
    flow_stage3_pdo = StartFlowRun(flow_name="plant_do_flow", project_name=pr_name, wait=True)
    flow_stage4_pf = StartFlowRun(flow_name="plant_fact", project_name=pr_name, wait=True)

    runner = FlowRunner()

    with Flow("master_flow") as flow:
        stage1_odo = flow_stage1_odo()
        stage1_ordo = flow_stage1_ordo()
        stage2_pad = flow_stage2_pad(upstream_tasks=[stage1_odo, stage1_ordo])
        stage2_pcd = flow_stage2_pcd(upstream_tasks=[stage1_odo, stage1_ordo])
        stage3_pdo = flow_stage3_pdo(upstream_tasks=[stage2_pad, stage2_pcd])
        stage4_pf = flow_stage4_pf(upstream_tasks=[stage3_pdo])

    runner.run(flow)
    Re: “Yeah, create_flow_run(flow_name="object_domain_object", project_name=pr_name, name="xxx"). The task takes the kwargs of the Task class.” @Kevin Kho just a heads-up, but in 0.15.3 name doesn’t appear to be a parameter of that function.
    Kevin Kho

    1 year ago
    That’s right. Will make note of that. You can do
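    from prefect.tasks.prefect import create_flow_run

    # Copy the shared task so only this copy gets the new name
    my_create_flow_run = create_flow_run.copy()
    my_create_flow_run.name = "foo"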
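Why copy() first: create_flow_run is a single shared task object, so setting .name on it directly would rename it everywhere it is used. A stdlib sketch with a hypothetical ToyTask class (not Prefect's Task) showing that a shallow copy can be renamed without touching the original. Prefect 1.x task calls also accept a task_args dictionary for overriding attributes such as name at call time; check the API docs for your version before relying on it.

```python
import copy


class ToyTask:
    """Hypothetical stand-in for a task object; NOT Prefect's Task class."""

    def __init__(self, name: str) -> None:
        self.name = name

    def copy(self) -> "ToyTask":
        # Shallow copy: a new object carrying the same attributes
        return copy.copy(self)


shared_create_flow_run = ToyTask("create_flow_run")

odo_task = shared_create_flow_run.copy()
odo_task.name = "Object domain object"

print(shared_create_flow_run.name)  # create_flow_run
print(odo_task.name)                # Object domain object
```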