본문으로 바로가기

Golang RabbitMQ 클라이언트 사용해보기

category Go 2022. 8. 19. 19:12
반응형

RabbitMQ란?

  • AMQP(Advanced Message Queueing Protocol, MQ의 오픈소스에 기반한 표준 프로토콜) 프로토콜을 구현한 메시지 브로커
  • 주요 개념으로는 Producer, Exchange, Binding, Queue, Consumer
  • Exchange, Binding, Queue는 Broker 영역에 해당됨

Producer

  • 메시지를 생성하고 발송하는 주체
  • 메시지를 Queue에 직접 전달하지 않고 항상 Exchange를 통해 전달

Exchange

  • Producer들에게서 전달 받은 메시지들을 어떤 Queue들에 발송할지를 결정하는 주체
  • 일종의 라우터 개념
  • Exchange는 네 가지 타입으로 바인딩 가능함 (Direct, Topic, Headers, Fanout)
    • Direct 라우팅 키를 활용해 라우팅하는 타입 (Unicast)
    • Topic 라우팅 패턴이 일치하는 Queue에 메시지 전송 (Multicast)
    • Headers [key, value]로 이루어진 header 값을 기준으로 일치하는 Queue에 메시지 전송 (Multicast)
    • Fanout 해당 Exchange에 등록된 모든 Queue에 메시지 전송 (Brodcast)

Consumer

  • 메시지를 수신하는 주체
  • Queue에 직접 접근하여 메시지를 가져옴

Queue

  • Producer들이 발송한 메세지들이 Consumer가 소비하기 전까지 보관되는 장소

 

RabbitMQ Docker 실행

#docker-compose.yml
version: "2"
services:
  rabbitmq:
    image: rabbitmq:3.6-management
    container_name: rabbitmq
    network_mode: host
    ports:
      - 5672:5672
      - 15672:15672
http://localhost:15672/#/
id : Guest
pw : Guest

 

Go RabbitMQ 클라이언트 사용

go get 을 사용하여 amqp 설치

go get github.com/rabbitmq/amqp091-go

 

Producer

package main

import (
	"log"

	amqp "github.com/rabbitmq/amqp091-go"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Panicf("%s: %s", msg, err)
	}
}

func main() {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	q, err := ch.QueueDeclare(
		"hello", // name
		false,   // durable
		false,   // delete when unused
		false,   // exclusive
		false,   // no-wait
		nil,     // arguments
	)
	failOnError(err, "Failed to declare a queue")

	body := "Hello World!"
	err = ch.Publish(
		"",     // exchange
		q.Name, // routing key
		false,  // mandatory
		false,  // immediate
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(body),
		})
	failOnError(err, "Failed to publish a message")
	log.Printf(" [x] Sent %s\n", body)
}

 

Consumer

package main

import (
	"log"

	amqp "github.com/rabbitmq/amqp091-go"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Panicf("%s: %s", msg, err)
	}
}

func main() {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	q, err := ch.QueueDeclare(
		"hello", // name
		false,   // durable
		false,   // delete when unused
		false,   // exclusive
		false,   // no-wait
		nil,     // arguments
	)
	failOnError(err, "Failed to declare a queue")

	msgs, err := ch.Consume(
		q.Name, // queue
		"",     // consumer
		true,   // auto-ack
		false,  // exclusive
		false,  // no-local
		false,  // no-wait
		nil,    // args
	)
	failOnError(err, "Failed to register a consumer")

	var forever chan struct{}

	go func() {
		for d := range msgs {
			log.Printf("Received a message: %s", d.Body)
		}
	}()

	log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
	<-forever
}
> go run .\main.go
2022/08/19 19:11:14  [*] Waiting for messages. To exit press CTRL+C
2022/08/19 19:11:14 Received a message: Hello World!

 

 

 

 

 

참고

반응형

'Go' 카테고리의 다른 글

Elasticsearch Golang Client 사용하기  (0) 2023.08.25
Go Gin 사용해보기  (0) 2022.08.16
Golang 설치  (0) 2022.08.16