Implement RabbitMQ Node.js แบบบ้าน ๆ แต่ใช้ได้จริง

ต่อจากบทความก่อนที่เราได้พูดถึงเรื่อง Message Queue และ RabbitMQ ไปแล้ว รอบนี้เราจะมาลองใช้งานเจ้า rabbitmq + Express แล้วยิง Load Test ด้วย K6 ทดสอบ performance เริ่มกันนเลยยยย!!!!

อย่างแรกเริ่มจาก setup express ก่อนเลยครับ

  • mkdir node-rabbitmq && cd node-rabbitmq
  • npm init -y
  • npm i express

หลังจาก รัน command ตามด้านบนเสร็จเราก็จะได้ file ประมาณนี้มา ถ้าคุณได้ file ประมาณนี้แสดงว่าทำถูกแล้ว!!

สร้าง app express ต่อเลยครับ

  • สร้าง file app.js ขึ้นมา
const express = require("express");

const app = express();

app.listen(3000, () => {
    console.log("Server is running on port 3000");
});
  • หลังจากสร้าง file เสร็จแล้วก็รัน app ขึ้นมาได้เลยครับ
  • node app.js
  • ถ้าขึ้นแบบนี้แสดงว่า app เราสามารถรันได้แล้ว

ต่อไปเราจะรัน rabbitmq ใน docker กันนะครับ ถ้าใครไม่มี docker ไป install docker ก่อนนะครับ ref https://www.docker.com

  • สร้าง file docker-compose.yaml
services:
  rabbitmq:
    image: rabbitmq:management-alpine
    tty: true
    volumes:
      - ./rabbitmq_data:/var/lib/rabbitmq # Persistent volume for RabbitMQ data
    environment:
      RABBITMQ_DEFAULT_USER: "imotif" # Set default RabbitMQ user
      RABBITMQ_DEFAULT_PASS: "1234" # Set default RabbitMQ password
    ports:
      - "15672:15672"
      - "5672:5672"
  • ต่อไปเราก็ docker compose up ได้เลยครับ เพื่อรัน rabbitmq ขึ้นมา
  • docker compose up -d
  • รัน เสร็จแล้วเราสามารถ docker ps เพื่อดู status container
  • docker ps
  • ถ้า docker ps แล้วดู STATUS มันเป็น up แสดงว่า rabbitmq เราพร้อมรับ message แล้วครับ

ต่อเราจะ setup send mail กัน

  • npm i nodemailer
const express = require("express");
const nodemailer = require("nodemailer");

const app = express();

// เพิ่มฟังก์ชันส่งอีเมล
async function sendEmail() {
  let transporter = nodemailer.createTransport({
    service: "gmail",
    auth: {
      user: "your email",
      pass: "your password",
    },
  });
  // รายละเอียด mail
  let info = await transporter.sendMail({
    from: "ผู้ส่ง",
    to: "ผู้รับ",
    subject: "Hello Rabbitmq", // หัวข้อ
    text: "สวัสดีครับ Rabbitmq", // เนื้อหาธรรมดา
    html: "<b>สวัสดีครับ</b> <i>Rabbitmq</i>", // เนื้อหาธรรมดา HTML
  });

  console.log("Email sent: %s", info.messageId);
}

app.get("/", async (req, res) => {
  await sendEmail()
  res.send("Email sent");
});

app.listen(3000, () => {
    console.log("Server is running on port 3000");
});
  • รัน app ขึ้นมาได้เลยครับ
  • node app.js
  • แล้วก็ ทดสองยัง request เข้ามาได้เลยแล้วแต่ tools ที่ชอบเลยครับ ส่วนตัวผม ของ curl นะครับ 555 ง่ายดี
  • ถ้าลอง ยิง request แล้วได้ logs หน้าตาประมาณนี้ก็โอเคครับ แสดงว่าเมลเราส่งไปได้ หรือจะเข้าไปเช็คดูในเมลเราก็ได้ครับว่า มีเมลส่งมามั้ย

