work in progress

This commit is contained in:
shoopea 2019-07-31 09:24:01 +08:00
parent ab22012eb5
commit 90853271d3
5 changed files with 66 additions and 50 deletions

View File

@ -24,6 +24,7 @@ ChirpNestBot
- [ ] Link TelegramUserID and UserID
- [ ] Issue with Squire in the /g_roles ?
- [x] Foray interception
- [ ] Handle Foray timeout to stop spamming in case something goes bad
- [x] Export/import all messages
- [x] Test HTML in message
- [x] Update auction from broadcast

View File

@ -7,32 +7,21 @@ import (
"time"
)
func clientKeepAlive(k, v interface{}) bool {
clt := v.(*MQKeepAlive)
if clt.Date.Add(3 * KeepAliveHeartBeatSeconds * time.Second).Before(time.Now()) {
msgs, err := clientsQueue[clt.UserID64].Channel.QueuePurge(clientsQueue[clt.UserID64].Queue.Name, false)
logOnError(err, "clientKeepAlive : Channel.QueuePurge()")
err = clientsQueue[clt.UserID64].Channel.Close()
logOnError(err, "clientKeepAlive : Channel.Close()")
err = clientsQueue[clt.UserID64].Connection.Close()
logOnError(err, "clientKeepAlive : Connection.Close()")
c := TGCommand{
Type: commandSendMsg,
ToUserID64: clt.UserID64,
Text: "Timeout, purging and closing command queue.",
}
TGCmdQueue <- c
c = TGCommand{
Type: commandSendMsg,
ToUserID64: cfg.Bot.Admin,
Text: fmt.Sprintf("Client %s timed out (%d messages purged).", clt.Nickname, msgs),
}
TGCmdQueue <- c
clientsKeepAlive.Delete(clt.UserID64)
func getLockedClient(int64 id, bool createMissing) (*ChirpClient, bool) {
muxClients.Lock()
if c, ok := clients[id]; ok {
c.Mux.Lock()
return c, true
} else if createMissing {
c := new(ChirpClient)
clients[id] = c
c.TGUserID64 = id
c.Lock()
return c, true
} else {
return 0, false
}
return true
defer muxClients.Unlock()
}
func clientSendCWMsg(userID64 int64, s string) {

34
def.go
View File

@ -12,13 +12,6 @@ type DataBackup struct {
Messages []ChatWarsMessage `json:"messages"`
}
type MQKeepAlive struct {
TGUserID64 int64 `json:"tg_user_id"`
Nickname string `json:"nick"`
Queue string `json:"queue"`
Date time.Time `json:"date"`
}
type MQClient struct {
User string
Password string
@ -30,14 +23,25 @@ type MQClient struct {
Queue amqp.Queue
}
type ChatWarsClient struct {
UserID64 int64 `json:"user_id"`
TelegramID64 int64 `json:"telegram_id"`
GuildID64 int64 `json:"guild_id"`
Role string `json:"role"`
State string `json:"state"`
BusyUntil time.Time `json:"busy_until"`
LastUpdate time.Time `json:"last_update"`
type ChirpClient struct {
HeartBeat time.Time `json:"heart_beat"`
Active bool
TGUserID64 int64 `json:"tg_user_id"`
MQ MQClient `json:"mq_client"`
CWUserID64 int64 `json:"user_id"`
CWGuildID64 int64 `json:"guild_id"`
CWRole string `json:"role"`
CWState string `json:"state"`
CWBusyUntil time.Time `json:"busy_until"`
CWLastUpdate time.Time `json:"last_update"`
Mux sync.Mutex
}
type MQKeepAlive struct {
TGUserID64 int64 `json:"tg_user_id"`
Nickname string `json:"nick"`
Queue string `json:"queue"`
Date time.Time `json:"date"`
}
type TGCommand struct {

View File

@ -60,11 +60,9 @@ var (
JobQueue chan Job
msgParsingRules map[int]MessageParsingRule
clientsQueue map[int64]*MQClient
clientsCW *sync.Map
clientsKeepAlive *sync.Map
muxKeepAlive sync.Mutex
clients map[int64]*ChirpClient
muxClients sync.Mutex
)
func PrintText(m *tb.Message) {

View File

@ -161,13 +161,13 @@ func MQKeepAliveWorker() {
err = json.Unmarshal(d.Body, &x)
logOnError(err, "MQKeepAliveWorker : Can't unmarshal.\n"+string(d.Body))
if err == nil {
muxKeepAlive.Lock()
if x.Date.Add(10 * time.Second).Before(time.Now()) {
// outdated keep-alive coming from client
} else if v, ok := clientsKeepAlive.Load(x.TGUserID64); ok {
k := v.(*MQKeepAlive)
k.Date = x.Date
} else {
} else if clt, ok := getLockedClient(x.TGUserID64, true); ok {
clt.HeartBeat = x.Date
if clt.Active {
clt.Mux.Unlock()
} else {
clt := MQClient{
User: cfg.Rabbit.User,
Password: cfg.Rabbit.Password,
@ -566,7 +566,31 @@ func MQTidyKeepAliveWorker() {
//log.Printf("MQTidyKeepAliveWorker : Starting.")
for true {
t := time.Now()
clientsKeepAlive.Range(clientKeepAlive)
muxClients.Lock()
for id, clt := range clients {
if clt.Active && clt.Date.Add(3 * KeepAliveHeartBeatSeconds * time.Second).Before(time.Now()) {
msgs, err := clt.MQ.Channel.QueuePurge(c.MQ.Queue.Name, false)
logOnError(err, "MQTidyKeepAliveWorker : Channel.QueuePurge()")
err = clt.MQ.Channel.Close()
logOnError(err, "MQTidyKeepAliveWorker : Channel.Close()")
err = clt.MQ.Connection.Close()
logOnError(err, "MQTidyKeepAliveWorker : Connection.Close()")
cmd := TGCommand{
Type: commandSendMsg,
ToUserID64: clt.TGUserID64,
Text: "Timeout, purging and closing command queue.",
}
TGCmdQueue <- cmd
cmd = TGCommand{
Type: commandSendMsg,
ToUserID64: cfg.Bot.Admin,
Text: fmt.Sprintf("Client %s timed out (%d messages purged).", clt.MQ.User, msgs),
}
TGCmdQueue <- cmd
clt.Active = false
}
}
muxClients.Unlock()
time.Sleep(time.Until(t.Add(time.Second)))
}
log.Printf("MQTidyKeepAliveWorker : Closing.")