Company
Date Published
Aug. 18, 2023
Author
Eric Goebelbecker
Word count
2364
Language
English
Hacker News points
None

Summary

In this tutorial, we will create an intelligent chatbot using Milvus and Towhee. We will use the following components to build our chatbot: 1. Milvus: An open-source vector database for efficient similarity search and AI applications. 2. Towhee: A Python library that provides a set of pre-built machine learning models and tools for processing unstructured data. 3. OpenAI API: A service that allows developers to access powerful language generation models like GPT-3.5. 4. Gradio: An open-source Python library for creating interactive demos of machine learning models. First, we need to install the required packages: ```bash pip install milvus pymilvus towhee gradio ``` Next, let's define some variables and answer the prompt for the API key. Run this code to do so: ```python import os import getpass MILVUS_URI = 'http://localhost:19530' [MILVUS_HOST, MILVUS_PORT] = MILVUS_URI.split('://')[1].split(':') DROP_EXIST = True EMBED_MODEL = 'all-mpnet-base-v2' COLLECTION_NAME = 'chatbot_demo' DIM = 768 OPENAI_API_KEY = getpass.getpass('Enter your OpenAI API key: ') if os.path.exists('./sqlite.db'): os.remove('./sqlite.db') ``` Sample pipeline Now, let's download some data and store it in Milvus. But before you do that, let's look at a sample pipeline for downloading and processing unstructured data. You'll use the Towhee documentation pages for this example. You can try different sites to see how the code processes different data sets. This code uses Towhee pipelines: - input - begins a new pipeline with the source passed into it - map - uses ops.text_loader() to retrieve the URL and map it to 'doc' - flat_map - uses ops.text_splitter() to process the document into "chunks" for storage - output - closes and prepares the pipeline for use Pass this pipeline to DataCollection to see how it works: ```python from towhee import pipe, ops, DataCollection pipe_load = ( pipe.input('source') .map('source', 'doc', ops.text_loader()) .flat_map('doc', 'doc_chunks', ops.text_splitter(chunk_size=300)) .output('source', 'doc_chunks') ) DataCollection(pipe_load('https://towhee.io')).show() ``` Here's the output from show(): The pipeline created five chunks from the document. Sample embedding pipeline The pipeline retrieved the data and created chunks. You need to create embeddings, too. Let's take a look at another sample pipeline: This one uses map() to run ops.sentence_embedding.sbert() on each chunk. In this example, we're passing in a single block of text. ```python from towhee import pipe, ops, DataCollection pipe_embed = ( pipe.input('doc_chunk') .map('doc_chunk', 'vec', ops.sentence_embedding.sbert(model_name=EMBED_MODEL)) .map('vec', 'vec', ops.np_normalize()) .output('doc_chunk', 'vec') ) text = '''SOTA Models We provide 700+ pre-trained embedding models spanning 5 fields (CV, NLP, Multimodal, Audio, Medical), 15 tasks, and 140+ model architectures. These include BERT, CLIP, ViT, SwinTransformer, data2vec, etc. ''' DataCollection(pipe_embed(text)).show() ``` Run this code to see how the pipeline processes the single text block: Prepare Milvus Now, you need a collection to hold the data. This function defines create_collection(), which uses MILVUS_HOST and MILVUS_PORT to connect to Milvus, drop any existing collections with the specified name, and create a new one with this schema: - id - an integer identifier - embedding - a vector of floats for the embeddings - text - the corresponding text for the embeddings ```python from pymilvus import ( connections, utility, Collection, CollectionSchema, FieldSchema, DataType ) def create_collection(collection_name): connections.connect(host=MILVUS_HOST, port=MILVUS_PORT) has_collection = utility.has_collection(collection_name) if has_collection: collection = Collection(collection_name) if DROP_EXIST: collection.drop() else: return collection # Create collection fields = [ FieldSchema(name='id', dtype=DataType.INT64, is_primary=True, auto_id=True), FieldSchema(name='embedding', dtype=DataType.FLOAT_VECTOR, dim=DIM), FieldSchema(name='text', dtype=DataType.VARCHAR, max_length=500) ] schema = CollectionSchema( fields=fields, description="Towhee demo", enable_dynamic_field=True ) collection = Collection(name=collection_name, schema=schema) index_params = { 'metric_type': 'IP', 'index_type': 'IVF_FLAT', 'params': {'nlist': 1024} } collection.create_index( field_name='embedding', index_params=index_params ) return collection ``` Insert pipeline It's time to process your input text and insert it into Milvus. Let's start with a pipeline that collapses what you learned above: This function: - Creates the new collection - Retrieves the data - Splits it into chunks - Creates embeddings using EMBED_MODEL - Insert the text and embeddings into Milvus ```python from towhee import pipe, ops, DataCollection load_data = ( pipe.input('collection_name', 'source') .map('collection_name', 'collection', create_collection) .map('source', 'doc', ops.text_loader()) .flat_map('doc', 'doc_chunk', ops.text_splitter(chunk_size=300)) .map('doc_chunk', 'vec', ops.sentence_embedding.sbert(model_name=EMBED_MODEL)) .map('vec', 'vec', ops.np_normalize()) .map(('collection_name', 'vec', 'doc_chunk'), 'mr', ops.ann_insert.osschat_milvus(host=MILVUS_HOST, port=MILVUS_PORT)) .output('mr') ) ``` Here it is in action: ```python project_name = 'towhee_demo' data_source = 'https://en.wikipedia.org/wiki/Frodo_Baggins' mr = load_data(COLLECTION_NAME, data_source) print('Doc chunks inserted:', len(mr.to_list())) ``` The model does a good job of pulling three closely matched nodes: Search knowledge base Now, with the embeddings and text stored in Milvus, you can search it: This function creates a query pipeline. The most important step is this one: ops.ann_search.osschat_milvus(host=MILVUS_HOST, port=MILVUS_PORT, **{'metric_type': 'IP', 'limit': 3, 'output_fields': ['text']})) The osschat_milvus searches the embeddings for matches to the submitted text. Here is the whole pipeline: ```python from towhee import pipe, ops, DataCollection pipe_search = ( pipe.input('collection_name', 'query') .map('query', 'query_vec', ops.sentence_embedding.sbert(model_name=EMBED_MODEL)) .map('query_vec', 'query_vec', ops.np_normalize()) .map(('collection_name', 'query_vec'), 'search_res', ops.ann_search.osschat_milvus(host=MILVUS_HOST, port=MILVUS_PORT, **{'metric_type': 'IP', 'limit': 3, 'output_fields': ['text']})) .flat_map('search_res', ('id', 'score', 'text'), lambda x: (x[0], x[1], x[2])) .output('query', 'text', 'score') ) ``` Try it: ```python query = 'Who is Frodo Baggins?' DataCollection(pipe_search(project_name, query)).show() ``` The model does a good job of pulling three closely matched nodes: Add an LLM Now, it’s time to add a large language model (LLM) so users can hold a conversation with the chatbot. We’ll use ChatGPT and the OpenAI API for this example. Chat history In order to get better results from the LLM, you need to store chat history and present it with queries. You’ll use SQLite for this step: Here's a function for retrieving the history: ```python from towhee import pipe, ops, DataCollection pipe_get_history = ( pipe.input('collection_name', 'session') .map(('collection_name', 'session'), 'history', ops.chat_message_histories.sql(method='get')) .output('collection_name', 'session', 'history') ) ``` Here's the one to store it: ```python from towhee import pipe, ops, DataCollection pipe_add_history = ( pipe.input('collection_name', 'session', 'question', 'answer') .map(('collection_name', 'session', 'question', 'answer'), 'history', ops.chat_message_histories.sql(method='add')) .output('history') ) ``` LLM query pipeline Now, we need a pipeline to submit queries to ChatGPT: This pipeline: - searches Milvus using the user's query - collects the current chat history - submits the query, Milvus search, and chat history to ChatGPT - Appends the ChatGPT result to the chat history - Returns the result to the caller ```python from towhee import pipe, ops, DataCollection chat = ( pipe.input('collection_name', 'query', 'session') .map('query', 'query_vec', ops.sentence_embedding.sbert(model_name=EMBED_MODEL)) .map('query_vec', 'query_vec', ops.np_normalize()) .map(('collection_name', 'query_vec'), 'search_res', ops.ann_search.osschat_milvus(host=MILVUS_HOST, port=MILVUS_PORT, **{'metric_type': 'IP', 'limit': 3, 'output_fields': ['text']})) .map('search_res', 'knowledge', lambda y: [x[2] for x in y]) .map(('collection_name', 'session'), 'history', ops.chat_message_histories.sql(method='get')) .map(('query', 'knowledge', 'history'), 'messages', ops.prompt.question_answer()) .map('messages', 'answer', ops.LLM.OpenAI(api_key=OPENAI_API_KEY, model_name='gpt-3.5-turbo', temperature=0.8)) .map(('collection_name', 'session', 'query', 'answer'), 'new_history', ops.chat_message_histories.sql(method='add')) .output('query', 'history', 'answer') ) ``` Let's test this pipeline before connecting it to a GUI: ```python new_query = 'Where did Frodo take the ring?' DataCollection(chat(COLLECTION_NAME, new_query, session_id)).show() ``` The pipeline works. Let's put together a Gradio interface. Gradio GUI First, you need functions to create a session identifier and to respond to queries from the interface: These functions create a session ID using a UUID, and accept a session and query for the query pipeline: ```python import uuid import io def create_session_id(): uid = str(uuid.uuid4()) suid = ''.join(uid.split('-')) return 'sess_' + suid def respond(session, query): res = chat(COLLECTION_NAME, query, session).get_dict() answer = res['answer'] response = res['history'] response.append((query, answer)) return response ``` Next, the Gradio interface uses these functions to build a chatbot: It uses the Blocks API to create a ChatBot interface. The Send Message button uses the respond function to send requests to ChatGPT: ```python import gradio as gr with gr.Blocks() as demo: session_id = gr.State(create_session_id) with gr.Row(): with gr.Column(scale=2): gr.Markdown('''## Chat''') conversation = gr.Chatbot(label='conversation').style(height=300) question = gr.Textbox(label='question', value=None) send_btn = gr.Button('Send Message') send_btn.click( fn=respond, inputs=[ session_id, question ], outputs=conversation, ) demo.launch(server_name='127.0.0.1', server_port=8902) ``` Here it is: Now, you have an intelligent chatbot! Summary In this post, we created Towhee pipelines to ingest unstructured data, process it for embeddings, and store those embeddings in Milvus. Then, we created a query pipeline for the chat function and connected the chatbot with an LLM. Finally, we got an intelligent chatbot. This tutorial demonstrates how easy it is to build applications with Milvus. Milvus brings numerous advantages when integrated into applications, especially those relying on machine learning and artificial intelligence. It offers highly efficient, scalable, and reliable vector similarity search and analytics capabilities critical in applications like chatbots, recommendation systems, and image or text recognition.