ที่นี้เราจะมาลองทดสอบด้วย K6 ยิง load test เข้ามาดู เพื่อจำลองเหตุการณ์จริงกันครับ

  • ติดตั้ง K6 เพื่อทดสอบกันครับ ถ้าใครไม่มี สามารถติดตั้งก่อนนะครับ ref https://k6.io/
  • ถ้าติดตั้งเสร็จแล้วลองพิมพ์ k6 บน terminal เพื่อทดสอบก่อนครับว่าใช้ได้มั้ย ถ้าใช้ได้จะได้น่าตาประมาณนี้ครับ

ต่อไปเราจะเขียน script ยิง load test ด้วย k6 ต่อครับ

  • สร้าง file test-sendmail.js ขึ้นมาครับ
import http from 'k6/http';
import { check, sleep } from 'k6';

export const options = {
    stages: [
        { duration: '5s', target: 1000 },
    ],
};
export default function () {
    const res = http.get('http://localhost:3000');
    check(res, { 'status was 200': (r) => r.status == 200 });
    sleep(1);
}
  • จาก config ด้านบนคือ ให้ K6 ค่อย ๆ เพิ่มจำนวนผู้ใช้จำลอง จาก 0 เป็น 1000 ภายใน 5 วินาที จะยิง request ไปที่ http://localhost:3000 และตรวจสอบว่า status code เป็น 200 หรือไม่ หลังจากยิงแล้วจะหน่วงเวลา 1 วินาทีก่อนยิงครั้งถัดไป เมื่อครบ 5 วินาที

นี่คือ report จาก K6 ซึ่งแสดงให้เห็นว่า K6 ได้ยิง request ไปทั้งหมด 1,429 request จากทั้งหมดนั้น มีเพียง 111 request เท่านั้นที่ได้ status code 200 ส่วนที่เหลือ 1,318 requests (92.23%) fail (ไม่ได้รับ status 200)
แสดงให้เห็นว่า endpoint หรือ service ที่รับอีเมลไม่สามารถรับโหลดระดับนี้ได้ หรือมี error เกิดขึ้นระหว่างทาง

จะลองเอา rabbitmq เข้ามาช่วยในการส่งเมลกันแล้วมาดู report ว่า request ที่ fail จะลดลงกี่เปอร์เซ็นต์

  • ผมจะสร้าง producer.js และ comsumer.js
  • แล้วก็ติดตั้ง amqp
    npm install amqplib

producer.js คนส่งงาน message เข้าไปใน queue

const express = require('express')
const amqp = require('amqplib')

const app = express()
const RABBITMQ_URL = 'amqp://imotif:1234@localhost:5672'
const QUEUE_NAME = 'email_queue'

app.get('/', async (req, res) => {
  const message = {
    to: 'ผู้ส่ง',
    subject: 'Hello Rabbitmq',
    html: '<b>สวัสดีครับ</b> <i>Rabbitmq</i>',
  }

  const connection = await amqp.connect(RABBITMQ_URL)
  const channel = await connection.createChannel()
  await channel.assertQueue(QUEUE_NAME, { durable: true })

  channel.sendToQueue(QUEUE_NAME, Buffer.from(JSON.stringify(message)), {
    persistent: true,
  })

  console.log('Sent message to queue:', message)
  res.send('Email job queued')

  setTimeout(() => {
    connection.close()
  }, 500)
})

app.listen(3000, () => {
  console.log('Producer server running on http://localhost:3000')
})

consumer.js รับงานจาก queue แล้วส่งเมล

const amqp = require("amqplib");
const nodemailer = require("nodemailer");

const RABBITMQ_URL = "amqp://imotif:1234@localhost:5672";
const QUEUE_NAME = "email_queue";

async function sendEmail({ to, subject, html }) {
  let transporter = nodemailer.createTransport({
    service: "gmail",
    auth: {
      user: "your email",
      pass: "your password",
    },
  });

  let info = await transporter.sendMail({
    from: "ผู้ส่ง",
    to: "ผู้รับ",
    subject,
    html,
  });

  console.log("Email sent:", info.messageId);
}

