This commit is contained in:
shoopea 2019-06-28 17:27:16 +08:00
parent ef03a4dad4
commit cc8569b765

View File

@ -13,30 +13,9 @@ import (
tb "gopkg.in/tucnak/telebot.v2" tb "gopkg.in/tucnak/telebot.v2"
) )
func OpenMQ(c *MQClient) error {
log.Printf("OpenMQ : Address of struct : %p\n", c)
var err error
c.Connection, err = amqp.Dial("amqp://" + c.User + ":" + c.Password + "@" + c.Host + "/" + c.Path)
if err != nil {
c.Connection.Close()
return err
}
c.Channel, err = c.Connection.Channel()
if err != nil {
c.Channel.Close()
c.Connection.Close()
return err
}
return err
}
func MQGetMsgWorker(id int, msgs chan<- ChatWarsMessage) { func MQGetMsgWorker(id int, msgs chan<- ChatWarsMessage) {
//log.Printf("MQGetMsgWorker[" + strconv.Itoa(id) + "] : Starting.") //log.Printf("MQGetMsgWorker[" + strconv.Itoa(id) + "] : Starting.")
var x ChatWarsMessage var err error
for true {
c := MQClient{ c := MQClient{
User: cfg.Rabbit.User, User: cfg.Rabbit.User,
Password: cfg.Rabbit.Password, Password: cfg.Rabbit.Password,
@ -45,12 +24,25 @@ func MQGetMsgWorker(id int, msgs chan<- ChatWarsMessage) {
SSL: false, SSL: false,
} }
log.Printf("MQGetMsgWorker["+strconv.Itoa(id)+"] : Address of struct : %p\n", &c) for true {
err := OpenMQ(&c) c.Connection, err = amqp.Dial("amqp://" + c.User + ":" + c.Password + "@" + c.Host + "/" + c.Path)
logOnError(err, "MQGetMsgWorker["+strconv.Itoa(id)+"] : Cannot open MQ connection") logOnError(err, "MQGetMsgWorker["+strconv.Itoa(id)+"] : Cannot open MQ connection")
if err != nil { if err != nil {
q, err := c.Channel.QueueDeclare( c.Connection.Close()
time.Sleep(15 * time.Second)
continue
}
c.Channel, err = c.Connection.Channel()
logOnError(err, "MQGetMsgWorker["+strconv.Itoa(id)+"] : Cannot open MQ channel")
if err != nil {
c.Channel.Close()
c.Connection.Close()
time.Sleep(15 * time.Second)
continue
}
c.Queue, err = c.Channel.QueueDeclare(
"msg", // name "msg", // name
false, // durable false, // durable
false, // delete when unused false, // delete when unused
@ -63,9 +55,11 @@ func MQGetMsgWorker(id int, msgs chan<- ChatWarsMessage) {
c.Channel.Close() c.Channel.Close()
c.Connection.Close() c.Connection.Close()
time.Sleep(15 * time.Second) time.Sleep(15 * time.Second)
} else { continue
}
m, err := c.Channel.Consume( m, err := c.Channel.Consume(
q.Name, // queue c.Queue.Name, // queue
"", // consumer "", // consumer
true, // auto-ack true, // auto-ack
false, // exclusive false, // exclusive
@ -78,8 +72,11 @@ func MQGetMsgWorker(id int, msgs chan<- ChatWarsMessage) {
c.Channel.Close() c.Channel.Close()
c.Connection.Close() c.Connection.Close()
time.Sleep(15 * time.Second) time.Sleep(15 * time.Second)
} else { continue
}
for d := range m { for d := range m {
var x ChatWarsMessage
// log.Printf("MQGetMsgWorker["+strconv.Itoa(id)+"] : Received a message: %s", string(d.Body)) // log.Printf("MQGetMsgWorker["+strconv.Itoa(id)+"] : Received a message: %s", string(d.Body))
err = json.Unmarshal(d.Body, &x) err = json.Unmarshal(d.Body, &x)
logOnError(err, "MQGetMsgWorker["+strconv.Itoa(id)+"] : Can't unmarshal.\n"+string(d.Body)) logOnError(err, "MQGetMsgWorker["+strconv.Itoa(id)+"] : Can't unmarshal.\n"+string(d.Body))
@ -87,12 +84,9 @@ func MQGetMsgWorker(id int, msgs chan<- ChatWarsMessage) {
msgs <- x msgs <- x
} }
} }
c.Channel.Close() c.Channel.Close()
c.Connection.Close() c.Connection.Close()
}
}
}
} }