This commit is contained in:
shoopea 2019-06-14 11:24:43 +08:00
parent 92c47a28ee
commit c94c8efd46
3 changed files with 46 additions and 41 deletions

View File

@ -4,6 +4,34 @@ import (
"strings" "strings"
) )
func clientKeepAlive(k, v interface{}) bool {
c := v.(MQKeepAlive)
if c.Date.Add(2 * KeepAliveHeartBeatSeconds * time.Second).Before(time.Now()) {
msgs, err := clientsQueue[v.UserID64].Channel.QueuePurge(clientsQueue[v.UserID64].Queue.Name, false)
logOnError(err, "clientKeepAlive : Channel.QueuePurge()")
err = clientsQueue[v.UserID64].Channel.Close()
logOnError(err, "clientKeepAlive : Channel.Close()")
err = clientsQueue[v.UserID64].Connection.Close()
logOnError(err, "clientKeepAlive : Connection.Close()")
c := TGCommand{
Type: commandSendMsg,
ToUserID64: v.UserID64,
Text: "Timeout, purging and closing command queue.",
}
TGCmdQueue <- c
c = TGCommand{
Type: commandSendMsg,
ToUserID64: cfg.Bot.Admin,
Text: fmt.Sprintf("Client %s timed out (%d messages purged).", v.Nickname, msgs),
}
TGCmdQueue <- c
clientsKeepAlive.Delete(c.UserID64)
}
return true
}
func clientSendCWMsg(userID64 int64, s string) { func clientSendCWMsg(userID64 int64, s string) {
c := TGCommand{ c := TGCommand{
Type: commandSendMsg, Type: commandSendMsg,

27
def.go
View File

@ -383,19 +383,20 @@ const (
objJobPriorityRescanAllMsg = 3 objJobPriorityRescanAllMsg = 3
objJobPriorityBackup = 4 objJobPriorityBackup = 4
MQGetMsgWorkers = 12 MQGetMsgWorkers = 12
MQCWMsgQueueSize = 100 MQCWMsgQueueSize = 100
SQLCWMsgWorkers = 6 SQLCWMsgWorkers = 6
SQLIdentifyMsgWorkers = 6 SQLIdentifyMsgWorkers = 6
SQLMsgIdentifyQueueSize = 100 SQLMsgIdentifyQueueSize = 100
SQLMsgRescanJobSize = 25 SQLMsgRescanJobSize = 25
JobWorkers = 12 JobWorkers = 12
JobQueueSize = 100 JobQueueSize = 100
TGCmdWorkers = 3 TGCmdWorkers = 3
TGCmdQueueSize = 100 TGCmdQueueSize = 100
MQTGCmdWorkers = 3 MQTGCmdWorkers = 3
MQTGCmdQueueSize = 100 MQTGCmdQueueSize = 100
SQLJobSliceSize = 25 SQLJobSliceSize = 25
KeepAliveHeartBeatSeconds = 5
) )
var ( var (

View File

@ -327,10 +327,10 @@ func MQTGCmdWorker(id int, cmds <-chan TGCommand) {
logOnError(err, "MQTGCmdWorker["+strconv.Itoa(id)+"] : Marshal(c)") logOnError(err, "MQTGCmdWorker["+strconv.Itoa(id)+"] : Marshal(c)")
//log.Printf("MQTGCmdWorker["+strconv.Itoa(id)+"] : new command.\n%s\n", string(j)) //log.Printf("MQTGCmdWorker["+strconv.Itoa(id)+"] : new command.\n%s\n", string(j))
err = clientsQueue[c.FromUserID64].Channel.Publish( err = clientsQueue[c.FromUserID64].Channel.Publish(
"", // exchange "", // exchange
clientsQueue[c.FromUserID64].Queue.Name, // routing key clientsQueue[c.FromUserID64].Queue.Name, // routing key
false, // mandatory false, // mandatory
false, // immediate false, // immediate
amqp.Publishing{ amqp.Publishing{
ContentType: "application/json", ContentType: "application/json",
Body: []byte(j), Body: []byte(j),
@ -440,31 +440,7 @@ func MQTidyKeepAliveWorker() {
//log.Printf("MQTidyKeepAliveWorker : Starting.") //log.Printf("MQTidyKeepAliveWorker : Starting.")
for true { for true {
t := time.Now() t := time.Now()
for _, v := range clientsKeepAlive { clientsKeepAlive.Range(clientKeepAlive)
if v.Date.Add(90 * time.Second).Before(time.Now()) {
msgs, err := clientsQueue[v.UserID64].Channel.QueuePurge(clientsQueue[v.UserID64].Queue.Name, false)
logOnError(err, "MQTidyKeepAliveWorker : Channel.QueuePurge()")
err = clientsQueue[v.UserID64].Channel.Close()
logOnError(err, "MQTidyKeepAliveWorker : Channel.Close()")
err = clientsQueue[v.UserID64].Connection.Close()
logOnError(err, "MQTidyKeepAliveWorker : Connection.Close()")
c := TGCommand{
Type: commandSendMsg,
ToUserID64: v.UserID64,
Text: "Timeout, purging and closing command queue.",
}
TGCmdQueue <- c
c = TGCommand{
Type: commandSendMsg,
ToUserID64: cfg.Bot.Admin,
Text: fmt.Sprintf("Client %s timed out (%d messages purged).", v.Nickname, msgs),
}
TGCmdQueue <- c
clientsKeepAlive.Delete(v.UserID64)
}
}
time.Sleep(time.Until(t.Add(time.Second))) time.Sleep(time.Until(t.Add(time.Second)))
} }
log.Printf("MQTidyKeepAliveWorker : Closing.") log.Printf("MQTidyKeepAliveWorker : Closing.")