From 4d6ebb6500a0f296c41d8c425d0e646a2118cc32 Mon Sep 17 00:00:00 2001 From: shoopea Date: Fri, 17 May 2019 16:09:09 +0800 Subject: [PATCH] test --- def.go | 10 +++++++++- main.go | 6 ++++-- workers.go | 45 +++++++++++++++++++++++++++++++++++++++------ 3 files changed, 52 insertions(+), 9 deletions(-) diff --git a/def.go b/def.go index 820c594..7a6ebaf 100644 --- a/def.go +++ b/def.go @@ -1,9 +1,11 @@ package main import ( - tb "gopkg.in/tucnak/telebot.v2" "regexp" "time" + + "github.com/streadway/amqp" + tb "gopkg.in/tucnak/telebot.v2" ) type MQKeepAlive struct { @@ -13,6 +15,12 @@ type MQKeepAlive struct { Date time.Time `json:"date"` } +type MQClient struct { + Connnection *amqp.Connection + Channel *amqp.Channel + Queue amqp.Queue +} + type TGCommand struct { Type int64 `json:"type"` FromChatID64 int64 `json:"from_chat_id"` diff --git a/main.go b/main.go index 045b8ce..44b39e4 100644 --- a/main.go +++ b/main.go @@ -50,7 +50,8 @@ var ( TGCmdQueue chan TGCommand MQTGCmdQueue chan TGCommand msgParsingRules map[int]MessageParsingRule - clientsQueues map[int64]*MQKeepAlive + clientsKeepAlive map[int64]*MQKeepAlive + clientsQueue map[int64]*MQClient ) func PrintText(m *tb.Message) { @@ -108,7 +109,8 @@ func main() { SQLMsgIdentifyQueue = make(chan int64, 100) TGCmdQueue = make(chan TGCommand, 100) MQTGCmdQueue = make(chan TGCommand, 100) - clientsQueues = make(map[int64]*MQKeepAlive) + clientsQueue = make(map[int64]*MQQueue) + clientsKeepAlive = make(map[int64]*MQKeepAlive) for w := 1; w <= MQGetMsgWorkers; w++ { go MQGetMsgWorker(w, MQCWMsgQueue) diff --git a/workers.go b/workers.go index ae7ee7b..f00afb7 100644 --- a/workers.go +++ b/workers.go @@ -279,16 +279,38 @@ func MQKeepAliveWorker() { if err == nil { if x.Date.Add(time.Minute).Before(time.Now()) { // outdate keep-alive - } else if _, ok := clientsQueues[x.UserID64]; ok { - clientsQueues[x.UserID64].Date = x.Date + } else if _, ok := clientsKeepAlive[x.UserID64]; ok { + clientsKeepAlive[x.UserID64].Date = x.Date } else { - clientsQueues[x.UserID64] = &x + clt := MQClient{} + clt.Connection, err = amqp.Dial("amqp://" + cfg.Rabbit.User + ":" + cfg.Rabbit.Password + "@" + cfg.Rabbit.Host + "/" + x.Queue) + logOnError(err, "MQKeepAliveWorker : Failed to connect to RabbitMQ") + clt.Channel, err = clt.conn.Channel() + logOnError(err, "MQKeepAliveWorker : Failed to open a channel") + clt.Queue, err = ch.QueueDeclare( + "msg", // name + false, // durable + false, // delete when unused + false, // exclusive + false, // no-wait + nil, // arguments + ) + logOnError(err, "MQKeepAliveWorker : Failed to declare a queue") + clientsKeepAlive[x.UserID64] = &x + c := TGCommand{ Type: commandSendMsg, ToUserID64: x.UserID64, Text: "You are connected.", } TGCmdQueue <- c + c = TGCommand{ + Type: commandSendMsg, + ToUserID64: cfg.Bot.Admin, + Text: fmt.Sprintf("%s is connected.", x.Nickname), + } + TGCmdQueue <- c + } } } @@ -301,15 +323,26 @@ func MQTidyKeepAliveWorker() { log.Printf("MQTidyKeepAliveWorker : Starting.") for true { t := time.Now() - for _, v := range clientsQueues { + for _, v := range clientsKeepAlive { if v.Date.Add(90 * time.Second).Before(time.Now()) { + msgs, err := clt[v.UserID64].Channel.QueuePurge(clt[v.UserID64].Queue.Name, false) + _ = clt[v.UserID64].Channel.Close() + _ = clt[v.UserID64].Connection.Close() c := TGCommand{ Type: commandSendMsg, ToUserID64: v.UserID64, - Text: "Timeout.", + Text: "Timeout, purging and closing command queue.", } TGCmdQueue <- c - delete(clientsQueues, v.UserID64) + c := TGCommand{ + Type: commandSendMsg, + ToUserID64: cfg.Bot.Admin, + Text: fmt.Sprintf("Client %s timed out (%d messages purged).", v.Nickname, msgs), + } + TGCmdQueue <- c + + delete(clientsKeepAlive, v.UserID64) + } } time.Sleep(time.Until(t.Add(time.Second)))