From 0a84246a2625953b7c6cf4c831dae110ed45a38c Mon Sep 17 00:00:00 2001 From: shoopea Date: Fri, 14 Jun 2019 11:03:43 +0800 Subject: [PATCH] test --- README.md | 1 + main.go | 9 +++++---- workers.go | 15 ++++++++------- 3 files changed, 14 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index 1fc53c2..2f8ba48 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,5 @@ ChirpNestBot +- [ ] MQTGCmdWorker[2] : Publishing message.: Exception (504) Reason: "channel/connection is not open" (add check in MQKeepAliveWorker ?) - [ ] Make sure refresh message receives the message later (jobMsgRefresh) - [ ] Adjust /time clock for auctions/... (delay between cw and real time ?) - [ ] Update old auctions with client diff --git a/main.go b/main.go index f002d6e..1ba8af8 100644 --- a/main.go +++ b/main.go @@ -59,11 +59,11 @@ var ( MQTGCmdQueue chan TGCommand JobQueue chan Job - msgParsingRules map[int]MessageParsingRule - clientsKeepAlive map[int64]*MQKeepAlive - clientsQueue map[int64]*MQClient + msgParsingRules map[int]MessageParsingRule + clientsQueue map[int64]*MQClient - clientsCW *sync.Map + clientsCW *sync.Map + clientsKeepAlive *sync.Map ) func PrintText(m *tb.Message) { @@ -156,6 +156,7 @@ func main() { clientsQueue = make(map[int64]*MQClient) clientsKeepAlive = make(map[int64]*MQKeepAlive) clientsCW = new(sync.Map) + clientsKeepAlive = new(sync.Map) for w := 1; w <= MQGetMsgWorkers; w++ { go MQGetMsgWorker(w, MQCWMsgQueue) diff --git a/workers.go b/workers.go index 326a63f..9c32762 100644 --- a/workers.go +++ b/workers.go @@ -322,7 +322,7 @@ func TGCmdWorker(id int, b *tb.Bot, cmds <-chan TGCommand) { func MQTGCmdWorker(id int, cmds <-chan TGCommand) { //log.Printf("MQTGCmdWorker[" + strconv.Itoa(id) + "] : Starting.") for c := range cmds { - if _, ok := clientsKeepAlive[c.FromUserID64]; ok { + if _, ok := clientsKeepAlive.Load(c.FromUserID64); ok { j, err := json.Marshal(c) logOnError(err, "MQTGCmdWorker["+strconv.Itoa(id)+"] : Marshal(c)") //log.Printf("MQTGCmdWorker["+strconv.Itoa(id)+"] : new command.\n%s\n", string(j)) @@ -382,10 +382,11 @@ func MQKeepAliveWorker() { err = json.Unmarshal(d.Body, &x) logOnError(err, "MQKeepAliveWorker : Can't unmarshal.\n"+string(d.Body)) if err == nil { - if x.Date.Add(time.Minute).Before(time.Now()) { - // outdated keep-alive - } else if _, ok := clientsKeepAlive[x.UserID64]; ok { - clientsKeepAlive[x.UserID64].Date = x.Date + if x.Date.Add(10 * time.Second).Before(time.Now()) { + // outdated keep-alive coming from client + } else if v, ok := clientsKeepAlive.Load(x.UserID64); ok { + k := v.(MQKeepAlive) + k.Date = x.Date } else { clt := MQClient{} clt.Connection, err = amqp.Dial("amqp://" + cfg.Rabbit.User + ":" + cfg.Rabbit.Password + "@" + cfg.Rabbit.Host + "/" + x.Queue) @@ -401,7 +402,7 @@ func MQKeepAliveWorker() { nil, // arguments ) logOnError(err, "MQKeepAliveWorker : Failed to declare a queue") - clientsKeepAlive[x.UserID64] = &x + clientsKeepAlive.Load(x.UserID64, &x) clientsQueue[x.UserID64] = &clt c := TGCommand{ @@ -460,7 +461,7 @@ func MQTidyKeepAliveWorker() { } TGCmdQueue <- c - delete(clientsKeepAlive, v.UserID64) + clientsKeepAlive.Delete(v.UserID64) } }