diff --git a/README.md b/README.md index 4fa667d..0a9b990 100644 --- a/README.md +++ b/README.md @@ -24,6 +24,7 @@ ChirpNestBot - [ ] Link TelegramUserID and UserID - [ ] Issue with Squire in the /g_roles ? - [x] Foray interception +- [ ] Handle Foray timeout to stop spamming in case something goes bad - [x] Export/import all messages - [x] Test HTML in message - [x] Update auction from broadcast diff --git a/client.go b/client.go index b39e387..438c234 100644 --- a/client.go +++ b/client.go @@ -7,32 +7,21 @@ import ( "time" ) -func clientKeepAlive(k, v interface{}) bool { - clt := v.(*MQKeepAlive) - if clt.Date.Add(3 * KeepAliveHeartBeatSeconds * time.Second).Before(time.Now()) { - msgs, err := clientsQueue[clt.UserID64].Channel.QueuePurge(clientsQueue[clt.UserID64].Queue.Name, false) - logOnError(err, "clientKeepAlive : Channel.QueuePurge()") - err = clientsQueue[clt.UserID64].Channel.Close() - logOnError(err, "clientKeepAlive : Channel.Close()") - err = clientsQueue[clt.UserID64].Connection.Close() - logOnError(err, "clientKeepAlive : Connection.Close()") - c := TGCommand{ - Type: commandSendMsg, - ToUserID64: clt.UserID64, - Text: "Timeout, purging and closing command queue.", - } - TGCmdQueue <- c - c = TGCommand{ - Type: commandSendMsg, - ToUserID64: cfg.Bot.Admin, - Text: fmt.Sprintf("Client %s timed out (%d messages purged).", clt.Nickname, msgs), - } - TGCmdQueue <- c - - clientsKeepAlive.Delete(clt.UserID64) - +func getLockedClient(int64 id, bool createMissing) (*ChirpClient, bool) { + muxClients.Lock() + if c, ok := clients[id]; ok { + c.Mux.Lock() + return c, true + } else if createMissing { + c := new(ChirpClient) + clients[id] = c + c.TGUserID64 = id + c.Lock() + return c, true + } else { + return 0, false } - return true + defer muxClients.Unlock() } func clientSendCWMsg(userID64 int64, s string) { diff --git a/def.go b/def.go index 3e1cc88..cc0ea7a 100644 --- a/def.go +++ b/def.go @@ -12,13 +12,6 @@ type DataBackup struct { Messages []ChatWarsMessage `json:"messages"` } -type MQKeepAlive struct { - TGUserID64 int64 `json:"tg_user_id"` - Nickname string `json:"nick"` - Queue string `json:"queue"` - Date time.Time `json:"date"` -} - type MQClient struct { User string Password string @@ -30,14 +23,25 @@ type MQClient struct { Queue amqp.Queue } -type ChatWarsClient struct { - UserID64 int64 `json:"user_id"` - TelegramID64 int64 `json:"telegram_id"` - GuildID64 int64 `json:"guild_id"` - Role string `json:"role"` - State string `json:"state"` - BusyUntil time.Time `json:"busy_until"` - LastUpdate time.Time `json:"last_update"` +type ChirpClient struct { + HeartBeat time.Time `json:"heart_beat"` + Active bool + TGUserID64 int64 `json:"tg_user_id"` + MQ MQClient `json:"mq_client"` + CWUserID64 int64 `json:"user_id"` + CWGuildID64 int64 `json:"guild_id"` + CWRole string `json:"role"` + CWState string `json:"state"` + CWBusyUntil time.Time `json:"busy_until"` + CWLastUpdate time.Time `json:"last_update"` + Mux sync.Mutex +} + +type MQKeepAlive struct { + TGUserID64 int64 `json:"tg_user_id"` + Nickname string `json:"nick"` + Queue string `json:"queue"` + Date time.Time `json:"date"` } type TGCommand struct { diff --git a/main.go b/main.go index 9dbc897..bcaa6c6 100644 --- a/main.go +++ b/main.go @@ -60,11 +60,9 @@ var ( JobQueue chan Job msgParsingRules map[int]MessageParsingRule - clientsQueue map[int64]*MQClient - clientsCW *sync.Map - clientsKeepAlive *sync.Map - muxKeepAlive sync.Mutex + clients map[int64]*ChirpClient + muxClients sync.Mutex ) func PrintText(m *tb.Message) { diff --git a/workers.go b/workers.go index 04b5a43..4ebedc6 100644 --- a/workers.go +++ b/workers.go @@ -161,13 +161,13 @@ func MQKeepAliveWorker() { err = json.Unmarshal(d.Body, &x) logOnError(err, "MQKeepAliveWorker : Can't unmarshal.\n"+string(d.Body)) if err == nil { - muxKeepAlive.Lock() if x.Date.Add(10 * time.Second).Before(time.Now()) { // outdated keep-alive coming from client - } else if v, ok := clientsKeepAlive.Load(x.TGUserID64); ok { - k := v.(*MQKeepAlive) - k.Date = x.Date - } else { + } else if clt, ok := getLockedClient(x.TGUserID64, true); ok { + clt.HeartBeat = x.Date + if clt.Active { + clt.Mux.Unlock() + } else { clt := MQClient{ User: cfg.Rabbit.User, Password: cfg.Rabbit.Password, @@ -566,7 +566,31 @@ func MQTidyKeepAliveWorker() { //log.Printf("MQTidyKeepAliveWorker : Starting.") for true { t := time.Now() - clientsKeepAlive.Range(clientKeepAlive) + muxClients.Lock() + for id, clt := range clients { + if clt.Active && clt.Date.Add(3 * KeepAliveHeartBeatSeconds * time.Second).Before(time.Now()) { + msgs, err := clt.MQ.Channel.QueuePurge(c.MQ.Queue.Name, false) + logOnError(err, "MQTidyKeepAliveWorker : Channel.QueuePurge()") + err = clt.MQ.Channel.Close() + logOnError(err, "MQTidyKeepAliveWorker : Channel.Close()") + err = clt.MQ.Connection.Close() + logOnError(err, "MQTidyKeepAliveWorker : Connection.Close()") + cmd := TGCommand{ + Type: commandSendMsg, + ToUserID64: clt.TGUserID64, + Text: "Timeout, purging and closing command queue.", + } + TGCmdQueue <- cmd + cmd = TGCommand{ + Type: commandSendMsg, + ToUserID64: cfg.Bot.Admin, + Text: fmt.Sprintf("Client %s timed out (%d messages purged).", clt.MQ.User, msgs), + } + TGCmdQueue <- cmd + clt.Active = false + } + } + muxClients.Unlock() time.Sleep(time.Until(t.Add(time.Second))) } log.Printf("MQTidyKeepAliveWorker : Closing.")