KhTan
03/09/2022, 12:41 AM
from datetime import datetime, timedelta

from prefect import Flow
from prefect.schedules import IntervalSchedule
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

schedule = IntervalSchedule(
    start_date=datetime.utcnow(),
    interval=timedelta(hours=24),
)

with Flow("parent-flow", schedule=schedule) as flow:
    # assumes you have registered the following flows in a project named "examples"
    flow_a = create_flow_run(flow_name="A", project_name="examples")
    wait_for_flow_a = wait_for_flow_run(flow_a, raise_final_state=True)
    flow_b = create_flow_run(flow_name="B", project_name="examples")
    wait_for_flow_b = wait_for_flow_run(flow_b, raise_final_state=True)
    flow_b.set_upstream(wait_for_flow_a)

flow.run()
The flow was set up similarly to the example in the documentation.
As a side question, is b.set_upstream(wait_for_flow_a) identical to wait_for_flow_a.set_downstream(b)?
Kevin Kho
03/09/2022, 1:52 AM
KhTan
03/09/2022, 1:58 AM
cursor = conn.cursor()
for rec in records:
    insert_script = f"""
        INSERT INTO {tbl} (
            col1, col2, ...
        )
        VALUES {rec}
    """
    cursor.execute(insert_script)
conn.commit()
cursor.close()
conn.close()
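from prefect import Flow
from prefect.engine.state import Failed
from prefect.executors import LocalDaskExecutor
from prefect.run_configs import LocalRun
from prefect.utilities.notifications import gmail_notifier

with Flow(
    "A",
    schedule=schedule,  # also scheduled on run based on current timestamp
    state_handlers=[gmail_notifier(only_states=[Failed])],
    executor=LocalDaskExecutor(cluster_kwargs={"n_workers": 6}),
    run_config=LocalRun(),
) as flow:
    records = query_from_source()
    df = postprocess(records)
    write_to_db("table_name", df)

flow.run()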
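The insert loop in the message above builds each statement with an f-string, which breaks on values containing quotes and leaves the query open to injection. A minimal sketch of the same write using bound parameters follows. It assumes the standard-library sqlite3 driver as a stand-in for whatever database the thread uses, a hypothetical two-column table named demo, and a hypothetical helper write_records; other drivers use a different placeholder style (for example %s).

```python
import sqlite3


def write_records(conn, table, records):
    """Insert rows with bound parameters instead of f-string interpolation.

    Hypothetical helper: the two-column layout (col1, col2) is an assumption.
    """
    # Table names cannot be bound as parameters, so validate the identifier.
    if not table.isidentifier():
        raise ValueError(f"unsafe table name: {table!r}")
    cursor = conn.cursor()
    try:
        # executemany binds each tuple in records to the ? placeholders.
        cursor.executemany(
            f"INSERT INTO {table} (col1, col2) VALUES (?, ?)", records
        )
        # Commit once after all rows have been inserted.
        conn.commit()
    finally:
        cursor.close()


# Usage against an in-memory database (assumed schema, for illustration only).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE demo (col1 INTEGER, col2 TEXT)")
write_records(conn, "demo", [(1, "a"), (2, "b'c")])
rows = conn.execute("SELECT col1, col2 FROM demo ORDER BY col1").fetchall()
conn.close()
```

Binding the values lets the driver handle quoting, and sending all rows through one executemany call with a single commit avoids a round of statement building per record.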
Kevin Kho
03/09/2022, 3:35 AM
KhTan
03/09/2022, 7:03 PM
Anna Geller
03/09/2022, 8:37 PM
KhTan
03/09/2022, 9:01 PM
Kevin Kho
03/09/2022, 9:04 PM
Anna Geller
03/09/2022, 9:22 PM
Just one more question: if I add a new subflow, should I run it to register it first, or will it be registered automatically after being added to a parent flow?
@KhTan it's a good question though, because with Orion subflows you don't need to register or deploy those separately, while in Prefect <= 1.0 you have to.