
Hui Zheng

10/29/2020, 1:19 AM
I successfully ran a flow locally. However, when I deployed the flow to Prefect Cloud, I got this warning message at the `healthcheck` step:
```
/opt/prefect/healthcheck.py:149: UserWarning: Task <Task: fetch_runnable_models> has retry settings but some upstream dependencies do not have result types. See <https://docs.prefect.io/core/concepts/results.html> for more details.
  result_check(flows)
```
Please see the thread for more details. `fetch_runnable_models` is the second task in the flow. The flow up to this task is defined as below:
```python
import prefect
from prefect import Flow

with Flow('Data Platform Flow') as flow:

    # secrets inside prefect.context.secrets come from:
    # locally: ~/.prefect/flow.config.toml
    # remotely: Prefect Cloud Secret Mgmt dash.
    dbt_cloud_tkn = prefect.context.secrets[
        'DBT_CLOUD_API_PERSONAL_ACCESS_TOKEN'
    ]

    # dbt_cloud_api = API(dbt_cloud_tkn)

    current_stamp = get_fixed_stamp_now()

    fs_models = fetch_runnable_models(
        current_stamp
    )
```
The first task is defined as below:
```python
from datetime import datetime
from os import getenv

import pytz
from prefect import task

@task(log_stdout=True)
def get_fixed_stamp_now() -> datetime:

    print('{} Flow w/ Target Release {} in env {}'.format(
        getenv('SCHEDULE_TYPE'),
        getenv('TARGET_RELEASE_VERSION'),
        getenv('ENVIRONMENT', default='stage'))
    ) # noqa

    # round down to the minute
    stamp = pytz.utc.localize(
            datetime.utcnow()
        ).replace(second=0, microsecond=0)

    return stamp
```
At the flow's global level, I set:
```
"PREFECT__TASKS__DEFAULTS__MAX_RETRIES": 2,
"PREFECT__TASKS__DEFAULTS__RETRY_DELAY": 10,
"PREFECT__TASKS__DEFAULTS__CHECKPOINT": True,
```

Chris White

10/29/2020, 1:25 AM
Hi @Hui Zheng - this isn’t necessarily a problem, depending on what scenarios you hope to cover with this Flow. Assuming you are using `Docker` storage, this Flow is not configured to store your task outputs anywhere. This means that if `fetch_runnable_models` were to fail, and the process / job were to unexpectedly die before the retry could proceed, you would not be able to successfully retry the task, because it would not be able to recreate its upstream data dependency. In normal operating scenarios, though, the retry will occur within the same process and you should be fine - it’s the edge case where the process / job unexpectedly dies that this warning is highlighting.
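To cover that edge case, each task's output needs to be persisted outside the process via a result type, either flow-wide or per task. A minimal sketch assuming Prefect 0.x, with a hypothetical bucket name:

```python
from datetime import timedelta

from prefect import task
from prefect.engine.results import GCSResult

# A sketch: persist this task's output to GCS so that a retry in a
# fresh process can re-create the upstream data ("my-bucket" is hypothetical).
@task(
    max_retries=2,
    retry_delay=timedelta(seconds=10),
    result=GCSResult(bucket="my-bucket"),
)
def fetch_runnable_models(stamp):
    ...
```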

Hui Zheng

10/30/2020, 12:21 AM
Thank you, Chris. Yes, we are using `Docker` storage. Because this flow is highly critical, I do hope to recover from the worst edge cases, such as the job unexpectedly dying. How do I store the task outputs outside the Docker container? I already set the flow result with the following setting before building the container:
```python
flow.result = GCSResult(bucket=...)
```
It seems that doesn't cover everything, given the warning message?

Chris White

10/30/2020, 12:26 AM
Hmmm, are you setting that result prior to registering the flow / building its storage?
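For reference, the ordering that bakes a result-bearing Flow into the image would look roughly like this; a sketch assuming Prefect 0.x, where the bucket, registry, and project names are placeholders:

```python
from prefect.engine.results import GCSResult
from prefect.environments.storage import Docker

# Set the result BEFORE building storage / registering, so the Flow
# serialized into the Docker image carries the result attribute.
flow.result = GCSResult(bucket="my-bucket")              # hypothetical bucket
flow.storage = Docker(registry_url="gcr.io/my-project")  # hypothetical registry
flow.register(project_name="data-platform")              # hypothetical project
```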

Hui Zheng

10/30/2020, 12:28 AM
Yes, I set the result prior to the registering-the-flow / building-its-storage step. It has been done like that, and I never saw these warning messages before. There was one change recently, though, around the time these warning messages appeared: I declared some variables in my `flow.py` file at the global level, outside the scope of tasks and the flow, and then used those variables in some tasks. Are those variables the cause of the warning?
```python
schedule_type = getenv('SCHEDULE_TYPE')
target_release_u = getenv('TARGET_RELEASE_VERSION').replace('.', '_')
environment = getenv("ENVIRONMENT", default="stage")
fs_collection_name = '_'.join([environment, target_release_u, schedule_type])
```

Chris White

10/30/2020, 12:33 AM
No, that wouldn’t do it - the check is incredibly simple: https://github.com/PrefectHQ/prefect/blob/master/src/prefect/environments/storage/_healthcheck.py#L67 You can see that if the flow has a result, the check is skipped; so for some reason, your Flow object within your Docker image does not have a result attribute.
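Paraphrasing the linked check, the logic is roughly the following (a rough sketch, not the exact source):

```python
import warnings

def result_check(flows):
    for flow in flows:
        # If the flow itself has a result, the whole check is skipped.
        if flow.result is not None:
            continue
        # Otherwise, warn about any retrying task whose upstream
        # dependencies have no result type configured.
        for task in flow.tasks:
            if task.max_retries > 0 and any(
                upstream.result is None
                for upstream in flow.upstream_tasks(task)
            ):
                warnings.warn(
                    f"Task {task} has retry settings but some upstream "
                    "dependencies do not have result types."
                )
```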

Hui Zheng

10/30/2020, 10:57 PM
Could that be related to our other conversation, https://prefect-community.slack.com/archives/CL09KU1K7/p1603838678211000? My `task_B` in this case is the `SkipNoneDbtShellTask` task below. You suggested that `task_B` could check if the provided value is `None` and `SKIP` if so. However, when I implemented it, I did it a bit differently: I made `task_B`, which is a `SkipNoneDbtShellTask`, return `None` if the provided value is `None`.
```python
from typing import Any, Optional

from prefect.tasks.dbt import DbtShellTask
from prefect.utilities.tasks import defaults_from_attrs


class SkipNoneDbtShellTask(DbtShellTask):
    @defaults_from_attrs("command", "env", "dbt_kwargs")
    def run(
        self,
        command: str = None,
        # env: dict = None,
        # dbt_kwargs: dict = None
        **kwargs: Any
    ) -> Optional[str]:
        if command is None:
            return None
        else:
            return super(SkipNoneDbtShellTask, self).run(
                command=command, **kwargs)
```
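For comparison, the SKIP-based variant would raise a signal instead of returning `None`; a sketch assuming Prefect 0.x, where raising `SKIP` marks the run as Skipped and, by default, skips downstream tasks as well:

```python
from typing import Any

from prefect.engine.signals import SKIP
from prefect.tasks.dbt import DbtShellTask
from prefect.utilities.tasks import defaults_from_attrs


class SkipNoneDbtShellTask(DbtShellTask):
    @defaults_from_attrs("command", "env", "dbt_kwargs")
    def run(self, command: str = None, **kwargs: Any) -> str:
        if command is None:
            # Raising SKIP puts this run (and, by default, downstream
            # runs) into a Skipped state instead of Success-with-None.
            raise SKIP("No command provided; skipping dbt run.")
        return super().run(command=command, **kwargs)
```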