Carlos Cueto
02/10/2023, 1:54 PM
Flow runs now show either the work queue or the work pool, in what seems to be a random way. I liked the old way of seeing the work queue, because it gave me an idea of the execution environment of that flow. Is this intended?
Tanay Kothari
02/12/2023, 3:18 AM
Task run '<id>' received abort during orchestration: This run cannot transition to the RUNNING state from the RUNNING state. Task run is in RUNNING state.
prefect.exceptions.MissingResult: State data is missing. Typically, this occurs when result persistence is disabled and the state has been retrieved from the API.
I’m getting this error. I have a Dask cluster on EKS. When I run the Prefect flow locally, it connects to the Dask cluster and runs it just fine. When I deploy it to the cloud, my Prefect agent on Fargate runs the first task in my DAG and then crashes with this error a few seconds later. I’m using Prefect 2.8.0.
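For reference, a minimal sketch of turning on result persistence, which is the usual remedy when MissingResult is raised because state data has to be retrieved from the API; the task and flow names below are placeholders, and the abort message above may have a separate cause:

from prefect import flow, task

@task(persist_result=True)   # persist the task's return value so state
def add_one(x: int) -> int:  # rehydrated from the API still has data
    return x + 1

@flow(persist_result=True)
def demo_flow() -> int:
    return add_one(1)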
Alex Turek
02/13/2023, 6:29 PM
Stéphan Taljaard
02/13/2023, 8:24 PM
Data Ops
02/14/2023, 2:13 PM
Joseph Thickpenny Ryan
02/14/2023, 4:22 PM
Sidharth Ramalingam
02/15/2023, 3:36 PM
Jai P
02/15/2023, 4:01 PM
Joseph Thickpenny Ryan
02/15/2023, 4:08 PM
Nikhil Jain
02/15/2023, 8:31 PM
Our notification should fire when a flow run is in a Late or Pending state for over 10 minutes, but we are seeing it even if the flow completed successfully and in less than 10 minutes.
Prefect flow Late or Pending for over 10 minutes
Flow run build-meta/spectral-anteater entered state Completed at 2023-02-15T19:39:53.508606+00:00.
Flow run URL: https://app.prefect.cloud/account/adee12e2-25d9-428a-9a77-32d8d7ca2ac7/workspace/28562586-0812-4e92-803f-99ce64384cbd/flow-runs/flow-run/81433951-84a0-4bb1-95e9-22a942912d3a
State message: All states completed.
Thet Naing
02/16/2023, 1:43 AM
We use map for tasks. Is there a way to simplify the visualization so that it doesn't show every single task mapping in the UI? It also makes it rather difficult to see and understand what's happening in the flow.
JP
02/16/2023, 9:06 AM
Name: {{ work_queue.name }}
Last polled: {{ work_queue.last_polled }}
Late run count: {{ work_queue.late_runs_count }}
URL: {{ work_queue|ui_url }}
But... why do all but 2 of the notifications have no values? (see screenshot below)
The two valid notifications are
1. The first one at 21:53 CEST
2. At 22:54 CEST (visible in screenshot)
edit: For the record, we do not intend to run such a trigger condition (it was a misconfiguration). We do not really care about being notified every 10 seconds that a queue is healthy 😅 Hence the subject of the notifications in the screenshot is misleading.
karteek
02/16/2023, 4:05 PM
nicholasnet
02/16/2023, 5:54 PM
Aaron Gonzalez
02/16/2023, 10:53 PM
gsutil rsync s3://some-key/dt=yyyy-mm-dd/ gs://some-key/dt=yyyy-mm-dd/
😢
I am going to give prefect-shell a try for the first time and want to know if people have had a lot of experience with it.
For my use case I have about 12K different rsyncs I am going to need to run, and I don't know which of these patterns is preferable:
for src in s3_sources_12k:
    dest = f'gs://some-dest/{src}'
    ShellOperation(
        commands=[f"gsutil rsync -r {src} {dest}"],
        env=env_var_map,
    ).run()
or
with ShellOperation(
    commands=[
        "gsutil rsync -r src1 dest1",
        "gsutil rsync -r src2 dest2",
        "gsutil rsync -r src3 dest3",
        ...
        "gsutil rsync -r src12k dest12k",
    ],
    env=env_var_map,
) as shell_operation:
    shell_process = shell_operation.trigger()
    shell_process.wait_for_completion()
    shell_output = shell_process.fetch_result()
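A third option, not from this thread, is a middle ground between the two patterns above: split the 12K commands into batches and run each batch as its own Prefect task, so you get concurrency and per-batch retries without one giant process. A minimal sketch, assuming the same ShellOperation API shown above; chunks, rsync_batch, rsync_all, and the batch size are hypothetical names and choices:

from prefect import flow, task
from prefect_shell import ShellOperation

def chunks(pairs, size=100):
    # Split the full list of (src, dest) pairs into batches of `size`.
    for i in range(0, len(pairs), size):
        yield pairs[i:i + size]

@task
def rsync_batch(pairs, env_var_map):
    # One ShellOperation per batch of rsync commands.
    ShellOperation(
        commands=[f"gsutil rsync -r {src} {dest}" for src, dest in pairs],
        env=env_var_map,
    ).run()

@flow
def rsync_all(pairs, env_var_map):
    # Submitting each batch lets the task runner execute batches concurrently.
    for batch in chunks(pairs, size=100):
        rsync_batch.submit(batch, env_var_map)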
Thet Naing
02/17/2023, 7:45 PM
We're seeing null for all parameters, even when there are defaults set and when we choose Customize Run to input parameters. Is this a known issue? This seems to have begun when the latest release of Prefect was pushed, about 3 hours ago.
Chris Whatley
02/17/2023, 9:27 PM
Thet Naing
02/20/2023, 3:38 PM
We have a custom logging.yml, set with PREFECT_LOGGING_SETTINGS_PATH. Does anyone have examples of how this is done for cloud deployments?
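One common pattern, sketched here rather than taken from this thread: bake the logging.yml into the flow's image and point PREFECT_LOGGING_SETTINGS_PATH at it through the infrastructure block's environment. The image name and path below are placeholders, and any infrastructure block with an env field would work the same way:

from prefect.infrastructure import KubernetesJob

# Assumes logging.yml is copied into the image at /opt/prefect/logging.yml
infra = KubernetesJob(
    image="my-registry/my-flow-image:latest",
    env={"PREFECT_LOGGING_SETTINGS_PATH": "/opt/prefect/logging.yml"},
)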
Adam Gold
02/20/2023, 3:58 PM
We start our agent with prefect agent start --pool "$PREFECT_ENV" --work-queue default.
1. It takes a really long time for the flow to be submitted. Notice that here it takes more than 30 seconds for the task to be created, before it even runs:
15:25:43.676 | INFO | prefect.agent - Submitting flow run '0dda37d3-87e4-46e2-9266-920e7dae9113'
15:25:44.499 | INFO | prefect.infrastructure.process - Opening process 'congenial-falcon'...
15:25:44.998 | INFO | prefect.agent - Completed submission of flow run '0dda37d3-87e4-46e2-9266-920e7dae9113'
<frozen runpy>:128: RuntimeWarning: 'prefect.engine' found in sys.modules after import of package 'prefect', but prior to execution of 'prefect.engine'; this may result in unpredictable behaviour
15:26:01.483 | INFO | Flow run 'congenial-falcon' - Downloading flow code from storage at '/app'
15:26:16.402 | INFO | Flow run 'congenial-falcon' - Created task run 'return_value-0' for task 'return_value'
2. It downloads the code for every flow, making the pod go out of memory very quickly: Pod ephemeral local storage usage exceeds the total limit
I am probably missing something here, but would love some help 🙏
Aaron Gonzalez
02/20/2023, 4:07 PM
You can structure a job as a single task or as multiple, independent tasks (up to 10,000 tasks) that can be executed in parallel. Each task runs one container instance and can be configured to retry in case of failure. Each task is aware of its index, which is stored in the CLOUD_RUN_TASK_INDEX environment variable. The overall count of tasks is stored in the CLOUD_RUN_TASK_COUNT environment variable. If you are processing data in parallel, your code is responsible for determining which task handles which subset of the data.
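As a rough sketch of what that partitioning could look like in Python (s3_sources_12k stands in for the hypothetical 12K-source list from the rsync question above):

import os

# Each Cloud Run task picks a disjoint slice of the work based on its index.
task_index = int(os.environ.get("CLOUD_RUN_TASK_INDEX", 0))
task_count = int(os.environ.get("CLOUD_RUN_TASK_COUNT", 1))

my_sources = s3_sources_12k[task_index::task_count]  # stride partitioning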
Emil Ostergaard
02/22/2023, 1:13 PM
Completed at 2023-02-22T11:07:44.757581+00:00.
Flow ID: x
Flow run ID: y
Flow run URL: z
State message: All states completed.
Using prefect 2.8.1
Carlos Cueto
02/22/2023, 2:33 PM
Tushar Gupta
02/22/2023, 4:05 PM
Jason Vertrees
02/22/2023, 4:34 PM
[{'path': ['set_task_run_states'], 'message': "'NoneType' object has no attribute 'flow_id'", 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
Please help.
Stéphan Taljaard
02/22/2023, 6:08 PM
Andrew Richards
02/23/2023, 4:28 PM
--- Orion logging error ---
Traceback (most recent call last):
File "/root/micromamba/envs/prefect/lib/python3.7/site-packages/prefect/logging/handlers.py", line 151, in send_logs
await client.create_logs(self._pending_logs)
File "/root/micromamba/envs/prefect/lib/python3.7/site-packages/prefect/client/orion.py", line 1843, in create_logs
await self._client.post(f"/logs/", json=serialized_logs)
File "/root/micromamba/envs/prefect/lib/python3.7/site-packages/httpx/_client.py", line 1855, in post
extensions=extensions,
File "/root/micromamba/envs/prefect/lib/python3.7/site-packages/httpx/_client.py", line 1527, in request
return await self.send(request, auth=auth, follow_redirects=follow_redirects)
File "/root/micromamba/envs/prefect/lib/python3.7/site-packages/prefect/client/base.py", line 253, in send
response.raise_for_status()
File "/root/micromamba/envs/prefect/lib/python3.7/site-packages/httpx/_models.py", line 736, in raise_for_status
raise HTTPStatusError(message, request=request, response=self)
httpx.HTTPStatusError: Client error '429 Too Many Requests' for url 'https://api.prefect.cloud/api/accounts/<redacted>/workspaces/<redacted>/logs/'
Data Ops
02/23/2023, 9:06 PM
Data Ops
02/23/2023, 9:07 PM
justabill
02/24/2023, 2:24 PM
Aaron Gonzalez
02/24/2023, 4:57 PM
Here is the deployments.py I wrote:
def deploy_factory(name: str, flow, param: dict = dict(), cron: str = '') -> Deployment:
    kwargs = {
        'flow': flow,
        'name': name,
        'infrastructure': CloudRunJob.load(f"aether-flows-cloud-run-job-{ENV}"),
        'work_queue_name': f"{ENV}_aether",
        'version': VERSION,
        'output': f'{name}.yaml',
        'skip_upload': True,
    }
    if param:
        kwargs['parameters'] = param,
    if cron:
        kwargs['schedule'] = CronSchedule(cron=cron)
    return Deployment.build_from_flow(**kwargs)

aether_space_metrics_1h_parquet_rsync = deploy_factory(
    name=f'aether_space_metrics_1h_parquet_rsync_{ENV}',
    flow=space_metrics_parquet_rsync,
    param={'date': 'current', 'agg_window': '1h'},
    cron=sched('0 23 * * *')
)
aether_space_metrics_1h_parquet_rsync.apply(work_queue_concurrency=20)
The UI shows the default params for date (2023-02-01) and agg_window (1h), but I'm curious if this is just a UI bug or if an actual prod run with default params is going to use what I defined in code 🤔