diff --git a/def.go b/def.go new file mode 100644 index 0000000..75ad052 --- /dev/null +++ b/def.go @@ -0,0 +1,9 @@ +package main + +type ChatWarsMessage struct { + SenderUserID64 int64 `json:"sender_user_id"` + Date int32 `json:"date"` + ID64 int64 `json:"id"` + ChatID64 int64 `json:"chat_id"` + Text string `json:"text"` +} diff --git a/main.go b/main.go index 85eae4b..3c68703 100644 --- a/main.go +++ b/main.go @@ -37,14 +37,6 @@ type Config struct { } } -type ChatWarsMessage struct { - SenderUserID64 int64 `json:"sender_user_id"` - Date int32 `json:"date"` - ID64 int64 `json:"id"` - ChatID64 int64 `json:"chat_id"` - Text string `json:"text"` -} - var ( config = flag.String("config", "chirpnest.cfg", "config file path") initdb = flag.Bool("initdb", false, "initialize bot database") @@ -98,11 +90,13 @@ func main() { go StartBot() MQCWMsgQueue = make(chan ChatWarsMessage, 100) + SQLMsgIdQueue = make(chan int64, 100) + for w := 1; w <= 3; w++ { go MQGetMsgWorker(w, MQCWMsgQueue) } for w := 1; w <= 6; w++ { - go SQLCWMsgWorker(w, MQCWMsgQueue) + go SQLCWMsgWorker(w, MQCWMsgQueue, SQLMsgIdQueue) } for w := 1; w <= 6; w++ { go SQLIdentifyMsgWorker(w, SQLMsgIdQueue) diff --git a/mq.go b/mq.go index 081daff..06ab7d0 100644 --- a/mq.go +++ b/mq.go @@ -1,54 +1 @@ package main - -import ( - "encoding/json" - "log" - "strconv" - - "github.com/streadway/amqp" -) - -func MQGetMsgWorker(id int, msgs chan<- ChatWarsMessage) { - var x ChatWarsMessage - conn, err := amqp.Dial("amqp://" + cfg.Rabbit.User + ":" + cfg.Rabbit.Password + "@" + cfg.Rabbit.Host + "/" + cfg.Rabbit.Queue) - failOnError(err, "MQGetMsgWorker["+strconv.Itoa(id)+"] : Failed to connect to RabbitMQ") - defer conn.Close() - - ch, err := conn.Channel() - failOnError(err, "MQGetMsgWorker["+strconv.Itoa(id)+"] : Failed to open a channel") - defer ch.Close() - - q, err := ch.QueueDeclare( - "msg", // name - false, // durable - false, // delete when unused - false, // exclusive - false, // no-wait - nil, // arguments - ) - failOnError(err, "MQGetMsgWorker["+strconv.Itoa(id)+"] : Failed to declare a queue") - - m, err := ch.Consume( - q.Name, // queue - "", // consumer - true, // auto-ack - false, // exclusive - false, // no-local - false, // no-wait - nil, // args - ) - failOnError(err, "MQGetMsgWorker["+strconv.Itoa(id)+"] : Failed to register a consumer") - - for d := range m { - log.Printf("MQGetMsgWorker["+strconv.Itoa(id)+"] : Received a message: %s", string(d.Body)) - - if err = json.Unmarshal(d.Body, &x); err != nil { - logOnError(err, "MQGetMsgWorker["+strconv.Itoa(id)+"] : Can't unmarshal.\n"+string(d.Body)) - } else { - msgs <- x - } - } - - log.Printf("MQGetMsgWorker[" + strconv.Itoa(id) + "] : Closing.") - -} diff --git a/workers.go b/workers.go index 1630fe8..7bb2efc 100644 --- a/workers.go +++ b/workers.go @@ -1,18 +1,66 @@ package main import ( + "encoding/json" "log" "strconv" + + "github.com/streadway/amqp" ) -func SQLCWMsgWorker(id int, msgs <-chan ChatWarsMessage) { +func MQGetMsgWorker(id int, msgs chan<- ChatWarsMessage) { + var x ChatWarsMessage + conn, err := amqp.Dial("amqp://" + cfg.Rabbit.User + ":" + cfg.Rabbit.Password + "@" + cfg.Rabbit.Host + "/" + cfg.Rabbit.Queue) + failOnError(err, "MQGetMsgWorker["+strconv.Itoa(id)+"] : Failed to connect to RabbitMQ") + defer conn.Close() + + ch, err := conn.Channel() + failOnError(err, "MQGetMsgWorker["+strconv.Itoa(id)+"] : Failed to open a channel") + defer ch.Close() + + q, err := ch.QueueDeclare( + "msg", // name + false, // durable + false, // delete when unused + false, // exclusive + false, // no-wait + nil, // arguments + ) + failOnError(err, "MQGetMsgWorker["+strconv.Itoa(id)+"] : Failed to declare a queue") + + m, err := ch.Consume( + q.Name, // queue + "", // consumer + true, // auto-ack + false, // exclusive + false, // no-local + false, // no-wait + nil, // args + ) + failOnError(err, "MQGetMsgWorker["+strconv.Itoa(id)+"] : Failed to register a consumer") + + for d := range m { + log.Printf("MQGetMsgWorker["+strconv.Itoa(id)+"] : Received a message: %s", string(d.Body)) + + err = json.Unmarshal(d.Body, &x) + logOnError(err, "MQGetMsgWorker["+strconv.Itoa(id)+"] : Can't unmarshal.\n"+string(d.Body)) + if err == nil { + msgs <- x + } + } + + log.Printf("MQGetMsgWorker[" + strconv.Itoa(id) + "] : Closing.") + +} + +func SQLCWMsgWorker(id int, msgs <-chan ChatWarsMessage, objIds chan<- int64) { log.Printf("SQLCWMsgWorker[" + strconv.Itoa(id) + "] : Starting.") for m := range msgs { objId, err := putUnprocessedMsg(m) - if err != nil { - logOnError(err, "SQLCWMsgWorker["+strconv.Itoa(id)+"] : Inserting message.") - } else { - + logOnError(err, "SQLCWMsgWorker["+strconv.Itoa(id)+"] : Inserting message.") + if err == nil { + log.Printf("SQLCWMsgWorker["+strconv.Itoa(id)+"] : Message inserted (%d).\n", objId) + objIds <- objId } } @@ -23,7 +71,11 @@ func SQLCWMsgWorker(id int, msgs <-chan ChatWarsMessage) { func SQLIdentifyMsgWorker(id int, objIds <-chan int64) { log.Printf("SQLIdentifyMsgWorker[" + strconv.Itoa(id) + "] : Starting.") for objId := range objIds { - + m, err := getMsg(objId) + logOnError(err, "SQLIdentifyMsgWorker["+strconv.Itoa(id)+"] : Retrieving message.") + if err == nil { + log.Printf("SQLIdentifyMsgWorker["+strconv.Itoa(id)+"] : Message retrieved\n%s\n", m.text) + } } log.Printf("SQLIdentifyMsgWorker[" + strconv.Itoa(id) + "] : Closing.") }