https://prefect.io logo
a

Alberto de Santos

10/19/2020, 10:02 AM
Given a Map (with the parameter
trigger
setup to
any_successful
), how could I deal with those tasks with the
TRIGGERFAIL
result? Could I make something like
if TRIGGERFAIL: then ...
?
n

nicholas

10/19/2020, 2:55 PM
Hi @Alberto de Santos - you could perhaps use 2 triggers, one for
any_successful
, and one to catch
TRIGGERFAIL
- you'll need a custom method for this, following the signature:
Copy code
trigger_fn(upstream_states: Set[State]) -> bool
You can read more about that here, here, and here.
a

Alberto de Santos

10/19/2020, 3:44 PM
Thanks, I read all those links. Let me again go through them and ask you questions!
However, @nicholas, the problem is that, when
trigger=any_successful
, I collect also those with
TRIGGERFAIL
. So I don’t see how to deal ONLY with those that were successful, neglecting those not successful. I can smell I am close to the answer 🙂
n

nicholas

10/19/2020, 3:56 PM
From your other thread: https://prefect-community.slack.com/archives/CL09KU1K7/p1602943349186800 but in this case you could add another task to handle successful like this:
Copy code
import prefect
from prefect import Flow, task

@task
def return_list():
    return [1, 2, 3, 4]

@task(trigger=prefect.triggers.all_successful)
def parse_value(val):
    if val % 2 != 0:
        raise ValueError("Value is not even!")
    return val

@task(trigger=prefect.triggers.any_failed)
def catch_error(val):
    print(f"Do something with this value error: {val}")

@task(trigger=prefect.triggers.all_successful)
def catch_success(val):
  print("Do something with successful")

with Flow("Raise error on Odd") as flow:
    my_list = return_list()
    def_list = parse_value.map(my_list)
    catch_error.map(def_list)
    catch_success.map(def_list)

flow.run()
a

Alberto de Santos

10/19/2020, 3:59 PM
Thanks! Although it is incomplete precisely the part I am interested in:
Copy code
@task(trigger= MISSING!!)
def catch_success(val):
Thanks :)
n

nicholas

10/19/2020, 3:59 PM
sorry, hit enter too soon
a

Alberto de Santos

10/19/2020, 4:02 PM
you are amazing! Thanks!
😄 1
[2020-10-19 160750] INFO - prefect.TaskRunner | Task ‘get_micro_adjustments[0]‘: Starting task run... [2020-10-19 160750] INFO - prefect.TaskRunner | Task ‘get_micro_adjustments[0]’: finished task run for task with final state: ‘Success’ [2020-10-19 160750] INFO - prefect.TaskRunner | Task ‘get_micro_adjustments[1]‘: Starting task run... [2020-10-19 160750] INFO - prefect.TaskRunner | Task ‘get_micro_adjustments[1]’: finished task run for task with final state: ‘TriggerFailed’ [2020-10-19 160751] INFO - prefect.TaskRunner | Task ‘get_micro_adjustments[2]‘: Starting task run... [2020-10-19 160751] INFO - prefect.TaskRunner | Task ‘get_micro_adjustments[2]’: finished task run for task with final state: ‘Success’
I tried your
catch_success
approach here (in a
map
context). However, as you see, there is still some
TriggerFailed
which I can’t explain.
n

nicholas

10/19/2020, 4:13 PM
Can you share some minimum code?
a

Alberto de Santos

10/19/2020, 4:13 PM
sure 🙂
Bbut only a minimum haha
Don’t want to be embarrased!
n

nicholas

10/19/2020, 4:14 PM
No need to feel embarrassed, we're all learning!
This'll help me figure out where something might be happening unexepctedly
a

Alberto de Santos

10/19/2020, 4:14 PM
sure!
Copy code
@task(trigger=all_successful)
def get_micro_adjustments(baco):
   return baco.get_micro_adjustments(baco.get_threshold())

@task(trigger=all_successful, log_stdout=True)
def reduce_micro_adjustments(micro_adjustments):
    # pd stands for Pandas
    total_micro_adjustments = pd.concat(micro_adjustments)
These are the Tasks
Copy code
# Definir el Flow
with Flow(self.name_flow, environment=env) as flow:

   map_of_baco_per_competicion = \
   self.create_baco.map(competicion = [competicion for competicion in listado_competiciones if competicion not in ['Baco', 'Olimpo', 'Zeus', 'Cronos']],
                   _config = unmapped(self._config))

   micro_adjustments = \
   self.get_micro_adjustments.map(baco = map_of_baco_per_competicion)

   self.reduce_micro_adjustments(micro_adjustments)
