add "empty-vector-storage" and "empty-object-storage" endpoint
This commit is contained in:
parent
1c9d74407d
commit
e3b62b76f9
1 changed files with 81 additions and 37 deletions
|
|
@ -1,7 +1,7 @@
|
||||||
"""API-Endpoints to interaction with the pdfs."""
|
"""API-Endpoints to interaction with the pdfs."""
|
||||||
|
|
||||||
import os
|
import os
|
||||||
import sys
|
import json
|
||||||
|
|
||||||
from typing import List
|
from typing import List
|
||||||
from io import BytesIO
|
from io import BytesIO
|
||||||
|
|
@ -15,11 +15,16 @@ from preprocessing import pdf
|
||||||
from neural_search.search_component import IndexSearchComponent
|
from neural_search.search_component import IndexSearchComponent
|
||||||
from connector.database_interface.opensearch_client import OpenSearchInterface
|
from connector.database_interface.opensearch_client import OpenSearchInterface
|
||||||
|
|
||||||
import logging
|
|
||||||
import datetime as dt
|
|
||||||
from datetime import datetime
|
|
||||||
import traceback
|
import traceback
|
||||||
|
|
||||||
|
from common_packages import logging
|
||||||
|
|
||||||
|
# instantiate logger
|
||||||
|
logger = logging.create_logger(
|
||||||
|
log_level=os.getenv("LOGGING_LEVEL", "INFO"),
|
||||||
|
logger_name=__name__,
|
||||||
|
)
|
||||||
|
|
||||||
OS_INTERFACE = OpenSearchInterface(
|
OS_INTERFACE = OpenSearchInterface(
|
||||||
index_name="german",
|
index_name="german",
|
||||||
embedder_name="PM-AI/bi-encoder_msmarco_bert-base_german",
|
embedder_name="PM-AI/bi-encoder_msmarco_bert-base_german",
|
||||||
|
|
@ -29,19 +34,6 @@ OS_INTERFACE = OpenSearchInterface(
|
||||||
SEARCH_COMPONENT = IndexSearchComponent(os_client=OS_INTERFACE)
|
SEARCH_COMPONENT = IndexSearchComponent(os_client=OS_INTERFACE)
|
||||||
DIRECTORY_NAME = "german"
|
DIRECTORY_NAME = "german"
|
||||||
|
|
||||||
|
|
||||||
# Setup Logging
|
|
||||||
logging.basicConfig(
|
|
||||||
level=logging.DEBUG,
|
|
||||||
# level=logging.INFO,
|
|
||||||
format="Start: " + str(dt.datetime.now()).replace(" ", "_") + " | %(asctime)s [%(levelname)s] %(message)s",
|
|
||||||
handlers=[
|
|
||||||
logging.FileHandler("/<path>-_" + str(datetime.today().strftime('%Y-%m-%d')) + "_-_debug.log"),
|
|
||||||
logging.StreamHandler(sys.stdout)
|
|
||||||
]
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
router = APIRouter()
|
router = APIRouter()
|
||||||
|
|
||||||
# Create MinIO client
|
# Create MinIO client
|
||||||
|
|
@ -54,7 +46,7 @@ minio_client = boto3.client(
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@router.get("/list-all-buckets", tags=["Object Storage"])
|
@router.get("/list-all-buckets", tags=["object-storage"])
|
||||||
def list_buckets():
|
def list_buckets():
|
||||||
"""Lists object storage buckets.
|
"""Lists object storage buckets.
|
||||||
|
|
||||||
|
|
@ -69,7 +61,7 @@ def list_buckets():
|
||||||
}
|
}
|
||||||
"""
|
"""
|
||||||
|
|
||||||
logging.info("Listing all available buckets")
|
logger.info("Listing all available buckets")
|
||||||
|
|
||||||
response = minio_client.list_buckets()
|
response = minio_client.list_buckets()
|
||||||
buckets = [bucket["Name"] for bucket in response["Buckets"]]
|
buckets = [bucket["Name"] for bucket in response["Buckets"]]
|
||||||
|
|
@ -77,7 +69,7 @@ def list_buckets():
|
||||||
# Create the bucket
|
# Create the bucket
|
||||||
minio_client.create_bucket(Bucket=settings.BUCKET)
|
minio_client.create_bucket(Bucket=settings.BUCKET)
|
||||||
else:
|
else:
|
||||||
logging.info("Bucket '%s' already exists", settings.BUCKET)
|
logger.info("Bucket '%s' already exists", settings.BUCKET)
|
||||||
|
|
||||||
return {"response": buckets}
|
return {"response": buckets}
|
||||||
|
|
||||||
|
|
@ -90,25 +82,25 @@ def list_pdf_files():
|
||||||
list: Returns the list of pdf document names
|
list: Returns the list of pdf document names
|
||||||
"""
|
"""
|
||||||
|
|
||||||
logging.info("Listing all available files in object storage")
|
logger.info("Listing all available files in object storage")
|
||||||
|
|
||||||
# Get all entries from MinIO bucket
|
# Get all entries from MinIO bucket
|
||||||
try:
|
try:
|
||||||
response = minio_client.list_objects_v2(Bucket=settings.BUCKET)
|
response = minio_client.list_objects_v2(Bucket=settings.BUCKET)
|
||||||
|
|
||||||
except minio_client.exceptions.NoSuchBucket as error:
|
except minio_client.exceptions.NoSuchBucket as error:
|
||||||
logging.warning("No such MinIO bucket: '%s'", settings.BUCKET)
|
logger.warning("No such MinIO bucket: '%s'", settings.BUCKET)
|
||||||
raise HTTPException(status_code=404, detail="Bucket not found") from error
|
raise HTTPException(status_code=404, detail="Bucket not found") from error
|
||||||
|
|
||||||
# Check if empty
|
# Check if empty
|
||||||
if "Contents" not in response.keys():
|
if "Contents" not in response.keys():
|
||||||
logging.info("Bucket is empty")
|
logger.info("Bucket is empty")
|
||||||
return {"message": "success", "count": 0, "objects": []}
|
return {"message": "success", "count": 0, "objects": []}
|
||||||
|
|
||||||
files_in_directory = response["Contents"]
|
files_in_directory = response["Contents"]
|
||||||
|
|
||||||
object_names = [entry["Key"] for entry in files_in_directory]
|
object_names = [entry["Key"] for entry in files_in_directory]
|
||||||
logging.info("Number of objects in MinIO: %s", len(object_names))
|
logger.info("Number of objects in MinIO: %s", len(object_names))
|
||||||
return {"message": "success", "count": len(object_names), "objects": object_names}
|
return {"message": "success", "count": len(object_names), "objects": object_names}
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -128,14 +120,14 @@ def get_pdf(file_path: str):
|
||||||
minio_client.download_fileobj(settings.BUCKET, file_path, buffer)
|
minio_client.download_fileobj(settings.BUCKET, file_path, buffer)
|
||||||
buffer.seek(0)
|
buffer.seek(0)
|
||||||
pdf_bytes = buffer.read()
|
pdf_bytes = buffer.read()
|
||||||
logging.info("PDF read successful")
|
logger.info("PDF read successful")
|
||||||
|
|
||||||
# Create a streaming response with the PDF bytes
|
# Create a streaming response with the PDF bytes
|
||||||
return StreamingResponse(BytesIO(pdf_bytes), media_type="application/pdf")
|
return StreamingResponse(BytesIO(pdf_bytes), media_type="application/pdf")
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logging.error("Retrieving PDF failed with error: %s", e)
|
logger.error("Retrieving PDF failed with error: %s", e)
|
||||||
logging.error("Stacktrace: " + str(traceback.format_exc()))
|
logger.error("Stacktrace: " + str(traceback.format_exc()))
|
||||||
raise HTTPException("Retrieving PDF failed") from e
|
raise HTTPException("Retrieving PDF failed") from e
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -153,11 +145,11 @@ def upload_pdf_list(tag: str = Form(...), pdf_files: List[UploadFile] = File(...
|
||||||
"""
|
"""
|
||||||
|
|
||||||
upload_responses = []
|
upload_responses = []
|
||||||
logging.info("Number of files to be processed: %s", len(pdf_files))
|
logger.info("Number of files to be processed: %s", len(pdf_files))
|
||||||
|
|
||||||
# read pdf files and upload it, then process it. Finally, delete it.
|
# read pdf files and upload it, then process it. Finally, delete it.
|
||||||
for pdf_file in pdf_files:
|
for pdf_file in pdf_files:
|
||||||
logging.info("Processing file: %s", pdf_file.filename)
|
logger.info("Processing file: %s", pdf_file.filename)
|
||||||
pdf_file_name = pdf_file.filename
|
pdf_file_name = pdf_file.filename
|
||||||
pdf_file_path = f"{settings.BUCKET_FILE_PATH}/{pdf_file.filename}"
|
pdf_file_path = f"{settings.BUCKET_FILE_PATH}/{pdf_file.filename}"
|
||||||
pdf_contents = pdf_file.file.read()
|
pdf_contents = pdf_file.file.read()
|
||||||
|
|
@ -178,30 +170,82 @@ def upload_pdf_list(tag: str = Form(...), pdf_files: List[UploadFile] = File(...
|
||||||
put_response = minio_client.put_object(
|
put_response = minio_client.put_object(
|
||||||
Bucket=settings.BUCKET, Key=object_name, Body=pdf_contents
|
Bucket=settings.BUCKET, Key=object_name, Body=pdf_contents
|
||||||
)
|
)
|
||||||
logging.info("Upload to object store successful")
|
logger.info("Upload to object store successful")
|
||||||
logging.debug(put_response)
|
logger.debug(put_response)
|
||||||
|
|
||||||
# return put_response
|
# return put_response
|
||||||
|
|
||||||
if isinstance(put_response, dict):
|
if isinstance(put_response, dict):
|
||||||
upload_responses.append("success")
|
upload_responses.append("success")
|
||||||
logging.info("File upload successful")
|
logger.info("File upload successful")
|
||||||
else:
|
else:
|
||||||
upload_responses.append("failure")
|
upload_responses.append("failure")
|
||||||
|
|
||||||
docs, pages_list = pdf.read_pdf(pdf_bytes=pdf_contents)
|
docs, pages_list = pdf.read_pdf(pdf_bytes=pdf_contents)
|
||||||
logging.debug(docs)
|
logger.debug(docs)
|
||||||
logging.debug(pages_list)
|
logger.debug(pages_list)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logging.error("PDF upload failed with error: %s ", e)
|
logger.error("PDF upload failed with error: %s ", e)
|
||||||
logging.error("Stacktrace: " + str(traceback.format_exc()))
|
logger.error("Stacktrace: " + str(traceback.format_exc()))
|
||||||
raise HTTPException("PDF upload failed") from e
|
raise HTTPException("PDF upload failed") from e
|
||||||
|
|
||||||
if "failure" in upload_responses:
|
if "failure" in upload_responses:
|
||||||
logging.error(
|
logger.error(
|
||||||
"Error while uploading. At least one document was not processed correctly"
|
"Error while uploading. At least one document was not processed correctly"
|
||||||
)
|
)
|
||||||
raise HTTPException(status_code=400, detail="Error while uploading documents")
|
raise HTTPException(status_code=400, detail="Error while uploading documents")
|
||||||
|
|
||||||
return {"message": "success"}
|
return {"message": "success"}
|
||||||
|
|
||||||
|
|
||||||
|
@router.delete("/empty-vector-storage", tags=["vector-db"], status_code=200)
|
||||||
|
def empty_vector_storage():
|
||||||
|
"""Remove all entries of the index in the vector storage.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
dict: A simple success message.
|
||||||
|
"""
|
||||||
|
|
||||||
|
try:
|
||||||
|
OS_INTERFACE.empty_entire_index()
|
||||||
|
|
||||||
|
except HTTPException as error:
|
||||||
|
raise error
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error("Emptying vector storage failed with error: %s", e)
|
||||||
|
raise HTTPException(500, detail="An unexpected error occurred") from e
|
||||||
|
|
||||||
|
return {"message": "success"}
|
||||||
|
|
||||||
|
|
||||||
|
@router.delete("/empty-object-storage", tags=["object-storage"], status_code=200)
|
||||||
|
def empty_object_storage(bucket: str = None):
|
||||||
|
"""Remove all available objects in object storage.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
bucket (str, optional): Bucket to be used. Defaults to value in env-var.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
response (dict): A successful response.
|
||||||
|
"""
|
||||||
|
|
||||||
|
if not bucket:
|
||||||
|
bucket = settings.BUCKET
|
||||||
|
|
||||||
|
try:
|
||||||
|
# List all objects in the bucket
|
||||||
|
objects = minio_client.list_objects(Bucket=settings.BUCKET)
|
||||||
|
|
||||||
|
# Remove each object in the list
|
||||||
|
for obj in objects['Contents']:
|
||||||
|
minio_client.delete_object(
|
||||||
|
Bucket=settings.BUCKET,
|
||||||
|
Key=obj["Key"]
|
||||||
|
)
|
||||||
|
except Exception as err:
|
||||||
|
logger.warning("Error occurred when deleting object: %s", err)
|
||||||
|
raise HTTPException(500, detail="An unexpected error occurred") from err
|
||||||
|
|
||||||
|
return {"message": "success"}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue