diff --git a/main.go b/main.go index b3680b4..1c0fce0 100644 --- a/main.go +++ b/main.go @@ -12,11 +12,12 @@ import ( ) type ChatWarsMessage struct { - MsgID64 int64 `json:"msg_id"` - ChatID64 int64 `json:"chat_id"` - UserID64 int64 `json:"user_id"` - MsgText string `json:"msg"` - MsgDate int32 `json:"date"` + MsgID64 int64 `json:"msg_id"` + ChatID64 int64 `json:"chat_id"` + SenderUserID64 int64 `json:"sender_user_id"` + UserID64 int64 `json:"user_id"` + MsgText string `json:"msg"` + MsgDate int32 `json:"date"` } const user_chtwrsbot = 408101137 @@ -36,8 +37,9 @@ var ( cfg Config - ownUserID64 = int64(0) - ownUserID32 = int32(0) + ownUserID64 = int64(0) + ownUserID32 = int32(0) + MQCWMsgQueue chan ChatWarsMessage ) func main() { @@ -104,6 +106,11 @@ func main() { ownUserID32 = OwnUserID(client) ownUserID64 = int64(OwnUserID(client)) + MQCWMsgQueue = make(chan ChatWarsMessage, 100) + for w := 1; w <= 3; w++ { + go MQSendMsgWorker(w, MQCWMsgQueue) + } + if *history { fmt.Printf("Retrieving chat.\n") getHistory(client) @@ -113,9 +120,6 @@ func main() { fmt.Println("Started !") go ListenCW(client) - // Main loop - for { - time.Sleep(30 * time.Second) - } - + forever := make(chan bool) + <-forever } diff --git a/mq.go b/mq.go new file mode 100644 index 0000000..a762be5 --- /dev/null +++ b/mq.go @@ -0,0 +1,54 @@ +package main + +import ( + "encoding/json" + "log" + + "github.com/streadway/amqp" +) + +func MQSendMsgWorker(id int, msgs chan<- ChatWarsMessage) { + var x ChatWarsMessage + conn, err := amqp.Dial("amqp://" + cfg.Rabbit.User + ":" + cfg.Rabbit.Password + "@" + cfg.Rabbit.Host + "/" + cfg.Rabbit.Queue) + failOnError(err, "MQSendMsgWorker["+strconv.Itoa(id)+"] : Failed to connect to RabbitMQ") + defer conn.Close() + + ch, err := conn.Channel() + failOnError(err, "MQSendMsgWorker["+strconv.Itoa(id)+"] : Failed to open a channel") + defer ch.Close() + + q, err := ch.QueueDeclare( + "msg", // name + false, // durable + false, // delete when unused + false, // exclusive + false, // no-wait + nil, // arguments + ) + failOnError(err, "MQSendMsgWorker["+strconv.Itoa(id)+"] : Failed to declare a queue") + + for m := range msgs { + log.Printf("MQSendMsgWorker[" + strconv.Itoa(id) + "] : Sending a message.") + + b, err := json.Marshal(m) + if err != nil { + logOnError(err, "MQSendMsgWorker["+strconv.Itoa(id)+"] : Marshaling message.") + } else { + err = ch.Publish( + "", // exchange + q.Name, // routing key + false, // mandatory + false, // immediate + amqp.Publishing{ + ContentType: "application/json", + Body: []byte(b), + }) + if err != nil { + logOnError(err, "MQSendMsgWorker["+strconv.Itoa(id)+"] : Publishing message.") + } + } + } + + log.Printf("MQSendMsgWorker[" + strconv.Itoa(id) + "] : Closing.") + +} diff --git a/td.go b/td.go index 47357c2..33a2d5d 100644 --- a/td.go +++ b/td.go @@ -21,36 +21,11 @@ func ListenCW(c *tdlib.Client) { return false } - conn, err := amqp.Dial("amqp://" + cfg.Rabbit.User + ":" + cfg.Rabbit.Password + "@" + cfg.Rabbit.Host + "/" + cfg.Rabbit.Queue) - if err != nil { - log.Fatal(err) - } - defer conn.Close() - - ch, err := conn.Channel() - if err != nil { - log.Fatal(err) - } - defer ch.Close() - - q, err := ch.QueueDeclare( - "msg", // name - false, // durable - false, // delete when unused - false, // exclusive - false, // no-wait - nil, // arguments - ) - if err != nil { - log.Fatal(err) - } - receiver := c.AddEventReceiver(&tdlib.UpdateNewMessage{}, eventFilter, 100) for newMsg := range receiver.Chan { updateMsg := (newMsg).(*tdlib.UpdateNewMessage) senderUserID := updateMsg.Message.SenderUserID - mType := updateMsg.Message.Content.GetMessageContentEnum() - if mType == "messageText" { + if updateMsg.Message.Content.GetMessageContentEnum() == tdlib.MessageTextType { user, err := c.GetUser(senderUserID) if err != nil { fmt.Println("ListenCW:", err.Error()) @@ -68,17 +43,7 @@ func ListenCW(c *tdlib.Client) { MsgID64: updateMsg.Message.ID, } - b, err := json.Marshal(m) - - _ = ch.Publish( - "", // exchange - q.Name, // routing key - false, // mandatory - false, // immediate - amqp.Publishing{ - ContentType: "application/json", - Body: []byte(b), - }) + MQCWMsgQueue <- m fmt.Printf("[%d-%02d-%02d %02d:%02d:%02d-00:00]", t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second()) fmt.Println(" === CHATWARS (", user.Username, ") =====================================================================") @@ -108,30 +73,6 @@ func getHistory(c *tdlib.Client) { var msgCount int32 = 0 var lastMsgTime time.Time = time.Now() - conn, err := amqp.Dial("amqp://" + cfg.Rabbit.User + ":" + cfg.Rabbit.Password + "@" + cfg.Rabbit.Host + "/" + cfg.Rabbit.Queue) - if err != nil { - log.Fatal(err) - } - defer conn.Close() - - ch, err := conn.Channel() - if err != nil { - log.Fatal(err) - } - defer ch.Close() - - q, err := ch.QueueDeclare( - "msg", // name - false, // durable - false, // delete when unused - false, // exclusive - false, // no-wait - nil, // arguments - ) - if err != nil { - log.Fatal(err) - } - for lastMsgID64 >= 0 { msgs, err := c.GetChatHistory(user_chtwrsbot, lastMsgID64, 0, 10, false) if err != nil { @@ -154,23 +95,8 @@ func getHistory(c *tdlib.Client) { MsgID64: msg.ID, } - b, err := json.Marshal(m) - if err != nil { - logOnError(err, "getHistory : Marshaling message.") - } + MQCWMsgQueue <- m - err = ch.Publish( - "", // exchange - q.Name, // routing key - false, // mandatory - false, // immediate - amqp.Publishing{ - ContentType: "application/json", - Body: []byte(b), - }) - if err != nil { - logOnError(err, "getHistory : Publishing message.") - } if msg.ID < lastMsgID64 { lastMsgID64 = msg.ID lastMsgTime = time.Unix(int64(msg.Date), 0)