client revamped.

This commit is contained in:
shoopea 2019-07-31 12:45:42 +08:00
parent d018453106
commit 245468dfb8
2 changed files with 103 additions and 109 deletions

View File

@ -16,6 +16,7 @@ func getLockedClient(int64 id, bool createMissing) (*ChirpClient, bool) {
c := new(ChirpClient) c := new(ChirpClient)
clients[id] = c clients[id] = c
c.TGUserID64 = id c.TGUserID64 = id
c.Active = false
c.Lock() c.Lock()
return c, true return c, true
} else { } else {
@ -45,71 +46,61 @@ func clientRefreshCWMsg(userID64 int64, chatID64 int64, msgID64 int64) {
} }
func clientMsgMeAck(m *ChatWarsMessageMeAck) { func clientMsgMeAck(m *ChatWarsMessageMeAck) {
if _, ok := clientsQueue[m.Msg.UserID64]; ok { if clt, ok := getLockedClient(m.Msg.UserID64, false); ok {
if v, ok := clientsCW.Load(m.Msg.UserID64); ok { if clt.Active {
c := v.(*ChatWarsClient) if clt.CWLastUpdate.Before(m.Msg.Date) {
if c.LastUpdate.Before(m.Msg.Date) { clt.CWGuildID64 = m.GuildID64
c.GuildID64 = m.GuildID64 clt.CWUserID64 = m.UserID64
c.State = m.State clt.CWState = m.State
c.LastUpdate = m.Msg.Date clt.CWLastUpdate = m.Msg.Date
if getObjGuildID(``) != m.GuildID64 && strings.Compare(c.Role, ``) == 0 { if getObjGuildID(``) != m.GuildID64 && strings.Compare(clt.CWRole, ``) == 0 {
clientSendCWMsg(m.Msg.UserID64, "/g_roles") clientSendCWMsg(m.Msg.UserID64, "/g_roles")
} }
} }
} else {
c := ChatWarsClient{
UserID64: m.UserID64,
TelegramID64: m.Msg.UserID64,
GuildID64: m.GuildID64,
State: m.State,
LastUpdate: m.Msg.Date,
}
clientsCW.Store(m.Msg.UserID64, &c)
if getObjGuildID(``) != m.GuildID64 {
clientSendCWMsg(m.Msg.UserID64, "/g_roles")
}
} }
clt.Unlock()
} }
} }
func clientMsgGoQuestAck(m *ChatWarsMessageGoQuestAck) { func clientMsgGoQuestAck(m *ChatWarsMessageGoQuestAck) {
if _, ok := clientsQueue[m.Msg.UserID64]; ok { if clt, ok := getLockedClient(m.Msg.UserID64, false); ok {
if v, ok := clientsCW.Load(m.Msg.UserID64); ok { if clt.Active {
c := v.(*ChatWarsClient) if clt.CWLastUpdate.Before(m.Msg.Date) {
if c.LastUpdate.Before(m.Msg.Date) { clt.CWLastUpdate = m.Msg.Date
c.LastUpdate = m.Msg.Date clt.CWBusyUntil = m.Msg.Date.Add(m.Duration)
c.BusyUntil = m.Msg.Date.Add(m.Duration)
} }
} }
clt.Unlock()
} }
} }
func clientMsgGRolesAck(m *ChatWarsMessageGRolesAck) { func clientMsgGRolesAck(m *ChatWarsMessageGRolesAck) {
if _, ok := clientsQueue[m.Msg.UserID64]; ok { if clt, ok := getLockedClient(m.Msg.UserID64, false); ok {
if v, ok := clientsCW.Load(m.Msg.UserID64); ok { if clt.Active {
c := v.(*ChatWarsClient) if clt.CWLastUpdate.Before(m.Msg.Date) {
if c.LastUpdate.Before(m.Msg.Date) { if m.CommanderID64 == clt.CWUserID64 {
if m.CommanderID64 == c.UserID64 {
c.Role = `commander` c.Role = `commander`
} else if m.BartenderID64 == c.UserID64 { } else if m.BartenderID64 == clt.CWUserID64 {
c.Role = `bartender` c.Role = `bartender`
} else if m.SquireID64 == c.UserID64 { } else if m.SquireID64 == clt.CWUserID64 {
c.Role = `squire` c.Role = `squire`
} else if m.TreasurerID64 == c.UserID64 { } else if m.TreasurerID64 == clt.CWUserID64 {
c.Role = `treasurer` c.Role = `treasurer`
} else { } else {
c.Role = `none` c.Role = `none`
} }
c.LastUpdate = m.Msg.Date clt.CWLastUpdate = m.Msg.Date
} }
} }
clt.Unlock()
} }
} }
func clientGetUserID64(tgUserID64 int64) (int64, error) { func clientGetCWUserID64(tgUserID64 int64) (int64, error) {
if v, ok := clientsCW.Load(tgUserID64); ok { if clt, ok := getLockedClient(m.Msg.UserID64, false); ok {
c := v.(*ChatWarsClient) i := clt.CWUserID64
return c.UserID64, nil clt.Unlock()
return i, nil
} }
return 0, errors.New("Unknown user_id.") return 0, errors.New("Unknown user_id.")
} }

