gocw2/mq.go

96 lines
2.6 KiB
Go
Raw Normal View History

2019-05-04 05:53:35 +02:00
package main
import (
"encoding/json"
"log"
2019-05-04 05:56:03 +02:00
"strconv"
2019-05-04 05:53:35 +02:00
"github.com/streadway/amqp"
)
2019-05-04 05:56:03 +02:00
func MQSendMsgWorker(id int, msgs <-chan ChatWarsMessage) {
2019-05-14 11:34:15 +02:00
conn, err := amqp.Dial("amqp://" + cfg.Rabbit.User + ":" + cfg.Rabbit.Password + "@" + cfg.Rabbit.Host + "/" + cfg.Rabbit.SendQueue)
2019-05-04 05:53:35 +02:00
failOnError(err, "MQSendMsgWorker["+strconv.Itoa(id)+"] : Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "MQSendMsgWorker["+strconv.Itoa(id)+"] : Failed to open a channel")
defer ch.Close()
q, err := ch.QueueDeclare(
"msg", // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "MQSendMsgWorker["+strconv.Itoa(id)+"] : Failed to declare a queue")
for m := range msgs {
b, err := json.Marshal(m)
if err != nil {
logOnError(err, "MQSendMsgWorker["+strconv.Itoa(id)+"] : Marshaling message.")
} else {
err = ch.Publish(
"", // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "application/json",
Body: []byte(b),
})
if err != nil {
logOnError(err, "MQSendMsgWorker["+strconv.Itoa(id)+"] : Publishing message.")
}
}
}
log.Printf("MQSendMsgWorker[" + strconv.Itoa(id) + "] : Closing.")
}
2019-05-14 11:34:15 +02:00
2019-05-15 04:35:51 +02:00
func MQReceiveMsgWorker(id int, cmd chan<- ChatWarsCommand) {
2019-05-14 11:34:15 +02:00
conn, err := amqp.Dial("amqp://" + cfg.Rabbit.User + ":" + cfg.Rabbit.Password + "@" + cfg.Rabbit.Host + "/" + cfg.Rabbit.ReceiveQueue)
failOnError(err, "MQReceiveMsgWorker["+strconv.Itoa(id)+"] : Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "MQReceiveMsgWorker["+strconv.Itoa(id)+"] : Failed to open a channel")
defer ch.Close()
q, err := ch.QueueDeclare(
"msg", // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "MQReceiveMsgWorker["+strconv.Itoa(id)+"] : Failed to declare a queue")
m, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "MQReceiveMsgWorker["+strconv.Itoa(id)+"] : Failed to declare a consumer")
for d := range m {
// log.Printf("MQReceiveMsgWorker["+strconv.Itoa(id)+"] : Received a message: %s", string(d.Body))
err = json.Unmarshal(d.Body, &x)
logOnError(err, "MQReceiveMsgWorker["+strconv.Itoa(id)+"] : Can't unmarshal.\n"+string(d.Body))
if err == nil {
cmd <- x
}
}
log.Printf("MQReceiveMsgWorker[" + strconv.Itoa(id) + "] : Closing.")
}