https://prefect.io logo
m

Maity

07/26/2023, 1:23 AM
Hi there! Is it possible to filter while using map and return_state? I want to get all the states returned from a task map, and then pass the states onto another funciton so I may handle failed states. i.e:
# with each of the files, transform it into the APVM schema
apvms = transform_answer_to_apvm_schema.map(
answers,
unmapped(apvm_template),
unmapped(apvm_template_mappings),
return_state=True,
)
paths = write_apvm_to_storage.map(apvms, answers)
But in the above example, only the successful results will map to the write to storage function. Am I best off not using map and iterating through all the states using a loop?
n

Nate

07/26/2023, 4:47 AM
hi @Maity how would you like to handle failed states in terms of writing to storage? you can filter out whichever states you like thanks to
return_state=True
and handle each as you need to downstream does that answer your question?
Copy code
from prefect import flow, task
from prefect.states import Completed, Failed, State
from prefect.utilities.collections import listrepr

@task
def add_42(v: int) -> int:
    try:
        return v + 42
    except Exception as e:
        return Failed(message=f"eek!: {e!r}")

def handler(state) -> State:
    print(f"Handling state: {state!r}")
    return state
    
@task
def write_to_storage(v):
    print(f"Writing {v!r} to storage")

@flow(log_prints=True)
def foo():
    result_states = add_42.map([1, None, 3], return_state=True)

    completed, failed = (
        filter(lambda s: s.is_completed(), result_states),
        filter(lambda s: s.is_failed(), result_states),
    )
    
    failures = list(map(handler, failed))
    
    write_to_storage.map(completed)
    
    return Completed(message=f"Finished with some failures: {listrepr(failures)}")
    
if __name__ == "__main__":
    foo()
m

Maity

08/01/2023, 5:56 AM
thank you, that helped me out.