This commit is contained in:
shoopea 2019-06-28 13:08:35 +08:00
parent 8957a372d9
commit 3957c8fe3c
4 changed files with 10 additions and 10 deletions

View File

@ -9,7 +9,7 @@ import (
func clientKeepAlive(k, v interface{}) bool { func clientKeepAlive(k, v interface{}) bool {
clt := v.(*MQKeepAlive) clt := v.(*MQKeepAlive)
if clt.Date.Add(3 * KeepAliveHeartBeatSeconds * time.Second).Before(time.Now()) { if clt.Date.Add(3 * KeepAliveHeartBeatSeconds * time.Second).Before(time.Now()) {
msgs, err := clientsQueue[clt.UserID64].MQChannel.QueuePurge(clientsQueue[clt.UserID64].Queue.Name, false) msgs, err := clientsQueue[clt.UserID64].MQChannel.QueuePurge(clientsQueue[clt.UserID64].MQQueue.Name, false)
logOnError(err, "clientKeepAlive : Channel.QueuePurge()") logOnError(err, "clientKeepAlive : Channel.QueuePurge()")
clientsQueue[clt.UserID64].Close() clientsQueue[clt.UserID64].Close()
c := TGCommand{ c := TGCommand{

2
def.go
View File

@ -29,7 +29,7 @@ type MQSession struct {
MQConnection *amqp.Connection MQConnection *amqp.Connection
MQChannel *amqp.Channel MQChannel *amqp.Channel
MQQueue *amqp.Queue MQQueue *amqp.Queue
MQDelivery <-chan *amqp.Delivery MQDelivery <-chan amqp.Delivery
isConnected bool isConnected bool
} }

6
mq.go
View File

@ -12,7 +12,7 @@ func (s MQSession) Open() error {
return errors.New("Session is already connected.") return errors.New("Session is already connected.")
} }
if SSL == false { if s.SSL == false {
s.MQConnection, err = amqp.Dial("amqp://" + s.User + ":" + s.Password + "@" + s.Host + "/" + s.Path) s.MQConnection, err = amqp.Dial("amqp://" + s.User + ":" + s.Password + "@" + s.Host + "/" + s.Path)
} else { } else {
return errors.New("SSL connection not implemented") return errors.New("SSL connection not implemented")
@ -29,7 +29,7 @@ func (s MQSession) Open() error {
s.MQConnection.Close() s.MQConnection.Close()
return err return err
} }
s.MQQueue, err = s.Channel.QueueDeclare( s.MQQueue, err = s.MQChannel.QueueDeclare(
s.Queue, // name s.Queue, // name
false, // durable false, // durable
false, // delete when unused false, // delete when unused
@ -71,7 +71,7 @@ func (s MQSession) Publish(content string, msg string) error {
false, // immediate false, // immediate
amqp.Publishing{ amqp.Publishing{
ContentType: content, ContentType: content,
Body: []byte(m), Body: []byte(msg),
}) })
logOnError(err, "Publish : Publish") logOnError(err, "Publish : Publish")
if err != nil { if err != nil {

View File

@ -316,13 +316,13 @@ func MQTGCmdWorker(id int, cmds <-chan TGCommand) {
logOnError(err, "MQTGCmdWorker["+strconv.Itoa(id)+"] : Marshal(c)") logOnError(err, "MQTGCmdWorker["+strconv.Itoa(id)+"] : Marshal(c)")
if clientIsAlive(c.FromUserID64) { if clientIsAlive(c.FromUserID64) {
err = clientsQueue[c.FromUserID64].Session.Publish("application/json", j) err = clientsQueue[c.FromUserID64].Publish("application/json", j)
logOnError(err, "MQTGCmdWorker["+strconv.Itoa(id)+"] : Publishing message.") logOnError(err, "MQTGCmdWorker["+strconv.Itoa(id)+"] : Publishing message.")
for err != nil && clientIsAlive(c.FromUserID64) { for err != nil && clientIsAlive(c.FromUserID64) {
clientsQueue[c.FromUserID64].Session.Close() clientsQueue[c.FromUserID64].Close()
time.Sleep(5 * time.Second) time.Sleep(5 * time.Second)
clientsQueue[c.FromUserID64].Session.Open() clientsQueue[c.FromUserID64].Open()
err = clientsQueue[c.FromUserID64].Session.Publish("application/json", j) err = clientsQueue[c.FromUserID64].Publish("application/json", j)
} }
} else { } else {
err = nil err = nil
@ -352,7 +352,7 @@ func MQKeepAliveWorker() {
time.Sleep(15 * time.Second) time.Sleep(15 * time.Second)
err = s.Open() err = s.Open()
} }
for d := range m { for d := range s.MQDelivery {
// log.Printf("MQKeepAliveWorker : Received a message: %s", string(d.Body)) // log.Printf("MQKeepAliveWorker : Received a message: %s", string(d.Body))
x := MQKeepAlive{} x := MQKeepAlive{}
err = json.Unmarshal(d.Body, &x) err = json.Unmarshal(d.Body, &x)