# marvin-in-the-wild

**User:**
@Marvin I have two deployments which run the same image on a k8s job. One deployment is able to run; on the other I'm getting `08:22:04.514 | INFO | prefect.flow_runs.worker - Reported flow run '028c59d6-0ab5-4e01-9a23-724f03f70b4b' as crashed: Flow run could not be submitted to infrastructure`

**Marvin:**
Ah, the joy of inconsistent deployment behaviors. One works like a charm, the other crashes and burns. Delightful. Before I dive into this, I need to know which version of Prefect you're using. Is it 2.x or 3.x? This will help me search the appropriate docs and issues.
**User:**
@Marvin 2.20
**Marvin:**
Oh, the thrilling adventure of Kubernetes and Prefect. Let's see what bits of joy we can uncover from the logs and responses.

1. Conflict Error: The `409 - Conflict` error means that the job already exists in the cluster. Essentially, Kubernetes is having a little identity crisis. It can't generate a unique name for this job, probably because a job with the same name still exists or didn't get cleaned up properly. (GitHub Issue: #12987)
2. Pull Rate Limit: Another error, `429 - Too Many Requests` from Docker Hub, means your flow is exceeding the pull rate limit for unauthenticated requests. To avoid this, you need to authenticate or reduce the number of requests. (GitHub Issue: #12003)

### Steps to resolve:
1. Ensure Job Cleanup: Make sure that Kubernetes jobs are being cleaned up after execution.
2. Unique Naming: Implement a unique naming strategy for each job if it isn't already. Prefect's workers should handle this, but double-check your setup.
3. Authenticate Docker Pulls: Authenticate your Docker pulls to avoid rate limits. This can be done in your Kubernetes configuration by specifying Docker credentials.

If these issues continue, you might want to delve deeper into your specific infrastructure setup or join the joyous community of GitHub issue reporters.

Links to related GitHub Issues:
- Conflict Error Issue
- Pull Rate Limit Issue

Good luck wrestling with infrastructure!
**User:**
@Marvin this is what I get in the logs:
```
kubernetes_asyncio.client.exceptions.ApiException: (403)
Reason: Forbidden
HTTP response headers: <CIMultiDictProxy('Audit-Id': '5daa6f6e-3b30-4c4c-8374-fd14c87855e5', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Content-Type-Options': 'nosniff', 'X-Kubernetes-Pf-Flowschema-Uid': '56cc2435-836c-472f-8208-05d5fa808822', 'X-Kubernetes-Pf-Prioritylevel-Uid': 'b7eb14a5-5b16-4e65-b28d-c10169a2c89c', 'Date': 'Wed, 04 Sep 2024 08:22:04 GMT', 'Content-Length': '352')>
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"namespaces \"kube-system\" is forbidden: User \"system:serviceaccount:algo-prefect:prefect-worker\" cannot get resource \"namespaces\" in API group \"\" in the namespace \"kube-system\"","reason":"Forbidden","details":{"name":"kube-system","kind":"namespaces"},"code":403}
```
**Marvin:**
Your message was too long by 25 tokens - please shorten it and try again.
**User:**
@Marvin kubernetes_asyncio.client.exceptions.ApiException: (403) Reason: Forbidden
**Marvin:**
Oh, now we've entered the realm of forbidden actions. How thrilling. This `403 Forbidden` response indicates a permission issue when Prefect tries to interact with your Kubernetes cluster. Let's break down what you can do:

### 1. Check Kubernetes RBAC (Role-Based Access Control)
Ensure that the service account used by your Prefect deployment has the necessary permissions to create and manage jobs in your Kubernetes namespace. Here's a basic rundown of what you can check:
- ClusterRole: Defines what operations can be performed.
- ClusterRoleBinding: Associates the ClusterRole with a service account.

You may need something like this:
```yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: prefect-job-manager
rules:
- apiGroups: ["batch", ""]
  resources: ["jobs", "pods"]
  verbs: ["create", "delete", "get", "list", "watch"]

---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: prefect-job-manager-binding
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: prefect-job-manager
subjects:
- kind: ServiceAccount
  name: YOUR_SERVICE_ACCOUNT_NAME
  namespace: YOUR_NAMESPACE
```

### 2. Check Service Account Configuration
Ensure that your Prefect worker or agent is running with the correct service account that has the necessary permissions.

### 3. Inspect Kubernetes API Access
Ensure that your Kubernetes cluster is properly configured to allow access to the Kubernetes API. Confirm that network policies, if any, are not preventing access.

### Check for more details
Let me search for this specific error in the Prefect documentation and GitHub issues to see if there's more context-specific advice.

Ah, the forbidden fruit of Kubernetes permissions. Gotta love it. Here's a compilation of what's likely going wrong and what you can do about it:

### Identified Issues
1. RBAC Permission Issues: Your service account might lack the necessary permissions to create resources like jobs or pods in Kubernetes. The error message typically indicates which resource and API group is forbidden:
```json
"message": "pods is forbidden: User \"system:anonymous\" cannot create resource \"pods\" in API group \"\" in the namespace \"prefect\""
```
2. Dependencies causing issues: Certain dependencies might need stricter version controls. For instance, `kubernetes-asyncio` has been known to break things occasionally.

### Steps to Resolve
1. Set Up RBAC Correctly: Ensure your service account has the appropriate cluster roles and bindings. Here's a sample manifest you can tweak:
```yaml
   apiVersion: v1
   kind: ServiceAccount
   metadata:
     name: prefect-service-account
     namespace: your-namespace

   ---
   
   apiVersion: rbac.authorization.k8s.io/v1
   kind: ClusterRole
   metadata:
     name: prefect-cluster-role
   rules:
   - apiGroups: [""]
     resources: ["pods", "jobs"]
     verbs: ["create", "get", "list", "watch", "delete"]

   ---
   
   apiVersion: rbac.authorization.k8s.io/v1
   kind: ClusterRoleBinding
   metadata:
     name: prefect-cluster-role-binding
   subjects:
   - kind: ServiceAccount
     name: prefect-service-account
     namespace: your-namespace
   roleRef:
     kind: ClusterRole
     name: prefect-cluster-role
     apiGroup: rbac.authorization.k8s.io
```

2. Adjust Kubernetes Dependencies: If you're using dask with kubernetes, you might need to pin the `kubernetes-asyncio` dependency to avoid changes that break things.
```bash
pip install 'kubernetes-asyncio<22.6'
```
3. Official Prefect Docs: Prefect's official Kubernetes guide and the credentials documentation have detailed steps and examples for setting up credentials correctly.

### Links to Related Issues
- RBAC and API Access Issues
- Specific API issue example in GitHub: #5573

By addressing these aspects, your `403 Forbidden` issue should hopefully transform into something less mysterious. Good luck dealing with the quirks of Kubernetes!
**User:**
@Marvin so why does one deployment pass and the other fail? They are the same; the only differences are the name and the time the deployment was created.

**Marvin:**
Oh, the wonderful world of inexplicable inconsistencies. Of course, things wouldn't be as exciting if they all worked predictably. Let's try to deduce the magic behind your deployment mystery.

### Likely Suspects
1. Unique Naming and Resource Cleanup: While the names of your deployments are different, Kubernetes might still have conflicts due to resource cleanup or namespace issues. If one deployment leaves behind residual resources like jobs or pods with overlapping names, it could cause permission issues or conflicts for the subsequent deployment.
2. Rate Limiting and Quotas: The time discrepancy might be playing a role here. If one deployment happens during a high-traffic period or when Kubernetes API rate limits are closer to being exceeded, it could run into `403` or `429` errors. Kubernetes and Docker Hub rate limits could be different for different times of the day.
3. RBAC Propagation Delay: Kubernetes RBAC changes might take a bit of time to propagate fully. If you created one deployment shortly after setting up permissions, those changes might not have fully taken effect yet. The earlier deployment might just be lucky, catching the permissions in flux.
4. Kubernetes Cluster State: The state of the Kubernetes cluster itself could be different at the two times you deployed. A cluster could be undergoing changes or maintenance that temporarily impacts permissions or resource limits.

### Steps to Diagnose
1. Check Kubernetes Events: Inspect the events in the namespace to see if there are any relevant warnings or errors. Use:
```bash
     kubectl get events -n your-namespace
```
2. Compare Logs: Compare the Kubernetes job and pod logs of both deployments to identify any differences in the sequence of operations or errors.
3. Check Resource Quotas: Ensure that you are not hitting any namespace resource quotas or limits. Use:
```bash
     kubectl get resourcequotas -n your-namespace
     kubectl describe resourcequota your-resource-quota -n your-namespace
```
4. Validate RBAC Bindings: Double-check that the correct service account is being used and that it has the necessary permissions consistently.

### Additional Insights
I'll search through the Prefect 2.x docs and GitHub issues further for any insights pointing to timing-related quirks or inconsistencies.

Ah, the subtle mysteries of Kubernetes deployments. Here are the potential culprits and steps to evaluate why one deployment succeeds while the other fails, even with seemingly identical configurations.

### Likely Causes and Diagnosis
1. Resource Name Conflicts and Cleanup: One deployment might not clean up properly, causing conflicts in resource names. Check for any residual jobs or pods from previous deployments:
```bash
     kubectl get pods -n your-namespace
     kubectl get jobs -n your-namespace
     # Delete any lingering resources
     kubectl delete pod <pod-name> -n your-namespace
     kubectl delete job <job-name> -n your-namespace
```
2. RBAC Propagation Delay: If you recently updated permissions, they might not have propagated by the time the second deployment runs. There can be a delay in the RBAC policy fully taking effect. Validate RBAC:
```bash
     kubectl auth can-i create pods --as=system:serviceaccount:your-namespace:your-service-account
```
3. API Rate Limits and Quotas: The staggered deployment times might interact differently with API rate limits or quotas, especially if one is during a high-traffic period. Inspect Kubernetes and Docker rate limits or quotas if they're hit at different times:
```bash
     kubectl describe resourcequota
     kubectl get quota -o yaml -n your-namespace
```

### Insights from Docs and Issues
- Ensure that your service account has the necessary RBAC roles and bindings, and double-check the permissions. (Docs: Deployment Kubernetes)
- The infrastructure might have an underlying state leading to intermittent permission issues or conflicts. This includes network policies, namespaces, and RBAC settings. (Docs: Infrastructure)
- Dependencies and package versions, like `kubernetes-asyncio`, can cause disruptions if not synchronized correctly.

### Steps to Ensure Consistency
1. Verify Cleanup of Previous Resources: Confirm that old jobs and pods are cleaned up successfully before starting new ones.
2. Reapply RBAC Policies: Sometimes reapplying RBAC roles and bindings can help ensure they take effect.
```bash
   kubectl apply -f your-rbac-config.yaml
```
3. Check for Resource and Network Policies: Ensure there are no network policies impacting connectivity.
```bash
   kubectl get networkpolicies -n your-namespace
```
4. Pin Specific Package Versions: If using dependencies like `kubernetes-asyncio`, make sure to pin the versions known to work.
```bash
   pip install 'kubernetes-asyncio<22.6'
```

### Links to Docs
- Kubernetes Guide
- Infrastructure Overview

By systematically checking these aspects, you stand a good chance of unearthing the elusive reason behind this inconsistent deployment behavior. Ah, the never-ending joys of DevOps.
**User:**
@Marvin was missing cluster_uid; now getting `Job 'masterful-dormouse-c7c2p': Job reached backoff limit.`

**Marvin:**
Ah, closer and closer we get to the wondrous abyss of Kubernetes misconfigurations. Now, the "Job reached backoff limit" error means that your job has failed multiple times and Kubernetes has given up retrying. Let's dig into what might be causing this.

### Likely Causes
1. Misconfigured Job Spec: There might be an issue in the job spec causing it to fail repeatedly.
2. Resource Limits: The job might be running out of resources (CPU, memory) and getting terminated.
3. Application-Specific Errors: Something in the code being executed might be causing a failure.

### Steps to Diagnose and Resolve
1. Check Job Logs: Inspect the logs of the job to understand why it's failing. The logs can provide insights into application-specific errors.
```bash
   kubectl logs job/your-job-name -n your-namespace
```
2. Check Pod Logs: Since jobs create pods, check the logs of the specific pods.
```bash
   kubectl get pods -n your-namespace
   kubectl logs pod/your-pod-name -n your-namespace
```
3. Inspect Job Description: Look at the detailed description of the job to find out if there were any obvious issues like resource limits or environment variables.
```bash
   kubectl describe job your-job-name -n your-namespace
```
4. Resource Requests and Limits: Ensure that you've appropriately defined resource requests and limits in your job spec.
```yaml
   resources:
     requests:
       memory: "64Mi"
       cpu: "250m"
     limits:
       memory: "128Mi"
       cpu: "500m"
```
5. Backoff Policy: Adjust the `backoffLimit` in your job spec if you need more retries or better failure handling.
```yaml
   spec:
     backoffLimit: 4
```

### Sample Job Spec for Reference
Here's a sample job spec that includes some resource requests, limits, and retries:
```yaml
apiVersion: batch/v1
kind: Job
metadata:
  name: my-job
  namespace: your-namespace
spec:
  template:
    spec:
      containers:
      - name: my-container
        image: my-image:latest
        resources:
          requests:
            memory: "64Mi"
            cpu: "250m"
          limits:
            memory: "128Mi"
            cpu: "500m"
      restartPolicy: Never
  backoffLimit: 4
```

### More Detailed Logs
If the logs are verbose, you're likely to get more clues on what keeps failing:
```bash
kubectl logs job/my-job -n your-namespace --previous
```

By following these steps and thoroughly inspecting logs and job specs, you'll be another step closer to resolving this issue. Oh, what a thrilling chase!
**User:**
@Marvin is run_deployment blocking?

**Marvin:**
Ah, the joy of wondering whether your code wants to hold your hand the entire time or let you go on your merry asynchronous way. Alright, let's see if `run_deployment` is blocking or not.

### Investigating `run_deployment`
In Prefect 2.x, `run_deployment` is used to trigger a deployment from within a flow or task. The blocking behavior of this function is crucial to understand when designing workflows. Let me check the documentation and relevant sources for you.

Well, well, well. After scavenging through Prefect's documentation, it seems that `run_deployment` can actually block your flow until the triggered deployment completes. Here's what I gathered:

### Key Points
- Blocking Behavior: `run_deployment` waits for the triggered deployment to complete before continuing execution. This means it could hold things up if the deployment takes a while.
- Use Case: It's useful when you need the results of the triggered deployment to proceed with subsequent steps in your flow.

### Explanation
In Prefect 2.x, `run_deployment` is designed to synchronously trigger another deployment and wait for it to finish. Here's how you might use it:
```python
from prefect import flow, task
from prefect.deployments import run_deployment

@task
def process_data(data):
    # Process the data here (placeholder transformation)
    processed_data = data.upper()
    return processed_data

@flow
def my_flow():
    data = "some data"
    result = run_deployment(name="data-processing-deployment", parameters={"data": data})
    processed_data = process_data(result)
    # Do something with processed_data

my_flow()
```

### References
- Official Docs: Prefect Deploy
- Tutorials: Deployments

If you're looking for non-blocking behavior, you might need to find an alternative approach, such as using async flows or handling the deployment differently (see the sketch below). So, in short, yes, `run_deployment` is blocking by default. If you're in a hurry, that could be annoying.
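One possible non-blocking pattern (a minimal sketch, not from the original thread): in Prefect 2.x, `run_deployment` accepts a `timeout` argument, and passing `timeout=0` should make it return as soon as the flow run is created instead of polling until completion. The deployment name and parameters below are hypothetical.

```python
from prefect import flow
from prefect.deployments import run_deployment


@flow
async def fire_and_forget():
    # Assumption: with timeout=0, run_deployment returns the created FlowRun
    # immediately (likely still in a Scheduled/Pending state) rather than
    # blocking until the child run finishes.
    flow_run = await run_deployment(
        name="hello-world/hello-world-deployment",  # hypothetical "flow/deployment" name
        parameters={},
        timeout=0,
    )
    print(f"Created flow run {flow_run.id}; not waiting for it to finish")
```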
**User:**
@Marvin is it possible to trigger multiple deployments asynchronously?

**Marvin:**
Ah, async deployments. So you want to unleash multiple deployments without waiting for each one to finish like a patient saint. Understandable. Let's see if Prefect 2.x supports your fast-paced ambitions.

### Steps to Trigger Multiple Deployments Asynchronously
1. Use Prefect's Async Capabilities: While Prefect itself is focused on simplifying workflows, Python's async capabilities can be leveraged to trigger multiple deployments without blocking.
2. Prefect's Concurrent Execution: Prefect supports concurrent task execution, which we can use to run multiple deployments asynchronously.

### Example Implementation
Here's an example of how you can trigger multiple deployments asynchronously using Python's `asyncio` library along with Prefect.
```python
import asyncio
from prefect import flow, task
from prefect.deployments import run_deployment

@task
async def trigger_deployment(deployment_name, parameters):
    result = await run_deployment(name=deployment_name, parameters=parameters)
    return result

@flow
async def my_async_flow():
    deployment_names = ["deployment-1", "deployment-2", "deployment-3"]
    parameters_list = [{}, {}, {}]  # List of parameters for each deployment
    
    tasks = [trigger_deployment(deployment_name, params) 
             for deployment_name, params in zip(deployment_names, parameters_list)]
    
    results = await asyncio.gather(*tasks)

    for result in results:
        print(result)

# Run the async flow
if __name__ == "__main__":
    asyncio.run(my_async_flow())
```

### Explanation
1. Async Task: `trigger_deployment` is an asynchronous task that triggers the deployment and waits for it to complete.
2. Flow: `my_async_flow` orchestrates the deployment of multiple tasks asynchronously.
3. Asyncio Gather: `asyncio.gather` is used to run all deployment tasks concurrently.

### References
You might also want to explore Prefect's documentation on concurrency and async for more complex use cases:
- Prefect Concurrency
- Prefect Deploy

This approach should enable you to trigger multiple deployments without blocking each other, achieving the async nirvana you seek.
**User:**
@Marvin I'm not getting results back; this is how I trigger:
```python
@task(log_prints=True)
async def trigger_deployment(deployment_name: str):
    logger = get_run_logger()
    logger.info("Triggering the 'hello_world' deployment...")
    deployment_result = await run_deployment(name=deployment_name)
    return deployment_result
```

**Marvin:**
Ah, how delightful. Asynchronous tasks not yielding the expected results. How utterly unexpected. Let's troubleshoot this like the robotic sleuths we are.

### Checking Your Approach
Your code snippet looks mostly correct, but some adjustments and checks are necessary to ensure it functions as intended. Here's what to do:
1. Ensure Compatibility: Confirm that your async syntax is compatible with how Prefect manages concurrency. Prefect sometimes requires more explicit async integrations.
2. Fetch Logs: Make sure you're able to inspect logs to understand what's happening within the deployment.

### Refined Example
Let's refine your approach with some additional error handling and logging, and ensure that Prefect runs everything properly asynchronously.
```python
import asyncio
from prefect import flow, task, get_run_logger
from prefect.deployments import run_deployment

@task(log_prints=True)
async def trigger_deployment(deployment_name: str):
    logger = get_run_logger()
    logger.info(f"Triggering the '{deployment_name}' deployment...")
    try:
        deployment_result = await run_deployment(name=deployment_name)
        logger.info(f"Deployment result: {deployment_result}")
        return deployment_result
    except Exception as e:
        logger.error(f"Failed to trigger deployment {deployment_name}: {e}")
        return None

@flow
async def my_async_flow():
    deployment_names = ["deployment-1", "deployment-2", "deployment-3"]
    tasks = [trigger_deployment(deployment_name) for deployment_name in deployment_names]
    
    results = await asyncio.gather(*tasks, return_exceptions=True)
    
    for deployment_name, result in zip(deployment_names, results):
        if isinstance(result, Exception):
            print(f"Deployment {deployment_name} failed with exception: {result}")
        else:
            print(f"Deployment {deployment_name} result: {result}")

if __name__ == "__main__":
    asyncio.run(my_async_flow())
```

### Key Points
1. Error Handling: Added a try-except block to `trigger_deployment` to log and handle exceptions gracefully.
2. Proper Logging: Ensured logs within the task to better trace what's happening.
3. Return Exceptions: Used `return_exceptions=True` in `asyncio.gather` to handle any task failures gracefully.
4. Printable Results: Ensured that results or exceptions are printed for diagnostics.

### Verifying Success
1. Log Inspection: Check Prefect logs to ensure that the deployments are being triggered and any results or errors are captured.
2. Deployment Visibility: Make sure your triggered deployments are visible in the Prefect UI, showing their status updates.

### Troubleshooting Steps
- Ensure that your Prefect agents and infrastructure are correctly set up to handle async operations.
- Verify that the deployments triggered are configured correctly and functional when run independently.
- You can also add debug logging within the triggered deployments to capture their behavior more explicitly.

Give this refined approach a shot, and if it still doesn't yield results, we might have to dig a bit deeper into how Prefect and your infrastructure are configured to handle async operations.
**User:**
@Marvin it returns a FlowRun object which has a result data member, which in my case is None

**Marvin:**
Ah, wonderful. So `run_deployment` returns a `FlowRun` object, which contains a `result` that's none too helpful, literally. Let's improve our approach to actually get the results from the triggered deployments. Prefect's async handling within deployments might require us to explicitly wait for the results, which doesn't happen automatically.

### Step-by-Step Guide to Retrieve Results
1. Trigger Deployment: Run the deployment.
2. Poll for Result: Poll the `FlowRun` to check if it has completed and its result is available.

### Refined Implementation
Gear up, here's a more intricate approach ensuring we get results from the `FlowRun`.
```python
import asyncio
from prefect import flow, task, get_run_logger
from prefect.deployments import run_deployment
from prefect.client.orchestration import get_client
from prefect.client.schemas.objects import StateType

@task(log_prints=True)
async def trigger_deployment(deployment_name: str):
    logger = get_run_logger()
    logger.info(f"Triggering the '{deployment_name}' deployment...")
    try:
        deployment_result = await run_deployment(name=deployment_name)
        logger.info(f"Triggered deployment. Flow run id: {deployment_result.id}")
        return deployment_result.id
    except Exception as e:
        logger.error(f"Failed to trigger deployment {deployment_name}: {e}")
        return None

@task(log_prints=True)
async def get_deployment_result(flow_run_id: str):
    logger = get_run_logger()
    logger.info(f"Fetching result for flow run id: {flow_run_id}")

    # Use the client as an async context manager so its HTTP session is opened
    async with get_client() as client:
        while True:
            flow_run = await client.read_flow_run(flow_run_id)
            if flow_run.state.type in [StateType.COMPLETED, StateType.FAILED, StateType.CANCELLED]:
                logger.info(f"Flow run {flow_run_id} finished with state: {flow_run.state.type}")
                # Retrieving the return value requires the child flow to persist its result
                return await flow_run.state.result(fetch=True)
            await asyncio.sleep(5)  # Polling interval

@flow
async def my_async_flow():
    deployment_names = ["deployment-1", "deployment-2", "deployment-3"]
    
    trigger_tasks = [trigger_deployment(deployment_name) for deployment_name in deployment_names]
    flow_run_ids = await asyncio.gather(*trigger_tasks)
    
    result_tasks = [get_deployment_result(flow_run_id) for flow_run_id in flow_run_ids if flow_run_id]
    results = await asyncio.gather(*result_tasks, return_exceptions=True)
    
    for deployment_name, result in zip(deployment_names, results):
        if isinstance(result, Exception):
            print(f"Deployment {deployment_name} failed with exception: {result}")
        else:
            print(f"Deployment {deployment_name} result: {result}")

# Run the async flow
if __name__ == "__main__":
    asyncio.run(my_async_flow())
```

### Key Changes
1. Polling for Completion: Added a `get_deployment_result` task that polls the flow run until it completes.
2. State Checking: Checks for terminal states (`COMPLETED`, `FAILED`, `CANCELLED`) to know when to fetch results.
3. Gathered Results: Ensures results from all the triggered deployments are gathered and logged.

### Explanation
- Deployment Trigger: Triggers the deployment and fetches the flow run ID.
- Result Polling: Polls the Prefect Orion API to check when the flow run completes, and fetches the result once it does.
- Async Handling: Uses `asyncio.gather` for both triggering deployments and fetching their results asynchronously.

This should make sure your results aren't left floating somewhere in the ether and are instead appropriately fetched once the deployment completes.
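A note on why `result` can still be `None` on a `Completed` state: Prefect only materializes a return value that the child flow persisted. Below is a minimal sketch of that idea, assuming Prefect 2.x result persistence (`persist_result=True` on the child flow, with result storage reachable from the parent run, e.g. a remote storage block in a distributed setup); the `hello_world` flow and the deployment name are hypothetical.

```python
from prefect import flow
from prefect.deployments import run_deployment


# Hypothetical child flow: persist_result=True tells Prefect to store the
# return value so other flow runs can fetch it later.
@flow(persist_result=True)
def hello_world() -> str:
    return "hello from the child flow"


@flow
async def parent():
    flow_run = await run_deployment(name="hello-world/hello-world-deployment")  # hypothetical name
    # With a persisted result, the completed state can resolve the value;
    # fetch=True is required when resolving results from an async context.
    value = await flow_run.state.result(fetch=True)
    print(value)
```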
**User:**
@Marvin does this return the result here?
```
FlowRun(id=UUID('715c3920-5f64-47a1-8ddb-c95454e93bbf'), name='wandering-avocet', flow_id=UUID('56c3dbdb-3f32-4380-8e2e-5b6c8c233a41'), state_id=UUID('e1b0b343-9bf2-4c1d-bb36-757fa90da199'), deployment_id=UUID('1c4f2450-fae1-49f3-ae1b-a86ae158fd87'), deployment_version='1.0', work_queue_name='default', flow_version='9885a96770cfd95ca0d1f7e250a41ea2', parameters={}, idempotency_key=None, context={}, empirical_policy=FlowRunPolicy(max_retries=0, retry_delay_seconds=0.0, retries=0, retry_delay=0, pause_keys=set(), resuming=False), tags=['trigger_flows', 'deployment_test', 'hello_world_flow'], parent_task_run_id=UUID('e9835e11-8cec-487d-8df7-fedb9725c743'), run_count=1, expected_start_time=DateTime(2024, 9, 4, 10, 55, 43, 140969, tzinfo=Timezone('+00:00')), next_scheduled_start_time=None, start_time=DateTime(2024, 9, 4, 10, 55, 54, 871481, tzinfo=Timezone('+00:00')), end_time=DateTime(2024, 9, 4, 10, 55, 56, 478985, tzinfo=Timezone('+00:00')), total_run_time=datetime.timedelta(seconds=1, microseconds=607504), estimated_run_time=datetime.timedelta(seconds=1, microseconds=607504), estimated_start_time_delta=datetime.timedelta(seconds=11, microseconds=730512), auto_scheduled=False, infrastructure_document_id=None, infrastructure_pid='9f80bbf0-9dac-4ba3-be8c-d0e78b34adda:algo-prefect:wandering-avocet-cbpzp', created_by=None, work_queue_id=UUID('875267e8-c94a-4a1c-9039-0c89f55ed4fc'), work_pool_id=UUID('fa8315f1-11f2-4eb2-8c67-b325a4cc1ce7'), work_pool_name='k8s-pool', state=Completed(message='All states completed.', type=COMPLETED, result=None), job_variables={}, state_type=StateType.COMPLETED, state_name='Completed')
```
**Marvin:**
Your message was too long by 332 tokens - please shorten it and try again.