# prefect-getting-started
a
Hi everyone, hope you are doing well. I have a little problem and hope that you can help me. I'm deploying this flow:
from prefect import flow

from src.db.mongodb_connection import (
    mongodb_fintech_conn,
    mongodb_staging_conn,
)
from src.services.data_pipeline import DataPipeline

pipeline = DataPipeline(
    staging_connection=mongodb_staging_conn,
    fintechdb_connection=mongodb_fintech_conn,
)

@flow
async def run_staging():
    await pipeline.raw_staging()
and this is my prefect.yaml:
build:
- prefect_docker.deployments.steps.build_docker_image:
    id: build_image
    requires: prefect-docker>=0.3.1
    image_name: prefectdock
    tag: dev
    dockerfile: auto

# pull section allows you to provide instructions for cloning this project in remote locations
pull:
- prefect.deployments.steps.set_working_directory:
    directory: /opt/prefect/fintech_api

# the deployments section allows you to provide configuration for deploying flows
deployments:
- name: raw_storage
  tags: [pipeline]
  description: Fetch and store data from Wifeed API to MongoDB
  schedule:
    - interval: 30
  flow_name: run_staging
  entrypoint: src/services/flow_deployment.py:run_staging
  parameters: {}
  work_pool:
    name: docker-wp
    work_queue_name: primary-queue
    job_variables:
      image: '{{ build_image.image }}'

# Run settings to remove containers after each run
run:
  remove_containers: true
the deployment log is okay:
prefect deploy --all
12:43:49.953 | INFO    | Task run '_load_wifeed_urls' - Created task run '_load_wifeed_urls' for task '_load_wifeed_urls'
12:43:49.975 | INFO    | Task run '_load_wifeed_urls' - Finished in state Completed()
? Would you like to configure schedules for this deployment? [y/n] (y): n
Running deployment build steps...
 > Running build_docker_image step...
Step 1/4 : FROM prefecthq/prefect:3.0.4-python3.12
 ---> 23f4d5c00a91
Step 2/4 : COPY . /opt/prefect/fintech_api/
 ---> 2fea9c1149ee
Step 3/4 : WORKDIR /opt/prefect/fintech_api/
 ---> Running in 0f2eb12d6af8
 ---> c3ac3b50b5ea
Step 4/4 : LABEL io.prefect.version=3.0.4
 ---> Running in 7c1849b0f268
 ---> f71d6cdb75a1
Successfully built f71d6cdb75a1

│ Deployment 'run-staging/raw_storage' successfully created with id '7c1f3b71-4338-428b-8c31-6be0d6dd1b67'.                                                                                                                        

View Deployment in UI: <http://127.0.0.1:4200/deployments/deployment/7c1f3b71-4338-428b-8c31-6be0d6dd1b67>

? Would you like to save configuration for this deployment for faster deployments in the future? [y/n]: n

To execute flow runs from this deployment, start a worker in a separate terminal that pulls work from the 'docker-wp' work pool:

        $ prefect worker start --pool 'docker-wp'

To schedule a run for this deployment, use the following command:

        $ prefect deployment run 'run-staging/raw_storage'
But the flow run itself fails with this error:
2024-10-09 12:44:50     +-+---------------- 1 ----------------
2024-10-09 12:44:50       | Traceback (most recent call last):
2024-10-09 12:44:50       |   File "/usr/local/lib/python3.12/site-packages/httpx/_transports/default.py", line 72, in map_httpcore_exceptions
2024-10-09 12:44:50       |     yield
2024-10-09 12:44:50       |   File "/usr/local/lib/python3.12/site-packages/httpx/_transports/default.py", line 377, in handle_async_request
2024-10-09 12:44:50       |     resp = await self._pool.handle_async_request(req)
2024-10-09 12:44:50       |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-10-09 12:44:50       |   File "/usr/local/lib/python3.12/site-packages/httpcore/_async/connection_pool.py", line 216, in handle_async_request
2024-10-09 12:44:50       |     raise exc from None
2024-10-09 12:44:50       |   File "/usr/local/lib/python3.12/site-packages/httpcore/_async/connection_pool.py", line 196, in handle_async_request
2024-10-09 12:44:50       |     response = await connection.handle_async_request(
2024-10-09 12:44:50       |                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-10-09 12:44:50       |   File "/usr/local/lib/python3.12/site-packages/httpcore/_async/connection.py", line 99, in handle_async_request
2024-10-09 12:44:50       |     raise exc
2024-10-09 12:44:50       |   File "/usr/local/lib/python3.12/site-packages/httpcore/_async/connection.py", line 76, in handle_async_request
2024-10-09 12:44:50       |     stream = await self._connect(request)
2024-10-09 12:44:50       |              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-10-09 12:44:50       |   File "/usr/local/lib/python3.12/site-packages/httpcore/_async/connection.py", line 122, in _connect
2024-10-09 12:44:50       |     stream = await self._network_backend.connect_tcp(**kwargs)
2024-10-09 12:44:50       |              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-10-09 12:44:50       |   File "/usr/local/lib/python3.12/site-packages/httpcore/_backends/auto.py", line 30, in connect_tcp
2024-10-09 12:44:50       |     return await self._backend.connect_tcp(
2024-10-09 12:44:50       |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-10-09 12:44:50       |   File "/usr/local/lib/python3.12/site-packages/httpcore/_backends/anyio.py", line 115, in connect_tcp
2024-10-09 12:44:50       |     with map_exceptions(exc_map):
2024-10-09 12:44:50       |          ^^^^^^^^^^^^^^^^^^^^^^^
2024-10-09 12:44:50       |   File "/usr/local/lib/python3.12/contextlib.py", line 158, in __exit__
2024-10-09 12:44:50       |     self.gen.throw(value)
2024-10-09 12:44:50       |   File "/usr/local/lib/python3.12/site-packages/httpcore/_exceptions.py", line 14, in map_exceptions
2024-10-09 12:44:50       |     raise to_exc(exc) from exc
2024-10-09 12:44:50       | httpcore.ConnectError: All connection attempts failed
2024-10-09 12:44:50       | 
2024-10-09 12:44:50       | The above exception was the direct cause of the following exception:
2024-10-09 12:44:50       | 
2024-10-09 12:44:50       | Traceback (most recent call last):
2024-10-09 12:44:50       |   File "/usr/local/lib/python3.12/site-packages/prefect/runner/runner.py", line 495, in execute_flow_run
2024-10-09 12:44:50       |     flow_run = await self._client.read_flow_run(flow_run_id)
2024-10-09 12:44:50       |                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-10-09 12:44:50       |   File "/usr/local/lib/python3.12/site-packages/prefect/client/orchestration.py", line 2033, in read_flow_run
2024-10-09 12:44:50       |     response = await self._client.get(f"/flow_runs/{flow_run_id}")
2024-10-09 12:44:50       |                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-10-09 12:44:50       |   File "/usr/local/lib/python3.12/site-packages/httpx/_client.py", line 1814, in get
2024-10-09 12:44:50       |     return await self.request(
2024-10-09 12:44:50       |            ^^^^^^^^^^^^^^^^^^^
2024-10-09 12:44:50       |   File "/usr/local/lib/python3.12/site-packages/httpx/_client.py", line 1585, in request
2024-10-09 12:44:50       |     return await self.send(request, auth=auth, follow_redirects=follow_redirects)
2024-10-09 12:44:50       |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-10-09 12:44:50       |   File "/usr/local/lib/python3.12/site-packages/prefect/client/base.py", line 330, in send
2024-10-09 12:44:50       |     response = await self._send_with_retry(
2024-10-09 12:44:50       |                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-10-09 12:44:50       |   File "/usr/local/lib/python3.12/site-packages/prefect/client/base.py", line 254, in _send_with_retry
2024-10-09 12:44:50       |     response = await send(request, *send_args, **send_kwargs)
2024-10-09 12:44:50       |                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-10-09 12:44:50       |   File "/usr/local/lib/python3.12/site-packages/httpx/_client.py", line 1674, in send
2024-10-09 12:44:50       |     response = await self._send_handling_auth(
2024-10-09 12:44:50       |                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-10-09 12:44:50       |   File "/usr/local/lib/python3.12/site-packages/httpx/_client.py", line 1702, in _send_handling_auth
2024-10-09 12:44:50       |     response = await self._send_handling_redirects(
2024-10-09 12:44:50       |                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-10-09 12:44:50       |   File "/usr/local/lib/python3.12/site-packages/httpx/_client.py", line 1739, in _send_handling_redirects
2024-10-09 12:44:50       |     response = await self._send_single_request(request)
2024-10-09 12:44:50       |                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-10-09 12:44:50       |   File "/usr/local/lib/python3.12/site-packages/httpx/_client.py", line 1776, in _send_single_request
2024-10-09 12:44:50       |     response = await transport.handle_async_request(request)
2024-10-09 12:44:50       |                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-10-09 12:44:50       |   File "/usr/local/lib/python3.12/site-packages/httpx/_transports/default.py", line 376, in handle_async_request
2024-10-09 12:44:50       |     with map_httpcore_exceptions():
2024-10-09 12:44:50       |          ^^^^^^^^^^^^^^^^^^^^^^^^^
2024-10-09 12:44:50       |   File "/usr/local/lib/python3.12/contextlib.py", line 158, in __exit__
2024-10-09 12:44:50       |     self.gen.throw(value)
2024-10-09 12:44:50       |   File "/usr/local/lib/python3.12/site-packages/httpx/_transports/default.py", line 89, in map_httpcore_exceptions
2024-10-09 12:44:50       |     raise mapped_exc(message) from exc
2024-10-09 12:44:50       | httpx.ConnectError: All connection attempts failed
2024-10-09 12:44:50       +------------------------------------
2024-10-09 12:44:50 An exception occurred.
The Prefect log says: Reported flow run '53db904b-e99d-4d05-8e61-cba25875b257' as crashed: Flow run infrastructure exited with non-zero status code 1.
Also, the containers created for each run are not removed automatically. Thank you in advance for any help.
m
Hi, you should check whether your workers have PREFECT_API_URL set, because it looks like the worker cannot connect to the Prefect orchestration server.
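A quick way to sanity-check this from inside the flow-run container is a small stdlib-only script like the sketch below. It only inspects the value of PREFECT_API_URL and does not talk to Prefect at all. The helper names `points_at_loopback` and `diagnose` are made up for illustration. The idea behind it: the deployment log above shows the UI at http://127.0.0.1:4200, and if the API URL also uses 127.0.0.1 or localhost, then inside a Docker container that address refers to the container itself rather than the host, which would be consistent with "All connection attempts failed".

```python
import ipaddress
import os
from typing import Optional
from urllib.parse import urlparse


def points_at_loopback(api_url: str) -> bool:
    """Return True if the URL's host is 'localhost' or a loopback IP literal."""
    host = urlparse(api_url).hostname
    if host is None:
        return False
    if host == "localhost":
        return True
    try:
        return ipaddress.ip_address(host).is_loopback
    except ValueError:
        # Host is a DNS name rather than an IP literal, so not loopback.
        return False


def diagnose(api_url: Optional[str]) -> str:
    """Give a one-line hint about the configured API URL (hypothetical helper)."""
    if not api_url:
        return "PREFECT_API_URL is not set"
    if points_at_loopback(api_url):
        return "PREFECT_API_URL points at loopback; in a container that is the container itself"
    return "PREFECT_API_URL is set to a non-loopback host"


if __name__ == "__main__":
    print(diagnose(os.environ.get("PREFECT_API_URL")))
```

If it reports a loopback address, one option that may work, assuming Docker Desktop or a setup where `host.docker.internal` resolves to the host, is to point PREFECT_API_URL at something like http://host.docker.internal:4200/api for the container.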
Regarding container deletion, this is a setting you can change in the job variables or the base job template. If you open the settings of your work pool, you can change it from the UI.
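For reference, here is a rough sketch of what those job variables could look like, written as a plain Python dict. It assumes the default Docker work pool template, which as far as I know exposes `auto_remove` (remove the container when the run finishes) and `env`; please check the names against your own work pool's base job template. The function `docker_job_variables` and the example URL are hypothetical.

```python
import json


def docker_job_variables(api_url: str, auto_remove: bool = True) -> dict:
    """Build job variables for a Docker work pool (variable names are assumptions)."""
    return {
        # Assumed variable: ask Docker to delete the container after the run.
        "auto_remove": auto_remove,
        # Environment passed into the flow-run container.
        "env": {"PREFECT_API_URL": api_url},
    }


if __name__ == "__main__":
    # Example URL is an assumption; use whatever address reaches your server.
    print(json.dumps(docker_job_variables("http://host.docker.internal:4200/api"), indent=2))
```

The same keys would sit next to `image` under `work_pool.job_variables` in prefect.yaml, instead of the separate `run:` section, which is probably not something the deployment reads.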