add keepalive

This commit is contained in:
shoopea 2019-05-17 14:22:34 +08:00
parent d4088edc1a
commit 82014a9d32
3 changed files with 57 additions and 1 deletions

7
def.go
View File

@ -23,6 +23,13 @@ type ChatWarsMessage struct {
Text string `json:"text"`
}
type MQKeepAlive struct {
UserID64 int64 `json:"user_id"`
Nickname string `json:"nick"`
Queue string `json:"queue"`
Date time.Time `json:"date"`
}
const (
commandForwardMsg = 1
commandReplyMsg = 2

View File

@ -131,9 +131,9 @@ func main() {
for w := 1; w <= 3; w++ {
go MQReceiveMsgWorker(w, MQTGCmdQueue)
}
go MQKeepAliveWorker()
lastOwnTDMsg = time.Now()
go ListenCW(client)
go ListenMQ(client, MQTGCmdQueue)
go ListenMe(client)

49
mq.go
View File

@ -94,3 +94,52 @@ func MQReceiveMsgWorker(id int, cmd chan<- TGCommand) {
log.Printf("MQReceiveMsgWorker[" + strconv.Itoa(id) + "] : Closing.")
}
func MQKeepAliveWorker() {
conn, err := amqp.Dial("amqp://" + cfg.Rabbit.User + ":" + cfg.Rabbit.Password + "@" + cfg.Rabbit.Host + "/" + cfg.Rabbit.SendQueue)
failOnError(err, "MQKeepAliveWorker : Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "MQKeepAliveWorker : Failed to open a channel")
defer ch.Close()
q, err := ch.QueueDeclare(
"keepalive", // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "MQKeepAliveWorker : Failed to declare a queue")
for true {
t := time.Now()
m := MQKeepAlive{
UserID64: ownUserID64,
Nickname: cfg.Rabbit.User,
Queue: cfg.Rabbit.ReceiveQueue,
Date: t,
}
b, err := json.Marshal(m)
if err != nil {
logOnError(err, "MQKeepAliveWorker : Marshaling message.")
} else {
err = ch.Publish(
"", // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "application/json",
Body: []byte(b),
})
if err != nil {
logOnError(err, "MQKeepAliveWorker : Publishing message.")
}
}
t.Sleep(time.Minute)
}
log.Printf("MQKeepAliveWorker : Closing.")
}