#prefect-community

Niels Prins

09/05/2022, 1:58 PM
Hello all, I could use some advice. I have the following use case: I'm getting a list of deltas from an external API. These deltas need to be processed in sequence: delta n + 1 cannot start before n has finished, and should not start if n fails. Processing a delta requires two tasks that can run in parallel, plus a clean-up task. I want to be able to restart a subtask if it fails. What would be the proper construct? I have explored LOOP: it runs through the tasks in sequence, but a restart begins again from the first delta. I have tried a flow of flows, but that does not allow for dynamic tasks. Combining the two resulted in a sub-flow that runs for each delta in sequence, but a failure in the sub-flow ends the looping task completely, and restarting runs from the beginning again. Is there a better option for this use case in Prefect 1+?

Anna Geller

09/05/2022, 9:05 PM
can you move the code block to the thread? this helps us to keep the main channel cleaner
🙌 1
to answer your question: how would you approach it in Python without Prefect? This would be a good direction to consider. Not relying on Prefect 1 constructs such as LOOP will also make it easier to migrate to Prefect 2
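For reference, a minimal plain-Python sketch of the use case described above, with no Prefect involved. Everything here is illustrative: `step_a`, `step_b`, `cleanup`, and the `done` set are hypothetical stand-ins for the real tasks and for whatever store would record finished deltas.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable, List, Sequence, Set


def step_a(delta: int) -> str:
    # Hypothetical stand-in for the first parallel task.
    return f"A({delta})"


def step_b(delta: int) -> str:
    # Hypothetical stand-in for the second parallel task.
    return f"B({delta})"


def cleanup(delta: int) -> str:
    # Hypothetical stand-in for the clean-up task.
    return f"C({delta})"


def process_delta(delta: int, steps: Sequence[Callable[[int], str]]) -> List[str]:
    """Run all steps for one delta in parallel, then the clean-up."""
    with ThreadPoolExecutor(max_workers=len(steps)) as pool:
        futures = [pool.submit(step, delta) for step in steps]
        # result() re-raises any exception raised inside a step.
        results = [future.result() for future in futures]
    # Only reached when every step succeeded.
    results.append(cleanup(delta))
    return results


def process_in_sequence(
    deltas: Iterable[int],
    done: Set[int],
    steps: Sequence[Callable[[int], str]] = (step_a, step_b),
) -> None:
    """Process deltas one by one, skipping those already recorded in `done`."""
    for delta in deltas:
        if delta in done:
            continue  # finished in an earlier run
        process_delta(delta, steps)  # an exception here stops the loop
        done.add(delta)
```

Because a failure propagates out of the loop, delta n + 1 never starts when n fails, and calling `process_in_sequence` again with the same `done` set resumes at the failed delta rather than at the first one. In a real setup `done` would have to be persisted somewhere; that part is left out here.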

Niels Prins

09/06/2022, 8:53 AM
Code snippet from the original question. This is what I have right now:
import time
from typing import List
from uuid import uuid4

import prefect
from prefect import Flow, Parameter, Task, task
from prefect.engine.signals import FAIL, LOOP, signal_from_state
from prefect.executors import LocalDaskExecutor
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run


@task
def get_deltas() -> List:
    return [1, 2, 3, 4]


class A(Task):
    def run(self, v):
        self.logger.info(f"{v} A")
        if v == 3:
            raise ValueError("This is not the value you were looking for")
        for i in range(4):
            time.sleep(1)
            self.logger.info(f"{v} - {i}")
        return "A"


class B(Task):
    def run(self, v):
        self.logger.info(f"{v} B")
        time.sleep(5)
        return "B"


class C(Task):
    def run(self):
        self.logger.info("C")
        time.sleep(2)
        return "C"


class Sequenced(Task):
    def run(self, ds: List):
        LOG = prefect.context.get("logger")
        loop_payload = prefect.context.get("task_loop_result", {"ds": ds})
        ds = loop_payload["ds"]
        if not ds:
            return

        d = ds[0]
        LOG.info(d)
        # setting this is required, if not the flow is detected as already run, and will not be started
        idempotency_key = str(uuid4())
        id = create_flow_run.run(
            idempotency_key=idempotency_key,
            flow_name="sub",
            project_name="test",
            parameters={"delta_id": d},
        )
        LOG.warning(id)
        # raising final state will result in the final state of this task being raised
        # this will lead to not calling the loop value
        flow_run = wait_for_flow_run.run(id, stream_logs=True, raise_final_state=False)

        state_signal = signal_from_state(flow_run.state)(
            message=f"{id} finished in state {flow_run.state}",
            result=flow_run,
        )
        if isinstance(state_signal, FAIL):
            raise state_signal

        raise LOOP(message="", result={"ds": ds[1:]})


with Flow(name="sub", executor=LocalDaskExecutor()) as sub:
    delta_id = Parameter(name="delta_id", required=True)
    a = A()(delta_id)
    b = B()(delta_id)
    C()(upstream_tasks=[a, b])


with Flow(name="main") as flow:

    deltas = get_deltas()
    Sequenced()(deltas)

if __name__ == "__main__":
    sub.register("test")
    flow.register("test")
    flow.run()
I would recursively apply the function to the head of the list. I made a POC of this, starting a new flow on successful completion of the delta process. It was fun, but we decided to go another way, which fit the use case. The whole thing gave me the opportunity to talk about migrating again. At least that is now back on the table =D
🙌 1
🙏 1
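The "recursively apply the function to the head of the list" idea could look roughly like the sketch below. This is not the POC mentioned above (which started a new flow per delta); it is a plain-Python illustration, and `process_head`, `handle`, and `processed` are hypothetical names.

```python
from typing import Callable, List


def process_head(
    deltas: List[int],
    handle: Callable[[int], None],
    processed: List[int],
) -> None:
    """Handle the first delta, then recurse on the rest of the list."""
    if not deltas:
        return  # nothing left to do
    head, tail = deltas[0], deltas[1:]
    handle(head)  # an exception here stops the recursion
    processed.append(head)  # recorded only after success
    process_head(tail, handle, processed)
```

After a failure, `processed` holds the deltas that completed, so a retry can be started with the remaining ones. For very long lists a loop is the safer choice, since Python limits recursion depth.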

Anna Geller

09/06/2022, 10:41 AM
definitely something that would be much easier to do in Prefect 2, especially if you got it working in Python already. Looping is definitely a viable solution here for Prefect 1, too - LMK if there's anything we can help with. It seems like you have a good grasp on how to handle it and your code looks good

Niels Prins

09/06/2022, 1:16 PM
Thank you!
🙌 1
Yay, GREEN LIGHT for starting a POC with Prefect 2 on k8s ;D
❤️ 1

Anna Geller

09/12/2022, 2:40 PM
Great to hear that! 🙌 You can reach out to cs@prefect.io if you need help from infrastructure experts for your PoC
🙌 1