Hello! Is there a way to add custom states to the ...
# prefect-community
d
Hello! Is there a way to add custom states to the graphql service? We have a custom state called
GracefulFail
, which extends
Success
(we use it for tasks that did not succeed but should not stop the rest of the downstream tasks). Unfortunately, using these on prefect server gives a 500, leaving the task out of date on the UI…
Copy code
Failed to set task state with error: ClientError([{'message': '{\'_schema\': "Invalid data type: [None, {\'_schema\': \'Unsupported object type: GracefulFail\'}]"}', 'locations': [{'line': 6, 'column': 13}], 'path': ['set_task_run_states', 'states', 0, 'id'], 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}])
Traceback (most recent call last):
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/engine/cloud/task_runner.py", line 115, in call_runner_target_handlers
    state = self.client.set_task_run_state(
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/client/client.py", line 1082, in set_task_run_state
    result = self.graphql(
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/client/client.py", line 225, in graphql
    raise ClientError(result["errors"])
prefect.utilities.exceptions.ClientError: [{'message': '{\'_schema\': "Invalid data type: [None, {\'_schema\': \'Unsupported object type: GracefulFail\'}]"}', 'locations': [{'line': 6, 'column': 13}], 'path': ['set_task_run_states', 'states', 0, 'id'], 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
z
Hi @David Ojeda! I can give you some guidance here, but I want to qualify it by saying that adding a state to Prefect is nontrivial, and I'm not sure I'd recommend it without full understanding of what it affects. A good example here is that a lot of the UI logic is hard-coded based on the Core-maintained set of states-- adding a new state will likely break certain views. This doesn't mean it's impossible, but I want to attach a caveat emptor. If you do want to add new states, here's a general list of things you'll need to update: • the state needs to be implemented in your version of Prefect (sounds like this is already done). This includes the proper serialization logic, as you'll be serializing and deserializing as you communicate between Core and the server. • if you want to implement state-specific logic for
GracefulFail
, you'll need to put logic in
api/states.py
. • you'll need to update the UI to properly parse and render the new
GracefulFail
state. If it's at all helpful, the error you linked appears to be linked to marshmallow, so I'd recommend taking a look at the serialization logic for
GracefulFail
.
upvote 1
d
Hi @Zachary Hughes, thanks for the response. we are actually using two custom states that have no real state logic. They are particularly useful for us to determine when: • a task has failed but it is not a cathastrophic failure and we can go on • a task has not really processed its input because we already hit our cache (not prefects’ cache though, just our internal file cache, because most of our tasks have file inputs and file outputs) we use them along with signals to express those two use cases, it helps us read our flow results very quickly and the associated code is quite brief:
Copy code
from prefect.engine.signals import PrefectStateSignal
from prefect.engine.state import Success


class SkippedResult(Success):
    color = '#cab2d6'


class GracefulFail(Success):
    color = '#dd1c77'


class SKIPRESULT(PrefectStateSignal):
    _state_cls = SkippedResult


class GRACEFULFAIL(PrefectStateSignal):
    _state_cls = GracefulFail
So, with that scope, I think that we save ourselves from the complexity of creating new states…
OK I’ve just read your edit, let me take a look at the serialization then
👍 1
z
Here's the link to a good jumping off-point in the codebase. https://github.com/PrefectHQ/prefect/blob/master/src/prefect/serialization/state.py#L181
d
Excellent, I’ve just arrived at that point as well
I have some ideas on how to proceed now, let me get back to you
z
Sounds good!
d
I managed to modify the state serialization by adding the appropriate schemas and chaning the
StateSchema
class so that my two states are correctly serializable with marshmallow, but I still encounter another error and I can’t find who or where this is generated:
Copy code
[2020-04-28 18:09:46] ERROR - prefect.CloudTaskRunner | Failed to set task state with error: ClientError([{'message': "{'type': ['Unsupported value: SkippedResult']}", 'locations': [{'line': 4, 'column': 13}], 'path': ['set_task_run_states', 'states', 0, 'id'], 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}])
Traceback (most recent call last):
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/engine/cloud/task_runner.py", line 115, in call_runner_target_handlers
    state = self.client.set_task_run_state(
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/client/client.py", line 1082, in set_task_run_state
    result = self.graphql(
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/client/client.py", line 225, in graphql
    raise ClientError(result["errors"])
prefect.utilities.exceptions.ClientError: [{'message': "{'type': ['Unsupported value: SkippedResult']}", 'locations': [{'line': 4, 'column': 13}], 'path': ['set_task_run_states', 'states', 0, 'id'], 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
[2020-04-28 18:09:46] INFO - prefect.CloudTaskRunner | Task 'SaveResponse[10]': finished task run for task with final state: 'ClientFailed'
Any clues?
z
Are you able to view your Prefect Server logs? I'd expect to see a more informative message there, either from the Apollo or GraphQL container.
d
Ah! I see… The graphql logs are actually informative: they show a marshmallow error:
Copy code
ERROR:    {'type': ['Unsupported value: SkippedResult']}

GraphQL request:4:7
3 |     states {
4 |       id
  |       ^
5 |       status
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/graphql/execution/execute.py", line 670, in complete_value_catching_error
    return_type, field_nodes, info, path, result
  File "/usr/local/lib/python3.7/site-packages/graphql/execution/execute.py", line 735, in complete_value
    raise result
  File "/prefect-server/src/prefect_server/graphql/states.py", line 73, in set_state
    state = state_schema.load(state_input["state"])
  File "/usr/local/lib/python3.7/site-packages/marshmallow_oneofschema/one_of_schema.py", line 144, in load
    raise exc
marshmallow.exceptions.ValidationError: {'type': ['Unsupported value: SkippedResult']}

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/graphql/execution/execute.py", line 670, in complete_value_catching_error
    return_type, field_nodes, info, path, result
  File "/usr/local/lib/python3.7/site-packages/graphql/execution/execute.py", line 735, in complete_value
    raise result
  File "/prefect-server/src/prefect_server/graphql/states.py", line 73, in set_state
    state = state_schema.load(state_input["state"])
  File "/usr/local/lib/python3.7/site-packages/marshmallow_oneofschema/one_of_schema.py", line 144, in load
    raise exc
graphql.error.graphql_error.GraphQLError: {'type': ['Unsupported value: SkippedResult']}
I realize my mistake now: My graphql service is using the official prefect image; not the one that has the custom states
z
Aha, that'd do it!
d
🤦
z
Happens to the best of us. If I had a dime for every time I couldn't figure out why local changes weren't being reflected in my tests... 😂