diff --git a/def.go b/def.go index 3b6cfde..7943079 100644 --- a/def.go +++ b/def.go @@ -23,6 +23,13 @@ type ChatWarsMessage struct { Text string `json:"text"` } +type MQKeepAlive struct { + UserID64 int64 `json:"user_id"` + Nickname string `json:"nick"` + Queue string `json:"queue"` + Date time.Time `json:"date"` +} + const ( commandForwardMsg = 1 commandReplyMsg = 2 diff --git a/main.go b/main.go index 313bd99..ac871d4 100644 --- a/main.go +++ b/main.go @@ -131,9 +131,9 @@ func main() { for w := 1; w <= 3; w++ { go MQReceiveMsgWorker(w, MQTGCmdQueue) } + go MQKeepAliveWorker() lastOwnTDMsg = time.Now() - go ListenCW(client) go ListenMQ(client, MQTGCmdQueue) go ListenMe(client) diff --git a/mq.go b/mq.go index 8e0bc93..39bb5f6 100644 --- a/mq.go +++ b/mq.go @@ -94,3 +94,52 @@ func MQReceiveMsgWorker(id int, cmd chan<- TGCommand) { log.Printf("MQReceiveMsgWorker[" + strconv.Itoa(id) + "] : Closing.") } + +func MQKeepAliveWorker() { + conn, err := amqp.Dial("amqp://" + cfg.Rabbit.User + ":" + cfg.Rabbit.Password + "@" + cfg.Rabbit.Host + "/" + cfg.Rabbit.SendQueue) + failOnError(err, "MQKeepAliveWorker : Failed to connect to RabbitMQ") + defer conn.Close() + + ch, err := conn.Channel() + failOnError(err, "MQKeepAliveWorker : Failed to open a channel") + defer ch.Close() + + q, err := ch.QueueDeclare( + "keepalive", // name + false, // durable + false, // delete when unused + false, // exclusive + false, // no-wait + nil, // arguments + ) + failOnError(err, "MQKeepAliveWorker : Failed to declare a queue") + + for true { + t := time.Now() + m := MQKeepAlive{ + UserID64: ownUserID64, + Nickname: cfg.Rabbit.User, + Queue: cfg.Rabbit.ReceiveQueue, + Date: t, + } + b, err := json.Marshal(m) + if err != nil { + logOnError(err, "MQKeepAliveWorker : Marshaling message.") + } else { + err = ch.Publish( + "", // exchange + q.Name, // routing key + false, // mandatory + false, // immediate + amqp.Publishing{ + ContentType: "application/json", + Body: []byte(b), + }) + if err != nil { + logOnError(err, "MQKeepAliveWorker : Publishing message.") + } + } + t.Sleep(time.Minute) + } + log.Printf("MQKeepAliveWorker : Closing.") +}