This commit is contained in:
shoopea 2019-05-04 11:53:35 +08:00
parent 3a9480821f
commit e331e61824
3 changed files with 73 additions and 89 deletions

14
main.go
View File

@ -14,6 +14,7 @@ import (
type ChatWarsMessage struct { type ChatWarsMessage struct {
MsgID64 int64 `json:"msg_id"` MsgID64 int64 `json:"msg_id"`
ChatID64 int64 `json:"chat_id"` ChatID64 int64 `json:"chat_id"`
SenderUserID64 int64 `json:"sender_user_id"`
UserID64 int64 `json:"user_id"` UserID64 int64 `json:"user_id"`
MsgText string `json:"msg"` MsgText string `json:"msg"`
MsgDate int32 `json:"date"` MsgDate int32 `json:"date"`
@ -38,6 +39,7 @@ var (
ownUserID64 = int64(0) ownUserID64 = int64(0)
ownUserID32 = int32(0) ownUserID32 = int32(0)
MQCWMsgQueue chan ChatWarsMessage
) )
func main() { func main() {
@ -104,6 +106,11 @@ func main() {
ownUserID32 = OwnUserID(client) ownUserID32 = OwnUserID(client)
ownUserID64 = int64(OwnUserID(client)) ownUserID64 = int64(OwnUserID(client))
MQCWMsgQueue = make(chan ChatWarsMessage, 100)
for w := 1; w <= 3; w++ {
go MQSendMsgWorker(w, MQCWMsgQueue)
}
if *history { if *history {
fmt.Printf("Retrieving chat.\n") fmt.Printf("Retrieving chat.\n")
getHistory(client) getHistory(client)
@ -113,9 +120,6 @@ func main() {
fmt.Println("Started !") fmt.Println("Started !")
go ListenCW(client) go ListenCW(client)
// Main loop forever := make(chan bool)
for { <-forever
time.Sleep(30 * time.Second)
}
} }

54
mq.go Normal file
View File

@ -0,0 +1,54 @@
package main
import (
"encoding/json"
"log"
"github.com/streadway/amqp"
)
func MQSendMsgWorker(id int, msgs chan<- ChatWarsMessage) {
var x ChatWarsMessage
conn, err := amqp.Dial("amqp://" + cfg.Rabbit.User + ":" + cfg.Rabbit.Password + "@" + cfg.Rabbit.Host + "/" + cfg.Rabbit.Queue)
failOnError(err, "MQSendMsgWorker["+strconv.Itoa(id)+"] : Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "MQSendMsgWorker["+strconv.Itoa(id)+"] : Failed to open a channel")
defer ch.Close()
q, err := ch.QueueDeclare(
"msg", // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "MQSendMsgWorker["+strconv.Itoa(id)+"] : Failed to declare a queue")
for m := range msgs {
log.Printf("MQSendMsgWorker[" + strconv.Itoa(id) + "] : Sending a message.")
b, err := json.Marshal(m)
if err != nil {
logOnError(err, "MQSendMsgWorker["+strconv.Itoa(id)+"] : Marshaling message.")
} else {
err = ch.Publish(
"", // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "application/json",
Body: []byte(b),
})
if err != nil {
logOnError(err, "MQSendMsgWorker["+strconv.Itoa(id)+"] : Publishing message.")
}
}
}
log.Printf("MQSendMsgWorker[" + strconv.Itoa(id) + "] : Closing.")
}

80
td.go
View File

@ -21,36 +21,11 @@ func ListenCW(c *tdlib.Client) {
return false return false
} }
conn, err := amqp.Dial("amqp://" + cfg.Rabbit.User + ":" + cfg.Rabbit.Password + "@" + cfg.Rabbit.Host + "/" + cfg.Rabbit.Queue)
if err != nil {
log.Fatal(err)
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
log.Fatal(err)
}
defer ch.Close()
q, err := ch.QueueDeclare(
"msg", // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
log.Fatal(err)
}
receiver := c.AddEventReceiver(&tdlib.UpdateNewMessage{}, eventFilter, 100) receiver := c.AddEventReceiver(&tdlib.UpdateNewMessage{}, eventFilter, 100)
for newMsg := range receiver.Chan { for newMsg := range receiver.Chan {
updateMsg := (newMsg).(*tdlib.UpdateNewMessage) updateMsg := (newMsg).(*tdlib.UpdateNewMessage)
senderUserID := updateMsg.Message.SenderUserID senderUserID := updateMsg.Message.SenderUserID
mType := updateMsg.Message.Content.GetMessageContentEnum() if updateMsg.Message.Content.GetMessageContentEnum() == tdlib.MessageTextType {
if mType == "messageText" {
user, err := c.GetUser(senderUserID) user, err := c.GetUser(senderUserID)
if err != nil { if err != nil {
fmt.Println("ListenCW:", err.Error()) fmt.Println("ListenCW:", err.Error())
@ -68,17 +43,7 @@ func ListenCW(c *tdlib.Client) {
MsgID64: updateMsg.Message.ID, MsgID64: updateMsg.Message.ID,
} }
b, err := json.Marshal(m) MQCWMsgQueue <- m
_ = ch.Publish(
"", // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "application/json",
Body: []byte(b),
})
fmt.Printf("[%d-%02d-%02d %02d:%02d:%02d-00:00]", t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second()) fmt.Printf("[%d-%02d-%02d %02d:%02d:%02d-00:00]", t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second())
fmt.Println(" === CHATWARS (", user.Username, ") =====================================================================") fmt.Println(" === CHATWARS (", user.Username, ") =====================================================================")
@ -108,30 +73,6 @@ func getHistory(c *tdlib.Client) {
var msgCount int32 = 0 var msgCount int32 = 0
var lastMsgTime time.Time = time.Now() var lastMsgTime time.Time = time.Now()
conn, err := amqp.Dial("amqp://" + cfg.Rabbit.User + ":" + cfg.Rabbit.Password + "@" + cfg.Rabbit.Host + "/" + cfg.Rabbit.Queue)
if err != nil {
log.Fatal(err)
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
log.Fatal(err)
}
defer ch.Close()
q, err := ch.QueueDeclare(
"msg", // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
log.Fatal(err)
}
for lastMsgID64 >= 0 { for lastMsgID64 >= 0 {
msgs, err := c.GetChatHistory(user_chtwrsbot, lastMsgID64, 0, 10, false) msgs, err := c.GetChatHistory(user_chtwrsbot, lastMsgID64, 0, 10, false)
if err != nil { if err != nil {
@ -154,23 +95,8 @@ func getHistory(c *tdlib.Client) {
MsgID64: msg.ID, MsgID64: msg.ID,
} }
b, err := json.Marshal(m) MQCWMsgQueue <- m
if err != nil {
logOnError(err, "getHistory : Marshaling message.")
}
err = ch.Publish(
"", // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "application/json",
Body: []byte(b),
})
if err != nil {
logOnError(err, "getHistory : Publishing message.")
}
if msg.ID < lastMsgID64 { if msg.ID < lastMsgID64 {
lastMsgID64 = msg.ID lastMsgID64 = msg.ID
lastMsgTime = time.Unix(int64(msg.Date), 0) lastMsgTime = time.Unix(int64(msg.Date), 0)