From 92f77b3b17232b0e00a125aafc180f1ab5a875fd Mon Sep 17 00:00:00 2001 From: shoopea Date: Sat, 4 May 2019 17:15:33 +0800 Subject: [PATCH] test --- main.go | 5 ++++- mq.go | 16 ++++++++-------- sql.go | 12 ++++++++++++ 3 files changed, 24 insertions(+), 9 deletions(-) diff --git a/main.go b/main.go index cfa1535..6d817ec 100644 --- a/main.go +++ b/main.go @@ -98,7 +98,10 @@ func main() { MQCWMsgQueue = make(chan ChatWarsMessage, 100) for w := 1; w <= 3; w++ { - go MQGetMsg(w, MQCWMsgQueue) + go MQGetMsgWorker(w, MQCWMsgQueue) + } + for w := 1; w <= 6; w++ { + go SQLCWMsgWorker(w, MQCWMsgQueue) } fmt.Println("Started !") diff --git a/mq.go b/mq.go index 01ad780..081daff 100644 --- a/mq.go +++ b/mq.go @@ -8,14 +8,14 @@ import ( "github.com/streadway/amqp" ) -func MQGetMsg(id int, msgs chan<- ChatWarsMessage) { +func MQGetMsgWorker(id int, msgs chan<- ChatWarsMessage) { var x ChatWarsMessage conn, err := amqp.Dial("amqp://" + cfg.Rabbit.User + ":" + cfg.Rabbit.Password + "@" + cfg.Rabbit.Host + "/" + cfg.Rabbit.Queue) - failOnError(err, "MQGetMsg["+strconv.Itoa(id)+"] : Failed to connect to RabbitMQ") + failOnError(err, "MQGetMsgWorker["+strconv.Itoa(id)+"] : Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() - failOnError(err, "MQGetMsg["+strconv.Itoa(id)+"] : Failed to open a channel") + failOnError(err, "MQGetMsgWorker["+strconv.Itoa(id)+"] : Failed to open a channel") defer ch.Close() q, err := ch.QueueDeclare( @@ -26,7 +26,7 @@ func MQGetMsg(id int, msgs chan<- ChatWarsMessage) { false, // no-wait nil, // arguments ) - failOnError(err, "MQGetMsg["+strconv.Itoa(id)+"] : Failed to declare a queue") + failOnError(err, "MQGetMsgWorker["+strconv.Itoa(id)+"] : Failed to declare a queue") m, err := ch.Consume( q.Name, // queue @@ -37,18 +37,18 @@ func MQGetMsg(id int, msgs chan<- ChatWarsMessage) { false, // no-wait nil, // args ) - failOnError(err, "MQGetMsg["+strconv.Itoa(id)+"] : Failed to register a consumer") + failOnError(err, "MQGetMsgWorker["+strconv.Itoa(id)+"] : Failed to register a consumer") for d := range m { - log.Printf("MQGetMsg["+strconv.Itoa(id)+"] : Received a message: %s", string(d.Body)) + log.Printf("MQGetMsgWorker["+strconv.Itoa(id)+"] : Received a message: %s", string(d.Body)) if err = json.Unmarshal(d.Body, &x); err != nil { - logOnError(err, "MQGetMsg["+strconv.Itoa(id)+"] : Can't unmarshal.\n"+string(d.Body)) + logOnError(err, "MQGetMsgWorker["+strconv.Itoa(id)+"] : Can't unmarshal.\n"+string(d.Body)) } else { msgs <- x } } - log.Printf("MQGetMsg[" + strconv.Itoa(id) + "] : Closing.") + log.Printf("MQGetMsgWorker[" + strconv.Itoa(id) + "] : Closing.") } diff --git a/sql.go b/sql.go index a04dd0c..fcf2b76 100644 --- a/sql.go +++ b/sql.go @@ -122,6 +122,18 @@ func initDB() { log.Println("Database set up") } +func SQLCWMsgWorker(id int, msgs <-chan ChatWarsMessage) { + for m := range msgs { + err := putMsg(m) + if err != nil { + logOnError(err, "SQLCWMsgWorker["+strconv.Itoa(id)+"] : Inserting message.") + } + } + + log.Printf("SQLCWMsgWorker[" + strconv.Itoa(id) + "] : Closing.") + +} + func putMsg(m ChatWarsMessage) error { res, err := db.Exec(`INSERT INTO obj (obj_type_id) VALUES (3);`)