Bruno Nunes
04/03/2022, 6:54 PM
Configure Prefect to communicate with the server with:
prefect config set PREFECT_API_URL=http://0.0.0.0:4200/api
Check out the dashboard at http://0.0.0.0:4200
12:51:20.774 | WARNING | prefect.agent - No work queue found named 'kubernetes'
I've edited the service to be of type LoadBalancer and used the external IP to set PREFECT_API_URL:
prefect config set PREFECT_API_URL=http://xx.xx.xx.xx:4200/api
I've updated SQLAlchemy as suggested here and created a new work queue called kubernetes.
I've created a new storage pointing to my Azure Blob Storage, created a new deployment, and ran it. Everything finishes without errors, but I don't see any activity in my cluster or anything being created in the UI.
Can you give me some guidance on what I might be missing?
Anna Geller
prefect work-queue create local -t local
and an agent for this queue:
prefect agent start QUEUE_ID
and in a separate terminal window, you can run the commands from the docstring, using this simple flow and deployment example:
"""
export AWS_ACCESS_KEY_ID=xxx
export AWS_SECRET_ACCESS_KEY=xxx
export PREFECT_API_URL=https://api-beta.prefect.io/api/accounts/xxx/workspaces/xxx
export PREFECT_API_KEY=xxx
prefect deployment create flows/kubernetes_flow.py
prefect deployment run kubernetes-flow/dev
kubectl get pods --watch
"""
from prefect import flow, task, get_run_logger
from prefect.deployments import DeploymentSpec
from prefect.flow_runners import KubernetesFlowRunner
import platform
import os
@task
def say_hi():
    logger = get_run_logger()
    logger.info("Hello world!")


@task
def print_platform_info():
    logger = get_run_logger()
    logger.info(
        "Platform information: IP = %s, Python = %s, Platform type = %s, OS Version = %s",
        platform.node(),
        platform.python_version(),
        platform.platform(),
        platform.version(),
    )


@flow
def kubernetes_flow():
    hi = say_hi()
    print_platform_info(wait_for=[hi])


DeploymentSpec(
    name="dev",
    flow=kubernetes_flow,
    tags=["local"],
    flow_runner=KubernetesFlowRunner(
        env=dict(
            AWS_ACCESS_KEY_ID=os.environ.get("AWS_ACCESS_KEY_ID"),
            AWS_SECRET_ACCESS_KEY=os.environ.get("AWS_SECRET_ACCESS_KEY"),
            PREFECT_API_URL=os.environ.get("PREFECT_API_URL"),
            PREFECT_API_KEY=os.environ.get("PREFECT_API_KEY"),
        ),
    ),
)
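One thing to watch with the env dict above: os.environ.get returns None for any variable that was not exported, so a forgotten export may only show up later, once the job is already running. A minimal sketch of failing early instead, assuming a hypothetical helper called require_env (it is not part of Prefect) and the same four variable names as in the spec:

```python
import os


def require_env(names, environ=None):
    """Return {name: value} for each name, raising if any is unset or empty.

    Hypothetical helper for illustration; not part of Prefect.
    """
    environ = os.environ if environ is None else environ
    # Collect every missing name so the error reports all of them at once.
    missing = [name for name in names if not environ.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return {name: environ[name] for name in names}


# Variable names taken from the DeploymentSpec in this thread.
REQUIRED = [
    "AWS_ACCESS_KEY_ID",
    "AWS_SECRET_ACCESS_KEY",
    "PREFECT_API_URL",
    "PREFECT_API_KEY",
]
```

With this in place, the spec could pass env=require_env(REQUIRED) to the flow runner, so creating the deployment stops with a clear message when an export is missing.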
replace all "xxx" with your credentials - this assumes S3 storage, thus the AWS credentials
Bruno Nunes
04/04/2022, 6:43 PM
20:22:00.508 | INFO | prefect.agent - Submitting flow run 'dc7a181b-7ad1-487b-85be-6ecdd24210bc'
20:22:00.675 | INFO | prefect.flow_runner.kubernetes - RUNNING
20:22:02.267 | INFO | prefect.flow_runner.kubernetes - Flow run 'organic-butterfly' has job settings = {'metadata': {'generateName': 'organic-butterfly', 'namespace': 'default', 'labels': {'io.prefect.flow-run-id': 'dc7a181b-7ad1-487b-85be-6ecdd24210bc', 'io.prefect.flow-run-name': 'organic-butterfly', 'app': 'orion'}}, 'spec': {'template': {'spec': {'restartPolicy': 'Never', 'containers': [{'name': 'job', 'image': 'prefecthq/prefect:2.0b2-python3.8', 'command': ['python', '-m', 'prefect.engine', 'dc7a181b-7ad1-487b-85be-6ecdd24210bc'], 'env': [{'name': 'PREFECT_API_URL', 'value': 'http://orion:4200/api'}]}]}}, 'backoff_limit': 4}}
20:22:03.057 | INFO | prefect.agent - Completed submission of flow run 'dc7a181b-7ad1-487b-85be-6ecdd24210bc'
20:22:03.237 | INFO | prefect.flow_runner.kubernetes - Flow run job 'organic-butterfly9zndk' has status {'active': 1,
'completed_indexes': None,
'completion_time': None,
'conditions': None,
'failed': None,
'ready': None,
'start_time': datetime.datetime(2022, 4, 4, 18, 22, 2, tzinfo=tzutc()),
'succeeded': None,
'uncounted_terminated_pods': None}
20:22:03.238 | INFO | prefect.flow_runner.kubernetes - Starting watch for pod to start. Job: organic-butterfly9zndk
18:22:05.080 | ERROR | prefect.engine - Engine execution of flow run 'dc7a181b-7ad1-487b-85be-6ecdd24210bc' exited with unexpected exception
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/httpcore/_exceptions.py", line 8, in map_exceptions
yield
File "/usr/local/lib/python3.8/site-packages/httpcore/backends/asyncio.py", line 101, in connect_tcp
stream: anyio.abc.ByteStream = await anyio.connect_tcp(
File "/usr/local/lib/python3.8/site-packages/anyio/_core/_sockets.py", line 152, in connect_tcp
gai_res = await getaddrinfo(target_host, remote_port, family=family,
File "/usr/local/lib/python3.8/site-packages/anyio/_core/_sockets.py", line 419, in getaddrinfo
gai_res = await get_asynclib().getaddrinfo(encoded_host, port, family=family, type=type,
File "/usr/local/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 1570, in getaddrinfo
result = await get_running_loop().getaddrinfo(
File "/usr/local/lib/python3.8/asyncio/base_events.py", line 825, in getaddrinfo
return await self.run_in_executor(
File "/usr/local/lib/python3.8/concurrent/futures/thread.py", line 57, in run
result = self.fn(*self.args, **self.kwargs)
File "/usr/local/lib/python3.8/socket.py", line 918, in getaddrinfo
for res in _socket.getaddrinfo(host, port, family, type, proto, flags):
socket.gaierror: [Errno -2] Name or service not known
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/httpx/_transports/default.py", line 60, in map_httpcore_exceptions
yield
File "/usr/local/lib/python3.8/site-packages/httpx/_transports/default.py", line 353, in handle_async_request
resp = await self._pool.handle_async_request(req)
File "/usr/local/lib/python3.8/site-packages/httpcore/_async/connection_pool.py", line 253, in handle_async_request
raise exc
File "/usr/local/lib/python3.8/site-packages/httpcore/_async/connection_pool.py", line 237, in handle_async_request
response = await connection.handle_async_request(request)
File "/usr/local/lib/python3.8/site-packages/httpcore/_async/connection.py", line 86, in handle_async_request
raise exc
File "/usr/local/lib/python3.8/site-packages/httpcore/_async/connection.py", line 63, in handle_async_request
stream = await self._connect(request)
File "/usr/local/lib/python3.8/site-packages/httpcore/_async/connection.py", line 111, in _connect
stream = await self._network_backend.connect_tcp(**kwargs)
File "/usr/local/lib/python3.8/site-packages/httpcore/backends/auto.py", line 23, in connect_tcp
return await self._backend.connect_tcp(
File "/usr/local/lib/python3.8/site-packages/httpcore/backends/asyncio.py", line 101, in connect_tcp
stream: anyio.abc.ByteStream = await anyio.connect_tcp(
File "/usr/local/lib/python3.8/contextlib.py", line 131, in __exit__
self.gen.throw(type, value, traceback)
File "/usr/local/lib/python3.8/site-packages/httpcore/_exceptions.py", line 12, in map_exceptions
raise to_exc(exc)
httpcore.ConnectError: [Errno -2] Name or service not known
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 916, in <module>
enter_flow_run_engine_from_subprocess(flow_run_id)
File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 131, in enter_flow_run_engine_from_subprocess
return anyio.run(retrieve_flow_then_begin_flow_run, flow_run_id)
File "/usr/local/lib/python3.8/site-packages/anyio/_core/_eventloop.py", line 56, in run
return asynclib.run(func, *args, **backend_options)
File "/usr/local/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 233, in run
return native_run(wrapper(), debug=debug)
File "/usr/local/lib/python3.8/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/usr/local/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
return future.result()
File "/usr/local/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 228, in wrapper
return await func(*args)
File "/usr/local/lib/python3.8/site-packages/prefect/client.py", line 82, in with_injected_client
return await fn(*args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 191, in retrieve_flow_then_begin_flow_run
flow_run = await client.read_flow_run(flow_run_id)
File "/usr/local/lib/python3.8/site-packages/prefect/client.py", line 1096, in read_flow_run
response = await self._client.get(f"/flow_runs/{flow_run_id}")
File "/usr/local/lib/python3.8/site-packages/prefect/utilities/httpx.py", line 102, in get
return await self.request(
File "/usr/local/lib/python3.8/site-packages/prefect/utilities/httpx.py", line 60, in request
response = await self.send(
File "/usr/local/lib/python3.8/site-packages/httpx/_client.py", line 1593, in send
response = await self._send_handling_auth(
File "/usr/local/lib/python3.8/site-packages/httpx/_client.py", line 1621, in _send_handling_auth
response = await self._send_handling_redirects(
File "/usr/local/lib/python3.8/site-packages/httpx/_client.py", line 1658, in _send_handling_redirects
response = await self._send_single_request(request)
File "/usr/local/lib/python3.8/site-packages/httpx/_client.py", line 1695, in _send_single_request
response = await transport.handle_async_request(request)
File "/usr/local/lib/python3.8/site-packages/httpx/_transports/default.py", line 353, in handle_async_request
resp = await self._pool.handle_async_request(req)
File "/usr/local/lib/python3.8/contextlib.py", line 131, in __exit__
self.gen.throw(type, value, traceback)
File "/usr/local/lib/python3.8/site-packages/httpx/_transports/default.py", line 77, in map_httpcore_exceptions
raise mapped_exc(message) from exc
httpx.ConnectError: [Errno -2] Name or service not known
20:22:06.813 | INFO | prefect.flow_runner.kubernetes - Starting watch for job completion: organic-butterfly9zndk
20:22:11.945 | ERROR | prefect.flow_runner.kubernetes - Job 'organic-butterfly9zndk' never completed.
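Reading the logs above: the job's container gets PREFECT_API_URL=http://orion:4200/api, and the traceback bottoms out in socket.getaddrinfo with "Name or service not known". That suggests the hostname in the API URL does not resolve from inside the pod. A minimal sketch to reproduce just that DNS step, assuming it is run from the same environment as the flow run (for example inside a pod in the same namespace); the function name api_host_resolves is made up for illustration:

```python
import socket
from urllib.parse import urlsplit


def api_host_resolves(api_url):
    """Return (host, port, ok), where ok says whether the URL's host resolves."""
    parts = urlsplit(api_url)
    host = parts.hostname
    # Fall back to the scheme's default port when the URL does not name one.
    port = parts.port or (443 if parts.scheme == "https" else 80)
    if host is None:
        return host, port, False
    try:
        # The same standard-library call that raises gaierror in the traceback.
        socket.getaddrinfo(host, port)
    except socket.gaierror:
        return host, port, False
    return host, port, True
```

Calling api_host_resolves("http://orion:4200/api") from inside the cluster would show whether the name resolves there. If it does not, the flow run cannot reach the API no matter what the agent's own PREFECT_API_URL is set to.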
Anna Geller