Hi! I was trying to set up a parent flow. It shows...
# prefect-server
k
Hi! I was trying to set up a parent flow. It shows run succeeded, which doesn’t appear to be the case, as one of the flows is supposed to write to a database and nothing was written. however, if running one flow (a part of the parent flow), it does write to db. what could have gone wrong here? thanks!
Copy code
from prefect import Flow
from prefect.schedules import CronSchedule
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run


schedule = IntervalSchedule(
        start_date=datetime.utcnow() 
        interval=timedelta(hours=24),
    )


with Flow("parent-flow", schedule=schedule) as flow:
    
    # assumes you have registered the following flows in a project named "examples"
    flow_a = create_flow_run(flow_name="A", project_name="examples")
    wait_for_flow_a = wait_for_flow_run(flow_a, raise_final_state=True)
    
    flow_b = create_flow_run(flow_name="B", project_name="examples")
    wait_for_flow_b = wait_for_flow_run(flow_b, raise_final_state=True)
    
    
    flow_b.set_upstream(wait_for_flow_a)
flow.run()
the flow was set up similar to the example in the documentation. for a side question, is b.set_upstream(wait_for_flow_a) identical as wait_for_flow_a.set_downstream(b)?
k
Those are identical. I think this all looks good though. I think I’l need to see the content of the flow that write into the database to have a better idea
k
the content of the individua flow that write to db looks like this
Copy code
cursor = conn.cursor()

        for rec in records:

            insert_script = f"""
            INSERT INTO {tbl} (
                col1, col2, ...
            ) 
                VALUES {rec}
            """
             cursor.execute(insert_script)

        conn.commit()
        cursor.close()
        conn.close()
Copy code
with Flow(
        "A",
        schedule=schedule, #also scheduled on run based on current timestamp
        state_handlers=[gmail_notifier(only_states=[Failed])],
        executor=LocalDaskExecutor(cluster_kwargs={"n_workers": 6}),
        run_config=LocalRun(),
    ) as flow:

        records=query_from_source()
        df = postprocess(records)
        write_to_db("table_name", df)

    flow.run()
is there anyway I can view the logged info on the UI under logs tab? I might have used logging wrongly. could see it in terminal while running in local mode but not under LOGS tab when registering the runs. all it shows is ‘creating flow run’, ‘finished task run’ but not the logger.info from different tasks
k
The log will appear in the logs of the subflow page. There should be a flow run under that subflow
Yeah the subflow looks pretty normal
k
thanks for looking into it! found a potential issue (one of the subflow got stuck on its own) btw given that each subflow only needs to run once per day, once the parent flow is working, should i remove schedule from all the subflow and only leave it in the parent flow (to avoid running them twice)?
a
You're spot on @KhTan, when building a flow-of-flows, the most common approach is to only schedule the parent flow and the parent triggers child flow runs as defined by your parent flow logic
k
thank you Anna!
just one more question, if i add a new subflow, should i run it to register it first, or will it automatically register after being added to a parent flow?
k
You need to register separately
🙏 1
a
just one more question, if i add a new subflow, should i run it to register it first, or will it automatically register after being added to a parent flow?
@KhTan it’s a good question though because using Orion subflows, you don’t need to register or deploy those separately, while in Prefect <= 1.0 you have to
👍 1
The reason for it is that in Prefect 1.0 subflows are not really subflows but rather something what we call “orchestrator pattern”. This pattern means that you trigger already deployed (registered) flows via API calls initiated from a parent flow.