Slackbot
10/28/2022, 7:34 PM
Luca Schneider
10/28/2022, 8:06 PM
Kirill Popov
10/28/2022, 8:22 PM
19:41:45.583 | INFO | prefect.agent - Submitting flow run 'd03564b9-ad4e-4310-8662-184616d6406f'
19:41:47.416 | INFO | prefect.agent - Completed submission of flow run 'd03564b9-ad4e-4310-8662-184616d6406f'
19:41:48.471 | INFO | prefect.infrastructure.kubernetes-job - Job 'expert-lyrebird-xjq42': Pod has status 'Pending'.
19:41:49.155 | INFO | prefect.infrastructure.kubernetes-job - Job 'expert-lyrebird-xjq42': Pod has status 'Running'.
19:41:52.098 | INFO | Flow run 'expert-lyrebird' - Created task run 'My Example Task-c06c9343-0' for task 'My Example Task'
19:41:52.098 | INFO | Flow run 'expert-lyrebird' - Executing 'My Example Task-c06c9343-0' immediately...
19:41:52.345 | INFO | Task run 'My Example Task-c06c9343-0' - Finished in state Completed()
19:41:52.443 | INFO | Flow run 'expert-lyrebird' - Finished in state Completed()
When running the agent locally but in a Docker container, the logs look similar.
Now I want the agent to run in a Kubernetes deployment so that its uptime is managed automatically. Only 1 replica of the agent pod is used.
However, I only receive part of the log in the agent CLI output -- it looks like no info is coming back from the worker pods to the agent pod...
19:59:58.687 | INFO | prefect.agent - Submitting flow run 'a510b13d-b0b5-4067-8610-518e6a2b45a8'
19:59:59.268 | INFO | prefect.agent - Completed submission of flow run 'a510b13d-b0b5-4067-8610-518e6a2b45a8'
19:59:59.305 | INFO | prefect.infrastructure.kubernetes-job - Job 'secret-iguana-zlm4f': Pod has status 'Pending'.
20:00:01.349 | INFO | prefect.infrastructure.kubernetes-job - Job 'secret-iguana-zlm4f': Pod has status 'Running'.
What is going on? How can I make the agent running on K8s produce a complete log?
Let me know if you have any ideas -- I am fairly new to K8s, so it might be something basic, but it looks unexpected to me.
YD
10/28/2022, 11:31 PM
nohup prefect agent start -q … > ~/tmp/prefect_agent.log &
or is there a different way to keep the agent running?
is there best practice around this in the docs? I did not find it so far
Thanks
Ahmed Ezzat
10/29/2022, 12:23 AM
Sander
10/29/2022, 8:22 PM
Tyson Chavarie
10/29/2022, 10:08 PM
Tyson Chavarie
10/29/2022, 10:23 PM
Tyson Chavarie
10/29/2022, 10:27 PM
Failed to load and execute flow run: ClientError([{'path': ['secret_value'], 'message': 'Unable to complete operation', 'extensions': {'code': 'API_ERROR'}}])
we are hard down...
pk13055
10/30/2022, 10:29 AM
orion + agent) on a remote server. However, I get an error message in the orion logs after changing PREFECT_ORION_API_HOST from 0.0.0.0 to SERVER_IP. The particular error is:
ERROR: [Errno 99] error while attempting to bind on address ('SERVER_IP', 4200): cannot assign requested address
This does not make sense, since I can specifically access the same SERVER_IP through the browser and the particular port has been allowed through the firewall as well.
(PS - orion appears to connect for a brief second, and then immediately throws this error and attempts to restart).
Additionally, attached below is the relevant section of my `docker-compose.yml`:
version: "3.9"
services:
  db:
    image: timescale/timescaledb:latest-pg14
    volumes:
      - $PWD/data/db:/var/lib/postgresql/data
      - $PWD/config/db:/docker-entrypoint-initdb.d/
    healthcheck:
      test: [ "CMD-SHELL", "pg_isready" ]
      interval: 10s
      timeout: 5s
      retries: 5
    environment:
      - POSTGRES_DB=$POSTGRES_DB
      - POSTGRES_USER=$POSTGRES_USER
      - POSTGRES_PASSWORD=$POSTGRES_PASSWORD
    networks:
      - db_network
  orion:
    image: prefecthq/prefect:2.6.3-python3.10
    restart: always
    volumes:
      - $PWD/prefect:/root/.prefect
    entrypoint: [ "prefect", "orion", "start" ]
    environment:
      - PREFECT_ORION_API_HOST=$SERVER_IP
      - PREFECT_ORION_DATABASE_CONNECTION_URL=$PREFECT_DB_URL
    ports:
      - 4200:4200
    depends_on:
      - db
    networks:
      - prefect_network
      - db_network
  agent:
    build: ./prefect
    restart: always
    entrypoint: [ "prefect", "agent", "start", "-q", "main_queue" ]
    environment:
      - PREFECT_API_URL=http://orion:4200/api
    networks:
      - prefect_network
      - db_network
  cli:
    build: ./prefect
    entrypoint: "bash"
    working_dir: "/root/flows"
    volumes:
      - "$PWD/prefect/flows:/root/flows"
    environment:
      - PREFECT_API_URL=http://orion:4200/api
    networks:
      - prefect_network
      - db_network
networks:
  db_network:
  prefect_network:
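A note on the bind error above: Errno 99 (EADDRNOTAVAIL on Linux) is what a socket bind returns when the requested address is not assigned to any network interface the process can see. Inside a container on a Docker bridge network, the host's public IP is usually not such an address, even though the published port makes it reachable from a browser. The sketch below uses only the standard library to show that distinction. It is an illustration under assumptions, not Prefect code: `can_bind` is a hypothetical helper, and 192.0.2.1 is a documentation-only address standing in for SERVER_IP.

```python
import errno
import socket


def can_bind(host: str, port: int = 0) -> bool:
    """Return True if this process can bind a TCP socket to (host, port).

    Port 0 asks the OS for any free port, so only the address is tested.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        try:
            sock.bind((host, port))
        except OSError as exc:
            # EADDRNOTAVAIL is the "cannot assign requested address" case.
            name = errno.errorcode.get(exc.errno, str(exc.errno))
            print(f"bind to {host!r} failed: {name}")
            return False
        return True


if __name__ == "__main__":
    # 0.0.0.0 means "all local interfaces" and is normally bindable.
    print(can_bind("0.0.0.0"))
    # 192.0.2.1 (TEST-NET-1) stands in for an IP this machine does not own.
    print(can_bind("192.0.2.1"))
```

If that is what is happening here, leaving PREFECT_ORION_API_HOST at 0.0.0.0 inside the container and relying on the `4200:4200` port mapping for outside access would be the usual arrangement.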
Rabea Yousof
10/30/2022, 10:44 AM
Dzmitry Aliashkevich
10/30/2022, 2:32 PM
Stephen Herron
10/30/2022, 4:51 PM
Adam
10/30/2022, 9:55 PM
merlin
10/30/2022, 9:59 PM
# i've left out all the imports and task definitions
# trino_flows.py
@flow(name="extract write")
def extract_write(config):
    logger = get_run_logger()
    logger.info(f"extract file: {config.filepath}")
    sql = load_sqlfile(config.filepath)
    trino_cnxn = trino_connect()
    data = send_query(trino_cnxn, sql)
    write_output(data, config.outfile_path)

# file: extract_write.py
filepath = Path(sys.argv[1])
extract_config = ExtractConfig(filepath=filepath)
with tags(extract_config.dataset_name, "extract"):
    extract_write(extract_config)
In development I'm calling the script with:
python src/extract_write.py src/extracts/weekly_date.sql
So the ExtractConfig object creates a dataset_name, rundate, and filepath field used by the flow code.
How do I build/apply a deployment when I'm passing an object to the flow function in my script?
Deepanshu Aggarwal
10/31/2022, 6:05 AM
{
  "state": {
    "type": "SCHEDULED",
    "name": "string",
    "message": "Run started",
    "data": "string",
    "state_details": {
      "flow_run_id": "3fa85f64-5717-4562-b3fc-2c963f66afa6",
      "task_run_id": "3fa85f64-5717-4562-b3fc-2c963f66afa6",
      "child_flow_run_id": "3fa85f64-5717-4562-b3fc-2c963f66afa6",
      "scheduled_time": "2022-10-31T06:00:35.234Z",
      "cache_key": "string",
      "cache_expiration": "2022-10-31T06:00:35.234Z",
      "untrackable_result": false
    },
    "timestamp": "2022-10-31T06:00:35.234Z",
    "id": "3fa85f64-5717-4562-b3fc-2c963f66afa6"
  },
  "name": "my-flow-run",
  "parameters": {},
  "context": {
    "my_var": "my_val"
  },
  "infrastructure_document_id": "3fa85f64-5717-4562-b3fc-2c963f66afa6",
  "empirical_policy": {
    "retries": 0,
    "retry_delay": 0
  },
  "tags": [
    "tag-1",
    "tag-2"
  ],
  "idempotency_key": "string",
  "parent_task_run_id": "3fa85f64-5717-4562-b3fc-2c963f66afa6"
}
iKeepo w
10/31/2022, 6:24 AM
Stephen Lloyd
10/31/2022, 8:27 AM
Alejandro
10/31/2022, 10:52 AM
import random
from collections import namedtuple
from datetime import date, datetime, time

import pandas as pd
from prefect import flow, task

WeatherConditions = namedtuple(
    "WeatherConditions", ["wind_speed", "temperature", "rel_humidity"]
)


@task
def register_current_weather() -> WeatherConditions:
    return WeatherConditions(
        wind_speed=random.weibullvariate(3, 1.5),
        temperature=random.uniform(-5, 25),
        rel_humidity=random.uniform(0, 100),
    )


@task
def upload_to_database(station_data: pd.DataFrame) -> None:
    print("Updating weather database with the following data:")
    print(station_data)
    print("Observations were successfully recorded")


@flow
def surface_station_daily_weather(station: str, freq: str = "H") -> pd.DataFrame:
    print(f"Daily weather observations for station {station.title()!r}")
    timestamps = pd.date_range(
        start=date.today(), end=datetime.combine(datetime.now(), time.max), freq=freq
    )
    observations = [register_current_weather() for _ in range(len(timestamps))]
    return pd.DataFrame(data=observations, index=timestamps)


@flow
def weather_app(station_names: list[str]) -> None:
    print("Welcome to the world's fastest weather data collection application!")
    for station in station_names:
        station_weather = surface_station_daily_weather(station=station, freq="3H")
        upload_to_database(station_data=station_weather)
    print(
        "Daily observations have been updated for all operational stations. See you soon!"
    )


if __name__ == "__main__":
    STATIONS = [
        "bilbao_station",
        "oviedo_station",
        "salamanca_station",
        "badajoz_station",
    ]
    weather_app(station_names=STATIONS)
I was wondering what is the recommended way to run subflows in parallel (not concurrently). In this case, the subflow surface_station_daily_weather is executed sequentially (as far as I know there is no way to use the submit mechanism with a flow). Is it advisable to use the multiprocessing library for this purpose? Or is there any built-in functionality for it?
Stephen Herron
10/31/2022, 12:33 PM
Adam
10/31/2022, 12:50 PM
Vadym Dytyniak
10/31/2022, 12:51 PM
Stéphan Taljaard
10/31/2022, 1:20 PM
prefect.context, notably the flow_name, flow_run_name, and flow_run_id. Is there a way to pass the entire flow run context to a task? Or, is there a way to access the flow run context from within a task?
If I uncomment the middle line in my flow function (see in the thread), the flow run seems to become unresponsive (I guess it's because it's waiting for "this flow run"'s state, but the flow is still running...)
Carlo
10/31/2022, 2:22 PM
run_deployment w/ SequentialTaskRunner. However, when the first in a chain failed, it didn't block the remaining run_deployments. In fact, they ran and the parent completed. How do I ensure the dependencies are honored? Flow definition in thread
Oscar Björhn
10/31/2022, 3:11 PM
Rahul Kadam
10/31/2022, 3:19 PM
Xavier Babu
10/31/2022, 3:37 PM
Tim Enders
10/31/2022, 3:45 PM
Tim Enders
10/31/2022, 3:59 PM
Traceback (most recent call last):
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/engine.py", line 1334, in report_task_run_crashes
yield
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/engine.py", line 1070, in begin_task_run
connect_error = await client.api_healthcheck()
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/client/orion.py", line 204, in api_healthcheck
await self._client.get("/health")
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/httpx/_client.py", line 1751, in get
return await self.request(
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/httpx/_client.py", line 1527, in request
return await self.send(request, auth=auth, follow_redirects=follow_redirects)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/client/base.py", line 159, in send
await super().send(*args, **kwargs)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/httpx/_client.py", line 1614, in send
response = await self._send_handling_auth(
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/httpx/_client.py", line 1642, in _send_handling_auth
response = await self._send_handling_redirects(
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/httpx/_client.py", line 1679, in _send_handling_redirects
response = await self._send_single_request(request)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/httpx/_client.py", line 1716, in _send_single_request
response = await transport.handle_async_request(request)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/httpx/_transports/default.py", line 353, in handle_async_request
resp = await self._pool.handle_async_request(req)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/httpcore/_async/connection_pool.py", line 252, in handle_async_request
await self.response_closed(status)
asyncio.exceptions.CancelledError
10:56:53.688 | ERROR | Task run 'Get-Items-d8ed86f1-2473' - Crash detected! Execution was cancelled by the runtime environment.
10:56:53.688 | DEBUG | Task run 'Get-Items-d8ed86f1-2473' - Crash details:
Traceback (most recent call last):
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/anyio/_core/_synchronization.py", line 314, in acquire
self.acquire_nowait()
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/anyio/_core/_synchronization.py", line 342, in acquire_nowait
raise WouldBlock
anyio.WouldBlock
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/httpcore/_async/connection_pool.py", line 237, in handle_async_request
response = await connection.handle_async_request(request)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/httpcore/_async/connection.py", line 90, in handle_async_request
return await self._connection.handle_async_request(request)
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/httpcore/_async/http2.py", line 96, in handle_async_request
await self._max_streams_semaphore.acquire()
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/httpcore/_synchronization.py", line 46, in acquire
await self._semaphore.acquire()
File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/anyio/_core/_synchronization.py", line 319, in acquire
await event.wait()
File "/usr/lib/python3.10/asyncio/locks.py", line 214, in wait
await fut
asyncio.exceptions.CancelledError
Nic
10/31/2022, 4:28 PM
--params='{"question": "ultimate", "answer": 42}'
Returns
+- Error ---------------------------------------------------------------------+
| Got unexpected extra arguments (ultimate, answer: 42}') |
+-----------------------------------------------------------------------------+
The --param is working, but I've tried many different combinations of --params without success.
Is it possible to provide a working example or see if anything has changed since it's not working?
Jeff Hale
10/31/2022, 4:57 PM
prefect version show?
Nic
10/31/2022, 5:02 PM
Version: 2.6.4
API version: 0.8.2
Python version: 3.10.6
Git commit: 51e92dda
Built: Thu, Oct 20, 2022 3:11 PM
OS/Arch: win32/AMD64
Profile: default
Server type: cloud
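One possible explanation, given the win32 OS/Arch above: cmd.exe does not treat single quotes as quoting characters, so the JSON may be split on spaces before the CLI sees it, which would match the "unexpected extra arguments" message. A minimal sketch of sidestepping shell quoting is to build the argument list in Python and run it without a shell. The assumptions here are that the command in question is `prefect deployment run`, that `my-flow/my-deployment` is a placeholder deployment name, and that `build_run_command` and `trigger` are hypothetical helpers; only the --params flag and the parameter values come from the thread.

```python
import json
import subprocess


def build_run_command(deployment_name: str, params: dict) -> list[str]:
    """Build an argv list; each element reaches the CLI as one argument."""
    return [
        "prefect",
        "deployment",
        "run",
        deployment_name,
        "--params",
        json.dumps(params),  # serialized once, never re-parsed by a shell
    ]


def trigger(cmd: list[str]) -> None:
    """Run the command without a shell (requires prefect on PATH)."""
    subprocess.run(cmd, check=True)


if __name__ == "__main__":
    cmd = build_run_command(
        "my-flow/my-deployment",  # placeholder, not from the thread
        {"question": "ultimate", "answer": 42},
    )
    print(cmd)
    # trigger(cmd)  # uncomment to actually start the flow run
```

Because the list is passed straight to the process, the JSON string arrives as a single argument regardless of which shell launched Python.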
Jeff Hale
10/31/2022, 5:28 PM
Nic
10/31/2022, 5:51 PM
Jeff Hale
10/31/2022, 8:47 PM
param flag works. Does that work for you to input your params?
Alternatively, the Python deployment method might be an option.
Nic
10/31/2022, 9:01 PM
Jeff Hale
10/31/2022, 9:17 PM