Hey guys, A question about sub-flows concurrency ...
# ask-community
s
Hey guys, a basic question about controlling sub-flow concurrency: I need a parent flow that runs fairly heavy sub-flows (mostly memory-heavy, ~10 min each) for a few dozen different inputs. I'm using Prefect 2.20. I tried two approaches:
- Simply trigger each sub-flow via `run_deployment` (so each gets its own resources). This creates dozens of sub-flows, but the parent job that tracks and manages them crashes with a connection-limit-reached error (an internal `httpx` crash). I asked Marvin about configuring this in Prefect and didn't see any immediate solution; is more than 4-5 running in parallel really too much for Prefect? Am I missing something?
- Create the sub-flows but run them through a managed concurrency-limited queue (say, 3 at any given time; once one finishes, another kicks in). Since these are flows and not tasks, I had to go through the work-pool / work-queue limit configuration. The problem: because it takes a while to get an actual machine (mostly for the first batch from the cluster), a sub-flow that hasn't started yet appears to error during that window (a -1 signal, until it later succeeds). Due to that temporary failure, other sub-flows also start, and my limit of 3 is exceeded (6-7 run in parallel), which is odd, since this seems like a common glitch. I looked for a parameter controlling how long to wait before the first sub-flow submit is treated as failed, to avoid this race edge case, but the only parameter the docs looked promising about (`job_watch_timeout_seconds`) is described as "Number of seconds to wait for each event emitted by a job before timing out", so raising it to 1-2 minutes seems too important/risky to touch.
- The third fallback option I use is creating fewer but chubbier jobs, each running 10 different inputs serially, and running those in parallel. But that feels like a bad solution on a platform that should take care of this.
Did I miss any simple way out of this? Thanks!
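One pattern that addresses both problems at once, for anyone who lands here: throttle the `run_deployment` calls in the parent with a local semaphore, so only a few sub-flows (and a few polling HTTP connections) are in flight at a time. A minimal sketch under assumed names; `my-flow/my-deployment` and the inputs are hypothetical:

```python
import asyncio

from prefect.deployments import run_deployment

# Hypothetical deployment name and inputs, for illustration only.
DEPLOYMENT_NAME = "my-flow/my-deployment"
INPUTS = [{"value": i} for i in range(36)]

async def run_one(semaphore: asyncio.Semaphore, params: dict):
    async with semaphore:
        # run_deployment blocks (polls) until the sub-flow run finishes,
        # so holding the semaphore here caps how many run concurrently.
        return await run_deployment(name=DEPLOYMENT_NAME, parameters=params)

async def run_all(limit: int = 3):
    semaphore = asyncio.Semaphore(limit)
    return await asyncio.gather(*(run_one(semaphore, p) for p in INPUTS))

if __name__ == "__main__":
    asyncio.run(run_all())
```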
b
> Simply trigger each sub-flow (through `run_deployment`, in order to allocate separate resources). This creates dozens of sub-flows, but the parent job trying to stay updated and manage them crashes due to a connection-limit-reached error (an internal `httpx` crash). I asked Marvin about configuring it in Prefect and didn't see any immediate solution; perhaps more than 4-5 running in parallel is too much for Prefect? Am I missing something?
Hi Srul! This option makes sense to me. Do you have an example of the error that the parent throws?
As for managing concurrency, have you already explored setting deployment-level concurrency?
s
Thanks for your reply @Bianca Hoch! On the parent-flow errors: we suspect this may be due to Karpenter running in the background and killing our pods (perhaps spot-machine related), whether it's the Prefect server itself causing the connection errors, or the sub-flow pods (where the server doesn't recognize the pod is dead for some reason, so no retry is initiated and it just keeps waiting). Still investigating. As for concurrency: we're using Prefect 2.20, not 3, where the deployment-level concurrency limit seems to be available; but in general it doesn't address the issue of not recognizing that machines are on the way before new runs are sent.
Hey @Marvin, in Prefect 2.20, what's the default timeout for a `run_deployment` call? If that timeout is reached mid-run, does it really crash, or does the job keep running so that once it succeeds, all is good?
m
I couldn't find specific information about a default timeout for the `run_deployment` call in Prefect 2.20. Generally in Prefect, though, if a timeout is reached during flow or task execution, the execution is typically halted; the behavior can vary depending on how the timeout is configured and handled within the flow or task logic itself. To ensure proper handling, you may need to explicitly set and manage timeouts in your flow or task configurations. If a task or flow times out, execution stops unless you've implemented logic to handle that scenario. You can read more about timeouts and deployment configuration in the Prefect Deployments Guide. If you have specific configurations, check those to confirm the behavior.
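For what it's worth, the 2.x `run_deployment` signature does take a `timeout` keyword: `None` (the default) polls until the triggered run reaches a final state, while `0` returns immediately after the run is created. A minimal sketch (the deployment name is hypothetical; verify the semantics against your 2.20 install):

```python
from prefect.deployments import run_deployment

# timeout=0: fire-and-forget; the created run keeps executing on its
# worker, and the call returns the FlowRun metadata right away.
# timeout=None (default): block and poll until the run finishes.
flow_run = run_deployment(
    name="my-flow/my-deployment",  # hypothetical deployment name
    parameters={"value": 1},
    timeout=0,
)
print(flow_run.id)
```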
s
Hey @Marvin, in order to use a concurrency limit with a `run_deployment` job, do I need a `work_queue_name` YAML definition that has a concurrency limit? Is this enough, or do I also need to make sure the `work_pool -> name` also points at a concurrency-limited entity? (Prefect 2.20)
m
To use a concurrency limit with a `run_deployment` job in Prefect 2.20, you need to configure both the work queue and the work pool appropriately (see the CLI sketch below):
1. Work Queue: Define a `work_queue_name` in your YAML configuration that has a concurrency limit set. This controls how many runs can execute concurrently within that specific queue.
2. Work Pool: Additionally, ensure the `work_pool` itself has a concurrency limit configured. The pool's limit acts as an overarching control, ensuring the total number of concurrent runs across all queues in the pool does not exceed it.
In summary, both the work queue and the work pool need concurrency limits if you want to manage concurrency effectively across your deployments; this gives fine-grained control over run execution. For more detailed instructions, refer to the Prefect documentation on work pools and queues.
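A CLI sketch of both limits; the pool and queue names are hypothetical, and the `--pool` flag on the work-queue command is worth verifying against the 2.20 CLI:

```bash
# Cap the pool as a whole, then the specific queue inside it.
prefect work-pool set-concurrency-limit "k8s-pool" 3
prefect work-queue set-concurrency-limit "heavy-queue" 3 --pool "k8s-pool"
```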
s
Hey @Marvin, in Prefect 2.20, if I submit many `task`-wrapped functions, how do I control the concurrency so it doesn't go above 5 at a time?
m
To control task concurrency in Prefect 2.20 and ensure that no more than 5 tasks run simultaneously, you can use task concurrency limits with task tags. Here's how you can achieve this:
1. Tag Your Tasks: Assign a specific tag to the tasks you want to limit, for example `"limited_concurrency"`.
2. Set Concurrency Limit: Use Prefect's task concurrency limits to specify the maximum number of concurrent task runs for tasks with that tag; set it to 5.
Here's a high-level example of how you might implement this:

```python
from prefect import flow, task

@task(tags=["limited_concurrency"])
def my_task():
    # Task logic here
    pass

@flow
def my_flow():
    # Submit the tasks so they can run concurrently; the tag-based
    # limit then caps how many execute at once.
    for _ in range(10):
        my_task.submit()

if __name__ == "__main__":
    my_flow()
```

To enforce the concurrency limit, configure it using the Prefect CLI or UI (the 2.x CLI takes the tag and limit as positional arguments):

```bash
prefect concurrency-limit create limited_concurrency 5
```

This setup ensures that no more than 5 tasks with the `"limited_concurrency"` tag run at the same time. Further details can be found in the Prefect Tasks Documentation.
s
@Marvin there's no way to do this in code? When I define the `@task` decorator, or when I do the actual submit?
m
In Prefect 2.20, you cannot directly set a concurrency limit in code using the `@task` decorator or when submitting tasks. Concurrency limits are managed via task tags and configured outside of the task definition, typically using the Prefect CLI or UI. Here's a concise approach:
1. Tag your tasks: Use the `tags` parameter in the `@task` decorator to assign a specific tag to the tasks you want to control. For example:

```python
from prefect import task, flow

@task(tags=["limited_concurrency"])
def my_task():
    # Task logic here
    pass
```

2. Set concurrency limits using the CLI:

```bash
prefect concurrency-limit create limited_concurrency 5
```

This approach ensures that no more than 5 tasks with the `"limited_concurrency"` tag run concurrently. Setting the limit directly in the `@task` decorator or on submit is not supported, so you'll need to configure it outside the task definition.
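One nuance worth adding: while the decorator can't carry the limit, the 2.x Python client does expose `create_concurrency_limit`, so the tag limit itself can be created from code instead of the CLI. A minimal sketch, worth verifying against your 2.20 client:

```python
import asyncio

from prefect.client.orchestration import get_client

async def ensure_limit(tag: str = "limited_concurrency", limit: int = 5):
    async with get_client() as client:
        # Same effect as `prefect concurrency-limit create <tag> <limit>`.
        await client.create_concurrency_limit(tag=tag, concurrency_limit=limit)

if __name__ == "__main__":
    asyncio.run(ensure_limit())
```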
s
Hey @Marvin, in Prefect 2.20, how do I get the Slack token that I stored as a block through the Prefect UI, directly as a webhook URL? Must I define a block? How do I do it?
m
To retrieve a Slack token you've stored as a block in Prefect 2.20, you can use the `SlackCredentials` block. Here's how to load the stored Slack credentials:
1. Load the Slack Credentials Block:

```python
from prefect_slack import SlackCredentials

# Replace "BLOCK_NAME" with the name of your block
slack_credentials_block = SlackCredentials.load("BLOCK_NAME")
```

2. Get the Slack Client: Once you've loaded the block, you can get a Slack client to interact with Slack:

```python
client = slack_credentials_block.get_client()
```

3. Webhook URL: If you need to set up a webhook, refer to Prefect's webhook documentation for managing webhook URLs. Prefect Cloud assigns a webhook URL when you create a webhook; it looks something like `https://api.prefect.cloud/hooks/your-webhook-id`. For more details, see the Slack Credentials documentation; for creating a webhook, see the webhook guide. Let me know if you need further assistance!
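If the goal is just an incoming-webhook URL for notifications rather than a bot token, Prefect 2.x also ships a `SlackWebhook` notification block. A short sketch, with a hypothetical block name:

```python
from prefect.blocks.notifications import SlackWebhook

# Load a SlackWebhook block created in the UI (name is hypothetical)
# and post a test message through its stored incoming-webhook URL.
slack = SlackWebhook.load("my-slack-webhook")
slack.notify("Hello from Prefect 2.20")
```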
s
Hi @Marvin, the automatic Slack notifications I have set up (with an incoming Slack webhook) link to `localhost` instead of a real link. Where can I configure this to be a proper link?
m
To configure the Slack webhook notifications to use a proper link instead of `localhost`, look at how the notification URLs are being templated. Prefect 3.x allows templated variables for dynamic content in notifications; specifically, the `{{ flow_run|ui_url }}` token includes the correct URL to view the flow run in the Prefect UI. Steps to take:
1. Check your Automation Configuration: Make sure your automation is set up to use the `ui_url` token, so the correct URL is included in your notifications.
2. Use Jinja Templating: Prefect supports Jinja templating, which lets you dynamically include details such as the flow or pool name; for example, `{{ flow_run|ui_url }}` inserts the flow run URL into your Slack message.
3. Update Notification Settings: Ensure the notification settings in your Prefect configuration or deployment YAML actually use these tokens.
For more details, refer to the Prefect documentation on webhook triggers and automation triggers. If the issue persists, make sure there are no misconfigurations in your deployment or webhook settings that might be defaulting to `localhost`. If you need further assistance with specific configurations, feel free to ask!
s
@Marvin I'm using Prefect 2.20, not 3
m
In Prefect 2.x, if your Slack notifications link to `localhost`, it's likely due to how the URLs are generated or configured in your setup. Some steps and resources to help:
1. Slack Webhook Configuration: Ensure the Slack webhook is correctly set up and that you're using the right URL for notifications; see the Prefect Slack integration documentation.
2. Notification Settings: Check your notification settings in Prefect. You can configure notifications to trigger on specific flow run states and specify the Slack webhook URL; see the Prefect notifications guide.
3. Flow Run URLs: If the flow run URLs point to `localhost`, ensure your Prefect server or agent is configured to use the appropriate hostname or IP address. This may involve setting environment variables or adjusting your Prefect server configuration.
4. Use of Tokens: Make sure you're using the correct tokens or placeholders in your notification messages to dynamically generate the correct URLs.
If the issue is due to a bug or misconfiguration, review your Prefect server and deployment settings. If you need more specific guidance, feel free to provide additional details about your setup!
s
@Marvin, let's go again: I'm using Prefect 2.20, and my automatic Slack notifications work properly with an incoming webhook token. Everything is configured OK except the links, which point to localhost:4200 and are therefore unusable. Which environment variables, or any other configuration, can we play with to get the real host domain so the links actually work?
m
To configure your Prefect 2.20 server to use a real host domain instead of `localhost`, you can adjust the following environment variables:
1. `PREFECT_API_URL`: Set this to the external URL of your Prefect server's API. For example, if your server is accessible at `https://your-domain.com`, you would set:

```bash
export PREFECT_API_URL="https://your-domain.com/api"
```

2. `PREFECT_UI_API_URL`: This one is specifically for the UI. It should be set to the external proxy URL; if not set, it defaults to `PREFECT_API_URL`. To be explicit:

```bash
export PREFECT_UI_API_URL="https://your-domain.com/api"
```

These configurations will ensure that links generated in notifications point to the correct domain rather than `localhost`. For more detailed guidance on configuring your Prefect server, refer to the Prefect hosting guide and the settings guide. Adjust your server configuration and restart the server to apply the changes.
s
Hey @Marvin, we're trying to deploy a prod Prefect server for the first time, and the deploy command took forever (and crashed after 25 minutes) while running the `work_pool = await client.read_work_pool(deploy_config["work_pool"])` line inside Prefect's internal code, from our CI. The pool name from the YAML is configured on the server. Can you think why this would happen? I think this might be the first line of code to interact with the client.
m
The issue you're experiencing with the `read_work_pool` line taking a long time and eventually crashing might come down to a few potential causes:
1. Work Pool Configuration: Ensure the work pool is properly configured and has active workers connected. A work pool needs at least one online worker sending heartbeats to be considered ready to execute work. You can manage work pools using the Prefect CLI.
2. Network or API Issues: There might be network issues or API endpoint misconfiguration causing the delay. Verify that your Prefect server is reachable and that the API URL is set correctly; the `PREFECT_API_URL` environment variable controls which endpoint the client connects to (see the sketch below for a quick check).
3. Timeouts and Retries: Consider configuring timeouts and retries for your client connections if network instability is an issue; this handles transient failures more gracefully.
4. Version Mismatch or Bugs: Ensure your Prefect server and client versions are compatible, and check the Prefect GitHub issues for known bugs around `read_work_pool` or similar functions that might affect version 2.20.
For more on managing work pools, see the work pools documentation. If the problem persists, check the logs for more specific error messages or reach out to the Prefect community for support.
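A quick connectivity sanity check from the CI runner before re-running the deploy; `PREFECT_API_REQUEST_TIMEOUT` is a 2.x client setting (default 60 seconds), but double-check the name against your version:

```bash
# Confirm the CI runner can actually reach the server's API.
prefect config view | grep -i api_url
curl -sf "$PREFECT_API_URL/health" && echo "API reachable"

# If the network is slow, raise the client's per-request timeout.
export PREFECT_API_REQUEST_TIMEOUT=120
```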
s
In Prefect 2.20, the cron format in the YAML is minute/hour/day/month/weekday? So `0 0 * * 3` means midnight Wednesday, once a week? @Marvin
m
Yes, in Prefect 2.20 the cron format in the YAML configuration follows standard cron syntax: `minute hour day-of-month month day-of-week`. The expression `0 0 * * 3` therefore schedules a run at midnight (00:00) every Wednesday. For more details on scheduling, see the Prefect 2.x scheduling documentation. If you have any more questions about scheduling or anything else, feel free to ask.
s
@Marvin what's the meaning of `day_or` in the YAML?
m
In Prefect, the `day_or` parameter in the YAML schedule configuration specifies how the day-of-month and day-of-week entries in a cron expression are interpreted:
- If `day_or` is set to `True`, the schedule runs if either the day-of-month or the day-of-week condition is satisfied; the conditions act as an OR.
- If `day_or` is set to `False`, both the day-of-month and the day-of-week conditions must be satisfied for the schedule to run, acting as an AND.
This allows more flexible scheduling, depending on whether you want runs on specific days of the week, specific days of the month, or both (see the sketch below). For more details, refer to the Prefect 2.x documentation on scheduling.
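A small illustration of the same flag in Python, using the 2.x client schedule schema (module path worth double-checking against 2.20):

```python
from prefect.client.schemas.schedules import CronSchedule

# "0 0 1 * 3" names both a day-of-month (the 1st) and a weekday (Wednesday).
runs_either = CronSchedule(cron="0 0 1 * 3", day_or=True)   # on the 1st OR on any Wednesday
runs_both = CronSchedule(cron="0 0 1 * 3", day_or=False)    # only on a Wednesday that is the 1st
```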
s
@Marvin in Prefect 2.20, is there a way to download the log files of a run that's already complete? Or at least to search the logs, instead of scrolling in the UI?
m
In Prefect 2.20, you can view logs for completed runs through the Prefect UI, but the UI itself has no feature to download log files or search them. However, you can customize the logging configuration and capture logs using standard Python logging, redirected to a file or another logging service; that way you can access and search logs outside the Prefect UI. For more information, see the Prefect logging guide. If you need to search logs programmatically, consider a logging setup that writes to a file or an external logging service where searches are more efficient; see the sketch below for pulling a run's logs through the API.
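The 2.x Python client can also read a finished run's logs back from the API and write them to a file you can grep. A sketch with a hypothetical run ID, assuming the `read_logs` client method and `LogFilter` schema as in 2.x; long runs will need `limit`/`offset` paging:

```python
import asyncio

from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import LogFilter, LogFilterFlowRunId

FLOW_RUN_ID = "00000000-0000-0000-0000-000000000000"  # hypothetical run ID

async def dump_logs(flow_run_id: str, path: str = "run.log"):
    async with get_client() as client:
        logs = await client.read_logs(
            log_filter=LogFilter(
                flow_run_id=LogFilterFlowRunId(any_=[flow_run_id])
            )
        )
        with open(path, "w") as f:
            for log in logs:
                f.write(f"{log.timestamp} [{log.level}] {log.message}\n")

if __name__ == "__main__":
    asyncio.run(dump_logs(FLOW_RUN_ID))
```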
s
@Marvin I can't search in the logs either, right? Only scroll?
m
Correct: in the Prefect UI you can only scroll through logs; there is no built-in search within the UI. To search logs, you would need a logging configuration that exports them to a file or an external logging service where you can run more advanced searches.
s
@Marvin can I change the loading window size, to get more logs in each chunk? The sliding-window scrolling takes forever.
m
Currently, there doesn't appear to be a built-in feature or a documented method to adjust the log loading window size in Prefect's UI to load more logs in each chunk. This limitation can indeed make scrolling through logs a bit cumbersome. If this is a significant pain point, you might consider exporting logs to an external logging service or file where you can configure the viewing settings to better suit your needs. Alternatively, you could check for any updates or feature requests related to this on the Prefect GitHub repository for potential workarounds or future enhancements.
s
@Marvin can I get the tags of the current flow programmatically, in code, while running inside the flow?
m
To programmatically access the tags of a flow while it's running, you can use the Prefect API to retrieve the flow run's metadata, though the exact method isn't directly documented in the excerpts I found. Generally, you would use the Prefect context or runtime environment to access metadata related to the run. In Prefect 2.x, you might consider `prefect.context` for certain runtime information, though tags specifically might not be directly accessible that way. For a more concrete solution, check the Prefect documentation on flows for detailed methods to access flow metadata programmatically. If you have further questions or need more specific guidance, feel free to ask!
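Before going through the API, it's worth checking the 2.x `prefect.runtime` module, which exposes the current run's tags directly. A sketch, assuming the attribute behaves on 2.20 as documented (it is empty outside an active run):

```python
from prefect import flow
from prefect.runtime import flow_run

@flow
def my_flow():
    # Empty outside an active run; otherwise the current run's tags.
    print(f"current flow-run tags: {flow_run.tags}")

if __name__ == "__main__":
    my_flow()
```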
s
@Marvin it seems I can get the current run's `deployment_id`, from which I can probably get the deployment tags (from the YAML), right? Can I get a snippet to extract the deployment tags? Also, since I sometimes run locally: can I make it safe so it doesn't crash when there's no deployment and I'm just running plain local Python with no server?
m
You can access the `deployment_id` from the `prefect.runtime.flow_run` module. If you're running the flow as part of a deployment, you can use this `deployment_id` to fetch deployment details and tags. Here's a snippet to safely extract deployment tags, even when running locally without a deployment:

```python
import asyncio

from prefect.client.orchestration import get_client
from prefect.runtime.flow_run import id as flow_run_id

async def get_deployment_tags():
    try:
        # Only proceed if there's a flow run ID, indicating a run context
        if not flow_run_id:
            print("Running locally, no deployment ID available.")
            return None

        async with get_client() as client:
            # Fetch the flow run details
            flow_run = await client.read_flow_run(flow_run_id)
            deployment_id = flow_run.deployment_id

            # Fetch deployment details using the deployment ID
            if deployment_id:
                deployment = await client.read_deployment(deployment_id)
                return deployment.tags

    except Exception as e:
        print(f"An error occurred: {e}")
        return None

# Example usage
tags = asyncio.run(get_deployment_tags())
print(f"Deployment tags: {tags}")
```

This code checks for a `flow_run_id`, which indicates a run context. If there isn't one, it assumes you're running locally and exits gracefully; otherwise it fetches the flow run, follows its `deployment_id` (which is `None` for ad-hoc local runs), and retrieves the deployment's tags.
s
@Marvin do I need the `flow` object at all? Or just use the context to get the live stats?
m
You don't need to create a `flow` object to access runtime information about the current flow run; using the `prefect.runtime` context is sufficient. The `prefect.runtime.flow_run` module provides direct access to various attributes of the flow run, such as its ID, tags, and other metadata. In the example I provided, I used `prefect.runtime.flow_run.id` to check whether the code is running within a deployment and to retrieve the deployment ID; this approach is efficient and accesses runtime metadata directly. If you have any more questions or need further clarification, feel free to ask!
s
Hey @Marvin, I configured Slack notifications on the running/crashed/completed states of scheduled jobs, and I (almost always) get crash notifications right after the run starts (sometimes even before!), although there was no crash, just some time waiting for a pod to start; the run then completes some time later. Can I tweak something so it waits a bit longer before publishing fake crashes? It's just waiting for resources, and it recovers a minute later.
m
It seems you're experiencing premature crash notifications due to resource wait times. While I couldn't find a direct setting to delay notifications, you might consider adjusting how state changes trigger them:
1. State Handlers: Use custom state handlers to better control when notifications are sent. By defining logic to delay or conditionally trigger notifications, you can avoid premature alerts.
2. Backoff Strategy: Implement a backoff strategy in your state-change logic to delay the transition to a "crashed" state, for example by checking whether resources are genuinely unavailable for a certain period before declaring a crash.
3. Custom Automation: Create a custom automation that evaluates the state over a period before sending a notification, using Prefect's Automations API, which allows more complex logic when dealing with state changes.
For more details, explore the Automations API documentation. Additionally, reviewing the state management code might offer insights into implementing a delay mechanism. If these solutions don't align with your needs, consider raising an issue or feature request on Prefect's GitHub repository to discuss potential enhancements with the community.
s
@Marvin I'm using Prefect 2.20, not 3
m
In Prefect 2.x, handling premature notifications due to state changes can be tricky. Here's what you can consider:
1. Custom State Hooks: Prefect 2.x flows accept state-change hooks such as `on_failure`, `on_crashed`, and `on_completion`. A hook lets you run your own logic (a delay, or a re-check of the run's actual state) before posting to Slack, instead of firing on the first transient transition.
2. Notify Only on Meaningful States: If you wire Slack yourself through a hook, you control exactly which terminal states produce a message.
Here's a basic sketch using the 2.x hook convention (the hook body and block name are illustrative; the hook signature is `(flow, flow_run, state)`):

```python
from prefect import flow
from prefect.blocks.notifications import SlackWebhook

def notify_on_bad_end(flow, flow_run, state):
    # Illustrative: post to Slack only from here, so pod-scheduling
    # blips never alert anyone unless the run truly ends badly.
    SlackWebhook.load("my-slack-webhook").notify(
        f"Flow run {flow_run.name} ended in state {state.name}"
    )

@flow(on_crashed=[notify_on_bad_end], on_failure=[notify_on_bad_end])
def my_flow():
    ...
```

3. Review Notifications Documentation: The Prefect 2.x documentation on notifications explains how to set up and customize notifications; it might offer additional insights into managing state-based notifications.
4. Community Discussions: If these solutions don't resolve your issue, consider engaging with the Prefect community or submitting a feature request on GitHub; others with similar challenges may have found workarounds.
These steps should help you better control when notifications are sent, reducing false alarms.
s
@Marvin can you re-run a finished `task` through the UI, retroactively? If it's completed I don't see a button; is that also true for failed ones? (Prefect 2.20)
m
In Prefect 2.20, there is no direct option to re-run a completed or failed task through the UI. The UI does not provide a button or feature to re-run individual tasks retroactively once they have finished, regardless of their state (completed or failed). To re-run a task, you would typically need to re-run the entire flow or manually trigger the task using code. This limitation is consistent with the general design of Prefect's task execution model, where tasks are components of flows and are managed within the context of flow runs. For more detailed guidance or workarounds, you might want to refer to the Prefect documentation or consider reaching out to the Prefect community for support.