Hello, I have a flow that set DEFAULT retry greate...
# ask-community
h
Hello, I have a flow that sets the default max retries to a value greater than 0.
Copy code
"PREFECT__TASKS__DEFAULTS__MAX_RETRIES": 2,
"PREFECT__TASKS__DEFAULTS__RETRY_DELAY": 5,
Now I want to set an individual task, task_A, to
retry=0
(no retries at all), and I find it impossible to do that. I will explain the details in the thread.
To disable retries, I would have to set the following at the task_A level:
Copy code
max_retries=0
retry_delay=None
Otherwise, it throws an exception in this code [1] https://github.com/PrefectHQ/prefect/blob/6b59d989dec33aad8c62ea2476fee519c32f5c63/src/prefect/core/task.py#L372-L377 However, when I set
retry_delay=None
at the task_A level, task_A falls back to the default retry_delay=5 because of this code [2] https://github.com/PrefectHQ/prefect/blob/6b59d989dec33aad8c62ea2476fee519c32f5c63/src/prefect/core/task.py#L358-L362 which in turn runs into the same exception in code [1]. cc: @Julie Sturgeon
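The conflict described here can be sketched without Prefect installed. This is only a simplified paraphrase of the two linked code sections, under the assumption that the default fallback [2] runs before the validation [1]; `resolve_retry_settings`, `try_disable_retries`, the two `DEFAULT_*` constants, and the error texts are hypothetical names chosen for illustration, not Prefect's API.

```python
from datetime import timedelta
from typing import Optional, Tuple

# Stand-ins for the flow-level defaults from the config above (assumed values).
DEFAULT_MAX_RETRIES = 2
DEFAULT_RETRY_DELAY = timedelta(seconds=5)


def resolve_retry_settings(
    max_retries: Optional[int] = None,
    retry_delay: Optional[timedelta] = None,
) -> Tuple[int, Optional[timedelta]]:
    """Hypothetical helper paraphrasing the linked checks; not Prefect's code."""
    # Step [2]: a value of None falls back to the configured default.
    if max_retries is None:
        max_retries = DEFAULT_MAX_RETRIES
    if retry_delay is None:
        retry_delay = DEFAULT_RETRY_DELAY

    # Step [1]: retries without a delay, or a delay without retries, are rejected.
    if max_retries > 0 and retry_delay is None:
        raise ValueError("retry_delay must be provided if max_retries > 0")
    if retry_delay is not None and not max_retries:
        raise ValueError("max_retries must be greater than 0 if retry_delay is set")
    return max_retries, retry_delay


def try_disable_retries() -> str:
    """Attempt max_retries=0 with retry_delay=None and report the outcome."""
    try:
        resolve_retry_settings(max_retries=0, retry_delay=None)
    except ValueError as exc:
        return f"rejected: {exc}"
    return "accepted"
```

In this sketch, `retry_delay=None` is replaced by the default delay before the check runs, so the combination `max_retries=0` plus a non-None delay is rejected no matter what the task passes.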
k
@Marvin open “max_retries Can’t Be Set to 0, Making it Hard to Avoid Retries When PREFECT__TASKS__DEFAULTS__MAX_RETRIES is Set”
k
Hey @Hui Zheng, I agree with your assessment here. In the meantime, though, maybe you can use
raise ENDRUN
to avoid retries inside the task?
h
ok, I could try that
Hi Kevin, I don’t think I could use
ENDRUN
because I still want the remaining tasks to execute.
k
ENDRUN
cancels a task, not the Flow Run.
h
I also want task_A to end in the Failed state without retries so that downstream tasks can act accordingly.
k
You can have a failed state with
ENDRUN
with
raise ENDRUN(Failed("xxx"))
where
Failed
is the Prefect State. You can also make it a successful
ENDRUN
h
I see. Ok, I will try that
I would also need to pass along the original FAIL message and result. So would I do this?
Copy code
except signals.FAIL as f:
    raise signals.ENDRUN(
        state.Failed(
            message=f.message,
            result=f.result
        )
    )
k
I think something like this?
Copy code
except Exception as e:
    raise signals.ENDRUN(
        state.Failed(
            message=str(e),
            result=e
        )
    )
I just checked and this snippet looks better from here:
Copy code
from prefect import task, Flow
from prefect.engine.runner import ENDRUN
from prefect.engine.state import Skipped

@task
def test_skipped():
    skip = Skipped("skipping", result=5)
    raise ENDRUN(state=skip)

f = Flow("test", tasks=[test_skipped])
flow_state = f.run()

flow_state.result[test_skipped].result # 5
h
Hi @Kevin Kho, I have run into a strange issue: when DbtShellTask fails, the
signals.FAIL
does not have the attribute
message
. Do you know what might be going wrong? Shouldn’t the
signals.FAIL
at least have a
None
message?
Copy code
class myDbtShellTask(DbtShellTask):

    def run(
        self,
        fs_flow_config: dict = None,
        command: str = None,
        env: dict = None,
        dbt_kwargs: dict = None
    ) -> str:
        try:
            return super(myDbtShellTask, self).run(
                command=command, env=env,
                dbt_kwargs=dbt_kwargs
            )
        except signals.FAIL as f:
            raise signals.ENDRUN(
                state.Failed(
                    message=f.message,
                    result=f.result
                )
            )
It fails with this error:
Copy code
message=f.message,
AttributeError: 'FAIL' object has no attribute 'message'
k
Let me run a quick test
h
When I log the
FAIL
object just before the failing line:
Copy code
self.logger.error("exception: {}".format(f))
raise signals.ENDRUN(
    state.Failed(
        message=f.message,
The log shows that f carries a message:
Copy code
[2021-08-18 16:48:21+0000] ERROR - prefect.dbt_execution_step | exception: Command failed with exit code 1
This is exactly the message of the FAIL raised by ShellTask, the parent of DbtShellTask, in this code:
Copy code
msg = "Command failed with exit code {}".format(
    sub_process.returncode,
)
https://github.com/PrefectHQ/prefect/blob/6b59d989dec33aad8c62ea2476fee519c32f5c63/src/prefect/tasks/shell.py#L155-L157
k
Gotcha, I see what you are trying to do
So this is working for me:
Copy code
from prefect import task, Flow, Parameter
import prefect
from datetime import timedelta
from prefect.engine.signals import FAIL, ENDRUN
from prefect.engine import state
from prefect.triggers import always_run
from prefect.tasks.shell import ShellTask

@task(max_retries=3, retry_delay = timedelta(seconds=5), log_stdout=True)
def abc():
    x = "this is my value"
    try:
        ShellTask(log_stderr=True).run(command="eadqwe")
    except FAIL as f:
        logger = prefect.context.get("logger")
        logger.info(f)
        raise ENDRUN(
            state.Failed(
                message=f,
                result=x
            )
        )
    return x

@task(trigger=always_run)
def bcd(a):
    logger = prefect.context.get("logger")
    logger.info(a)
    return 1

with Flow('my flow') as flow:
    a = abc()
    b = bcd(a)

flow.register("dsdc")
It behaves differently when using
signals.FAIL
instead of
FAIL
but I don’t immediately know why.
Ignore my last comment. Nothing behaves differently; I think this is a working solution.
h
Thank you, Kevin. I will give it a try. It seems you pass
signals.FAIL
instead of
signals.FAIL.message
to the message argument of
state.Failed
, is that right? What about
signals.FAIL.result
? Could I pass
signals.FAIL.result
to
state.Failed.result
?
k
What would the value of the result be in this case from the DbtShellTask?
h
It’s the stderr log from its parent ShellTask, which is raised like this:
Copy code
raise prefect.engine.signals.FAIL(
    msg,
    result=lines if self.return_all else line,
)
https://github.com/PrefectHQ/prefect/blob/900f33a6d9eb30c3cb396b7aa0d2a995fbc87459/src/prefect/tasks/shell.py#L166
k
I see. I’ll look into this some more. Might have to ask another team member tomorrow
Here is the right way to use it. I just learned this myself:
Copy code
@task(max_retries=2, retry_delay = timedelta(seconds=5), log_stdout=True, state_handlers=[myhandler])
def abc():
    try:
        x = ShellTask(log_stderr=True, return_all=True).run(command="eadqwe")
    except FAIL as f:
        logger = prefect.context.get("logger")
        logger.info(f.state.message)
        logger.info(f.state.result)
        raise ENDRUN(
            state.Failed(
                message=f.state.message,
                result=f.state.result
            )
        )
    return x
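The earlier puzzle (the logged signal prints its message, yet `f.message` raises an AttributeError) can be illustrated with stand-in classes. This is a minimal sketch, assuming the signal is an ordinary Python exception that keeps its message and result on a `state` attribute, as the snippet above suggests; `SketchFailed`, `SketchFAIL`, and `inspect_signal` are hypothetical names and not Prefect classes.

```python
class SketchFailed:
    """Hypothetical stand-in for a Failed state: holds a message and a result."""

    def __init__(self, message=None, result=None):
        self.message = message
        self.result = result


class SketchFAIL(Exception):
    """Hypothetical stand-in for a FAIL signal that wraps a state object."""

    def __init__(self, message=None, result=None):
        # Passing the message to Exception makes str(signal) show it.
        super().__init__(message)
        # The message and result live on the wrapped state, not on the signal.
        self.state = SketchFailed(message=message, result=result)


def inspect_signal():
    """Raise the stand-in signal and report which attributes are reachable."""
    try:
        raise SketchFAIL("Command failed with exit code 1", result=["stderr line"])
    except SketchFAIL as f:
        return {
            "printed": str(f),
            "has_message_attr": hasattr(f, "message"),
            "state_message": f.state.message,
            "state_result": f.state.result,
        }
```

Under these assumptions, formatting the signal into a log line works because Python exceptions render their first argument, while the signal object itself has no `message` attribute; the values have to be read from `f.state`.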