Srul Pinkas
12/09/2024, 11:06 AM
I'm trying to run dozens of sub-flows from a parent flow and have tried a few approaches:
- simply trigger each sub-flow (through run_deployment, in order to allocate separate resources). This creates dozens of sub-flows, but the parent job that tries to stay updated and manage them crashes with a connection-limit-reached error (an internal httpx crash). I asked Marvin about configuring this in Prefect and didn't see any immediate solution. Perhaps more than 4-5 running in parallel is too much for Prefect? Am I missing something?
- create many sub-flows but run them through a managed concurrency-limited queue (say, 3 at any given time - once one finishes, another kicks in). Since it's a flow and not a task, I had to go through work-pool / work-queue limit configuration. The problem is that it takes some time to get an actual machine (mostly for the first batch from the cluster), and during that time the not-yet-executed sub-flow appears to fail (a -1 signal, until it later succeeds). Because of that temporary failure, other sub-flows also start running and I overshoot my limit of 3 (so 6-7 run in parallel), which is odd, since this seems like a common glitch. I looked for a parameter that controls how long to wait on that first sub-flow-submit failure to avoid this race edge case, but the only parameter the docs seemed promising about (job_watch_timeout_seconds) is described as "Number of seconds to wait for each event emitted by a job before timing out" - so raising it to 1-2 minutes seems too risky.
- The third fallback option I use is creating fewer but chubbier jobs - each running 10 different inputs serially - and running those in parallel. But that seems like a bad solution for a platform that should take care of this.
Did I miss any simple way out of this?
Thanks!
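One way to keep the first approach from exhausting connections is to cap how many run_deployment calls the parent keeps open at once. A minimal sketch, assuming Prefect 2.x; the deployment name "child-flow/default" is a placeholder:
python
import asyncio
from prefect import flow
from prefect.deployments import run_deployment

# Hypothetical deployment name; substitute your own "flow-name/deployment-name".
CHILD_DEPLOYMENT = "child-flow/default"

@flow
async def parent(inputs: list[dict]):
    # Cap the number of in-flight run_deployment calls (and their polling
    # connections) so the parent's httpx client isn't exhausted.
    semaphore = asyncio.Semaphore(3)

    async def run_one(params: dict):
        async with semaphore:
            # timeout=None waits for the sub-flow run to finish before
            # releasing the slot to the next one.
            return await run_deployment(
                name=CHILD_DEPLOYMENT, parameters=params, timeout=None
            )

    return await asyncio.gather(*(run_one(p) for p in inputs))
This bounds both the number of concurrently running sub-flows and the number of open connections on the parent side.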
Bianca Hoch
12/09/2024, 7:04 PM
> simply trigger each sub-flow (through run_deployment in order to allocate separate resources)...
Hi Srul! This option makes sense to me. Do you have an example of the error that the parent throws?
Bianca Hoch
12/09/2024, 7:05 PM
Srul Pinkas
12/10/2024, 1:45 PM
Srul Pinkas
02/07/2025, 3:36 PM
@Marvin what happens if a timeout is set on the run_deployment call? if that timeout is reached while running, is it really crashing, or does the job keep on running and once it succeeds it's all good?
Marvin
02/07/2025, 3:36 PM
I couldn't find specific documentation on the timeout behavior of the run_deployment call in Prefect 2.20. However, generally in Prefect, if a timeout is reached during a flow or task execution, the execution is typically halted. The behavior might vary depending on how the timeout is configured and handled within the flow or task logic itself.
To ensure proper handling of timeouts, you might need to explicitly set and manage them in your flow or task configurations. If a task or flow is set to timeout and it does, the execution would stop unless you've implemented logic to handle such scenarios.
You can explore more about timeouts and deployment configurations in the Prefect documentation: Prefect Deployments Guide.
If you have specific configurations or setups, you might want to check those to confirm the behavior.
Srul Pinkas
02/08/2025, 11:31 AM
@Marvin to limit the concurrency of a run_deployment job, do i need to use a work_queue_name yaml definition that has a concurrency limit? is this enough? or do i also need to make sure the work_pool --> name also represents a concurrency-limited entity? (prefect 2.20)
Marvin
02/08/2025, 11:31 AM
To limit the concurrency of a run_deployment job in Prefect 2.20, you need to configure both the work queue and the work pool appropriately:
1. Work Queue: You should define a work_queue_name in your YAML configuration that has a concurrency limit set. This allows you to control how many tasks or flows can run concurrently within that specific queue.
2. Work Pool: Additionally, you need to ensure that the work_pool itself has a concurrency limit configured. The work pool's concurrency limit acts as an overarching control, ensuring that the total number of concurrent runs across all queues within the pool does not exceed the specified limit.
In summary, both the work queue and the work pool need to have concurrency limits if you want to manage concurrency effectively across your deployments. This setup provides fine-grained control over the execution of runs.
For more detailed instructions, you might want to refer to the Prefect documentation on work pools and queues.
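For reference, the corresponding Prefect 2.x CLI calls look roughly like this; the queue and pool names are placeholders, and exact options can vary between 2.x releases:
bash
# Limit a work queue to 3 concurrent runs
prefect work-queue set-concurrency-limit "my-queue" 3
# Optionally cap the whole pool as well
prefect work-pool set-concurrency-limit "my-pool" 5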
Srul Pinkas
02/11/2025, 1:02 PM
@Marvin within a flow that runs many task-wrapped functions, how do i control the concurrency to not go above 5 at the same time?
Marvin
02/11/2025, 1:03 PM
You can cap this with task tags and a tag-based concurrency limit:
1. Tag Your Tasks: Assign a tag such as "limited_concurrency" to the tasks you want to limit.
2. Set Concurrency Limit: Use Prefect's task concurrency limits to specify the maximum number of concurrent task runs for tasks with the given tag. You can set this limit to 5.
Here's a high-level example of how you might implement this:
python
from prefect import flow, task

@task(tags=["limited_concurrency"])
def my_task():
    # Task logic here
    pass

@flow
def my_flow():
    # Submit the tasks so they can run concurrently; the tag-based
    # concurrency limit then caps how many execute at once.
    futures = [my_task.submit() for _ in range(10)]
    for future in futures:
        future.wait()

if __name__ == "__main__":
    my_flow()
To enforce the concurrency limit, you would configure it using the Prefect CLI or UI:
bash
prefect concurrency-limit create limited_concurrency 5
This setup ensures that no more than 5 tasks with the "limited_concurrency" tag run at the same time. Further details can be found in the Prefect Tasks Documentation.
Srul Pinkas
02/11/2025, 1:03 PM
can i set this limit in code - in the @task decorator or when i do the actual submit?
Marvin
02/11/2025, 1:04 PM
In Prefect 2.20, you cannot set these concurrency limits via the @task decorator or when submitting tasks. Concurrency limits are managed via task tags and configured outside of the code, typically using the Prefect CLI or UI.
Here's a concise approach to manage concurrency:
1. Tag your tasks: Use the tags parameter in the @task decorator to assign a specific tag to tasks you want to control. For example:
python
from prefect import task, flow

@task(tags=["limited_concurrency"])
def my_task():
    # Task logic here
    pass
2. Set concurrency limits using the CLI: Use the Prefect CLI to set a concurrency limit on the tag:
bash
prefect concurrency-limit create limited_concurrency 5
This approach ensures that no more than 5 tasks with the "limited_concurrency" tag run concurrently. Unfortunately, setting concurrency limits directly in code is not supported, and you'll need to use external configuration for this purpose.
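That said, the Prefect 2.x orchestration client does expose a way to create the same tag-based limit programmatically; a minimal sketch, reusing the tag from the example above:
python
import asyncio
from prefect import get_client

async def ensure_limit():
    async with get_client() as client:
        # Creates the tag-based task concurrency limit
        # (same effect as the CLI command above).
        await client.create_concurrency_limit(
            tag="limited_concurrency", concurrency_limit=5
        )

asyncio.run(ensure_limit())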
Srul Pinkas
02/13/2025, 12:36 PM
Marvin
02/13/2025, 12:37 PM
Slack credentials are stored in a SlackCredentials block. Here's how you can load the stored Slack credentials:
1. Load the Slack Credentials Block:
python
from prefect_slack import SlackCredentials
# Replace "BLOCK_NAME" with the name of your block
slack_credentials_block = SlackCredentials.load("BLOCK_NAME")
2. Get the Slack Client:
Once you've loaded the block, you can get a Slack client to interact with Slack:
python
client = slack_credentials_block.get_client()
3. Webhook URL:
If you need to set up a webhook, you can refer to Prefect's webhook documentation for managing webhook URLs. Prefect Cloud assigns a webhook URL when you create a webhook, and it looks something like https://api.prefect.cloud/hooks/your-webhook-id.
For more details, you can refer to the Slack Credentials documentation.
If you need to create a webhook, you can find more information in the webhook guide.
Let me know if you need further assistance!
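As a follow-up to step 2, a small usage sketch - the block name "slack-creds" is a placeholder, and this assumes get_client() returns a slack_sdk async client, which is what prefect-slack documents:
python
import asyncio
from prefect_slack import SlackCredentials

async def post_update():
    # "slack-creds" is a placeholder block name; use your own block's name.
    creds = await SlackCredentials.load("slack-creds")
    client = creds.get_client()  # slack_sdk AsyncWebClient
    await client.chat_postMessage(channel="#alerts", text="Flow finished")

asyncio.run(post_update())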
Srul Pinkas
02/19/2025, 3:00 PM
@Marvin the slack notification shows localhost instead of a proper link - where can i configure it to be a proper link?
Marvin
02/19/2025, 3:00 PM
If the notification is showing localhost, you should look into how the notification URLs are being templated. Prefect allows you to use templated variables for dynamic content in notifications. Specifically, you can use the {{ flow_run|ui_url }} token to include the correct URL to view the flow run in the Prefect UI.
Here are some steps you can take:
1. Check your Automation Configuration: Make sure your automation configuration is set up correctly to use the ui_url token. This will ensure that the correct URL is included in your notifications.
2. Use Jinja Templating: Prefect supports Jinja templating, which allows you to dynamically include details such as the flow or pool name. For example, you can use {{ flow_run|ui_url }} to insert the URL to the flow run in your Slack message.
3. Update Notification Settings: Ensure that your notification settings in the Prefect configuration or your deployment YAML are properly utilizing these tokens.
For more details, you can refer to the Prefect documentation on webhook triggers and automation triggers.
If the issue persists, make sure there are no misconfigurations in your deployment or webhook settings that might be defaulting to localhost.
If you need further assistance with specific configurations, feel free to ask!
Srul Pinkas
02/19/2025, 3:00 PM
Marvin
02/19/2025, 3:01 PM
If your flow run links point to localhost, it's likely due to how the URLs are being generated or configured within your setup. Here are some steps and resources to help you configure it correctly:
1. Slack Webhook Configuration: Ensure that your Slack webhook is correctly set up and that you are using the right URL for notifications. You can find more about configuring Slack webhooks in the Prefect Slack integration documentation.
2. Notification Settings: Check your notification settings in Prefect. You can configure notifications to trigger on specific flow run states and specify the Slack webhook URL. More details can be found in the Prefect notifications guide.
3. Flow Run URLs: If the flow run URLs are pointing to localhost, ensure that your Prefect server or agent is correctly configured to use the appropriate hostname or IP address. This might involve setting environment variables or adjusting your Prefect server configuration.
4. Use of Tokens: Make sure that you are using the correct tokens or placeholders in your notification messages to dynamically generate the correct URLs.
If the issue is due to a bug or a misconfiguration, reviewing your Prefect server and deployment settings might be necessary. If you need more specific guidance, feel free to provide additional details about your setup!
Srul Pinkas
02/19/2025, 3:03 PM
Marvin
02/19/2025, 3:03 PM
To stop the generated links from pointing to localhost, you can adjust the following environment variables:
1. PREFECT_API_URL: This environment variable should be set to the external URL of your Prefect server's API. For example, if your server is accessible at https://your-domain.com, you would set:
bash
export PREFECT_API_URL="https://your-domain.com/api"
2. PREFECT_UI_API_URL: This is specifically for the UI. It should be set to the external proxy URL. If not set, it defaults to PREFECT_API_URL. To ensure proper configuration, set it explicitly:
bash
export PREFECT_UI_API_URL="https://your-domain.com/api"
These configurations will ensure that links generated in notifications point to the correct domain rather than localhost.
For more detailed guidance on configuring your Prefect server, you can refer to the Prefect hosting guide and the settings guide. Adjust your server configuration and restart your server to apply these changes.
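Depending on the 2.x version, the link produced by {{ flow_run|ui_url }} may also be driven by the PREFECT_UI_URL setting rather than the API URLs above, so it can be worth setting it explicitly as well (the domain is a placeholder):
bash
export PREFECT_UI_URL="https://your-domain.com"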
Srul Pinkas
02/20/2025, 5:16 PM
@Marvin our deployment step hangs and eventually crashes on the work_pool = await client.read_work_pool(deploy_config["work_pool"]) line inside prefect's internal code, from our CI. The pool names from the yaml are configured on the server. Can you think why this would happen? i think this might be the first code line to interact with the client.
Marvin
02/20/2025, 5:16 PM
The read_work_pool line taking a long time and eventually crashing might be related to a few potential causes:
1. Work Pool Configuration: Ensure that your work pool is properly configured and that there are active workers connected. A work pool needs at least one online worker sending heartbeats to be considered ready to execute work. You can manage work pools using the Prefect CLI.
2. Network or API Issues: There might be network issues or API endpoint misconfigurations causing the delay. Verify that your Prefect server is accessible and that the API URL is correctly set. You can set the PREFECT_API_URL environment variable to ensure the client connects to the right endpoint.
3. Timeouts and Retries: Consider configuring timeouts and retries for your client connections if network instability is an issue. This can help handle transient failures more gracefully.
4. Version Mismatch or Bugs: Ensure that your Prefect server and client versions are compatible. Check the Prefect GitHub issues for any known bugs related to read_work_pool or similar functions that might affect version 2.20.
For more details on managing work pools, you can refer to the work pools documentation. If the problem persists, you might want to look into logs for more specific error messages or consider reaching out to the Prefect community for support.
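A couple of quick sanity checks that can be run from the CI environment itself with the Prefect 2.x CLI (the pool name is a placeholder):
bash
prefect config view                  # confirm PREFECT_API_URL points at the intended server
prefect work-pool ls                 # confirm the server is reachable and the pool exists
prefect work-pool inspect "my-pool"  # "my-pool" stands in for the pool name from the yaml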
Srul Pinkas
02/24/2025, 3:15 PM
just to make sure, 0 0 * * 3 means midnight wednesday, once a week? @Marvin
Marvin
02/24/2025, 3:15 PM
Yes. Cron fields are ordered as min/hour/day/month/weekday. Therefore, the expression 0 0 * * 3 indeed schedules a task to run at midnight (00:00) every Wednesday.
For more details on scheduling in Prefect, you can refer to the Prefect 2.x scheduling documentation.
If you have any more questions about scheduling or anything else, feel free to ask.
Srul Pinkas
02/26/2025, 10:08 AM
@Marvin what is the day_or meaning in the yaml?
Marvin
02/26/2025, 10:08 AM
The day_or parameter in the YAML schedule configuration specifies how the day-of-month and day-of-week entries in a cron expression are interpreted.
- If day_or is set to True, the schedule will run if either the day-of-month or the day-of-week condition is satisfied. The two conditions act as an OR operation.
- If day_or is set to False, both the day-of-month and the day-of-week conditions must be satisfied for the schedule to run, acting as an AND operation.
This behavior allows for more flexible scheduling options, depending on whether you want the task to run on specific days of the week, specific days of the month, or both.
For more details, you can refer to the Prefect 2.x documentation on scheduling.
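For context, here is roughly how these fields sit together in a Prefect 2.x deployment YAML; the cron value is taken from the earlier question and the rest is illustrative:
yaml
schedule:
  cron: "0 0 * * 3"   # midnight every Wednesday
  timezone: "UTC"
  day_or: true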
Srul Pinkas
02/27/2025, 12:54 PM
Marvin
02/27/2025, 12:55 PM
Srul Pinkas
02/28/2025, 11:13 AM
Marvin
02/28/2025, 11:13 AM
Srul Pinkas
02/28/2025, 11:14 AM
Marvin
02/28/2025, 11:14 AM
Srul Pinkas
03/03/2025, 11:26 AM
Marvin
03/03/2025, 11:26 AM
Within a flow run, you can use prefect.context to access certain runtime information, though tags specifically might not be directly accessible this way.
If you are looking for a more concrete solution, you can check the Prefect documentation on flows for any updates or detailed methods to access flow metadata programmatically.
If you have further questions or need more specific guidance, feel free to ask!
Srul Pinkas
03/03/2025, 3:35 PM
i see the flow run has a deployment_id - from which i can probably get the deployment tags (from the yaml), right? can i get a snippet to extract the deployment tags?
also, since i'm sometimes running locally - can i do it safely so it doesn't crash in case i don't have any deployment and am just running stuff in local python with no server?
Marvin
03/03/2025, 3:35 PM
Yes - you can get to the deployment_id from the prefect.runtime.flow_run module. If you're running the flow as part of a deployment, you can use this deployment_id to fetch deployment details and tags. Here's a snippet to safely extract deployment tags, even if running locally without a deployment:
python
import asyncio

from prefect import get_client
from prefect.runtime import flow_run

async def get_deployment_tags():
    try:
        # Only proceed if there's a flow run ID, indicating a deployment context
        if not flow_run.id:
            print("Running locally, no deployment ID available.")
            return None
        async with get_client() as client:
            # Fetch the flow run details
            flow_run_details = await client.read_flow_run(flow_run.id)
            deployment_id = flow_run_details.deployment_id
            # Fetch deployment details using the deployment ID
            if deployment_id:
                deployment = await client.read_deployment(deployment_id)
                return deployment.tags
    except Exception as e:
        print(f"An error occurred: {e}")
    return None

# Example usage
tags = asyncio.run(get_deployment_tags())
print(f"Deployment tags: {tags}")
This code checks whether flow_run.id is set, which indicates a deployment context. If not, it assumes you're running locally and exits gracefully. If there is a deployment context, it fetches the deployment details and retrieves the tags.
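If the tags you care about are already attached to the flow run itself, prefect.runtime may be enough on its own - a short sketch (in Prefect 2.x this returns an empty value when there is no active run):
python
from prefect.runtime import flow_run

# Tags of the current flow run; safe to call locally as well.
print(flow_run.tags)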
Srul Pinkas
03/03/2025, 3:47 PM
so i don't need the flow object at all? or just use context to get the live stats?
Marvin
03/03/2025, 3:48 PM
Correct - you don't need the flow object to access runtime information when you are trying to get live stats or metadata about the current flow run. Using the prefect.runtime context is sufficient for accessing this kind of information. The prefect.runtime.flow_run module provides direct access to various attributes of the flow run, such as its ID, tags, and other metadata.
In the example I provided, I used prefect.runtime.flow_run.id to check if the code is running within a deployment and to retrieve the deployment ID. This approach is efficient and aligns with accessing runtime metadata directly.
If you have any more questions or need further clarification, feel free to ask!
Srul Pinkas
03/10/2025, 11:48 AM
Marvin
03/10/2025, 11:48 AM
Srul Pinkas
03/10/2025, 11:50 AM
Marvin
03/10/2025, 11:50 AM
python
# Note: this snippet uses the Prefect 1.x API (prefect.utilities.notifications
# and flow-level state_handlers); it will not run as-is on Prefect 2.x.
from prefect import task, Flow
from prefect.utilities.notifications import slack_notifier
from prefect.engine.state import Failed

handler = slack_notifier(only_states=[Failed])

@task
def my_task():
    pass

with Flow("my_flow", state_handlers=[handler]) as flow:
    my_task()
3. Review Notifications Documentation: The Prefect 2.x documentation on notifications explains how to set up and customize notifications. It might offer additional insights into managing state-based notifications.
4. Community Discussions: If these solutions don't resolve your issue, consider engaging with the Prefect community or submitting a feature request on GitHub. There might be others with similar challenges who have found workarounds.
These steps should help you better control when notifications are sent, reducing false alarms.
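Since the snippet above uses the Prefect 1.x notification API, here is a rough Prefect 2.x-style equivalent using a flow-level on_failure hook and a SlackWebhook block; the block name "my-webhook" is a placeholder:
python
from prefect import flow
from prefect.blocks.notifications import SlackWebhook

def notify_failure(flow, flow_run, state):
    # Send to Slack only when the flow run ends in a failed state.
    SlackWebhook.load("my-webhook").notify(
        f"Flow run {flow_run.name} failed: {state.message}"
    )

@flow(on_failure=[notify_failure])
def my_flow():
    ...  # flow logic here

if __name__ == "__main__":
    my_flow()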
Srul Pinkas
03/12/2025, 12:16 PM
@Marvin is there a way to re-run a task through the UI, in retro? if it's completed i don't see a button. is that true also for failed ones?
(prefect 2.20)
Marvin
03/12/2025, 12:16 PM