chirpnest/mq.go

103 lines
2.1 KiB
Go
Raw Normal View History

2019-05-03 05:58:36 +02:00
package main
2019-06-28 06:50:35 +02:00
import (
2019-06-28 06:54:18 +02:00
"errors"
//"log"
2019-06-28 06:50:35 +02:00
"github.com/streadway/amqp"
)
func (s MQSession) Open() error {
2019-06-28 08:25:10 +02:00
var (
err error
conn *amqp.Connection
ch *amqp.Channel
2019-06-28 09:24:18 +02:00
//q amqp.Queue
2019-06-28 08:25:10 +02:00
)
2019-06-28 06:50:35 +02:00
if s.isConnected {
2019-06-28 06:57:40 +02:00
return errors.New("Session is already connected.")
2019-06-28 06:50:35 +02:00
}
2019-06-28 07:08:35 +02:00
if s.SSL == false {
2019-06-28 08:25:10 +02:00
conn, err = amqp.Dial("amqp://" + s.User + ":" + s.Password + "@" + s.Host + "/" + s.Path)
2019-06-28 08:23:22 +02:00
s.MQConnection = conn
2019-06-28 06:50:35 +02:00
} else {
2019-06-28 06:57:40 +02:00
return errors.New("SSL connection not implemented")
2019-06-28 06:50:35 +02:00
}
logOnError(err, "Open : Failed to connect to RabbitMQ")
if err != nil {
s.MQConnection.Close()
return err
}
2019-06-28 08:25:10 +02:00
ch, err = s.MQConnection.Channel()
2019-06-28 08:23:22 +02:00
s.MQChannel = ch
2019-06-28 06:50:35 +02:00
logOnError(err, "Open : Failed to open channel")
if err != nil {
s.MQChannel.Close()
s.MQConnection.Close()
return err
}
2019-06-28 09:23:52 +02:00
/*
q, err = s.MQChannel.QueueDeclare(
s.Queue, // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
2019-06-28 06:50:35 +02:00
2019-06-28 09:23:52 +02:00
)
s.MQQueue = &q
logOnError(err, "Open : Failed to declare queue")
if err != nil {
s.MQChannel.Close()
s.MQConnection.Close()
return err
}
s.MQDelivery, err = s.MQChannel.Consume(
s.MQQueue.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
logOnError(err, "Open : Failed to register a consumer")
if err != nil {
s.MQChannel.Close()
s.MQConnection.Close()
return err
}
//log.Printf("Open : MQ Connected to amqp://" + s.User + ":" + s.Password + "@" + s.Host + "/" + s.Path)
*/
2019-06-28 06:50:35 +02:00
s.isConnected = true
return nil
}
func (s MQSession) Publish(content string, msg string) error {
err := s.MQChannel.Publish(
"", // exchange
s.MQQueue.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: content,
2019-06-28 07:08:35 +02:00
Body: []byte(msg),
2019-06-28 06:50:35 +02:00
})
logOnError(err, "Publish : Publish")
if err != nil {
return err
}
return nil
}
// Close shuts down the session's channel and then its connection when the
// session is marked as connected; otherwise it does nothing. Errors returned
// by the two Close calls are ignored.
//
// NOTE(review): the receiver is a value, so clearing s.isConnected only
// changes a local copy and the caller's session keeps its previous flag —
// confirm whether a pointer receiver was intended.
func (s MQSession) Close() {
	if !s.isConnected {
		return
	}

	// Channel first, then the connection it belongs to.
	s.MQChannel.Close()
	s.MQConnection.Close()
	s.isConnected = false
}