pyinthesky
02/09/2024, 7:01 PM (prefect-server, prefect-worker-init <-- pool creation, prefect-worker-sync, prefect-worker-publish)
and a prefect.yaml file
### Python Version
- 3.11.x
### prefect.yaml
- name: User Sync  # deployment entry for the sync_users flow
  description: "Sync user data"
  schedule:
    interval: 60  # NOTE(review): seconds per Prefect's interval schedule - confirm
    timezone: UTC
  entrypoint: src/flows/sync.py:sync_users  # <path to file>:<flow function>
  parameters: {}  # no parameter overrides
  work_pool:
    name: sync-pool  # pool created by the prefect-worker-init service
    work_queue_name: default
### Output from prefect-worker-sync
2024-02-09 13:55:41 18:55:41.271 | INFO | prefect.flow_runs.worker - Opening process...
2024-02-09 13:55:41 18:55:41.274 | DEBUG | prefect.flow_runs.worker - Process running command: /opt/app-root/bin/python -m prefect.engine in /tmp/tmp2lc5l6b1prefect
2024-02-09 13:55:41 18:55:41.276 | ERROR | prefect.flow_runs.worker - Failed to submit flow run '21fd9312-f9d6-426d-bdcc-1824342fc6ad' to infrastructure.
2024-02-09 13:55:41 Traceback (most recent call last):
2024-02-09 13:55:41 File "/opt/app-root/lib64/python3.11/site-packages/prefect/workers/base.py", line 896, in _submit_run_and_capture_errors
2024-02-09 13:55:41 result = await self.run(
2024-02-09 13:55:41 ^^^^^^^^^^^^^^^
2024-02-09 13:55:41 File "/opt/app-root/lib64/python3.11/site-packages/prefect/workers/process.py", line 185, in run
2024-02-09 13:55:41 process = await run_process(
2024-02-09 13:55:41 ^^^^^^^^^^^^^^^^^^
2024-02-09 13:55:41 File "/opt/app-root/lib64/python3.11/site-packages/prefect/utilities/processutils.py", line 258, in run_process
2024-02-09 13:55:41 async with open_process(
2024-02-09 13:55:41 File "/usr/lib64/python3.11/contextlib.py", line 204, in __aenter__
2024-02-09 13:55:41 return await anext(self.gen)
2024-02-09 13:55:41 ^^^^^^^^^^^^^^^^^^^^^
2024-02-09 13:55:41 File "/opt/app-root/lib64/python3.11/site-packages/prefect/utilities/processutils.py", line 202, in open_process
2024-02-09 13:55:41 process = await anyio.open_process(command, **kwargs)
2024-02-09 13:55:41 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-02-09 13:55:41 File "/opt/app-root/lib64/python3.11/site-packages/anyio/_core/_subprocesses.py", line 126, in open_process
2024-02-09 13:55:41 return await get_asynclib().open_process(
2024-02-09 13:55:41 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-02-09 13:55:41 File "/opt/app-root/lib64/python3.11/site-packages/anyio/_backends/_asyncio.py", line 1041, in open_process
2024-02-09 13:55:41 process = await asyncio.create_subprocess_exec(
2024-02-09 13:55:41 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-02-09 13:55:41 File "/usr/lib64/python3.11/asyncio/subprocess.py", line 221, in create_subprocess_exec
2024-02-09 13:55:41 transport, protocol = await loop.subprocess_exec(
2024-02-09 13:55:41 ^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-02-09 13:55:41 File "/usr/lib64/python3.11/asyncio/base_events.py", line 1694, in subprocess_exec
2024-02-09 13:55:41 transport = await self._make_subprocess_transport(
2024-02-09 13:55:41 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-02-09 13:55:41 File "/usr/lib64/python3.11/asyncio/unix_events.py", line 198, in _make_subprocess_transport
2024-02-09 13:55:41 with events.get_child_watcher() as watcher:
2024-02-09 13:55:41 ^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-02-09 13:55:41 File "/usr/lib64/python3.11/asyncio/events.py", line 811, in get_child_watcher
2024-02-09 13:55:41 return get_event_loop_policy().get_child_watcher()
2024-02-09 13:55:41 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-02-09 13:55:41 File "/usr/lib64/python3.11/asyncio/events.py", line 637, in get_child_watcher
2024-02-09 13:55:41 raise NotImplementedError
### compose.yml
prefect-worker-init:  # one-shot setup: create/update the pool, then deploy
  entrypoint: []  # empty entrypoint, so only the command below runs
  # The three prefect commands run in order inside one `sh -c` script; there is
  # no `set -e`, so the script's exit status is that of `deploy --all && true`.
  command: |-
    sh -c '
    prefect work-pool create --type process sync-pool
    prefect work-pool update --concurrency-limit 2 sync-pool
    prefect --no-prompt deploy --all &&
    true
    '
prefect-worker-sync:  # worker polling the sync-pool work pool
  entrypoint:
    [
      "prefect",
      "worker",
      "start",
      "-p",
      "sync-pool",
      "--with-healthcheck"
    ]
  environment:
    - APPLICATION_NAME=local.pipeline
  volumes:
    - ./sync:/app
  # NOTE(review): presumably probes the endpoint enabled by --with-healthcheck;
  # confirm host/port against the Prefect worker docs.
  healthcheck:
    test:
      [
        "CMD-SHELL",
        "httpx http://127.0.0.1:8080/health || exit 1"
      ]
    interval: 10s
    timeout: 30s
    retries: 5
  # Start only after the server is up and the init service has exited cleanly.
  depends_on:
    prefect-server:
      condition: service_started
    prefect-worker-init:
      condition: service_completed_successfully