Eli Treuherz
09/12/2022, 9:01 AMraise SKIP("not applicable")
in order to skip a task? If I just return Cancelled(message="not applicable")
instead of returning my task’s actual data, will that have the same effect?Rob Freedy
09/12/2022, 1:28 PMCancelled
has the functionality you are looking for! From the docs: "The run did not complete because a user determined that it should not."
https://docs.prefect.io/concepts/states/?h=states#state-typesEli Treuherz
09/12/2022, 1:32 PMCancelled
state, my confusion is that in Prefect 1 I would raise
a state as an exception in order to set the task run to that state. In Prefect 2 can I return
a state to do the same thing?Rob Freedy
09/12/2022, 1:35 PMEli Treuherz
09/12/2022, 1:36 PMAnna Geller
from prefect import task, flow
from prefect.orion.schemas.states import Completed
@task
def extract_data_from_api():
return {"result": None}
@task
def process_data(data):
if isinstance(data["result"], dict):
pass # do some processing
@flow
def return_state_manually():
raw_data = extract_data_from_api()
if raw_data["result"] is None:
return Completed(message="No new data available, end the run early")
else:
process_data(raw_data)
if __name__ == "__main__":
return_state_manually()
Rob Freedy
09/12/2022, 3:00 PMCancelled
state that you can follow here: https://github.com/PrefectHQ/prefect/issues/6789
Thanks for helping us find this bug!! A work around for this could be to return the Completed
state with the correct message just like in the example above.Eli Treuherz
09/12/2022, 3:18 PM@task
def lambda(request):
# invoke lambda here, parse respoonse to `payload`
if payload.not_applicable:
return Cancelled("not applicable")
return payload.calculated
@flow
def flow():
requests = generate_requests()
for request in requests:
fut = lambda.submit(request)
if fut.wait().is_completed():
save_to_postgres.submit(fut.result())
but my flow runs were still marked as failed. It looks like Cancelled
is treated as a failed state for the purposes of determining whether a flow failed overall, which the docs don’t mention!
I’ll try the None
-returning method you suggestedAnna Geller
Eli Treuherz
09/12/2022, 3:38 PMNone
from the task and checking it in the flow with
for request in requests:
output = lambda(request)
if output.result() is not None:
save_to_postgres(output)
seems to have worked great. Thanks for all your help!lambda
fails and enters a retry loop, previously that wouldn’t be a problem as the for-loop in the flow would have submitted both lambda
and save_to_postgres
tasks asynchronously and would be able to carry on just fine. Now, result()
blocks the floor loop so my flow can’t run concurrently any moreNone
check into the save_to_postgres
task, but that feels sort of weird, scheduling a task that might be a no-op. This feels like something the SKIP signal in Prefect 1 handled fine and it’s harder to get the same behaviour than I expectedlambda
fails, the whole flow fails there because it hasn’t submitted anything after it yet