Italo Barros
08/17/2021, 6:48 PMZach Angell
signals.FAIL
indicates that a task failed. signals.TRIGGERFAIL
indicates that a task trigger failed. Unless you're doing something unusual, signals.FAIL
is likely the one you want to use.
Regarding retries, could you tell me a bit more about what you're trying to do?Kevin Kho
Kevin Kho
from prefect import task, Flow
from datetime import timedelta
from prefect.engine.signals import FAIL
@task(max_retries = 3, retry_delay=timedelta(seconds=2))
def abc():
x = 2
raise FAIL()
return x
with Flow("...") as flow:
abc()
flow.run()
Italo Barros
08/18/2021, 11:07 AM@task(name='Check File', max_retries=10, retry_delay=timedelta(minutes=24), skip_on_upstream_skip=False)
def check_file(file_dir: str):
"""
This method is responsible to check if the
desired file that is available and is acessible.
Args:
file_dir (str): The file DIR location
Raises:
prefect.signals: If the file was not found or
is not acessible
"""
logger = prefect.context.get("logger")
<http://logger.info|logger.info>(f'Checking if .zip file is available at: {file_dir}')
if Path(file_dir) and access(file_dir, R_OK):
dc_zip_file = ZipFile(file_dir)
if dc_zip_file.testzip() is not None:
dc_zip_file.close()
logger.critical(f'The Deck file is corrupted! \nFile: {file_dir}')
raise signals.FAIL
else:
dc_zip_file.close()
<http://logger.info|logger.info>('File is OK! Continuing...')
raise signals.SUCCESS
else:
logger.warning('The File is not available yet')
raise signals.RETRY
with Flow('Test') as loop_check:
check_deck_dir = check_file('C:\WeeklyRVs\20210210.zip')
loop_check.register(project_name='test')
loop_check.run()