An Vu Trong
10/09/2024, 5:46 AM
from prefect import flow

from src.db.mongodb_connection import (
    mongodb_fintech_conn,
    mongodb_staging_conn,
)
from src.services.data_pipeline import DataPipeline

pipeline = DataPipeline(
    staging_connection=mongodb_staging_conn,
    fintechdb_connection=mongodb_fintech_conn,
)


@flow
async def run_staging():
    await pipeline.raw_staging()
And this is my prefect.yaml:
build:
- prefect_docker.deployments.steps.build_docker_image:
    id: build_image
    requires: prefect-docker>=0.3.1
    image_name: prefectdock
    tag: dev
    dockerfile: auto
# pull section allows you to provide instructions for cloning this project in remote locations
pull:
- prefect.deployments.steps.set_working_directory:
    directory: /opt/prefect/fintech_api
# the deployments section allows you to provide configuration for deploying flows
deployments:
- name: raw_storage
  tags: [pipeline]
  description: Fetch and store data from Wifeed API to MongoDB
  schedule:
    - interval: 30
  flow_name: run_staging
  entrypoint: src/services/flow_deployment.py:run_staging
  parameters: {}
  work_pool:
    name: docker-wp
    work_queue_name: primary-queue
    job_variables:
      image: '{{ build_image.image }}'
      # Run settings to remove containers after each run
      run:
        remove_containers: true
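For comparison, here is a minimal sketch of job variables that a Docker work pool would understand, written in Python to match the flow code. It assumes the prefect-docker worker's `auto_remove` and `env` job variables. The helper name `docker_job_variables` and the `host.docker.internal` address are illustrative assumptions, not taken from the project above. As far as I know, `run: remove_containers` is not a Docker work pool job variable, which may explain why the containers are left behind.

```python
import json


def docker_job_variables(image: str, api_url: str) -> dict:
    """Build job variables for a Docker work pool (hypothetical helper)."""
    return {
        "image": image,
        # auto_remove asks Docker to delete the container once it exits.
        "auto_remove": True,
        # env is passed into the flow-run container's environment.
        "env": {"PREFECT_API_URL": api_url},
    }


if __name__ == "__main__":
    variables = docker_job_variables(
        image="prefectdock:dev",
        # Assumed address: the API must be reachable from inside the container.
        api_url="http://host.docker.internal:4200/api",
    )
    print(json.dumps(variables, indent=2))
```

The same keys could be written under `job_variables` in prefect.yaml. On Linux, `host.docker.internal` may not resolve without extra Docker configuration, so the address here is only a placeholder.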
The deployment log looks okay:
prefect deploy --all
12:43:49.953 | INFO | Task run '_load_wifeed_urls' - Created task run '_load_wifeed_urls' for task '_load_wifeed_urls'
12:43:49.975 | INFO | Task run '_load_wifeed_urls' - Finished in state Completed()
? Would you like to configure schedules for this deployment? [y/n] (y): n
Running deployment build steps...
> Running build_docker_image step...
Step 1/4 : FROM prefecthq/prefect:3.0.4-python3.12
---> 23f4d5c00a91
Step 2/4 : COPY . /opt/prefect/fintech_api/
---> 2fea9c1149ee
Step 3/4 : WORKDIR /opt/prefect/fintech_api/
---> Running in 0f2eb12d6af8
---> c3ac3b50b5ea
Step 4/4 : LABEL io.prefect.version=3.0.4
---> Running in 7c1849b0f268
---> f71d6cdb75a1
Successfully built f71d6cdb75a1
Deployment 'run-staging/raw_storage' successfully created with id '7c1f3b71-4338-428b-8c31-6be0d6dd1b67'.
View Deployment in UI: <http://127.0.0.1:4200/deployments/deployment/7c1f3b71-4338-428b-8c31-6be0d6dd1b67>
? Would you like to save configuration for this deployment for faster deployments in the future? [y/n]: n
To execute flow runs from this deployment, start a worker in a separate terminal that pulls work from the 'docker-wp' work pool:
$ prefect worker start --pool 'docker-wp'
To schedule a run for this deployment, use the following command:
$ prefect deployment run 'run-staging/raw_storage'
But the run fails with this error:
2024-10-09 12:44:50 +-+---------------- 1 ----------------
2024-10-09 12:44:50 | Traceback (most recent call last):
2024-10-09 12:44:50 | File "/usr/local/lib/python3.12/site-packages/httpx/_transports/default.py", line 72, in map_httpcore_exceptions
2024-10-09 12:44:50 | yield
2024-10-09 12:44:50 | File "/usr/local/lib/python3.12/site-packages/httpx/_transports/default.py", line 377, in handle_async_request
2024-10-09 12:44:50 | resp = await self._pool.handle_async_request(req)
2024-10-09 12:44:50 | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-10-09 12:44:50 | File "/usr/local/lib/python3.12/site-packages/httpcore/_async/connection_pool.py", line 216, in handle_async_request
2024-10-09 12:44:50 | raise exc from None
2024-10-09 12:44:50 | File "/usr/local/lib/python3.12/site-packages/httpcore/_async/connection_pool.py", line 196, in handle_async_request
2024-10-09 12:44:50 | response = await connection.handle_async_request(
2024-10-09 12:44:50 | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-10-09 12:44:50 | File "/usr/local/lib/python3.12/site-packages/httpcore/_async/connection.py", line 99, in handle_async_request
2024-10-09 12:44:50 | raise exc
2024-10-09 12:44:50 | File "/usr/local/lib/python3.12/site-packages/httpcore/_async/connection.py", line 76, in handle_async_request
2024-10-09 12:44:50 | stream = await self._connect(request)
2024-10-09 12:44:50 | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-10-09 12:44:50 | File "/usr/local/lib/python3.12/site-packages/httpcore/_async/connection.py", line 122, in _connect
2024-10-09 12:44:50 | stream = await self._network_backend.connect_tcp(**kwargs)
2024-10-09 12:44:50 | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-10-09 12:44:50 | File "/usr/local/lib/python3.12/site-packages/httpcore/_backends/auto.py", line 30, in connect_tcp
2024-10-09 12:44:50 | return await self._backend.connect_tcp(
2024-10-09 12:44:50 | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-10-09 12:44:50 | File "/usr/local/lib/python3.12/site-packages/httpcore/_backends/anyio.py", line 115, in connect_tcp
2024-10-09 12:44:50 | with map_exceptions(exc_map):
2024-10-09 12:44:50 | ^^^^^^^^^^^^^^^^^^^^^^^
2024-10-09 12:44:50 | File "/usr/local/lib/python3.12/contextlib.py", line 158, in __exit__
2024-10-09 12:44:50 | self.gen.throw(value)
2024-10-09 12:44:50 | File "/usr/local/lib/python3.12/site-packages/httpcore/_exceptions.py", line 14, in map_exceptions
2024-10-09 12:44:50 | raise to_exc(exc) from exc
2024-10-09 12:44:50 | httpcore.ConnectError: All connection attempts failed
2024-10-09 12:44:50 |
2024-10-09 12:44:50 | The above exception was the direct cause of the following exception:
2024-10-09 12:44:50 |
2024-10-09 12:44:50 | Traceback (most recent call last):
2024-10-09 12:44:50 | File "/usr/local/lib/python3.12/site-packages/prefect/runner/runner.py", line 495, in execute_flow_run
2024-10-09 12:44:50 | flow_run = await self._client.read_flow_run(flow_run_id)
2024-10-09 12:44:50 | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-10-09 12:44:50 | File "/usr/local/lib/python3.12/site-packages/prefect/client/orchestration.py", line 2033, in read_flow_run
2024-10-09 12:44:50 | response = await self._client.get(f"/flow_runs/{flow_run_id}")
2024-10-09 12:44:50 | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-10-09 12:44:50 | File "/usr/local/lib/python3.12/site-packages/httpx/_client.py", line 1814, in get
2024-10-09 12:44:50 | return await self.request(
2024-10-09 12:44:50 | ^^^^^^^^^^^^^^^^^^^
2024-10-09 12:44:50 | File "/usr/local/lib/python3.12/site-packages/httpx/_client.py", line 1585, in request
2024-10-09 12:44:50 | return await self.send(request, auth=auth, follow_redirects=follow_redirects)
2024-10-09 12:44:50 | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-10-09 12:44:50 | File "/usr/local/lib/python3.12/site-packages/prefect/client/base.py", line 330, in send
2024-10-09 12:44:50 | response = await self._send_with_retry(
2024-10-09 12:44:50 | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-10-09 12:44:50 | File "/usr/local/lib/python3.12/site-packages/prefect/client/base.py", line 254, in _send_with_retry
2024-10-09 12:44:50 | response = await send(request, *send_args, **send_kwargs)
2024-10-09 12:44:50 | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-10-09 12:44:50 | File "/usr/local/lib/python3.12/site-packages/httpx/_client.py", line 1674, in send
2024-10-09 12:44:50 | response = await self._send_handling_auth(
2024-10-09 12:44:50 | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-10-09 12:44:50 | File "/usr/local/lib/python3.12/site-packages/httpx/_client.py", line 1702, in _send_handling_auth
2024-10-09 12:44:50 | response = await self._send_handling_redirects(
2024-10-09 12:44:50 | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-10-09 12:44:50 | File "/usr/local/lib/python3.12/site-packages/httpx/_client.py", line 1739, in _send_handling_redirects
2024-10-09 12:44:50 | response = await self._send_single_request(request)
2024-10-09 12:44:50 | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-10-09 12:44:50 | File "/usr/local/lib/python3.12/site-packages/httpx/_client.py", line 1776, in _send_single_request
2024-10-09 12:44:50 | response = await transport.handle_async_request(request)
2024-10-09 12:44:50 | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-10-09 12:44:50 | File "/usr/local/lib/python3.12/site-packages/httpx/_transports/default.py", line 376, in handle_async_request
2024-10-09 12:44:50 | with map_httpcore_exceptions():
2024-10-09 12:44:50 | ^^^^^^^^^^^^^^^^^^^^^^^^^
2024-10-09 12:44:50 | File "/usr/local/lib/python3.12/contextlib.py", line 158, in __exit__
2024-10-09 12:44:50 | self.gen.throw(value)
2024-10-09 12:44:50 | File "/usr/local/lib/python3.12/site-packages/httpx/_transports/default.py", line 89, in map_httpcore_exceptions
2024-10-09 12:44:50 | raise mapped_exc(message) from exc
2024-10-09 12:44:50 | httpx.ConnectError: All connection attempts failed
2024-10-09 12:44:50 +------------------------------------
2024-10-09 12:44:50 An exception occurred.
The Prefect log shows: Reported flow run '53db904b-e99d-4d05-8e61-cba25875b257' as crashed: Flow run infrastructure exited with non-zero status code 1.
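A likely reading of the traceback, assuming the container uses the same `http://127.0.0.1:4200` address shown in the deploy output: inside the container, 127.0.0.1 refers to the container itself, so the call to `read_flow_run` cannot reach the API running on the host. The sketch below uses only the Python standard library. `container_api_url` and `can_connect` are hypothetical helper names, and `host.docker.internal` is an assumed alias for the Docker host.

```python
import socket
from urllib.parse import urlsplit, urlunsplit

LOOPBACK_HOSTS = {"127.0.0.1", "localhost", "::1"}


def container_api_url(api_url: str, host_alias: str = "host.docker.internal") -> str:
    """Swap a loopback host for an alias that points at the Docker host."""
    parts = urlsplit(api_url)
    if parts.hostname not in LOOPBACK_HOSTS:
        return api_url  # already a non-loopback address, leave it alone
    netloc = host_alias if parts.port is None else f"{host_alias}:{parts.port}"
    return urlunsplit(parts._replace(netloc=netloc))


def can_connect(api_url: str, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to the URL's host and port succeeds."""
    parts = urlsplit(api_url)
    port = parts.port or (443 if parts.scheme == "https" else 80)
    try:
        with socket.create_connection((parts.hostname, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    url = container_api_url("http://127.0.0.1:4200/api")
    print(url, "reachable:", can_connect(url))
```

Running `can_connect` from inside the flow-run image with the value of `PREFECT_API_URL` would show whether the container can reach the server at all before a flow run is attempted.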
---
Also, the containers generated by each run are not removed automatically. Thank you if you can help me.
Mark
10/09/2024, 10:12 AM
Mark
10/09/2024, 10:13 AM