Dmitry Klionsky

02/02/2021, 11:45 AM
Hello, I'm trying to understand how applicable Prefect is as a deployment tool. In its simplest form, this is what I'm trying to achieve:
1. Read a list of IPs of unknown length (2, 3, 42, etc.)
2. For each IP, run a number of tasks sequentially (running them in parallel is forbidden):
   2.1 stop the IP
   2.2 wait for a new IP
   2.3 initialize the new IP
Additional requirements:
* Some tasks should be retryable (done)
* Some tasks should be manually recoverable (done)
Here's what I came up with:
import datetime
import random

import prefect
from prefect import task, Flow
from prefect import Parameter
from prefect.engine.signals import FAIL, LOOP, PAUSE

# Retried up to 3 times, 5 seconds apart, when the task fails
@task(max_retries=3, retry_delay=datetime.timedelta(seconds=5))
def get_ips() -> list:
    # Simulate an unreliable IP source
    if random.random() > 0.8:
        raise FAIL("Failed to get IPs")
    count = random.randint(2, 6)
    ips = [f"IP-{i}" for i in range(1, count)]  # 1 to 5 IPs
    print(ips)
    return ips

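@task
def seq_reduce(func, init, lst: list):
    # Each run handles one element; LOOP reruns the task with the saved state
    loop_payload = prefect.context.get("task_loop_result", {})

    index = loop_payload.get("index", 0)
    acc = loop_payload.get("acc", init)

    if index >= len(lst):
        return acc  # every element has been processed

    acc = func(lst[index], acc)
    raise LOOP(message=f"acc={acc} index={index}", result=dict(acc=acc, index=index+1))

def seq_map(func, lst: list):
    # Sequential map built on seq_reduce: elements are handled strictly one at a time.
    # Called at flow-build time, so it returns the seq_reduce task, not a list.
    def aux(x, acc: list) -> list:
        acc.append(func(x))
        return acc
    return seq_reduce(aux, [], lst)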

@task
def stop_ip(ip: str) -> None:
    print("stop_ip: " + ip)

@task
def wait_for_ip(ip: str) -> str:
    print("wait_for_ip: " + ip)
    return ip + "-new"

@task
def init_ip(ip: str) -> str:
    print("init_ip: " + ip)
    # Simulate a failure that needs manual recovery: the run pauses until resumed
    if random.random() > 0.5:
        raise PAUSE("Initializing IP failed. Waiting for human input")
    return "Done"

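with Flow("process_ip") as process_ip_flow:
    old_ip = Parameter("old_ip")
    stopped = stop_ip(old_ip)
    # stop_ip passes no data to wait_for_ip, so make the ordering explicit
    new_ip = wait_for_ip(old_ip, upstream_tasks=[stopped])
    init_ip(new_ip)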

def process_ip(old_ip):
    # Run the per-IP flow to completion and return the new IP it produced
    state = process_ip_flow.run(old_ip=old_ip)
    #process_ip_flow.visualize(flow_state=state)
    return state.result[new_ip].result

with Flow("process_ips") as process_ips_flow:
    old_ips = get_ips()
    new_ips = seq_map(process_ip, old_ips)

if __name__ == "__main__":
    state = process_ips_flow.run()
    #process_ips_flow.visualize(flow_state=state)
    print(state.result[new_ips].result)
Is this the kind of use case Prefect is designed for? Are there other approaches?
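The `seq_reduce` task relies on Prefect's `LOOP` signal: each run handles one element and raises `LOOP` with the accumulator and the next index, and Prefect reruns the task with that payload available as `prefect.context["task_loop_result"]`. Because an element is only processed after the previous run has finished, this yields the strictly sequential per-IP ordering the question asks for. The driver below is a simplified, Prefect-free emulation of that mechanism for illustration only; `Loop` and `run_looping_task` are hypothetical stand-ins, not Prefect APIs.

```python
from typing import Any, Callable, Dict, List


class Loop(Exception):
    """Hypothetical stand-in for prefect.engine.signals.LOOP."""

    def __init__(self, result: Dict[str, Any]) -> None:
        super().__init__("loop")
        self.result = result  # state handed to the next iteration


def run_looping_task(fn: Callable[..., Any], *args: Any) -> Any:
    """Call fn repeatedly until it returns instead of raising Loop.

    Simplified emulation: each new call sees the previous Loop payload under
    context["task_loop_result"], mirroring how the seq_reduce task reads it.
    """
    context: Dict[str, Any] = {}
    while True:
        try:
            return fn(context, *args)
        except Loop as signal:
            context = {"task_loop_result": signal.result}


def seq_reduce(context: Dict[str, Any], func: Callable[[Any, Any], Any],
               init: Any, lst: List[Any]) -> Any:
    # Same logic as the seq_reduce task, with the context passed explicitly
    payload = context.get("task_loop_result", {})
    index = payload.get("index", 0)
    acc = payload.get("acc", init)
    if index >= len(lst):
        return acc  # all elements handled
    acc = func(lst[index], acc)  # exactly one element per iteration
    raise Loop(result=dict(acc=acc, index=index + 1))


def append_upper(x: str, acc: List[str]) -> List[str]:
    return acc + [x.upper()]  # builds a new list rather than mutating acc


result = run_looping_task(seq_reduce, append_upper, [], ["ip-1", "ip-2", "ip-3"])
print(result)  # ['IP-1', 'IP-2', 'IP-3']
```

Returning a new list from the reducer, instead of appending in place as `aux` does, is only a defensive choice in this sketch: it keeps each iteration's payload from aliasing the previous one.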

Jim Crist-Harif

02/02/2021, 3:21 PM
Hi Dmitry, using Prefect as a deployment tool should be fine. Prefect is designed to run general workflows: if you can express your workflow as Prefect tasks, you can do whatever you want with it. Internally, we have some flows that manage parts of our infrastructure.
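Expressing a workflow as Prefect tasks also means expressing its ordering: Prefect schedules tasks from their dependency edges, so if `stop_ip` and `wait_for_ip` both consume only `old_ip`, nothing guarantees that stopping finishes before waiting begins. The sketch below uses Python's standard-library `graphlib` (not Prefect) to list which tasks could run together, with and without an explicit edge from `stop_ip` to `wait_for_ip`; the graph dictionaries are an illustrative model of the `process_ip` flow, not Prefect data structures. In Prefect 1.x such an edge can be added with the `upstream_tasks` argument when calling a task inside a flow.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+
from typing import Dict, List, Set

Graph = Dict[str, Set[str]]  # task name -> names of its upstream tasks

# Data dependencies only: stop_ip and wait_for_ip both just consume old_ip.
data_only: Graph = {
    "stop_ip": {"old_ip"},
    "wait_for_ip": {"old_ip"},
    "init_ip": {"wait_for_ip"},
}

# Same graph plus an explicit ordering edge from stop_ip to wait_for_ip.
with_order: Graph = dict(data_only, wait_for_ip={"old_ip", "stop_ip"})


def ready_batches(graph: Graph) -> List[List[str]]:
    """Group tasks into batches whose members have no ordering between them."""
    ts = TopologicalSorter(graph)
    ts.prepare()
    batches = []
    while ts.is_active():
        ready = sorted(ts.get_ready())
        batches.append(ready)
        ts.done(*ready)
    return batches


print(ready_batches(data_only))   # [['old_ip'], ['stop_ip', 'wait_for_ip'], ['init_ip']]
print(ready_batches(with_order))  # [['old_ip'], ['stop_ip'], ['wait_for_ip'], ['init_ip']]
```

Without the extra edge, `stop_ip` and `wait_for_ip` land in the same batch, meaning either could run first (or both at once under a parallel executor).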
Also, in the future please avoid posting large blocks of text at the top level of the thread; it makes the Slack channel harder to read. If you have a large code block to share, we prefer that you post it as a reply inside the thread.

Dmitry Klionsky

02/02/2021, 3:25 PM
Ah, thanks!