Improving LLMs CAG with accumulative knowledge using RabbitMQ, Ollama and Gemma3

Improving LLMs CAG with accumulative knowledge using RabbitMQ, Ollama and Gemma3

Why RabbitMQ ?

Thinking about an LLM that will receive a lot of messages from everywhere means that you need to keep track of its messages and responses so the context could be understood on the whole conversation. Remember that it can't be just a stream every time; sometimes users would appear after days or months, and the sense of knowing the person should be stored.

Now, storing is just one part of the solution for retaining context. We all have heard about RAG and CAG, but what happens if I need to store the information in real-time? You know, users keep sending messages. So the saving and retrieving messages would be blocking the AI's response time. Therefore, I figured why not use a queue system that notifies a consumer about new messages, stores them in a DB, so the main LLM manager just retrieves the history as it's been tuned. Of course, that means that after the LLM finishes the response, the consumer should be notified.

How did I do it?

Well, first you need some stuff to begin with:

  • Install Docker, if you already have it, nice!
  • Pull the RabbitMQ image from Docker.
  • Start running RabbitMQ; follow the commands below.
  • Ensure RabbitMQ is working by going to your browser at http://localhost:15672/.
    docker pull rabbitmq:management
    docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management
        

Now we need somewhere to publish the message on the queue

Okay, yes, in fact there should be two places that this should be called:

The first one is when the main LLM manager receives the message, and the second one should be when the LLM answers it.

So basically, we are going to keep track of the "user" questions and the "assistant" answers, and the Ollama protocol for the chat function works.

Sending message to queue


    async function sendMessageToQueue(message) {
    try {
        const connection = await amqplib.connect(UPDATE_MESSAGES_QUEUE_URL);
        const channel = await connection.createChannel();

        await channel.assertQueue(UPDATE_MESSAGES_QUEUE_NAME, { durable: false });
        channel.sendToQueue(UPDATE_MESSAGES_QUEUE_NAME, Buffer.from(JSON.stringify(message)));

        console.log('✅ Message sent:', message);
        await channel.close();
        await connection.close();
    } catch (err) {
        console.error('❌ Error sending message:', err);
    }
}
        

Calling it on the LLM manager program


    //first
    await sendMessageToQueue({
    owner: owner,
    conversationId,
    role: 'user',
    message: messageQuestion
});

// calling the LLM
const response = await ollama.chat({
    model: 'gemma3',
    options:{
        num_predict: 300,
        temperature: 0.7,
    },
    messages: [
        {
            role: 'system',
            content: `${templatePrompt}`
        },
        ...messages, // we will see how we get the messages later
        {
            role: 'user',
            content: `
            ${messageQuestion}
            `
        },

],
    requestId: nanoid() // Unique ID for tracking
});

//second
await sendMessageToQueue({
    owner: owner,
    conversationId,
    role: 'assistant',
    message: response.message.content
});
        

How to create a listener ?

So we already have someone who is sending messages , now what can we do on the lsitener?
well as I said at the begining it should save on the store , in this case a sql db table for the messages

in the following code you will see that it stores the role the message and the conversation id , so we can use them to retrieve the content on the next steps


      import amqplib from 'amqplib';
import sqlite3 from 'sqlite3';
import path from 'path';
import { fileURLToPath } from 'url';

// Resolves __dirname in ES Modules
const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);

const queueName = 'myQueue';

try {
  const connection = await amqplib.connect('amqp://localhost');
  const channel = await connection.createChannel();
  await channel.assertQueue(queueName, { durable: false });

  console.log(`👂 Listening for messages on "${queueName}"...`);

  channel.consume(queueName, (msg) => {
    console.log('📥 Received message:', msg.content.toString());
    if (!msg) return;

    const data = JSON.parse(msg.content.toString());
    const { owner, conversationId, role, message } = data;
    const timestamp = new Date().toISOString();

    // Adjust path calculation
    const dbPath = path.join(__dirname, '..', '..', 'users', owner, 'messages.sqlite');
    console.log('📂 Database path:', dbPath);
    const db = new sqlite3.Database(dbPath);

    db.run(
      `CREATE TABLE IF NOT EXISTS messages (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        timestamp TEXT,
        conversationId TEXT,
        role TEXT,
        message TEXT
      )`
    );

    db.run(
      `INSERT INTO messages (timestamp, conversationId, role, message) VALUES (?, ?, ?, ?)`,
      [timestamp, conversationId, role, message],
      (err) => {
        if (err) {
          console.error('❌ Failed to insert message:', err.message);
        } else {
          console.log(`✅ Message saved for owner "${owner}"`);
          channel.ack(msg);
        }
        db.close();
      }
    );
  });
} catch (err) {
  console.error('❌ Error in consumer:', err);
}
      

Comments

Popular posts from this blog

Analysis of dark patterns in League of Legends and Star Wars:Battlefront II with the usage of Loot Boxes

Kerstin's Fate developer diary

Exploring LLMs with Ollama and Llama3