test keep alive

This commit is contained in:
shoopea 2019-05-17 15:02:35 +08:00
parent caaf63aaac
commit 75efdb2e34
3 changed files with 89 additions and 0 deletions

7
def.go
View File

@ -6,6 +6,13 @@ import (
"time" "time"
) )
type MQKeepAlive struct {
UserID64 int64 `json:"user_id"`
Nickname string `json:"nick"`
Queue string `json:"queue"`
Date time.Time `json:"date"`
}
type TGCommand struct { type TGCommand struct {
Type int64 `json:"type"` Type int64 `json:"type"`
FromChatID64 int64 `json:"from_chat_id"` FromChatID64 int64 `json:"from_chat_id"`

View File

@ -50,6 +50,7 @@ var (
TGCmdQueue chan TGCommand TGCmdQueue chan TGCommand
MQTGCmdQueue chan TGCommand MQTGCmdQueue chan TGCommand
msgParsingRules map[int]MessageParsingRule msgParsingRules map[int]MessageParsingRule
clientsQueues map[int64]MQKeepAlive
) )
func PrintText(m *tb.Message) { func PrintText(m *tb.Message) {
@ -107,6 +108,7 @@ func main() {
SQLMsgIdentifyQueue = make(chan int64, 100) SQLMsgIdentifyQueue = make(chan int64, 100)
TGCmdQueue = make(chan TGCommand, 100) TGCmdQueue = make(chan TGCommand, 100)
MQTGCmdQueue = make(chan TGCommand, 100) MQTGCmdQueue = make(chan TGCommand, 100)
clientsQueues = make(map[int64]MQKeepAlive)
for w := 1; w <= MQGetMsgWorkers; w++ { for w := 1; w <= MQGetMsgWorkers; w++ {
go MQGetMsgWorker(w, MQCWMsgQueue) go MQGetMsgWorker(w, MQCWMsgQueue)
@ -126,6 +128,8 @@ func main() {
for w := 1; w <= MQTGCmdWorkers; w++ { for w := 1; w <= MQTGCmdWorkers; w++ {
go MQTGCmdWorker(w, MQTGCmdQueue) go MQTGCmdWorker(w, MQTGCmdQueue)
} }
go MQKeepAliveWorker()
go MQTidyKeepAliveWorker()
log.Println("Bot started !") log.Println("Bot started !")

View File

@ -13,6 +13,7 @@ import (
) )
func MQGetMsgWorker(id int, msgs chan<- ChatWarsMessage) { func MQGetMsgWorker(id int, msgs chan<- ChatWarsMessage) {
log.Printf("MQGetMsgWorker[" + strconv.Itoa(id) + "] : Starting.")
var x ChatWarsMessage var x ChatWarsMessage
conn, err := amqp.Dial("amqp://" + cfg.Rabbit.User + ":" + cfg.Rabbit.Password + "@" + cfg.Rabbit.Host + "/" + cfg.Rabbit.Queue) conn, err := amqp.Dial("amqp://" + cfg.Rabbit.User + ":" + cfg.Rabbit.Password + "@" + cfg.Rabbit.Host + "/" + cfg.Rabbit.Queue)
failOnError(err, "MQGetMsgWorker["+strconv.Itoa(id)+"] : Failed to connect to RabbitMQ") failOnError(err, "MQGetMsgWorker["+strconv.Itoa(id)+"] : Failed to connect to RabbitMQ")
@ -224,3 +225,80 @@ func MQTGCmdWorker(id int, cmds <-chan TGCommand) {
log.Printf("MQTGCmdWorker[" + strconv.Itoa(id) + "] : Closing.") log.Printf("MQTGCmdWorker[" + strconv.Itoa(id) + "] : Closing.")
} }
func MQKeepAliveWorker() {
log.Printf("MQKeepAliveWorker : Starting.")
conn, err := amqp.Dial("amqp://" + cfg.Rabbit.User + ":" + cfg.Rabbit.Password + "@" + cfg.Rabbit.Host + "/" + cfg.Rabbit.Queue)
failOnError(err, "MQKeepAliveWorker : Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "MQKeepAliveWorker : Failed to open a channel")
defer ch.Close()
q, err := ch.QueueDeclare(
"keepalive", // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "MQKeepAliveWorker : Failed to declare a queue")
m, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "MQKeepAliveWorker : Failed to register a consumer")
for d := range m {
// log.Printf("MQKeepAliveWorker : Received a message: %s", string(d.Body))
x := MQKeepAlive{}
err = json.Unmarshal(d.Body, &x)
logOnError(err, "MQKeepAliveWorker : Can't unmarshal.\n"+string(d.Body))
if err == nil {
if x.Date.Add(time.Minute).Before(time.Now()) {
// outdate keep-alive
} else if _, ok := clientsQueues[x.UserID64]; ok {
clientsQueues[x.UserID64].Date = x.Date
} else {
clientsQueues[x.UserID64] = x
c := TGCommand{
Type: commandSendMsg,
ToChatID64: x.UserID64,
Text: "You are connected.",
}
TGCmdQueue <- c
}
}
}
log.Printf("MQKeepAliveWorker : Closing.")
}
func MQTidyKeepAliveWorker() {
log.Printf("MQTidyKeepAliveWorker : Starting.")
for true {
t := time.Now()
for k, v := range clientsQueues {
if v.Date.Add(90 * time.Second).Before(time.Now()) {
c := TGCommand{
Type: commandSendMsg,
ToChatID64: x.UserID64,
Text: "Timeout.",
}
TGCmdQueue <- c
delete(clientsQueues, x.UserID64)
}
}
time.Sleep(time.Until(t.Add(time.Minute)))
}
log.Printf("MQTidyKeepAliveWorker : Closing.")
}