diff --git a/workers.go b/workers.go index 84d7c42..fb2aa29 100644 --- a/workers.go +++ b/workers.go @@ -13,87 +13,81 @@ import ( tb "gopkg.in/tucnak/telebot.v2" ) -func OpenMQ(c *MQClient) error { - log.Printf("OpenMQ : Address of struct : %p\n", c) - var err error - c.Connection, err = amqp.Dial("amqp://" + c.User + ":" + c.Password + "@" + c.Host + "/" + c.Path) - - if err != nil { - c.Connection.Close() - return err - } - - c.Channel, err = c.Connection.Channel() - if err != nil { - c.Channel.Close() - c.Connection.Close() - return err - } - - return err -} - func MQGetMsgWorker(id int, msgs chan<- ChatWarsMessage) { //log.Printf("MQGetMsgWorker[" + strconv.Itoa(id) + "] : Starting.") - var x ChatWarsMessage + var err error + c := MQClient{ + User: cfg.Rabbit.User, + Password: cfg.Rabbit.Password, + Host: cfg.Rabbit.Host, + Path: cfg.Rabbit.Path, + SSL: false, + } + for true { - c := MQClient{ - User: cfg.Rabbit.User, - Password: cfg.Rabbit.Password, - Host: cfg.Rabbit.Host, - Path: cfg.Rabbit.Path, - SSL: false, - } - - log.Printf("MQGetMsgWorker["+strconv.Itoa(id)+"] : Address of struct : %p\n", &c) - err := OpenMQ(&c) + c.Connection, err = amqp.Dial("amqp://" + c.User + ":" + c.Password + "@" + c.Host + "/" + c.Path) logOnError(err, "MQGetMsgWorker["+strconv.Itoa(id)+"] : Cannot open MQ connection") - if err != nil { - q, err := c.Channel.QueueDeclare( - "msg", // name - false, // durable - false, // delete when unused - false, // exclusive - false, // no-wait - nil, // arguments - ) - logOnError(err, "MQGetMsgWorker["+strconv.Itoa(id)+"] : Failed to declare a queue") - if err != nil { - c.Channel.Close() - c.Connection.Close() - time.Sleep(15 * time.Second) - } else { - m, err := c.Channel.Consume( - q.Name, // queue - "", // consumer - true, // auto-ack - false, // exclusive - false, // no-local - false, // no-wait - nil, // args - ) - logOnError(err, "MQGetMsgWorker["+strconv.Itoa(id)+"] : Failed to register a consumer") - if err != nil { - c.Channel.Close() - c.Connection.Close() - time.Sleep(15 * time.Second) - } else { - for d := range m { - // log.Printf("MQGetMsgWorker["+strconv.Itoa(id)+"] : Received a message: %s", string(d.Body)) - err = json.Unmarshal(d.Body, &x) - logOnError(err, "MQGetMsgWorker["+strconv.Itoa(id)+"] : Can't unmarshal.\n"+string(d.Body)) - if err == nil { - msgs <- x - } - } - c.Channel.Close() - c.Connection.Close() - } - } - + c.Connection.Close() + time.Sleep(15 * time.Second) + continue } + c.Channel, err = c.Connection.Channel() + logOnError(err, "MQGetMsgWorker["+strconv.Itoa(id)+"] : Cannot open MQ channel") + if err != nil { + c.Channel.Close() + c.Connection.Close() + time.Sleep(15 * time.Second) + continue + } + + c.Queue, err = c.Channel.QueueDeclare( + "msg", // name + false, // durable + false, // delete when unused + false, // exclusive + false, // no-wait + nil, // arguments + ) + logOnError(err, "MQGetMsgWorker["+strconv.Itoa(id)+"] : Failed to declare a queue") + if err != nil { + c.Channel.Close() + c.Connection.Close() + time.Sleep(15 * time.Second) + continue + } + + m, err := c.Channel.Consume( + c.Queue.Name, // queue + "", // consumer + true, // auto-ack + false, // exclusive + false, // no-local + false, // no-wait + nil, // args + ) + logOnError(err, "MQGetMsgWorker["+strconv.Itoa(id)+"] : Failed to register a consumer") + if err != nil { + c.Channel.Close() + c.Connection.Close() + time.Sleep(15 * time.Second) + continue + } + + for d := range m { + var x ChatWarsMessage + // log.Printf("MQGetMsgWorker["+strconv.Itoa(id)+"] : Received a message: %s", string(d.Body)) + err = json.Unmarshal(d.Body, &x) + logOnError(err, "MQGetMsgWorker["+strconv.Itoa(id)+"] : Can't unmarshal.\n"+string(d.Body)) + if err == nil { + msgs <- x + } + } + + c.Channel.Close() + c.Connection.Close() + } log.Printf("MQGetMsgWorker[" + strconv.Itoa(id) + "] : Closing.")