This commit is contained in:
shoopea 2019-05-17 16:09:09 +08:00
parent 5d3e480a54
commit 4d6ebb6500
3 changed files with 52 additions and 9 deletions

10
def.go
View File

@ -1,9 +1,11 @@
package main package main
import ( import (
tb "gopkg.in/tucnak/telebot.v2"
"regexp" "regexp"
"time" "time"
"github.com/streadway/amqp"
tb "gopkg.in/tucnak/telebot.v2"
) )
type MQKeepAlive struct { type MQKeepAlive struct {
@ -13,6 +15,12 @@ type MQKeepAlive struct {
Date time.Time `json:"date"` Date time.Time `json:"date"`
} }
type MQClient struct {
Connnection *amqp.Connection
Channel *amqp.Channel
Queue amqp.Queue
}
type TGCommand struct { type TGCommand struct {
Type int64 `json:"type"` Type int64 `json:"type"`
FromChatID64 int64 `json:"from_chat_id"` FromChatID64 int64 `json:"from_chat_id"`

View File

@ -50,7 +50,8 @@ var (
TGCmdQueue chan TGCommand TGCmdQueue chan TGCommand
MQTGCmdQueue chan TGCommand MQTGCmdQueue chan TGCommand
msgParsingRules map[int]MessageParsingRule msgParsingRules map[int]MessageParsingRule
clientsQueues map[int64]*MQKeepAlive clientsKeepAlive map[int64]*MQKeepAlive
clientsQueue map[int64]*MQClient
) )
func PrintText(m *tb.Message) { func PrintText(m *tb.Message) {
@ -108,7 +109,8 @@ func main() {
SQLMsgIdentifyQueue = make(chan int64, 100) SQLMsgIdentifyQueue = make(chan int64, 100)
TGCmdQueue = make(chan TGCommand, 100) TGCmdQueue = make(chan TGCommand, 100)
MQTGCmdQueue = make(chan TGCommand, 100) MQTGCmdQueue = make(chan TGCommand, 100)
clientsQueues = make(map[int64]*MQKeepAlive) clientsQueue = make(map[int64]*MQQueue)
clientsKeepAlive = make(map[int64]*MQKeepAlive)
for w := 1; w <= MQGetMsgWorkers; w++ { for w := 1; w <= MQGetMsgWorkers; w++ {
go MQGetMsgWorker(w, MQCWMsgQueue) go MQGetMsgWorker(w, MQCWMsgQueue)

View File

@ -279,16 +279,38 @@ func MQKeepAliveWorker() {
if err == nil { if err == nil {
if x.Date.Add(time.Minute).Before(time.Now()) { if x.Date.Add(time.Minute).Before(time.Now()) {
// outdate keep-alive // outdate keep-alive
} else if _, ok := clientsQueues[x.UserID64]; ok { } else if _, ok := clientsKeepAlive[x.UserID64]; ok {
clientsQueues[x.UserID64].Date = x.Date clientsKeepAlive[x.UserID64].Date = x.Date
} else { } else {
clientsQueues[x.UserID64] = &x clt := MQClient{}
clt.Connection, err = amqp.Dial("amqp://" + cfg.Rabbit.User + ":" + cfg.Rabbit.Password + "@" + cfg.Rabbit.Host + "/" + x.Queue)
logOnError(err, "MQKeepAliveWorker : Failed to connect to RabbitMQ")
clt.Channel, err = clt.conn.Channel()
logOnError(err, "MQKeepAliveWorker : Failed to open a channel")
clt.Queue, err = ch.QueueDeclare(
"msg", // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
logOnError(err, "MQKeepAliveWorker : Failed to declare a queue")
clientsKeepAlive[x.UserID64] = &x
c := TGCommand{ c := TGCommand{
Type: commandSendMsg, Type: commandSendMsg,
ToUserID64: x.UserID64, ToUserID64: x.UserID64,
Text: "You are connected.", Text: "You are connected.",
} }
TGCmdQueue <- c TGCmdQueue <- c
c = TGCommand{
Type: commandSendMsg,
ToUserID64: cfg.Bot.Admin,
Text: fmt.Sprintf("%s is connected.", x.Nickname),
}
TGCmdQueue <- c
} }
} }
} }
@ -301,15 +323,26 @@ func MQTidyKeepAliveWorker() {
log.Printf("MQTidyKeepAliveWorker : Starting.") log.Printf("MQTidyKeepAliveWorker : Starting.")
for true { for true {
t := time.Now() t := time.Now()
for _, v := range clientsQueues { for _, v := range clientsKeepAlive {
if v.Date.Add(90 * time.Second).Before(time.Now()) { if v.Date.Add(90 * time.Second).Before(time.Now()) {
msgs, err := clt[v.UserID64].Channel.QueuePurge(clt[v.UserID64].Queue.Name, false)
_ = clt[v.UserID64].Channel.Close()
_ = clt[v.UserID64].Connection.Close()
c := TGCommand{ c := TGCommand{
Type: commandSendMsg, Type: commandSendMsg,
ToUserID64: v.UserID64, ToUserID64: v.UserID64,
Text: "Timeout.", Text: "Timeout, purging and closing command queue.",
} }
TGCmdQueue <- c TGCmdQueue <- c
delete(clientsQueues, v.UserID64) c := TGCommand{
Type: commandSendMsg,
ToUserID64: cfg.Bot.Admin,
Text: fmt.Sprintf("Client %s timed out (%d messages purged).", v.Nickname, msgs),
}
TGCmdQueue <- c
delete(clientsKeepAlive, v.UserID64)
} }
} }
time.Sleep(time.Until(t.Add(time.Second))) time.Sleep(time.Until(t.Add(time.Second)))