Hui Zheng
10/29/2020, 1:19 AM
I got this warning at the healthcheck step:
/opt/prefect/healthcheck.py:149: UserWarning: Task <Task: fetch_runnable_models> has retry settings but some upstream dependencies do not have result types. See <https://docs.prefect.io/core/concepts/results.html> for more details.
result_check(flows)
Please see the thread for more details. fetch_runnable_models is the second task in the flow. The flow up to this task is defined as below:
import prefect
from prefect import Flow

with Flow('Data Platform Flow') as flow:
    # secrets inside prefect.context.secrets come from:
    # locally: ~/.prefect/flow.config.toml
    # remotely: Prefect Cloud Secret Mgmt dash.
    dbt_cloud_tkn = prefect.context.secrets[
        'DBT_CLOUD_API_PERSONAL_ACCESS_TOKEN'
    ]
    # dbt_cloud_api = API(dbt_cloud_tkn)
    current_stamp = get_fixed_stamp_now()
    fs_models = fetch_runnable_models(
        current_stamp
    )
from datetime import datetime
from os import getenv

import pytz
from prefect import task

@task(log_stdout=True)
def get_fixed_stamp_now() -> datetime:
    print('{} Flow w/ Target Release {} in env {}'.format(
        getenv('SCHEDULE_TYPE'),
        getenv('TARGET_RELEASE_VERSION'),
        getenv('ENVIRONMENT', default='stage'))
    )  # noqa
    # round down to the minute
    stamp = pytz.utc.localize(
        datetime.utcnow()
    ).replace(second=0, microsecond=0)
    return stamp
"PREFECT__TASKS__DEFAULTS__MAX_RETRIES": 2,
"PREFECT__TASKS__DEFAULTS__RETRY_DELAY": 10,
"PREFECT__TASKS__DEFAULTS__CHECKPOINT": True,
Chris White
With Docker storage, this Flow is not configured to store your task outputs anywhere. This means that, if fetch_runnable_models were to fail, and the process / job were to unexpectedly die before the retry could proceed, you would not be able to successfully retry the task, because it would not be able to recreate its upstream data dependency.
In normal operating scenarios, though, the retry will occur within the same process and you should be fine - it's the edge case when the process / job unexpectedly dies that this warning is highlighting.
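A hedged sketch of one way to handle the edge case Chris describes: attach a result directly to the upstream task so its output is persisted outside the process; the bucket name is a placeholder.

from datetime import datetime

from prefect import task
from prefect.engine.results import GCSResult

# Persist this task's return value to GCS so a downstream retry can
# reload it even if the original process died. Bucket name is assumed.
@task(log_stdout=True, checkpoint=True,
      result=GCSResult(bucket='my-prefect-results'))
def get_fixed_stamp_now() -> datetime:
    ...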
Hui Zheng
10/30/2020, 12:21 AM
We use Docker storage. Because this flow is highly critical, I do hope to recover from the worst edge cases, such as the job unexpectedly dying. How do I store the task outputs outside the Docker container? I already set the flow result with the following setting when building the container:
flow.result = GCSResult(bucket=...)
It seems that it does not cover everything, given the warning message?
Chris White
Hui Zheng
10/30/2020, 12:28 AM
I declare some variables in the flow.py file at the global level, outside the scope of tasks and the flow, and then use those variables in some tasks. Are those variables the cause of the warning?
from os import getenv

schedule_type = getenv('SCHEDULE_TYPE')
target_release_u = getenv('TARGET_RELEASE_VERSION').replace('.', '_')
environment = getenv('ENVIRONMENT', default='stage')
fs_collection_name = '_'.join([environment, target_release_u, schedule_type])
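A sketch, not from the thread: one way to avoid the module-level lookups is to read the environment variables inside a task, so they are resolved at runtime in the container rather than when the flow module is imported. The task name is hypothetical.

from os import getenv

from prefect import task

@task
def build_fs_collection_name() -> str:
    # Resolved when the flow runs, not when the flow module is imported.
    schedule_type = getenv('SCHEDULE_TYPE')
    target_release_u = getenv('TARGET_RELEASE_VERSION').replace('.', '_')
    environment = getenv('ENVIRONMENT', default='stage')
    return '_'.join([environment, target_release_u, schedule_type])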
Chris White
Hui Zheng
10/30/2020, 10:57 PM
Please see the SkipNoneDbtShellTask task below. You suggested that task_B could check whether the provided value is None and SKIP if so. However, when I implemented it, I did it a bit differently: I made task_B a SkipNoneDbtShellTask, which returns None if the provided value is None.
from typing import Any, Optional

from prefect.tasks.dbt import DbtShellTask
from prefect.utilities.tasks import defaults_from_attrs

class SkipNoneDbtShellTask(DbtShellTask):
    @defaults_from_attrs("command", "env", "dbt_kwargs")
    def run(
        self,
        command: str = None,
        # env: dict = None,
        # dbt_kwargs: dict = None
        **kwargs: Any
    ) -> Optional[str]:
        # Skip the dbt invocation entirely when no command is provided.
        if command is None:
            return None
        return super().run(command=command, **kwargs)
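A hypothetical usage sketch; the constructor arguments and the upstream task are assumptions, not from the thread. A None command makes the task return None instead of invoking dbt.

from typing import Optional

from prefect import Flow, task

@task
def build_dbt_command() -> Optional[str]:
    # Hypothetical upstream task: returns a dbt command, or None to skip.
    return None

task_b = SkipNoneDbtShellTask(
    profile_name='default',  # assumed dbt profile settings
    environment='dev',
    dbt_kwargs={},
)

with Flow('dbt-example') as flow:
    task_b(command=build_dbt_command())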