Daniel Katz
11/11/2021, 2:28 PMAnna Geller
Daniel Katz
11/11/2021, 2:32 PMAnna Geller
from prefect import task, Flow
from prefect.backend import FlowRunView
from prefect.engine.signals import signal_from_state
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
FLOW_NAME = "daniel_multiple_flows"
PROJECT_NAME = "your_project_name_for_flows_a_b_c"
@task
def raise_flow_run_state(flow_run: FlowRunView):
flow_run_state = flow_run.state
if not flow_run_state.is_successful():
exc = signal_from_state(flow_run_state)(
f"{flow_run.flow_run_id} finished in state {flow_run_state}"
)
raise exc
return flow_run
with Flow(FLOW_NAME,) as flow:
flow_a_flow_run_id = create_flow_run(flow_name="flow_a", project_name=PROJECT_NAME,)
flow_a_flow_run_view = wait_for_flow_run(flow_a_flow_run_id)
final_flow_a_flow_run_view = raise_flow_run_state(flow_a_flow_run_view)
flow_b_run_id = create_flow_run(flow_name="flow_b", project_name=PROJECT_NAME)
flow_b_run_view = wait_for_flow_run(flow_b_run_id)
final_flow_b_run_view = raise_flow_run_state(flow_b_run_view)
final_flow_a_flow_run_view.set_downstream(flow_b_run_id)
flow_c_run_id = create_flow_run(flow_name="flow_c", project_name=PROJECT_NAME,)
flow_c_run_view = wait_for_flow_run(flow_c_run_id)
final_flow_c_run_view = raise_flow_run_state(flow_c_run_view)
final_flow_b_run_view.set_downstream(flow_c_run_id)
Anna Geller
Daniel Katz
11/11/2021, 2:36 PMDaniel Katz
11/11/2021, 2:36 PMAnna Geller
import pendulum
@task
def check_if_5_am():
now = pendulum.now(tz="your_tz")
if now < pendulum.today(tz="your_tz").replace(hour=5, minute=0):
time.sleep(123) # calculate how long
not a fan of this added sleep but if it’s important that it’s absolutely 5 AM and not a minute earlier, that’s the best solution I can think of atm without changing those external dependenciesKevin Kho
Bring your towel and join one of the fastest growing data communities. Welcome to our second-generation open source orchestration platform, a completely rethought approach to dataflow automation.
Powered by