Marc Lipoff

    5 months ago
    What's the best way to have complex trigger logic? For example, I have 3 upstream tasks. I want the target task to run if:
    • upstream A is a mapped task; it must have at least 1 successful child task
    • upstream B is a success
    • upstream C can be in any state
    Kevin Kho

    5 months ago
    I think this is very hard to accomplish. I was trying to make a custom trigger to distinguish them, but you don’t have access to which state came from which task at that point. My personal suggestion is to create an intermediate task with trigger always_run that takes these inputs and handles the True/False logic, or raises SKIP for the downstream task. The trigger is just lacking as an interface for this kind of complex logic, I think.
    Marc Lipoff

    5 months ago
    Cool. Let me try that.
    @Kevin Kho how do I access the inputted tasks' states, from within this trigger function?
    Kevin Kho

    5 months ago
    Let me make an example one sec
    Oops sorry I got caught up in stuff. Working on this.
    I think this script should give ideas
    from prefect import Flow, task
    from prefect.triggers import always_run
    import prefect
    from prefect.engine.signals import SKIP
    
    @task
    def mapped_task(x):
        if x == 2:
            raise ValueError()
        else:
            return x + 1
    
    @task
    def other_task():
        return "succeeded"
    
    @task
    def state_upstream():
        raise ValueError("error")
    
    @task(trigger=always_run)
    def trigger_task(up_a, up_b, up_c):
        prefect.context.logger.info(up_a)
        prefect.context.logger.info(up_b)
        prefect.context.logger.info(type(up_c))
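        # A: at least one mapped child succeeded (failed children arrive as exceptions)
        cond_a = any(not isinstance(x, BaseException) for x in up_a)
        # B: succeeded if it returned its string result rather than an exception
        cond_b = isinstance(up_b, str)
        # C may be in any state, so it does not gate the decision
        if cond_a and cond_b:
            return True
        else:
            raise SKIP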
    
    @task
    def final_downstream(input):
        return input
    
    with Flow('...') as flow:
        task_a = mapped_task.map([1,2,3])
        task_b = other_task()
        task_c = state_upstream()
        run_next = trigger_task(task_a, task_b, task_c)
        final_downstream(run_next)
    
    flow.run()
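    The decision inside trigger_task can also be pulled out into a plain function so it can be checked without running a flow. This is only a sketch: should_run is a hypothetical helper name, and it assumes, as the script above does, that a failed upstream shows up as an exception object in the task's inputs.

```python
def should_run(up_a, up_b, up_c):
    """Return True when the downstream task should run.

    Assumption: failed upstream results are passed in as exception
    instances, and a mapped upstream is passed in as a list of results.
    """
    # A: a mapped upstream; require at least one child that did not fail.
    if isinstance(up_a, BaseException) or up_a is None:
        a_ok = False
    else:
        a_ok = any(not isinstance(x, BaseException) for x in up_a)

    # B: must have succeeded, i.e. its result is not an exception.
    b_ok = not isinstance(up_b, BaseException)

    # C: may be in any state, so it is accepted but never inspected.
    return a_ok and b_ok
```

    Inside trigger_task this would replace the cond_a / cond_b lines: return True when should_run(...) is true, otherwise raise SKIP.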
    Marc Lipoff

    5 months ago
    cool. let me try this