# ask-community
b
Hey all, I'm using Prefect Cloud to orchestrate a flow of flows and today I started getting failures on the AirbyteConnectionTask with the following stack trace:
Task 'AirbyteConnectionTask[4]': Exception encountered during task execution!
Traceback (most recent call last):
  File "/home/ec2-user/.local/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
    logger=self.logger,
  File "/home/ec2-user/.local/lib/python3.7/site-packages/prefect/utilities/executors.py", line 454, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/home/ec2-user/.local/lib/python3.7/site-packages/prefect/utilities/tasks.py", line 456, in method
    return run_method(self, *args, **kwargs)
  File "/home/ec2-user/.local/lib/python3.7/site-packages/prefect/tasks/airbyte/airbyte.py", line 250, in run
    self._check_health_status(session, airbyte_base_url)
  File "/home/ec2-user/.local/lib/python3.7/site-packages/prefect/tasks/airbyte/airbyte.py", line 78, in _check_health_status
    health_status = response.json()["db"]
KeyError: 'db'
I'm using it within a mapped flow. Has anyone run into this issue yet?
c
Hey @Blake Enyart! Nice to see you again 🙂 Thanks for using Prefect.
b
Hey @Chris Reuter! Absolutely! We have been really happy with our trial usage of it so far and are looking to use more of it here in 2022.
a
Can you share which Prefect and Airbyte versions you use? It seems like the health check response from your version doesn't have the "db" attribute. If you could share how you defined the task in your flow, that would be helpful, too.
b
Absolutely. This is on:
• Airbyte: 0.35.5
• Prefect: 0.15.12 (?)
I believe this is the case, but I'm using Prefect Cloud, so I'm not confident about how to check this. Sending over the task right now as well.
...

dev_stage = os.environ["DEV_STAGE"]

if dev_stage.lower() == "prd":
    handler = slack_notifier(only_states=[Failed])  # we can call it early
else:
    handler = slack_notifier(ignore_states=[Pending, Running, Finished])


sync_airbyte_connection = AirbyteConnectionTask(
    max_retries=3, retry_delay=timedelta(seconds=10)
)

with Flow(
    f"{dev_stage} - Airbyte - Flow",
    run_config=UniversalRun(
        labels=["airbyte", dev_stage], env={"DEV_STAGE": dev_stage}
    ),
    executor=LocalDaskExecutor(),
    state_handlers=[handler],
) as flow:

    # Airbyte connection strings
    airbyte_connections = [
        "xxx",
       ...
    ]

    airbyte_sync = sync_airbyte_connection.map(
        airbyte_server_host=unmapped(PrefectSecret("AIRBYTE_PUBLIC_IP")),
        airbyte_server_port=unmapped(8000),
        airbyte_api_version=unmapped("v1"),
        connection_id=airbyte_connections,
    )

if dev_stage.lower() == "prd":  # same case-insensitive check as the handler above
    flow.storage = GitHub(
        repo="OrthoFi/data_warehouse_pipeline",  # name of repo
        path="containers/prefect/src/flows/sub_flows/flow_airbyte.py",  # location of flow file in repo
        access_token_secret="GITHUB_ACCESS_TOKEN",  # name of personal access token secret
    )
else:
    flow.storage = GitHub(
        repo="OrthoFi/data_warehouse_pipeline",  # name of repo
        path="containers/prefect/src/flows/sub_flows/flow_airbyte.py",  # location of flow file in repo
        access_token_secret="GITHUB_ACCESS_TOKEN",  # name of personal access token secret
        ref='development'
    )
a
Turns out Airbyte is not even required. The health check function should have logged the response before grabbing the “db” from it. Can you share that log (redact for privacy)?
def _check_health_status(self, session, airbyte_base_url):
        get_connection_url = airbyte_base_url + "/health/"
        try:
            response = session.get(get_connection_url)
            self.logger.info(response.json())
            health_status = response.json()["db"]
Additionally, this line may cause trouble:
dev_stage = os.environ["DEV_STAGE"]
This is because Prefect reads this information at runtime rather than at registration time, so you need to make sure it's set on your agent. At registration time, Prefect only builds the DAG and stores the metadata.
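To make a missing variable on the agent easier to spot, here is a minimal sketch of reading it with an explicit error. The helper name `read_dev_stage` and the error message are hypothetical, not part of Prefect or of the flow above.

```python
import os
from typing import Mapping, Optional


def read_dev_stage(environ: Optional[Mapping[str, str]] = None) -> str:
    """Return DEV_STAGE, failing with a descriptive error if it is unset.

    Hypothetical helper: `environ` defaults to os.environ and can be
    replaced with a plain dict when trying this out locally.
    """
    env = os.environ if environ is None else environ
    try:
        return env["DEV_STAGE"]
    except KeyError:
        # The flow file is evaluated again where the flow runs, so the
        # variable has to exist there too, not only at registration.
        raise RuntimeError(
            "DEV_STAGE is not set in this environment; "
            "set it on the agent as well as at registration"
        ) from None
```

In the flow file this would stand in for the bare `os.environ["DEV_STAGE"]` lookup, for example `dev_stage = read_dev_stage()`.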
b
Definitely. And thanks for the callout on os.environ["DEV_STAGE"]. I hacked that a bit so I could deploy the system through the AWS CDK. Further down you will see that I also set the environment variable for runtime, so I set it once on deploy and have a variable that is referenced at runtime as well, if that makes sense.
So just to confirm, this is potentially a bug introduced in the recent deployment for the AirbyteConnectionTask and not a bug introduced on my end?
Wait a second, I'm tracking that logging now. I think I see the issue. Is the Prefect Cloud system running on the most recent Prefect Release on GitHub?
The stack trace in the logs seems to indicate that my Prefect run is on 0.15.11 or older.
a
You can see the Prefect version on the flow run page.
Looks like this was a bug that got fixed in the latest release: https://github.com/PrefectHQ/prefect/commit/048cb6fa756ffdbfe63ab3cbdfd5a6927573f4ac. If you upgrade, this should fix your error.
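For anyone hitting the same KeyError before upgrading, here is a rough sketch of a health check lookup that tolerates a missing "db" key. The helper name `extract_health_status` and the fallback key "available" are assumptions for illustration, not taken from the linked commit; check the commit for the actual fix.

```python
from typing import Any, Mapping


def extract_health_status(payload: Mapping[str, Any]) -> bool:
    """Return the health flag from a parsed /health/ response body.

    Hypothetical helper: tries "db" first (the key the failing task
    expected), then "available" (an assumed alternative key), and raises
    a descriptive error instead of a bare KeyError when neither exists.
    """
    for key in ("db", "available"):
        if key in payload:
            return bool(payload[key])
    raise ValueError(
        f"Unexpected health response, keys found: {sorted(payload)}"
    )
```

Listing the keys in the error message shows what the server actually returned, which is the same information the logged response in `_check_health_status` was meant to provide.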
b
Yeah, I'm seeing that now. The stack trace looks like it's from an older version, but the screenshot indicates the run is on the most recent version.
Okay, I think I have it solved. Prefect Cloud says it is running on 0.15.12, but I need to update the Prefect CLI running on the host on the Airbyte EC2 instance. I'm going to try that and see if it works.
a
Did you commit your code and update your package dependencies on the agent? Even if you registered your flow with 0.15.12, if your local agent on EC2 uses 0.15.11, then your flow run runs on 0.15.11. The easiest fix is to upgrade Prefect locally and run it this way.
yeah, you’re 100% correct, agent must be upgraded 👍
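A small sketch of the version comparison described here, in case it helps with checking agents in the future. The function names are hypothetical, and the parsing assumes plain numeric versions such as "0.15.11" (no pre-release suffixes).

```python
from typing import Tuple


def parse_version(version: str) -> Tuple[int, ...]:
    """Turn a dotted numeric version like "0.15.12" into a tuple of ints."""
    return tuple(int(part) for part in version.split("."))


def agent_is_behind(agent_version: str, registered_version: str) -> bool:
    """Return True when the agent's Prefect is older than the registering one."""
    return parse_version(agent_version) < parse_version(registered_version)
```

On the agent host, the installed version can be read from `prefect.__version__` in Python or with `prefect version` on the command line, then passed in as `agent_version`.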
b
That was the issue! It's working again now. I seriously love how easy Prefect has made our orchestration of this first POC. Okay, now I know for future reference to keep the local agents upgraded so they stay in sync with the Prefect Cloud version. Thanks so much for helping me troubleshoot this one! This was definitely quite a nuanced error 😅