基于DeepSeek-R1和LangChain一步一步构建RAG流程

360影视 2025-02-07 08:33 3

摘要:导入库: 导入必要的库和模块,如 langchain_core、langchain_ollama、langchain_community 和 logging。类初始化: ChatPDF 类初始化时,使用大型语言模型(LLM)和嵌入模型。初始化文本分割器、聊天提

下面是最终完成的RAG 应用,可以参考文章中的步骤一步一步来实现。

确保已经安装Python 3.8+ 和 Ollama。

另外,下载所需的 AI 模型:

ollama pull deepseek-r1:1.5b # 默认是 7B 模型,这里选择更小的模型

ollama pull mxbai-embed-large # 嵌入模型

安装依赖包,如下所示:

streamlitlangchainlangchain_ollamalangchain_communitystreamlit-chatpypdfchromadb

项目结构如图所示:

(1)rag.py 代码介绍

rag.py 文件定义了 ChatPDF 类,该类处理 PDF 文档的摄取,并使用检索增强生成(RAG)流程回答基于文档的问题。

导入库: 导入必要的库和模块,如 langchain_core、langchain_ollama、langchain_community 和 logging。类初始化: ChatPDF 类初始化时,使用大型语言模型(LLM)和嵌入模型。初始化文本分割器、聊天提示模板、向量存储和检索器。摄取方法: ingest 方法处理 PDF 文件,分割其内容,并将嵌入存储在向量存储中。问答方法: ask 方法使用 RAG 流程回答问题,通过检索相关文档块并生成响应。清除方法: clear 方法重置向量存储和检索器。# 导入所需的库和模块from langchain_core.globals import set_verbose, set_debug # 导入全局设置函数from langchain_ollama import ChatOllama, OllamaEmbeddings # 导入Ollama相关的类和函数from langchain.schema.output_parser import StrOutputParser # 导入输出解析器from langchain_community.vectorstores import Chroma # 导入Chroma向量存储类from langchain_community.document_loaders import PyPDFLoader # 导入PDF文档加载器from langchain.text_splitter import RecursiveCharacterTextSplitter # 导入文本分割器from langchain.schema.runnable import RunnablePassthrough # 导入RunnablePassthrough类from langchain_community.vectorstores.utils import filter_complex_metadata # 导入过滤复杂元数据的函数from langchain_core.prompts import ChatPromptTemplate # 导入聊天提示模板类import logging # 导入日志记录模块# 设置调试模式set_debug(True)# 设置详细模式set_verbose(True)# 配置日志记录logging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__)class ChatPDF: """一个用于处理PDF摄取和使用RAG进行问答的类""" def __init__(self, llm_model: str = "deepseek-r1:1.5b", embedding_model: str = "mxbai-embed-large"): """ 初始化ChatPDF实例,使用一个大型语言模型(LLM)和嵌入模型。 """ # 初始化LLM模型 self.model = ChatOllama(model=llm_model) # 初始化嵌入模型 self.embeddings = OllamaEmbeddings(model=embedding_model) # 初始化文本分割器,用于将大文本分割成较小的块 self.text_splitter = RecursiveCharacterTextSplitter(chunk_size=1024, chunk_overlap=100) # 初始化聊天提示模板 self.prompt = ChatPromptTemplate.from_template( """ 你是一个基于上传文档回答问题的有用助手。 上下文: {context} 问题: {question} 请用三句话或更少的句子简洁准确地回答。 """ ) # 初始化向量存储(初始为空) self.vector_store = None # 初始化检索器(初始为空) self.retriever = None def ingest(self, pdf_file_path: str): """ 摄取一个PDF文件,分割其内容,并将嵌入存储在向量存储中。 """ # 记录开始处理PDF文件的日志 logger.info(f"Starting ingestion for file: {pdf_file_path}") # 使用PyPDFLoader加载PDF文件内容 docs = PyPDFLoader(file_path=pdf_file_path).load # 使用text_splitter将文档内容分割成较小的块 chunks = self.text_splitter.split_documents(docs) # 过滤掉复杂的元数据 chunks = filter_complex_metadata(chunks) # 使用Chroma将分割后的文档块转换成向量并存储在vector_store中 self.vector_store = Chroma.from_documents( documents=chunks, embedding=self.embeddings, persist_directory="chroma_db", ) # 记录PDF文件处理完成的日志 logger.info("Ingestion completed. Document embeddings stored successfully.") def ask(self, query: str, k: int = 5, score_threshold: float = 0.2): """ 使用RGA流程回答问题,并返回结果。 """ if not self.vector_store: raise ValueError("未找到向量存储,请先导入并摄取文档。") # 检查是否已设置检索器,如果没有,则使用相似度阈值检索器 if not self.retriever: self.retriever = self.vector_store.as_retriever( search_type="similarity_score_threshold", search_kwargs={"k": k, "score_threshold": score_threshold}, ) # 记录日志,说明正在为查询检索上下文 logger.info(f"正在为查询检索上下文: {query}") # 使用检索器根据查询检索文档 retrieved_docs = self.retriever.invoke(query) # 如果没有检索到文档,则返回相应的消息 if not retrieved_docs: return "在文档中没有找到与查询相关的上下文来回答您的问题" # 格式化输入,将检索到的文档内容和查询组合成一个字典,包括上下文和查询 formatted_input = { "context": "\n\n".join(doc.page_content for doc in retrieved_docs), "question": query, } # 构建RGA链,包括提示和模型。整个链条通过管道操作符连接,依次处理输入并生成最终的响应。 chain = ( RunnablePassthrough # 这是一个占位符,表示直接传递输入 | self.prompt # 使用聊天提示模板格式化输入,将检索到的文档内容和查询组合成一个字典,包括上下文和查询。 | self.model # 使用大型语言模型(LLM)处理格式化后的输入,生成响应 | StrOutputParser # 解析LLM的输出,将其转换为字符串格式的响应。 ) # 记录日志,说明正在使用LLM生成响应 logger.info("正在使用LLM生成响应") # 使用RGA链生成并返回响应 return chain.invoke(formatted_input) def clear(self): """ 重置向量存储和检索器 """ # 记录日志,说明正在清除向量存储和检索器 logger.info("清除向量存储和检索器") # 将向量存储重置为None self.vector_store = None # 将检索器重置为None self.retriever = None

