Dmitry Klionsky
02/02/2021, 11:45 AMimport random
import prefect
from prefect import task, Flow
from prefect import Parameter
from prefect.engine.signals import FAIL, LOOP, PAUSE
@task(max_retries=3, retry_delay=datetime.timedelta(seconds=5))
def get_ips() -> list:
if random.random() > 0.8:
raise FAIL("Failed to get IPs")
count = random.randint(2, 6)
ips = [f"IP-{i}" for i in range(1, count)]
print(ips)
return ips
@task
def seq_reduce(func, init: any, lst: list) -> any:
loop_payload = prefect.context.get("task_loop_result", {})
index = loop_payload.get("index", 0)
acc = loop_payload.get("acc", init)
if index >= len(lst):
return acc
acc = func(lst[index], acc)
raise LOOP(message=f"acc={acc} index={index}", result=dict(acc=acc, index=index+1))
def seq_map(func, lst: list) -> list:
def aux(x, acc: list) -> list:
acc.append(func(x))
return acc
return seq_reduce(aux, [], lst)
@task
def stop_ip(ip: str) -> None:
print("stop_ip: " + ip)
pass
@task
def wait_for_ip(ip: str) -> str:
print("wait_for_ip: " + ip)
return ip + "-new"
@task
def init_ip(ip: str) -> None:
print("init_ip: " + ip)
if random.random() > 0.5:
raise PAUSE("Initing IP failed. Waiting for human input")
return "Done"
with Flow("process_ip") as process_ip_flow:
old_ip = Parameter("old_ip")
stop_ip(old_ip)
new_ip = wait_for_ip(old_ip)
init_ip(new_ip)
def process_ip(old_ip):
state = process_ip_flow.run(old_ip=old_ip)
#process_ip_flow.visualize(flow_state=state)
return state.result[new_ip].result
with Flow("process_ips") as process_ips_flow:
old_ips = get_ips()
new_ips = seq_map(process_ip, old_ips)
if __name__ == "__main__":
state = process_ips_flow.run()
#process_ips_flow.visualize(flow_state=state)
print(state.result[new_ips].result)
Is the above what Prefect is designed for? Are there some other approaches?Jim Crist-Harif
02/02/2021, 3:21 PMDmitry Klionsky
02/02/2021, 3:25 PM