Thread
#prefect-community
    Riley Hun

    1 year ago
    Hello everyone, may I kindly request some assistance debugging a flow I've registered to the Prefect Core UI I have hosted on GCP? It works fine when I run the flow on my local machine against the remote Dask cluster, but when I register the flow it fails, and the error in the logs isn't detailed enough. Thanks in advance!
    Error From the Prefect UI Logs and the terminal window of the VM hosting the Prefect Core Server
    My test flow is very simple:
    import random
    from datetime import datetime, timedelta

    import prefect
    from prefect import task, Flow
    from prefect.schedules import IntervalSchedule

    @task
    def extract():
        """Get a list of data"""
        logger = prefect.context.get("logger")
        logger.info('An info message')
        return random.sample(range(1, 100), 5)
    
    @task
    def transform(data):
        """Multiply the input by 10"""
        logger = prefect.context.get("logger")
        logger.info('Another info message')
        return [i * 10 for i in data]
    
    @task
    def load(data):
        """Print the data to indicate it was received"""
        print("Here's your data: {}".format(data))
    
    schedule = IntervalSchedule(
        start_date=datetime.utcnow() + timedelta(seconds=60),
        interval=timedelta(minutes=5),
    )
    
    with Flow('ETL') as flow:
        e = extract()
        t = transform(e)
        l = load(t)
    
    flow.schedule = schedule
    My build script for dockerizing + registering the flow is below. I am also using the daskdev/dask dockerfile.
    # imports reconstructed; module paths assume the Prefect 0.13-era API
    import argparse
    import importlib
    import json
    import os
    import subprocess

    from distributed.security import Security
    from prefect.engine.executors import DaskExecutor
    from prefect.environments.storage import Docker


    if __name__ == '__main__':  # wrapper assumed from the original indentation
        parser = argparse.ArgumentParser()
        parser.add_argument(
            '--flow', type=str, required=True,
            help='Module path which contains the flow.'
        )
        parser.add_argument(
            '--project', type=str, required=True,
            help='Project name in which to register the flow.'
        )
        parser.add_argument(
            '--image_name', type=str, required=True,
            help='Name of the image which is associated with the storage'
        )
        parser.add_argument(
            '--image_tag', type=str, required=True,
            help='Tag of the image which is associated with the storage'
        )
        parser.add_argument(
            '--env_file_path', type=str,
            help='Json file containing environment variables to be stored in the image'
        )
        parser.add_argument(
            '--python_dependencies', type=str,
            help='List of python packages to be pip installed in the image, separated by comma'
        )
        parser.add_argument(
            '--gcp_project_id', type=str, required=True,
            help='GCP Project name'
        )
        parser.add_argument(
            '--release', type=str, required=True,
            help='Name of GKE Cluster'
        )
        parser.add_argument(
            '--dask_scheduler_address', type=str, required=True,
            help='Scheduler Address of Dask Cluster'
        )
        parser.add_argument(
            '--tls_ca', type=str, required=True,
            help='File path of tls certificate file for authorized access to Dask Cluster'
        )
        parser.add_argument(
            '--tls_key', type=str, required=True,
            help='File path of tls key file for authorized access to Dask Cluster'
        )
        parser.add_argument(
            '--dockerfile', type=str, required=True,
            help='Path to dockerfile for building flow container'
        )
    
        args = parser.parse_args()
    
        # get env vars
        envs = {}
        if args.env_file_path:
            with open(args.env_file_path) as f:
                envs = json.load(f)
    
        # get python dependencies
        # to do: list all packages in values.yaml as well
        python_dependencies = ["google-cloud-storage",
                               "google-cloud-logging",
                               "hvac",
                               "google-api-core",
                               "snowflake-sqlalchemy",
                               "snowflake-connector-python",
                               "SQLAlchemy",
                               "requests",
                               "ravenpackapi",
                               "pandas",
                               "prefect",
                               "lz4",
                               "fsspec"
                               ]
    
        if args.python_dependencies:
            # extend (not append) so the comma-separated names are added individually
            python_dependencies.extend(args.python_dependencies.split(','))
    
        flows = [importlib.import_module(i).flow for i in args.flow.split(',')]
        print(f'Listing flows: {flows}')
    
        storage = Docker(
            image_name=args.image_name,
            image_tag=args.image_tag,
            dockerfile=args.dockerfile,
            registry_url=f'gcr.io/{args.gcp_project_id}/',
            env_vars=envs,
            python_dependencies=python_dependencies
        )
    
        security = Security(tls_ca_file=args.tls_ca,
                            tls_client_cert=args.tls_ca,
                            tls_client_key=args.tls_key,
                            require_encryption=True)
    
        for flow in flows:
            flow.storage = storage
            flow.environment.executor = DaskExecutor(
                address=args.dask_scheduler_address,
                client_kwargs={'security': security}
            )
            path = storage.add_flow(flow)
            print(f'Storing flow {flow.name} at {path} in the image.')
    
        storage = storage.build()
    
        # run shell script for updating dask workers w/ prefect flow image
        my_env = dict(os.environ)
        my_env['RELEASE'] = args.release
        my_env['GCP_PROJECT_ID'] = args.gcp_project_id
        my_env['IMAGE_NAME'] = args.image_name
        my_env['IMAGE_TAG'] = args.image_tag
    
        subprocess.Popen(['./deployment/setup_dask.sh'], env=my_env)
    
        for flow in flows:
            flow.register(project_name=args.project, build=False)
    Kyle Moon-Wright

    1 year ago
    Hey @Riley Hun, nothing is jumping out at me with the information you've provided above, so we may need more information, which we can dig for by either:
    - starting the agent with show_flow_logs=True if using flow.run_agent(), or with --show-flow-logs if starting from the CLI
    - setting PREFECT__LOGGING__LEVEL="DEBUG" to get more verbose logging output from your flow run
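    A minimal sketch of both options (assuming the Prefect 0.13-era API; the flag, method, and env var are the ones named above, everything else is illustrative):
    import os

    # More verbose logging from the flow run itself.
    os.environ["PREFECT__LOGGING__LEVEL"] = "DEBUG"

    # Surface flow logs from an agent started in-process from the flow script;
    # from the CLI the equivalent is the --show-flow-logs flag on the agent start command.
    flow.run_agent(show_flow_logs=True)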
    Riley Hun

    1 year ago
    Thanks @Kyle Moon-Wright. I'll try that now. 🙂
    Here are the more detailed logs.
    @Kyle Moon-Wright Wow - I'm such a dummy. I spent nearly the entire day trying to debug this and I did not think to check the kubernetes logs of the dask workers. 🤦
    [2020-11-17 07:27:13+0000] INFO - prefect.CloudTaskRunner | Task 'extract': Starting task run...
    [2020-11-17 07:27:13+0000] ERROR - prefect.CloudTaskRunner | Task 'extract': Unexpected error while running task: AttributeError("'FunctionTask' object has no attribute task_run_name. Did you call this object within a function that should have been decorated with @prefect.task?")
    Traceback (most recent call last):
      File "/opt/conda/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 273, in run
        self.set_task_run_name(task_inputs=task_inputs)
      File "/opt/conda/lib/python3.7/site-packages/prefect/engine/cloud/task_runner.py", line 342, in set_task_run_name
        task_run_name = self.task.task_run_name
      File "/opt/conda/lib/python3.7/site-packages/prefect/tasks/core/function.py", line 69, in __getattr__
        f"'FunctionTask' object has no attribute {k}."
    AttributeError: 'FunctionTask' object has no attribute task_run_name. Did you call this object within a function that should have been decorated with @prefect.task?
    I tried looking up this error on the thread but couldn't find anything that could help.
    Kyle Moon-Wright

    1 year ago
    Ah, these logs are much more helpful haha - it was definitely strange that we weren’t seeing the TaskRunner kick in. I think this error typically occurs when the local prefect version used to register the flow doesn’t match the version of Prefect in the execution environment.
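    One way to confirm that is to compare the Prefect version in each environment: the machine that registers the flow, the flow's Docker image, and the Dask workers. A hedged sketch, assuming prefect is installed on the workers (the scheduler address is a placeholder and the helper function is just for illustration):
    import prefect
    from distributed import Client

    def worker_prefect_version():
        # runs on each Dask worker via client.run
        import prefect
        return prefect.__version__

    print("local prefect:", prefect.__version__)

    # pass the same security=Security(...) used in the build script if the scheduler enforces TLS
    client = Client("tls://<dask-scheduler-address>:8786")
    print("worker prefect:", client.run(worker_prefect_version))
    Pinning an exact version (e.g. prefect==<local version>) in the build script's python_dependencies, instead of the unpinned "prefect" entry, is one way to keep the flow image from drifting.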
    Riley Hun

    1 year ago
    Hi @Kyle Moon-Wright, Thanks so much for advising. The good news is that I don't have any errors anymore after making sure the prefect versions are consistent between the dask workers, the docker image of the flows, and the local prefect version. The bad news is that the task isn't completing and the run is stalling:
    [2020-11-17 21:02:12+0000] INFO - prefect.CloudFlowRunner | Flow run RUNNING: terminal tasks are incomplete.
    No errors in the kubernetes logs
    Kyle Moon-Wright

    1 year ago
    Hmm, this one is tougher, but did you restart your flow at all? I think it's related to this issue, in which you restart a flow run with an archived version of the flow. Someone recently ran into something that sounds very similar to the error you're encountering.
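    If it helps to rule that out, one way to check whether the run belongs to an archived flow version is to query the Server's GraphQL API. A hedged sketch, assuming a Hasura-style schema with an archived field on flow (field names may differ across Prefect Server versions):
    from prefect import Client

    client = Client()
    result = client.graphql(
        """
        query {
          flow(where: {name: {_eq: "ETL"}}) {
            id
            version
            archived
          }
        }
        """
    )
    print(result)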
    Riley Hun

    1 year ago
    I'm not sure what is meant by restarting a flow, but I did clear out the backlog of the flows in the UI Server before submitting the new flow to the server.
    @Kyle Moon-Wright The error that I'm getting that's causing the terminal tasks to be incomplete is as follows:
    Failed to set task state with error: ConnectionError(MaxRetryError("HTTPConnectionPool(host='host.docker.internal', port=4200): Max retries exceeded with url: /graphql
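    For what it's worth, host.docker.internal:4200 is the default address a Dockerized run uses to reach the Apollo/GraphQL endpoint on the Docker host; from Dask workers running in GKE that hostname won't resolve, so the task runners can't report state back. A hedged sketch of one workaround (the config keys are assumptions based on Prefect 0.13-era config, and the address is a placeholder for one the cluster can actually reach): bake the server's API address into the flow image via the Docker storage env_vars.
    # In the build script, before constructing the Docker storage:
    envs["PREFECT__BACKEND"] = "server"
    envs["PREFECT__CLOUD__API"] = "http://<prefect-server-ip>:4200"  # placeholder address

    storage = Docker(
        image_name=args.image_name,
        image_tag=args.image_tag,
        dockerfile=args.dockerfile,
        registry_url=f'gcr.io/{args.gcp_project_id}/',
        env_vars=envs,  # the API address above is baked into the image
        python_dependencies=python_dependencies,
    )
    Since the task runners execute on the Dask worker pods, the same variables likely need to be visible in the workers' environment as well (e.g. via the Helm values for the daskdev/dask release).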