Niels Prins
09/05/2022, 1:58 PM
Anna Geller
09/05/2022, 9:05 PM
Niels Prins
09/06/2022, 8:53 AM
import time
from typing import List
from uuid import uuid4
import prefect
from prefect import Flow, Parameter, Task, task
from prefect.engine.signals import FAIL, LOOP, signal_from_state
from prefect.executors import LocalDaskExecutor
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
@task
def get_deltas() -> List:
    """Return the fixed list of delta ids ``[1, 2, 3, 4]``."""
    return list(range(1, 5))
class A(Task):
    """Task that logs its input, fails on the value 3, then logs four timed steps."""

    def run(self, v):
        """Log ``v`` and then four progress lines, one second apart.

        Args:
            v: Value to process; it is compared against ``3``.

        Returns:
            The string ``"A"``.

        Raises:
            ValueError: If ``v == 3``.
        """
        self.logger.info(f"{v} A")
        if v == 3:
            # Deliberate failure path for this one input value.
            raise ValueError("This is not the value you where looking for")
        for i in range(4):
            time.sleep(1)
            self.logger.info(f"{v} - {i}")
        return "A"
class B(Task):
    """Task that logs its input, sleeps for five seconds and returns ``"B"``."""

    def run(self, v):
        """Log ``v``, wait five seconds, and return.

        Args:
            v: Value to include in the log line.

        Returns:
            The string ``"B"``.
        """
        self.logger.info(f"{v} B")
        time.sleep(5)
        return "B"
class C(Task):
    """Task that logs ``"C"``, sleeps for two seconds and returns ``"C"``."""

    def run(self):
        """Log a marker line, wait two seconds, and return.

        Returns:
            The string ``"C"``.
        """
        self.logger.info("C")
        time.sleep(2)
        return "C"
class Sequenced(Task):
    """Run the ``"sub"`` flow once per list element, one flow run after another.

    Each invocation handles the first remaining element: it creates a run of
    the ``"sub"`` flow, waits for that run to finish, and then raises ``LOOP``
    with the rest of the list as the payload for the next iteration.
    """

    def run(self, ds: List):
        """Process the first remaining element and loop over the rest.

        Args:
            ds: Elements to process. Each one is passed to a ``"sub"`` flow
                run as the ``delta_id`` parameter.

        Returns:
            ``None`` once no elements remain.

        Raises:
            FAIL: If the signal built from the flow run's final state is a
                ``FAIL`` (or a subclass of it).
            LOOP: After a non-failing flow run, carrying the remaining
                elements under the ``"ds"`` key.
        """
        logger = prefect.context.get("logger")

        # If the context holds no loop payload yet, fall back to the task input.
        loop_payload = prefect.context.get("task_loop_result", {"ds": ds})
        remaining = loop_payload["ds"]
        if not remaining:
            return

        current = remaining[0]
        logger.info(current)

        # Setting this is required; if not, the flow is detected as already
        # run and will not be started.
        idempotency_key = str(uuid4())
        flow_run_id = create_flow_run.run(
            idempotency_key=idempotency_key,
            flow_name="sub",
            project_name="test",
            parameters={"delta_id": current},
        )
        logger.warning(flow_run_id)

        # Raising the final state would make it the final state of this task,
        # so the LOOP below would never be raised; inspect the state instead.
        flow_run = wait_for_flow_run.run(
            flow_run_id, stream_logs=True, raise_final_state=False
        )
        state_signal = signal_from_state(flow_run.state)(
            message=f"{flow_run_id} finished in state {flow_run.state}",
            result=flow_run,
        )
        if isinstance(state_signal, FAIL):
            raise state_signal

        raise LOOP(message="", result={"ds": remaining[1:]})
# "sub" flow: A and B both receive the delta_id parameter; C is declared
# with A and B as its upstream tasks.
with Flow("sub", executor=LocalDaskExecutor()) as sub:
    delta_id = Parameter("delta_id", required=True)
    a = A()(delta_id)
    b = B()(delta_id)
    C()(upstream_tasks=[a, b])
# "main" flow: feed the result of get_deltas into the Sequenced task.
with Flow("main") as flow:
    deltas = get_deltas()
    Sequenced()(deltas)
if __name__ == "__main__":
    # Register both flows in the "test" project, then run the main flow.
    sub.register(project_name="test")
    flow.register(project_name="test")
    flow.run()
Anna Geller
09/06/2022, 10:41 AM
Niels Prins
09/06/2022, 1:16 PM
Anna Geller
09/12/2022, 2:40 PM