diff --git a/mq.go b/mq.go index c331444..8fb1233 100644 --- a/mq.go +++ b/mq.go @@ -7,5 +7,19 @@ import ( func (c *MQClient) Open() error { conn, err := amqp.Dial("amqp://" + c.User + ":" + c.Password + "@" + c.Host + "/" + c.Path) c.Connection = conn + + if err != nil { + c.Connection.Close() + return err + } + + ch, err := c.Connection.Channel() + c.Channel = ch + if err != nil { + c.Channel.Close() + c.Connection.Close() + return err + } + return err } diff --git a/workers.go b/workers.go index f5e0453..81177c6 100644 --- a/workers.go +++ b/workers.go @@ -27,61 +27,49 @@ func MQGetMsgWorker(id int, msgs chan<- ChatWarsMessage) { err := c.Open() - logOnError(err, "MQGetMsgWorker["+strconv.Itoa(id)+"] : Failed to connect to RabbitMQ") if err != nil { - c.Connection.Close() - time.Sleep(15 * time.Second) - } else { - ch, err := c.Connection.Channel() - c.Channel = ch - logOnError(err, "MQGetMsgWorker["+strconv.Itoa(id)+"] : Failed to open a channel") + q, err := c.Channel.QueueDeclare( + "msg", // name + false, // durable + false, // delete when unused + false, // exclusive + false, // no-wait + nil, // arguments + ) + logOnError(err, "MQGetMsgWorker["+strconv.Itoa(id)+"] : Failed to declare a queue") if err != nil { c.Channel.Close() + c.Connection.Close() time.Sleep(15 * time.Second) } else { - q, err := c.Channel.QueueDeclare( - "msg", // name - false, // durable - false, // delete when unused - false, // exclusive - false, // no-wait - nil, // arguments + m, err := ch.Consume( + q.Name, // queue + "", // consumer + true, // auto-ack + false, // exclusive + false, // no-local + false, // no-wait + nil, // args ) - logOnError(err, "MQGetMsgWorker["+strconv.Itoa(id)+"] : Failed to declare a queue") + logOnError(err, "MQGetMsgWorker["+strconv.Itoa(id)+"] : Failed to register a consumer") if err != nil { c.Channel.Close() c.Connection.Close() time.Sleep(15 * time.Second) } else { - m, err := ch.Consume( - q.Name, // queue - "", // consumer - true, // auto-ack - false, // exclusive - false, // no-local - false, // no-wait - nil, // args - ) - logOnError(err, "MQGetMsgWorker["+strconv.Itoa(id)+"] : Failed to register a consumer") - if err != nil { - c.Channel.Close() - c.Connection.Close() - time.Sleep(15 * time.Second) - } else { - for d := range m { - // log.Printf("MQGetMsgWorker["+strconv.Itoa(id)+"] : Received a message: %s", string(d.Body)) - err = json.Unmarshal(d.Body, &x) - logOnError(err, "MQGetMsgWorker["+strconv.Itoa(id)+"] : Can't unmarshal.\n"+string(d.Body)) - if err == nil { - msgs <- x - } + for d := range m { + // log.Printf("MQGetMsgWorker["+strconv.Itoa(id)+"] : Received a message: %s", string(d.Body)) + err = json.Unmarshal(d.Body, &x) + logOnError(err, "MQGetMsgWorker["+strconv.Itoa(id)+"] : Can't unmarshal.\n"+string(d.Body)) + if err == nil { + msgs <- x } - c.Channel.Close() - c.Connection.Close() } + c.Channel.Close() + c.Connection.Close() } - } + } }