# prefect-community
l
Hello Prefect community. I have an issue with retrying failed flow runs with checkpointing where some tasks have secret outputs. Since their output value is not persisted, I would expect them to be re-computed when restarting the flow run, but they are always None. Reproducible example in thread. Thanks in advance for any help 🙂
My flow run may sometimes fail and I want to be able to restart it. Because of that I'm using flow.result to store checkpoints. However, some tasks contain secret values that I don't want to store anywhere, so I've subclassed SecretBase, which indeed doesn't store the result. When I restart the flow from the failed state, though, the value of the secret is None (I suppose because there is no Result object for it), whereas I would expect it to be re-computed. Below is the error I get when I re-run the flow from the failed state, followed by a simple example that reproduces the issue. Is there something I'm doing wrong?
Unexpected error: TypeError("'NoneType' object is not subscriptable",)
Traceback (most recent call last):
  File ".../home/user/bss/dice/Tournesol/prefect_flows/flows//venv/lib/python3.6/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File ".../venv/lib/python3.6/site-packages/prefect/engine/task_runner.py", line 860, in get_task_run_state
    logger=self.logger,
  File ".../venv/lib/python3.6/site-packages/prefect/utilities/executors.py", line 298, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File ".../secret_result.py", line 35, in use_secret
    prefect.context.logger.info(secret['path'])
TypeError: 'NoneType' object is not subscriptable
import os
import random
from os import environ

import prefect
from prefect import task, Flow, Parameter
from prefect.engine.results import LocalResult
from prefect.tasks.secrets import SecretBase

result_directory = os.path.abspath(
    os.path.join(os.path.dirname(__file__), "..", "results")
)

result = LocalResult(
    dir=result_directory,
    location="{flow_name}/"
    "{scheduled_start_time:%d-%m_%H-%M-%S}/"
    "{task_full_name}-{task_run_id}.prefect_result",
)


class CustomSecret(SecretBase):
    def run(self):
        return {'path': environ.get('PATH'), 'home': environ.get('HOME')}


@task
def fragile_task(error_probability: float) -> None:
    if random.random() <= error_probability:
        raise ValueError('I\'m fragile, just retry me')


@task
def use_secret(secret: CustomSecret) -> None:
    prefect.context.logger.info(secret['path'])
    prefect.context.logger.info(secret['home'])


with Flow('test', result=result) as flow:
    error_prob = Parameter('error_probability', default=0.7)
    secret = CustomSecret()

    a = fragile_task(error_prob)
    b = use_secret(secret)

    flow.set_dependencies(b, upstream_tasks=[a])

if __name__ == '__main__':
    flow.register(project_name='default')
j
Hi @Lukas N. thanks for providing a reproducible example! This looks to be a common occurrence with any task that inherits from the SecretBase class (even the PrefectSecret task). I’m going to open this as an issue to investigate how we can make sure these secret tasks are recomputed on retry. In the meantime, you can always access secrets directly through the prefect.client.Secret object inside of your task. Alternatively, instead of a custom secret, you could have a task that grabs the secret value and has checkpoint=False set, so the secret data is not persisted when passing it to the next task.
@Marvin open “Recompute secret tasks when retrying flow from failed”
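A rough sketch of the first workaround, in plain Python so it runs without Prefect installed: the secret is resolved inside the consuming function rather than in an upstream secret task, so a restart has no missing upstream result to restore. The helper name load_secret is hypothetical and only mirrors the CustomSecret from the example. In the actual flow, use_secret would carry the @task decorator and could read the value through the prefect.client.Secret object mentioned above.

```python
import os
from typing import Dict, List, Optional


def load_secret() -> Dict[str, Optional[str]]:
    """Hypothetical helper: same keys as CustomSecret.run() in the example."""
    return {"path": os.environ.get("PATH"), "home": os.environ.get("HOME")}


def use_secret() -> List[str]:
    # In the flow this function would be decorated with @task (assumption:
    # the rest of the flow stays as in the example).
    # The secret is resolved here, on every run of the task, so it is
    # re-computed on restart instead of being read from a stored result.
    secret = load_secret()
    # Return only the names of the keys that resolved, never the values,
    # so that nothing secret would end up in a checkpoint.
    return sorted(key for key, value in secret.items() if value is not None)
```

Whether this fits depends on how many tasks need the secret; each consumer has to resolve it again.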
l
Just tried it with checkpoint=False and the behaviour is the same. The value is not re-computed. I'm shooting blanks here, but my previous issue might be related https://github.com/PrefectHQ/prefect/issues/3618. I feel like there needs to be a distinction between these results:
• None as in the task output was None, the value is persisted, the task should not be run again
• None as in the task output was not persisted (e.g. checkpointing is turned off), the task should be run again
Thanks for opening the issue though 👍 will keep an eye on it
j
Yeah I follow what you’re saying, there’s a heavy distinction between actually not having a result and choosing to not persist a result 🤔
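To make the distinction concrete, here is a minimal sketch of the idea in plain Python. It is not how Prefect represents results internally; NOT_PERSISTED and restore_or_rerun are hypothetical names used only for illustration. A dedicated sentinel marks "nothing was stored", so a stored None is no longer ambiguous.

```python
from typing import Any, Callable


class _NotPersisted:
    """Marker type: the task finished, but its output was never stored."""

    def __repr__(self) -> str:
        return "NOT_PERSISTED"


# Hypothetical sentinel, distinct from None by identity.
NOT_PERSISTED = _NotPersisted()


def restore_or_rerun(stored: Any, rerun: Callable[[], Any]) -> Any:
    """Return the stored value; re-run the task only if nothing was stored."""
    if stored is NOT_PERSISTED:
        # Output was never persisted (e.g. a secret): compute it again.
        return rerun()
    # Output was persisted, even if the value is None: do not re-run.
    return stored
```

With this shape, a restart would call restore_or_rerun(None, ...) for a task that really returned None and get None back without re-running it, while a secret task would be stored as NOT_PERSISTED and re-run.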