From 52d3ba9cce6166766e538256b547dbfded2db3c5 Mon Sep 17 00:00:00 2001 From: shoopea Date: Fri, 28 Jun 2019 18:32:16 +0800 Subject: [PATCH] test --- workers.go | 87 ++++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 61 insertions(+), 26 deletions(-) diff --git a/workers.go b/workers.go index a1800f5..c93be61 100644 --- a/workers.go +++ b/workers.go @@ -230,6 +230,67 @@ func MQKeepAliveWorker() { } +func MQTGCmdWorker(id int, cmds <-chan TGCommand) { + //log.Printf("MQTGCmdWorker[" + strconv.Itoa(id) + "] : Starting.") + for c := range cmds { + if _, ok := clientsKeepAlive.Load(c.FromUserID64); ok { + j, err := json.Marshal(c) + logOnError(err, "MQTGCmdWorker["+strconv.Itoa(id)+"] : Marshal(c)") + //log.Printf("MQTGCmdWorker["+strconv.Itoa(id)+"] : new command.\n%s\n", string(j)) + for clientsQueue[c.FromUserID64].Connection.IsClosed() { + clientsQueue[c.FromUserID64].Connection, err = amqp.Dial("amqp://" + c.User + ":" + c.Password + "@" + c.Host + "/" + c.Path) + logOnError(err, "MQKeepAliveWorker : Cannot open MQ connection") + if err != nil { + clientsQueue[c.FromUserID64].Connection.Close() + time.Sleep(15 * time.Second) + continue + } + + clientsQueue[c.FromUserID64].Channel, err = clientsQueue[c.FromUserID64].Connection.Channel() + logOnError(err, "MQKeepAliveWorker : Cannot open MQ channel") + if err != nil { + clientsQueue[c.FromUserID64].Channel.Close() + clientsQueue[c.FromUserID64].Connection.Close() + time.Sleep(15 * time.Second) + continue + } + + clientsQueue[c.FromUserID64].Queue, err = clientsQueue[c.FromUserID64].Channel.QueueDeclare( + "keepalive", // name + false, // durable + false, // delete when unused + false, // exclusive + false, // no-wait + nil, // arguments + ) + logOnError(err, "MQKeepAliveWorker : Failed to declare a queue") + if err != nil { + clientsQueue[c.FromUserID64].Channel.Close() + clientsQueue[c.FromUserID64].Connection.Close() + time.Sleep(15 * time.Second) + continue + } + + } + err = clientsQueue[c.FromUserID64].Channel.Publish( + "", // exchange + clientsQueue[c.FromUserID64].Queue.Name, // routing key + false, // mandatory + false, // immediate + amqp.Publishing{ + ContentType: "application/json", + Body: []byte(j), + }) + logOnError(err, "MQTGCmdWorker["+strconv.Itoa(id)+"] : Publishing message.") + } else { + log.Printf("MQTGCmdWorker["+strconv.Itoa(id)+"] : client %d offline.\n", c.FromUserID64) + } + } + + log.Printf("MQTGCmdWorker[" + strconv.Itoa(id) + "] : Closing.") + +} + func SQLCWMsgWorker(id int, msgs <-chan ChatWarsMessage, objIds chan<- int64) { //log.Printf("SQLCWMsgWorker[" + strconv.Itoa(id) + "] : Starting.") for m := range msgs { @@ -491,32 +552,6 @@ func TGCmdWorker(id int, b *tb.Bot, cmds <-chan TGCommand) { log.Printf("TGCmdWorker[" + strconv.Itoa(id) + "] : Closing.") } -func MQTGCmdWorker(id int, cmds <-chan TGCommand) { - //log.Printf("MQTGCmdWorker[" + strconv.Itoa(id) + "] : Starting.") - for c := range cmds { - if _, ok := clientsKeepAlive.Load(c.FromUserID64); ok { - j, err := json.Marshal(c) - logOnError(err, "MQTGCmdWorker["+strconv.Itoa(id)+"] : Marshal(c)") - //log.Printf("MQTGCmdWorker["+strconv.Itoa(id)+"] : new command.\n%s\n", string(j)) - err = clientsQueue[c.FromUserID64].Channel.Publish( - "", // exchange - clientsQueue[c.FromUserID64].Queue.Name, // routing key - false, // mandatory - false, // immediate - amqp.Publishing{ - ContentType: "application/json", - Body: []byte(j), - }) - logOnError(err, "MQTGCmdWorker["+strconv.Itoa(id)+"] : Publishing message.") - } else { - log.Printf("MQTGCmdWorker["+strconv.Itoa(id)+"] : client %d offline.\n", c.FromUserID64) - } - } - - log.Printf("MQTGCmdWorker[" + strconv.Itoa(id) + "] : Closing.") - -} - func MQTidyKeepAliveWorker() { //log.Printf("MQTidyKeepAliveWorker : Starting.") for true {