Thread
#prefect-community
    a

    Alberto de Santos

    1 year ago
    Given a Map (with the parameter
    trigger
    setup to
    any_successful
    ), how could I deal with those tasks with the
    TRIGGERFAIL
    result? Could I make something like
    if TRIGGERFAIL: then ...
    ?
    nicholas

    nicholas

    1 year ago
    Hi @Alberto de Santos - you could perhaps use 2 triggers, one for
    any_successful
    , and one to catch
    TRIGGERFAIL
    - you'll need a custom method for this, following the signature:
    trigger_fn(upstream_states: Set[State]) -> bool
    You can read more about that here, here, and here.
    a

    Alberto de Santos

    1 year ago
    Thanks, I read all those links. Let me again go through them and ask you questions!
    However, @nicholas, the problem is that, when
    trigger=any_successful
    , I collect also those with
    TRIGGERFAIL
    . So I don’t see how to deal ONLY with those that were successful, neglecting those not successful. I can smell I am close to the answer 🙂
    nicholas

    nicholas

    1 year ago
    From your other thread: https://prefect-community.slack.com/archives/CL09KU1K7/p1602943349186800 but in this case you could add another task to handle successful like this:
    import prefect
    from prefect import Flow, task
    
    @task
    def return_list():
        return [1, 2, 3, 4]
    
    @task(trigger=prefect.triggers.all_successful)
    def parse_value(val):
        if val % 2 != 0:
            raise ValueError("Value is not even!")
        return val
    
    @task(trigger=prefect.triggers.any_failed)
    def catch_error(val):
        print(f"Do something with this value error: {val}")
    
    @task(trigger=prefect.triggers.all_successful)
    def catch_success(val):
      print("Do something with successful")
    
    with Flow("Raise error on Odd") as flow:
        my_list = return_list()
        def_list = parse_value.map(my_list)
        catch_error.map(def_list)
        catch_success.map(def_list)
    
    flow.run()
    a

    Alberto de Santos

    1 year ago
    Thanks! Although it is incomplete precisely the part I am interested in:
    @task(trigger= MISSING!!)
    def catch_success(val):
    Thanks 😃
    nicholas

    nicholas

    1 year ago
    sorry, hit enter too soon
    a

    Alberto de Santos

    1 year ago
    you are amazing! Thanks!
    [2020-10-19 16:07:50] INFO - prefect.TaskRunner | Task ‘get_micro_adjustments[0]‘: Starting task run... [2020-10-19 16:07:50] INFO - prefect.TaskRunner | Task ‘get_micro_adjustments[0]’: finished task run for task with final state: ‘Success’ [2020-10-19 16:07:50] INFO - prefect.TaskRunner | Task ‘get_micro_adjustments[1]‘: Starting task run... [2020-10-19 16:07:50] INFO - prefect.TaskRunner | Task ‘get_micro_adjustments[1]’: finished task run for task with final state: ‘TriggerFailed’ [2020-10-19 16:07:51] INFO - prefect.TaskRunner | Task ‘get_micro_adjustments[2]‘: Starting task run... [2020-10-19 16:07:51] INFO - prefect.TaskRunner | Task ‘get_micro_adjustments[2]’: finished task run for task with final state: ‘Success’
    I tried your
    catch_success
    approach here (in a
    map
    context). However, as you see, there is still some
    TriggerFailed
    which I can’t explain.
    nicholas

    nicholas

    1 year ago
    Can you share some minimum code?
    a

    Alberto de Santos

    1 year ago
    sure 🙂
    Bbut only a minimum haha
    Don’t want to be embarrased!
    nicholas

    nicholas

    1 year ago
    No need to feel embarrassed, we're all learning!
    This'll help me figure out where something might be happening unexepctedly
    a

    Alberto de Santos

    1 year ago
    sure!
    @task(trigger=all_successful)
    def get_micro_adjustments(baco):
       return baco.get_micro_adjustments(baco.get_threshold())
    
    @task(trigger=all_successful, log_stdout=True)
    def reduce_micro_adjustments(micro_adjustments):
        # pd stands for Pandas
        total_micro_adjustments = pd.concat(micro_adjustments)
    These are the Tasks
    # Definir el Flow
    with Flow(self.name_flow, environment=env) as flow:
    
       map_of_baco_per_competicion = \
       self.create_baco.map(competicion = [competicion for competicion in listado_competiciones if competicion not in ['Baco', 'Olimpo', 'Zeus', 'Cronos']],
                       _config = unmapped(self._config))
    
       micro_adjustments = \
       self.get_micro_adjustments.map(baco = map_of_baco_per_competicion)
    
       self.reduce_micro_adjustments(micro_adjustments)
    This is the flow
    [2020-10-19 16:07:52] INFO - prefect.TaskRunner | Task ‘reduce_micro_adjustments’: finished task run for task with final state: ‘TriggerFailed’ [2020-10-19 16:07:52] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
    Notice also that the
    reduce_micro_adjustments
    fails due to the
    TriggerFailed
    When I use
    all_successful
    , it fails because “some of the upstream tasks failed” When I use
    any_successful
    , it fails because “none of the upsteam tasks was successful” (which probably it is part in a portion of the
    map
    )
    nicholas

    nicholas

    1 year ago
    Hm, where is
    micro_apuestas
    coming from? I don't see that defined.
    a

    Alberto de Santos

    1 year ago
    sorry, micro_apuestas -> micro_adjustments
    now 🙂
    nicholas

    nicholas

    1 year ago
    Got it @Alberto de Santos - I misunderstood your question. What you can do instead is collect your failed tasks with some filter method, like:
    @task(trigger=some_successful)
    def collect_failures(results):
      return filter(lambda r: r["success"] == True, results)
    and then map over that list without a trigger:
    @task()
    def some_operation(god):
      return {"god": god, "success": god is not 'Baco'}
    
    with Flow("..") as flow:
      results = some_operation.map(['Baco', 'Olimpo', 'Zeus', 'Cronos'])
      
      failures = collect_failures(results)
      # then you can map over the failures with .map(failures)
    But using whatever condition you need for failure
    a

    Alberto de Santos

    1 year ago
    uhm! Very interesting and I really appreciate your help. Let me test it
    By the way, Baco is a god. One of the best: The god of the wine, the orgies, the parties and also the god of the randomness/games/etc…
    The word “Bacchanal” in english is after him
    And by the way and based in your opinion: • Do you think what I am pursuing is worth doing? • Do you think there is a better way to do it?
    nicholas

    nicholas

    1 year ago
    And Bacchus in Latin. 🙂
    or Dionysus in Greek 🙂
    a

    Alberto de Santos

    1 year ago
    Exactly, Baco is in Spanish
    nicholas

    nicholas

    1 year ago
    It depends on your end goal with your flow @Alberto de Santos - I don't think it's a terrible pattern to adopt but if it's intermittent failures that you're trying to avoid, I think it makes more sense to use something like retries
    a

    Alberto de Santos

    1 year ago
    Thanks, I see, but in this case, if it fails, it would fail forever (in some cases, retries is useful, for sure), but not in this case
    What is not clear to me is the following (don’t see it in the doc): • Difference between
    any_successful
    and
    some_successful
    And the difference between
    triggers
    and
    if-else
    , for instance. I am sure there is an obvious difference, but not to me
    nicholas

    nicholas

    1 year ago
    any_successful
    does what you're expecting (any of the upstream tasks are successful) while
    some_successful
    (and
    some_failed
    ) allows you to specify how many upstream tasks must be succesful, min or max
    a

    Alberto de Santos

    1 year ago
    My understanding why
    any_successful
    is not working as expected with
    map
    is (imho) that when the map is done, some tasks can be all of them not successful, and then everything fails.
    nicholas

    nicholas

    1 year ago
    I'd encourage you to read the docs on each of those (triggers and conditionals) - there are definitely some similarities. That sounds correct @Alberto de Santos - if no tasks succeed, the downstream tasks will end in
    TriggerFailed
    states
    a

    Alberto de Santos

    1 year ago
    Thanks, Nicholas, I promised I read the documentation (several times), but wasn’t enough for me (notice my profile is not so skilled in programming as people in this forum)
    nicholas

    nicholas

    1 year ago
    No worries at all! I would boil the difference between the two down to:Triggers are used to determine whether a task should run based on upstream states while Conditionals are used to determine whether a task should run based on upstream data
    a

    Alberto de Santos

    1 year ago
    Amazing, thanks, then I have a clearer understanding that it is Trigger the thing I should use
    Thanks!
    nicholas

    nicholas

    1 year ago
    😄
    a

    Alberto de Santos

    1 year ago
    In the end (and obviously with your incredible help), I managed to do it!!
    Lessons learned: • Built-in Triggers (
    any_successful
    , …) should be regarded as Vertical checks, in terms of assessing to what extent a sequences of
    Tasks
    have been successful or failed. •
    Map
    are many tasks but at the same time, and thus, when I was checking if them were or not successful, I was indeed checking different things. The tasks belonging to a
    map
    are just a level in a sequence of Vertical Tasks •
    FilterTask
    is what helped me filter out those Tasks coming from a non-TriggerFailed status.
    Very helpful
    nicholas

    nicholas

    1 year ago
    You nailed it, well done!