มาลองใช้ Kafka กับ NodeJS กันเถอะ — ”Good morning Kafka !!!”

เชื่อว่า Kafka หนึ่งในเทคโนโลยีที่ฮอตและร้อนแรงที่สุดในโลก developer เรา เรามักจะได้ยินชื่อ Kafka กันบ่อยๆ แต่เชื่อกว่าคงมีหลายคนที่แอบสงสัยว่า Kafka คืออะไร และทำไมเราต้องใช้ Kafka กันด้วยล่ะ วันนี้เกิด Use case ที่สนใจขึ้นพอดี เลยอยากลองเอา Kafka มายกตัวอย่างให้ฟังว่า Kafka คืออะไร และ ทำอะไรได้บ้าง

Kafka คืออะไร สำหรับ developer ที่มีประสบการณ์กับ Enterprise มาก่อนแล้วบ้าง คงจะคุ้นเคยกับคำว่า messaging service มาบ้างแล้ว โดยพื้นฐานแล้ว Kafka คือ messaging system ที่ทำเสร็จแล้ว (LOL)

0_eLNtCuOPWrgDgkDF

โดยปกติแล้ว ข้อมูลที่ส่งไปมาในระบบ messaging นั้นเราจะเรียกว่า “message” และจะมีผู้เกี่ยวข้องอย่างน้อย 3 parties คือ

  1. Producer หรือใน JMS เรามักเรียกว่า Publisher เป็นผู้สร้าง message และ ส่งมาให้กับตัวกลาง โดยเวลาส่งนั้นก็จะสามารถจัดกลุ่มในการส่งเรียกว่า “Topic” เพื่อให้ผู้รับสามารถมาเรียกหาข้อมูลได้ถูกต้อง
  2. Message Broker คือตัวกลางในการรับ/ส่ง โดยตัว Kafka เองนั้นมีความสามารถในการทำงานเป็นกลุ่ม ที่เรียกกันว่า Cluster เพื่อช่วยเพิ่มให้สามารถรองรับการขยาย (scale) ได้ โดยในการรับส่งข้อมูลนั้น message จะถูกส่งมารวมกันไว้ในหมวดหมู่ที่เรียกว่า “Topic” นั่นเอง
  3. Consumer หรือใน JMS เราเรียกมันว่า Subscriber ซึ่งจะเป็นผู้ที่จะรับเอา message ไปดำเนินการต่อนั่นเอง โดยปกติแล้ว ในการออกแบบ consumer นั้น เรามักจะกำหนด consumer group เพื่อความสะดวกในการจัดการกับ message ในแต่ละ topic และเราจะวางให้ consumer ใน consumer group มีจำนวนไม่เกินจำนวน partition เพราะถ้ามีจำนวนมากเกินไปจะทำให้มีบาง consumer ไม่ทำงานเพราะต้องรอ queue ไปก่อน

0_JIL_XGrknAFyRdxe

แล้วทำไมเราต้องใช้ Kafka/Message Broker พวกนี้ด้วยล่ะ เพราะว่าในการทำงานใน Real world scenario นั้นเราบอกไม่ได้ว่าผู้ใช้งานจะเฮโลเข้ามาพังระบบของเราเมื่อไหร่ยังไงล่ะพี่ชายยยยยย

Benefit ที่เราจะได้จากการใช้งาน Kafka นั้นหลักๆแล้วก็คือ

  1. ประสิทธิภาพที่ดีขึ้นจากสื่อสารแบบ Asynchronous ทำให้ producer และ consumer ไม่ต้องติดต่อกันโดยตรง producer เองก็สามารถส่ง message เข้าไปได้เลยโดยที่ไม่ต้องรอการประมวลผล และ consumerเองก็จะทำงานก็ต่อเมื่อมี message เข้ามา สำหรับใครที่สนใจการออกแบบด้วย CQRS (Command Query Responsibility Seperation) ก็น่าจะถูกใจ Kafka มากทีเดียว เพราะหนึ่งใน use case ของ Kafka ที่ kafka.apache.org ได้ mentioned ถึงก็คือใช้เป็น event sourcing ได้ด้วย !!! และยังสามารถ decoupling ระบบออกจากกันได้ด้วย ทำให้สามารถ focus กับ functionality ในแต่ละส่วนได้
  2. เพิ่ม Reliability ให้กับระบบเนื่องจาก message queue จะช่วยลดความผิดพลาดลงได้ เพราะ queue จะทำ persist ข้อมูลเอาไว้ในระบบชั่วคราว เมื่อระบบที่เป็น Fronting หรือ Backend ส่วนใดส่วนหนึ่งมีปัญหา ระบบที่เหลืออยู่ก็ยังสามารถทำงานได้ อีกทั้ง message queue ยังสามารถที่จะป้องกันความผิดพลาดด้วย replica/mirror ได้ทำให้ลดโอกาสเกิดความผิดพลาดได้อีก
  3. เพิ่มความสามารถในการ scale ด้วย consumer ที่เพิ่ม/ลดได้แบบ on-demand

