Implementing RPC Calls with RabbitMQ in TypeScript

March 16, 2024

Communication between different parts of an application is crucial. Two common patterns emerge for these interactions: Request/Response and Event-Driven. While Event-Driven architectures focus on asynchronous communication without expecting a response immediately, the Request/Response pattern, especially implemented through Remote Procedure Calls (RPC), emphasizes a direct call and response mechanism between services.

In this blog post, I try to show a real-world example of how that can be used.

RabbitMQ

RabbitMQ is an open-source message broker software that enables applications to communicate with each other and share data by sending messages. It supports multiple messaging protocols, message queuing, delivery acknowledgement, and flexible routing to queues, making it highly reliable and scalable for modern application architectures.

More about RPC

Remote Procedure Calls (RPC) are a protocol that one program can use to request a service from a program located on another computer in a network without needing to understand network details. RPC abstracts the communication, so developers can call functions on remote servers just as they would do on a local system, expecting a response back.

Scenario: Inventory Check

Imagine an e-commerce application where a user adds an item to their cart. Before confirming the purchase, the application needs to verify if the item is in stock. Here's where RPC with RabbitMQ can be incredibly useful:

  1. Client (Cart Service): The cart service prepares an RPC request containing the item ID and sends it to a dedicated RabbitMQ queue.
  2. Server (Inventory Service): A separate inventory service listens on that queue for incoming requests.
  3. Inventory Check: Upon receiving the request, the inventory service checks its database for the item's availability.
  4. Response: The inventory service sends an RPC response back to the cart service, indicating whether the item is in stock or not.
  5. Cart Update: Based on the response, the cart service can update the user interface and potentially offer alternatives if the item is unavailable.

Although, service structure and use case can be imaginary and not the best but this scenario showcases the benefits of RPC:

  • Decoupling: The cart service doesn't need to know the internal workings of the inventory service. They communicate solely through messages.
  • Scalability: The inventory service can be scaled independently to handle high traffic without affecting the cart service.
  • Resilience: If the inventory service is temporarily unavailable, the cart service can handle the fallback gracefully.

Implementation with Typescript

Cart service:

import { connect } from 'amqplib';
import { randomUUID } from 'crypto';
 
// Function to call the inventory service
async function checkInventory(itemId: string): Promise<boolean> {
  // Create a connection to RabbitMQ
  const connection = await connect('amqp://localhost');
  const channel = await connection.createChannel();
 
  // Declare a temporary queue for responses
  const replyQueue = await channel.assertQueue('', { exclusive: true });
 
  // Generate a unique correlation ID
  const correlationId = randomUUID();
 
  // Prepare the RPC request message
  const message = {
    itemId,
    replyTo: replyQueue.queue,
    correlationId,
  };
 
  // Send the request to the RPC queue
  await channel.sendToQueue('inventory_checks', Buffer.from(JSON.stringify(message)), {
    correlationId,
    replyTo: replyQueue.queue,
  });
  
  // Make sure to disconnect the channel
 
  // Consume responses from the temporary queue
  return new Promise<boolean>((resolve) => {
    channel.consume(replyQueue.queue, (msg) => {
      if (msg.properties.correlationId === correlationId) {
        channel.ack(msg);
        resolve(JSON.parse(msg.content.toString()).inStock);
      }
    });
  });
}

Inventory service

import { connect } from 'amqplib';
 
async function listenForInventoryChecks() {
  // Create a connection to RabbitMQ
  const connection = await connect('amqp://localhost');
  const channel = await connection.createChannel();
 
  // Declare the RPC queue
  await channel.assertQueue('inventory_checks');
 
  // Consume messages from the RPC queue
  channel.consume('inventory_checks', async (msg) => {
    const request = JSON.parse(msg.content.toString());
    const itemId = request.itemId;
 
    // Check inventory for the requested item
    const inStock = await checkInventoryDatabase(itemId);
 
    // Prepare the response message
    const response = { inStock };
 
    // Send the response back to the client's temporary queue
    await channel.sendToQueue(msg.properties.replyTo, Buffer.from(JSON.stringify(response)), {
      correlationId: msg.properties.correlationId,
    });
 
    channel.ack(msg);
  });
}