    Lukas N.

    1 year ago
    Hello Prefect community. I have an issue with retrying failed flow runs with checkpointing where some tasks have secret outputs. Since their output value is not persisted, I would expect them to be re-computed when restarting the flow run, but they are always None. Reproducible example in thread. Thanks in advance for any help 🙂
    My flow run may sometimes fail and I want to be able to restart it, so I'm using flow.result to store checkpoints. However, some tasks contain secret values that I don't want to store anywhere, so I've subclassed SecretBase, which indeed doesn't store the result. When I restart the flow from the failed state, the value of the secret is None (I suppose because there is no Result object for it), but I would expect it to be re-computed. Here is the error I get when I re-run the flow from the failed state, followed by a simple example that reproduces the issue. Is there something I'm doing wrong?
    Unexpected error: TypeError("'NoneType' object is not subscriptable",)
    Traceback (most recent call last):
      File ".../home/user/bss/dice/Tournesol/prefect_flows/flows//venv/lib/python3.6/site-packages/prefect/engine/runner.py", line 48, in inner
        new_state = method(self, state, *args, **kwargs)
      File ".../venv/lib/python3.6/site-packages/prefect/engine/task_runner.py", line 860, in get_task_run_state
        logger=self.logger,
      File ".../venv/lib/python3.6/site-packages/prefect/utilities/executors.py", line 298, in run_task_with_timeout
        return task.run(*args, **kwargs)  # type: ignore
      File ".../secret_result.py", line 35, in use_secret
        prefect.context.logger.info(secret['path'])
    TypeError: 'NoneType' object is not subscriptable
    import os
    import random
    from os import environ
    
    import prefect
    from prefect import task, Flow, Parameter
    from prefect.engine.results import LocalResult
    from prefect.tasks.secrets import SecretBase
    
    result_directory = os.path.abspath(
        os.path.join(os.path.dirname(__file__), "..", "results")
    )
    
    result = LocalResult(
        dir=result_directory,
        location="{flow_name}/"
        "{scheduled_start_time:%d-%m_%H-%M-%S}/"
        "{task_full_name}-{task_run_id}.prefect_result",
    )
    
    
    class CustomSecret(SecretBase):
        def run(self):
            return {'path': environ.get('PATH'), 'home': environ.get('HOME')}
    
    
    @task
    def fragile_task(error_probability: float) -> None:
        if random.random() <= error_probability:
            raise ValueError('I\'m fragile, just retry me')
    
    
    @task
    def use_secret(secret: CustomSecret) -> None:
        prefect.context.logger.info(secret['path'])
        prefect.context.logger.info(secret['home'])
    
    
    with Flow('test', result=result) as flow:
        error_prob = Parameter('error_probability', default=0.7)
        secret = CustomSecret()
    
        a = fragile_task(error_prob)
        b = use_secret(secret)
    
        flow.set_dependencies(b, upstream_tasks=[a])
    
    if __name__ == '__main__':
        flow.register(project_name='default')

    josh

    1 year ago
    Hi @Lukas N. thanks for providing a reproducible example! This looks to be a common occurrence with any task that inherits from the SecretBase class (even the PrefectSecret task). I'm going to open this as an issue to investigate how we can make sure these secret tasks are recomputed on retry. In the meantime you can always access secrets directly through the prefect.client.Secret object inside of your task, or, instead of a custom secret, you could have a task that grabs the secret value and has checkpoint=False so the secret data is not persisted when passing it to the next task.
    @Marvin open “Recompute secret tasks when retrying flow from failed”
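The first workaround above (reading the secret inside the task that uses it, rather than passing it in as an upstream result) might look roughly like the sketch below. It is written in plain Python so it runs on its own: `load_secret` is a hypothetical helper, and the environment variables stand in for whatever secret store is actually used. In the flow from the example, `use_secret` would carry the `@task` decorator and the lookup would go through the Prefect secret client instead.

```python
import os


def load_secret() -> dict:
    """Hypothetical helper that reads the secret values at call time.

    Assumption: environment variables stand in for the real secret store;
    the actual secret lookup would replace the two reads below.
    """
    return {"path": os.environ.get("PATH"), "home": os.environ.get("HOME")}


def use_secret() -> dict:
    # In the flow above this function would be decorated with @task and would
    # take no `secret` argument, so a restart needs no upstream result for it.
    secret = load_secret()
    # Return only non-sensitive information: which keys were actually loaded.
    return {key: value is not None for key, value in secret.items()}
```

Because the secret never leaves the task that consumes it, there is nothing to checkpoint and nothing to restore when the run is restarted.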

    Marvin

    1 year ago

    Lukas N.

    1 year ago
    Just tried it with checkpoint=False and the behaviour is the same: the value is not re-computed. I'm shooting in the dark here, but my previous issue might be related (https://github.com/PrefectHQ/prefect/issues/3618). I feel like there needs to be a distinction between these results:
    • None as in the task output was None: the value is persisted, and the task should not be run again
    • None as in the task output was not persisted (e.g. checkpointing is turned off): the task should be run again
    Thanks for opening the issue though 👍 will keep an eye on it
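The distinction between the two kinds of None described above can be illustrated with a sentinel object. This is only a sketch of the idea in plain Python, not how Prefect represents results internally; `NOT_PERSISTED` and `resolve` are hypothetical names introduced for the illustration.

```python
from typing import Any, Callable


class _NotPersisted:
    """Sentinel type marking a result that was never stored."""

    def __repr__(self) -> str:
        return "NOT_PERSISTED"


# Single shared instance, compared by identity (like None itself).
NOT_PERSISTED = _NotPersisted()


def resolve(stored: Any, recompute: Callable[[], Any]) -> Any:
    """Return the stored value, or re-run the task if nothing was stored."""
    if stored is NOT_PERSISTED:
        # Nothing was checkpointed (e.g. a secret), so compute it again.
        return recompute()
    # A persisted value is returned as-is, including a legitimate None.
    return stored
```

With this split, `resolve(None, ...)` hands back the persisted None without re-running anything, while `resolve(NOT_PERSISTED, ...)` calls the recompute function.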

    josh

    1 year ago
    Yeah I follow what you’re saying, there’s a heavy distinction between actually not having a result and choosing to not persist a result 🤔