Paco Ibañez
02/22/2023, 10:29 PM
run_deployment
how can I minimize the time that the agent will take to pick up the new flow run?
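If the run is created with run_deployment, the remaining delay is mostly the agent's polling loop. A minimal sketch, assuming PREFECT_AGENT_QUERY_INTERVAL is the Prefect 2 setting that controls how often an agent polls its work queues (values and queue name are illustrative):
from prefect.settings import PREFECT_AGENT_QUERY_INTERVAL

# Inspect the currently configured poll interval (in seconds). Lowering it, e.g.
#   prefect config set PREFECT_AGENT_QUERY_INTERVAL=2
# before running `prefect agent start -q default`, should shorten pickup time.
print(PREFECT_AGENT_QUERY_INTERVAL.value())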
John Horn
02/22/2023, 11:40 PM
# prefect version
23:39:04.743 | DEBUG | prefect.client - Connecting to API at <https://api.prefect.cloud/api/accounts/57e0246d-d752-4737-b29a-192e8f6eb886/workspaces/6d439078-f534-4a0a-89e0-9c4ced644d6a/>
Version: 2.5.0
API version: 0.8.2
Python version: 3.8.16
Git commit: eac37918
Built: Thu, Oct 6, 2022 12:41 PM
OS/Arch: linux/aarch64
Profile: default
Server type: cloud
I am trying to create a deployment that calls a flow with a flow_run_name parameter.
I am guessing that my Prefect version needs to be updated; however, I don't know how to upgrade the Prefect Cloud version.
My local Prefect agent is using the latest 2.8.2, but I'm guessing it is Prefect Cloud that is stuck on 2.5.0, and I see no documentation on how to update that.
@flow(flow_run_name='testing123')
TypeError: flow() got an unexpected keyword argument 'flow_run_name'
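For context, the TypeError above is raised at decoration time by the locally installed prefect package (the 2.5.0 shown in the `prefect version` output), not by Cloud; flow_run_name only exists in newer 2.x releases. A minimal sketch of what should work once the environment that defines the flow is upgraded (pip install -U prefect):
from prefect import flow

@flow(flow_run_name="testing123")  # accepted once the local prefect package is new enough
def my_flow():
    print("hello")

if __name__ == "__main__":
    my_flow()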
Jarvis Stubblefield
02/23/2023, 1:15 AM
prefect.exceptions.PrefectHTTPStatusError: Server error '500 Internal Server Error' for url '<https://api.prefect.cloud/api/accounts/5626ffe9-0140-4e88-babc-4a4fc614bb99/workspaces/ee8a533d-2754-420e-87f2-2d6b084984af/flow_runs/>'
Response: {'exception_message': 'Internal Server Error'}
For more information check: <https://httpstatuses.com/500>
Samuel Hinton
02/23/2023, 2:06 AM
from datetime import datetime as dt
from zoneinfo import ZoneInfo

from dateutil.rrule import MINUTELY, rrule

from prefect import flow
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import RRuleSchedule

london = ZoneInfo("Europe/London")


@flow
def some_flow():
    print("whoa")


Deployment.build_from_flow(
    flow=some_flow,
    schedule=RRuleSchedule.from_rrule(
        rrule(
            freq=MINUTELY,
            interval=10,
            dtstart=dt(2020, 1, 1, tzinfo=london),
            byhour=range(8, 9),
        )
    ),
)
Naively, I’d expect this to work, but it fails because ZoneInfo doesn't have a name attribute:
schedule=RRuleSchedule.from_rrule(
File "/Users/sh/Projects/flows/.venv/lib/python3.9/site-packages/prefect/server/schemas/schedules.py", line 410, in from_rrule
timezone = rrule._dtstart.tzinfo.name
AttributeError: 'zoneinfo.ZoneInfo' object has no attribute 'name'
There's an easy workaround here (like setting the timezone name explicitly, as is done in schedules.py:413), but I thought it might be good to get this working with Python's new standard way of handling timezones, especially because the old "from datetime import timezone"
requires specifying an hour delta which doesn't actually get used by Prefect 2.
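One possible workaround in the meantime, assuming pendulum is available (Prefect 2 uses it internally for schedules): pendulum timezones do expose a name attribute, which is exactly what from_rrule reads off dtstart's tzinfo, so swapping it in for ZoneInfo sidesteps the AttributeError. A sketch:
from datetime import datetime as dt

import pendulum
from dateutil.rrule import MINUTELY, rrule
from prefect.server.schemas.schedules import RRuleSchedule

# pendulum.timezone(...) returns a tzinfo object with a .name attribute.
london = pendulum.timezone("Europe/London")

schedule = RRuleSchedule.from_rrule(
    rrule(
        freq=MINUTELY,
        interval=10,
        dtstart=dt(2020, 1, 1, tzinfo=london),
        byhour=range(8, 9),
    )
)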
Samuel Hinton
02/23/2023, 2:26 AM
prefect.settings.PREFECT_API_URL
has getters but not setters, and I don't want to just go under the hood. Using os.environ
doesn't seem to work, because the env var is set after importing prefect (use case is we have dev and prod versions of prefect and we want to loop over envs and upload a common flow to all of them)
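One option that may fit the loop-over-environments use case: prefect.settings.temporary_settings lets you override PREFECT_API_URL (and PREFECT_API_KEY) for a block of code without touching the active profile. A sketch with placeholder URLs and keys:
from prefect.settings import PREFECT_API_KEY, PREFECT_API_URL, temporary_settings

# Placeholder environments; real URLs/keys would come from config or env vars.
ENVIRONMENTS = {
    "dev": {"url": "https://api.prefect.cloud/api/accounts/<dev>/workspaces/<dev>", "key": "dev-key"},
    "prod": {"url": "https://api.prefect.cloud/api/accounts/<prod>/workspaces/<prod>", "key": "prod-key"},
}

for name, env in ENVIRONMENTS.items():
    with temporary_settings(updates={PREFECT_API_URL: env["url"], PREFECT_API_KEY: env["key"]}):
        # anything run inside this block (e.g. deployment.apply()) talks to that environment
        ...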
Samuel Hinton
02/23/2023, 2:58 AM
flows
folder filled with 20 Python files, each with, say, 2 flows inside them. If I try to deploy the 20 flows, every single deployment will re-upload all 20 files, which is incredibly slow.
Is there a nicer way of doing this, such that I can upload everything just a single time? I see there's a skip_upload
I could use, but it feels a bit of an anti-pattern for me to use a bool flag like "already_uploaded" to toggle the skip_upload flag.
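One pattern that might avoid the repeated uploads, assuming a remote storage block and that build_from_flow's skip_upload flag behaves as documented: let the first deployment upload the shared flows folder once, then build the remaining deployments with skip_upload=True so they only register metadata. A sketch with placeholder names (the block name and flow list are illustrative):
from prefect.deployments import Deployment
from prefect.filesystems import RemoteFileSystem

storage = RemoteFileSystem.load("my-flow-storage")  # placeholder block name

all_flows = [...]  # the ~40 flow objects imported from the flows/ folder
first, *rest = all_flows

# The first deployment uploads the whole folder once...
Deployment.build_from_flow(flow=first, name="main", storage=storage, apply=True)

# ...and the rest reuse what is already there.
for f in rest:
    Deployment.build_from_flow(
        flow=f, name="main", storage=storage, skip_upload=True, apply=True
    )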
Jenia Varavva
02/23/2023, 3:23 AM
Walter Cavinaw
02/23/2023, 5:11 AM
Dmytro Ponomarchuk
02/23/2023, 8:36 AM
Unhealthy Work Queue?
It just stopped processing any new flows without any reason, and there are no errors in the agent log, which is running on AWS ECS infrastructure.
Tolga Karahan
02/23/2023, 9:04 AM
validate=False.
Samuel Bunce
02/23/2023, 9:15 AM
prefect\client\base.py", line 130, in raise_for_status
raise PrefectHTTPStatusError.from_httpx_error(exc) from exc.__cause__
prefect.exceptions.PrefectHTTPStatusError: Server error '502 Bad Gateway' for url
'<https://api.prefect.cloud/api/accounts/xxxxx/flow_runs/filter>'
For more information check: <https://httpstatuses.com/502>
Agent stopped!
which we believe is caused by a failure to communicate with the cloud instance. We have a default argument for number of tries, which is set here to 3 https://github.com/PrefectHQ/prefect/blob/42e5a978390575b8bd6ef4c258ba703f7e28fca2/src/prefect/utilities/services.py#L18 - ideally we would like to be able to reconfigure it or set it to infinite so that when our agents cannot communicate with cloud, they don't all collapse - we had to restart everything this morning.
As another point, this obviously generated hundreds of late flows. Why is there no 'select all' function in the UI, or a capability to remove all late flows? The best I have been able to do so far is to go through and manually click to remove each late flow. It feels like there should be more functionality for actions on batches of flow runs.
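In the meantime, one possible workaround is to clear late runs through the Python client. The sketch below assumes Prefect 2.8-era import paths (the filter schemas have moved between releases) and that "Late" is the state name to filter on:
import asyncio

from prefect import get_client
from prefect.server.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterState,
    FlowRunFilterStateName,
)

async def delete_late_runs():
    async with get_client() as client:
        # Fetch flow runs whose current state is named "Late"...
        late_filter = FlowRunFilter(
            state=FlowRunFilterState(name=FlowRunFilterStateName(any_=["Late"]))
        )
        runs = await client.read_flow_runs(flow_run_filter=late_filter)
        # ...and delete them one by one.
        for run in runs:
            await client.delete_flow_run(flow_run_id=run.id)

asyncio.run(delete_late_runs())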
Stephen Lloyd
02/23/2023, 11:32 AM
Tolga Karahan
02/23/2023, 2:03 PM
flapili
02/23/2023, 2:32 PM
"data": {
    "type": "reference",
    "serializer_type": "compressed/json",
    "storage_block_id": "e3311389-34e1-491c-aaef-cbcf6e6ea977",
    "storage_key": "e4940b1d2ada42a7a1ac77710cae4d18"
},
Sam Garvis
02/23/2023, 3:02 PM
Steph Clacksman
02/23/2023, 3:31 PM
Marcel
02/23/2023, 3:46 PM
Kyle Austin
02/23/2023, 3:46 PM
10:44:28.065 | WARNING | Task run 'email_send_message-357' - Task run '52fa4b9c-deb0-407c-a992-ccde4685dfcd' received abort during orchestration: The enclosing flow must be running to begin task execution. Task run is in PENDING state.
I am almost always getting the following error message:
prefect.exceptions.MissingResult: State data is missing. Typically, this occurs when result persistence is disabled and the state has been retrieved from the API.
I am setting
persist_result=True
in all my task decorators too. Plus I have set concurrency limit tags so none of these tasks have more than 30 running at once. But all 5k tasks are still being submitted and created all at once!
Here is roughly what the code looks like in the flow now:
humana_smtp = EmailServerCredentials.load("some-smtp-server")
for email in emails:
    email_send_message.submit(
        subject=email.subject,
        msg=email.rendered_html_template,
        email_server_credentials=humana_smtp,
        email_from=email.email_from,
        email_to=email.to,
        email_to_cc=email.cc,
        email_to_bcc=email.bcc,
        attachments=email.attachments,
        images=email.images,
        dry_run=dry_run,
    )
I have done something like this to prevent it from submitting all 5k at once and throttle it down to working with 50 at a time
email_chunks_for_sending = chunkify(emails, 50)
humana_smtp = EmailServerCredentials.load("some-smtp-server")
for chunk in email_chunks_for_sending:
    wait_for_complete_object = []
    for email in chunk:
        sent = email_send_message.submit(
            subject=email.subject,
            msg=email.rendered_html_template,
            email_server_credentials=humana_smtp,
            email_from=email.email_from,
            email_to=email.to,
            email_to_cc=email.cc,
            email_to_bcc=email.bcc,
            attachments=email.attachments,
            images=email.images,
            dry_run=dry_run,
        )
        wait_for_complete_object.append(sent)
    [future.result() for future in wait_for_complete_object]
Here chunkify, which I borrowed from another post on Slack, looks like:
def chunkify(xs, size):
    return (xs[pos : pos + size] for pos in range(0, len(xs), size))
Is there a way to set a limit on the number of tasks that are submitted to the task runner at a given time? Task concurrency didn't do the trick for me -- it only limited the number of tasks running at a given time.
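As far as I know there isn't a direct "max submitted at once" setting on the task runner, so chunking as above is reasonable. A variant that keeps a rolling window of in-flight futures (a sketch reusing the same email_send_message task and a window of 50) avoids waiting for an entire chunk to finish before submitting more:
window = []
for email in emails:
    if len(window) >= 50:
        window.pop(0).result()  # block until the oldest in-flight task finishes
    window.append(
        email_send_message.submit(
            subject=email.subject,
            msg=email.rendered_html_template,
            email_server_credentials=humana_smtp,
            email_from=email.email_from,
            email_to=email.to,
            email_to_cc=email.cc,
            email_to_bcc=email.bcc,
            attachments=email.attachments,
            images=email.images,
            dry_run=dry_run,
        )
    )
# drain whatever is still running
[future.result() for future in window]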
Jean-Michel Provencher
02/23/2023, 4:21 PM
Leon Kozlowski
02/23/2023, 5:42 PM
Jacob Bedard
02/23/2023, 6:09 PM
Jacob Bedard
02/23/2023, 6:15 PM
Aric Huang
02/23/2023, 7:15 PM
Ghislain Picard
02/23/2023, 7:42 PM
Bianca Hoch
02/23/2023, 8:16 PM
Bianca Hoch
02/23/2023, 9:19 PM
Billy McMonagle
02/23/2023, 9:59 PM
KubernetesJob infrastructure.
Tomás Emilio Silva Ebensperger
02/23/2023, 11:00 PM
Prefect 2.0
Is it possible to run a deployment from one server and have that flow picked up by an agent that is running on a different server?
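For the question above: as long as both machines point at the same Prefect API (PREFECT_API_URL) and the agent listens on the deployment's work queue, triggering a run on one server and executing it on another is the normal setup. A minimal sketch with a placeholder deployment name:
from prefect.deployments import run_deployment

# Run this on server A; an agent started on server B with
#   prefect agent start -q default
# (and the same PREFECT_API_URL) will pick the run up.
flow_run = run_deployment(name="some-flow/some-deployment")  # placeholder name
print(flow_run.state)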
Zeeshan Nazar
02/23/2023, 11:32 PM
#!/bin/bash
eval "$(conda shell.bash hook)"
conda activate ETL
prefect server start
And created this systemd service:
[Unit]
Description=Prefect
[Service]
Type=simple
ExecStart=/home/zeephremia/files_required/initate.sh
User=root
Group=root
[Install]
WantedBy=multi-user.target
Whenever I run the shell script locally, it works but whenever the systemd service triggers it, it throws this error:
prefect_service.service - Prefect
Loaded: loaded (/etc/systemd/system/prefect_service.service; enabled; vendor preset: enabled)
Active: inactive (dead) since Thu 2023-02-23 23:29:43 UTC; 3s ago
Process: 2233 ExecStart=/home/zeephremia/files_required/initate.sh (code=exited, status=0/SUCCESS)
Main PID: 2233 (code=exited, status=0/SUCCESS)
Feb 23 23:29:43 linux-server systemd[1]: Started Prefect.
Feb 23 23:29:43 linux-server initate.sh[2233]: Starting the script...
Feb 23 23:29:43 linux-server initate.sh[2234]: /home/zeephremia/files_required/initate.sh: line 5: conda: command not found
Feb 23 23:29:43 linux-server initate.sh[2235]: /home/zeephremia/files_required/initate.sh: line 6: conda: command not found
Feb 23 23:29:43 linux-server initate.sh[2237]: /home/zeephremia/files_required/initate.sh: line 7: prefect: command not found
Feb 23 23:29:43 linux-server initate.sh[2233]: Script completed.
Feb 23 23:29:43 linux-server systemd[1]: prefect_service.service: Succeeded.
Would really appreciate any help, thank you! 🙂
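The "command not found" lines suggest the unit runs without the interactive shell's PATH (systemd services don't load your login profile), so conda and prefect aren't visible to it. A sketch of one way around that, assuming a typical ~/miniconda3 install location (adjust to wherever conda actually lives):
#!/bin/bash
# initate.sh, rewritten so it doesn't rely on the login shell's PATH.
# /home/zeephremia/miniconda3 is an assumed install path.
source /home/zeephremia/miniconda3/etc/profile.d/conda.sh
conda activate ETL
exec prefect server start
Alternatively, the unit file could set an Environment=PATH=... line, or ExecStart could point at the prefect binary inside the ETL environment by its absolute path.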
muhammad
02/24/2023, 12:05 AM
n
successive flow failures of a deployment within a specified
time frame or without any time frame. Don't need alerts for every flow failure.
I went through the documentation here and it's not super clear.
<https://docs.prefect.io/ui/automations/#triggers>
Has anyone else tried doing something like this before and could offer some advice or tips? Thanks!
Andrew Huang
02/24/2023, 12:36 AM