package main import ( "errors" //"log" "github.com/streadway/amqp" ) func (s MQSession) Open() error { var ( err error conn *amqp.Connection ch *amqp.Channel //q amqp.Queue ) if s.isConnected { return errors.New("Session is already connected.") } if s.SSL == false { conn, err = amqp.Dial("amqp://" + s.User + ":" + s.Password + "@" + s.Host + "/" + s.Path) s.MQConnection = conn } else { return errors.New("SSL connection not implemented") } logOnError(err, "Open : Failed to connect to RabbitMQ") if err != nil { s.MQConnection.Close() return err } ch, err = s.MQConnection.Channel() s.MQChannel = ch logOnError(err, "Open : Failed to open channel") if err != nil { s.MQChannel.Close() s.MQConnection.Close() return err } /* q, err = s.MQChannel.QueueDeclare( s.Queue, // name false, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) s.MQQueue = &q logOnError(err, "Open : Failed to declare queue") if err != nil { s.MQChannel.Close() s.MQConnection.Close() return err } s.MQDelivery, err = s.MQChannel.Consume( s.MQQueue.Name, // queue "", // consumer true, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args ) logOnError(err, "Open : Failed to register a consumer") if err != nil { s.MQChannel.Close() s.MQConnection.Close() return err } //log.Printf("Open : MQ Connected to amqp://" + s.User + ":" + s.Password + "@" + s.Host + "/" + s.Path) */ s.isConnected = true return nil } func (s MQSession) Publish(content string, msg string) error { err := s.MQChannel.Publish( "", // exchange s.MQQueue.Name, // routing key false, // mandatory false, // immediate amqp.Publishing{ ContentType: content, Body: []byte(msg), }) logOnError(err, "Publish : Publish") if err != nil { return err } return nil } func (s MQSession) Close() { if s.isConnected { s.MQChannel.Close() s.MQConnection.Close() s.isConnected = false } return }