167 lines
6.1 KiB
Python
167 lines
6.1 KiB
Python
"""
|
|
File: RAG-Demo.py
|
|
Author: Martin Rattensberger
|
|
Description: A GUI application for interacting with a local Llama vision model.
|
|
Users can select a directory with PDF files, load them into a vector database,
|
|
and ask questions about them.
|
|
Date: 11.11.2024 # Replace with actual date
|
|
Version: 1.2
|
|
Development Environment: Visual Studio Code with Continue.ai (Claude Sonnet 3.5)
|
|
|
|
This script creates a tkinter-based GUI for selecting a directory with PDFs,
|
|
loading them into a LanceDB vector database, and querying them using a local Llama 3.2 vision model.
|
|
"""
|
|
|
|
import tkinter as tk
|
|
from tkinter import filedialog, scrolledtext
|
|
import ollama
|
|
from PIL import Image
|
|
import fitz # PyMuPDF library for handling PDFs
|
|
import io
|
|
import base64
|
|
import threading
|
|
import time
|
|
import os
|
|
import lancedb
|
|
import numpy as np
|
|
import pyarrow as pa
|
|
from sentence_transformers import SentenceTransformer
|
|
|
|
|
|
class LlamaVisionApp:
    """Tkinter front end for a small retrieval-augmented-generation demo.

    The user picks a directory of PDFs, the text of every page is embedded
    with a sentence-transformer model and stored in a LanceDB table, and
    questions are answered by a local Ollama chat model that receives the
    best-matching page snippets as context.

    Long-running work (embedding, querying) happens in worker threads. All
    widget updates are performed on the Tk thread via ``master.after``.
    """

    TABLE_NAME = "pdf_embeddings"
    VECTOR_COLUMN = "embedding"
    EMBEDDING_MODEL = 'all-MiniLM-L6-v2'
    # Must match the vector length produced by EMBEDDING_MODEL.
    EMBEDDING_DIM = 384
    CHAT_MODEL = 'llama3.2-vision'
    TOP_K = 5                  # number of pages retrieved per question
    SNIPPET_CHARS = 500        # characters of page text passed as context
    SPINNER_FRAMES = "|/-\\"
    SPINNER_INTERVAL_MS = 100

    def __init__(self, master):
        """Build the widgets and open the vector store.

        Args:
            master: The Tk root (or toplevel) window that hosts the widgets.
        """
        self.master = master
        master.title("Llama Vision Interface RAG")

        # Application state.
        self.directory_path = None
        self.pdf_files = []
        self.processing = False
        self._animation_job = None  # id of the pending spinner `after` call

        # Initialize LanceDB and sentence transformer
        self.db = lancedb.connect("./lancedb")
        schema = pa.schema([
            ('id', pa.int64()),
            ('filename', pa.string()),
            ('page', pa.int64()),
            ('text', pa.string()),
            (self.VECTOR_COLUMN, pa.list_(pa.float32(), self.EMBEDDING_DIM)),
        ])
        # mode="overwrite" starts from an empty table on every launch without
        # failing on the very first run, when there is no table to drop yet.
        self.table = self.db.create_table(
            self.TABLE_NAME, schema=schema, mode="overwrite"
        )
        self.model = SentenceTransformer(self.EMBEDDING_MODEL)

        # Directory selection button
        self.select_dir_button = tk.Button(
            master, text="Select PDF Directory", command=self.select_directory
        )
        self.select_dir_button.pack(pady=10)

        # Display selected directory
        self.dir_label = tk.Label(master, text="No directory selected")
        self.dir_label.pack()

        # Load PDFs button
        self.load_pdfs_button = tk.Button(
            master, text="Load PDFs into Database", command=self.load_pdfs_to_db
        )
        self.load_pdfs_button.pack(pady=10)

        # Question input
        self.question_entry = tk.Text(master, width=50, height=3)
        self.question_entry.pack(pady=10)
        self.question_entry.insert(tk.END, "What is in these PDFs?")

        # Submit button
        self.submit_button = tk.Button(
            master, text="Submit", command=self.submit_question
        )
        self.submit_button.pack()

        # Response display
        self.response_text = scrolledtext.ScrolledText(master, width=60, height=30)
        self.response_text.pack(pady=10)

    def _show_message(self, message):
        """Replace the contents of the response area with ``message``."""
        self.response_text.delete('1.0', tk.END)
        self.response_text.insert(tk.END, message)

    def _begin_processing(self):
        """Mark a job as running and start the spinner on the Tk thread."""
        # Cancel a spinner tick left over from a job that has just finished,
        # otherwise two animation chains would run side by side.
        if self._animation_job is not None:
            self.master.after_cancel(self._animation_job)
            self._animation_job = None
        self.processing = True
        self.processing_animation()

    def select_directory(self):
        """Ask the user for a directory and remember the PDFs it contains."""
        chosen = filedialog.askdirectory()
        if not chosen:
            # Dialog cancelled: keep the previous selection so the label,
            # directory_path and pdf_files stay in agreement.
            return
        self.directory_path = chosen
        self.dir_label.config(text=f"Selected directory: {self.directory_path}")
        self.pdf_files = [
            f for f in os.listdir(self.directory_path)
            if f.lower().endswith('.pdf')
        ]

    def load_pdfs_to_db(self):
        """Start embedding the selected PDFs in a background thread."""
        if self.processing:
            return  # a job is already running
        if not self.directory_path:
            self._show_message("Please select a directory first.\n")
            return
        if not self.pdf_files:
            self._show_message("No PDF files found in the selected directory.\n")
            return

        self._begin_processing()
        threading.Thread(target=self.process_pdfs, daemon=True).start()

    def process_pdfs(self):
        """Embed every page of every selected PDF and store it in the table.

        Intended to run in a worker thread. The outcome (success or error) is
        reported through ``update_response`` scheduled on the Tk thread, and
        ``processing`` is always reset so the spinner stops.
        """
        # Snapshot the selection so a directory change made while this worker
        # is running cannot affect it.
        directory = self.directory_path
        pdf_files = list(self.pdf_files)
        rows = []
        try:
            for pdf_file in pdf_files:
                pdf_path = os.path.join(directory, pdf_file)
                # The context manager closes the document even if a page fails.
                with fitz.open(pdf_path) as doc:
                    for page_num, page in enumerate(doc):
                        text = page.get_text()
                        embedding = self.model.encode(text)
                        rows.append({
                            "id": len(rows),
                            "filename": pdf_file,
                            "page": page_num,
                            "text": text,
                            self.VECTOR_COLUMN: embedding.tolist(),
                        })
            if rows:  # nothing to insert when no page was read
                self.table.add(rows)
            title = "Load Complete"
            body = f"Loaded {len(rows)} pages from {len(pdf_files)} PDFs into the database."
        except Exception as e:  # worker-thread boundary: report, don't die silently
            title = "Load Failed"
            body = f"Error: {str(e)}"
        finally:
            self.processing = False
        self.master.after(0, self.update_response, title, body)

    def submit_question(self):
        """Read the question box and start answering it in a background thread."""
        if self.processing:
            return  # a job is already running
        question = self.question_entry.get('1.0', tk.END).strip()
        if not question:
            self._show_message("Please enter a question.\n")
            return

        self._begin_processing()
        threading.Thread(
            target=self.query_database, args=(question,), daemon=True
        ).start()

    def query_database(self, question):
        """Retrieve matching pages for ``question`` and ask the chat model.

        Intended to run in a worker thread. The answer, or an ``Error: ...``
        message if anything fails, is handed to ``update_response`` on the Tk
        thread.

        Args:
            question: The user's question as plain text.
        """
        try:
            question_embedding = self.model.encode(question)
            # Name the vector column explicitly instead of relying on inference.
            results = (
                self.table
                .search(question_embedding, vector_column_name=self.VECTOR_COLUMN)
                .limit(self.TOP_K)
                .to_list()
            )

            context = "\n".join(
                f"From {r['filename']} (Page {r['page']+1}):\n{r['text'][:self.SNIPPET_CHARS]}..."
                for r in results
            )

            response = ollama.chat(
                model=self.CHAT_MODEL,
                messages=[
                    {
                        'role': 'system',
                        'content': f"You are an AI assistant that answers questions based on the following context:\n\n{context}"
                    },
                    {
                        'role': 'user',
                        'content': question
                    },
                ]
            )
            answer = response['message']['content']
        except Exception as e:  # worker-thread boundary: surface the error in the UI
            answer = f"Error: {str(e)}"
        finally:
            self.processing = False
        self.master.after(0, self.update_response, question, answer)

    def processing_animation(self, frame=0):
        """Draw one spinner frame and reschedule itself while a job runs.

        Runs on the Tk thread and never blocks: each call draws a single frame
        and uses ``master.after`` to schedule the next one. The chain ends as
        soon as ``processing`` is False.

        Args:
            frame: Index of the spinner frame to draw (wraps around).
        """
        self._animation_job = None
        if not self.processing:
            return
        glyph = self.SPINNER_FRAMES[frame % len(self.SPINNER_FRAMES)]
        self._show_message(f"Processing {glyph}")
        self._animation_job = self.master.after(
            self.SPINNER_INTERVAL_MS, self.processing_animation, frame + 1
        )

    def update_response(self, question, answer):
        """Show a question/answer pair in the response area, replacing its contents."""
        self._show_message(f"Q: {question}\nA: {answer}\n\n")
|
|
|
|
def main():
    """Create the Tk root window, attach the application and run the event loop."""
    root = tk.Tk()
    # Keep a reference for the lifetime of the event loop.
    app = LlamaVisionApp(root)
    root.mainloop()
    return app


# Only launch the GUI when executed as a script, not when imported.
if __name__ == "__main__":
    main()