Hui Zheng
08/17/2021, 10:04 PM"PREFECT__TASKS__DEFAULTS__MAX_RETRIES": 2,
"PREFECT__TASKS__DEFAULTS__RETRY_DELAY": 5,
Now, I want to set an individual task_A to retry=0
(not to retry at all) and I find it impossible to do that.
I will explain the details in the thread.Hui Zheng
08/17/2021, 10:06 PMmax_retries=0
retry_delay=None
otherwise, it will throw exception on this code [1]
https://github.com/PrefectHQ/prefect/blob/6b59d989dec33aad8c62ea2476fee519c32f5c63/src/prefect/core/task.py#L372-L377
However, when I set retry_delay=None
at task_A level, the task_A would use the default retry_delay=5 because of this code [2]
https://github.com/PrefectHQ/prefect/blob/6b59d989dec33aad8c62ea2476fee519c32f5c63/src/prefect/core/task.py#L358-L362
which in turn runs into the same exception in code [1]. cc: @Julie SturgeonKevin Kho
Marvin
08/17/2021, 10:11 PMKevin Kho
raise ENDRUN
to avoid retries inside the task?Hui Zheng
08/17/2021, 10:14 PMHui Zheng
08/17/2021, 10:20 PMENDRUN
because I still want to remaining tasks to executeKevin Kho
ENDRUN
cancels a task. Not the Flow RunHui Zheng
08/17/2021, 10:21 PMKevin Kho
ENDRUN
with raise ENDRUN(Failed("xxx"))
where Failed
is the Prefect State. You also can make it a successful ENDRUN
Hui Zheng
08/17/2021, 10:23 PMHui Zheng
08/17/2021, 10:26 PMexcept signals.FAIL as f:
raise signals.ENDRUN(
state.Failed(
message=f.message,
result=f.result
)
)
Kevin Kho
except Exception as e:
raise signals.ENDRUN(
state.Failed(
message=str(e),
result=e
)
)
Kevin Kho
from prefect import task, Flow
from prefect.engine.runner import ENDRUN
from prefect.engine.state import Skipped
@task
def test_skipped():
skip = Skipped("skipping", result=5)
raise ENDRUN(state=skip)
f = Flow("test", tasks=[test_skipped])
flow_state = f.run()
flow_state.result[test_skipped].result # 5
Hui Zheng
08/18/2021, 4:17 PMsignal.FAIL
does not have attribute message
. Do you know what might possible goes wrong? Shouldn’t the signal.FAIL
at least have a None
message?
class myDbtShellTask(DbtShellTask):
def run(
self,
fs_flow_config: dict = None,
command: str = None,
env: dict = None,
dbt_kwargs: dict = None
) -> str:
try:
return super(myDbtShellTask, self).run(
command=command, env=env,
dbt_kwargs=dbt_kwargs
)
except signals.FAIL as f:
raise signals.ENDRUN(
state.Failed(
message=f.message,
result=f.result
)
)
it errors
message=f.message,
AttributeError: 'FAIL' object has no attribute 'message'
Kevin Kho
Hui Zheng
08/18/2021, 4:59 PMFAIL
object before the failure line.
self.logger.error("exception: {}".format(f))
raise signals.ENDRUN(
state.Failed(
message=f.message,
The log shows f carries a message
[2021-08-18 16:48:21+0000] ERROR - prefect.dbt_execution_step | exception: Command failed with exit code 1
This is the exact the message of the FAIL raised by ShellTask, the parent of dbtShellTask in this code
msg = "Command failed with exit code {}".format(
sub_process.returncode,
)
https://github.com/PrefectHQ/prefect/blob/6b59d989dec33aad8c62ea2476fee519c32f5c63/src/prefect/tasks/shell.py#L155-L157Kevin Kho
Kevin Kho
from prefect import task, Flow, Parameter
import prefect
from datetime import timedelta
from prefect.engine.signals import FAIL, ENDRUN
from prefect.engine import state
from prefect.triggers import always_run
from prefect.tasks.shell import ShellTask
@task(max_retries=3, retry_delay = timedelta(seconds=5), log_stdout=True)
def abc():
x = "this is my value"
try:
ShellTask(log_stderr=True).run(command="eadqwe")
except FAIL as f:
logger = prefect.context.get("logger")
<http://logger.info|logger.info>(f)
raise ENDRUN(
state.Failed(
message=f,
result=x
)
)
return x
@task(trigger=always_run)
def bcd(a):
logger = prefect.context.get("logger")
<http://logger.info|logger.info>(a)
return 1
with Flow('my flow') as flow:
a = abc()
b = bcd(a)
flow.register("dsdc")
Kevin Kho
signals.FAIL
instead of FAIL
but I don’t know why immediately.Kevin Kho
Hui Zheng
08/18/2021, 9:05 PMsignals.FAIL
instead of signals.FAIL.message
to the message variable in state.FAILed
, is that right? what about the signals.FAIL.result
, Could I pass signals.FAIL.result
to state.FAILed.result
?Kevin Kho
Hui Zheng
08/18/2021, 9:08 PMraise prefect.engine.signals.FAIL(
msg,
result=lines if self.return_all else line,
)
https://github.com/PrefectHQ/prefect/blob/900f33a6d9eb30c3cb396b7aa0d2a995fbc87459/src/prefect/tasks/shell.py#L166Kevin Kho
Kevin Kho
@task(max_retries=2, retry_delay = timedelta(seconds=5), log_stdout=True, state_handlers=[myhandler])
def abc():
try:
x = ShellTask(log_stderr=True, return_all=True).run(command="eadqwe")
except FAIL as f:
logger = prefect.context.get("logger")
<http://logger.info|logger.info>(f.state.message)
<http://logger.info|logger.info>(f.state.result)
raise ENDRUN(
state.Failed(
message=f.state.message,
result=f.state.result
)
)
return x