
Riley Hun

11/17/2020, 12:56 AM
Hello everyone, may I please ask for some assistance debugging a flow I've registered to the Prefect Core UI I have hosted on GCP? It works fine when I run the flow on my local machine against the remote Dask cluster, but when I register the flow it fails, and the error in the logs isn't detailed enough. Thanks in advance!
Error From the Prefect UI Logs and the terminal window of the VM hosting the Prefect Core Server
My build script for dockerizing and registering the flow is below. I am also using the daskdev/dask dockerfile.
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--flow', type=str, required=True,
        help='Module path which contains the flow.'
    )
    parser.add_argument(
        '--project', type=str, required=True,
        help='Project name in which to register the flow.'
    )
    parser.add_argument(
        '--image_name', type=str, required=True,
        help='Name of the image which is associated with the storage'
    )
    parser.add_argument(
        '--image_tag', type=str, required=True,
        help='Tag of the image which is associated with the storage'
    )
    parser.add_argument(
        '--env_file_path', type=str,
        help='Json file containing environment variables to be stored in the image'
    )
    parser.add_argument(
        '--python_dependencies', type=str,
        help='List of python packages to be pip installed in the image, separated by comma'
    )
    parser.add_argument(
        '--gcp_project_id', type=str, required=True,
        help='GCP Project name'
    )
    parser.add_argument(
        '--release', type=str, required=True,
        help='Name of GKE Cluster'
    )
    parser.add_argument(
        '--dask_scheduler_address', type=str, required=True,
        help='Scheduler Address of Dask Cluster'
    )
    parser.add_argument(
        '--tls_ca', type=str, required=True,
        help='File path of tls certificate file for authorized access to Dask Cluster'
    )
    parser.add_argument(
        '--tls_key', type=str, required=True,
        help='File path of tls key file for authorized access to Dask Cluster'
    )
    parser.add_argument(
        '--dockerfile', type=str, required=True,
        help='Path to dockerfile for building flow container'
    )

    args = parser.parse_args()

    # get env vars
    envs = {}
    if args.env_file_path:
        with open(args.env_file_path) as f:
            envs = json.load(f)

    # get python dependencies
    # TODO: list all packages in values.yaml as well
    python_dependencies = ["google-cloud-storage",
                           "google-cloud-logging",
                           "hvac",
                           "google-api-core",
                           "snowflake-sqlalchemy",
                           "snowflake-connector-python",
                           "SQLAlchemy",
                           "requests",
                           "ravenpackapi",
                           "pandas",
                           "prefect",
                           "lz4",
                           "fsspec"
                           ]

    if args.python_dependencies:
        python_dependencies.extend(args.python_dependencies.split(','))

    flows = [importlib.import_module(i).flow for i in args.flow.split(',')]
    print(f'Listing flows: {flows}')

    storage = Docker(
        image_name=args.image_name,
        image_tag=args.image_tag,
        dockerfile=args.dockerfile,
        registry_url=f'gcr.io/{args.gcp_project_id}/',
        env_vars=envs,
        python_dependencies=python_dependencies
    )

    security = Security(tls_ca_file=args.tls_ca,
                        tls_client_cert=args.tls_ca,
                        tls_client_key=args.tls_key,
                        require_encryption=True)

    for flow in flows:
        flow.storage = storage
        flow.environment.executor = DaskExecutor(
            address=args.dask_scheduler_address,
            client_kwargs={'security': security}
        )
        path = storage.add_flow(flow)
        print(f'Storing flow {flow.name} at {path} in the image.')

    storage = storage.build()

    # run shell script for updating dask workers w/ prefect flow image
    my_env = dict(os.environ)
    my_env['RELEASE'] = args.release
    my_env['GCP_PROJECT_ID'] = args.gcp_project_id
    my_env['IMAGE_NAME'] = args.image_name
    my_env['IMAGE_TAG'] = args.image_tag

    subprocess.Popen(['./deployment/setup_dask.sh'], env=my_env)

    for flow in flows:
        flow.register(project_name=args.project, build=False)
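For reference, a hypothetical invocation of the script above (the script name and every argument value here are placeholders, not values from this thread):

python register_flow.py \
    --flow flows.etl_flow \
    --project my-project \
    --image_name etl-flows \
    --image_tag 0.1.0 \
    --gcp_project_id my-gcp-project \
    --release my-dask-release \
    --dask_scheduler_address tls://<scheduler-host>:8786 \
    --tls_ca ./certs/ca.pem \
    --tls_key ./certs/key.pem \
    --dockerfile ./deployment/Dockerfile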
My test flow is very simple:
import random
from datetime import datetime, timedelta

import prefect
from prefect import Flow, task
from prefect.schedules import IntervalSchedule


@task
def extract():
    """Get a list of data"""
    logger = prefect.context.get("logger")
    logger.info('An info message')
    return random.sample(range(1, 100), 5)

@task
def transform(data):
    """Multiply the input by 10"""
    logger = prefect.context.get("logger")
    logger.info('Another info message')
    return [i * 10 for i in data]

@task
def load(data):
    """Print the data to indicate it was received"""
    print("Here's your data: {}".format(data))

schedule = IntervalSchedule(
    start_date=datetime.utcnow() + timedelta(seconds=60),
    interval=timedelta(minutes=5),
)

with Flow('ETL') as flow:
    e = extract()
    t = transform(e)
    l = load(t)

flow.schedule = schedule
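For comparison, the local run that works looks roughly like this (just a sketch: the scheduler address below is a placeholder and the TLS Security settings from the build script are omitted):

from prefect.engine.executors import DaskExecutor

# Placeholder address; in practice the same Security object from the build script is passed via client_kwargs.
executor = DaskExecutor(address='tls://<scheduler-host>:8786')
flow.run(executor=executor, run_on_schedule=False)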

Kyle Moon-Wright

11/17/2020, 1:19 AM
Hey @Riley Hun, nothing is jumping out at me with the information you've provided above, so we may need more information, which we can dig for by either:
- starting the agent with show_flow_logs=True if using flow.run_agent(), or with --show-flow-logs if starting from the CLI
- setting PREFECT__LOGGING__LEVEL="DEBUG" to get more verbose logging output from your flow run
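For concreteness, a rough sketch of both options (assuming a local agent; adjust for your agent type):

# Option 1: surface flow logs through the agent
flow.run_agent(show_flow_logs=True)
# or, from the CLI:
#   prefect agent start --show-flow-logs

# Option 2: more verbose logging for the flow run (set in the run environment)
#   export PREFECT__LOGGING__LEVEL="DEBUG"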

Riley Hun

11/17/2020, 2:04 AM
Thanks @Kyle Moon-Wright. I'll try that now. 🙂
Here are the more detailed logs.
@Kyle Moon-Wright Wow - I'm such a dummy. I spent nearly the entire day trying to debug this and didn't think to check the Kubernetes logs of the Dask workers. 🤦
[2020-11-17 07:27:13+0000] INFO - prefect.CloudTaskRunner | Task 'extract': Starting task run...
[2020-11-17 07:27:13+0000] ERROR - prefect.CloudTaskRunner | Task 'extract': Unexpected error while running task: AttributeError("'FunctionTask' object has no attribute task_run_name. Did you call this object within a function that should have been decorated with @prefect.task?")
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 273, in run
    self.set_task_run_name(task_inputs=task_inputs)
  File "/opt/conda/lib/python3.7/site-packages/prefect/engine/cloud/task_runner.py", line 342, in set_task_run_name
    task_run_name = self.task.task_run_name
  File "/opt/conda/lib/python3.7/site-packages/prefect/tasks/core/function.py", line 69, in __getattr__
    f"'FunctionTask' object has no attribute {k}."
AttributeError: 'FunctionTask' object has no attribute task_run_name. Did you call this object within a function that should have been decorated with @prefect.task?
I tried looking up this error on the thread but couldn't find anything that could help.

Kyle Moon-Wright

11/17/2020, 4:42 PM
Ah, these logs are much more helpful haha - it was definitely strange that we weren’t seeing the TaskRunner kick in. I think this error typically occurs when the local prefect version used to register the flow doesn’t match the version of Prefect in the execution environment.
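A quick way to check that (just a sketch - run it wherever a Python shell is available: locally, in the flow's Docker image, and on a Dask worker):

import prefect
# All three environments should print the same version.
print(prefect.__version__)

Pinning prefect==<matching version> in the image's python_dependencies is one way to keep them aligned.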

Riley Hun

11/17/2020, 9:03 PM
Hi @Kyle Moon-Wright, thanks so much for advising. The good news is that I don't have any errors anymore after making sure the Prefect versions are consistent between the Dask workers, the Docker image of the flows, and my local Prefect version. The bad news is that the task isn't completing and is stalling:
[2020-11-17 21:02:12+0000] INFO - prefect.CloudFlowRunner | Flow run RUNNING: terminal tasks are incomplete.
No errors in the Kubernetes logs

Kyle Moon-Wright

11/17/2020, 11:16 PM
Hmm, this one is tougher, but did you restart your flow at all? I think it's related to this issue, in which you restart a flow run with an archived version of the flow. Someone recently had a similar issue that sounds a lot like the error you're encountering.

Riley Hun

11/17/2020, 11:35 PM
I'm not sure what is meant by restarting a flow, but I did clear out the backlog of the flows in the UI Server before submitting the new flow to the server.
@Kyle Moon-Wright The error that I'm getting that's causing the terminal tasks to be incomplete is as follows:
Failed to set task state with error: ConnectionError(MaxRetryError("HTTPConnectionPool(host='host.docker.internal', port=4200): Max retries exceeded with url: /graphql
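(For context, host.docker.internal:4200 is Prefect's default local Server endpoint, so my guess - just an assumption, not something confirmed here - is that the containerized flow run doesn't know the real address of the Apollo/GraphQL service. In Server setups that address is usually supplied via the PREFECT__CLOUD__API environment variable, for example through the Docker storage's env_vars; the host below is a placeholder:)

envs = {
    # Placeholder host: the VM/service that actually exposes the Prefect Server GraphQL API.
    'PREFECT__CLOUD__API': 'http://<prefect-server-host>:4200',
}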