Samuel Hinton
03/13/2023, 6:43 AM
Matthieu Lhonneux
03/13/2023, 8:08 AM
Muhammad Husnain
03/13/2023, 8:43 AM
Muhammad Husnain
03/13/2023, 9:22 AM
prefect logger inside a task runner?
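I'm using the RayTaskRunner and I've defined a custom class using the following:
from typing import Any

from prefect_ray import RayTaskRunner


class AnyscaleTaskRunner(RayTaskRunner):
    def __init__(self, **init_kwargs: Any):
        # some code (not shown in the original message)
        super().__init__(**init_kwargs)  # assumed: forward kwargs to RayTaskRunner
I use it with Prefect flows, and I have some information that is only available within the AnyscaleTaskRunner class.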
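Some background that bears on the question below: to our understanding, Prefect 2 keeps its run contexts in Python contextvars (the context classes in prefect.context read from a ContextVar), so whether a custom task runner can see the flow run context depends on where and when its code executes, not on the class itself. __init__ runs when the flow is defined, typically before any run exists, while a method invoked during a flow run may see the context. Whether a particular RayTaskRunner method runs inside that context is an assumption to verify for your Prefect version, and context values do not travel to separate Ray worker processes on their own. The toy sketch below uses only the standard library; RunContext and MyTaskRunner are hypothetical stand-ins, not Prefect classes.

```python
from contextvars import ContextVar
from typing import Optional

# Hypothetical stand-in for a Prefect-style run context held in a ContextVar.
_current_run = ContextVar("current_run", default=None)


class RunContext:
    def __init__(self, flow_run_name: str) -> None:
        self.flow_run_name = flow_run_name
        self._token = None

    def __enter__(self) -> "RunContext":
        # Make this context visible to code running inside the `with` block.
        self._token = _current_run.set(self)
        return self

    def __exit__(self, *exc_info) -> None:
        # Restore whatever was current before entering.
        _current_run.reset(self._token)

    @staticmethod
    def get() -> Optional["RunContext"]:
        return _current_run.get()


class MyTaskRunner:
    """Hypothetical task runner that reads the context lazily."""

    def __init__(self) -> None:
        # Runs at flow-definition time: usually no run context exists yet.
        self.context_at_init = RunContext.get()

    def submit(self) -> Optional[str]:
        # Looked up at call time, so it sees the context of the active run.
        ctx = RunContext.get()
        return ctx.flow_run_name if ctx is not None else None


runner = MyTaskRunner()
with RunContext("example-flow-run"):
    inside = runner.submit()
outside = runner.submit()
```

The design point is to look the context up inside the method that needs it rather than capturing it in __init__.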
Is there any way I can access the Prefect context or the Prefect logger in the AnyscaleTaskRunner class that I created?
Samuel Hinton
03/13/2023, 9:56 AM
Kasia
03/13/2023, 10:52 AM
Richard Alexander
03/13/2023, 12:48 PM
agents, work_pools and storage_blocks. I have several agents (each polling a different work pool) on the same server trying to poll jobs from the same S3 storage block, but only one of them is working. Can multiple agents/work pools be connected to the same storage block?
Robert Esteves
03/13/2023, 3:57 PM
#!/usr/bin/env python3
"""
Author: Robert Esteves
Date: 2023-03-12
Purpose: This script is designed to ingest a CSV file and write its rows to disk as JSON.
"""
import csv
import json
from pathlib import Path
from typing import Dict, List

from prefect import flow, task
from prefect.deployments import Deployment


@task(name="Get Source Data From Disk Drive")
def get_data(source_file: str) -> List[Dict[str, str]]:
    """
    get_data
    Reads the CSV file and returns its rows as a list of dicts keyed by the header row.
    :param source_file: path to the source CSV file
    :return: list of rows, one dict per row
    """
    raw_data = []
    # newline='' is what the csv module expects when reading files
    with open(source_file, 'r', newline='') as src_file:
        csv_reader = csv.DictReader(src_file, delimiter=',')
        for row in csv_reader:
            raw_data.append(row)
    return raw_data


@task(name="Store the JSON object in the Disk Drive")
def store_results(source_lst: list, tgt_file_name: str, tgt_file_path: str = 'C:/sandbox_target_folder') -> None:
    """
    store_results
    Serializes the rows to JSON and writes them to a file on disk.
    :param source_lst: rows to serialize
    :param tgt_file_name: name of the output file
    :param tgt_file_path: folder the output file is written to
    :return: None
    """
    tgt_file = Path(tgt_file_path) / tgt_file_name
    results = json.dumps(source_lst, indent=4)
    with open(tgt_file, 'w') as tgt:
        tgt.write(results)


# A Flow is a function that contains several tasks
@flow(name="Pipeline Run Demo")
def process_data():
    """
    process_data
    Runs the two tasks: read the CSV from the disk drive, then write the rows out as JSON.
    :return: None
    """
    data = get_data('C:/sandbox_source_folder/sales.csv')
    store_results(data, 'sales.json')


if __name__ == "__main__":
    deploy_flow = Deployment.build_from_flow(
        flow=process_data,
        name="Prefect Tutorial 3 With Deployment",
        version="1",  # deployment versions are strings
        work_queue_name="Robert 1"
    )
    deploy_flow.apply()
    # process_data()
Robert Esteves
03/13/2023, 4:01 PM
Albert Wong
03/13/2023, 5:40 PM
ExecStart=/home/ubuntu/apps/dataflows/venv/bin/prefect server start --host 0.0.0.0
I'm receiving the following error. Has anyone seen this, and/or does anyone know how to fix it?
File "/home/ubuntu/.pyenv/versions/3.11.2/lib/python3.11/asyncio/base_subprocess.py", line 36, in __init__
self._start(args=args, shell=shell, stdin=stdin, stdout=stdout,
File "/home/ubuntu/.pyenv/versions/3.11.2/lib/python3.11/asyncio/unix_events.py", line 810, in _start
self._proc = subprocess.Popen(
^^^^^^^^^^^^^^^^^
File "/home/ubuntu/.pyenv/versions/3.11.2/lib/python3.11/subprocess.py", line 1024, in __init__
self._execute_child(args, executable, preexec_fn, close_fds,
File "/home/ubuntu/.pyenv/versions/3.11.2/lib/python3.11/subprocess.py", line 1901, in _execute_child
raise child_exception_type(errno_num, err_msg, err_filename)
FileNotFoundError: [Errno 2] No such file or directory: 'uvicorn'
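The traceback shows subprocess.Popen failing to find an executable named uvicorn, which is a PATH lookup failure: the server command appears to launch uvicorn by name. A likely cause (an assumption, not confirmed in the thread) is that the systemd unit calls the venv's prefect binary by absolute path without activating the venv, so the venv's bin directory, where uvicorn lives, is not on the service's PATH. If so, adding that directory to the service's PATH (for example with systemd's Environment= directive) should let the lookup succeed. The stdlib-only sketch below approximates the POSIX lookup with shutil.which, using temporary directories as stand-ins for the venv and system bin folders.

```python
import os
import shutil
import stat
import tempfile


def resolves_on_path(command: str, search_path: str) -> bool:
    """Return True if `command` is found as an executable on `search_path`.

    Approximates how a bare command name is resolved on POSIX: each
    directory in PATH is searched, in order, for an executable file.
    """
    return shutil.which(command, path=search_path) is not None


with tempfile.TemporaryDirectory() as venv_bin, tempfile.TemporaryDirectory() as system_bin:
    # Hypothetical stand-in for /home/ubuntu/apps/dataflows/venv/bin/uvicorn
    fake_uvicorn = os.path.join(venv_bin, "uvicorn")
    with open(fake_uvicorn, "w") as fh:
        fh.write("#!/bin/sh\nexit 0\n")
    os.chmod(fake_uvicorn, os.stat(fake_uvicorn).st_mode | stat.S_IXUSR)

    # PATH without the venv (like a service that never activated it) ...
    found_without = resolves_on_path("uvicorn", system_bin)
    # ... versus PATH with the venv's bin directory prepended.
    found_with = resolves_on_path("uvicorn", os.pathsep.join([venv_bin, system_bin]))
```

Only the second lookup succeeds, which mirrors the difference between running the command from an activated shell and from a bare service environment.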
Joshua Gitter
03/13/2023, 6:33 PM
"", then it defaults to whatever I had in the deployment. Is there any way to kick off this run with the str param = "" without changing the deployment?
Timo Sugliani
03/13/2023, 6:35 PM
tsugliani-zpodengineagent | Starting v2.8.5 agent connected to http://zpodengineserver:4200/api...
tsugliani-zpodengineagent | Agent started! Looking for work from queue(s): default...
tsugliani-zpodengineagent | 17:38:43.676 | INFO | prefect.agent - Submitting flow run '8c51875b-69b7-4847-bc44-cbbabb1b9e58'
tsugliani-zpodengineagent | 17:38:43.678 | INFO | prefect.agent - Submitting flow run 'f4cbd07c-d8ef-4e4f-b501-9f62c1ef5cb9'
tsugliani-zpodengineagent | 17:38:43.679 | INFO | prefect.agent - Submitting flow run '4eafa4b5-024c-46ac-958d-7b5e4abbeaea'
tsugliani-zpodengineagent | 17:38:43.679 | INFO | prefect.agent - Submitting flow run '15c6c370-e807-436f-8c8b-38a2d1b85291'
tsugliani-zpodengineagent | 17:38:43.680 | INFO | prefect.agent - Submitting flow run '1cd6de3e-2404-42f4-a9d6-d9335ef42ab9'
tsugliani-zpodengineagent | 17:38:43.680 | INFO | prefect.agent - Submitting flow run '465e5392-1b1d-412d-830a-8eea270e8f4c'
tsugliani-zpodengineagent | 17:38:43.681 | INFO | prefect.agent - Submitting flow run 'd19d5a9f-5361-480d-9d41-c1b570c09152'
tsugliani-zpodengineagent | 17:38:43.681 | INFO | prefect.agent - Submitting flow run '322dae96-9349-4eea-940e-5d10cd5dedec'
tsugliani-zpodengineagent | 17:38:43.682 | INFO | prefect.agent - Flow run limit reached; 8 flow runs in progress.
tsugliani-zpodengineagent | 17:38:44.217 | ERROR | prefect.agent - Failed to submit flow run '465e5392-1b1d-412d-830a-8eea270e8f4c' to infrastructure.
tsugliani-zpodengineagent | Traceback (most recent call last):
tsugliani-zpodengineagent | File "/usr/local/lib/python3.11/site-packages/prefect/agent.py", line 484, in _submit_run_and_capture_errors
tsugliani-zpodengineagent | result = await infrastructure.run(task_status=task_status)
tsugliani-zpodengineagent | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
tsugliani-zpodengineagent | File "/usr/local/lib/python3.11/site-packages/prefect/infrastructure/docker.py", line 322, in run
tsugliani-zpodengineagent | container = await run_sync_in_worker_thread(self._create_and_start_container)
tsugliani-zpodengineagent | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
tsugliani-zpodengineagent | File "/usr/local/lib/python3.11/site-packages/prefect/utilities/asyncutils.py", line 91, in run_sync_in_worker_thread
tsugliani-zpodengineagent | return await anyio.to_thread.run_sync(
tsugliani-zpodengineagent | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
tsugliani-zpodengineagent | File "/usr/local/lib/python3.11/site-packages/anyio/to_thread.py", line 31, in run_sync
tsugliani-zpodengineagent | return await get_asynclib().run_sync_in_worker_thread(
tsugliani-zpodengineagent | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
tsugliani-zpodengineagent | File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
tsugliani-zpodengineagent | return await future
tsugliani-zpodengineagent | ^^^^^^^^^^^^
tsugliani-zpodengineagent | File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 867, in run
tsugliani-zpodengineagent | result = context.run(func, *args)
tsugliani-zpodengineagent | ^^^^^^^^^^^^^^^^^^^^^^^^
tsugliani-zpodengineagent | File "/usr/local/lib/python3.11/site-packages/prefect/infrastructure/docker.py", line 425, in _create_and_start_container
tsugliani-zpodengineagent | docker_client = self._get_client()
tsugliani-zpodengineagent | ^^^^^^^^^^^^^^^^^^
tsugliani-zpodengineagent | File "/usr/local/lib/python3.11/site-packages/prefect/infrastructure/docker.py", line 650, in _get_client
tsugliani-zpodengineagent | docker_client = docker.from_env()
tsugliani-zpodengineagent | ^^^^^^^^^^^^^^^
tsugliani-zpodengineagent | AttributeError: module 'docker' has no attribute 'from_env'
tsugliani-zpodengineagent | 17:38:44.281 | INFO | prefect.agent - Completed submission of flow run '465e5392-1b1d-412d-830a-8eea270e8f4c'
Every flow run is executed using a Docker Container block.
I tried limiting task concurrency, work pool concurrency, and also prefect agent --limit, but I haven't found a way to understand why this is still happening.
This fails at the beginning for a few flow runs, with the following error:
State Message
Submission failed. AttributeError: module 'docker' has no attribute 'from_env'
What I don't get is why this happens on a few flow runs and not others at the initial launch. I can easily relaunch those failed flow runs later with the same custom parameter that failed the first time, and they succeed without any issues. (I attached a screenshot of a successful run that had failed previously.)
If anyone has any suggestions/pointers/insights to help troubleshoot this issue, it would help greatly 🙂
Note: I'm using the official prefecthq/prefect:2.8.5-python3.11 Docker images for this (the server and agent are launched as Docker containers, and the agent container mounts the volume /var/run/docker.sock:/var/run/docker.sock so it can launch the Docker Container blocks through the host's Docker engine and avoid "Docker in Docker" execution (https://jpetazzo.github.io/2015/09/03/do-not-use-docker-in-docker-for-ci/)).
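One plausible explanation (an assumption, not confirmed in this thread): the log shows each submission running in a worker thread (run_sync_in_worker_thread), and if the docker package is imported lazily, several threads racing on that first import during the initial burst could observe a partially initialized module, so from_env appears to be missing. That would fit the symptom of only some runs failing at launch and the same runs succeeding later. Ordinary import statements are protected by Python's import locks, but lazy-loading wrappers can reintroduce this kind of race. The sketch shows a generic mitigation under that assumption: a lazy import guarded by a lock. ThreadSafeLazyModule is a hypothetical helper, not Prefect's code, and it uses json in place of docker so it runs anywhere.

```python
import importlib
import threading
from concurrent.futures import ThreadPoolExecutor
from types import ModuleType
from typing import Optional


class ThreadSafeLazyModule:
    """Defer importing a module until first attribute access.

    A lock ensures only one thread performs the import, so concurrent first
    accesses never see a half-initialized module. Hypothetical helper.
    """

    def __init__(self, name: str) -> None:
        self._name = name
        self._module: Optional[ModuleType] = None
        self._lock = threading.Lock()

    def _load(self) -> ModuleType:
        # Double-checked locking: cheap fast path once the module is loaded.
        if self._module is None:
            with self._lock:
                if self._module is None:
                    self._module = importlib.import_module(self._name)
        return self._module

    def __getattr__(self, attr: str):
        # Only called for attributes not found on the wrapper itself.
        return getattr(self._load(), attr)


# "json" stands in for the docker SDK so the sketch has no third-party deps.
lazy_json = ThreadSafeLazyModule("json")

# Simulate a burst of concurrent submissions touching the module at once.
with ThreadPoolExecutor(max_workers=8) as pool:
    results = list(pool.map(lambda i: lazy_json.dumps({"run": i}), range(8)))
```

A simpler user-side variant of the same idea is to import the module once on the main thread before any worker threads start.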
PS: I tried searching here and in GitHub Issues, which sometimes mention this issue, but found nothing conclusive.
• https://github.com/PrefectHQ/prefect/issues/6519
scott
03/13/2023, 6:39 PM
Andrew
03/13/2023, 8:52 PM
John Horn
03/13/2023, 8:54 PM
2023-03-13 13:48:43 Agent started! Looking for work from queue(s): kubernetes...
2023-03-13 13:48:43 An exception occurred.
2023-03-13 13:48:43 Traceback (most recent call last):
2023-03-13 13:48:43 File "/usr/local/lib/python3.8/site-packages/prefect/cli/_utilities.py", line 41, in wrapper
2023-03-13 13:48:43 return fn(*args, **kwargs)
2023-03-13 13:48:43 File "/usr/local/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 230, in coroutine_wrapper
2023-03-13 13:48:43 return run_async_in_new_loop(async_fn, *args, **kwargs)
2023-03-13 13:48:43 File "/usr/local/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 181, in run_async_in_new_loop
2023-03-13 13:48:43 return anyio.run(partial(__fn, *args, **kwargs))
2023-03-13 13:48:43 File "/usr/local/lib/python3.8/site-packages/anyio/_core/_eventloop.py", line 70, in run
2023-03-13 13:48:43 return asynclib.run(func, *args, **backend_options)
2023-03-13 13:48:43 File "/usr/local/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 292, in run
2023-03-13 13:48:43 return native_run(wrapper(), debug=debug)
2023-03-13 13:48:43 File "/usr/local/lib/python3.8/asyncio/runners.py", line 44, in run
2023-03-13 13:48:43 return loop.run_until_complete(main)
2023-03-13 13:48:43 File "/usr/local/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
2023-03-13 13:48:43 return future.result()
2023-03-13 13:48:43 File "/usr/local/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
2023-03-13 13:48:43 return await func(*args)
2023-03-13 13:48:43 File "/usr/local/lib/python3.8/site-packages/prefect/cli/agent.py", line 201, in start
2023-03-13 13:48:43 tg.start_soon(
2023-03-13 13:48:43 File "/usr/local/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 662, in __aexit__
2023-03-13 13:48:43 raise exceptions[0]
2023-03-13 13:48:43 File "/usr/local/lib/python3.8/site-packages/prefect/utilities/services.py", line 46, in critical_service_loop
2023-03-13 13:48:43 await workload()
2023-03-13 13:48:43 File "/usr/local/lib/python3.8/site-packages/prefect/agent.py", line 203, in get_and_submit_flow_runs
2023-03-13 13:48:43 async for work_queue in self.get_work_queues():
2023-03-13 13:48:43 File "/usr/local/lib/python3.8/site-packages/prefect/agent.py", line 144, in get_work_queues
2023-03-13 13:48:43 work_queue = await self.client.read_work_queue_by_name(
2023-03-13 13:48:43 File "/usr/local/lib/python3.8/site-packages/prefect/client/orchestration.py", line 853, in read_work_queue_by_name
2023-03-13 13:48:43 return schemas.core.WorkQueue.parse_obj(response.json())
2023-03-13 13:48:43 File "/usr/local/lib/python3.8/site-packages/httpx/_models.py", line 756, in json
2023-03-13 13:48:43 return jsonlib.loads(self.text, **kwargs)
2023-03-13 13:48:43 File "/usr/local/lib/python3.8/json/__init__.py", line 357, in loads
2023-03-13 13:48:43 return _default_decoder.decode(s)
2023-03-13 13:48:43 File "/usr/local/lib/python3.8/json/decoder.py", line 337, in decode
2023-03-13 13:48:43 obj, end = self.raw_decode(s, idx=_w(s, 0).end())
2023-03-13 13:48:43 File "/usr/local/lib/python3.8/json/decoder.py", line 355, in raw_decode
2023-03-13 13:48:43 raise JSONDecodeError("Expecting value", s, err.value) from None
2023-03-13 13:48:43 json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
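The final error, Expecting value: line 1 column 1 (char 0), is what json.loads raises when a body is empty or starts with something that is not JSON, such as an HTML page. So the agent most likely received a non-JSON response when it read the kubernetes work queue; one possible cause (an assumption, not confirmed here) is an API URL that points somewhere other than the server's /api endpoint. The stdlib sketch reproduces the message and shows a hypothetical helper, parse_api_body, that fails with a more descriptive error.

```python
import json


def parse_api_body(body: str, content_type: str = "application/json"):
    """Parse an API response body with a clearer error than json's bare
    "Expecting value" when the body is empty or not JSON.

    Hypothetical helper: `content_type` stands in for the response's
    Content-Type header.
    """
    if not body.strip():
        raise ValueError("empty response body; check the API URL and server logs")
    if "json" not in content_type.lower():
        raise ValueError(f"expected JSON but got {content_type!r}; check the API URL")
    return json.loads(body)


# Both an empty body and an HTML page reproduce the agent's exact message
# when passed straight to json.loads.
messages = []
for raw in ["", "<!DOCTYPE html><html></html>"]:
    try:
        json.loads(raw)
    except json.JSONDecodeError as exc:
        messages.append(str(exc))
```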
Dan Cabrol
03/13/2023, 9:28 PM
David Hlavaty
03/13/2023, 11:16 PM
from prefect import flow, task

@task
def large_data() -> str:
    return "this is very very very large data returned by Prefect task"

@flow
def sub_flow() -> int:
    l = large_data()
    length = len(l)
    # Statement 1: output of task `large_data` is cached in-memory inside `sub_flow` context
    return length

@flow
def example():
    length = sub_flow()
    # Statement 2: output of task `large_data` is NO longer cached in-memory
    # only output of `sub_flow` is cached in-memory at this point
Are the two statements in the comments correct? More generally: once a (sub)flow returns, are all results cached in memory (other than the return value(s) of the (sub)flow itself) released so they can be garbage collected?
When inspecting the flow run context in the debugger, I didn't see any state suggesting otherwise.
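For plain Python objects, the general rule behind this question is reference counting: once sub_flow returns, a value referenced only from its local variables becomes unreachable and, in CPython, is freed right away. Whether Prefect keeps other references (for example in task run states, result caches, or persisted results) is a separate question that this sketch does not answer. It uses a weakref to observe when a hypothetical LargeResult stand-in is freed.

```python
import gc
import weakref


class LargeResult:
    """Hypothetical stand-in for a large task result (str can't be weakly referenced)."""

    def __init__(self, payload: str) -> None:
        self.payload = payload


probe = {}


def sub_flow() -> int:
    result = LargeResult("this is very very very large data")
    # A weak reference observes the object without keeping it alive.
    probe["ref"] = weakref.ref(result)
    return len(result.payload)


length = sub_flow()
gc.collect()  # not needed in CPython here; makes the check independent of refcounting
freed = probe["ref"]() is None
```

If freed were False in a real flow, something else (such as a framework-held state) would still be referencing the object.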
Thanks!
Mohammad Kaif Rizvi
03/14/2023, 11:52 AM
Wojciech Kieliszek
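03/14/2023, 12:07 PM
from datetime import datetime, timedelta

from prefect.engine import signals  # Prefect 1.x

# inside a task; retry_after is derived from the API response
start_time = datetime.utcnow() + timedelta(seconds=retry_after)
raise signals.RETRY(
    start_time=start_time,
    message=f"Rate limited. Waiting {retry_after}s.",
)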
This way I could implement my own retry logic (e.g. based not only on the exception class but also on the exception data, with retry_after defined from the API response).
Is this possible with Prefect v2? I've searched the docs but couldn't find whether returning or raising states manually is supported. Something like this:
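import pendulum
from prefect.states import AwaitingRetry

# inside a task; retry_after is derived from the API response
start_time = pendulum.now("UTC").add(seconds=retry_after)
return AwaitingRetry(
    scheduled_time=start_time,
    message=f"Rate limited. Waiting {retry_after}s.",
)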
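Independently of whether Prefect 2 accepts a returned AwaitingRetry state (not settled in this thread), the same behavior can be approximated inside the task body: catch the rate-limit error, read the delay from its data, and sleep before retrying. A minimal, Prefect-agnostic sketch; RateLimited and call_with_rate_limit_retries are hypothetical names. Note that sleeping inside a task keeps its worker busy, unlike a scheduled retry.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class RateLimited(Exception):
    """Hypothetical error carrying the delay the API asked us to wait."""

    def __init__(self, retry_after: float) -> None:
        super().__init__(f"Rate limited. Waiting {retry_after}s.")
        self.retry_after = retry_after


def call_with_rate_limit_retries(
    fn: Callable[[], T],
    max_attempts: int = 3,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn, waiting the server-provided delay whenever it raises RateLimited.

    The retry decision uses the exception's data (retry_after), not just its
    class; any other exception propagates immediately.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except RateLimited as exc:
            if attempt == max_attempts:
                raise
            sleep(exc.retry_after)
    raise RuntimeError("unreachable")


# Usage: a fake API that is rate limited twice, then succeeds.
responses = [RateLimited(2), RateLimited(5), "ok"]
waits = []


def flaky_api():
    item = responses.pop(0)
    if isinstance(item, Exception):
        raise item
    return item


result = call_with_rate_limit_retries(flaky_api, sleep=waits.append)
```

Injecting sleep keeps the function testable; in real use the default time.sleep applies.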
Joseph Thickpenny Ryan
03/14/2023, 1:47 PM
prefect.context.get("scheduled_start_time") and I'm looking to replicate that in Prefect 2. Does anyone know if this is the same as prefect.context.get_run_context().flow_run.dict()["expected_start_time"]? The data you get back from the context in Prefect 2 has a next_scheduled_start_time, but the scheduled_start_time from Prefect 1, which that field name suggests still exists, seems to be missing, so I'm hoping it has just been replaced by expected_start_time.
Andreas
03/14/2023, 1:56 PM
Kanishk
03/14/2023, 2:15 PM
Matthew Scanlon
03/14/2023, 2:18 PM
Mohammad Kaif Rizvi
03/14/2023, 2:21 PM
Jon
03/14/2023, 2:41 PM
get_task_run_result occasionally fails to get the task result of a child flow. Looking at the timestamps, the task result is available. Any sense of what's going on?
Task 'get_task_run_result': Exception encountered during task execution!
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
value = prefect.utilities.executors.run_task_with_timeout(
File "/usr/local/lib/python3.9/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
return task.run(*args, **kwargs) # type: ignore
File "/usr/local/lib/python3.9/site-packages/prefect/tasks/prefect/flow_run.py", line 239, in get_task_run_result
return task_run.get_result()
File "/usr/local/lib/python3.9/site-packages/prefect/backend/task_run.py", line 73, in get_result
raise ValueError("The task result cannot be loaded if it is not finished.")
ValueError: The task result cannot be loaded if it is not finished.
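The ValueError comes from get_result refusing to load a result whose task run is not yet in a finished state. If the timestamps suggest the result exists, one possible explanation (an assumption) is that the run's state had not been updated yet at the moment it was read. A generic guard is to poll until the state reports finished, with a timeout. In this sketch, wait_for_result and the is_finished/get_result callables are hypothetical, not Prefect 1 APIs.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def wait_for_result(
    is_finished: Callable[[], bool],
    get_result: Callable[[], T],
    timeout: float = 30.0,
    poll_interval: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Poll is_finished until it returns True, then fetch the result.

    Raises TimeoutError if the run has not finished within `timeout` seconds.
    """
    deadline = time.monotonic() + timeout
    while not is_finished():
        if time.monotonic() >= deadline:
            raise TimeoutError(f"task run not finished after {timeout}s")
        sleep(poll_interval)
    return get_result()


# Usage: a fake task run that reports finished on the third check.
checks = iter([False, False, True])
sleeps = []
value = wait_for_result(lambda: next(checks), lambda: 42, poll_interval=0.5, sleep=sleeps.append)
```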
Nick Hoffmann
03/14/2023, 4:27 PM
Robert Esteves
03/14/2023, 5:23 PM
Aaron Guan
03/14/2023, 6:57 PM
http://mysite.org/prefect. Thank you!
Robert Kowalski
03/15/2023, 11:33 AM
tags or something similar in the @flow decorator. On the other hand, the flow decorator has a version parameter, but it is not available in the REST API endpoint. Is it possible to add tags or a version in the flow decorator and access them through the REST API?
Wellington Braga
03/15/2023, 1:36 PM
PREFECT_API_URL variable to `0.0.0.0:4200/api`, but it's simply ignored. Is there any solution for this?
issue: https://github.com/PrefectHQ/prefect/issues/5648
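Separately from the linked issue, the value as written has no scheme and uses 0.0.0.0, a wildcard bind address that clients (especially browsers) may not be able to reach. Prefect's docs show values of the form http://127.0.0.1:4200/api, so a full http(s) URL with a reachable host is likely needed (an assumption worth checking against your version). The stdlib checker below encodes those assumptions; check_api_url is a hypothetical helper, not part of Prefect.

```python
from typing import List
from urllib.parse import urlsplit


def check_api_url(url: str) -> List[str]:
    """Return likely problems with an API URL (empty list if none found).

    The checks reflect assumptions, not Prefect's own validation: a full
    http(s) URL, a host clients can reach, and a path ending in /api.
    """
    problems = []
    parts = urlsplit(url)
    if parts.scheme not in ("http", "https"):
        problems.append("missing http:// or https:// scheme")
    elif parts.hostname in (None, "0.0.0.0"):
        problems.append("0.0.0.0 is a wildcard bind address; clients may not be able to reach it")
    if not parts.path.rstrip("/").endswith("/api"):
        problems.append("path does not end in /api")
    return problems
```

For example, check_api_url("0.0.0.0:4200/api") reports only the missing scheme, while check_api_url("http://127.0.0.1:4200/api") reports nothing.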