This commit is contained in:
shoopea 2019-05-06 10:54:24 +08:00
parent c3cb74a705
commit 42a4f3e367
4 changed files with 70 additions and 68 deletions

9
def.go Normal file
View File

@ -0,0 +1,9 @@
package main
type ChatWarsMessage struct {
SenderUserID64 int64 `json:"sender_user_id"`
Date int32 `json:"date"`
ID64 int64 `json:"id"`
ChatID64 int64 `json:"chat_id"`
Text string `json:"text"`
}

12
main.go
View File

@ -37,14 +37,6 @@ type Config struct {
} }
} }
type ChatWarsMessage struct {
SenderUserID64 int64 `json:"sender_user_id"`
Date int32 `json:"date"`
ID64 int64 `json:"id"`
ChatID64 int64 `json:"chat_id"`
Text string `json:"text"`
}
var ( var (
config = flag.String("config", "chirpnest.cfg", "config file path") config = flag.String("config", "chirpnest.cfg", "config file path")
initdb = flag.Bool("initdb", false, "initialize bot database") initdb = flag.Bool("initdb", false, "initialize bot database")
@ -98,11 +90,13 @@ func main() {
go StartBot() go StartBot()
MQCWMsgQueue = make(chan ChatWarsMessage, 100) MQCWMsgQueue = make(chan ChatWarsMessage, 100)
SQLMsgIdQueue = make(chan int64, 100)
for w := 1; w <= 3; w++ { for w := 1; w <= 3; w++ {
go MQGetMsgWorker(w, MQCWMsgQueue) go MQGetMsgWorker(w, MQCWMsgQueue)
} }
for w := 1; w <= 6; w++ { for w := 1; w <= 6; w++ {
go SQLCWMsgWorker(w, MQCWMsgQueue) go SQLCWMsgWorker(w, MQCWMsgQueue, SQLMsgIdQueue)
} }
for w := 1; w <= 6; w++ { for w := 1; w <= 6; w++ {
go SQLIdentifyMsgWorker(w, SQLMsgIdQueue) go SQLIdentifyMsgWorker(w, SQLMsgIdQueue)

53
mq.go
View File

@ -1,54 +1 @@
package main package main
import (
"encoding/json"
"log"
"strconv"
"github.com/streadway/amqp"
)
func MQGetMsgWorker(id int, msgs chan<- ChatWarsMessage) {
var x ChatWarsMessage
conn, err := amqp.Dial("amqp://" + cfg.Rabbit.User + ":" + cfg.Rabbit.Password + "@" + cfg.Rabbit.Host + "/" + cfg.Rabbit.Queue)
failOnError(err, "MQGetMsgWorker["+strconv.Itoa(id)+"] : Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "MQGetMsgWorker["+strconv.Itoa(id)+"] : Failed to open a channel")
defer ch.Close()
q, err := ch.QueueDeclare(
"msg", // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "MQGetMsgWorker["+strconv.Itoa(id)+"] : Failed to declare a queue")
m, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "MQGetMsgWorker["+strconv.Itoa(id)+"] : Failed to register a consumer")
for d := range m {
log.Printf("MQGetMsgWorker["+strconv.Itoa(id)+"] : Received a message: %s", string(d.Body))
if err = json.Unmarshal(d.Body, &x); err != nil {
logOnError(err, "MQGetMsgWorker["+strconv.Itoa(id)+"] : Can't unmarshal.\n"+string(d.Body))
} else {
msgs <- x
}
}
log.Printf("MQGetMsgWorker[" + strconv.Itoa(id) + "] : Closing.")
}

View File

@ -1,18 +1,66 @@
package main package main
import ( import (
"encoding/json"
"log" "log"
"strconv" "strconv"
"github.com/streadway/amqp"
) )
func SQLCWMsgWorker(id int, msgs <-chan ChatWarsMessage) { func MQGetMsgWorker(id int, msgs chan<- ChatWarsMessage) {
var x ChatWarsMessage
conn, err := amqp.Dial("amqp://" + cfg.Rabbit.User + ":" + cfg.Rabbit.Password + "@" + cfg.Rabbit.Host + "/" + cfg.Rabbit.Queue)
failOnError(err, "MQGetMsgWorker["+strconv.Itoa(id)+"] : Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "MQGetMsgWorker["+strconv.Itoa(id)+"] : Failed to open a channel")
defer ch.Close()
q, err := ch.QueueDeclare(
"msg", // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "MQGetMsgWorker["+strconv.Itoa(id)+"] : Failed to declare a queue")
m, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "MQGetMsgWorker["+strconv.Itoa(id)+"] : Failed to register a consumer")
for d := range m {
log.Printf("MQGetMsgWorker["+strconv.Itoa(id)+"] : Received a message: %s", string(d.Body))
err = json.Unmarshal(d.Body, &x)
logOnError(err, "MQGetMsgWorker["+strconv.Itoa(id)+"] : Can't unmarshal.\n"+string(d.Body))
if err == nil {
msgs <- x
}
}
log.Printf("MQGetMsgWorker[" + strconv.Itoa(id) + "] : Closing.")
}
func SQLCWMsgWorker(id int, msgs <-chan ChatWarsMessage, objIds chan<- int64) {
log.Printf("SQLCWMsgWorker[" + strconv.Itoa(id) + "] : Starting.") log.Printf("SQLCWMsgWorker[" + strconv.Itoa(id) + "] : Starting.")
for m := range msgs { for m := range msgs {
objId, err := putUnprocessedMsg(m) objId, err := putUnprocessedMsg(m)
if err != nil { logOnError(err, "SQLCWMsgWorker["+strconv.Itoa(id)+"] : Inserting message.")
logOnError(err, "SQLCWMsgWorker["+strconv.Itoa(id)+"] : Inserting message.") if err == nil {
} else { log.Printf("SQLCWMsgWorker["+strconv.Itoa(id)+"] : Message inserted (%d).\n", objId)
objIds <- objId
} }
} }
@ -23,7 +71,11 @@ func SQLCWMsgWorker(id int, msgs <-chan ChatWarsMessage) {
func SQLIdentifyMsgWorker(id int, objIds <-chan int64) { func SQLIdentifyMsgWorker(id int, objIds <-chan int64) {
log.Printf("SQLIdentifyMsgWorker[" + strconv.Itoa(id) + "] : Starting.") log.Printf("SQLIdentifyMsgWorker[" + strconv.Itoa(id) + "] : Starting.")
for objId := range objIds { for objId := range objIds {
m, err := getMsg(objId)
logOnError(err, "SQLIdentifyMsgWorker["+strconv.Itoa(id)+"] : Retrieving message.")
if err == nil {
log.Printf("SQLIdentifyMsgWorker["+strconv.Itoa(id)+"] : Message retrieved\n%s\n", m.text)
}
} }
log.Printf("SQLIdentifyMsgWorker[" + strconv.Itoa(id) + "] : Closing.") log.Printf("SQLIdentifyMsgWorker[" + strconv.Itoa(id) + "] : Closing.")
} }