diff --git a/client.go b/client.go index 438c234..cfdde93 100644 --- a/client.go +++ b/client.go @@ -16,6 +16,7 @@ func getLockedClient(int64 id, bool createMissing) (*ChirpClient, bool) { c := new(ChirpClient) clients[id] = c c.TGUserID64 = id + c.Active = false c.Lock() return c, true } else { @@ -45,71 +46,61 @@ func clientRefreshCWMsg(userID64 int64, chatID64 int64, msgID64 int64) { } func clientMsgMeAck(m *ChatWarsMessageMeAck) { - if _, ok := clientsQueue[m.Msg.UserID64]; ok { - if v, ok := clientsCW.Load(m.Msg.UserID64); ok { - c := v.(*ChatWarsClient) - if c.LastUpdate.Before(m.Msg.Date) { - c.GuildID64 = m.GuildID64 - c.State = m.State - c.LastUpdate = m.Msg.Date - if getObjGuildID(``) != m.GuildID64 && strings.Compare(c.Role, ``) == 0 { + if clt, ok := getLockedClient(m.Msg.UserID64, false); ok { + if clt.Active { + if clt.CWLastUpdate.Before(m.Msg.Date) { + clt.CWGuildID64 = m.GuildID64 + clt.CWUserID64 = m.UserID64 + clt.CWState = m.State + clt.CWLastUpdate = m.Msg.Date + if getObjGuildID(``) != m.GuildID64 && strings.Compare(clt.CWRole, ``) == 0 { clientSendCWMsg(m.Msg.UserID64, "/g_roles") } } - } else { - c := ChatWarsClient{ - UserID64: m.UserID64, - TelegramID64: m.Msg.UserID64, - GuildID64: m.GuildID64, - State: m.State, - LastUpdate: m.Msg.Date, - } - clientsCW.Store(m.Msg.UserID64, &c) - if getObjGuildID(``) != m.GuildID64 { - clientSendCWMsg(m.Msg.UserID64, "/g_roles") - } } + clt.Unlock() } } func clientMsgGoQuestAck(m *ChatWarsMessageGoQuestAck) { - if _, ok := clientsQueue[m.Msg.UserID64]; ok { - if v, ok := clientsCW.Load(m.Msg.UserID64); ok { - c := v.(*ChatWarsClient) - if c.LastUpdate.Before(m.Msg.Date) { - c.LastUpdate = m.Msg.Date - c.BusyUntil = m.Msg.Date.Add(m.Duration) + if clt, ok := getLockedClient(m.Msg.UserID64, false); ok { + if clt.Active { + if clt.CWLastUpdate.Before(m.Msg.Date) { + clt.CWLastUpdate = m.Msg.Date + clt.CWBusyUntil = m.Msg.Date.Add(m.Duration) } } + clt.Unlock() } } func clientMsgGRolesAck(m *ChatWarsMessageGRolesAck) { - if _, ok := clientsQueue[m.Msg.UserID64]; ok { - if v, ok := clientsCW.Load(m.Msg.UserID64); ok { - c := v.(*ChatWarsClient) - if c.LastUpdate.Before(m.Msg.Date) { - if m.CommanderID64 == c.UserID64 { + if clt, ok := getLockedClient(m.Msg.UserID64, false); ok { + if clt.Active { + if clt.CWLastUpdate.Before(m.Msg.Date) { + if m.CommanderID64 == clt.CWUserID64 { c.Role = `commander` - } else if m.BartenderID64 == c.UserID64 { + } else if m.BartenderID64 == clt.CWUserID64 { c.Role = `bartender` - } else if m.SquireID64 == c.UserID64 { + } else if m.SquireID64 == clt.CWUserID64 { c.Role = `squire` - } else if m.TreasurerID64 == c.UserID64 { + } else if m.TreasurerID64 == clt.CWUserID64 { c.Role = `treasurer` } else { c.Role = `none` } - c.LastUpdate = m.Msg.Date + clt.CWLastUpdate = m.Msg.Date } } + clt.Unlock() } } -func clientGetUserID64(tgUserID64 int64) (int64, error) { - if v, ok := clientsCW.Load(tgUserID64); ok { - c := v.(*ChatWarsClient) - return c.UserID64, nil +func clientGetCWUserID64(tgUserID64 int64) (int64, error) { + if clt, ok := getLockedClient(m.Msg.UserID64, false); ok { + i := clt.CWUserID64 + clt.Unlock() + return i, nil } return 0, errors.New("Unknown user_id.") } diff --git a/workers.go b/workers.go index 4ebedc6..a0eae9a 100644 --- a/workers.go +++ b/workers.go @@ -168,58 +168,59 @@ func MQKeepAliveWorker() { if clt.Active { clt.Mux.Unlock() } else { - clt := MQClient{ - User: cfg.Rabbit.User, - Password: cfg.Rabbit.Password, - Host: cfg.Rabbit.Host, - Path: x.Queue, - SSL: false, - } - clt.Connection, err = amqp.Dial("amqp://" + clt.User + ":" + clt.Password + "@" + clt.Host + "/" + clt.Path) - logOnError(err, "MQKeepAliveWorker : Failed to connect to RabbitMQ") - if err != nil { - clt.Connection.Close() - } else { - clt.Channel, err = clt.Connection.Channel() - logOnError(err, "MQKeepAliveWorker : Failed to open a channel") + clt.MQ = MQClient{ + User: cfg.Rabbit.User, + Password: cfg.Rabbit.Password, + Host: cfg.Rabbit.Host, + Path: x.Queue, + SSL: false, + } + clt.MQ.Connection, err = amqp.Dial("amqp://" + clt.MQ.User + ":" + clt.MQ.Password + "@" + clt.MQ.Host + "/" + clt.MQ.Path) + logOnError(err, "MQKeepAliveWorker : Failed to connect to RabbitMQ") if err != nil { - clt.Channel.Close() - clt.Connection.Close() + clt.MQ.Connection.Close() } else { - clt.Queue, err = clt.Channel.QueueDeclare( - "msg", // name - false, // durable - false, // delete when unused - false, // exclusive - false, // no-wait - nil, // arguments - ) - logOnError(err, "MQKeepAliveWorker : Failed to declare a queue") + clt.MQ.Channel, err = clt.MQ.Connection.Channel() + logOnError(err, "MQKeepAliveWorker : Failed to open a channel") if err != nil { - clt.Channel.Close() - clt.Connection.Close() + clt.MQ.Channel.Close() + clt.MQ.Connection.Close() + } else { + clt.MQ.Queue, err = clt.MQ.Channel.QueueDeclare( + "msg", // name + false, // durable + false, // delete when unused + false, // exclusive + false, // no-wait + nil, // arguments + ) + logOnError(err, "MQKeepAliveWorker : Failed to declare a queue") + if err != nil { + clt.MQ.Channel.Close() + clt.MQ.Connection.Close() + } else { + clt.Active = true + } } } - } - clientsKeepAlive.Store(x.TGUserID64, &x) - clientsQueue[x.TGUserID64] = &clt + clt.Unlock() - c := TGCommand{ - Type: commandSendMsg, - ToUserID64: x.TGUserID64, - Text: "Your client is connected.", - } - TGCmdQueue <- c - c = TGCommand{ - Type: commandSendMsg, - ToUserID64: cfg.Bot.Admin, - Text: fmt.Sprintf("Client `%s` is connected.", x.Nickname), - } - TGCmdQueue <- c + c := TGCommand{ + Type: commandSendMsg, + ToUserID64: x.TGUserID64, + Text: "Your client is connected.", + } + TGCmdQueue <- c + c = TGCommand{ + Type: commandSendMsg, + ToUserID64: cfg.Bot.Admin, + Text: fmt.Sprintf("Client `%s` is connected.", x.Nickname), + } + TGCmdQueue <- c - clientSendCWMsg(x.UserID64, `🏅Me`) + clientSendCWMsg(x.TGUserID64, `🏅Me`) + } } - muxKeepAlive.Unlock() } } c.Channel.Close() @@ -233,57 +234,59 @@ func MQKeepAliveWorker() { func MQTGCmdWorker(id int, cmds <-chan TGCommand) { //log.Printf("MQTGCmdWorker[" + strconv.Itoa(id) + "] : Starting.") for c := range cmds { - if _, ok := clientsKeepAlive.Load(c.FromUserID64); ok { + if clt, ok := getLockedClient(c.FromUserID64, false); ok { j, err := json.Marshal(c) logOnError(err, "MQTGCmdWorker["+strconv.Itoa(id)+"] : Marshal(c)") //log.Printf("MQTGCmdWorker["+strconv.Itoa(id)+"] : new command.\n%s\n", string(j)) - for clientsQueue[c.FromUserID64].Connection.IsClosed() { - log.Printf("MQKeepAliveWorker : Resetting MQ connection for #%d.\n", c.FromUserID64) - clientsQueue[c.FromUserID64].Connection, err = amqp.Dial("amqp://" + clientsQueue[c.FromUserID64].User + ":" + clientsQueue[c.FromUserID64].Password + "@" + clientsQueue[c.FromUserID64].Host + "/" + clientsQueue[c.FromUserID64].Path) - logOnError(err, "MQKeepAliveWorker : Cannot open MQ connection") + for clt.MQ.Connection.IsClosed() { + clt.Active = false + log.Printf("MQTGCmdWorker : Resetting MQ connection for #%d.\n", c.FromUserID64) + clt.MQ.Connection, err = amqp.Dial("amqp://" + clt.MQ.User + ":" + clt.MQ.Password + "@" + clt.MQ.Host + "/" + clt.MQ.Path) + logOnError(err, "MQTGCmdWorker : Cannot open MQ connection") if err != nil { - clientsQueue[c.FromUserID64].Connection.Close() + clt.MQ.Connection.Close() time.Sleep(15 * time.Second) continue } - clientsQueue[c.FromUserID64].Channel, err = clientsQueue[c.FromUserID64].Connection.Channel() - logOnError(err, "MQKeepAliveWorker : Cannot open MQ channel") + clt.MQ.Channel, err = clt.MQ.Connection.Channel() + logOnError(err, "MQTGCmdWorker : Cannot open MQ channel") if err != nil { - clientsQueue[c.FromUserID64].Channel.Close() - clientsQueue[c.FromUserID64].Connection.Close() + clt.MQ.Channel.Close() + clt.MQ.Connection.Close() time.Sleep(15 * time.Second) continue } - clientsQueue[c.FromUserID64].Queue, err = clientsQueue[c.FromUserID64].Channel.QueueDeclare( - "keepalive", // name - false, // durable - false, // delete when unused - false, // exclusive - false, // no-wait - nil, // arguments + clt.MQ.Queue, err = clt.MQ.Channel.QueueDeclare( + "msg", // name + false, // durable + false, // delete when unused + false, // exclusive + false, // no-wait + nil, // arguments ) - logOnError(err, "MQKeepAliveWorker : Failed to declare a queue") + logOnError(err, "MQTGCmdWorker : Failed to declare a queue") if err != nil { - clientsQueue[c.FromUserID64].Channel.Close() - clientsQueue[c.FromUserID64].Connection.Close() + clt.MQ.Channel.Close() + clt.MQ.Connection.Close() time.Sleep(15 * time.Second) continue } } - err = clientsQueue[c.FromUserID64].Channel.Publish( - "", // exchange - clientsQueue[c.FromUserID64].Queue.Name, // routing key - false, // mandatory - false, // immediate + err = clt.MQ.Channel.Publish( + "", // exchange + clt.MQ.Queue.Name, // routing key + false, // mandatory + false, // immediate amqp.Publishing{ ContentType: "application/json", Body: []byte(j), }) logOnError(err, "MQTGCmdWorker["+strconv.Itoa(id)+"] : Publishing message.") //log.Printf("MQTGCmdWorker[" + strconv.Itoa(id) + "] : Message published.") + clt.Unlock() } else { log.Printf("MQTGCmdWorker["+strconv.Itoa(id)+"] : client %d offline.\n", c.FromUserID64) } @@ -568,7 +571,7 @@ func MQTidyKeepAliveWorker() { t := time.Now() muxClients.Lock() for id, clt := range clients { - if clt.Active && clt.Date.Add(3 * KeepAliveHeartBeatSeconds * time.Second).Before(time.Now()) { + if clt.Active && clt.Date.Add(3*KeepAliveHeartBeatSeconds*time.Second).Before(time.Now()) { msgs, err := clt.MQ.Channel.QueuePurge(c.MQ.Queue.Name, false) logOnError(err, "MQTidyKeepAliveWorker : Channel.QueuePurge()") err = clt.MQ.Channel.Close()