(2)app.py 代码介绍

app.py 文件是一个使用 Streamlit 创建的交互式 Web 应用,允许用户上传 PDF 文档,使用本地 DeepSeek R1 模型处理这些文档,并通过聊天界面与模型交互。

导入库: 导入必要的库和模块,如 os、tempfile、time、streamlit 和 streamlit_chat。从 rag 模块中导入 ChatPDF 类。显示消息: display_messages 函数用于显示聊天历史记录。处理用户输入: process_input 函数用于处理用户输入并生成助手的响应。读取和保存文件: read_and_save_file 函数处理文件上传和摄取。页面布局: page 函数定义了应用的主页面布局。主程序:在 __main__ 块中调用 page 函数,启动应用。# app.py# 导入库import os # 导入os模块,用于与操作系统交互import tempfile # 导入tempfile模块,用于创建临时文件和目录import time # 导入time模块,用于时间相关操作import streamlit as st # 导入streamlit模块,用于创建交互式Web应用from streamlit_chat import message # 从streamlit_chat包中导入message函数,用于显示消息from rag import ChatPDF # 从rag中导入ChatPDF类,用于处理PDF文档st.set_page_config(page_title="基于本地DeepSeek R1模型构建RAG应用")def display_messages: """显示聊天历史记录""" st.subheader("聊天历史") # 遍历消息列表并显示消息 for i, (msg, is_user) in enumerate(st.session_state["messages"]): # 调用message函数显示每条消息,msg是消息内容,is_user参数用于区分用户消息和助手消息,key是每个消息的唯一标识符 message(msg, is_user=is_user, key=str(i)) # 创建一个空的 st.empty对象,用于后续显示思考中的加载动画 st.session_state["thinking_spinner"] = st.emptydef process_input: """处理用户输入并生成助手响应""" if st.session_state["user_input"] and len(st.session_state["user_input"].strip) > 0: # 检查用户输入是否为空,如果不为空,则获取用户输入 user_text = st.session_state["user_input"].strip # 显示加载动画,提示助手正在思考 with st.session_state["thinking_spinner"], st.spinner("思考中..."): try: # 调用助手的ask方法,获取助手的响应 agent_text = st.session_state["assistant"].ask( user_text, # 用户输入的问题 k=st.session_state["retrieval_k"], # 检索结果的数量 score_threshold=st.session_state["retrieval_threshold"], # 检索结果的相似度阈值 ) except ValueError as e: # 如果发生错误,将错误信息作为助手响应 agent_text = str(e) # 将用户输入添加到消息列表中,标记为用户发送 st.session_state["messages"].append((user_text, True)) # 将助手响应添加到消息列表中,标记为助手发送 st.session_state["messages"].append((agent_text, False))def read_and_save_file: """处理文件上传和摄取""" # 清空助手状态、消息列表和用户输入 st.session_state["assistant"].clear st.session_state["messages"] = st.session_state["user_input"] = "" # 遍历上传的文件 for file in st.session_state["file_uploader"]: # 创建一个临时文件来保存上传的文件 with tempfile.NamedTemporaryFile(delete=False) as tf: # 将文件内容写入临时文件 tf.write(file.getbuffer) # 获取临时文件路径 file_path = tf.name # 显示加载动画,提示正在摄取文件 with st.session_state["ingestion_spinner"], st.spinner(f"正在摄取 {file.name}..."): # 记录开始摄取文件的时间 t0 = time.time # 调用助手的摄取方法,摄取文件内容 st.session_state["assistant"].ingest(file_path) # 记录摄取文件完成的时间 t1 = time.time # 在消息列表中添加摄取成功的消息 st.session_state["messages"].append( (f"已摄取 {file.name},耗时 {t1 - t0:.2f} 秒", False) ) # 删除临时文件 os.remove(file_path)def page: """主应用页面布局""" # 检查是否已初始化会话状态,如果为空,则初始化 if len(st.session_state) == 0: # 初始化一个空的消息列表 st.session_state["messages"] = # 初始化一个 ChatPDF 实例,并将其赋值给 st.session_state["assistant" st.session_state["assistant"] = ChatPDF # 显示页面标题 st.header("基于本地DeepSeek R1模型构建RAG应用") st.image("deepseek-r1.png", caption="DeepSeek R1", use_container_width=True) # 显示子标题和文件上传组件 st.subheader("上传文档") st.file_uploader( "上传PDF文档", type=["pdf"], key="file_uploader", on_change=read_and_save_file, # 文件上传后的回调函数 label_visibility="collapsed", # 标签的可见性设置为折叠 accept_multiple_files=True, # 是否允许上传多个文件 ) # 创建一个空的 st.empty对象,用于后续显示摄取中的加载动画 st.session_state["ingestion_spinner"] = st.empty # 显示检索设置 st.subheader("设置") # 使用st.slider创建滑块,用于设置检索结果数量 st.session_state["retrieval_k"] = st.slider( "检索结果数量 (k)", min_value=1, max_value=10, value=5 ) # 使用st.slider创建滑块,用于设置相似度阈值 st.session_state["retrieval_threshold"] = st.slider( "相似度分数阈值", min_value=0.0, max_value=1.0, value=0.2, step=0.05 ) # 显示消息和文本输入框 display_messages # 文本输入框,输入内容变化时调用process_input函数 st.text_input("查询:", key="user_input", on_change=process_input) # 显示清除聊天按钮 if st.button("清除聊天"): st.session_state["messages"] = st.session_state["assistant"].clear# 检查当前模块是否是主程序入口if __name__ == "__main__": # 调用主页面函数 page

这个应用程序允许用户上传 PDF 文档,并通过与助手对话来查询文档内容。应用程序使用 Streamlit 框架构建,具有文件上传、聊天历史记录显示、检索设置等功能。

来源:软件架构

相关推荐