diff --git a/workers.go b/workers.go index 86cfe02..d2f8f77 100644 --- a/workers.go +++ b/workers.go @@ -16,41 +16,62 @@ import ( func MQGetMsgWorker(id int, msgs chan<- ChatWarsMessage) { //log.Printf("MQGetMsgWorker[" + strconv.Itoa(id) + "] : Starting.") var x ChatWarsMessage - conn, err := amqp.Dial("amqp://" + cfg.Rabbit.User + ":" + cfg.Rabbit.Password + "@" + cfg.Rabbit.Host + "/" + cfg.Rabbit.Queue) - failOnError(err, "MQGetMsgWorker["+strconv.Itoa(id)+"] : Failed to connect to RabbitMQ") - defer conn.Close() + for true { + conn, err := amqp.Dial("amqps://" + cfg.Rabbit.User + ":" + cfg.Rabbit.Password + "@" + cfg.Rabbit.Host + "/" + cfg.Rabbit.Queue) + logOnError(err, "MQGetMsgWorker["+strconv.Itoa(id)+"] : Failed to connect to RabbitMQ") + if err != nil { + conn.Close() + time.Sleep(15 * time.Second) + } else { + ch, err := conn.Channel() + logOnError(err, "MQGetMsgWorker["+strconv.Itoa(id)+"] : Failed to open a channel") + if err != nil { + ch.Close() + time.Sleep(15 * time.Second) + } else { + q, err := ch.QueueDeclare( + "msg", // name + false, // durable + false, // delete when unused + false, // exclusive + false, // no-wait + nil, // arguments + ) + logOnError(err, "MQGetMsgWorker["+strconv.Itoa(id)+"] : Failed to declare a queue") + if err != nil { + ch.Close() + conn.Close() + time.Sleep(15 * time.Second) + } else { + m, err := ch.Consume( + q.Name, // queue + "", // consumer + true, // auto-ack + false, // exclusive + false, // no-local + false, // no-wait + nil, // args + ) + logOnError(err, "MQGetMsgWorker["+strconv.Itoa(id)+"] : Failed to register a consumer") + if err != nil { + ch.Close() + conn.Close() + time.Sleep(15 * time.Second) + } else { + for d := range m { + // log.Printf("MQGetMsgWorker["+strconv.Itoa(id)+"] : Received a message: %s", string(d.Body)) + err = json.Unmarshal(d.Body, &x) + logOnError(err, "MQGetMsgWorker["+strconv.Itoa(id)+"] : Can't unmarshal.\n"+string(d.Body)) + if err == nil { + msgs <- x + } + } + ch.Close() + conn.Close() + } + } - ch, err := conn.Channel() - failOnError(err, "MQGetMsgWorker["+strconv.Itoa(id)+"] : Failed to open a channel") - defer ch.Close() - - q, err := ch.QueueDeclare( - "msg", // name - false, // durable - false, // delete when unused - false, // exclusive - false, // no-wait - nil, // arguments - ) - failOnError(err, "MQGetMsgWorker["+strconv.Itoa(id)+"] : Failed to declare a queue") - - m, err := ch.Consume( - q.Name, // queue - "", // consumer - true, // auto-ack - false, // exclusive - false, // no-local - false, // no-wait - nil, // args - ) - failOnError(err, "MQGetMsgWorker["+strconv.Itoa(id)+"] : Failed to register a consumer") - - for d := range m { - // log.Printf("MQGetMsgWorker["+strconv.Itoa(id)+"] : Received a message: %s", string(d.Body)) - err = json.Unmarshal(d.Body, &x) - logOnError(err, "MQGetMsgWorker["+strconv.Itoa(id)+"] : Can't unmarshal.\n"+string(d.Body)) - if err == nil { - msgs <- x + } } } @@ -347,87 +368,109 @@ func MQTGCmdWorker(id int, cmds <-chan TGCommand) { func MQKeepAliveWorker() { //log.Printf("MQKeepAliveWorker : Starting.") - conn, err := amqp.Dial("amqp://" + cfg.Rabbit.User + ":" + cfg.Rabbit.Password + "@" + cfg.Rabbit.Host + "/" + cfg.Rabbit.Queue) - failOnError(err, "MQKeepAliveWorker : Failed to connect to RabbitMQ") - defer conn.Close() - - ch, err := conn.Channel() - failOnError(err, "MQKeepAliveWorker : Failed to open a channel") - defer ch.Close() - - q, err := ch.QueueDeclare( - "keepalive", // name - false, // durable - false, // delete when unused - false, // exclusive - false, // no-wait - nil, // arguments - ) - failOnError(err, "MQKeepAliveWorker : Failed to declare a queue") - - m, err := ch.Consume( - q.Name, // queue - "", // consumer - true, // auto-ack - false, // exclusive - false, // no-local - false, // no-wait - nil, // args - ) - failOnError(err, "MQKeepAliveWorker : Failed to register a consumer") - - for d := range m { - // log.Printf("MQKeepAliveWorker : Received a message: %s", string(d.Body)) - x := MQKeepAlive{} - err = json.Unmarshal(d.Body, &x) - logOnError(err, "MQKeepAliveWorker : Can't unmarshal.\n"+string(d.Body)) - if err == nil { - if x.Date.Add(10 * time.Second).Before(time.Now()) { - // outdated keep-alive coming from client - } else if v, ok := clientsKeepAlive.Load(x.UserID64); ok { - k := v.(*MQKeepAlive) - k.Date = x.Date + for true { + conn, err := amqp.Dial("amqps://" + cfg.Rabbit.User + ":" + cfg.Rabbit.Password + "@" + cfg.Rabbit.Host + "/" + cfg.Rabbit.Queue) + logOnError(err, "MQKeepAliveWorker : Failed to connect to RabbitMQ") + if err != nil { + conn.Close() + time.Sleep(15 * time.Second) + } else { + ch, err := conn.Channel() + logOnError(err, "MQKeepAliveWorker : Failed to open a channel") + if err != nil { + ch.Close() + conn.Close() + time.Sleep(15 * time.Second) } else { - clt := MQClient{} - clt.Connection, err = amqp.Dial("amqp://" + cfg.Rabbit.User + ":" + cfg.Rabbit.Password + "@" + cfg.Rabbit.Host + "/" + x.Queue) - logOnError(err, "MQKeepAliveWorker : Failed to connect to RabbitMQ") - clt.Channel, err = clt.Connection.Channel() - logOnError(err, "MQKeepAliveWorker : Failed to open a channel") - clt.Queue, err = clt.Channel.QueueDeclare( - "msg", // name - false, // durable - false, // delete when unused - false, // exclusive - false, // no-wait - nil, // arguments + q, err := ch.QueueDeclare( + "keepalive", // name + false, // durable + false, // delete when unused + false, // exclusive + false, // no-wait + nil, // arguments ) logOnError(err, "MQKeepAliveWorker : Failed to declare a queue") - clientsKeepAlive.Store(x.UserID64, &x) - clientsQueue[x.UserID64] = &clt + if err != nil { + ch.Close() + conn.Close() + time.Sleep(15 * time.Second) + } else { + m, err := ch.Consume( + q.Name, // queue + "", // consumer + true, // auto-ack + false, // exclusive + false, // no-local + false, // no-wait + nil, // args + ) + logOnError(err, "MQKeepAliveWorker : Failed to register a consumer") + if err != nil { + ch.Close() + conn.Close() + time.Sleep(15 * time.Second) + } else { + for d := range m { + // log.Printf("MQKeepAliveWorker : Received a message: %s", string(d.Body)) + x := MQKeepAlive{} + err = json.Unmarshal(d.Body, &x) + logOnError(err, "MQKeepAliveWorker : Can't unmarshal.\n"+string(d.Body)) + if err == nil { + if x.Date.Add(10 * time.Second).Before(time.Now()) { + // outdated keep-alive coming from client + } else if v, ok := clientsKeepAlive.Load(x.UserID64); ok { + k := v.(*MQKeepAlive) + k.Date = x.Date + } else { + clt := MQClient{} + clt.Connection, err = amqp.Dial("amqp://" + cfg.Rabbit.User + ":" + cfg.Rabbit.Password + "@" + cfg.Rabbit.Host + "/" + x.Queue) + logOnError(err, "MQKeepAliveWorker : Failed to connect to RabbitMQ") + clt.Channel, err = clt.Connection.Channel() + logOnError(err, "MQKeepAliveWorker : Failed to open a channel") + clt.Queue, err = clt.Channel.QueueDeclare( + "msg", // name + false, // durable + false, // delete when unused + false, // exclusive + false, // no-wait + nil, // arguments + ) + logOnError(err, "MQKeepAliveWorker : Failed to declare a queue") + clientsKeepAlive.Store(x.UserID64, &x) + clientsQueue[x.UserID64] = &clt - c := TGCommand{ - Type: commandSendMsg, - ToUserID64: x.UserID64, - Text: "Your client is connected.", - } - TGCmdQueue <- c - c = TGCommand{ - Type: commandSendMsg, - ToUserID64: cfg.Bot.Admin, - Text: fmt.Sprintf("Client `%s` is connected.", x.Nickname), - } - TGCmdQueue <- c + c := TGCommand{ + Type: commandSendMsg, + ToUserID64: x.UserID64, + Text: "Your client is connected.", + } + TGCmdQueue <- c + c = TGCommand{ + Type: commandSendMsg, + ToUserID64: cfg.Bot.Admin, + Text: fmt.Sprintf("Client `%s` is connected.", x.Nickname), + } + TGCmdQueue <- c - clientSendCWMsg(x.UserID64, `🏅Me`) - /* - c = TGCommand{ - Type: commandSendMsg, - FromUserID64: x.UserID64, - ToChatID64: userID64ChtWrsBot, - Text: `/hero`, + clientSendCWMsg(x.UserID64, `🏅Me`) + /* + c = TGCommand{ + Type: commandSendMsg, + FromUserID64: x.UserID64, + ToChatID64: userID64ChtWrsBot, + Text: `/hero`, + } + MQTGCmdQueue <- c + */ + } + } + } + ch.Close() + conn.Close() } - MQTGCmdQueue <- c - */ + } + } } }