Thread
#prefect-community
    Christopher

    7 months ago
    Hi, I'm looking for a workflow management solution that we can use now to replace our homebrew scripts. I've seen Orion, which looks like it would tick our boxes, but I'm aware it's not ready for production use yet. So I have two questions about Prefect Core:
    1. Is it possible to have cyclic workflows? I have a flow which transforms input records into a smaller number of output records. I'd like to then process the output records with the same flow again until it outputs no records.
    2. Is it possible to have event-driven flows? We've got records coming into a queue, which ideally I'd process when the queue reaches a certain size. I have read the PINs and suspect the answer is no, but just wanted to check. I can work around it by periodically scheduling a flow that checks the size of the queue and exits early if it's too small.
    Anna Geller

    7 months ago
    Hi! Regarding #1, Prefect Core treats flows as acyclic computational graphs. However, I don’t believe you need them to be cyclic to solve the use case you described. There are a couple of options you could try in Prefect Core:
    a) If you need to apply the same code over a number of inputs more than once, you can certainly do so by calling the same task twice:
    from prefect import task, Flow
    
    
    @task
    def custom_business_logic(x):
        return x + 1
    
    
    with Flow("hello") as flow:
        data_result = custom_business_logic(42)
        custom_business_logic(data_result)
    b) If you need to build this cyclic workflow as a while-loop pattern, you can use task looping.
    c) My recommendation, and the best approach I see for your use case, is to leverage mapping, which lets you call the same task multiple times over various inputs, and can even do so in parallel:
    from prefect import task, Flow
    from prefect.executors import LocalDaskExecutor
    
    
    @task
    def generate_random_numbers():
        # Despite the name, this returns a fixed list: the integers 1 through 199.
        return list(range(1, 200))
    
    
    @task
    def add_one(x):
        return x + 1
    
    
    @task(log_stdout=True)
    def print_results(res):
        print(res)
    
    
    with Flow("mapping", executor=LocalDaskExecutor()) as flow:
        numbers = generate_random_numbers()
        result = add_one.map(numbers)
        result_2 = add_one.map(result)
        print_results(result_2)
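For option b), the repeat-until-empty behaviour from the original question can be sketched in plain Python, without Prefect, to show the control flow that task looping would have to reproduce. The names `run_until_empty` and `merge_pairs` are hypothetical, and the pairwise-sum transform is only a stand-in for a step that emits fewer records than it receives. In Prefect 1.x the same idea is expressed by raising the `LOOP` signal from `prefect.engine.signals` and reading the previous iteration's result from `prefect.context`; this sketch does not use that mechanism.

```python
from typing import Callable, List


def run_until_empty(
    records: List[int],
    step: Callable[[List[int]], List[int]],
    max_rounds: int = 100,
) -> int:
    """Feed the output of `step` back into `step` until it returns no records.

    Returns the number of rounds that were executed.
    """
    rounds = 0
    while records:
        # Guard against a step that never shrinks its input.
        if rounds >= max_rounds:
            raise RuntimeError("step did not converge within max_rounds")
        records = step(records)
        rounds += 1
    return rounds


def merge_pairs(records: List[int]) -> List[int]:
    """Hypothetical transform: sum adjacent pairs; a lone record is consumed."""
    if len(records) < 2:
        return []
    return [sum(records[i:i + 2]) for i in range(0, len(records), 2)]


if __name__ == "__main__":
    print(run_until_empty([1, 2, 3, 4, 5], merge_pairs))
```

The `max_rounds` guard is a design choice for the sketch: a loop that depends on the data shrinking should have an upper bound in case it never does.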
    Regarding #2: check out this Discourse topic. It’s 100% possible, and you can implement it in several ways.
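The workaround described in the original question (a periodically scheduled run that exits early when the queue is too small) comes down to a size check before any work is done. Here is a minimal sketch of that check, assuming an in-memory `deque` as a stand-in for the real queue. The function name `take_batch_if_ready` and the `min_size` parameter are hypothetical.

```python
from collections import deque
from typing import Deque, List


def take_batch_if_ready(queue: Deque[str], min_size: int) -> List[str]:
    """Drain and return the queue if it holds at least `min_size` records.

    Returns an empty list, leaving the queue untouched, when it is too small.
    """
    if len(queue) < min_size:
        # Too small: do nothing and wait for the next scheduled run.
        return []
    batch = list(queue)
    queue.clear()
    return batch


if __name__ == "__main__":
    pending = deque(["rec-1", "rec-2"])
    print(take_batch_if_ready(pending, min_size=3))  # below threshold
    pending.append("rec-3")
    print(take_batch_if_ready(pending, min_size=3))  # threshold reached
```

In a scheduled flow, a check like this would sit in the first task, so that downstream tasks only run when a batch is actually returned.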
    Christopher

    7 months ago
    Hi, thank you for those examples. I think you're right that between mapping and looping, we can likely model what we need. Thanks again!