From 16e50042286208080fdd55128dbb1eecab8bc8e5 Mon Sep 17 00:00:00 2001
From: Niklas Mueller
Date: Tue, 18 Jun 2024 20:21:04 +0200
Subject: [PATCH] added API-Endpoints for files

---
 README.md                                  |   7 +
 docker-compose.yaml                        |   2 +-
 rag-chat-backend/requirements.txt          |   2 +
 rag-chat-backend/src/app.py                |   7 +
 rag-chat-backend/src/core/config.py        |   7 +
 rag-chat-backend/src/endpoints/__init__.py |   0
 rag-chat-backend/src/endpoints/files.py    | 192 +++++++++++++++++++++
 7 files changed, 216 insertions(+), 1 deletion(-)
 create mode 100644 rag-chat-backend/src/endpoints/__init__.py
 create mode 100644 rag-chat-backend/src/endpoints/files.py

diff --git a/README.md b/README.md
index 6ca0a8b..dac773d 100644
--- a/README.md
+++ b/README.md
@@ -21,3 +21,10 @@ podman-compose -f docker-compose.yaml up
 
 ### RAG-Backend
 * API-Docs http://localhost:8000/docs
+
+
+## Development
+
+```
+nodemon --ext '*' --exec "podman stop rag-chat-backend; podman rm rag-chat-backend; podman-compose -f docker-compose.yaml up --build"
+```
diff --git a/docker-compose.yaml b/docker-compose.yaml
index 71b91e5..ce95721 100644
--- a/docker-compose.yaml
+++ b/docker-compose.yaml
@@ -48,7 +48,7 @@ services:
     build:
       context: ./rag-chat-backend
       dockerfile: Dockerfile
-    image: rag-chat-backend:0.1
+    image: rag-chat-backend
     ports:
       - "8000:8000"
     environment:
diff --git a/rag-chat-backend/requirements.txt b/rag-chat-backend/requirements.txt
index 5741cf6..7de041d 100644
--- a/rag-chat-backend/requirements.txt
+++ b/rag-chat-backend/requirements.txt
@@ -2,6 +2,7 @@
 uvicorn==0.22.0
 fastapi==0.109.1
 minio==7.1.17
+boto3==1.26.145
 
 opensearch-py==2.3.2
 opensearch-logger==1.3.0
@@ -9,3 +10,4 @@ opensearch-logger==1.3.0
 python-dotenv==1.0.0
 python-multipart==0.0.7
 PyPDF2==3.0.1
+langchain==0.1.11
diff --git a/rag-chat-backend/src/app.py b/rag-chat-backend/src/app.py
index 3f16c64..783b5b2 100644
--- a/rag-chat-backend/src/app.py
+++ b/rag-chat-backend/src/app.py
@@ -7,9 +7,13 @@ from fastapi import FastAPI
 from fastapi.middleware.cors import CORSMiddleware
 from dotenv import load_dotenv
 
+from endpoints import files
+
 from core.config import settings
 
+# load environment variables from the .env file
 load_dotenv()
