parent
5db39673b0
commit
d02e792e24
@ -9,7 +9,7 @@ import (
|
|||||||
func clientKeepAlive(k, v interface{}) bool {
|
func clientKeepAlive(k, v interface{}) bool {
|
||||||
clt := v.(*MQKeepAlive)
|
clt := v.(*MQKeepAlive)
|
||||||
if clt.Date.Add(3 * KeepAliveHeartBeatSeconds * time.Second).Before(time.Now()) {
|
if clt.Date.Add(3 * KeepAliveHeartBeatSeconds * time.Second).Before(time.Now()) {
|
||||||
msgs, err := clientsQueue[clt.UserID64].MQChannel.QueuePurge(clientsQueue[clt.UserID64].MQQueue.Name, false)
|
msgs, err := clientsQueue[clt.UserID64].MQChannel.QueuePurge(clientsQueue[clt.UserID64].Queue.Name, false)
|
||||||
logOnError(err, "clientKeepAlive : Channel.QueuePurge()")
|
logOnError(err, "clientKeepAlive : Channel.QueuePurge()")
|
||||||
clientsQueue[clt.UserID64].Close()
|
clientsQueue[clt.UserID64].Close()
|
||||||
c := TGCommand{
|
c := TGCommand{
|
||||||
|
2
def.go
2
def.go
@ -29,7 +29,7 @@ type MQSession struct {
|
|||||||
MQConnection *amqp.Connection
|
MQConnection *amqp.Connection
|
||||||
MQChannel *amqp.Channel
|
MQChannel *amqp.Channel
|
||||||
MQQueue *amqp.Queue
|
MQQueue *amqp.Queue
|
||||||
MQDelivery <-chan amqp.Delivery
|
MQDelivery <-chan *amqp.Delivery
|
||||||
isConnected bool
|
isConnected bool
|
||||||
}
|
}
|
||||||
|
|
||||||
|
6
mq.go
6
mq.go
@ -12,7 +12,7 @@ func (s MQSession) Open() error {
|
|||||||
return errors.New("Session is already connected.")
|
return errors.New("Session is already connected.")
|
||||||
}
|
}
|
||||||
|
|
||||||
if s.SSL == false {
|
if SSL == false {
|
||||||
s.MQConnection, err = amqp.Dial("amqp://" + s.User + ":" + s.Password + "@" + s.Host + "/" + s.Path)
|
s.MQConnection, err = amqp.Dial("amqp://" + s.User + ":" + s.Password + "@" + s.Host + "/" + s.Path)
|
||||||
} else {
|
} else {
|
||||||
return errors.New("SSL connection not implemented")
|
return errors.New("SSL connection not implemented")
|
||||||
@ -29,7 +29,7 @@ func (s MQSession) Open() error {
|
|||||||
s.MQConnection.Close()
|
s.MQConnection.Close()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
s.MQQueue, err = s.MQChannel.QueueDeclare(
|
s.MQQueue, err = s.Channel.QueueDeclare(
|
||||||
s.Queue, // name
|
s.Queue, // name
|
||||||
false, // durable
|
false, // durable
|
||||||
false, // delete when unused
|
false, // delete when unused
|
||||||
@ -71,7 +71,7 @@ func (s MQSession) Publish(content string, msg string) error {
|
|||||||
false, // immediate
|
false, // immediate
|
||||||
amqp.Publishing{
|
amqp.Publishing{
|
||||||
ContentType: content,
|
ContentType: content,
|
||||||
Body: []byte(msg),
|
Body: []byte(m),
|
||||||
})
|
})
|
||||||
logOnError(err, "Publish : Publish")
|
logOnError(err, "Publish : Publish")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
10
workers.go
10
workers.go
@ -316,13 +316,13 @@ func MQTGCmdWorker(id int, cmds <-chan TGCommand) {
|
|||||||
logOnError(err, "MQTGCmdWorker["+strconv.Itoa(id)+"] : Marshal(c)")
|
logOnError(err, "MQTGCmdWorker["+strconv.Itoa(id)+"] : Marshal(c)")
|
||||||
|
|
||||||
if clientIsAlive(c.FromUserID64) {
|
if clientIsAlive(c.FromUserID64) {
|
||||||
err = clientsQueue[c.FromUserID64].Publish("application/json", j)
|
err = clientsQueue[c.FromUserID64].Session.Publish("application/json", j)
|
||||||
logOnError(err, "MQTGCmdWorker["+strconv.Itoa(id)+"] : Publishing message.")
|
logOnError(err, "MQTGCmdWorker["+strconv.Itoa(id)+"] : Publishing message.")
|
||||||
for err != nil && clientIsAlive(c.FromUserID64) {
|
for err != nil && clientIsAlive(c.FromUserID64) {
|
||||||
clientsQueue[c.FromUserID64].Close()
|
clientsQueue[c.FromUserID64].Session.Close()
|
||||||
time.Sleep(5 * time.Second)
|
time.Sleep(5 * time.Second)
|
||||||
clientsQueue[c.FromUserID64].Open()
|
clientsQueue[c.FromUserID64].Session.Open()
|
||||||
err = clientsQueue[c.FromUserID64].Publish("application/json", j)
|
err = clientsQueue[c.FromUserID64].Session.Publish("application/json", j)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
err = nil
|
err = nil
|
||||||
@ -352,7 +352,7 @@ func MQKeepAliveWorker() {
|
|||||||
time.Sleep(15 * time.Second)
|
time.Sleep(15 * time.Second)
|
||||||
err = s.Open()
|
err = s.Open()
|
||||||
}
|
}
|
||||||
for d := range s.MQDelivery {
|
for d := range m {
|
||||||
// log.Printf("MQKeepAliveWorker : Received a message: %s", string(d.Body))
|
// log.Printf("MQKeepAliveWorker : Received a message: %s", string(d.Body))
|
||||||
x := MQKeepAlive{}
|
x := MQKeepAlive{}
|
||||||
err = json.Unmarshal(d.Body, &x)
|
err = json.Unmarshal(d.Body, &x)
|
||||||
|
Loading…
Reference in New Issue
Block a user