# ask-marvin
@Marvin given my flow and task in Prefect 3. The task gets restarted after reaching a Completed state:

```python
@flow(
    name="process_batches",
    task_runner=DaskTaskRunner(),
)
async def process_batches(logging):
    gcp_client = GCPStorage(
        service_account_key_json_path=os.getenv("SERVICE_ACCOUNT_KEY"),
        bucket_name=os.getenv("BUCKET_NAME"),
        content_type="application/pdf",
    )
    data_base = Postgres(
        dbname=os.getenv("DB_NAME"),
        user=os.getenv("DB_USERNAME"),
        password=os.getenv("DB_PASSWORD"),
        host=os.getenv("DB_HOST"),
        port=os.getenv("DB_PORT"),
        schema_name=os.getenv("STAGGING_DB_SCHEMA"),
    )
    llm_adapter = JazzHRLLMToTableAdapter(
        model_type=int(os.getenv("MODEL_TYPE")),
        model_name=os.getenv("MODEL_NAME"),
        model_api_key=os.getenv("MODEL_API_KEY"),
        model_temperature=os.getenv("MODEL_TEMPERATURE"),
        model_max_retries=os.getenv("MODEL_MAX_RETRIES"),
        database=data_base,
    )
    if int(os.getenv("MODEL_TYPE")) == 1:
        llm_output_format = llm_output_structure_pydantic
    elif int(os.getenv("MODEL_TYPE")) == 2:
        llm_output_format = llm_output_structure_gemini.schema
    elif int(os.getenv("MODEL_TYPE")) == 3:
        llm_output_format = llm_output_structure_pydantic_langchain
    else:
        raise ValueError(f"""Invalid model_type {int(os.getenv("MODEL_TYPE"))}
            ACCEPTED VALUES
            {LLMType.OPENAI.name}, model_type={LLMType.OPENAI.value}
            {LLMType.GOOGLE.name}, model_type={LLMType.GOOGLE.value}
            {LLMType.HUGGINGFACE.name}, model_type={LLMType.HUGGINGFACE.value}""")
    await llm_adapter.set_llm(llm_output_format=llm_output_format)
    logging.info("loading file names stored in given storage location")
    pdf_files = await gcp_client.list_files(logging=logging)
    logging.info(f"total count of files fetched : {len(pdf_files)}")
    file_names = await llm_adapter.existing_files()
    resumes_to_process = [x for x in pdf_files if x.split("/")[-1] not in file_names]
    logging.info(
        f"total files remaining to be processed after removing the processed files {len(resumes_to_process)}"
    )
    prompt_text = """
    You are an AI bot designed to act as a professional for parsing resumes.
    You are given with resume as an input by the user and your job is to extract
    all the information in detail from the resume. Do not leave any information out.
    In case a field is not present return an empty string as the value for that
    particular field.
    """
    batches = [
        resumes_to_process[
            i : i + int(len(resumes_to_process) / int(os.getenv("MODEL_PROCESSING_BATCH_SIZE")))
        ]
        for i in range(
            0,
            len(resumes_to_process),
            int(len(resumes_to_process) / int(os.getenv("MODEL_PROCESSING_BATCH_SIZE"))),
        )
    ]
    futures = []
    for index, batch in enumerate(batches):
        logging.info(f"BATCH TO PASS TO TASKKK :{len(batch)}")
        futures.append(
            processing_resume.submit(
                logging=logging, file_names=batch, prompt_text=prompt_text
            )
        )
    wait(futures)
    await data_base.async_execute_query(call_clean_parsed_resumes)
    logging.info("Parsed all resumes successfully")


@flow(name="resume_to_json_flow")
def resume_to_json_flow():
    logger = get_run_logger()
    asyncio.run(process_batches(logging=logger))


@task
async def processing_resume(logging, file_names: list[str], prompt_text: str):
    resume_parser = ResumeParser()
    gcp_client = GCPStorage(
        service_account_key_json_path=os.getenv("SERVICE_ACCOUNT_KEY"),
        bucket_name=os.getenv("BUCKET_NAME"),
        content_type="application/pdf",
    )
    data_base = Postgres(
        dbname=os.getenv("DB_NAME"),
        user=os.getenv("DB_USERNAME"),
        password=os.getenv("DB_PASSWORD"),
        host=os.getenv("DB_HOST"),
        port=os.getenv("DB_PORT"),
        schema_name=os.getenv("STAGGING_DB_SCHEMA"),
    )
    llm_adapter = JazzHRLLMToTableAdapter(
        model_type=int(os.getenv("MODEL_TYPE", "999")),
        model_name=os.getenv("MODEL_NAME", ""),
        model_api_key=os.getenv("MODEL_API_KEY", ""),
        model_temperature=os.getenv("MODEL_TEMPERATURE", ""),
        model_max_retries=os.getenv("MODEL_MAX_RETRIES", ""),
        database=data_base,
    )
    if int(os.getenv("MODEL_TYPE")) == 1:
        llm_output_format = llm_output_structure_pydantic
    elif int(os.getenv("MODEL_TYPE")) == 2:
        llm_output_format = llm_output_structure_gemini.schema
    elif int(os.getenv("MODEL_TYPE")) == 3:
        llm_output_format = llm_output_structure_pydantic_langchain
    else:
        raise ValueError(f"""Invalid model_type {int(os.getenv("MODEL_TYPE"))}
            ACCEPTED VALUES
            {LLMType.OPENAI.name}, model_type={LLMType.OPENAI.value}
            {LLMType.GOOGLE.name}, model_type={LLMType.GOOGLE.value}
            {LLMType.HUGGINGFACE.name}, model_type={LLMType.HUGGINGFACE.value}""")
    logging.info(f"batch size for worker {task_run.name} is : {len(file_names)}")
    await llm_adapter.set_llm(llm_output_format=llm_output_format)
    for file_name in file_names:
        logging.info(f"converting pdf file {file_name} for worker {task_run.name}")
        downloaded_pdf = await gcp_client.storage_to_memory(logging=logging, file_name=file_name)
        if not isinstance(downloaded_pdf, bytes):
            continue
        extracted_resume = await resume_parser.extract_text(
            logging=logging, file=downloaded_pdf, file_name=file_name
        )
        if len(extracted_resume.get("resume_text", "")) < 700:
            logging.info(f"{file_name} has fewer than 700 characters")
            await llm_adapter.unprocessed_files(logging=logging, file_name=file_name)
            continue
        await llm_adapter.llm_to_table(
            logging=logging,
            llm=llm_adapter.get_llm(),
            prompt_text=prompt_text,
            resume_text=extracted_resume.get("resume_text", ""),
            file_name=file_name,
            page_count=extracted_resume.get("page_count", 0),
        )
```
Marvin
Your message was too long by 1160 tokens - please shorten it and try again. For reference, here's your message at the allowed limit:
given my flow and task in Prefect 3. The task gets restarted after reaching a Completed state
```python
@flow(
    name="process_batches",
    task_runner=DaskTaskRunner(),
)
async def process_batches(logging):
    gcp_client = GCPStorage(
        service_account_key_json_path=os.getenv("SERVICE_ACCOUNT_KEY"),
        bucket_name=os.getenv("BUCKET_NAME"),
        content_type="application/pdf",
    )
    data_base = Postgres(
        dbname=os.getenv("DB_NAME"),
        user=os.getenv("DB_USERNAME"),
        password=os.getenv("DB_PASSWORD"),
        host=os.getenv("DB_HOST"),
        port=os.getenv("DB_PORT"),
        schema_name=os.getenv("STAGGING_DB_SCHEMA"),
    )
    llm_adapter = JazzHRLLMToTableAdapter(
        model_type=int(os.getenv("MODEL_TYPE")),
        model_name=os.getenv("MODEL_NAME"),
        model_api_key=os.getenv("MODEL_API_KEY"),
        model_temperature=os.getenv("MODEL_TEMPERATURE"),
        model_max_retries=os.getenv("MODEL_MAX_RETRIES"),
        database=data_base,
    )
    if int(os.getenv("MODEL_TYPE")) == 1:
        llm_output_format = llm_output_structure_pydantic
    elif int(os.getenv("MODEL_TYPE")) == 2:
        llm_output_format = llm_output_structure_gemini.schema
    elif int(os.getenv("MODEL_TYPE")) == 3:
        llm_output_format = llm_output_structure_pydantic
```
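Incidentally, most of the length that tripped the limit is duplication: the storage/database/adapter setup and the MODEL_TYPE if/elif ladder appear verbatim in both the flow and the task. Here is a self-contained sketch of that dispatch as a single enum-keyed lookup; the LLMType values are assumed from the error message above, and the output structures are stand-in strings:

```python
import os
from enum import Enum


class LLMType(Enum):
    # Values assumed from the ValueError text in the snippet above
    OPENAI = 1
    GOOGLE = 2
    HUGGINGFACE = 3


# Stand-ins for llm_output_structure_pydantic / llm_output_structure_gemini.schema /
# llm_output_structure_pydantic_langchain from the original code
OUTPUT_FORMATS = {
    LLMType.OPENAI: "pydantic-structure",
    LLMType.GOOGLE: "gemini-schema",
    LLMType.HUGGINGFACE: "pydantic-langchain-structure",
}


def resolve_output_format() -> str:
    raw = os.getenv("MODEL_TYPE", "")
    try:
        model_type = LLMType(int(raw))
    except ValueError:
        # Covers both non-numeric input and numbers outside the enum
        accepted = ", ".join(f"{t.name}={t.value}" for t in LLMType)
        raise ValueError(f"Invalid MODEL_TYPE {raw!r}; accepted values: {accepted}")
    return OUTPUT_FORMATS[model_type]


if __name__ == "__main__":
    os.environ["MODEL_TYPE"] = "2"
    print(resolve_output_format())  # gemini-schema
```

Factoring this into one helper roughly halves the snippet and keeps the flow and the task from drifting apart.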