Omid Sayfun
Omid SayfunComputer Geek
Home
Notebook
Journey

Online

Github
Linkedin
Read.cv
Notebook
A try at type-safe groupBy function in TypeScript
April 10, 2025
Adding prettier to eslint
April 10, 2025
Upgrading my blog to Next 15
April 05, 2025
tsx doesn’t support decorators
March 26, 2025
Extending Window: types vs interfaces
March 21, 2025
Loading env file into Node process
February 06, 2025
Validating NestJS env vars with zod
February 06, 2025
Using node API for delay
February 06, 2025
React Component Lifecycle
November 28, 2024
Email special headers
November 20, 2024
How CQRS is different than Event Sourcing
August 18, 2024
RabbitMQ exchange vs queue
August 14, 2024
PgVector similarity search distance functions
August 13, 2024
PgVector indexing options for vector similarity search
July 31, 2024
Counting GPT tokens
June 30, 2024
Logging route handler responses in Next.js 14
June 19, 2024
Redirect www subdomain with Cloudflare
June 17, 2024
Logging requests in Express app
June 16, 2024
Move Docker volume to bind mount
June 12, 2024
Using puppeteer executable for GSTS
June 08, 2024
Next.js Hydration Window Issue
May 29, 2024
Using Git rebase without creating chaos in your repo
May 16, 2024
Why EQ is Your Next Career Upgrade
May 13, 2024
Storing Vector in Postgres with Drizzle ORM
March 21, 2024
Implementing RPC Calls with RabbitMQ in TypeScript
March 16, 2024
Optimize webpage load with special tags
March 15, 2024
What the hell is Open Graph?
March 13, 2024
My go-to Next.js ESlint config
March 10, 2024
List of useful Chrome args
March 10, 2024
Add update date field to Postgres
February 27, 2024
Combining RxJS observables - Part 1
February 20, 2024
Canvas macOS issue
February 20, 2024

Implementing RPC Calls with RabbitMQ in TypeScript

March 16, 2024

Continue Reading

  • 09-03-2024

    Combining RxJS observables - Part 1

  • 14-08-2024

    RabbitMQ exchange vs queue

  • 18-08-2024

    How CQRS is different than Event Sourcing

  • 16-06-2024

    Logging requests in Express app

  • 11-04-2025

    A try at type-safe groupBy function in TypeScript

Communication between different parts of an application is crucial. Two common patterns emerge for these interactions: Request/Response and Event-Driven. While Event-Driven architectures focus on asynchronous communication without expecting a response immediately, the Request/Response pattern, especially implemented through Remote Procedure Calls (RPC), emphasizes a direct call and response mechanism between services.

In this blog post, I try to show a real-world example of how that can be used.

RabbitMQ

RabbitMQ is an open-source message broker software that enables applications to communicate with each other and share data by sending messages. It supports multiple messaging protocols, message queuing, delivery acknowledgement, and flexible routing to queues, making it highly reliable and scalable for modern application architectures.

More about RPC

Remote Procedure Calls (RPC) are a protocol that one program can use to request a service from a program located on another computer in a network without needing to understand network details. RPC abstracts the communication, so developers can call functions on remote servers just as they would do on a local system, expecting a response back.

Scenario: Inventory Check

Imagine an e-commerce application where a user adds an item to their cart. Before confirming the purchase, the application needs to verify if the item is in stock. Here's where RPC with RabbitMQ can be incredibly useful:

  1. Client (Cart Service): The cart service prepares an RPC request containing the item ID and sends it to a dedicated RabbitMQ queue.
  2. Server (Inventory Service): A separate inventory service listens on that queue for incoming requests.
  3. Inventory Check: Upon receiving the request, the inventory service checks its database for the item's availability.
  4. Response: The inventory service sends an RPC response back to the cart service, indicating whether the item is in stock or not.
  5. Cart Update: Based on the response, the cart service can update the user interface and potentially offer alternatives if the item is unavailable.

Although, service structure and use case can be imaginary and not the best but this scenario showcases the benefits of RPC:

  • Decoupling: The cart service doesn't need to know the internal workings of the inventory service. They communicate solely through messages.
  • Scalability: The inventory service can be scaled independently to handle high traffic without affecting the cart service.
  • Resilience: If the inventory service is temporarily unavailable, the cart service can handle the fallback gracefully.

Implementation with Typescript

Cart service:

import { connect } from 'amqplib';
import { randomUUID } from 'crypto';
 
// Function to call the inventory service
async function checkInventory(itemId: string): Promise<boolean> {
  // Create a connection to RabbitMQ
  const connection = await connect('amqp://localhost');
  const channel = await connection.createChannel();
 
  // Declare a temporary queue for responses
  const replyQueue = await channel.assertQueue('', { exclusive: true });
 
  // Generate a unique correlation ID
  const correlationId = randomUUID();
 
  // Prepare the RPC request message
  const message = {
    itemId,
    replyTo: replyQueue.queue,
    correlationId,
  };
 
  // Send the request to the RPC queue
  await channel.sendToQueue('inventory_checks', Buffer.from(JSON.stringify(message)), {
    correlationId,
    replyTo: replyQueue.queue,
  });
  
  // Make sure to disconnect the channel
 
  // Consume responses from the temporary queue
  return new Promise<boolean>((resolve) => {
    channel.consume(replyQueue.queue, (msg) => {
      if (msg.properties.correlationId === correlationId) {
        channel.ack(msg);
        resolve(JSON.parse(msg.content.toString()).inStock);
      }
    });
  });
}

Inventory service

import { connect } from 'amqplib';
 
async function listenForInventoryChecks() {
  // Create a connection to RabbitMQ
  const connection = await connect('amqp://localhost');
  const channel = await connection.createChannel();
 
  // Declare the RPC queue
  await channel.assertQueue('inventory_checks');
 
  // Consume messages from the RPC queue
  channel.consume('inventory_checks', async (msg) => {
    const request = JSON.parse(msg.content.toString());
    const itemId = request.itemId;
 
    // Check inventory for the requested item
    const inStock = await checkInventoryDatabase(itemId);
 
    // Prepare the response message
    const response = { inStock };
 
    // Send the response back to the client's temporary queue
    await channel.sendToQueue(msg.properties.replyTo, Buffer.from(JSON.stringify(response)), {
      correlationId: msg.properties.correlationId,
    });
 
    channel.ack(msg);
  });
}