This is the flow
[2020-10-19 160752] INFO - prefect.TaskRunner | Task ‘reduce_micro_adjustments’: finished task run for task with final state: ‘TriggerFailed’ [2020-10-19 160752] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
Notice also that the
reduce_micro_adjustments
fails due to the
TriggerFailed
When I use
all_successful
, it fails because “some of the upstream tasks failed” When I use
any_successful
, it fails because “none of the upsteam tasks was successful” (which probably it is part in a portion of the
map
)
n

nicholas

10/19/2020, 4:47 PM
Hm, where is
micro_apuestas
coming from? I don't see that defined.
a

Alberto de Santos

10/19/2020, 4:48 PM
sorry, micro_apuestas -> micro_adjustments
now 🙂
n

nicholas

10/19/2020, 5:05 PM
Got it @Alberto de Santos - I misunderstood your question. What you can do instead is collect your failed tasks with some filter method, like:
Copy code
@task(trigger=some_successful)
def collect_failures(results):
  return filter(lambda r: r["success"] == True, results)
and then map over that list without a trigger:
Copy code
@task()
def some_operation(god):
  return {"god": god, "success": god is not 'Baco'}

with Flow("..") as flow:
  results = some_operation.map(['Baco', 'Olimpo', 'Zeus', 'Cronos'])
  
  failures = collect_failures(results)
  # then you can map over the failures with .map(failures)
But using whatever condition you need for failure
a

Alberto de Santos

10/19/2020, 8:45 PM
uhm! Very interesting and I really appreciate your help. Let me test it
By the way, Baco is a god. One of the best: The god of the wine, the orgies, the parties and also the god of the randomness/games/etc…
The word “Bacchanal” in english is after him
And by the way and based in your opinion: • Do you think what I am pursuing is worth doing? • Do you think there is a better way to do it?
n

nicholas

10/19/2020, 8:54 PM
And Bacchus in Latin. 🙂
or Dionysus in Greek 🙂
a

Alberto de Santos

10/19/2020, 8:55 PM
Exactly, Baco is in Spanish
😄 1
n

nicholas

10/19/2020, 8:56 PM
It depends on your end goal with your flow @Alberto de Santos - I don't think it's a terrible pattern to adopt but if it's intermittent failures that you're trying to avoid, I think it makes more sense to use something like retries
a

Alberto de Santos

10/19/2020, 8:59 PM
Thanks, I see, but in this case, if it fails, it would fail forever (in some cases, retries is useful, for sure), but not in this case
What is not clear to me is the following (don’t see it in the doc): • Difference between
any_successful
and
some_successful
And the difference between
triggers
and
if-else
, for instance. I am sure there is an obvious difference, but not to me
n

nicholas

10/19/2020, 9:02 PM
any_successful
does what you're expecting (any of the upstream tasks are successful) while
some_successful
(and
some_failed
) allows you to specify how many upstream tasks must be succesful, min or max
a

Alberto de Santos

10/19/2020, 9:04 PM
My understanding why
any_successful
is not working as expected with
map
is (imho) that when the map is done, some tasks can be all of them not successful, and then everything fails.
n

nicholas

10/19/2020, 9:12 PM
I'd encourage you to read the docs on each of those (triggers and conditionals) - there are definitely some similarities. That sounds correct @Alberto de Santos - if no tasks succeed, the downstream tasks will end in
TriggerFailed
states
a

Alberto de Santos

10/19/2020, 9:14 PM
Thanks, Nicholas, I promised I read the documentation (several times), but wasn’t enough for me (notice my profile is not so skilled in programming as people in this forum)
n

nicholas

10/19/2020, 9:16 PM
No worries at all! I would boil the difference between the two down to: Triggers are used to determine whether a task should run based on upstream states while Conditionals are used to determine whether a task should run based on upstream data
a

Alberto de Santos

10/19/2020, 9:21 PM
Amazing, thanks, then I have a clearer understanding that it is Trigger the thing I should use
Thanks!
n

nicholas

10/19/2020, 9:23 PM
😄
a

Alberto de Santos

10/19/2020, 10:42 PM
In the end (and obviously with your incredible help), I managed to do it!!
Lessons learned: • Built-in Triggers (
any_successful
, …) should be regarded as Vertical checks, in terms of assessing to what extent a sequences of
Tasks
have been successful or failed. •
Map
are many tasks but at the same time, and thus, when I was checking if them were or not successful, I was indeed checking different things. The tasks belonging to a
map
are just a level in a sequence of Vertical Tasks •
FilterTask
is what helped me filter out those Tasks coming from a non-TriggerFailed status.
Very helpful
n

nicholas

10/19/2020, 10:47 PM
You nailed it, well done!