async function startConsumer() {
  try {
    const connection = await amqp.connect(RABBITMQ_URL);
    const channel = await connection.createChannel();
    await channel.assertQueue(QUEUE_NAME, { durable: true });

    console.log("Waiting for messages in queue:", QUEUE_NAME);

    channel.consume(
      QUEUE_NAME,
      async (msg) => {
        if (msg !== null) {
          const content = JSON.parse(msg.content.toString());
          console.log("Received:", content);
          await sendEmail(content);
          channel.ack(msg);
        }
      },
      { noAck: false }
    );
  } catch (err) {
    console.error("Consumer error", err);
  }
}

startConsumer();
เขียนเสร็จแล้วเรามาสดสอบกัน
  • เริ่มจาก รัน consumer ก่อนเลยครับ ถ้า logs แสดงแบบนี้แสดงว่า รันได้
    node consumer.js
  • หลังจากรัน consumer เสร็จแล้วก็มารัน producer ต่อกัน
    node producer.js

หลังจาก รัน consumer, producer เสร็จแล้วเรามา ยิง load test กันครับ

ถ้าดูจาก report k6 K6 จะเห็นว่า request ทั้งหมด 2993 requests ได้รับ HTTP status 200 ทั้งหมด ไม่มี request ไหน fail เลย
ซึ่งหมายความว่า Producer (Express API) ทำงานได้ดี และสามารถส่ง message เข้า RabbitMQ queue ได้ทั้งหมดโดยไม่ fail เลย

แต่ถึง consumer จะล่ม พอ start consumer กลับมา queue ก็ยังกลับมาเขียนต่อ อีกครั้งหรือจะ config ใน docker ก็ได้ให้มัน restart ตัวมันเอง

จุดที่น่าสนใจ:

  • ถึงแม้ว่า Consumer จะล่ม แต่ Message ที่อยู่ใน Queue จะไม่หายไป
  • RabbitMQ จะเก็บ message ไว้ใน queue (ถ้า config เป็น durable และไม่ ack) รอจนกว่า consumer จะกลับมาทำงาน แล้วค่อย ๆ ดึง message ไปประมวลผลต่อ
  • ดังนั้น restart Consumer กลับขึ้นมาใหม่ มันจะเริ่ม process ข้อความจาก queue ต่อได้ทันที โดยไม่เสียข้อมูล

จากเคสนี้:

  • การใช้ Message Queue ช่วย decouple ระบบ ได้อย่างแท้จริง: frontend (producer) ไม่พัง ถึง backend (consumer) จะล่ม
  • ระบบสามารถ scale หรือ recover ได้ง่ายโดยไม่สูญเสีย request ของผู้ใช้
  • จังหวะนี้แนะนำให้ ใช้ queue มาคั่นกลางการส่งเมลแทนการยิง SMTP ตรง ๆ จาก API เพื่อรองรับ load และหลีกเลี่ยง rate limit จาก Gmail

สรุปส่งท้าย:

จากที่ลองเล่นมาทั้งหมด ผมไม่ได้ลงลึกในรายละเอียดของ RabbitMQ มากนัก เช่นว่าแต่ละคำสั่งหรือ config ใช้ทำอะไรบ้าง
เพราะเป้าหมายหลักของโพสต์นี้คืออยากให้เห็น “ภาพรวม” ก่อนว่าเราควรใช้ RabbitMQ ในสถานการณ์แบบไหน

ผมเชื่อว่า พอถึงเวลาที่ระบบของคุณเริ่มมีปัญหาด้าน async, scaling หรือความช้าในการประมวลผล
คุณจะนึกถึง queue ขึ้นมาเป็นตัวเลือกแรก ๆ แน่นอน และตอนนั้นแหละค่อยไปไล่ขุดเอกสารต่อก็ไม่สายครับ

ฝากไว้เป็นไอเดียครับ แล้วเจอกันครั้งหน้าครับ สวัสดีครับ

Share the Post:

Related Posts