From c94c8efd46b6763fa2babd03c2b3e8366ff7d330 Mon Sep 17 00:00:00 2001 From: shoopea Date: Fri, 14 Jun 2019 11:24:43 +0800 Subject: [PATCH] test --- client.go | 28 ++++++++++++++++++++++++++++ def.go | 27 ++++++++++++++------------- workers.go | 32 ++++---------------------------- 3 files changed, 46 insertions(+), 41 deletions(-) diff --git a/client.go b/client.go index 0149fce..ff5fe89 100644 --- a/client.go +++ b/client.go @@ -4,6 +4,34 @@ import ( "strings" ) +func clientKeepAlive(k, v interface{}) bool { + c := v.(MQKeepAlive) + if c.Date.Add(2 * KeepAliveHeartBeatSeconds * time.Second).Before(time.Now()) { + msgs, err := clientsQueue[v.UserID64].Channel.QueuePurge(clientsQueue[v.UserID64].Queue.Name, false) + logOnError(err, "clientKeepAlive : Channel.QueuePurge()") + err = clientsQueue[v.UserID64].Channel.Close() + logOnError(err, "clientKeepAlive : Channel.Close()") + err = clientsQueue[v.UserID64].Connection.Close() + logOnError(err, "clientKeepAlive : Connection.Close()") + c := TGCommand{ + Type: commandSendMsg, + ToUserID64: v.UserID64, + Text: "Timeout, purging and closing command queue.", + } + TGCmdQueue <- c + c = TGCommand{ + Type: commandSendMsg, + ToUserID64: cfg.Bot.Admin, + Text: fmt.Sprintf("Client %s timed out (%d messages purged).", v.Nickname, msgs), + } + TGCmdQueue <- c + + clientsKeepAlive.Delete(c.UserID64) + + } + return true +} + func clientSendCWMsg(userID64 int64, s string) { c := TGCommand{ Type: commandSendMsg, diff --git a/def.go b/def.go index 5f9abc0..ca505e3 100644 --- a/def.go +++ b/def.go @@ -383,19 +383,20 @@ const ( objJobPriorityRescanAllMsg = 3 objJobPriorityBackup = 4 - MQGetMsgWorkers = 12 - MQCWMsgQueueSize = 100 - SQLCWMsgWorkers = 6 - SQLIdentifyMsgWorkers = 6 - SQLMsgIdentifyQueueSize = 100 - SQLMsgRescanJobSize = 25 - JobWorkers = 12 - JobQueueSize = 100 - TGCmdWorkers = 3 - TGCmdQueueSize = 100 - MQTGCmdWorkers = 3 - MQTGCmdQueueSize = 100 - SQLJobSliceSize = 25 + MQGetMsgWorkers = 12 + MQCWMsgQueueSize = 100 + SQLCWMsgWorkers = 6 + SQLIdentifyMsgWorkers = 6 + SQLMsgIdentifyQueueSize = 100 + SQLMsgRescanJobSize = 25 + JobWorkers = 12 + JobQueueSize = 100 + TGCmdWorkers = 3 + TGCmdQueueSize = 100 + MQTGCmdWorkers = 3 + MQTGCmdQueueSize = 100 + SQLJobSliceSize = 25 + KeepAliveHeartBeatSeconds = 5 ) var ( diff --git a/workers.go b/workers.go index 90bac99..3aae38a 100644 --- a/workers.go +++ b/workers.go @@ -327,10 +327,10 @@ func MQTGCmdWorker(id int, cmds <-chan TGCommand) { logOnError(err, "MQTGCmdWorker["+strconv.Itoa(id)+"] : Marshal(c)") //log.Printf("MQTGCmdWorker["+strconv.Itoa(id)+"] : new command.\n%s\n", string(j)) err = clientsQueue[c.FromUserID64].Channel.Publish( - "", // exchange + "", // exchange clientsQueue[c.FromUserID64].Queue.Name, // routing key - false, // mandatory - false, // immediate + false, // mandatory + false, // immediate amqp.Publishing{ ContentType: "application/json", Body: []byte(j), @@ -440,31 +440,7 @@ func MQTidyKeepAliveWorker() { //log.Printf("MQTidyKeepAliveWorker : Starting.") for true { t := time.Now() - for _, v := range clientsKeepAlive { - if v.Date.Add(90 * time.Second).Before(time.Now()) { - msgs, err := clientsQueue[v.UserID64].Channel.QueuePurge(clientsQueue[v.UserID64].Queue.Name, false) - logOnError(err, "MQTidyKeepAliveWorker : Channel.QueuePurge()") - err = clientsQueue[v.UserID64].Channel.Close() - logOnError(err, "MQTidyKeepAliveWorker : Channel.Close()") - err = clientsQueue[v.UserID64].Connection.Close() - logOnError(err, "MQTidyKeepAliveWorker : Connection.Close()") - c := TGCommand{ - Type: commandSendMsg, - ToUserID64: v.UserID64, - Text: "Timeout, purging and closing command queue.", - } - TGCmdQueue <- c - c = TGCommand{ - Type: commandSendMsg, - ToUserID64: cfg.Bot.Admin, - Text: fmt.Sprintf("Client %s timed out (%d messages purged).", v.Nickname, msgs), - } - TGCmdQueue <- c - - clientsKeepAlive.Delete(v.UserID64) - - } - } + clientsKeepAlive.Range(clientKeepAlive) time.Sleep(time.Until(t.Add(time.Second))) } log.Printf("MQTidyKeepAliveWorker : Closing.")