From 0a0c8e79602b5346dc8efa5112743587722b5b60 Mon Sep 17 00:00:00 2001 From: shoopea Date: Fri, 28 Jun 2019 15:57:06 +0800 Subject: [PATCH] Revert "test" This reverts commit 4968779dd6818df75b8e1f9487bafa38a3196eb8. --- chirpnest.sample.cfg | 2 +- client.go | 5 - def.go | 16 +-- main.go | 6 +- mq.go | 88 --------------- workers.go | 256 +++++++++++++++++++++++++++---------------- 6 files changed, 170 insertions(+), 203 deletions(-) diff --git a/chirpnest.sample.cfg b/chirpnest.sample.cfg index da10228..4e6eb9d 100644 --- a/chirpnest.sample.cfg +++ b/chirpnest.sample.cfg @@ -14,7 +14,7 @@ database = chirpnest user = chirpnest password = chirpnest host = localhost:5672 -path = chirpnest +queue = chirpnest [bot] admin = 0 diff --git a/client.go b/client.go index 2c5ccc3..d93fb6e 100644 --- a/client.go +++ b/client.go @@ -34,11 +34,6 @@ func clientKeepAlive(k, v interface{}) bool { return true } -func clientIsAlive(id int64) bool { - _, ok := clientsKeepAlive.Load(id) - return ok -} - func clientSendCWMsg(userID64 int64, s string) { c := TGCommand{ Type: commandSendMsg, diff --git a/def.go b/def.go index 498a0d5..ca505e3 100644 --- a/def.go +++ b/def.go @@ -19,18 +19,10 @@ type MQKeepAlive struct { Date time.Time `json:"date"` } -type MQSession struct { - Host string - SSL bool - User string - Password string - Path string - Queue string - MQConnection *amqp.Connection - MQChannel *amqp.Channel - MQQueue *amqp.Queue - MQDelivery <-chan *ampq.Delivery - isConnected bool +type MQClient struct { + Connection *amqp.Connection + Channel *amqp.Channel + Queue amqp.Queue } type ChatWarsClient struct { diff --git a/main.go b/main.go index bbd2e0c..7d4e985 100644 --- a/main.go +++ b/main.go @@ -32,7 +32,7 @@ type Config struct { User string Password string Host string - Path string + Queue string } Bot struct { Admin int64 @@ -60,7 +60,7 @@ var ( JobQueue chan Job msgParsingRules map[int]MessageParsingRule - clientsQueue map[int64]*MQSession + clientsQueue map[int64]*MQClient clientsCW *sync.Map clientsKeepAlive *sync.Map @@ -153,7 +153,7 @@ func main() { TGCmdQueue = make(chan TGCommand, TGCmdQueueSize) MQTGCmdQueue = make(chan TGCommand, MQTGCmdQueueSize) JobQueue = make(chan Job, JobQueueSize) - clientsQueue = make(map[int64]*MQSession) + clientsQueue = make(map[int64]*MQClient) clientsCW = new(sync.Map) clientsKeepAlive = new(sync.Map) diff --git a/mq.go b/mq.go index d916284..06ab7d0 100644 --- a/mq.go +++ b/mq.go @@ -1,89 +1 @@ package main - -import ( - "error" - - "github.com/streadway/amqp" -) - -func (s MQSession) Open() error { - if s.isConnected { - return error.Error("Session is already connected.") - } - - if SLL == false { - s.MQConnection, err = amqp.Dial("amqp://" + Session.User + ":" + Session.Password + "@" + Session.Host + "/" + Session.Path) - } else { - return error.Error("SSL connection not implemented") - } - logOnError(err, "Open : Failed to connect to RabbitMQ") - if err != nil { - s.MQConnection.Close() - return err - } - s.MQChannel, err = s.MQConnection.Channel() - logOnError(err, "Open : Failed to open channel") - if err != nil { - s.MQChannel.Close() - s.MQConnection.Close() - return err - } - s.MQQueue, err = s.Channel.QueueDeclare( - s.Queue, // name - false, // durable - false, // delete when unused - false, // exclusive - false, // no-wait - nil, // arguments - - ) - logOnError(err, "Open : Failed to declare queue") - if err != nil { - s.MQChannel.Close() - s.MQConnection.Close() - return err - } - s.MQDelivery, err = s.MQChannel.Consume( - s.MQQueue.Name, // queue - "", // consumer - true, // auto-ack - false, // exclusive - false, // no-local - false, // no-wait - nil, // args - ) - logOnError(err, "Open : Failed to register a consumer") - if err != nil { - s.MQChannel.Close() - s.MQConnection.Close() - return err - } - s.isConnected = true - return nil -} - -func (s MQSession) Publish(content string, msg string) error { - err := s.MQChannel.Publish( - "", // exchange - s.MQQueue.Name, // routing key - false, // mandatory - false, // immediate - amqp.Publishing{ - ContentType: content, - Body: []byte(m), - }) - logOnError(err, "Publish : Publish") - if err != nil { - return err - } - return nil -} - -func (s MQSession) Close() { - if s.isConnected { - s.MQChannel.Close() - s.MQConnection.Close() - s.isConnected = false - } - return -} diff --git a/workers.go b/workers.go index 5388f7d..3fc1764 100644 --- a/workers.go +++ b/workers.go @@ -16,33 +16,63 @@ import ( func MQGetMsgWorker(id int, msgs chan<- ChatWarsMessage) { //log.Printf("MQGetMsgWorker[" + strconv.Itoa(id) + "] : Starting.") var x ChatWarsMessage - s := MQSession{ - User: cfg.Rabbit.User, - Password: cfg.Rabbit.Password, - Host: cfg.Rabbit.Host, - Path: cfg.Rabbit.Path, - SSL: false, - Queue: "msg", - isConnected: false, - } for true { - err := s.Open() - for err != nil { - logOnError(err, "MQGetMsgWorker["+strconv.Itoa(id)+"] : Connection to RabbitMQ failed.") - time.Sleep(15) - err = s.Open() - } - log.Printf("MQGetMsgWorker[" + strconv.Itoa(id) + "] : Connected to RabbitMQ") + conn, err := amqp.Dial("amqp://" + cfg.Rabbit.User + ":" + cfg.Rabbit.Password + "@" + cfg.Rabbit.Host + "/" + cfg.Rabbit.Queue) + logOnError(err, "MQGetMsgWorker["+strconv.Itoa(id)+"] : Failed to connect to RabbitMQ") + if err != nil { + conn.Close() + time.Sleep(15 * time.Second) + } else { + ch, err := conn.Channel() + logOnError(err, "MQGetMsgWorker["+strconv.Itoa(id)+"] : Failed to open a channel") + if err != nil { + ch.Close() + time.Sleep(15 * time.Second) + } else { + q, err := ch.QueueDeclare( + "msg", // name + false, // durable + false, // delete when unused + false, // exclusive + false, // no-wait + nil, // arguments + ) + logOnError(err, "MQGetMsgWorker["+strconv.Itoa(id)+"] : Failed to declare a queue") + if err != nil { + ch.Close() + conn.Close() + time.Sleep(15 * time.Second) + } else { + m, err := ch.Consume( + q.Name, // queue + "", // consumer + true, // auto-ack + false, // exclusive + false, // no-local + false, // no-wait + nil, // args + ) + logOnError(err, "MQGetMsgWorker["+strconv.Itoa(id)+"] : Failed to register a consumer") + if err != nil { + ch.Close() + conn.Close() + time.Sleep(15 * time.Second) + } else { + for d := range m { + // log.Printf("MQGetMsgWorker["+strconv.Itoa(id)+"] : Received a message: %s", string(d.Body)) + err = json.Unmarshal(d.Body, &x) + logOnError(err, "MQGetMsgWorker["+strconv.Itoa(id)+"] : Can't unmarshal.\n"+string(d.Body)) + if err == nil { + msgs <- x + } + } + ch.Close() + conn.Close() + } + } - for d := range s.MQDelivery { - // log.Printf("MQGetMsgWorker["+strconv.Itoa(id)+"] : Received a message: %s", string(d.Body)) - err = json.Unmarshal(d.Body, &x) - logOnError(err, "MQGetMsgWorker["+strconv.Itoa(id)+"] : Can't unmarshal.\n"+string(d.Body)) - if err == nil { - msgs <- x } } - s.Close() } log.Printf("MQGetMsgWorker[" + strconv.Itoa(id) + "] : Closing.") @@ -313,20 +343,21 @@ func TGCmdWorker(id int, b *tb.Bot, cmds <-chan TGCommand) { func MQTGCmdWorker(id int, cmds <-chan TGCommand) { //log.Printf("MQTGCmdWorker[" + strconv.Itoa(id) + "] : Starting.") for c := range cmds { - j, err := json.Marshal(c) - logOnError(err, "MQTGCmdWorker["+strconv.Itoa(id)+"] : Marshal(c)") - - if clientIsAlive(c.FromUserID64) { - err = clientsQueue[c.FromUserID64].Session.Publish("application/json", j) + if _, ok := clientsKeepAlive.Load(c.FromUserID64); ok { + j, err := json.Marshal(c) + logOnError(err, "MQTGCmdWorker["+strconv.Itoa(id)+"] : Marshal(c)") + //log.Printf("MQTGCmdWorker["+strconv.Itoa(id)+"] : new command.\n%s\n", string(j)) + err = clientsQueue[c.FromUserID64].Channel.Publish( + "", // exchange + clientsQueue[c.FromUserID64].Queue.Name, // routing key + false, // mandatory + false, // immediate + amqp.Publishing{ + ContentType: "application/json", + Body: []byte(j), + }) logOnError(err, "MQTGCmdWorker["+strconv.Itoa(id)+"] : Publishing message.") - for err != nil && clientIsAlive(c.FromUserID64) { - clientsQueue[c.FromUserID64].Session.Close() - time.Sleep(5 * time.Second) - clientsQueue[c.FromUserID64].Session.Open() - err = clientsQueue[c.FromUserID64].Session.Publish("application/json", j) - } } else { - err = nil log.Printf("MQTGCmdWorker["+strconv.Itoa(id)+"] : client %d offline.\n", c.FromUserID64) } } @@ -336,73 +367,110 @@ func MQTGCmdWorker(id int, cmds <-chan TGCommand) { } func MQKeepAliveWorker() { - s := MQSession{ - User: cfg.Rabbit.User, - Password: cfg.Rabbit.Password, - Host: cfg.Rabbit.Host, - Path: cfg.Rabbit.Path, - SSL: false, - Queue: "keepalive", - isConnected: false, - } //log.Printf("MQKeepAliveWorker : Starting.") for true { - err := s.Open() - for err != nil { - logOnError(err, "MQKeepAliveWorker : Failed to connect to RabbitMQ") + conn, err := amqp.Dial("amqp://" + cfg.Rabbit.User + ":" + cfg.Rabbit.Password + "@" + cfg.Rabbit.Host + "/" + cfg.Rabbit.Queue) + logOnError(err, "MQKeepAliveWorker : Failed to connect to RabbitMQ") + if err != nil { + conn.Close() time.Sleep(15 * time.Second) - err = s.Open() - } - for d := range m { - // log.Printf("MQKeepAliveWorker : Received a message: %s", string(d.Body)) - x := MQKeepAlive{} - err = json.Unmarshal(d.Body, &x) - logOnError(err, "MQKeepAliveWorker : Can't unmarshal.\n"+string(d.Body)) - if err == nil { - if x.Date.Add(10 * time.Second).Before(time.Now()) { - // outdated keep-alive coming from client - } else if v, ok := clientsKeepAlive.Load(x.UserID64); ok { - k := v.(*MQKeepAlive) - k.Date = x.Date + } else { + ch, err := conn.Channel() + logOnError(err, "MQKeepAliveWorker : Failed to open a channel") + if err != nil { + ch.Close() + conn.Close() + time.Sleep(15 * time.Second) + } else { + q, err := ch.QueueDeclare( + "keepalive", // name + false, // durable + false, // delete when unused + false, // exclusive + false, // no-wait + nil, // arguments + ) + logOnError(err, "MQKeepAliveWorker : Failed to declare a queue") + if err != nil { + ch.Close() + conn.Close() + time.Sleep(15 * time.Second) } else { - cs := MQSession{ - User: cfg.Rabbit.User, - Password: cfg.Rabbit.Password, - Host: cfg.Rabbit.Host, - Path: x.Queue, - SSL: false, - Queue: "msg", - isConnected: false, - } - err = cs.Open() - logOnError(err, "MQKeepAliveWorker : Failed to open MQ session") - clientsKeepAlive.Store(x.UserID64, &x) - clientsQueue[x.UserID64] = &cs + m, err := ch.Consume( + q.Name, // queue + "", // consumer + true, // auto-ack + false, // exclusive + false, // no-local + false, // no-wait + nil, // args + ) + logOnError(err, "MQKeepAliveWorker : Failed to register a consumer") + if err != nil { + ch.Close() + conn.Close() + time.Sleep(15 * time.Second) + } else { + for d := range m { + // log.Printf("MQKeepAliveWorker : Received a message: %s", string(d.Body)) + x := MQKeepAlive{} + err = json.Unmarshal(d.Body, &x) + logOnError(err, "MQKeepAliveWorker : Can't unmarshal.\n"+string(d.Body)) + if err == nil { + if x.Date.Add(10 * time.Second).Before(time.Now()) { + // outdated keep-alive coming from client + } else if v, ok := clientsKeepAlive.Load(x.UserID64); ok { + k := v.(*MQKeepAlive) + k.Date = x.Date + } else { + clt := MQClient{} + clt.Connection, err = amqp.Dial("amqp://" + cfg.Rabbit.User + ":" + cfg.Rabbit.Password + "@" + cfg.Rabbit.Host + "/" + x.Queue) + logOnError(err, "MQKeepAliveWorker : Failed to connect to RabbitMQ") + clt.Channel, err = clt.Connection.Channel() + logOnError(err, "MQKeepAliveWorker : Failed to open a channel") + clt.Queue, err = clt.Channel.QueueDeclare( + "msg", // name + false, // durable + false, // delete when unused + false, // exclusive + false, // no-wait + nil, // arguments + ) + logOnError(err, "MQKeepAliveWorker : Failed to declare a queue") + clientsKeepAlive.Store(x.UserID64, &x) + clientsQueue[x.UserID64] = &clt - c := TGCommand{ - Type: commandSendMsg, - ToUserID64: x.UserID64, - Text: "Your client is connected.", - } - TGCmdQueue <- c - c = TGCommand{ - Type: commandSendMsg, - ToUserID64: cfg.Bot.Admin, - Text: fmt.Sprintf("Client `%s` is connected.", x.Nickname), - } - TGCmdQueue <- c + c := TGCommand{ + Type: commandSendMsg, + ToUserID64: x.UserID64, + Text: "Your client is connected.", + } + TGCmdQueue <- c + c = TGCommand{ + Type: commandSendMsg, + ToUserID64: cfg.Bot.Admin, + Text: fmt.Sprintf("Client `%s` is connected.", x.Nickname), + } + TGCmdQueue <- c - clientSendCWMsg(x.UserID64, `🏅Me`) - /* - c = TGCommand{ - Type: commandSendMsg, - FromUserID64: x.UserID64, - ToChatID64: userID64ChtWrsBot, - Text: `/hero`, + clientSendCWMsg(x.UserID64, `🏅Me`) + /* + c = TGCommand{ + Type: commandSendMsg, + FromUserID64: x.UserID64, + ToChatID64: userID64ChtWrsBot, + Text: `/hero`, + } + MQTGCmdQueue <- c + */ + } + } } - MQTGCmdQueue <- c - */ + ch.Close() + conn.Close() + } } + } } }