Hey everyone! I’m trying to automate API calls using the map functionality. are the conditional tasks able to work with the mapping results or do I need to reduce the results before performing the condition
which will give a lot of good info about the mapping
k
Kevin Kho
06/02/2021, 6:08 PM
You can do something like this to check if the tasks returned Exceptions. Note that you need an
all_finished
trigger to make sure the downstream task runs
Copy code
import random
import prefect
from prefect import Flow, task
@task
def fail_sometimes(i):
# fail
if i == 2:
# We can either raise a signal
raise prefect.engine.signals.FAIL("FAILED")
else:
return i
# The default trigger is 'all_successful' and we need to allow failures upstream for
# this task to handle them robustly
@task(trigger=prefect.triggers.all_finished)
def check_what_failed(results):
failures = len([x for x in results if isinstance(x, Exception)])
successes = len(results) - len(failures)
return
with Flow(
"mapped-failures",
) as flow:
map_results = fail_sometimes.map([1,2,3,4])
check_what_failed(map_results)
flow.run()
j
John Ramirez
08/19/2021, 9:18 PM
thanks!
John Ramirez
08/19/2021, 9:22 PM
@Kevin Kho would pytest work inside prefect tasks for data validation?
k
Kevin Kho
08/19/2021, 9:23 PM
I have never seen pytest run as code. What are validating on Pandas dataframes?
Bring your towel and join one of the fastest growing data communities. Welcome to our second-generation open source orchestration platform, a completely rethought approach to dataflow automation.