I have built a RAG application with LangChain and now want to deploy it with FastAPI. Calling a FastAPI endpoint generally works, and the answer of the LCEL chain gets streamed. However, I want the answer to be streamed and, once streaming is done, the source documents to be returned. Here is the code, where streaming works when calling the endpoint. At the moment I am yielding the source_documents, but I don't want the user to see them raw: I would like to preprocess the source_documents before the user sees them:
# example endpoint call: `http://127.0.0.1:8000/rag_model_response?question=Welche%203%20wesentlichen%20Merkmale%20hat%20die%20BCMS%20Leitlinie%3F`
# this example call streams the response perfectly in the browser
# NOTE: cfg (my config constants) and build_llm come from my own modules
import time

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, StreamingResponse
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_core.prompts import PromptTemplate, ChatPromptTemplate
from langchain_core.runnables import RunnableLambda, RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser
embeddings = HuggingFaceEmbeddings(model_name="intfloat/multilingual-e5-large-instruct", model_kwargs={'device': "mps"})
db = FAISS.load_local("streamlit_vectorstores/vectorstores/db_maxiw_testfreitag", embeddings, allow_dangerous_deserialization=True)
retriever = db.as_retriever(search_kwargs={'k': cfg.STREAMLIT_VECTOR_COUNT, 'score_threshold': cfg.SCORE_THRESHOLD,'sorted': True}, search_type="similarity_score_threshold")
model_path = cfg.MIXTRAL_PATH
llm = build_llm(model_path) # loads a model from Llamacpp with streaming enabled
def rag_model_response(question: str):
    start_time = time.time()
    context = retriever.get_relevant_documents(question)
    response_dict = {"question": question, "result": "", "source_documents": []}
    rag_prompt = f"""<s> [INST] Du bist RagBot, ein hilfsbereiter Assistent. Antworte nur auf Deutsch:
{context}
{question}
Antwort: [/INST]
"""
    result_content = ""
    first_response = True
    for resp in llm.stream(rag_prompt):
        if resp:
            result_content += resp
            if first_response:
                # Calculate and print time after the first batch of text is streamed
                end_time = time.time()
                elapsed_time = round(end_time - start_time, 1)
                first_response = False
                yield f"(Response Time: {elapsed_time} seconds)\n"
            yield resp
    if context:
        # yield context  # stopped here
        yield "\n\nQuellen:\n"
        for i, doc in enumerate(context):
            yield doc.metadata["source"].split("/")[-1] + ", Seite: " + str(doc.metadata["page"] + 1) + "\n\n"
        response_dict["source_documents"] = [{"source": doc.metadata["source"], "page": doc.metadata["page"] + 1} for doc in context]
    else:
        yield "\n\nVorsicht, für die vorliegende Antwort wurden keine internen Quellen verwendet, da die Suche nach relevanten Dokumenten kein Ergebnis geliefert hat."
    yield response_dict  # this dict currently ends up in the text stream, visible to the user
app = FastAPI(
    title="FastAPI for Database Management",
    description="An API that handles user Vectordatabase creation or deletion",
    version="1.0",
)
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

@app.get('/rag_model_response')  # streams text, so no response_class=JSONResponse
async def main(question: str):
    return StreamingResponse(rag_model_response(question), media_type='text/event-stream')
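For what it's worth, since the endpoint already declares media_type='text/event-stream', one idea I had is to frame the output as named Server-Sent Events so a client can tell answer tokens apart from a final sources payload. This is only a rough sketch with made-up data; `to_sse` is a hypothetical helper, not part of my code:

```python
import json

def to_sse(chunks, sources):
    """Wrap raw text chunks and a final sources payload into named
    Server-Sent Events so a client can tell them apart."""
    for chunk in chunks:
        # each answer fragment goes out as a 'token' event
        yield f"event: token\ndata: {json.dumps(chunk)}\n\n"
    # one terminal event carrying the source documents as JSON
    yield f"event: sources\ndata: {json.dumps(sources)}\n\n"

# hypothetical data standing in for the real LLM stream and retriever output
frames = list(to_sse(["Hallo", " Welt"], [{"source": "a.pdf", "page": 3}]))
```

A browser `EventSource` (or any SSE client) could then render `token` events live and handle the single `sources` event separately once it arrives.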
So my questions would be:
- How do I need to change my chain so that it also returns the documents retrieved by the retriever?
- How can I also return these source_documents in my FastAPI endpoint response? It would be perfect if the generated answer gets streamed and the source documents are returned afterwards. They could be streamed as well, but somehow it should be possible to show the user only the streaming of the generated answer. Once the streaming is finished, I want to show the user the documents which were used to generate the answer.
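One shape I imagined for this (the sentinel string is arbitrary, everything here is illustrative): stream the plain answer text first, then a marker, then the sources as a single JSON blob that the frontend parses after the stream ends. A minimal self-contained sketch:

```python
import json

SENTINEL = "\n\n__SOURCES__\n"  # marker assumed never to occur in a normal answer

def stream_with_sources(tokens, source_documents):
    """Yield answer tokens for live display, then the sentinel and
    one JSON payload that the frontend can parse separately."""
    for tok in tokens:
        yield tok
    yield SENTINEL
    yield json.dumps(source_documents)

def split_client_side(full_text):
    """What a consumer does after the stream ends: cut at the sentinel."""
    answer, _, raw = full_text.partition(SENTINEL)
    return answer, json.loads(raw)

# made-up tokens and metadata standing in for the LLM stream and retriever
body = "".join(stream_with_sources(["Die ", "Antwort."], [{"page": 5}]))
answer, sources = split_client_side(body)
```

The client would display everything before the sentinel as it streams in, and only after the connection closes parse and show the sources.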
One alternative solution, which I don't think is very efficient, was to just create a new endpoint that returns the source documents:

@app.get('/source_documents')
async def source_documents(question: str):
    source_docs = retriever.get_relevant_documents(question)
    return source_docs

But with this, the vector search runs twice for every question: once for the chain and once for the extra retriever call.
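If the two-endpoint route were kept, one way to avoid the double search might be to memoize retrieval per question so the chain and the /source_documents endpoint share one lookup. A minimal sketch; the counter and placeholder documents are stand-ins, not the real FAISS retriever:

```python
from functools import lru_cache

# Counter so the sketch can show that the search only runs once; in the
# real app the function body would call retriever.get_relevant_documents.
SEARCHES = {"count": 0}

@lru_cache(maxsize=128)
def cached_retrieve(question: str):
    SEARCHES["count"] += 1           # pretend this is the expensive vector search
    return [f"doc-for-{question}"]   # placeholder for the retrieved documents

docs_a = cached_retrieve("Frage 1")  # performs the search
docs_b = cached_retrieve("Frage 1")  # cache hit: no second search
```

In production something with a TTL or explicit eviction would probably be safer than an unbounded-lifetime lru_cache, but the idea is the same.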
Thanks in advance!