package main

import (
	"encoding/json"
	"log"
	"strconv"
	"time"

	"github.com/streadway/amqp"
)

// MQSendMsgWorker publishes every ChatWarsMessage received on msgs, encoded as
// JSON, to the "msg" queue through the default exchange.
//
// It dials RabbitMQ using the credentials and host from cfg.Rabbit, with
// cfg.Rabbit.SendQueue in the path position of the AMQP URI. id is only used
// to tag log and error messages. The worker runs until msgs is closed.
//
// Setup errors are handed to failOnError, which is defined elsewhere
// (presumably it aborts; confirm before relying on the deferred Close calls).
func MQSendMsgWorker(id int, msgs <-chan ChatWarsMessage) {
	// Build the log prefix once instead of on every call site.
	tag := "MQSendMsgWorker[" + strconv.Itoa(id) + "] : "

	conn, err := amqp.Dial("amqp://" + cfg.Rabbit.User + ":" + cfg.Rabbit.Password + "@" + cfg.Rabbit.Host + "/" + cfg.Rabbit.SendQueue)
	failOnError(err, tag+"Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, tag+"Failed to open a channel")
	defer ch.Close()

	q, err := ch.QueueDeclare(
		"msg", // name
		false, // durable
		false, // delete when unused
		false, // exclusive
		false, // no-wait
		nil,   // arguments
	)
	failOnError(err, tag+"Failed to declare a queue")

	for m := range msgs {
		body, err := json.Marshal(m)
		if err != nil {
			// A message that cannot be encoded is logged and dropped.
			logOnError(err, tag+"Marshaling message.")
			continue
		}
		err = ch.Publish(
			"",     // exchange
			q.Name, // routing key
			false,  // mandatory
			false,  // immediate
			amqp.Publishing{
				ContentType: "application/json",
				Body:        body,
			})
		logOnError(err, tag+"Publishing message.")
	}

	// log.Print rather than log.Printf: the text is not a format string.
	log.Print(tag + "Closing.")
}

// MQReceiveMsgWorker consumes the "msg" queue and forwards every delivery that
// decodes as a JSON TGCommand to cmd.
//
// It dials RabbitMQ using the credentials and host from cfg.Rabbit, with
// cfg.Rabbit.ReceiveQueue in the path position of the AMQP URI. The consumer
// is registered with auto-ack enabled. id is only used to tag log and error
// messages. The worker runs until the delivery channel is closed; the send on
// cmd blocks until the receiver takes the command.
func MQReceiveMsgWorker(id int, cmd chan<- TGCommand) {
	tag := "MQReceiveMsgWorker[" + strconv.Itoa(id) + "] : "

	conn, err := amqp.Dial("amqp://" + cfg.Rabbit.User + ":" + cfg.Rabbit.Password + "@" + cfg.Rabbit.Host + "/" + cfg.Rabbit.ReceiveQueue)
	failOnError(err, tag+"Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, tag+"Failed to open a channel")
	defer ch.Close()

	q, err := ch.QueueDeclare(
		"msg", // name
		false, // durable
		false, // delete when unused
		false, // exclusive
		false, // no-wait
		nil,   // arguments
	)
	failOnError(err, tag+"Failed to declare a queue")

	deliveries, err := ch.Consume(
		q.Name, // queue
		"",     // consumer
		true,   // auto-ack
		false,  // exclusive
		false,  // no-local
		false,  // no-wait
		nil,    // args
	)
	failOnError(err, tag+"Failed to declare a consumer")

	for d := range deliveries {
		log.Printf("%sReceived a message:\n %s", tag, d.Body)

		// Decode into a fresh value for every delivery. json.Unmarshal only
		// overwrites the fields present in the payload, so reusing one
		// TGCommand across iterations would carry stale fields over from the
		// previous message and could share map/slice storage with a command
		// that was already handed to the receiver.
		var c TGCommand
		err := json.Unmarshal(d.Body, &c)
		logOnError(err, tag+"Can't unmarshal.\n"+string(d.Body))
		if err == nil {
			cmd <- c
		}
	}

	log.Print(tag + "Closing.")
}

// MQKeepAliveWorker publishes an MQKeepAlive record, encoded as JSON, to the
// "keepalive" queue, then sleeps KeepAliveHeartBeatSeconds seconds, forever.
//
// Each record carries ownUserID64, cfg.Rabbit.User as the nickname,
// cfg.Rabbit.ReceiveQueue as the queue, and the current time. Marshal and
// publish errors are logged and the loop carries on. The function never
// returns normally.
func MQKeepAliveWorker() {
	conn, err := amqp.Dial("amqp://" + cfg.Rabbit.User + ":" + cfg.Rabbit.Password + "@" + cfg.Rabbit.Host + "/" + cfg.Rabbit.SendQueue)
	failOnError(err, "MQKeepAliveWorker : Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "MQKeepAliveWorker : Failed to open a channel")
	defer ch.Close()

	q, err := ch.QueueDeclare(
		"keepalive", // name
		false,       // durable
		false,       // delete when unused
		false,       // exclusive
		false,       // no-wait
		nil,         // arguments
	)
	failOnError(err, "MQKeepAliveWorker : Failed to declare a queue")

	// The loop has no exit, so nothing may follow it: the former trailing
	// "Closing." log statement was unreachable and has been dropped.
	for {
		m := MQKeepAlive{
			TGUserID64: ownUserID64,
			Nickname:   cfg.Rabbit.User,
			Queue:      cfg.Rabbit.ReceiveQueue,
			Date:       time.Now(),
		}

		body, err := json.Marshal(m)
		if err != nil {
			logOnError(err, "MQKeepAliveWorker : Marshaling message.")
		} else {
			err = ch.Publish(
				"",     // exchange
				q.Name, // routing key
				false,  // mandatory
				false,  // immediate
				amqp.Publishing{
					ContentType: "application/json",
					Body:        body,
				})
			if err != nil {
				logOnError(err, "MQKeepAliveWorker : Publishing message.")
			}
		}

		time.Sleep(KeepAliveHeartBeatSeconds * time.Second)
	}
}