gocw2/mq.go

146 lines
3.9 KiB
Go

package main
import (
"encoding/json"
"log"
"strconv"
"time"
"github.com/streadway/amqp"
)
func MQSendMsgWorker(id int, msgs <-chan ChatWarsMessage) {
conn, err := amqp.Dial("amqp://" + cfg.Rabbit.User + ":" + cfg.Rabbit.Password + "@" + cfg.Rabbit.Host + "/" + cfg.Rabbit.SendQueue)
failOnError(err, "MQSendMsgWorker["+strconv.Itoa(id)+"] : Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "MQSendMsgWorker["+strconv.Itoa(id)+"] : Failed to open a channel")
defer ch.Close()
q, err := ch.QueueDeclare(
"msg", // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "MQSendMsgWorker["+strconv.Itoa(id)+"] : Failed to declare a queue")
for m := range msgs {
b, err := json.Marshal(m)
if err != nil {
logOnError(err, "MQSendMsgWorker["+strconv.Itoa(id)+"] : Marshaling message.")
} else {
err = ch.Publish(
"", // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "application/json",
Body: []byte(b),
})
logOnError(err, "MQSendMsgWorker["+strconv.Itoa(id)+"] : Publishing message.")
}
}
log.Printf("MQSendMsgWorker[" + strconv.Itoa(id) + "] : Closing.")
}
func MQReceiveMsgWorker(id int, cmd chan<- TGCommand) {
var c TGCommand
conn, err := amqp.Dial("amqp://" + cfg.Rabbit.User + ":" + cfg.Rabbit.Password + "@" + cfg.Rabbit.Host + "/" + cfg.Rabbit.ReceiveQueue)
failOnError(err, "MQReceiveMsgWorker["+strconv.Itoa(id)+"] : Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "MQReceiveMsgWorker["+strconv.Itoa(id)+"] : Failed to open a channel")
defer ch.Close()
q, err := ch.QueueDeclare(
"msg", // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "MQReceiveMsgWorker["+strconv.Itoa(id)+"] : Failed to declare a queue")
m, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "MQReceiveMsgWorker["+strconv.Itoa(id)+"] : Failed to declare a consumer")
for d := range m {
//log.Printf("MQReceiveMsgWorker["+strconv.Itoa(id)+"] : Received a message:\n %s", string(d.Body))
err = json.Unmarshal(d.Body, &c)
logOnError(err, "MQReceiveMsgWorker["+strconv.Itoa(id)+"] : Can't unmarshal.\n"+string(d.Body))
if err == nil {
cmd <- c
}
}
log.Printf("MQReceiveMsgWorker[" + strconv.Itoa(id) + "] : Closing.")
}
func MQKeepAliveWorker() {
conn, err := amqp.Dial("amqp://" + cfg.Rabbit.User + ":" + cfg.Rabbit.Password + "@" + cfg.Rabbit.Host + "/" + cfg.Rabbit.SendQueue)
failOnError(err, "MQKeepAliveWorker : Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "MQKeepAliveWorker : Failed to open a channel")
defer ch.Close()
q, err := ch.QueueDeclare(
"keepalive", // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "MQKeepAliveWorker : Failed to declare a queue")
for true {
t := time.Now()
m := MQKeepAlive{
TGUserID64: ownUserID64,
Nickname: cfg.Rabbit.User,
Queue: cfg.Rabbit.ReceiveQueue,
Date: t,
Build: githash,
}
b, err := json.Marshal(m)
if err != nil {
logOnError(err, "MQKeepAliveWorker : Marshaling message.")
} else {
err = ch.Publish(
"", // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "application/json",
Body: []byte(b),
})
if err != nil {
logOnError(err, "MQKeepAliveWorker : Publishing message.")
}
}
time.Sleep(KeepAliveHeartBeatSeconds * time.Second)
}
log.Printf("MQKeepAliveWorker : Closing.")
}