test
This commit is contained in:
parent
37945c5b60
commit
52d3ba9cce
87
workers.go
87
workers.go
@ -230,6 +230,67 @@ func MQKeepAliveWorker() {
|
||||
|
||||
}
|
||||
|
||||
func MQTGCmdWorker(id int, cmds <-chan TGCommand) {
|
||||
//log.Printf("MQTGCmdWorker[" + strconv.Itoa(id) + "] : Starting.")
|
||||
for c := range cmds {
|
||||
if _, ok := clientsKeepAlive.Load(c.FromUserID64); ok {
|
||||
j, err := json.Marshal(c)
|
||||
logOnError(err, "MQTGCmdWorker["+strconv.Itoa(id)+"] : Marshal(c)")
|
||||
//log.Printf("MQTGCmdWorker["+strconv.Itoa(id)+"] : new command.\n%s\n", string(j))
|
||||
for clientsQueue[c.FromUserID64].Connection.IsClosed() {
|
||||
clientsQueue[c.FromUserID64].Connection, err = amqp.Dial("amqp://" + c.User + ":" + c.Password + "@" + c.Host + "/" + c.Path)
|
||||
logOnError(err, "MQKeepAliveWorker : Cannot open MQ connection")
|
||||
if err != nil {
|
||||
clientsQueue[c.FromUserID64].Connection.Close()
|
||||
time.Sleep(15 * time.Second)
|
||||
continue
|
||||
}
|
||||
|
||||
clientsQueue[c.FromUserID64].Channel, err = clientsQueue[c.FromUserID64].Connection.Channel()
|
||||
logOnError(err, "MQKeepAliveWorker : Cannot open MQ channel")
|
||||
if err != nil {
|
||||
clientsQueue[c.FromUserID64].Channel.Close()
|
||||
clientsQueue[c.FromUserID64].Connection.Close()
|
||||
time.Sleep(15 * time.Second)
|
||||
continue
|
||||
}
|
||||
|
||||
clientsQueue[c.FromUserID64].Queue, err = clientsQueue[c.FromUserID64].Channel.QueueDeclare(
|
||||
"keepalive", // name
|
||||
false, // durable
|
||||
false, // delete when unused
|
||||
false, // exclusive
|
||||
false, // no-wait
|
||||
nil, // arguments
|
||||
)
|
||||
logOnError(err, "MQKeepAliveWorker : Failed to declare a queue")
|
||||
if err != nil {
|
||||
clientsQueue[c.FromUserID64].Channel.Close()
|
||||
clientsQueue[c.FromUserID64].Connection.Close()
|
||||
time.Sleep(15 * time.Second)
|
||||
continue
|
||||
}
|
||||
|
||||
}
|
||||
err = clientsQueue[c.FromUserID64].Channel.Publish(
|
||||
"", // exchange
|
||||
clientsQueue[c.FromUserID64].Queue.Name, // routing key
|
||||
false, // mandatory
|
||||
false, // immediate
|
||||
amqp.Publishing{
|
||||
ContentType: "application/json",
|
||||
Body: []byte(j),
|
||||
})
|
||||
logOnError(err, "MQTGCmdWorker["+strconv.Itoa(id)+"] : Publishing message.")
|
||||
} else {
|
||||
log.Printf("MQTGCmdWorker["+strconv.Itoa(id)+"] : client %d offline.\n", c.FromUserID64)
|
||||
}
|
||||
}
|
||||
|
||||
log.Printf("MQTGCmdWorker[" + strconv.Itoa(id) + "] : Closing.")
|
||||
|
||||
}
|
||||
|
||||
func SQLCWMsgWorker(id int, msgs <-chan ChatWarsMessage, objIds chan<- int64) {
|
||||
//log.Printf("SQLCWMsgWorker[" + strconv.Itoa(id) + "] : Starting.")
|
||||
for m := range msgs {
|
||||
@ -491,32 +552,6 @@ func TGCmdWorker(id int, b *tb.Bot, cmds <-chan TGCommand) {
|
||||
log.Printf("TGCmdWorker[" + strconv.Itoa(id) + "] : Closing.")
|
||||
}
|
||||
|
||||
func MQTGCmdWorker(id int, cmds <-chan TGCommand) {
|
||||
//log.Printf("MQTGCmdWorker[" + strconv.Itoa(id) + "] : Starting.")
|
||||
for c := range cmds {
|
||||
if _, ok := clientsKeepAlive.Load(c.FromUserID64); ok {
|
||||
j, err := json.Marshal(c)
|
||||
logOnError(err, "MQTGCmdWorker["+strconv.Itoa(id)+"] : Marshal(c)")
|
||||
//log.Printf("MQTGCmdWorker["+strconv.Itoa(id)+"] : new command.\n%s\n", string(j))
|
||||
err = clientsQueue[c.FromUserID64].Channel.Publish(
|
||||
"", // exchange
|
||||
clientsQueue[c.FromUserID64].Queue.Name, // routing key
|
||||
false, // mandatory
|
||||
false, // immediate
|
||||
amqp.Publishing{
|
||||
ContentType: "application/json",
|
||||
Body: []byte(j),
|
||||
})
|
||||
logOnError(err, "MQTGCmdWorker["+strconv.Itoa(id)+"] : Publishing message.")
|
||||
} else {
|
||||
log.Printf("MQTGCmdWorker["+strconv.Itoa(id)+"] : client %d offline.\n", c.FromUserID64)
|
||||
}
|
||||
}
|
||||
|
||||
log.Printf("MQTGCmdWorker[" + strconv.Itoa(id) + "] : Closing.")
|
||||
|
||||
}
|
||||
|
||||
func MQTidyKeepAliveWorker() {
|
||||
//log.Printf("MQTidyKeepAliveWorker : Starting.")
|
||||
for true {
|
||||
|
Loading…
Reference in New Issue
Block a user