Maity
07/26/2023, 1:23 AM# with each of the files, transform it into the APVM schema
apvms = transform_answer_to_apvm_schema.map(
answers,
unmapped(apvm_template),
unmapped(apvm_template_mappings),
return_state=True,
)
paths = write_apvm_to_storage.map(apvms, answers)
But in the above example, only the successful results will map to the write to storage function. Am I best off not using map and iterating through all the states using a loop?Nate
07/26/2023, 4:47 AMreturn_state=True
and handle each as you need to downstream
does that answer your question?from prefect import flow, task
from prefect.states import Completed, Failed, State
from prefect.utilities.collections import listrepr
@task
def add_42(v: int) -> int:
try:
return v + 42
except Exception as e:
return Failed(message=f"eek!: {e!r}")
def handler(state) -> State:
print(f"Handling state: {state!r}")
return state
@task
def write_to_storage(v):
print(f"Writing {v!r} to storage")
@flow(log_prints=True)
def foo():
result_states = add_42.map([1, None, 3], return_state=True)
completed, failed = (
filter(lambda s: s.is_completed(), result_states),
filter(lambda s: s.is_failed(), result_states),
)
failures = list(map(handler, failed))
write_to_storage.map(completed)
return Completed(message=f"Finished with some failures: {listrepr(failures)}")
if __name__ == "__main__":
foo()
Maity
08/01/2023, 5:56 AM