This commit is contained in:
shoopea 2019-06-14 11:03:43 +08:00
parent c483922063
commit 0a84246a26
3 changed files with 14 additions and 11 deletions

View File

@ -1,4 +1,5 @@
ChirpNestBot
- [ ] MQTGCmdWorker[2] : Publishing message.: Exception (504) Reason: "channel/connection is not open" (add check in MQKeepAliveWorker ?)
- [ ] Make sure refresh message receives the message later (jobMsgRefresh)
- [ ] Adjust /time clock for auctions/... (delay between cw and real time ?)
- [ ] Update old auctions with client

View File

@ -59,11 +59,11 @@ var (
MQTGCmdQueue chan TGCommand
JobQueue chan Job
msgParsingRules map[int]MessageParsingRule
clientsKeepAlive map[int64]*MQKeepAlive
clientsQueue map[int64]*MQClient
msgParsingRules map[int]MessageParsingRule
clientsQueue map[int64]*MQClient
clientsCW *sync.Map
clientsCW *sync.Map
clientsKeepAlive *sync.Map
)
func PrintText(m *tb.Message) {
@ -156,6 +156,7 @@ func main() {
clientsQueue = make(map[int64]*MQClient)
clientsKeepAlive = make(map[int64]*MQKeepAlive)
clientsCW = new(sync.Map)
clientsKeepAlive = new(sync.Map)
for w := 1; w <= MQGetMsgWorkers; w++ {
go MQGetMsgWorker(w, MQCWMsgQueue)

View File

@ -322,7 +322,7 @@ func TGCmdWorker(id int, b *tb.Bot, cmds <-chan TGCommand) {
func MQTGCmdWorker(id int, cmds <-chan TGCommand) {
//log.Printf("MQTGCmdWorker[" + strconv.Itoa(id) + "] : Starting.")
for c := range cmds {
if _, ok := clientsKeepAlive[c.FromUserID64]; ok {
if _, ok := clientsKeepAlive.Load(c.FromUserID64); ok {
j, err := json.Marshal(c)
logOnError(err, "MQTGCmdWorker["+strconv.Itoa(id)+"] : Marshal(c)")
//log.Printf("MQTGCmdWorker["+strconv.Itoa(id)+"] : new command.\n%s\n", string(j))
@ -382,10 +382,11 @@ func MQKeepAliveWorker() {
err = json.Unmarshal(d.Body, &x)
logOnError(err, "MQKeepAliveWorker : Can't unmarshal.\n"+string(d.Body))
if err == nil {
if x.Date.Add(time.Minute).Before(time.Now()) {
// outdated keep-alive
} else if _, ok := clientsKeepAlive[x.UserID64]; ok {
clientsKeepAlive[x.UserID64].Date = x.Date
if x.Date.Add(10 * time.Second).Before(time.Now()) {
// outdated keep-alive coming from client
} else if v, ok := clientsKeepAlive.Load(x.UserID64); ok {
k := v.(MQKeepAlive)
k.Date = x.Date
} else {
clt := MQClient{}
clt.Connection, err = amqp.Dial("amqp://" + cfg.Rabbit.User + ":" + cfg.Rabbit.Password + "@" + cfg.Rabbit.Host + "/" + x.Queue)
@ -401,7 +402,7 @@ func MQKeepAliveWorker() {
nil, // arguments
)
logOnError(err, "MQKeepAliveWorker : Failed to declare a queue")
clientsKeepAlive[x.UserID64] = &x
clientsKeepAlive.Load(x.UserID64, &x)
clientsQueue[x.UserID64] = &clt
c := TGCommand{
@ -460,7 +461,7 @@ func MQTidyKeepAliveWorker() {
}
TGCmdQueue <- c
delete(clientsKeepAlive, v.UserID64)
clientsKeepAlive.Delete(v.UserID64)
}
}