In this tutorial, we will create an intelligent chatbot using Milvus and Towhee. We will use the following components to build our chatbot:
1. Milvus: An open-source vector database for efficient similarity search and AI applications.
2. Towhee: A Python library that provides a set of pre-built machine learning models and tools for processing unstructured data.
3. OpenAI API: A service that allows developers to access powerful language generation models like GPT-3.5.
4. Gradio: An open-source Python library for creating interactive demos of machine learning models.
First, we need to install the required packages:
```bash
pip install milvus pymilvus towhee gradio
```
Next, let's define some variables and answer the prompt for the API key. Run this code to do so:
```python
import os
import getpass
MILVUS_URI = 'http://localhost:19530'
[MILVUS_HOST, MILVUS_PORT] = MILVUS_URI.split('://')[1].split(':')
DROP_EXIST = True
EMBED_MODEL = 'all-mpnet-base-v2'
COLLECTION_NAME = 'chatbot_demo'
DIM = 768
OPENAI_API_KEY = getpass.getpass('Enter your OpenAI API key: ')
# Delete the SQLite file left behind by a previous run so the demo starts clean
if os.path.exists('./sqlite.db'):
    os.remove('./sqlite.db')
```
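The split-based parsing above assumes the URI always carries both a scheme and a port. As a hedged alternative, here is a minimal sketch that uses the standard library's urllib.parse instead. The helper name parse_milvus_uri and the fallback to port 19530 (Milvus's default) are assumptions made for illustration, not part of the original setup.

```python
from urllib.parse import urlparse


def parse_milvus_uri(uri, default_port=19530):
    """Return (host, port) as strings, like the split-based version."""
    parsed = urlparse(uri)
    if not parsed.hostname:
        # For example 'localhost:19530' without a scheme ends up here
        raise ValueError(f'Cannot find a host in URI: {uri!r}')
    port = parsed.port if parsed.port is not None else default_port
    return parsed.hostname, str(port)


MILVUS_HOST, MILVUS_PORT = parse_milvus_uri('http://localhost:19530')
print(MILVUS_HOST, MILVUS_PORT)
```

This version fails loudly on a malformed URI rather than raising an unrelated IndexError or unpacking error.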
Sample pipeline
Now, let's download some data and store it in Milvus. But before you do that, let's look at a sample pipeline for downloading and processing unstructured data.
You'll use the Towhee documentation pages for this example. You can try different sites to see how the code processes different data sets.
This code uses Towhee pipelines:
- input - begins a new pipeline with the source passed into it
- map - uses ops.text_loader() to retrieve the URL and map it to 'doc'
- flat_map - uses ops.text_splitter() to process the document into "chunks" for storage
- output - closes and prepares the pipeline for use
Pass this pipeline to DataCollection to see how it works:
```python
from towhee import pipe, ops, DataCollection
pipe_load = (
    pipe.input('source')
        .map('source', 'doc', ops.text_loader())
        .flat_map('doc', 'doc_chunks', ops.text_splitter(chunk_size=300))
        .output('source', 'doc_chunks')
)
DataCollection(pipe_load('https://towhee.io')).show()
```
The output from show() lists each chunk alongside its source. The pipeline created five chunks from the document.
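To make the difference between map and flat_map more concrete, here is a plain-Python sketch of the chunking step. It is not Towhee's implementation: split_into_chunks is a simple greedy word packer and flat_map is a toy stand-in, both written only for illustration.

```python
def split_into_chunks(text, chunk_size=300):
    """Greedily pack whitespace-separated words into chunks of at most
    chunk_size characters (a single longer word becomes its own chunk)."""
    chunks, current = [], ''
    for word in text.split():
        candidate = f'{current} {word}' if current else word
        if len(candidate) <= chunk_size or not current:
            current = candidate
        else:
            chunks.append(current)
            current = word
    if current:
        chunks.append(current)
    return chunks


def flat_map(rows, func):
    """Apply func to each row and emit one output row per returned item."""
    return [(row, item) for row in rows for item in func(row)]


# Made-up sample text, repeated so that it needs several chunks
doc = 'Towhee is a framework for building data pipelines. ' * 20
rows = flat_map([doc], split_into_chunks)
for _source, chunk in rows:
    print(len(chunk), chunk[:40])
```

A map step would return exactly one output per input row. A flat_map step can return several, which is why one document turns into multiple chunk rows.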
Sample embedding pipeline
The pipeline retrieved the data and created chunks. You need to create embeddings, too. Let's take a look at another sample pipeline:
This one uses map() to run ops.sentence_embedding.sbert() on each chunk. In this example, we're passing in a single block of text.
```python
from towhee import pipe, ops, DataCollection
pipe_embed = (
    pipe.input('doc_chunk')
        .map('doc_chunk', 'vec', ops.sentence_embedding.sbert(model_name=EMBED_MODEL))
        .map('vec', 'vec', ops.np_normalize())
        .output('doc_chunk', 'vec')
)
text = '''SOTA Models
We provide 700+ pre-trained embedding models spanning 5 fields (CV, NLP, Multimodal, Audio, Medical), 15 tasks, and 140+ model architectures.
These include BERT, CLIP, ViT, SwinTransformer, data2vec, etc.
'''
DataCollection(pipe_embed(text)).show()
```
Running this code shows how the pipeline processes the single text block: it returns the text alongside its normalized embedding vector.
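The embedding pipeline normalizes every vector with ops.np_normalize(), and the searches later in this tutorial use the inner product (IP) metric. The two fit together because the inner product of unit-length vectors equals their cosine similarity. The sketch below checks that with two made-up vectors in plain Python. The helper names are illustrative only.

```python
import math


def inner_product(a, b):
    return sum(x * y for x, y in zip(a, b))


def normalize(vec):
    """Scale vec to unit length (assumes vec is not all zeros)."""
    norm = math.sqrt(inner_product(vec, vec))
    return [x / norm for x in vec]


def cosine_similarity(a, b):
    return inner_product(a, b) / (
        math.sqrt(inner_product(a, a)) * math.sqrt(inner_product(b, b))
    )


a = [3.0, 4.0, 0.0]
b = [1.0, 2.0, 2.0]
ip_of_unit_vectors = inner_product(normalize(a), normalize(b))
# Both values are approximately 0.733 (exactly 11/15)
print(ip_of_unit_vectors, cosine_similarity(a, b))
```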
Prepare Milvus
Now, you need a collection to hold the data. The code below defines create_collection(), which uses MILVUS_HOST and MILVUS_PORT to connect to Milvus. If a collection with the specified name already exists, the function drops it when DROP_EXIST is True and returns it unchanged otherwise. It then creates a new collection with this schema:
- id - an integer identifier
- embedding - a vector of floats for the embeddings
- text - the corresponding text for the embeddings
```python
from pymilvus import (
connections, utility, Collection,
CollectionSchema, FieldSchema, DataType
)
def create_collection(collection_name):
    connections.connect(host=MILVUS_HOST, port=MILVUS_PORT)

    # Reuse or drop an existing collection, depending on DROP_EXIST
    has_collection = utility.has_collection(collection_name)
    if has_collection:
        collection = Collection(collection_name)
        if DROP_EXIST:
            collection.drop()
        else:
            return collection

    # Create collection
    fields = [
        FieldSchema(name='id', dtype=DataType.INT64, is_primary=True, auto_id=True),
        FieldSchema(name='embedding', dtype=DataType.FLOAT_VECTOR, dim=DIM),
        FieldSchema(name='text', dtype=DataType.VARCHAR, max_length=500)
    ]
    schema = CollectionSchema(
        fields=fields,
        description="Towhee demo",
        enable_dynamic_field=True
    )
    collection = Collection(name=collection_name, schema=schema)

    # Index the embeddings for inner-product search
    index_params = {
        'metric_type': 'IP',
        'index_type': 'IVF_FLAT',
        'params': {'nlist': 1024}
    }
    collection.create_index(
        field_name='embedding',
        index_params=index_params
    )
    return collection
```
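The schema fixes two limits that every inserted row must respect: the embedding needs exactly DIM values, and the text must fit the VARCHAR field's max_length of 500. As a rough sketch, a check like the one below could catch bad rows before they reach Milvus. The helper validate_row is hypothetical, and it assumes the VARCHAR limit is counted in UTF-8 bytes rather than characters.

```python
DIM = 768             # matches the embedding dimension configured earlier
MAX_TEXT_BYTES = 500  # mirrors max_length=500 in the schema


def validate_row(embedding, text, dim=DIM, max_bytes=MAX_TEXT_BYTES):
    """Return a list of problems; an empty list means the row fits the schema."""
    problems = []
    if len(embedding) != dim:
        problems.append(f'embedding has {len(embedding)} values, expected {dim}')
    # Assumption: the limit applies to the UTF-8 encoded length
    text_bytes = len(text.encode('utf-8'))
    if text_bytes > max_bytes:
        problems.append(f'text is {text_bytes} bytes, limit is {max_bytes}')
    return problems


print(validate_row([0.0] * 768, 'a short chunk'))
print(validate_row([0.0] * 3, 'x' * 501))
```

With chunk_size=300 the text limit leaves some headroom, but multi-byte characters can use it up faster than the character count suggests.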
Insert pipeline
It's time to process your input text and insert it into Milvus. Let's start with a pipeline that collapses what you learned above:
This pipeline:
- Creates the new collection
- Retrieves the data
- Splits it into chunks
- Creates embeddings using EMBED_MODEL
- Inserts the text and embeddings into Milvus
```python
from towhee import pipe, ops, DataCollection
load_data = (
    pipe.input('collection_name', 'source')
        .map('collection_name', 'collection', create_collection)
        .map('source', 'doc', ops.text_loader())
        .flat_map('doc', 'doc_chunk', ops.text_splitter(chunk_size=300))
        .map('doc_chunk', 'vec', ops.sentence_embedding.sbert(model_name=EMBED_MODEL))
        .map('vec', 'vec', ops.np_normalize())
        .map(('collection_name', 'vec', 'doc_chunk'), 'mr',
             ops.ann_insert.osschat_milvus(host=MILVUS_HOST, port=MILVUS_PORT))
        .output('mr')
)
```
Here it is in action:
```python
project_name = 'towhee_demo'
data_source = 'https://en.wikipedia.org/wiki/Frodo_Baggins'
mr = load_data(COLLECTION_NAME, data_source)
print('Doc chunks inserted:', len(mr.to_list()))
```
Search knowledge base
Now, with the embeddings and text stored in Milvus, you can search it:
This function creates a query pipeline. The most important step is this one:
ops.ann_search.osschat_milvus(host=MILVUS_HOST, port=MILVUS_PORT, **{'metric_type': 'IP', 'limit': 3, 'output_fields': ['text']})
The osschat_milvus operator searches the stored embeddings for the three closest matches to the submitted text, ranked by inner product (IP), and returns the text of each match.
Here is the whole pipeline:
```python
from towhee import pipe, ops, DataCollection
pipe_search = (
    pipe.input('collection_name', 'query')
        .map('query', 'query_vec', ops.sentence_embedding.sbert(model_name=EMBED_MODEL))
        .map('query_vec', 'query_vec', ops.np_normalize())
        .map(('collection_name', 'query_vec'), 'search_res',
             ops.ann_search.osschat_milvus(host=MILVUS_HOST, port=MILVUS_PORT,
                                           **{'metric_type': 'IP', 'limit': 3, 'output_fields': ['text']}))
        .flat_map('search_res', ('id', 'score', 'text'), lambda x: (x[0], x[1], x[2]))
        .output('query', 'text', 'score')
)
```
Try it:
```python
query = 'Who is Frodo Baggins?'
# Search the same collection that load_data() populated
DataCollection(pipe_search(COLLECTION_NAME, query)).show()
```
The search does a good job of pulling the three most closely matched chunks.
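Conceptually, the search step scores every stored vector by its inner product with the query vector and keeps the top three. The brute-force sketch below shows that idea in plain Python with made-up two-dimensional vectors. It is exact rather than approximate: an IVF_FLAT index gets its speed by scanning only some clusters of vectors. The function name and sample rows are illustrative assumptions.

```python
import heapq


def top_k_by_inner_product(query_vec, rows, limit=3):
    """rows is a list of (id, vector, text) tuples.
    Returns [(id, score, text), ...] with the best score first."""
    scored = (
        (row_id, sum(q * v for q, v in zip(query_vec, vec)), text)
        for row_id, vec, text in rows
    )
    return heapq.nlargest(limit, scored, key=lambda hit: hit[1])


# Toy unit-length vectors standing in for real 768-dimensional embeddings
rows = [
    (1, [1.0, 0.0], 'chunk A'),
    (2, [0.0, 1.0], 'chunk B'),
    (3, [0.8, 0.6], 'chunk C'),
    (4, [0.6, 0.8], 'chunk D'),
]
hits = top_k_by_inner_product([1.0, 0.0], rows, limit=3)
for row_id, score, text in hits:
    print(row_id, score, text)
```

Each hit uses the same (id, score, text) layout that the flat_map step in the search pipeline unpacks.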
Add an LLM
Now, it’s time to add a large language model (LLM) so users can hold a conversation with the chatbot. We’ll use ChatGPT and the OpenAI API for this example.
Chat history
To get better results from the LLM, you need to store the chat history and present it with each query. You’ll use SQLite for this step.
Here's a pipeline for retrieving the history:
```python
from towhee import pipe, ops, DataCollection
pipe_get_history = (
    pipe.input('collection_name', 'session')
        .map(('collection_name', 'session'), 'history', ops.chat_message_histories.sql(method='get'))
        .output('collection_name', 'session', 'history')
)
```
Here's the one to store it:
```python
from towhee import pipe, ops, DataCollection
pipe_add_history = (
    pipe.input('collection_name', 'session', 'question', 'answer')
        .map(('collection_name', 'session', 'question', 'answer'), 'history', ops.chat_message_histories.sql(method='add'))
        .output('history')
)
```
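To give a feel for what storing chat history per session involves, here is a minimal sketch using Python's built-in sqlite3 module. The table layout and the helper names (open_history_db, add_history, get_history) are assumptions made for illustration. The Towhee operator manages its own storage and may organize it differently.

```python
import sqlite3


def open_history_db(path=':memory:'):
    """Open a database and make sure the (hypothetical) history table exists."""
    conn = sqlite3.connect(path)
    conn.execute(
        'CREATE TABLE IF NOT EXISTS chat_history ('
        'id INTEGER PRIMARY KEY AUTOINCREMENT, '
        'collection_name TEXT NOT NULL, '
        'session TEXT NOT NULL, '
        'question TEXT NOT NULL, '
        'answer TEXT NOT NULL)'
    )
    return conn


def add_history(conn, collection_name, session, question, answer):
    with conn:  # commits on success, rolls back on error
        conn.execute(
            'INSERT INTO chat_history (collection_name, session, question, answer) '
            'VALUES (?, ?, ?, ?)',
            (collection_name, session, question, answer),
        )


def get_history(conn, collection_name, session):
    """Return [(question, answer), ...] in the order they were added."""
    cur = conn.execute(
        'SELECT question, answer FROM chat_history '
        'WHERE collection_name = ? AND session = ? ORDER BY id',
        (collection_name, session),
    )
    return cur.fetchall()


conn = open_history_db()
add_history(conn, 'chatbot_demo', 'sess_a', 'first question', 'first answer')
add_history(conn, 'chatbot_demo', 'sess_a', 'second question', 'second answer')
print(get_history(conn, 'chatbot_demo', 'sess_a'))
```

Keying rows by both collection name and session keeps conversations from different users, or different knowledge bases, from leaking into each other.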
LLM query pipeline
Now, we need a pipeline to submit queries to ChatGPT:
This pipeline:
- Searches Milvus using the user's query
- Collects the current chat history
- Submits the query, the Milvus search results, and the chat history to ChatGPT
- Appends the ChatGPT result to the chat history
- Returns the result to the caller
```python
from towhee import pipe, ops, DataCollection
chat = (
    pipe.input('collection_name', 'query', 'session')
        .map('query', 'query_vec', ops.sentence_embedding.sbert(model_name=EMBED_MODEL))
        .map('query_vec', 'query_vec', ops.np_normalize())
        .map(('collection_name', 'query_vec'), 'search_res',
             ops.ann_search.osschat_milvus(host=MILVUS_HOST,
                                           port=MILVUS_PORT,
                                           **{'metric_type': 'IP', 'limit': 3, 'output_fields': ['text']}))
        .map('search_res', 'knowledge', lambda y: [x[2] for x in y])
        .map(('collection_name', 'session'), 'history', ops.chat_message_histories.sql(method='get'))
        .map(('query', 'knowledge', 'history'), 'messages', ops.prompt.question_answer())
        .map('messages', 'answer', ops.LLM.OpenAI(api_key=OPENAI_API_KEY,
                                                  model_name='gpt-3.5-turbo',
                                                  temperature=0.8))
        .map(('collection_name', 'session', 'query', 'answer'), 'new_history', ops.chat_message_histories.sql(method='add'))
        .output('query', 'history', 'answer')
)
```
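The prompt-building step combines three inputs: the retrieved knowledge, the chat history, and the new query. The sketch below shows one plausible way to assemble them into the role/content message list that OpenAI's chat models accept. The function build_messages and the wording of the system prompt are made up for illustration. The template that ops.prompt.question_answer() actually produces may differ.

```python
def build_messages(question, knowledge, history, system_prompt=None):
    """Assemble chat messages from retrieved text chunks, prior
    (question, answer) pairs, and the new question."""
    # Hypothetical instruction text, not Towhee's actual template
    system = system_prompt or (
        'Answer the question using the context below. '
        'If the context does not contain the answer, say so.'
    )
    context = '\n\n'.join(knowledge)
    messages = [{'role': 'system', 'content': f'{system}\n\nContext:\n{context}'}]
    for past_question, past_answer in history:
        messages.append({'role': 'user', 'content': past_question})
        messages.append({'role': 'assistant', 'content': past_answer})
    messages.append({'role': 'user', 'content': question})
    return messages


messages = build_messages(
    question='second question',
    knowledge=['retrieved chunk one', 'retrieved chunk two'],
    history=[('first question', 'first answer')],
)
for message in messages:
    print(message['role'], '|', message['content'][:50])
```

Placing the history between the system message and the new question lets the model resolve follow-up questions that refer back to earlier turns.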
Let's test this pipeline before connecting it to a GUI:
```python
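# Hypothetical session ID for this test; the Gradio app generates one per user
session_id = 'sess_test'
new_query = 'Where did Frodo take the ring?'
DataCollection(chat(COLLECTION_NAME, new_query, session_id)).show()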
```
The pipeline works. Let's put together a Gradio interface.
Gradio GUI
First, you need functions to create a session identifier and to respond to queries from the interface:
These functions create a session ID using a UUID, and accept a session and query for the query pipeline:
```python
import uuid

def create_session_id():
    # Build an ID of the form 'sess_' plus 32 hex characters from a random UUID
    uid = str(uuid.uuid4())
    suid = ''.join(uid.split('-'))
    return 'sess_' + suid

def respond(session, query):
    # Run the chat pipeline, then return the prior history plus the new exchange
    res = chat(COLLECTION_NAME, query, session).get_dict()
    answer = res['answer']
    response = res['history']
    response.append((query, answer))
    return response
```
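As a side note, the standard library can produce the same hyphen-free form directly: the hex attribute of a UUID object is its 32-character hexadecimal string. A shorter variant of the session ID helper, under a different (hypothetical) name so it does not replace the original, could look like this:

```python
import uuid


def create_session_id_hex():
    """Return 'sess_' followed by the 32 hex characters of a random UUID."""
    return 'sess_' + uuid.uuid4().hex


sid = create_session_id_hex()
print(sid, len(sid))
```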
Next, the Gradio interface uses these functions to build a chatbot:
It uses the Blocks API to create a ChatBot interface. The Send Message button uses the respond function to send requests to ChatGPT:
```python
import gradio as gr

with gr.Blocks() as demo:
    # Each browser session gets its own ID, created when the page loads
    session_id = gr.State(create_session_id)
    with gr.Row():
        with gr.Column(scale=2):
            gr.Markdown('''## Chat''')
            # Older Gradio 3.x releases set the height with .style(height=300) instead
            conversation = gr.Chatbot(label='conversation', height=300)
            question = gr.Textbox(label='question', value=None)
            send_btn = gr.Button('Send Message')
            send_btn.click(
                fn=respond,
                inputs=[
                    session_id,
                    question
                ],
                outputs=conversation,
            )

demo.launch(server_name='127.0.0.1', server_port=8902)
```
Here it is:
Now, you have an intelligent chatbot!
Summary
In this post, we created Towhee pipelines to ingest unstructured data, process it for embeddings, and store those embeddings in Milvus. Then, we created a query pipeline for the chat function and connected the chatbot with an LLM. Finally, we wrapped everything in a Gradio interface to get a working, intelligent chatbot.
This tutorial demonstrates how easy it is to build applications with Milvus. Milvus brings numerous advantages when integrated into applications, especially those relying on machine learning and artificial intelligence. It offers highly efficient, scalable, and reliable vector similarity search and analytics capabilities critical in applications like chatbots, recommendation systems, and image or text recognition.