parent
afffc3beb9
commit
0a0c8e7960
@ -14,7 +14,7 @@ database = chirpnest
|
||||
user = chirpnest
|
||||
password = chirpnest
|
||||
host = localhost:5672
|
||||
path = chirpnest
|
||||
queue = chirpnest
|
||||
|
||||
[bot]
|
||||
admin = 0
|
||||
|
@ -34,11 +34,6 @@ func clientKeepAlive(k, v interface{}) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func clientIsAlive(id int64) bool {
|
||||
_, ok := clientsKeepAlive.Load(id)
|
||||
return ok
|
||||
}
|
||||
|
||||
func clientSendCWMsg(userID64 int64, s string) {
|
||||
c := TGCommand{
|
||||
Type: commandSendMsg,
|
||||
|
16
def.go
16
def.go
@ -19,18 +19,10 @@ type MQKeepAlive struct {
|
||||
Date time.Time `json:"date"`
|
||||
}
|
||||
|
||||
type MQSession struct {
|
||||
Host string
|
||||
SSL bool
|
||||
User string
|
||||
Password string
|
||||
Path string
|
||||
Queue string
|
||||
MQConnection *amqp.Connection
|
||||
MQChannel *amqp.Channel
|
||||
MQQueue *amqp.Queue
|
||||
MQDelivery <-chan *ampq.Delivery
|
||||
isConnected bool
|
||||
type MQClient struct {
|
||||
Connection *amqp.Connection
|
||||
Channel *amqp.Channel
|
||||
Queue amqp.Queue
|
||||
}
|
||||
|
||||
type ChatWarsClient struct {
|
||||
|
6
main.go
6
main.go
@ -32,7 +32,7 @@ type Config struct {
|
||||
User string
|
||||
Password string
|
||||
Host string
|
||||
Path string
|
||||
Queue string
|
||||
}
|
||||
Bot struct {
|
||||
Admin int64
|
||||
@ -60,7 +60,7 @@ var (
|
||||
JobQueue chan Job
|
||||
|
||||
msgParsingRules map[int]MessageParsingRule
|
||||
clientsQueue map[int64]*MQSession
|
||||
clientsQueue map[int64]*MQClient
|
||||
|
||||
clientsCW *sync.Map
|
||||
clientsKeepAlive *sync.Map
|
||||
@ -153,7 +153,7 @@ func main() {
|
||||
TGCmdQueue = make(chan TGCommand, TGCmdQueueSize)
|
||||
MQTGCmdQueue = make(chan TGCommand, MQTGCmdQueueSize)
|
||||
JobQueue = make(chan Job, JobQueueSize)
|
||||
clientsQueue = make(map[int64]*MQSession)
|
||||
clientsQueue = make(map[int64]*MQClient)
|
||||
|
||||
clientsCW = new(sync.Map)
|
||||
clientsKeepAlive = new(sync.Map)
|
||||
|
88
mq.go
88
mq.go
@ -1,89 +1 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"error"
|
||||
|
||||
"github.com/streadway/amqp"
|
||||
)
|
||||
|
||||
func (s MQSession) Open() error {
|
||||
if s.isConnected {
|
||||
return error.Error("Session is already connected.")
|
||||
}
|
||||
|
||||
if SLL == false {
|
||||
s.MQConnection, err = amqp.Dial("amqp://" + Session.User + ":" + Session.Password + "@" + Session.Host + "/" + Session.Path)
|
||||
} else {
|
||||
return error.Error("SSL connection not implemented")
|
||||
}
|
||||
logOnError(err, "Open : Failed to connect to RabbitMQ")
|
||||
if err != nil {
|
||||
s.MQConnection.Close()
|
||||
return err
|
||||
}
|
||||
s.MQChannel, err = s.MQConnection.Channel()
|
||||
logOnError(err, "Open : Failed to open channel")
|
||||
if err != nil {
|
||||
s.MQChannel.Close()
|
||||
s.MQConnection.Close()
|
||||
return err
|
||||
}
|
||||
s.MQQueue, err = s.Channel.QueueDeclare(
|
||||
s.Queue, // name
|
||||
false, // durable
|
||||
false, // delete when unused
|
||||
false, // exclusive
|
||||
false, // no-wait
|
||||
nil, // arguments
|
||||
|
||||
)
|
||||
logOnError(err, "Open : Failed to declare queue")
|
||||
if err != nil {
|
||||
s.MQChannel.Close()
|
||||
s.MQConnection.Close()
|
||||
return err
|
||||
}
|
||||
s.MQDelivery, err = s.MQChannel.Consume(
|
||||
s.MQQueue.Name, // queue
|
||||
"", // consumer
|
||||
true, // auto-ack
|
||||
false, // exclusive
|
||||
false, // no-local
|
||||
false, // no-wait
|
||||
nil, // args
|
||||
)
|
||||
logOnError(err, "Open : Failed to register a consumer")
|
||||
if err != nil {
|
||||
s.MQChannel.Close()
|
||||
s.MQConnection.Close()
|
||||
return err
|
||||
}
|
||||
s.isConnected = true
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s MQSession) Publish(content string, msg string) error {
|
||||
err := s.MQChannel.Publish(
|
||||
"", // exchange
|
||||
s.MQQueue.Name, // routing key
|
||||
false, // mandatory
|
||||
false, // immediate
|
||||
amqp.Publishing{
|
||||
ContentType: content,
|
||||
Body: []byte(m),
|
||||
})
|
||||
logOnError(err, "Publish : Publish")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s MQSession) Close() {
|
||||
if s.isConnected {
|
||||
s.MQChannel.Close()
|
||||
s.MQConnection.Close()
|
||||
s.isConnected = false
|
||||
}
|
||||
return
|
||||
}
|
||||
|
176
workers.go
176
workers.go
@ -16,25 +16,49 @@ import (
|
||||
func MQGetMsgWorker(id int, msgs chan<- ChatWarsMessage) {
|
||||
//log.Printf("MQGetMsgWorker[" + strconv.Itoa(id) + "] : Starting.")
|
||||
var x ChatWarsMessage
|
||||
s := MQSession{
|
||||
User: cfg.Rabbit.User,
|
||||
Password: cfg.Rabbit.Password,
|
||||
Host: cfg.Rabbit.Host,
|
||||
Path: cfg.Rabbit.Path,
|
||||
SSL: false,
|
||||
Queue: "msg",
|
||||
isConnected: false,
|
||||
}
|
||||
for true {
|
||||
err := s.Open()
|
||||
for err != nil {
|
||||
logOnError(err, "MQGetMsgWorker["+strconv.Itoa(id)+"] : Connection to RabbitMQ failed.")
|
||||
time.Sleep(15)
|
||||
err = s.Open()
|
||||
}
|
||||
log.Printf("MQGetMsgWorker[" + strconv.Itoa(id) + "] : Connected to RabbitMQ")
|
||||
|
||||
for d := range s.MQDelivery {
|
||||
conn, err := amqp.Dial("amqp://" + cfg.Rabbit.User + ":" + cfg.Rabbit.Password + "@" + cfg.Rabbit.Host + "/" + cfg.Rabbit.Queue)
|
||||
logOnError(err, "MQGetMsgWorker["+strconv.Itoa(id)+"] : Failed to connect to RabbitMQ")
|
||||
if err != nil {
|
||||
conn.Close()
|
||||
time.Sleep(15 * time.Second)
|
||||
} else {
|
||||
ch, err := conn.Channel()
|
||||
logOnError(err, "MQGetMsgWorker["+strconv.Itoa(id)+"] : Failed to open a channel")
|
||||
if err != nil {
|
||||
ch.Close()
|
||||
time.Sleep(15 * time.Second)
|
||||
} else {
|
||||
q, err := ch.QueueDeclare(
|
||||
"msg", // name
|
||||
false, // durable
|
||||
false, // delete when unused
|
||||
false, // exclusive
|
||||
false, // no-wait
|
||||
nil, // arguments
|
||||
)
|
||||
logOnError(err, "MQGetMsgWorker["+strconv.Itoa(id)+"] : Failed to declare a queue")
|
||||
if err != nil {
|
||||
ch.Close()
|
||||
conn.Close()
|
||||
time.Sleep(15 * time.Second)
|
||||
} else {
|
||||
m, err := ch.Consume(
|
||||
q.Name, // queue
|
||||
"", // consumer
|
||||
true, // auto-ack
|
||||
false, // exclusive
|
||||
false, // no-local
|
||||
false, // no-wait
|
||||
nil, // args
|
||||
)
|
||||
logOnError(err, "MQGetMsgWorker["+strconv.Itoa(id)+"] : Failed to register a consumer")
|
||||
if err != nil {
|
||||
ch.Close()
|
||||
conn.Close()
|
||||
time.Sleep(15 * time.Second)
|
||||
} else {
|
||||
for d := range m {
|
||||
// log.Printf("MQGetMsgWorker["+strconv.Itoa(id)+"] : Received a message: %s", string(d.Body))
|
||||
err = json.Unmarshal(d.Body, &x)
|
||||
logOnError(err, "MQGetMsgWorker["+strconv.Itoa(id)+"] : Can't unmarshal.\n"+string(d.Body))
|
||||
@ -42,7 +66,13 @@ func MQGetMsgWorker(id int, msgs chan<- ChatWarsMessage) {
|
||||
msgs <- x
|
||||
}
|
||||
}
|
||||
s.Close()
|
||||
ch.Close()
|
||||
conn.Close()
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
log.Printf("MQGetMsgWorker[" + strconv.Itoa(id) + "] : Closing.")
|
||||
@ -313,20 +343,21 @@ func TGCmdWorker(id int, b *tb.Bot, cmds <-chan TGCommand) {
|
||||
func MQTGCmdWorker(id int, cmds <-chan TGCommand) {
|
||||
//log.Printf("MQTGCmdWorker[" + strconv.Itoa(id) + "] : Starting.")
|
||||
for c := range cmds {
|
||||
if _, ok := clientsKeepAlive.Load(c.FromUserID64); ok {
|
||||
j, err := json.Marshal(c)
|
||||
logOnError(err, "MQTGCmdWorker["+strconv.Itoa(id)+"] : Marshal(c)")
|
||||
|
||||
if clientIsAlive(c.FromUserID64) {
|
||||
err = clientsQueue[c.FromUserID64].Session.Publish("application/json", j)
|
||||
//log.Printf("MQTGCmdWorker["+strconv.Itoa(id)+"] : new command.\n%s\n", string(j))
|
||||
err = clientsQueue[c.FromUserID64].Channel.Publish(
|
||||
"", // exchange
|
||||
clientsQueue[c.FromUserID64].Queue.Name, // routing key
|
||||
false, // mandatory
|
||||
false, // immediate
|
||||
amqp.Publishing{
|
||||
ContentType: "application/json",
|
||||
Body: []byte(j),
|
||||
})
|
||||
logOnError(err, "MQTGCmdWorker["+strconv.Itoa(id)+"] : Publishing message.")
|
||||
for err != nil && clientIsAlive(c.FromUserID64) {
|
||||
clientsQueue[c.FromUserID64].Session.Close()
|
||||
time.Sleep(5 * time.Second)
|
||||
clientsQueue[c.FromUserID64].Session.Open()
|
||||
err = clientsQueue[c.FromUserID64].Session.Publish("application/json", j)
|
||||
}
|
||||
} else {
|
||||
err = nil
|
||||
log.Printf("MQTGCmdWorker["+strconv.Itoa(id)+"] : client %d offline.\n", c.FromUserID64)
|
||||
}
|
||||
}
|
||||
@ -336,23 +367,50 @@ func MQTGCmdWorker(id int, cmds <-chan TGCommand) {
|
||||
}
|
||||
|
||||
func MQKeepAliveWorker() {
|
||||
s := MQSession{
|
||||
User: cfg.Rabbit.User,
|
||||
Password: cfg.Rabbit.Password,
|
||||
Host: cfg.Rabbit.Host,
|
||||
Path: cfg.Rabbit.Path,
|
||||
SSL: false,
|
||||
Queue: "keepalive",
|
||||
isConnected: false,
|
||||
}
|
||||
//log.Printf("MQKeepAliveWorker : Starting.")
|
||||
for true {
|
||||
err := s.Open()
|
||||
for err != nil {
|
||||
conn, err := amqp.Dial("amqp://" + cfg.Rabbit.User + ":" + cfg.Rabbit.Password + "@" + cfg.Rabbit.Host + "/" + cfg.Rabbit.Queue)
|
||||
logOnError(err, "MQKeepAliveWorker : Failed to connect to RabbitMQ")
|
||||
if err != nil {
|
||||
conn.Close()
|
||||
time.Sleep(15 * time.Second)
|
||||
err = s.Open()
|
||||
}
|
||||
} else {
|
||||
ch, err := conn.Channel()
|
||||
logOnError(err, "MQKeepAliveWorker : Failed to open a channel")
|
||||
if err != nil {
|
||||
ch.Close()
|
||||
conn.Close()
|
||||
time.Sleep(15 * time.Second)
|
||||
} else {
|
||||
q, err := ch.QueueDeclare(
|
||||
"keepalive", // name
|
||||
false, // durable
|
||||
false, // delete when unused
|
||||
false, // exclusive
|
||||
false, // no-wait
|
||||
nil, // arguments
|
||||
)
|
||||
logOnError(err, "MQKeepAliveWorker : Failed to declare a queue")
|
||||
if err != nil {
|
||||
ch.Close()
|
||||
conn.Close()
|
||||
time.Sleep(15 * time.Second)
|
||||
} else {
|
||||
m, err := ch.Consume(
|
||||
q.Name, // queue
|
||||
"", // consumer
|
||||
true, // auto-ack
|
||||
false, // exclusive
|
||||
false, // no-local
|
||||
false, // no-wait
|
||||
nil, // args
|
||||
)
|
||||
logOnError(err, "MQKeepAliveWorker : Failed to register a consumer")
|
||||
if err != nil {
|
||||
ch.Close()
|
||||
conn.Close()
|
||||
time.Sleep(15 * time.Second)
|
||||
} else {
|
||||
for d := range m {
|
||||
// log.Printf("MQKeepAliveWorker : Received a message: %s", string(d.Body))
|
||||
x := MQKeepAlive{}
|
||||
@ -365,19 +423,22 @@ func MQKeepAliveWorker() {
|
||||
k := v.(*MQKeepAlive)
|
||||
k.Date = x.Date
|
||||
} else {
|
||||
cs := MQSession{
|
||||
User: cfg.Rabbit.User,
|
||||
Password: cfg.Rabbit.Password,
|
||||
Host: cfg.Rabbit.Host,
|
||||
Path: x.Queue,
|
||||
SSL: false,
|
||||
Queue: "msg",
|
||||
isConnected: false,
|
||||
}
|
||||
err = cs.Open()
|
||||
logOnError(err, "MQKeepAliveWorker : Failed to open MQ session")
|
||||
clt := MQClient{}
|
||||
clt.Connection, err = amqp.Dial("amqp://" + cfg.Rabbit.User + ":" + cfg.Rabbit.Password + "@" + cfg.Rabbit.Host + "/" + x.Queue)
|
||||
logOnError(err, "MQKeepAliveWorker : Failed to connect to RabbitMQ")
|
||||
clt.Channel, err = clt.Connection.Channel()
|
||||
logOnError(err, "MQKeepAliveWorker : Failed to open a channel")
|
||||
clt.Queue, err = clt.Channel.QueueDeclare(
|
||||
"msg", // name
|
||||
false, // durable
|
||||
false, // delete when unused
|
||||
false, // exclusive
|
||||
false, // no-wait
|
||||
nil, // arguments
|
||||
)
|
||||
logOnError(err, "MQKeepAliveWorker : Failed to declare a queue")
|
||||
clientsKeepAlive.Store(x.UserID64, &x)
|
||||
clientsQueue[x.UserID64] = &cs
|
||||
clientsQueue[x.UserID64] = &clt
|
||||
|
||||
c := TGCommand{
|
||||
Type: commandSendMsg,
|
||||
@ -405,6 +466,13 @@ func MQKeepAliveWorker() {
|
||||
}
|
||||
}
|
||||
}
|
||||
ch.Close()
|
||||
conn.Close()
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
log.Printf("MQKeepAliveWorker : Closing.")
|
||||
|
Loading…
Reference in New Issue
Block a user