chirpnest/mq.go
shoopea 19e33ee958 Revert "test"
This reverts commit 8957a372d9.
2019-06-28 15:56:55 +08:00

90 lines
1.9 KiB
Go

package main
import (
"errors"
"github.com/streadway/amqp"
)
// Open connects the session to RabbitMQ. It dials the broker, opens a
// channel, declares the queue named by s.Queue and registers an auto-ack
// consumer on it, storing the results in MQConnection, MQChannel, MQQueue
// and MQDelivery.
//
// It returns an error when the session is already connected, when SLL is set
// (SSL connections are not implemented), or when any AMQP step fails. Every
// AMQP failure is also reported through logOnError. On failure, whatever was
// successfully opened so far is closed again and isConnected stays false.
//
// The receiver is a pointer so that the handles and the isConnected flag are
// stored on the caller's session instead of on a copy that is discarded when
// the method returns.
func (s *MQSession) Open() error {
	if s.isConnected {
		return errors.New("Session is already connected.")
	}
	// NOTE(review): SLL looks like a typo for SSL, and Session (rather than
	// the receiver s) supplies the credentials; neither is defined in this
	// part of the file — confirm both against their declarations.
	if SLL {
		return errors.New("SSL connection not implemented")
	}

	var err error

	s.MQConnection, err = amqp.Dial("amqp://" + Session.User + ":" + Session.Password + "@" + Session.Host + "/" + Session.Path)
	logOnError(err, "Open : Failed to connect to RabbitMQ")
	if err != nil {
		// No connection was established, so there is nothing to close;
		// calling Close on the failed result would dereference nil.
		return err
	}

	s.MQChannel, err = s.MQConnection.Channel()
	logOnError(err, "Open : Failed to open channel")
	if err != nil {
		// Only the connection exists at this point; the channel was never
		// opened and must not be closed.
		s.MQConnection.Close()
		return err
	}

	// Declare on the channel that was just opened (s.MQChannel).
	s.MQQueue, err = s.MQChannel.QueueDeclare(
		s.Queue, // name
		false,   // durable
		false,   // delete when unused
		false,   // exclusive
		false,   // no-wait
		nil,     // arguments
	)
	logOnError(err, "Open : Failed to declare queue")
	if err != nil {
		// Best-effort teardown; close errors are deliberately ignored.
		s.MQChannel.Close()
		s.MQConnection.Close()
		return err
	}

	s.MQDelivery, err = s.MQChannel.Consume(
		s.MQQueue.Name, // queue
		"",             // consumer
		true,           // auto-ack
		false,          // exclusive
		false,          // no-local
		false,          // no-wait
		nil,            // args
	)
	logOnError(err, "Open : Failed to register a consumer")
	if err != nil {
		// Best-effort teardown; close errors are deliberately ignored.
		s.MQChannel.Close()
		s.MQConnection.Close()
		return err
	}

	s.isConnected = true
	return nil
}
// Publish sends msg as the message body to the session's queue. The message
// is published to the empty-named ("") exchange with the queue name as
// routing key, and content is used as the message's ContentType.
//
// It returns an error when the session has no channel (it was never opened),
// or the error reported by the channel's Publish call, which is also passed
// to logOnError.
func (s MQSession) Publish(content string, msg string) error {
	// Guard against use before Open: publishing on a nil channel would panic.
	if s.MQChannel == nil {
		return errors.New("Publish : session is not connected")
	}

	err := s.MQChannel.Publish(
		"",             // exchange
		s.MQQueue.Name, // routing key
		false,          // mandatory
		false,          // immediate
		amqp.Publishing{
			ContentType: content,
			Body:        []byte(msg),
		})
	logOnError(err, "Publish : Publish")
	return err
}
// Close shuts down the channel and then the connection of a connected
// session and marks the session as disconnected. Calling it on a session
// that is not connected does nothing.
//
// Errors returned while closing are ignored. The receiver is a pointer so
// that clearing isConnected is visible to the caller; with a value receiver
// the flag would only be reset on a copy.
func (s *MQSession) Close() {
	if !s.isConnected {
		return
	}
	s.MQChannel.Close()
	s.MQConnection.Close()
	s.isConnected = false
}