2019-05-04 05:53:35 +02:00
package main
import (
"encoding/json"
"log"
2019-05-04 05:56:03 +02:00
"strconv"
2019-05-17 08:26:19 +02:00
"time"
2019-05-04 05:53:35 +02:00
"github.com/streadway/amqp"
)
2019-05-04 05:56:03 +02:00
func MQSendMsgWorker ( id int , msgs <- chan ChatWarsMessage ) {
2019-05-14 11:34:15 +02:00
conn , err := amqp . Dial ( "amqp://" + cfg . Rabbit . User + ":" + cfg . Rabbit . Password + "@" + cfg . Rabbit . Host + "/" + cfg . Rabbit . SendQueue )
2019-05-04 05:53:35 +02:00
failOnError ( err , "MQSendMsgWorker[" + strconv . Itoa ( id ) + "] : Failed to connect to RabbitMQ" )
defer conn . Close ( )
ch , err := conn . Channel ( )
failOnError ( err , "MQSendMsgWorker[" + strconv . Itoa ( id ) + "] : Failed to open a channel" )
defer ch . Close ( )
q , err := ch . QueueDeclare (
"msg" , // name
false , // durable
false , // delete when unused
false , // exclusive
false , // no-wait
nil , // arguments
)
failOnError ( err , "MQSendMsgWorker[" + strconv . Itoa ( id ) + "] : Failed to declare a queue" )
for m := range msgs {
b , err := json . Marshal ( m )
if err != nil {
logOnError ( err , "MQSendMsgWorker[" + strconv . Itoa ( id ) + "] : Marshaling message." )
} else {
err = ch . Publish (
"" , // exchange
q . Name , // routing key
false , // mandatory
false , // immediate
amqp . Publishing {
ContentType : "application/json" ,
Body : [ ] byte ( b ) ,
} )
2019-05-17 10:36:52 +02:00
logOnError ( err , "MQSendMsgWorker[" + strconv . Itoa ( id ) + "] : Publishing message." )
2019-05-04 05:53:35 +02:00
}
}
log . Printf ( "MQSendMsgWorker[" + strconv . Itoa ( id ) + "] : Closing." )
}
2019-05-14 11:34:15 +02:00
2019-05-16 04:06:27 +02:00
func MQReceiveMsgWorker ( id int , cmd chan <- TGCommand ) {
var c TGCommand
2019-05-14 11:34:15 +02:00
conn , err := amqp . Dial ( "amqp://" + cfg . Rabbit . User + ":" + cfg . Rabbit . Password + "@" + cfg . Rabbit . Host + "/" + cfg . Rabbit . ReceiveQueue )
failOnError ( err , "MQReceiveMsgWorker[" + strconv . Itoa ( id ) + "] : Failed to connect to RabbitMQ" )
defer conn . Close ( )
ch , err := conn . Channel ( )
failOnError ( err , "MQReceiveMsgWorker[" + strconv . Itoa ( id ) + "] : Failed to open a channel" )
defer ch . Close ( )
q , err := ch . QueueDeclare (
"msg" , // name
false , // durable
false , // delete when unused
false , // exclusive
false , // no-wait
nil , // arguments
)
failOnError ( err , "MQReceiveMsgWorker[" + strconv . Itoa ( id ) + "] : Failed to declare a queue" )
m , err := ch . Consume (
q . Name , // queue
"" , // consumer
true , // auto-ack
false , // exclusive
false , // no-local
false , // no-wait
nil , // args
)
failOnError ( err , "MQReceiveMsgWorker[" + strconv . Itoa ( id ) + "] : Failed to declare a consumer" )
for d := range m {
2019-06-11 05:09:30 +02:00
log . Printf ( "MQReceiveMsgWorker[" + strconv . Itoa ( id ) + "] : Received a message:\n %s" , string ( d . Body ) )
2019-05-15 04:36:57 +02:00
err = json . Unmarshal ( d . Body , & c )
2019-05-14 11:34:15 +02:00
logOnError ( err , "MQReceiveMsgWorker[" + strconv . Itoa ( id ) + "] : Can't unmarshal.\n" + string ( d . Body ) )
if err == nil {
2019-06-11 04:57:02 +02:00
log . Printf ( "****************************** New MQ message ******************************\n%s\n****************************************************************************\n" , string ( d . Body ) )
2019-05-15 04:36:57 +02:00
cmd <- c
2019-05-14 11:34:15 +02:00
}
}
log . Printf ( "MQReceiveMsgWorker[" + strconv . Itoa ( id ) + "] : Closing." )
}
2019-05-17 08:22:34 +02:00
func MQKeepAliveWorker ( ) {
conn , err := amqp . Dial ( "amqp://" + cfg . Rabbit . User + ":" + cfg . Rabbit . Password + "@" + cfg . Rabbit . Host + "/" + cfg . Rabbit . SendQueue )
failOnError ( err , "MQKeepAliveWorker : Failed to connect to RabbitMQ" )
defer conn . Close ( )
ch , err := conn . Channel ( )
failOnError ( err , "MQKeepAliveWorker : Failed to open a channel" )
defer ch . Close ( )
q , err := ch . QueueDeclare (
"keepalive" , // name
false , // durable
false , // delete when unused
false , // exclusive
false , // no-wait
nil , // arguments
)
failOnError ( err , "MQKeepAliveWorker : Failed to declare a queue" )
for true {
t := time . Now ( )
m := MQKeepAlive {
UserID64 : ownUserID64 ,
Nickname : cfg . Rabbit . User ,
Queue : cfg . Rabbit . ReceiveQueue ,
Date : t ,
}
b , err := json . Marshal ( m )
if err != nil {
logOnError ( err , "MQKeepAliveWorker : Marshaling message." )
} else {
err = ch . Publish (
"" , // exchange
q . Name , // routing key
false , // mandatory
false , // immediate
amqp . Publishing {
ContentType : "application/json" ,
Body : [ ] byte ( b ) ,
} )
if err != nil {
logOnError ( err , "MQKeepAliveWorker : Publishing message." )
}
}
2019-05-17 08:26:49 +02:00
time . Sleep ( time . Minute )
2019-05-17 08:22:34 +02:00
}
log . Printf ( "MQKeepAliveWorker : Closing." )
}