เอาล่ะ เรามาลองลงมือกันเลยดีกว่า :D

อันดับแรก เราก็จะสร้าง docker compose file ก่อน เพื่อที่จะ run Kafka แบบง่ายๆกันก่อน

version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka:0.10.2.0-1
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: {Docker Machine IP} #192.168.99.100
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

อันนี้อย่าลืมเปลี่ยน {Docker Machine IP} เป็น IP Address ของเครื่องที่จะทดลองรัน Kafka ก่อนนะครับ หลังจากเรียบร้อยแล้ว เราก็ลอง start docker กันเลย

docker-compose -f docker-compose-simple-kafka.yml up -d

ซึ่งเมื่อเรารันครั้งแรก docker ก็จะ pull filesystem มาตามระเบียบครับ

1_wDqm5WfFWDBFCBBMmEUTDQ

อันนี้เป็นผลลัพธ์จากเครื่องผมเอง
เมื่อเราเช็คดูว่า docker up and running เรียบร้อยดีแล้ว ก็ถึงเวลามาเช็คว่า kafka ทำงานได้ถูกต้องไหมนะครับ

1_ZYi3LNCeG4LWVLBT1nz3EQ

ในการทดสอบนั้นผมจะใช้ kafka shell script เพื่อทดสอบสร้าง topic ดูว่าสามารถทำงานได้ถูกต้องหรือไม่นะครับ สามารถ download ได้จากลิงค์นี้เลยครับ https://kafka.apache.org/downloads

#คำสั่งที่ใช้เรียกดู topics ใน kafka
bin/kafka-topics.sh --list --zookeeper localhost:2181
#คำสั่งที่ใช้สร้าง topics ใน kafka
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-kafka

ในคำสั่งนี้ เราจะสร้าง topic ที่มีชื่อว่า test-kafka กันนะครับ ซึ่งผมก็ได้ทดลองกับ kafka cluster ที่เราสร้างด้วย docker นี้แล้ว เหมือนจะไม่มีปัญหาอะไรนะครับ

1_naKQnQnU6yS1KRYFPZsU4g-1

1_18Hup86zjeDl0kvObEAO8A

ลำดับถัดไป เราจะใช้งาน console consumer เพื่อเทสต์ดูว่า broker สามารถรับส่งข้อมูลได้เป็นปกติดีไหม

./bin/kafka-console-consumer.sh — bootstrap-server localhost:9092 — topic test-kafka — from-beginning

และเราก็ใช้ console producer เพื่อใช้ในการส่ง message ไปให้ consumer ของเรา โดยใช้ command นี้

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-kafka

ซึ่งพอเราพิมพ์ไปที่ console producer ตัว message จะไปแสดงที่ consumer message เลยทันที

1_naKQnQnU6yS1KRYFPZsU4g

1_EaK97vXuNxQNfypNF3tlww

ที่นี้ขั้นตอนต่อไปเราจะลองใช้ nodejs มาเป็น consumer ของ topic “test-kafka” นี้ดูนะครับ โดยเราจะใช้ npm package ที่ชื่อว่า kafka-node (https://www.npmjs.com/package/kafka-node) มาใช้ในการสร้าง nodejs kafka consumer แทนตัว kafka-consumer ดูนะครับ

const kafka = require('kafka-node')
const Consumer = kafka.Consumer
const client = new kafka.Client()
const consumer = new Consumer(client, [{
  topic: 'test-kafka',
  offset: 0
}], {
  autoCommit: true
});

consumer.on('message', function (message) {
  console.log(message);
});

consumer.on('error', function (err) {
  console.log('Error:', err);
})

consumer.on('offsetOutOfRange', function (err) {
  console.log('offsetOutOfRange:', err);
})

อันนี้เป็นโค้ดตัวอย่างง่ายๆ ที่ใช้ในการ intercept message stream จาก Kafka ด้วย consumer class นี้นะครับ สังเกตุดูว่า จะมี option autoCommit ที่ใช้ในการสร้าง Consumer ถ้าเราเปลี่ยนเป็น false เมื่อเราเรียก consumer.js ใหม่มันจะแสดงข้อความตั้งแต่ offset แรกเสมอ แต่ถ้าเราให้ autoCommit เป็น true เมื่อ message ใดๆถูกเรียกแล้ว มันจะไม่ถูกเรียกขึ้นมาแสดงอีกนะครับ

node consumer.js

1_pnTq2A57OahWCpqyziYdng-1

สำหรับตอนถัดๆไป เราจะมาลองใช้ Kafka ทำหลายๆอย่างที่ผมเคยเขียน blog มาตั้งแต่ตอนแรกๆ เช่น ใช้เป็น log aggregrator, metrics composer, Stream processing หรือ Event sourcing ดูนะครับ สำหรับตอนนี้ เกือบเช้าแล้ว คงได้เวลาไปนอน

“Good morning Kafka !!!” ครับ

References: