반응형
RabbitMQ란?
- AMQP(Advanced Message Queueing Protocol, MQ의 오픈소스에 기반한 표준 프로토콜) 프로토콜을 구현한 메시지 브로커
- 주요 개념으로는 Producer, Exchange, Binding, Queue, Consumer
- Exchange, Binding, Queue는 Broker 영역에 해당됨
Producer
- 메시지를 생성하고 발송하는 주체
- 메시지를 Queue에 직접 전달하지 않고 항상 Exchange를 통해 전달
Exchange
- Producer들에게서 전달 받은 메시지들을 어떤 Queue들에 발송할지를 결정하는 주체
- 일종의 라우터 개념
- Exchange는 네 가지 타입으로 바인딩 가능함 (Direct, Topic, Headers, Fanout)
- Direct 라우팅 키를 활용해 라우팅하는 타입 (Unicast)
- Topic 라우팅 패턴이 일치하는 Queue에 메시지 전송 (Multicast)
- Headers [key, value]로 이루어진 header 값을 기준으로 일치하는 Queue에 메시지 전송 (Multicast)
- Fanout 해당 Exchange에 등록된 모든 Queue에 메시지 전송 (Brodcast)
Consumer
- 메시지를 수신하는 주체
- Queue에 직접 접근하여 메시지를 가져옴
Queue
- Producer들이 발송한 메세지들이 Consumer가 소비하기 전까지 보관되는 장소
RabbitMQ Docker 실행
#docker-compose.yml
version: "2"
services:
rabbitmq:
image: rabbitmq:3.6-management
container_name: rabbitmq
network_mode: host
ports:
- 5672:5672
- 15672:15672
http://localhost:15672/#/
id : Guest
pw : Guest
Go RabbitMQ 클라이언트 사용
go get 을 사용하여 amqp 설치
go get github.com/rabbitmq/amqp091-go
Producer
package main
import (
"log"
amqp "github.com/rabbitmq/amqp091-go"
)
func failOnError(err error, msg string) {
if err != nil {
log.Panicf("%s: %s", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
q, err := ch.QueueDeclare(
"hello", // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
body := "Hello World!"
err = ch.Publish(
"", // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s\n", body)
}
Consumer
package main
import (
"log"
amqp "github.com/rabbitmq/amqp091-go"
)
func failOnError(err error, msg string) {
if err != nil {
log.Panicf("%s: %s", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
q, err := ch.QueueDeclare(
"hello", // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
var forever chan struct{}
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
}
> go run .\main.go
2022/08/19 19:11:14 [*] Waiting for messages. To exit press CTRL+C
2022/08/19 19:11:14 Received a message: Hello World!
참고
반응형
'Go' 카테고리의 다른 글
Elasticsearch Golang Client 사용하기 (0) | 2023.08.25 |
---|---|
Go Gin 사용해보기 (0) | 2022.08.16 |
Golang 설치 (0) | 2022.08.16 |