Alberto de Santos
10/19/2020, 10:02 AMtrigger
setup to any_successful
), how could I deal with those tasks with the TRIGGERFAIL
result? Could I make something like if TRIGGERFAIL: then ...
?nicholas
Alberto de Santos
10/19/2020, 3:44 PMtrigger=any_successful
, I collect also those with TRIGGERFAIL
. So I don’t see how to deal ONLY with those that were successful, neglecting those not successful.
I can smell I am close to the answer 🙂nicholas
import prefect
from prefect import Flow, task
@task
def return_list():
return [1, 2, 3, 4]
@task(trigger=prefect.triggers.all_successful)
def parse_value(val):
if val % 2 != 0:
raise ValueError("Value is not even!")
return val
@task(trigger=prefect.triggers.any_failed)
def catch_error(val):
print(f"Do something with this value error: {val}")
@task(trigger=prefect.triggers.all_successful)
def catch_success(val):
print("Do something with successful")
with Flow("Raise error on Odd") as flow:
my_list = return_list()
def_list = parse_value.map(my_list)
catch_error.map(def_list)
catch_success.map(def_list)
flow.run()
Alberto de Santos
10/19/2020, 3:59 PM@task(trigger= MISSING!!)
def catch_success(val):
Thanks :)nicholas
Alberto de Santos
10/19/2020, 4:02 PMcatch_success
approach here (in a map
context). However, as you see, there is still some TriggerFailed
which I can’t explain.nicholas
Alberto de Santos
10/19/2020, 4:13 PMnicholas
Alberto de Santos
10/19/2020, 4:14 PM@task(trigger=all_successful)
def get_micro_adjustments(baco):
return baco.get_micro_adjustments(baco.get_threshold())
@task(trigger=all_successful, log_stdout=True)
def reduce_micro_adjustments(micro_adjustments):
# pd stands for Pandas
total_micro_adjustments = pd.concat(micro_adjustments)
# Definir el Flow
with Flow(self.name_flow, environment=env) as flow:
map_of_baco_per_competicion = \
self.create_baco.map(competicion = [competicion for competicion in listado_competiciones if competicion not in ['Baco', 'Olimpo', 'Zeus', 'Cronos']],
_config = unmapped(self._config))
micro_adjustments = \
self.get_micro_adjustments.map(baco = map_of_baco_per_competicion)
self.reduce_micro_adjustments(micro_adjustments)
reduce_micro_adjustments
fails due to the TriggerFailed
all_successful
, it fails because “some of the upstream tasks failed”
When I use any_successful
, it fails because “none of the upsteam tasks was successful” (which probably it is part in a portion of the map
)nicholas
micro_apuestas
coming from? I don't see that defined.Alberto de Santos
10/19/2020, 4:48 PMnicholas
@task(trigger=some_successful)
def collect_failures(results):
return filter(lambda r: r["success"] == True, results)
and then map over that list without a trigger:
@task()
def some_operation(god):
return {"god": god, "success": god is not 'Baco'}
with Flow("..") as flow:
results = some_operation.map(['Baco', 'Olimpo', 'Zeus', 'Cronos'])
failures = collect_failures(results)
# then you can map over the failures with .map(failures)
Alberto de Santos
10/19/2020, 8:45 PMnicholas
Alberto de Santos
10/19/2020, 8:55 PMnicholas
Alberto de Santos
10/19/2020, 8:59 PMany_successful
and some_successful
triggers
and if-else
, for instance. I am sure there is an obvious difference, but not to menicholas
any_successful
does what you're expecting (any of the upstream tasks are successful) while some_successful
(and some_failed
) allows you to specify how many upstream tasks must be succesful, min or maxAlberto de Santos
10/19/2020, 9:04 PMany_successful
is not working as expected with map
is (imho) that when the map is done, some tasks can be all of them not successful, and then everything fails.nicholas
TriggerFailed
statesAlberto de Santos
10/19/2020, 9:14 PMnicholas
Alberto de Santos
10/19/2020, 9:21 PMnicholas
Alberto de Santos
10/19/2020, 10:42 PMany_successful
, …) should be regarded as Vertical checks, in terms of assessing to what extent a sequences of Tasks
have been successful or failed.
• Map
are many tasks but at the same time, and thus, when I was checking if them were or not successful, I was indeed checking different things. The tasks belonging to a map
are just a level in a sequence of Vertical Tasks
• FilterTask
is what helped me filter out those Tasks coming from a non-TriggerFailed status.nicholas