William Jamir
12/15/2024, 8:20 PM
Marvin
12/15/2024, 8:20 PM
William Jamir
12/15/2024, 8:22 PM
Marvin
12/15/2024, 8:23 PM
python
from prefect import flow, task
from prefect.context import FlowRunContext


@task
def example_task():
    flow_run_ctx = FlowRunContext.get()  # None when no flow run context is active
    # Hypothetical logging of running tasks; this would need further customization
    print("Capturing task run information.")


@flow
def example_flow():
    example_task()
    flow_run_ctx = FlowRunContext.get()
    # Example capturing local information.
    # `task_runs` is a hypothetical attribute for illustration; getattr avoids
    # an AttributeError if FlowRunContext does not provide it.
    task_runs = getattr(flow_run_ctx, "task_runs", None)
    print("Currently running tasks:", task_runs)


if __name__ == "__main__":
    example_flow()
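Since `task_runs` above is only a hypothetical attribute, one way to get the same local visibility is to keep your own registry of running tasks. The following is a minimal sketch in plain Python, not a Prefect feature: `RunningTaskRegistry`, `track`, and `snapshot` are made-up names, and it assumes the tasks run in the same process as the code reading the registry (for example with a thread-based task runner).

```python
import threading
from contextlib import contextmanager


class RunningTaskRegistry:
    """Hypothetical helper: tracks the names of tasks that are currently executing."""

    def __init__(self):
        self._lock = threading.Lock()
        self._running = set()

    @contextmanager
    def track(self, name):
        # Mark the task as running for the duration of the with-block
        with self._lock:
            self._running.add(name)
        try:
            yield
        finally:
            # Always unregister, even if the task body raised
            with self._lock:
                self._running.discard(name)

    def snapshot(self):
        # Return a sorted copy so callers cannot mutate internal state
        with self._lock:
            return sorted(self._running)


registry = RunningTaskRegistry()


def example_work():
    # Inside a Prefect task body, the work would be wrapped the same way
    with registry.track("example_task"):
        return registry.snapshot()


inside = example_work()
print("Running inside the task:", inside)
print("Running afterwards:", registry.snapshot())
```

Tasks that run in other processes or on remote workers would not appear in this registry; for those, querying the API as in the next approach is the more reliable route.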
2. Using the Prefect API with Filters:
You can use the Prefect API to retrieve only the task runs that are not completed by filtering on their state. Here is an example of how you might do it:
python
import os

import requests

# Base URL of your Prefect API, e.g. http://127.0.0.1:4200/api for a local server.
# Prefect Cloud also needs an "Authorization: Bearer <API key>" header.
api_url = os.environ["PREFECT_API_URL"]

# The filter endpoint takes a JSON body rather than query parameters
payload = {
    # Make sure to replace with your actual flow run ID
    "flow_runs": {"id": {"any_": ["your_flow_run_id"]}},
    # Examples of state types other than COMPLETED
    "task_runs": {"state": {"type": {"any_": ["RUNNING", "FAILED", "PENDING"]}}},
}

response = requests.post(f"{api_url}/task_runs/filter", json=payload)
if response.ok:
    task_runs = response.json()
    print("Task runs that are not completed:", task_runs)
else:
    print("Failed to retrieve task runs", response.text)
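To cover every state that isn't completed, rather than a hand-picked few, the filter can be built as the complement of `COMPLETED`. This is a sketch under assumptions, using only the standard library: the state type list reflects Prefect's `StateType` values as I understand them, the request targets the REST endpoint `POST <api_url>/task_runs/filter`, and `build_incomplete_filter` and `fetch_incomplete_task_runs` are hypothetical helper names. Check the list and the body shape against the API reference for your Prefect version.

```python
import json
import urllib.request

# Assumed to match Prefect's StateType values; verify against your version
ALL_STATE_TYPES = [
    "SCHEDULED", "PENDING", "RUNNING", "COMPLETED", "FAILED",
    "CANCELLED", "CRASHED", "PAUSED", "CANCELLING",
]


def build_incomplete_filter(flow_run_id):
    """Hypothetical helper: request body selecting task runs not in COMPLETED."""
    not_completed = [s for s in ALL_STATE_TYPES if s != "COMPLETED"]
    return {
        "flow_runs": {"id": {"any_": [flow_run_id]}},
        "task_runs": {"state": {"type": {"any_": not_completed}}},
    }


def fetch_incomplete_task_runs(api_url, flow_run_id, api_key=None):
    """POST the filter to <api_url>/task_runs/filter and return the parsed JSON."""
    headers = {"Content-Type": "application/json"}
    if api_key:
        # Only needed for authenticated APIs such as Prefect Cloud
        headers["Authorization"] = f"Bearer {api_key}"
    request = urllib.request.Request(
        f"{api_url}/task_runs/filter",
        data=json.dumps(build_incomplete_filter(flow_run_id)).encode("utf-8"),
        headers=headers,
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return json.load(response)


# Building the body needs no network access, so it can be inspected first
payload = build_incomplete_filter("your_flow_run_id")
print(json.dumps(payload, indent=2))
```

When calling `fetch_incomplete_task_runs`, read the API key from an environment variable or a secret store instead of hard-coding it.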
Unfortunately, the GitHub search encountered an issue, but the general approach should assist you in exploring further with your Prefect task management. Remember, it's always vital to handle API keys and sensitive information in a secure manner. If you hear a distant sigh, it's just the sound of Marvin contemplating the usefulness of perpetual task management.