View File

@ -168,58 +168,59 @@ func MQKeepAliveWorker() {
if clt.Active { if clt.Active {
clt.Mux.Unlock() clt.Mux.Unlock()
} else { } else {
clt := MQClient{ clt.MQ = MQClient{
User: cfg.Rabbit.User, User: cfg.Rabbit.User,
Password: cfg.Rabbit.Password, Password: cfg.Rabbit.Password,
Host: cfg.Rabbit.Host, Host: cfg.Rabbit.Host,
Path: x.Queue, Path: x.Queue,
SSL: false, SSL: false,
} }
clt.Connection, err = amqp.Dial("amqp://" + clt.User + ":" + clt.Password + "@" + clt.Host + "/" + clt.Path) clt.MQ.Connection, err = amqp.Dial("amqp://" + clt.MQ.User + ":" + clt.MQ.Password + "@" + clt.MQ.Host + "/" + clt.MQ.Path)
logOnError(err, "MQKeepAliveWorker : Failed to connect to RabbitMQ") logOnError(err, "MQKeepAliveWorker : Failed to connect to RabbitMQ")
if err != nil {
clt.Connection.Close()
} else {
clt.Channel, err = clt.Connection.Channel()
logOnError(err, "MQKeepAliveWorker : Failed to open a channel")
if err != nil { if err != nil {
clt.Channel.Close() clt.MQ.Connection.Close()
clt.Connection.Close()
} else { } else {
clt.Queue, err = clt.Channel.QueueDeclare( clt.MQ.Channel, err = clt.MQ.Connection.Channel()
"msg", // name logOnError(err, "MQKeepAliveWorker : Failed to open a channel")
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
logOnError(err, "MQKeepAliveWorker : Failed to declare a queue")
if err != nil { if err != nil {
clt.Channel.Close() clt.MQ.Channel.Close()
clt.Connection.Close() clt.MQ.Connection.Close()
} else {
clt.MQ.Queue, err = clt.MQ.Channel.QueueDeclare(
"msg", // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
logOnError(err, "MQKeepAliveWorker : Failed to declare a queue")
if err != nil {
clt.MQ.Channel.Close()
clt.MQ.Connection.Close()
} else {
clt.Active = true
}
} }
} }
} clt.Unlock()
clientsKeepAlive.Store(x.TGUserID64, &x)
clientsQueue[x.TGUserID64] = &clt
c := TGCommand{ c := TGCommand{
Type: commandSendMsg, Type: commandSendMsg,
ToUserID64: x.TGUserID64, ToUserID64: x.TGUserID64,
Text: "Your client is connected.", Text: "Your client is connected.",
} }
TGCmdQueue <- c TGCmdQueue <- c
c = TGCommand{ c = TGCommand{
Type: commandSendMsg, Type: commandSendMsg,
ToUserID64: cfg.Bot.Admin, ToUserID64: cfg.Bot.Admin,
Text: fmt.Sprintf("Client `%s` is connected.", x.Nickname), Text: fmt.Sprintf("Client `%s` is connected.", x.Nickname),
} }
TGCmdQueue <- c TGCmdQueue <- c
clientSendCWMsg(x.UserID64, `🏅Me`) clientSendCWMsg(x.TGUserID64, `🏅Me`)
}
} }
muxKeepAlive.Unlock()
} }
} }
c.Channel.Close() c.Channel.Close()
@ -233,57 +234,59 @@ func MQKeepAliveWorker() {
func MQTGCmdWorker(id int, cmds <-chan TGCommand) { func MQTGCmdWorker(id int, cmds <-chan TGCommand) {
//log.Printf("MQTGCmdWorker[" + strconv.Itoa(id) + "] : Starting.") //log.Printf("MQTGCmdWorker[" + strconv.Itoa(id) + "] : Starting.")
for c := range cmds { for c := range cmds {
if _, ok := clientsKeepAlive.Load(c.FromUserID64); ok { if clt, ok := getLockedClient(c.FromUserID64, false); ok {
j, err := json.Marshal(c) j, err := json.Marshal(c)
logOnError(err, "MQTGCmdWorker["+strconv.Itoa(id)+"] : Marshal(c)") logOnError(err, "MQTGCmdWorker["+strconv.Itoa(id)+"] : Marshal(c)")
//log.Printf("MQTGCmdWorker["+strconv.Itoa(id)+"] : new command.\n%s\n", string(j)) //log.Printf("MQTGCmdWorker["+strconv.Itoa(id)+"] : new command.\n%s\n", string(j))
for clientsQueue[c.FromUserID64].Connection.IsClosed() { for clt.MQ.Connection.IsClosed() {
log.Printf("MQKeepAliveWorker : Resetting MQ connection for #%d.\n", c.FromUserID64) clt.Active = false
clientsQueue[c.FromUserID64].Connection, err = amqp.Dial("amqp://" + clientsQueue[c.FromUserID64].User + ":" + clientsQueue[c.FromUserID64].Password + "@" + clientsQueue[c.FromUserID64].Host + "/" + clientsQueue[c.FromUserID64].Path) log.Printf("MQTGCmdWorker : Resetting MQ connection for #%d.\n", c.FromUserID64)
logOnError(err, "MQKeepAliveWorker : Cannot open MQ connection") clt.MQ.Connection, err = amqp.Dial("amqp://" + clt.MQ.User + ":" + clt.MQ.Password + "@" + clt.MQ.Host + "/" + clt.MQ.Path)
logOnError(err, "MQTGCmdWorker : Cannot open MQ connection")
if err != nil { if err != nil {
clientsQueue[c.FromUserID64].Connection.Close() clt.MQ.Connection.Close()
time.Sleep(15 * time.Second) time.Sleep(15 * time.Second)
continue continue
} }
clientsQueue[c.FromUserID64].Channel, err = clientsQueue[c.FromUserID64].Connection.Channel() clt.MQ.Channel, err = clt.MQ.Connection.Channel()
logOnError(err, "MQKeepAliveWorker : Cannot open MQ channel") logOnError(err, "MQTGCmdWorker : Cannot open MQ channel")
if err != nil { if err != nil {
clientsQueue[c.FromUserID64].Channel.Close() clt.MQ.Channel.Close()
clientsQueue[c.FromUserID64].Connection.Close() clt.MQ.Connection.Close()
time.Sleep(15 * time.Second) time.Sleep(15 * time.Second)
continue continue
} }
clientsQueue[c.FromUserID64].Queue, err = clientsQueue[c.FromUserID64].Channel.QueueDeclare( clt.MQ.Queue, err = clt.MQ.Channel.QueueDeclare(
"keepalive", // name "msg", // name
false, // durable false, // durable
false, // delete when unused false, // delete when unused
false, // exclusive false, // exclusive
false, // no-wait false, // no-wait
nil, // arguments nil, // arguments
) )
logOnError(err, "MQKeepAliveWorker : Failed to declare a queue") logOnError(err, "MQTGCmdWorker : Failed to declare a queue")
if err != nil { if err != nil {
clientsQueue[c.FromUserID64].Channel.Close() clt.MQ.Channel.Close()
clientsQueue[c.FromUserID64].Connection.Close() clt.MQ.Connection.Close()
time.Sleep(15 * time.Second) time.Sleep(15 * time.Second)
continue continue
} }
} }
err = clientsQueue[c.FromUserID64].Channel.Publish( err = clt.MQ.Channel.Publish(
"", // exchange "", // exchange
clientsQueue[c.FromUserID64].Queue.Name, // routing key clt.MQ.Queue.Name, // routing key
false, // mandatory false, // mandatory
false, // immediate false, // immediate
amqp.Publishing{ amqp.Publishing{
ContentType: "application/json", ContentType: "application/json",
Body: []byte(j), Body: []byte(j),
}) })
logOnError(err, "MQTGCmdWorker["+strconv.Itoa(id)+"] : Publishing message.") logOnError(err, "MQTGCmdWorker["+strconv.Itoa(id)+"] : Publishing message.")
//log.Printf("MQTGCmdWorker[" + strconv.Itoa(id) + "] : Message published.") //log.Printf("MQTGCmdWorker[" + strconv.Itoa(id) + "] : Message published.")
clt.Unlock()
} else { } else {
log.Printf("MQTGCmdWorker["+strconv.Itoa(id)+"] : client %d offline.\n", c.FromUserID64) log.Printf("MQTGCmdWorker["+strconv.Itoa(id)+"] : client %d offline.\n", c.FromUserID64)
} }
@ -568,7 +571,7 @@ func MQTidyKeepAliveWorker() {
t := time.Now() t := time.Now()
muxClients.Lock() muxClients.Lock()
for id, clt := range clients { for id, clt := range clients {
if clt.Active && clt.Date.Add(3 * KeepAliveHeartBeatSeconds * time.Second).Before(time.Now()) { if clt.Active && clt.Date.Add(3*KeepAliveHeartBeatSeconds*time.Second).Before(time.Now()) {
msgs, err := clt.MQ.Channel.QueuePurge(c.MQ.Queue.Name, false) msgs, err := clt.MQ.Channel.QueuePurge(c.MQ.Queue.Name, false)
logOnError(err, "MQTidyKeepAliveWorker : Channel.QueuePurge()") logOnError(err, "MQTidyKeepAliveWorker : Channel.QueuePurge()")
err = clt.MQ.Channel.Close() err = clt.MQ.Channel.Close()