package main

import (
	"encoding/json"
	"log"
	"strconv"

	"github.com/streadway/amqp"
)

// MQSendMsgWorker drains msgs and publishes each message, JSON-encoded, to
// the "msg" queue on the RabbitMQ broker described by cfg.Rabbit.
//
// id is used only to tag log and error messages so that several workers can
// be told apart. The function blocks until msgs is closed, then logs a
// closing line and returns; the deferred calls close the channel and the
// connection on the way out.
//
// Setup errors (dial, channel, queue declaration) are handed to failOnError.
// Per-message errors (marshal, publish) are handed to logOnError and the
// worker moves on to the next message.
//
// NOTE(review): failOnError is defined elsewhere; presumably it aborts on a
// non-nil error — confirm, since conn and ch are used unconditionally after
// each call.
func MQSendMsgWorker(id int, msgs <-chan ChatWarsMessage) {
	// The log prefix is the same for every message; build it once.
	prefix := "MQSendMsgWorker[" + strconv.Itoa(id) + "] : "

	// NOTE(review): cfg.Rabbit.Queue sits in the path (vhost) position of the
	// AMQP URI, and the credentials are not URL-escaped — confirm both are
	// intended.
	conn, err := amqp.Dial("amqp://" + cfg.Rabbit.User + ":" + cfg.Rabbit.Password + "@" + cfg.Rabbit.Host + "/" + cfg.Rabbit.Queue)
	failOnError(err, prefix+"Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, prefix+"Failed to open a channel")
	defer ch.Close()

	q, err := ch.QueueDeclare(
		"msg", // name
		false, // durable
		false, // delete when unused
		false, // exclusive
		false, // no-wait
		nil,   // arguments
	)
	failOnError(err, prefix+"Failed to declare a queue")

	for m := range msgs {
		// log.Print rather than log.Printf: the text is not a format string.
		log.Print(prefix + "Sending a message.")

		body, err := json.Marshal(m)
		if err != nil {
			logOnError(err, prefix+"Marshaling message.")
			continue
		}

		// Empty exchange name with the queue name as routing key.
		err = ch.Publish(
			"",     // exchange
			q.Name, // routing key
			false,  // mandatory
			false,  // immediate
			amqp.Publishing{
				ContentType: "application/json",
				Body:        body,
			})
		if err != nil {
			logOnError(err, prefix+"Publishing message.")
		}
	}

	log.Print(prefix + "Closing.")
}