From 3957c8fe3c83ec9c85ed6a35edde85eff27052db Mon Sep 17 00:00:00 2001 From: shoopea Date: Fri, 28 Jun 2019 13:08:35 +0800 Subject: [PATCH] test --- client.go | 2 +- def.go | 2 +- mq.go | 6 +++--- workers.go | 10 +++++----- 4 files changed, 10 insertions(+), 10 deletions(-) diff --git a/client.go b/client.go index 6b6d690..6facae8 100644 --- a/client.go +++ b/client.go @@ -9,7 +9,7 @@ import ( func clientKeepAlive(k, v interface{}) bool { clt := v.(*MQKeepAlive) if clt.Date.Add(3 * KeepAliveHeartBeatSeconds * time.Second).Before(time.Now()) { - msgs, err := clientsQueue[clt.UserID64].MQChannel.QueuePurge(clientsQueue[clt.UserID64].Queue.Name, false) + msgs, err := clientsQueue[clt.UserID64].MQChannel.QueuePurge(clientsQueue[clt.UserID64].MQQueue.Name, false) logOnError(err, "clientKeepAlive : Channel.QueuePurge()") clientsQueue[clt.UserID64].Close() c := TGCommand{ diff --git a/def.go b/def.go index 162ee92..38f76f8 100644 --- a/def.go +++ b/def.go @@ -29,7 +29,7 @@ type MQSession struct { MQConnection *amqp.Connection MQChannel *amqp.Channel MQQueue *amqp.Queue - MQDelivery <-chan *amqp.Delivery + MQDelivery <-chan amqp.Delivery isConnected bool } diff --git a/mq.go b/mq.go index 02fd9d0..bbb820b 100644 --- a/mq.go +++ b/mq.go @@ -12,7 +12,7 @@ func (s MQSession) Open() error { return errors.New("Session is already connected.") } - if SSL == false { + if s.SSL == false { s.MQConnection, err = amqp.Dial("amqp://" + s.User + ":" + s.Password + "@" + s.Host + "/" + s.Path) } else { return errors.New("SSL connection not implemented") @@ -29,7 +29,7 @@ func (s MQSession) Open() error { s.MQConnection.Close() return err } - s.MQQueue, err = s.Channel.QueueDeclare( + s.MQQueue, err = s.MQChannel.QueueDeclare( s.Queue, // name false, // durable false, // delete when unused @@ -71,7 +71,7 @@ func (s MQSession) Publish(content string, msg string) error { false, // immediate amqp.Publishing{ ContentType: content, - Body: []byte(m), + Body: []byte(msg), }) logOnError(err, "Publish : Publish") if err != nil { diff --git a/workers.go b/workers.go index 82a53ff..b049fdf 100644 --- a/workers.go +++ b/workers.go @@ -316,13 +316,13 @@ func MQTGCmdWorker(id int, cmds <-chan TGCommand) { logOnError(err, "MQTGCmdWorker["+strconv.Itoa(id)+"] : Marshal(c)") if clientIsAlive(c.FromUserID64) { - err = clientsQueue[c.FromUserID64].Session.Publish("application/json", j) + err = clientsQueue[c.FromUserID64].Publish("application/json", j) logOnError(err, "MQTGCmdWorker["+strconv.Itoa(id)+"] : Publishing message.") for err != nil && clientIsAlive(c.FromUserID64) { - clientsQueue[c.FromUserID64].Session.Close() + clientsQueue[c.FromUserID64].Close() time.Sleep(5 * time.Second) - clientsQueue[c.FromUserID64].Session.Open() - err = clientsQueue[c.FromUserID64].Session.Publish("application/json", j) + clientsQueue[c.FromUserID64].Open() + err = clientsQueue[c.FromUserID64].Publish("application/json", j) } } else { err = nil @@ -352,7 +352,7 @@ func MQKeepAliveWorker() { time.Sleep(15 * time.Second) err = s.Open() } - for d := range m { + for d := range s.MQDelivery { // log.Printf("MQKeepAliveWorker : Received a message: %s", string(d.Body)) x := MQKeepAlive{} err = json.Unmarshal(d.Body, &x)