Blake Enyart
01/14/2022, 1:58 PM
Task 'AirbyteConnectionTask[4]': Exception encountered during task execution!
Traceback (most recent call last):
  File "/home/ec2-user/.local/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
    logger=self.logger,
  File "/home/ec2-user/.local/lib/python3.7/site-packages/prefect/utilities/executors.py", line 454, in run_task_with_timeout
    return task.run(*args, **kwargs) # type: ignore
  File "/home/ec2-user/.local/lib/python3.7/site-packages/prefect/utilities/tasks.py", line 456, in method
    return run_method(self, *args, **kwargs)
  File "/home/ec2-user/.local/lib/python3.7/site-packages/prefect/tasks/airbyte/airbyte.py", line 250, in run
    self._check_health_status(session, airbyte_base_url)
  File "/home/ec2-user/.local/lib/python3.7/site-packages/prefect/tasks/airbyte/airbyte.py", line 78, in _check_health_status
    health_status = response.json()["db"]
KeyError: 'db'
I'm using it within a mapped flow. Has anyone run into this issue yet?
Chris Reuter
01/14/2022, 2:04 PM
Blake Enyart
01/14/2022, 2:06 PM
Anna Geller
Blake Enyart
01/14/2022, 2:21 PM
Blake Enyart
01/14/2022, 2:22 PM
...
dev_stage = os.environ["DEV_STAGE"]
if dev_stage.lower() == "prd":
    handler = slack_notifier(only_states=[Failed])  # we can call it early
else:
    handler = slack_notifier(ignore_states=[Pending, Running, Finished])
sync_airbyte_connection = AirbyteConnectionTask(
    max_retries=3, retry_delay=timedelta(seconds=10)
)
with Flow(
    f"{dev_stage} - Airbyte - Flow",
    run_config=UniversalRun(
        labels=["airbyte", dev_stage], env={"DEV_STAGE": dev_stage}
    ),
    executor=LocalDaskExecutor(),
    state_handlers=[handler],
) as flow:
    # Airbyte connection strings
    airbyte_connections = [
        "xxx",
        ...
    ]
    airbyte_sync = sync_airbyte_connection.map(
        airbyte_server_host=unmapped(PrefectSecret("AIRBYTE_PUBLIC_IP")),
        airbyte_server_port=unmapped(8000),
        airbyte_api_version=unmapped("v1"),
        connection_id=airbyte_connections,
    )
if dev_stage == 'prd':
    flow.storage = GitHub(
        repo="OrthoFi/data_warehouse_pipeline",  # name of repo
        path="containers/prefect/src/flows/sub_flows/flow_airbyte.py",  # location of flow file in repo
        access_token_secret="GITHUB_ACCESS_TOKEN",  # name of personal access token secret
    )
else:
    flow.storage = GitHub(
        repo="OrthoFi/data_warehouse_pipeline",  # name of repo
        path="containers/prefect/src/flows/sub_flows/flow_airbyte.py",  # location of flow file in repo
        access_token_secret="GITHUB_ACCESS_TOKEN",  # name of personal access token secret
        ref='development',
    )
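A side note on the first line of the flow above: `os.environ["DEV_STAGE"]` raises a bare `KeyError` if the variable is missing in whichever process loads the flow file. Here is a minimal sketch of a more explicit lookup. The helper name `read_dev_stage` and the error message are hypothetical, not part of Prefect or of the original flow.

```python
import os
from typing import Mapping, Optional


def read_dev_stage(environ: Optional[Mapping[str, str]] = None) -> str:
    """Return DEV_STAGE lower-cased, or fail with an explicit message.

    Hypothetical helper for illustration; `environ` defaults to os.environ
    and can be replaced with a plain dict when testing.
    """
    env = os.environ if environ is None else environ
    value = env.get("DEV_STAGE")
    if not value:
        # Make the missing variable obvious instead of a bare KeyError.
        raise RuntimeError(
            "DEV_STAGE is not set in this process; export it wherever "
            "the flow file is loaded."
        )
    return value.lower()
```

Lower-casing once here would also make the two comparisons in the flow (`dev_stage.lower() == "prd"` and `dev_stage == 'prd'`) agree, though note it changes the flow name and labels if the variable is currently set in upper case.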
Anna Geller
def _check_health_status(self, session, airbyte_base_url):
    get_connection_url = airbyte_base_url + "/health/"
    try:
        response = session.get(get_connection_url)
        self.logger.info(response.json())
        health_status = response.json()["db"]
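Once the logged payload shows what the server actually returns, the lookup can be made tolerant of a missing `db` key. The following is only a sketch, not the Prefect implementation: `extract_health_status` is a hypothetical helper, and `"available"` is an assumed alternative key that should be checked against the logged response before relying on it.

```python
from typing import Any, Mapping


def extract_health_status(payload: Mapping[str, Any]) -> bool:
    """Return the health flag from an Airbyte /health/ response body.

    "db" is the key the task in the traceback expects. "available" is an
    assumed alternative key, not confirmed in this thread.
    """
    for key in ("db", "available"):
        if key in payload:
            return bool(payload[key])
    # Surface the whole payload so the real response shape is visible.
    raise ValueError(f"Unexpected health response: {dict(payload)!r}")
```

It would be called as `extract_health_status(response.json())`, so an unexpected response shape produces an error that includes the payload rather than `KeyError: 'db'`.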
Additionally, this line may cause trouble:
dev_stage = os.environ["DEV_STAGE"]
Prefect reads this value at runtime rather than at registration time, so you need to make sure the environment variable is set on your agent. At registration time, Prefect only builds the DAG and stores the metadata.
Blake Enyart
01/14/2022, 2:55 PM
Blake Enyart
01/14/2022, 2:57 PM
Blake Enyart
01/14/2022, 3:00 PM
Blake Enyart
01/14/2022, 3:01 PM
Anna Geller
Anna Geller
Blake Enyart
01/14/2022, 3:05 PM
Blake Enyart
01/14/2022, 3:08 PM
Anna Geller
Anna Geller
Blake Enyart
01/14/2022, 3:19 PM