
Eli Treuherz

09/12/2022, 9:01 AM
Is there a Prefect 2 equivalent to doing `raise SKIP("not applicable")` in order to skip a task? If I just `return Cancelled(message="not applicable")` instead of returning my task’s actual data, will that have the same effect?

Rob Freedy

09/12/2022, 1:28 PM
I believe `Cancelled` has the functionality you are looking for! From the docs: "The run did not complete because a user determined that it should not." https://docs.prefect.io/concepts/states/?h=states#state-types

Eli Treuherz

09/12/2022, 1:32 PM
Sorry, I wasn’t clear enough there. I get that I want the `Cancelled` state; my confusion is that in Prefect 1 I would `raise` a state as an exception in order to set the task run to that state. In Prefect 2, can I `return` a state to do the same thing?

Rob Freedy

09/12/2022, 1:35 PM
Yep! You can manually return a state; there are some examples here: https://docs.prefect.io/concepts/flows/?h=state#return-a-manual-state

Eli Treuherz

09/12/2022, 1:36 PM
Those docs show returning a state in flows; it’s not obvious from them whether the same thing can be done in tasks.
@Rob Freedy I gave this a go, and returning the state in the task works fine! The problem is that my flow is overall marked as Failed and I’m not sure why. There are no Failed or Crashed tasks, there are just Completed, Cancelled, and NotReady tasks downstream of the cancelled ones. Are these NotReady tasks failing the run?

Anna Geller

09/12/2022, 2:46 PM
Hi Eli, you can totally return states. Check out the draft of this blog post (not released yet, but it might already be helpful to you): https://medium.com/p/5eaabdfbb70e#bac0-f33b95e8f816
This seems to match your use case with skipping:
```python
from prefect import task, flow
from prefect.orion.schemas.states import Completed


@task
def extract_data_from_api():
    return {"result": None}


@task
def process_data(data):
    if isinstance(data["result"], dict):
        pass  # do some processing


@flow
def return_state_manually():
    raw_data = extract_data_from_api()
    if raw_data["result"] is None:
        # Returning a state sets the flow run's final state directly
        return Completed(message="No new data available, end the run early")
    else:
        process_data(raw_data)


if __name__ == "__main__":
    return_state_manually()
```

Rob Freedy

09/12/2022, 3:00 PM
We also discovered a bug when raising the `Cancelled` state that you can follow here: https://github.com/PrefectHQ/prefect/issues/6789 Thanks for helping us find this bug!! A workaround could be to return the `Completed` state with an appropriate message, just like in the example above.

Eli Treuherz

09/12/2022, 3:18 PM
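Thanks Anna, I’ve just ended up doing something like this:
```python
from prefect import flow, task
from prefect.orion.schemas.states import Cancelled

# generate_requests and save_to_postgres are defined elsewhere


@task
def invoke_lambda(request):
    # invoke the Lambda here and parse the response into `payload`
    if payload.not_applicable:
        return Cancelled(message="not applicable")
    return payload.calculated


@flow
def process_requests():
    requests = generate_requests()
    for request in requests:
        fut = invoke_lambda.submit(request)
        # wait() blocks until the task run finishes and returns its final state
        if fut.wait().is_completed():
            save_to_postgres.submit(fut.result())
```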
But my flow runs were still marked as Failed. It looks like `Cancelled` is treated as a failed state when determining whether a flow failed overall, which the docs don’t mention! I’ll try the `None`-returning method you suggested.

Anna Geller

09/12/2022, 3:35 PM
Great point, Eli, and well done! The docs at https://docs.prefect.io/concepts/states/ say that the Cancelled state indicates "The run did not complete because a user determined that it should not." It essentially interrupts and fails the run early because you determined that it should not continue. But I understand what you mean: in Prefect 1.0, Cancelled was shown in grey in the UI to indicate it was neither success nor failure.
It's a tricky one, because it's not really a Failed state but it is still a terminal state.

Eli Treuherz

09/12/2022, 3:38 PM
Yep, totally understand the difficulty; I’ve just got a lot of Prefect 1 to unlearn! Returning `None` from the task and checking it in the flow with
```python
for request in requests:
    output = invoke_lambda.submit(request)
    # result() blocks the loop until this task run has finished
    if output.result() is not None:
        save_to_postgres.submit(output.result())
```
seems to have worked great. Thanks for all your help!
Sorry to resurrect this, but I’ve found a downside of running the tasks this way. Previously, if the Lambda task failed and entered a retry loop, that wasn’t a problem: the for loop in the flow had submitted both the Lambda and `save_to_postgres` tasks asynchronously, so it could carry on just fine. Now `result()` blocks the for loop, so my flow can’t run concurrently anymore.
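One way to keep the invocations concurrent is to submit every request before blocking on any result, and only then check each output for `None`. This is a minimal sketch that uses the standard library's `concurrent.futures` as a stand-in for Prefect's task runner; `invoke_lambda` and `save_to_postgres` are hypothetical placeholders, so it illustrates the ordering only, not Prefect's API.

```python
from concurrent.futures import ThreadPoolExecutor


def invoke_lambda(request):
    # Hypothetical stand-in for the Lambda call: None means "not applicable".
    return None if request % 2 else request * 10


def save_to_postgres(value, saved):
    # Hypothetical stand-in for the database write.
    saved.append(value)


def run(requests):
    saved = []
    with ThreadPoolExecutor(max_workers=4) as pool:
        # Phase 1: submit everything, so all invocations are in flight at once.
        futures = [pool.submit(invoke_lambda, r) for r in requests]
        # Phase 2: block on each result only after every call has been submitted.
        for fut in futures:
            output = fut.result()
            if output is not None:
                save_to_postgres(output, saved)
    return saved


print(run([1, 2, 3, 4]))  # prints [20, 40]
```

In a Prefect 2 flow, the same ordering would mean one loop that calls `.submit()` for every request and a second loop that calls `.result()`. A slow or retrying invocation then delays only the saves that come after it in the second loop, not the submission of the other invocations.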
I could move the `None` check into the `save_to_postgres` task, but scheduling a task that might be a no-op feels sort of weird. The SKIP signal in Prefect 1 handled this fine, and it’s harder to get the same behaviour than I expected.
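Moving the check downstream means each save waits on its own upstream result instead of the flow loop waiting on all of them. Prefect 2 lets you pass a future returned by `.submit()` as an argument to another task's `.submit()`, so the downstream run waits for that upstream run without blocking the loop. The sketch below models this with `concurrent.futures` as a stand-in, under the assumption that a no-op save is acceptable; `invoke_lambda` and `save_if_applicable` are hypothetical names.

```python
from concurrent.futures import Future, ThreadPoolExecutor


def invoke_lambda(request):
    # Hypothetical stand-in for the Lambda call: None means "not applicable".
    return None if request % 2 else request * 10


def save_if_applicable(upstream: Future, saved: list) -> bool:
    # Waits only for this request's own upstream result.
    value = upstream.result()
    if value is None:
        return False  # deliberate no-op for "not applicable" requests
    saved.append(value)  # hypothetical stand-in for the Postgres write
    return True


def run(requests):
    saved = []
    # Separate pools, so a waiting save never holds a worker an invocation needs.
    with ThreadPoolExecutor(max_workers=4) as lambdas:
        with ThreadPoolExecutor(max_workers=4) as saves:
            save_futures = []
            for r in requests:
                upstream = lambdas.submit(invoke_lambda, r)
                # Nothing here blocks: the dependency is handed to the save step.
                save_futures.append(saves.submit(save_if_applicable, upstream, saved))
            written = [f.result() for f in save_futures]
    # Saves may finish in any order, so sort for a stable result.
    return sorted(saved), written
```

The trade-off is the one described above: one extra (possibly no-op) step is scheduled per request, but the loop itself never waits on a result.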
And if the Lambda task fails, the whole flow fails at that point, because nothing after it has been submitted yet.
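The failure case can be handled by checking whether an upstream call raised before using its result, so that one failed request does not stop the remaining ones from being processed. This is again sketched with `concurrent.futures` as a stand-in (there, `Future.exception()` blocks until the call finishes and returns the raised exception, or `None` on success). The failing `invoke_lambda` is hypothetical, and Prefect's retries are not modelled.

```python
from concurrent.futures import ThreadPoolExecutor


def invoke_lambda(request):
    # Hypothetical stand-in: request 3 fails, odd requests are "not applicable".
    if request == 3:
        raise RuntimeError("lambda invocation failed")
    return None if request % 2 else request * 10


def run(requests):
    saved, failed = [], []
    with ThreadPoolExecutor(max_workers=4) as pool:
        # Submit everything first so one failure cannot stop later submissions.
        futures = {r: pool.submit(invoke_lambda, r) for r in requests}
        for r, fut in futures.items():
            error = fut.exception()  # blocks until done; None if it succeeded
            if error is not None:
                failed.append(r)  # record the failure instead of aborting the loop
                continue
            output = fut.result()
            if output is not None:
                saved.append(output)
    return saved, failed
```

Whether a recorded failure should still mark the whole flow run as Failed is a separate decision. This sketch only shows how to keep processing the other requests.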