From afa8b181ae5b0305c982726484fd70501c665298 Mon Sep 17 00:00:00 2001 From: shoopea Date: Fri, 28 Jun 2019 15:54:54 +0800 Subject: [PATCH] Revert "test" This reverts commit b98069f4ba5cb3f1b43564ecbc2948884c5be24f. --- workers.go | 131 +++++++++++++++++++++++++++-------------------------- 1 file changed, 66 insertions(+), 65 deletions(-) diff --git a/workers.go b/workers.go index ba5e2f0..8331a68 100644 --- a/workers.go +++ b/workers.go @@ -62,7 +62,6 @@ func MQGetMsgWorker(id int, msgs chan<- ChatWarsMessage) { msgs <- x } } - log.Printf("MQGetMsgWorker[" + strconv.Itoa(id) + "] : Closing queue.\n") s.Close() } @@ -92,74 +91,76 @@ func MQKeepAliveWorker() { log.Printf("MQKeepAliveWorker : Session address : %p.\n", &s) log.Printf("MQKeepAliveWorker : Connection address : %p.\n", &s.MQConnection) log.Printf("MQKeepAliveWorker : Channel address : %p.\n", &s.MQChannel) - /* - q, err := s.MQChannel.QueueDeclare( - s.Queue, // name - false, // durable - false, // delete when unused - false, // exclusive - false, // no-wait - nil, // arguments + q, err := s.MQChannel.QueueDeclare( + s.Queue, // name + false, // durable + false, // delete when unused + false, // exclusive + false, // no-wait + nil, // arguments - ) - m, err := s.MQChannel.Consume( - q.Name, // queue - "", // consumer - true, // auto-ack - false, // exclusive - false, // no-local - false, // no-wait - nil, // args - ) - - - for d := range m { - log.Printf("MQKeepAliveWorker : Received a message: %s", string(d.Body)) - x := MQKeepAlive{} - err = json.Unmarshal(d.Body, &x) - logOnError(err, "MQKeepAliveWorker : Can't unmarshal.\n"+string(d.Body)) - if err == nil { - if x.Date.Add(10 * time.Second).Before(time.Now()) { - // outdated keep-alive coming from client - } else if v, ok := clientsKeepAlive.Load(x.UserID64); ok { - k := v.(*MQKeepAlive) - k.Date = x.Date - } else { - cs := MQSession{ - User: cfg.Rabbit.User, - Password: cfg.Rabbit.Password, - Host: cfg.Rabbit.Host, - Path: x.Queue, - SSL: false, - Queue: "msg", - isConnected: false, - } - err = cs.Open() - logOnError(err, "MQKeepAliveWorker : Failed to open MQ session") - clientsKeepAlive.Store(x.UserID64, &x) - clientsQueue[x.UserID64] = &cs - - c := TGCommand{ - Type: commandSendMsg, - ToUserID64: x.UserID64, - Text: "Your client is connected.", - } - TGCmdQueue <- c - c = TGCommand{ - Type: commandSendMsg, - ToUserID64: cfg.Bot.Admin, - Text: fmt.Sprintf("Client `%s` is connected.", x.Nickname), - } - TGCmdQueue <- c - - clientSendCWMsg(x.UserID64, `🏅Me`) - clientSendCWMsg(x.UserID64, `/hero`) + ) + m, err := s.MQChannel.Consume( + q.Name, // queue + "", // consumer + true, // auto-ack + false, // exclusive + false, // no-local + false, // no-wait + nil, // args + ) + for d := range m { + log.Printf("MQKeepAliveWorker : Received a message: %s", string(d.Body)) + x := MQKeepAlive{} + err = json.Unmarshal(d.Body, &x) + logOnError(err, "MQKeepAliveWorker : Can't unmarshal.\n"+string(d.Body)) + if err == nil { + if x.Date.Add(10 * time.Second).Before(time.Now()) { + // outdated keep-alive coming from client + } else if v, ok := clientsKeepAlive.Load(x.UserID64); ok { + k := v.(*MQKeepAlive) + k.Date = x.Date + } else { + cs := MQSession{ + User: cfg.Rabbit.User, + Password: cfg.Rabbit.Password, + Host: cfg.Rabbit.Host, + Path: x.Queue, + SSL: false, + Queue: "msg", + isConnected: false, } + err = cs.Open() + logOnError(err, "MQKeepAliveWorker : Failed to open MQ session") + clientsKeepAlive.Store(x.UserID64, &x) + clientsQueue[x.UserID64] = &cs + + c := TGCommand{ + Type: commandSendMsg, + ToUserID64: x.UserID64, + Text: "Your client is connected.", + } + TGCmdQueue <- c + c = TGCommand{ + Type: commandSendMsg, + ToUserID64: cfg.Bot.Admin, + Text: fmt.Sprintf("Client `%s` is connected.", x.Nickname), + } + TGCmdQueue <- c + + clientSendCWMsg(x.UserID64, `🏅Me`) + /* + c = TGCommand{ + Type: commandSendMsg, + FromUserID64: x.UserID64, + ToChatID64: userID64ChtWrsBot, + Text: `/hero`, + } + MQTGCmdQueue <- c + */ } } - */ - s.Close() - log.Printf("MQKeepAliveWorker : Closing queue.\n") + } } log.Printf("MQKeepAliveWorker : Closing.")