+
 app = FastAPI()
 
 app.add_middleware(
@@ -20,6 +24,9 @@ app.add_middleware(
     allow_headers=["*"],
 )
 
+app.include_router(files.router, prefix=settings.API_V1_STR)  # , tags=["files"]
+
+
 print('OPENSEARCH_USE_SSL')
 print(os.getenv('OPENSEARCH_USE_SSL'))
 print('settings.API_V1_STR')
diff --git a/rag-chat-backend/src/core/config.py b/rag-chat-backend/src/core/config.py
index 420e4d6..dbd993e 100644
--- a/rag-chat-backend/src/core/config.py
+++ b/rag-chat-backend/src/core/config.py
@@ -1,7 +1,14 @@
 # config.py
 
+import os
+
+
 class Settings:
     API_V1_STR: str = "/api"
+    # OS_INTERFACE: str = ""
+    BUCKET: str = os.getenv("BUCKET_NAME")
+    BUCKET_FILE_PATH: str = os.getenv("BUCKET_FILE_PATH")
+
 
 
 settings = Settings()
diff --git a/rag-chat-backend/src/endpoints/__init__.py b/rag-chat-backend/src/endpoints/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/rag-chat-backend/src/endpoints/files.py b/rag-chat-backend/src/endpoints/files.py
new file mode 100644
index 0000000..54d6433
--- /dev/null
+++ b/rag-chat-backend/src/endpoints/files.py
@@ -0,0 +1,192 @@
+"""API endpoints for interacting with the PDFs."""
+
+import os
+import sys
+
+from typing import List
+from io import BytesIO
+import boto3
+
+from fastapi import APIRouter, File, UploadFile, Form, HTTPException
+from fastapi.responses import StreamingResponse
+
+from core.config import settings
+
+
+import logging
+import datetime as dt
+from datetime import datetime
+import traceback
+
+
+# Setup Logging
+logging.basicConfig(
+    level=logging.DEBUG,
+    # level=logging.INFO,
+    format="Start: " + str(dt.datetime.now()).replace(" ", "_") + " | %(asctime)s [%(levelname)s] %(message)s",
+    handlers=[
+        logging.FileHandler("/-_" + str(datetime.today().strftime('%Y-%m-%d')) + "_-_debug.log"),
+        logging.StreamHandler(sys.stdout)
+    ]
+)
+
+
+router = APIRouter()
+
+# Create MinIO client
+minio_client = boto3.client(
+    "s3",
+    endpoint_url=os.getenv("S3_ENDPOINT"),
+    aws_access_key_id=os.getenv("S3_ACCESS_KEY"),
+    aws_secret_access_key=os.getenv("S3_SECRET_KEY"),
+    use_ssl=False,
+)
+
+
+@router.get("/list-all-buckets", tags=["Object Storage"])
+def list_buckets():
+    """List object storage buckets and create the configured bucket if missing.
+
+    Returns:
+        dict: Names of the buckets found before any creation, under "response".
+
+    Example Response:
+        {
+            "response": [
+                "rag-bucket"
+            ]
+        }
+    """
+
+    logging.info("Listing all available buckets")
+
+    response = minio_client.list_buckets()
+    buckets = [bucket["Name"] for bucket in response["Buckets"]]
+    if settings.BUCKET not in buckets:
+        # Create the bucket
+        minio_client.create_bucket(Bucket=settings.BUCKET)
+    else:
+        logging.info("Bucket '%s' already exists", settings.BUCKET)
+
+    return {"response": buckets}
+
+
+@router.get("/list-all-pdfs", tags=["pdf"])
+def list_pdf_files():
+    """List the keys of all objects stored in the configured bucket.
+
+    Returns:
+        dict: "message", "count" and "objects" (the list of object keys).
+
+    Raises:
+        HTTPException: 404 if the configured bucket does not exist.
+    """
+
+    logging.info("Listing all available files in object storage")
+
+    # Get all entries from MinIO bucket
+    try:
+        response = minio_client.list_objects_v2(Bucket=settings.BUCKET)
+
+    except minio_client.exceptions.NoSuchBucket as error:
+        logging.warning("No such MinIO bucket: '%s'", settings.BUCKET)
+        raise HTTPException(status_code=404, detail="Bucket not found") from error
+
+    # Check if empty
+    if "Contents" not in response:
+        logging.info("Bucket is empty")
+        return {"message": "success", "count": 0, "objects": []}
+
+    files_in_directory = response["Contents"]
+
+    object_names = [entry["Key"] for entry in files_in_directory]
+    logging.info("Number of objects in MinIO: %s", len(object_names))
+    return {"message": "success", "count": len(object_names), "objects": object_names}
+
+
+@router.get("/get-pdf", tags=["pdf"])
+def get_pdf(file_path: str):
+    """Retrieve and return a PDF based on the file path.
+
+    Args:
+        file_path (str): Key of the PDF object inside the configured bucket.
+
+    Returns:
+        StreamingResponse: Response containing the PDF file.
+
+    Raises:
+        HTTPException: 500 if the object cannot be downloaded.
+    """
+
+    try:
+        buffer = BytesIO()
+        minio_client.download_fileobj(settings.BUCKET, file_path, buffer)
+        buffer.seek(0)
+        pdf_bytes = buffer.read()
+        logging.info("PDF read successful")
+
+        # Create a streaming response with the PDF bytes
+        return StreamingResponse(BytesIO(pdf_bytes), media_type="application/pdf")
+
+    except Exception as e:
+        logging.error("Retrieving PDF failed with error: %s", e)
+        logging.error("Stacktrace: %s", traceback.format_exc())
+        raise HTTPException(status_code=500, detail="Retrieving PDF failed") from e
+
+
+@router.post("/upload-pdf-list", tags=["pdf"], status_code=201)
+def upload_pdf_list(tag: str = Form(...), pdf_files: List[UploadFile] = File(...)):
+    """Upload multiple pdf documents to the object storage.
+
+    Args:
+        tag (str): Industry tag given by user (not used by this endpoint yet).
+        pdf_files (List[UploadFile]): List of uploaded pdf files.
+
+    Returns:
+        response (dict): A successful response.
+
+    Raises:
+        HTTPException: 500 if storing a file raises an error; 400 if a
+            put_object response is not a dict.
+    """
+
+    upload_responses = []
+    logging.info("Number of files to be processed: %s", len(pdf_files))
+
+    # read each pdf file and store it under settings.BUCKET_FILE_PATH
+    for pdf_file in pdf_files:
+        logging.info("Processing file: %s", pdf_file.filename)
+        pdf_file_name = pdf_file.filename
+        # pdf_file_path = f"{settings.BUCKET_FILE_PATH}/{pdf_file.filename}"
+        pdf_contents = pdf_file.file.read()
+
+        # process pdf
+        # docs, pages_list = pdf.read_pdf(pdf_bytes=pdf_contents)
+        try:
+            object_name = f"{settings.BUCKET_FILE_PATH}/{pdf_file_name}"
+            put_response = minio_client.put_object(
+                Bucket=settings.BUCKET, Key=object_name, Body=pdf_contents
+            )
+            logging.info("Upload to object store successful")
+            logging.debug(put_response)
+
+            # return put_response
+
+            if isinstance(put_response, dict):
+                upload_responses.append("success")
+                logging.info("File upload successful")
+            else:
+                upload_responses.append("failure")
+
+        except Exception as e:
+            logging.error("PDF upload failed with error: %s ", e)
+            logging.error("Stacktrace: %s", traceback.format_exc())
+            raise HTTPException(status_code=500, detail="PDF upload failed") from e
+
+    if "failure" in upload_responses:
+        logging.error(
+            "Error while uploading. At least one document was not processed correctly"
+        )
+        raise HTTPException(status_code=400, detail="Error while uploading documents")
+
+    return {"message": "success"}