Nimesh Kumar
07/31/2024, 11:13 AM
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/prefect/agent.py", line 424, in _submit_run_and_capture_errors
    result = await infrastructure.run(task_status=task_status)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/infrastructure/process.py", line 90, in run
    raise ValueError("Process cannot be run with empty command.")
ValueError: Process cannot be run with empty command.
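[Editor's note: Prefect 2.x raises this when the agent picks up a deployment whose Process infrastructure has no command to execute, which typically means the deployment was registered without an infrastructure document attached. A minimal sketch, not from this thread, of saving a Process block with an explicit command; the block name "algo-process" is hypothetical:

from prefect.infrastructure import Process

# Save a Process infrastructure block with a non-empty command so the agent
# has something to execute; ["python", "-m", "prefect.engine"] is the default
# runner command for Prefect 2.x deployments.
process = Process(command=["python", "-m", "prefect.engine"])
process.save("algo-process", overwrite=True)
]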
Nathan Low
07/31/2024, 9:25 PM
Nimesh Kumar
08/01/2024, 3:54 AM
import warnings
warnings.filterwarnings('ignore')

from dotenv import load_dotenv
load_dotenv()

from prefect import flow, task
from prefect.futures import PrefectFuture
from prefect.blocks.notifications import SlackWebhook
from prefect.task_runners import SequentialTaskRunner
from prefect_alert import alert_on_failure
from zipfile import ZipFile
import pydicom as dicom
from pathlib import Path
from uuid import uuid1
import os, json, base64, copy
import requests
import shutil
import time
import uuid
import s3fs
import boto3
import botocore
from botocore.exceptions import ClientError
from botocore.client import Config
from test_algo_helper import *
from save_logs import *
from utils import *
state = {}
SEND_IP = os.environ.get("SEND_LOG_IP")         # e.g. "platform-django:5000"
TEMP_PATH = os.environ.get("TEMP_FOLDER_PATH")  # e.g. "/tmp"
HGW_URL = os.environ.get("SEND_HGW_URL")        # e.g. "http://hgw-in-api:8080/response"
TMP_FOLDER_PATH = TEMP_PATH
def print_logs(template, *args):
    formatted_string = template % args
    logger.debug(formatted_string)
    request_logger.debug(formatted_string)

def on_failure_callback():
    print("In Failure Callback *****************************************************")

def on_success_callback():
    print("In Success Callback *****************************************************")

def call_on_failure_if_failed(task_future: PrefectFuture):
    task_future.wait()  # block until completion
    if task_future.get_state().is_failed():
        on_failure_callback()
    else:
        print(f"\n Task {task_future.task_run.name} in flow run {task_future.task_run.flow_run_id} is SUCCESS \n")
    return
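# Editor's note (a sketch, not part of the original script): Prefect 2.10+
# supports state hooks on the decorator itself, which could replace the manual
# call_on_failure_if_failed() polling above. Assuming that version:
#
#     def notify_failure(task, task_run, state):
#         print(f"Task {task_run.name} failed in flow run {task_run.flow_run_id}")
#
#     @task(on_failure=[notify_failure])
#     def my_task():
#         ...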
@task(retries=1, retry_delay_seconds=30)
def generate_jobID(algo_id, jid):
    global state
    logger = setup_logger(jid)
    try:
        state["job_id"] = jid
        state["algo_id"] = algo_id
        state["algo-name"] = "test-algo"
        print("In Get Job Id Task -> ", state)
        send_log(job_id=state["job_id"], header="AI Gateway", subheader="File received at AI gateway", status="success",
                 level=5, direction=False, send_ip=SEND_IP, msg_id=5)
        print_logs("Task 1 Generate Job_id ===> %s ", state)
        return True
    except Exception as e:
        print("Generate Jobid Exception\n\n", e, "\n\nGenerate Jobid Exception")
        print_logs("Task 1 Generate Job_id Exception ===> %s ", e)
        send_log(job_id=state["job_id"], header="AI Gateway", subheader="Failed at AI Gateway", status="success",
                 level=5, direction=False, send_ip=SEND_IP, msg_id=5)
        return False
@task(retries=1, retry_delay_seconds=30)
def convert_output(prev_task):
    global state
    logger = setup_logger(state["job_id"])
    try:
        print("inside convert_output")
        output, output_flag = post_process_3001(TEMP_PATH, state['job_id'])
        if output_flag:
            print_logs("convert_output Block post_process_3001 response ===> %s ", output)
            state['algo_output_path'] = output
            send_log(job_id=state["job_id"], header="AI Gateway", subheader="Inference completed at AI gateway",
                     status="success", level=5, direction=True, send_ip=SEND_IP, msg_id=9)
            return True
        else:
            send_log(job_id=state["job_id"], header="AI Gateway", subheader="Failed to Convert Outputs",
                     status="success", level=5, direction=True, send_ip=SEND_IP, msg_id=9)
            return False
    except Exception as e:
        print_logs("convert_output Block post_process_3001 Exception Block ===> %s ", e)
        send_log(job_id=state["job_id"], header="AI Gateway", subheader="Failed To Load the Result",
                 status="success", level=5, direction=True, send_ip=SEND_IP, msg_id=9)
        return False
@task(retries=1, retry_delay_seconds=30)
def zip_file(prev_task):
    global state
    logger = setup_logger(state["job_id"])
    try:
        status = create_random_sc(state)
        status, final_path = zip_directory(state['algo_output_path'], TEMP_PATH, state['job_id'])
        print("*********************************\n\n\n", status, final_path, "\n\n\n************************************************")
        print_logs("In Task 13 Zip File Block flag status ===> %s ", status)
        print_logs("In Task 13 Zip File Block final_path status ===> %s ", final_path)
        if status != "failed":
            state["final_zip_path"] = final_path
            return True
        return False  # fall-through previously returned None on failure
    except Exception as e:
        import traceback
        print(e)
        print_logs("In Task 13 Zip File Exception Block ===> %s ", e)
        print(traceback.format_exc())
        return False
@task(retries=1, retry_delay_seconds=30)
def get_file(prev_task):
    global state
    logger = setup_logger(state["job_id"])
    try:
        print("Getting File ................ ")
        status = unzip_file(TEMP_PATH + state["job_id"] + ".json", state['job_id'], TEMP_PATH)
        if status != "failed":
            state["unzip_path"] = status
            send_log(job_id=state["job_id"], header="AI Gateway", subheader="Started Inferencing at AI gateway",
                     status="success", level=5, direction=False, send_ip=SEND_IP, msg_id=6)
            print_logs("Task 2 Get File ===> %s ", state)
            return True
        print("In Get File Task -> ", state)
        send_log(job_id=state["job_id"], header="AI Gateway", subheader="Failed to get Inferencing Data",
                 status="success", level=5, direction=False, send_ip=SEND_IP, msg_id=6)
        print_logs("Task 2 Get File Failed block ===> %s ", state)
        return False
    except Exception as e:
        print_logs("Task 2 Get File Exception block ===> %s ", e)
        send_log(job_id=state["job_id"], header="AI Gateway", subheader="Failed to get Inferencing Data",
                 status="success", level=5, direction=False, send_ip=SEND_IP, msg_id=6)
        return False
@flow(name="algo:3003")
def start_inferencing_3003(my_param, file_path):
    job_id = my_param
    algo_id = "3003"
    global request_logger
    global logger
    logger = setup_logger(job_id)
    request_logger = setup_request_logger("3003", job_id)
    print(">>>>>>>>>>>>>>>>>>>>>>>>>>>")
    print(algo_id, job_id, file_path)
    print(">>>>>>>>>>>>>>>>>>>>>>>>>>>")
    res_1 = generate_jobID.submit(algo_id, job_id)
    call_on_failure_if_failed(res_1)
    res_2 = get_file.submit(file_path)
    call_on_failure_if_failed(res_2)
    time.sleep(25)
    res_10 = convert_output.submit(prev_task=res_2)
    call_on_failure_if_failed(res_10)
    res_13 = zip_file.submit(prev_task=res_10)
    call_on_failure_if_failed(res_13)
    res_14 = send_to_HGW.submit(state, prev_task=res_13)
    call_on_failure_if_failed(res_14)

if __name__ == "__main__":
    # NOTE: `dict_` is not defined in this module, and the flow signature is
    # (my_param, file_path), so this call would fail as written.
    start_inferencing_3003(parameters=dict_)
This is the flow I am using.
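[Editor's note: the prev_task arguments above exist only to force ordering between tasks. Prefect 2.x can express the same dependency with wait_for on submit, so the tasks would not need a dummy parameter. A minimal sketch, assuming the task definitions above drop prev_task:

res_2 = get_file.submit(file_path)
res_10 = convert_output.submit(wait_for=[res_2])  # ordering without passing data
res_13 = zip_file.submit(wait_for=[res_10])
res_14 = send_to_HGW.submit(state, wait_for=[res_13])
]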
Nimesh Kumar
08/01/2024, 6:01 AM
Nathan Low
08/01/2024, 1:30 PM
Nimesh Kumar
08/01/2024, 1:31 PM
Nimesh Kumar
08/01/2024, 1:32 PM
def create_deployment(algo_id, project_id, storage_id, infra_id, flow_run_id, flow_run_id_2, dicom_push_flag):
    endpoint = "deployments/"
    preprocess_deployment_id = ""
    postprocess_deployment_id = ""
    headers = {
        'Content-Type': 'application/json',
        'Authorization': AUTH_HEADER
    }
    payload = json.dumps({
        "name": f"algo.{algo_id}.{project_id}",
        "flow_id": flow_run_id,
        "is_schedule_active": True,
        "parameters": {},
        "tags": [],
        "work_queue_name": f"algo-{algo_id}-{project_id}",
        "storage_document_id": storage_id,
        # NOTE: infra_id is accepted by this function but never used; sending
        # None here leaves the deployment without an infrastructure document,
        # a likely source of the empty-command Process error above.
        "infrastructure_document_id": None,
        "description": f"This is the deployment for algo id {algo_id} and project id {project_id}",
        "parameter_openapi_schema": {},
        "path": "",
        "version": "1",
        "entrypoint": f"flows/algo-{algo_id}.py:start_inferencing_{algo_id}",
        "infra_overrides": {}
    })
    response = make_request("POST", endpoint, headers, payload)
    print(response.json()["id"])
    print(response.text)
    preprocess_deployment_id = response.json()["id"]
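[Editor's note: because the payload sends infrastructure_document_id: None, the resulting deployment has no Process block attached, which matches the empty-command error at the top of the thread. A minimal sketch, not the thread's code, of registering the same deployment through the Python client with infrastructure attached, assuming Prefect 2.x; the block and queue names are hypothetical:

from prefect.deployments import Deployment
from prefect.infrastructure import Process

# Build the deployment from the flow object and attach a saved Process block
# so the agent receives a non-empty command.
deployment = Deployment.build_from_flow(
    flow=start_inferencing_3003,
    name="algo.3003.project",                     # hypothetical
    work_queue_name="algo-3003-project",          # hypothetical
    infrastructure=Process.load("algo-process"),  # block saved in the earlier sketch
)
deployment_id = deployment.apply()
]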
Nimesh Kumar
08/01/2024, 1:33 PM
Nathan Low
08/01/2024, 1:44 PM
Nimesh Kumar
08/01/2024, 2:50 PM
Nathan Low
08/01/2024, 3:10 PM
Nimesh Kumar
08/01/2024, 3:11 PM
Nathan Low
08/01/2024, 3:28 PM