diff --git a/def.go b/def.go index a1477fe..820c594 100644 --- a/def.go +++ b/def.go @@ -6,6 +6,13 @@ import ( "time" ) +type MQKeepAlive struct { + UserID64 int64 `json:"user_id"` + Nickname string `json:"nick"` + Queue string `json:"queue"` + Date time.Time `json:"date"` +} + type TGCommand struct { Type int64 `json:"type"` FromChatID64 int64 `json:"from_chat_id"` diff --git a/main.go b/main.go index d31ed07..f78c84c 100644 --- a/main.go +++ b/main.go @@ -50,6 +50,7 @@ var ( TGCmdQueue chan TGCommand MQTGCmdQueue chan TGCommand msgParsingRules map[int]MessageParsingRule + clientsQueues map[int64]MQKeepAlive ) func PrintText(m *tb.Message) { @@ -107,6 +108,7 @@ func main() { SQLMsgIdentifyQueue = make(chan int64, 100) TGCmdQueue = make(chan TGCommand, 100) MQTGCmdQueue = make(chan TGCommand, 100) + clientsQueues = make(map[int64]MQKeepAlive) for w := 1; w <= MQGetMsgWorkers; w++ { go MQGetMsgWorker(w, MQCWMsgQueue) @@ -126,6 +128,8 @@ func main() { for w := 1; w <= MQTGCmdWorkers; w++ { go MQTGCmdWorker(w, MQTGCmdQueue) } + go MQKeepAliveWorker() + go MQTidyKeepAliveWorker() log.Println("Bot started !") diff --git a/workers.go b/workers.go index 80ce2ba..1d3d849 100644 --- a/workers.go +++ b/workers.go @@ -13,6 +13,7 @@ import ( ) func MQGetMsgWorker(id int, msgs chan<- ChatWarsMessage) { + log.Printf("MQGetMsgWorker[" + strconv.Itoa(id) + "] : Starting.") var x ChatWarsMessage conn, err := amqp.Dial("amqp://" + cfg.Rabbit.User + ":" + cfg.Rabbit.Password + "@" + cfg.Rabbit.Host + "/" + cfg.Rabbit.Queue) failOnError(err, "MQGetMsgWorker["+strconv.Itoa(id)+"] : Failed to connect to RabbitMQ") @@ -224,3 +225,80 @@ func MQTGCmdWorker(id int, cmds <-chan TGCommand) { log.Printf("MQTGCmdWorker[" + strconv.Itoa(id) + "] : Closing.") } + +func MQKeepAliveWorker() { + log.Printf("MQKeepAliveWorker : Starting.") + conn, err := amqp.Dial("amqp://" + cfg.Rabbit.User + ":" + cfg.Rabbit.Password + "@" + cfg.Rabbit.Host + "/" + cfg.Rabbit.Queue) + failOnError(err, "MQKeepAliveWorker : Failed to connect to RabbitMQ") + defer conn.Close() + + ch, err := conn.Channel() + failOnError(err, "MQKeepAliveWorker : Failed to open a channel") + defer ch.Close() + + q, err := ch.QueueDeclare( + "keepalive", // name + false, // durable + false, // delete when unused + false, // exclusive + false, // no-wait + nil, // arguments + ) + failOnError(err, "MQKeepAliveWorker : Failed to declare a queue") + + m, err := ch.Consume( + q.Name, // queue + "", // consumer + true, // auto-ack + false, // exclusive + false, // no-local + false, // no-wait + nil, // args + ) + failOnError(err, "MQKeepAliveWorker : Failed to register a consumer") + + for d := range m { + // log.Printf("MQKeepAliveWorker : Received a message: %s", string(d.Body)) + x := MQKeepAlive{} + err = json.Unmarshal(d.Body, &x) + logOnError(err, "MQKeepAliveWorker : Can't unmarshal.\n"+string(d.Body)) + if err == nil { + if x.Date.Add(time.Minute).Before(time.Now()) { + // outdate keep-alive + } else if _, ok := clientsQueues[x.UserID64]; ok { + clientsQueues[x.UserID64].Date = x.Date + } else { + clientsQueues[x.UserID64] = x + c := TGCommand{ + Type: commandSendMsg, + ToChatID64: x.UserID64, + Text: "You are connected.", + } + TGCmdQueue <- c + } + } + } + + log.Printf("MQKeepAliveWorker : Closing.") + +} + +func MQTidyKeepAliveWorker() { + log.Printf("MQTidyKeepAliveWorker : Starting.") + for true { + t := time.Now() + for k, v := range clientsQueues { + if v.Date.Add(90 * time.Second).Before(time.Now()) { + c := TGCommand{ + Type: commandSendMsg, + ToChatID64: x.UserID64, + Text: "Timeout.", + } + TGCmdQueue <- c + delete(clientsQueues, x.UserID64) + } + } + time.Sleep(time.Until(t.Add(time.Minute))) + } + log.Printf("MQTidyKeepAliveWorker : Closing.") +}