Alberto de Santos
10/19/2020, 10:02 AM
trigger setup to any_successful), how could I deal with those tasks with the TRIGGERFAIL result? Could I do something like if TRIGGERFAIL: then ...?
nicholas
Alberto de Santos
10/19/2020, 3:44 PM
Alberto de Santos
10/19/2020, 3:50 PM
trigger=any_successful, I also collect those with TRIGGERFAIL. So I don’t see how to deal ONLY with those that were successful, ignoring the unsuccessful ones.
I can smell I am close to the answer 🙂
nicholas
nicholas
import prefect
from prefect import Flow, task

@task
def return_list():
    return [1, 2, 3, 4]

@task(trigger=prefect.triggers.all_successful)
def parse_value(val):
    if val % 2 != 0:
        raise ValueError("Value is not even!")
    return val

@task(trigger=prefect.triggers.any_failed)
def catch_error(val):
    print(f"Do something with this value error: {val}")

@task(trigger=prefect.triggers.all_successful)
def catch_success(val):
    print("Do something with successful")

with Flow("Raise error on Odd") as flow:
    my_list = return_list()
    def_list = parse_value.map(my_list)
    catch_error.map(def_list)
    catch_success.map(def_list)

flow.run()
Alberto de Santos
10/19/2020, 3:59 PM
@task(trigger= MISSING!!)
def catch_success(val):
Thanks :)
nicholas
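One way to read the mapped flow above: each mapped child of catch_error and catch_success checks its trigger against only the parse_value child it was mapped over, not against the whole list. The sketch below models that in plain Python, without Prefect. The state strings and the helpers run_parse_value and child_runs are hypothetical stand-ins for illustration, not Prefect APIs.

```python
# Plain-Python model of per-child trigger checks in a mapped flow.
# Strings stand in for Prefect states; nothing here calls Prefect.
SUCCESS, FAILED = "Success", "Failed"


def run_parse_value(val):
    """Mimic parse_value: odd inputs fail, even inputs succeed."""
    return FAILED if val % 2 != 0 else SUCCESS


def child_runs(trigger, upstream_state):
    """A mapped child sees only the one upstream child it was mapped over."""
    if trigger == "all_successful":
        return upstream_state == SUCCESS
    if trigger == "any_failed":
        return upstream_state == FAILED
    raise ValueError(f"unknown trigger: {trigger}")


values = [1, 2, 3, 4]
upstream = [run_parse_value(v) for v in values]

# Children that do not run would end up in a TriggerFailed state.
catch_error_runs = [v for v, s in zip(values, upstream) if child_runs("any_failed", s)]
catch_success_runs = [v for v, s in zip(values, upstream) if child_runs("all_successful", s)]

print(catch_error_runs)    # [1, 3]
print(catch_success_runs)  # [2, 4]
```

In this model every input lands in exactly one of the two branches, which is why some children of each mapped task are expected to show TriggerFailed.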
Alberto de Santos
10/19/2020, 4:02 PM
Alberto de Santos
10/19/2020, 4:12 PM
Alberto de Santos
10/19/2020, 4:13 PM
catch_success approach here (in a map context). However, as you can see, there is still some TriggerFailed which I can’t explain.
nicholas
Alberto de Santos
10/19/2020, 4:13 PM
Alberto de Santos
10/19/2020, 4:13 PM
Alberto de Santos
10/19/2020, 4:14 PM
nicholas
nicholas
Alberto de Santos
10/19/2020, 4:14 PM
Alberto de Santos
10/19/2020, 4:16 PM
@task(trigger=all_successful)
def get_micro_adjustments(baco):
    return baco.get_micro_adjustments(baco.get_threshold())

@task(trigger=all_successful, log_stdout=True)
def reduce_micro_adjustments(micro_adjustments):
    # pd stands for Pandas
    total_micro_adjustments = pd.concat(micro_adjustments)
Alberto de Santos
10/19/2020, 4:16 PM
Alberto de Santos
10/19/2020, 4:17 PM
# Define the Flow
with Flow(self.name_flow, environment=env) as flow:
    map_of_baco_per_competicion = self.create_baco.map(
        competicion=[
            competicion
            for competicion in listado_competiciones
            if competicion not in ['Baco', 'Olimpo', 'Zeus', 'Cronos']
        ],
        _config=unmapped(self._config),
    )
    micro_adjustments = self.get_micro_adjustments.map(baco=map_of_baco_per_competicion)
    self.reduce_micro_adjustments(micro_adjustments)
Alberto de Santos
10/19/2020, 4:17 PM
Alberto de Santos
10/19/2020, 4:18 PM
Alberto de Santos
10/19/2020, 4:18 PM
reduce_micro_adjustments fails due to the TriggerFailed
Alberto de Santos
10/19/2020, 4:37 PM
Alberto de Santos
10/19/2020, 4:38 PM
all_successful, it fails because “some of the upstream tasks failed”
When I use any_successful, it fails because “none of the upstream tasks was successful” (which is probably the case for a portion of the map)
nicholas
micro_apuestas coming from? I don't see that defined.
Alberto de Santos
10/19/2020, 4:48 PM
Alberto de Santos
10/19/2020, 4:48 PM
nicholas
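from prefect import Flow, task
from prefect.triggers import some_successful

# some_successful is a trigger factory; at_least=1 is an assumed threshold
@task(trigger=some_successful(at_least=1))
def collect_failures(results):
    # keeps only the results flagged as successful
    return [r for r in results if r["success"]]
and then map over that list without a trigger:
@task()
def some_operation(god):
    # compare strings with !=, not with identity (is not)
    return {"god": god, "success": god != 'Baco'}

with Flow("..") as flow:
    results = some_operation.map(['Baco', 'Olimpo', 'Zeus', 'Cronos'])
    failures = collect_failures(results)
    # then you can map over the failures with .map(failures)
nicholas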
Alberto de Santos
10/19/2020, 8:45 PM
Alberto de Santos
10/19/2020, 8:46 PM
Alberto de Santos
10/19/2020, 8:46 PM
Alberto de Santos
10/19/2020, 8:50 PM
nicholas
nicholas
Alberto de Santos
10/19/2020, 8:55 PM
nicholas
Alberto de Santos
10/19/2020, 8:59 PM
Alberto de Santos
10/19/2020, 9:00 PM
any_successful and some_successful
Alberto de Santos
10/19/2020, 9:00 PM
triggers and if-else, for instance. I am sure there is an obvious difference, but not to me
nicholas
any_successful does what you're expecting (it passes if any of the upstream tasks is successful), while some_successful (and some_failed) lets you specify how many upstream tasks must be successful, as a minimum or a maximum
Alberto de Santos
10/19/2020, 9:04 PM
any_successful is not working as expected with map is (imho) that, once the map is done, some mapped tasks can have none of their upstream tasks successful, and then everything fails.
nicholas
TriggerFailed states
Alberto de Santos
10/19/2020, 9:14 PM
nicholas
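The difference between any_successful and some_successful discussed above can be sketched in plain Python. This is a toy model, not Prefect's implementation: strings stand in for states, the functions return booleans for simplicity, and only integer counts are handled. The at_least and at_most names follow the "min or max" description in the thread.

```python
# Plain-Python model of the two triggers; strings stand in for Prefect states.
SUCCESS, FAILED, TRIGGER_FAILED = "Success", "Failed", "TriggerFailed"


def any_successful(upstream_states):
    """Pass if at least one upstream state is a success."""
    return any(s == SUCCESS for s in upstream_states)


def some_successful(at_least=None, at_most=None):
    """Build a trigger that bounds the number of upstream successes."""
    def trigger(upstream_states):
        successes = sum(1 for s in upstream_states if s == SUCCESS)
        if at_least is not None and successes < at_least:
            return False
        if at_most is not None and successes > at_most:
            return False
        return True
    return trigger


mixed = [SUCCESS, FAILED, SUCCESS]
none_ok = [TRIGGER_FAILED, TRIGGER_FAILED]

print(any_successful(mixed))               # True
print(any_successful(none_ok))             # False
print(some_successful(at_least=2)(mixed))  # True
print(some_successful(at_most=1)(mixed))   # False
```

The none_ok case mirrors the situation described in the thread: when every upstream state a task sees is TriggerFailed, any_successful has nothing to accept and the downstream task fails as well.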
Alberto de Santos
10/19/2020, 9:21 PM
Alberto de Santos
10/19/2020, 9:22 PM
nicholas
Alberto de Santos
10/19/2020, 10:42 PM
Alberto de Santos
10/19/2020, 10:47 PM
any_successful, …) should be regarded as Vertical checks, in terms of assessing to what extent a sequence of Tasks has succeeded or failed.
• A map is many tasks running at the same time, so when I was checking whether they were successful, I was in fact checking different things. The tasks belonging to a map are just one level in a sequence of Vertical Tasks.
• FilterTask is what helped me select only those Tasks coming from a non-TriggerFailed status.
Alberto de Santos
10/19/2020, 10:47 PM
nicholas
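The filtering step from the last point of the summary above can be sketched in plain Python, without Prefect. FilterTask is the name used in the thread; the helper filter_results and its default predicate (drop None and exception objects) are hypothetical and only illustrate the idea of cleaning mapped results before a reduce step.

```python
# Plain-Python model of filtering mapped results before a reduce step.
def filter_results(results, keep=None):
    """Return only the results accepted by keep.

    The default keep drops None and exception objects; this default is an
    assumption made for the sketch.
    """
    if keep is None:
        keep = lambda r: r is not None and not isinstance(r, Exception)
    return [r for r in results if keep(r)]


# Hypothetical mapped output: two real values, one error, one missing result.
mapped = [10, ValueError("Value is not even!"), 30, None]
clean = filter_results(mapped)

print(clean)       # [10, 30]
print(sum(clean))  # 40
```

With the unusable entries removed first, the reduce step only ever sees real values, so it no longer needs a trigger to guard against failed upstream children.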