diff --git a/client.go b/client.go index 6facae8..6b6d690 100644 --- a/client.go +++ b/client.go @@ -9,7 +9,7 @@ import ( func clientKeepAlive(k, v interface{}) bool { clt := v.(*MQKeepAlive) if clt.Date.Add(3 * KeepAliveHeartBeatSeconds * time.Second).Before(time.Now()) { - msgs, err := clientsQueue[clt.UserID64].MQChannel.QueuePurge(clientsQueue[clt.UserID64].MQQueue.Name, false) + msgs, err := clientsQueue[clt.UserID64].MQChannel.QueuePurge(clientsQueue[clt.UserID64].Queue.Name, false) logOnError(err, "clientKeepAlive : Channel.QueuePurge()") clientsQueue[clt.UserID64].Close() c := TGCommand{ diff --git a/def.go b/def.go index 38f76f8..162ee92 100644 --- a/def.go +++ b/def.go @@ -29,7 +29,7 @@ type MQSession struct { MQConnection *amqp.Connection MQChannel *amqp.Channel MQQueue *amqp.Queue - MQDelivery <-chan amqp.Delivery + MQDelivery <-chan *amqp.Delivery isConnected bool } diff --git a/mq.go b/mq.go index bbb820b..02fd9d0 100644 --- a/mq.go +++ b/mq.go @@ -12,7 +12,7 @@ func (s MQSession) Open() error { return errors.New("Session is already connected.") } - if s.SSL == false { + if SSL == false { s.MQConnection, err = amqp.Dial("amqp://" + s.User + ":" + s.Password + "@" + s.Host + "/" + s.Path) } else { return errors.New("SSL connection not implemented") @@ -29,7 +29,7 @@ func (s MQSession) Open() error { s.MQConnection.Close() return err } - s.MQQueue, err = s.MQChannel.QueueDeclare( + s.MQQueue, err = s.Channel.QueueDeclare( s.Queue, // name false, // durable false, // delete when unused @@ -71,7 +71,7 @@ func (s MQSession) Publish(content string, msg string) error { false, // immediate amqp.Publishing{ ContentType: content, - Body: []byte(msg), + Body: []byte(m), }) logOnError(err, "Publish : Publish") if err != nil { diff --git a/workers.go b/workers.go index b049fdf..82a53ff 100644 --- a/workers.go +++ b/workers.go @@ -316,13 +316,13 @@ func MQTGCmdWorker(id int, cmds <-chan TGCommand) { logOnError(err, "MQTGCmdWorker["+strconv.Itoa(id)+"] : Marshal(c)") if clientIsAlive(c.FromUserID64) { - err = clientsQueue[c.FromUserID64].Publish("application/json", j) + err = clientsQueue[c.FromUserID64].Session.Publish("application/json", j) logOnError(err, "MQTGCmdWorker["+strconv.Itoa(id)+"] : Publishing message.") for err != nil && clientIsAlive(c.FromUserID64) { - clientsQueue[c.FromUserID64].Close() + clientsQueue[c.FromUserID64].Session.Close() time.Sleep(5 * time.Second) - clientsQueue[c.FromUserID64].Open() - err = clientsQueue[c.FromUserID64].Publish("application/json", j) + clientsQueue[c.FromUserID64].Session.Open() + err = clientsQueue[c.FromUserID64].Session.Publish("application/json", j) } } else { err = nil @@ -352,7 +352,7 @@ func MQKeepAliveWorker() { time.Sleep(15 * time.Second) err = s.Open() } - for d := range s.MQDelivery { + for d := range m { // log.Printf("MQKeepAliveWorker : Received a message: %s", string(d.Body)) x := MQKeepAlive{} err = json.Unmarshal(d.Body, &x)