Hi everyone, can anyone help me with the below error? I am creating the deployment and then creating a flow run using the API request, but after creating the flow run it fails instantly. Below is the error I am getting:
Copy code
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/prefect/agent.py", line 424, in _submit_run_and_capture_errors
    result = await infrastructure.run(task_status=task_status)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/infrastructure/process.py", line 90, in run
    raise ValueError("Process cannot be run with empty command.")
ValueError: Process cannot be run with empty command.
n
Seeing some of the flow might help, never seen that error before
n
Copy code
import warnings
warnings.filterwarnings('ignore')
from dotenv import load_dotenv
load_dotenv()
from prefect import flow, task
from zipfile import ZipFile
import pydicom as dicom
from pathlib import Path
from uuid import uuid1
import os, json, base64, copy
import requests
import shutil
from test_algo_helper import *
import time
import s3fs
import boto3
import botocore
from botocore.exceptions import ClientError
from botocore.client import Config
import uuid
from prefect.futures import PrefectFuture
from prefect.blocks.notifications import SlackWebhook
from prefect_alert import alert_on_failure
from prefect.task_runners import SequentialTaskRunner
from save_logs import *
from utils import *

state = {}
SEND_IP = os.environ.get("SEND_LOG_IP")  # "platform-django:5000"
TEMP_PATH = os.environ.get("TEMP_FOLDER_PATH")  # "/tmp"
HGW_URL = os.environ.get("SEND_HGW_URL")  # "http://hgw-in-api:8080/response"
TMP_FOLDER_PATH = TEMP_PATH


def print_logs(template, *args):
    formatted_string = template % args
    logger.debug(formatted_string)
    request_logger.debug(formatted_string)

def on_failure_callback():
    print("In Failure Callback *****************************************************")

def on_success_callback():

    print("In Success Callback *****************************************************")


def call_on_failure_if_failed(task_future: PrefectFuture):
    task_future.wait()  # block until completion
    if task_future.get_state().is_failed():
        on_failure_callback()
    else:
        print(f"\n Task {task_future.task_run.name} in flow run {task_future.task_run.flow_run_id} is SUCCESS \n")
    return


@task(retries=1, retry_delay_seconds=30)
def generate_jobID(algo_id, jid):
    global state
    logger = setup_logger(jid)
    try:
        state["job_id"] = jid
        state["algo_id"] = algo_id
        state["algo-name"] = "test-algo"
        print("In Get Job Id Task -> ", state)
        send_log(job_id=state["job_id"], header="AI Gateway", subheader="File received to AI gateway", status="success",
                 level=5, direction=False, send_ip=SEND_IP, msg_id=5)
        print_logs("Task 1 Generate Job_id ===> %s ", state)
        return True
    except Exception as e:
        print("Generate Jobid Exception\n\n", e, "\n\nGenerate Jobid Exception")
        print_logs("Task 1 Generate Job_id Exception ===> %s ", e)
        send_log(job_id=state["job_id"], header="AI Gateway", subheader="Failed at AI Gateway", status="success",
                 level=5, direction=False, send_ip=SEND_IP, msg_id=5)
        return False

@task(retries=1, retry_delay_seconds=30)
def convert_output(prev_task):
    global state
    logger = setup_logger(state["job_id"])
    try:
        print("inside convert_output")
        output, output_flag = post_process_3001(TEMP_PATH, state['job_id'])
        if output_flag:
            print_logs("convert_output Block post_process_3001 reponse   ===> %s ", output)
            state['algo_output_path'] = output
            send_log(job_id = state["job_id"], header = "AI Gateway", subheader = "Inference completed at AI gateway", status = "success", level = 5, direction = True,  send_ip = SEND_IP, msg_id = 9)
            return True
        else:
            send_log(job_id = state["job_id"], header = "AI Gateway", subheader = "Failed to Convert Outputs", status = "success", level = 5, direction = True,  send_ip = SEND_IP, msg_id = 9)
            return False
    except Exception as e:
        print_logs("convert_output Block post_process_3001 Exception Block   ===> %s ", e)
        send_log(job_id = state["job_id"], header = "AI Gateway", subheader = "Failed To load the Result", status = "success", level = 5, direction = True,  send_ip = SEND_IP, msg_id = 9)
        return False

@task(retries=1, retry_delay_seconds=30)
def zip_file(prev_task):
    global state
    logger = setup_logger(state["job_id"])
    try:
        status = create_random_sc(state)
        status, final_path = zip_directory(state['algo_output_path'], TEMP_PATH, state['job_id'])
        print("*********************************\n\n\n", status, final_path, "\n\n\n************************************************")
        print_logs("In Task 13 Zip File Block flag status ===> %s ", status)
        print_logs("In Task 13 Zip File Block final_path status ===> %s ", final_path)
        if status != "failed":
            state["final_zip_path"] = final_path
            return True
    except Exception as e:
        print(e)
        print_logs("In Task 13 Zip File Exception Block ===> %s ", e)
        import traceback
        print(traceback.format_exc())
        return False

@task(retries=1, retry_delay_seconds=30)
def get_file(prev_task):
    global state
    logger = setup_logger(state["job_id"])
    try:
        print("Getting File ................ ")
        status = unzip_file(TEMP_PATH + state["job_id"] + ".json", state['job_id'], TEMP_PATH)
        if status != "failed":
            state["unzip_path"] = status
            send_log(job_id=state["job_id"], header="AI Gateway", subheader="Started Inferencing at AI gateway",
                     status="success", level=5, direction=False, send_ip=SEND_IP, msg_id=6)
            print_logs("Task 2 Get FIle ===> %s ", state)
            return True
        print("In Get File Task -> ", state)
        send_log(job_id=state["job_id"], header="AI Gateway", subheader="Failed to get Inferecing Data",
                 status="success", level=5, direction=False, send_ip=SEND_IP, msg_id=6)
        print_logs("Task 2 Get FIle Failed block ===> %s ", state)
        return False
    except Exception as e:
        print_logs("Task 2 Get FIle Exception block ===> %s ", e)
        send_log(job_id=state["job_id"], header="AI Gateway", subheader="Failed to get Inferecing Data",
                 status="success", level=5, direction=False, send_ip=SEND_IP, msg_id=6)
        return False

@flow(name="algo:3003")
def start_inferencing_3003(my_param, file_path):
    job_id = my_param
    algo_id = "3003"
    global request_logger
    global logger
    logger = setup_logger(job_id)
    request_logger = setup_request_logger("3003",job_id)
    print(">>>>>>>>>>>>>>>>>>>>>>>>>>>")
    print(algo_id, job_id, file_path)
    print(">>>>>>>>>>>>>>>>>>>>>>>>>>>")
    res_1 = generate_jobID.submit(algo_id, job_id)
    call_on_failure_if_failed(res_1)
    res_2 = get_file.submit(file_path)
    call_on_failure_if_failed(res_2)
    time.sleep(25)
    res_10 = convert_output.submit(prev_task = res_2)
    call_on_failure_if_failed(res_10)
    res_13 = zip_file.submit(prev_task = res_10)
    call_on_failure_if_failed(res_13)
    res_14 = send_to_HGW.submit(state,prev_task = res_13)
    call_on_failure_if_failed(res_14)

if __name__ == "__main__":
    start_inferencing_3003(**dict_)  # dict_ is expected to supply my_param and file_path
This is the flow I am using.
Also, I am creating the deployment using the API, and since I am not using infrastructure I am sending infrastructure_document_id = None.
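For reference, the flow run creation call I am making looks roughly like this (a rough sketch - my real code goes through a request helper, and the URL here is a placeholder; the parameter names just mirror my flow signature):
Copy code
import json
import requests

PREFECT_API = "http://127.0.0.1:4200/api"  # placeholder server URL


def create_flow_run(deployment_id, job_id, file_path):
    # POST /deployments/{id}/create_flow_run creates a run of an existing deployment;
    # the parameters must match the flow signature (my_param, file_path)
    payload = {"parameters": {"my_param": job_id, "file_path": file_path}}
    response = requests.post(
        f"{PREFECT_API}/deployments/{deployment_id}/create_flow_run",
        headers={"Content-Type": "application/json"},
        data=json.dumps(payload),
    )
    response.raise_for_status()
    return response.json()["id"]  # flow run id that the agent then picks up from the queue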
n
Weird, the code looks decent enough at a glance, I guess you’re able to run it from the command line? Maybe the issue is with the deployment? Maybe it can’t find the code to run, how are you deploying it?
n
Let me share my deployment function with you.
Copy code
def create_deployment(algo_id, project_id, storage_id, infra_id, flow_run_id, flow_run_id_2, dicom_push_flag):
    endpoint = "deployments/"
    preprocess_deployment_id = ""
    postprocess_deployment_id = ""
    headers = {
        'Content-Type': 'application/json',
        'Authorization': AUTH_HEADER
    }
    payload = json.dumps({
        "name": f"algo.{algo_id}.{project_id}",
        "flow_id": flow_run_id,
        "is_schedule_active": True,
        "parameters": {},
        "tags": [],
        "work_queue_name": f"algo-{algo_id}-{project_id}",
        "storage_document_id": storage_id,
        "infrastructure_document_id": None,
        "description": f"This is the deployment for algo id {algo_id} and project id {project_id}",
        "parameter_openapi_schema": {},
        "path": "",
        "version": "1",
        "entrypoint": f"flows/algo-{algo_id}.py:start_inferencing_{algo_id}",
        "infra_overrides": {}
    })
    response = make_request("POST", endpoint, headers, payload)
    print(response.json()["id"])
    print(response.text)
    preprocess_deployment_id = response.json()["id"]
Since I am not using any infra, I am sending infrastructure_document_id as None. The deployment is getting created, and I cannot see any error in that.
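One thing I can do to sanity-check it is read the deployment back and print the fields that control how the agent launches the run (a rough sketch; the server URL is a placeholder and my real code uses make_request):
Copy code
import requests

PREFECT_API = "http://127.0.0.1:4200/api"  # placeholder server URL


def inspect_deployment(deployment_id):
    # GET /deployments/{id} returns what the server actually stored for the deployment
    response = requests.get(f"{PREFECT_API}/deployments/{deployment_id}")
    response.raise_for_status()
    deployment = response.json()
    for key in ("path", "entrypoint", "storage_document_id", "infrastructure_document_id"):
        print(key, "=", deployment.get(key))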
n
Is there a reason you are using JSON and the Prefect REST API over the Python SDK? The Python SDK is much easier. I have created deployments in the past where the deployment itself was created fine but the run couldn't find the code being deployed, so the flow fails. I'm guessing that's what is happening here; I don't really see where the code location is being referenced. You might need to make /flows an absolute path.
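Something like this, as a sketch - it assumes Prefect 2.x agent/work-queue style deployments, and the import path, queue name, and /opt/flows location are placeholders, not your real values:
Copy code
# sketch: build the same deployment with the Python SDK instead of raw JSON
from prefect.deployments import Deployment
from prefect.infrastructure import Process

from algo_3003 import start_inferencing_3003  # wherever the flow actually lives

deployment = Deployment.build_from_flow(
    flow=start_inferencing_3003,
    name="algo.3003.my-project",
    work_queue_name="algo-3003-my-project",
    path="/opt/flows",            # absolute path to the code the agent should run
    infrastructure=Process(),     # explicit local-process infrastructure instead of None
)
deployment_id = deployment.apply()  # registers the deployment and returns its id
print(deployment_id)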
n
If it is possible, can you please share how you are creating the deployment with the Python SDK? I am creating this deployment using the API because I needed a dynamic flow where I create the queue, deployment, and flow run as and when required. I got this idea of using the API from their docs, which are available at https://{url}:4200/docs. I will see if giving a path to the flows resolves this issue.
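For context, the work queue part is roughly this kind of call (a sketch - the payload is just the documented minimum and the URL is a placeholder; my real code wraps it in make_request):
Copy code
import json
import requests

PREFECT_API = "http://127.0.0.1:4200/api"  # placeholder server URL


def create_work_queue(algo_id, project_id):
    # POST /work_queues/ creates the queue that the deployment's agent will poll
    payload = {"name": f"algo-{algo_id}-{project_id}"}
    response = requests.post(
        f"{PREFECT_API}/work_queues/",
        headers={"Content-Type": "application/json"},
        data=json.dumps(payload),
    )
    response.raise_for_status()
    return response.json()["id"]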
n
With basic test deployments on local infra, in the new style, I'd use serve: https://docs.prefect.io/latest/tutorial/deployments/ - if you are using Docker and all that, you'd want to use deployments: https://docs.prefect.io/latest/concepts/deployments/ I'd probably always have a server running, so you can use the REST API to make calls that create a flow run from a deployment of a flow. Then there's less setup complexity. You'd need to decide where you are storing your code: in GitHub, local, or a shared drive. There's very little documentation for a local or shared drive, but apparently it's possible.
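For the serve style, the tutorial pattern is roughly this (a sketch - the flow body and deployment name are placeholders, and it assumes a recent 2.x release that has flow.serve):
Copy code
from prefect import flow


@flow(log_prints=True)
def start_inferencing_3003(my_param: str, file_path: str):
    print(my_param, file_path)


if __name__ == "__main__":
    # keeps a long-running process alive that picks up and executes runs of this deployment
    start_inferencing_3003.serve(name="algo-3003-serve")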
n
Ohh okay, thanks a lot Nathan for your help
n
Sure thing, if you want more dynamic setups, work pools should do what you want, there’s a bit more setup though
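Roughly the shape of it, as a sketch - the GitHub URL, pool name, and entrypoint are placeholders, and it assumes a newer 2.x release with flow.from_source / .deploy, a process work pool created beforehand (prefect work-pool create my-pool --type process), and a worker running against it (prefect worker start --pool my-pool):
Copy code
from prefect import flow

if __name__ == "__main__":
    # pull the code from a remote source and register a work-pool based deployment
    flow.from_source(
        source="https://github.com/your-org/your-repo",          # placeholder code location
        entrypoint="flows/algo-3003.py:start_inferencing_3003",
    ).deploy(
        name="algo-3003-pool-deployment",
        work_pool_name="my-pool",  # runs are executed by workers polling this pool
    )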