You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

195 lines
5.3 KiB

  1. /*
  2. * Minio Cloud Storage, (C) 2016 Minio, Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. package cmd
  17. import (
  18. "io/ioutil"
  19. "github.com/Sirupsen/logrus"
  20. "github.com/nats-io/go-nats-streaming"
  21. "github.com/nats-io/nats"
  22. )
  23. // natsNotifyStreaming contains specific options related to connection
  24. // to a NATS streaming server
  25. type natsNotifyStreaming struct {
  26. Enable bool `json:"enable"`
  27. ClusterID string `json:"clusterID"`
  28. ClientID string `json:"clientID"`
  29. Async bool `json:"async"`
  30. MaxPubAcksInflight int `json:"maxPubAcksInflight"`
  31. }
  32. // natsNotify - represents logrus compatible NATS hook.
  33. // All fields represent NATS configuration details.
  34. type natsNotify struct {
  35. Enable bool `json:"enable"`
  36. Address string `json:"address"`
  37. Subject string `json:"subject"`
  38. Username string `json:"username"`
  39. Password string `json:"password"`
  40. Token string `json:"token"`
  41. Secure bool `json:"secure"`
  42. PingInterval int64 `json:"pingInterval"`
  43. Streaming natsNotifyStreaming `json:"streaming"`
  44. }
  45. func (n *natsNotify) Validate() error {
  46. if !n.Enable {
  47. return nil
  48. }
  49. if _, err := checkURL(n.Address); err != nil {
  50. return err
  51. }
  52. return nil
  53. }
  54. // natsIOConn abstracts connection to any type of NATS server
  55. type natsIOConn struct {
  56. params natsNotify
  57. natsConn *nats.Conn
  58. stanConn stan.Conn
  59. }
  60. // dialNATS - dials and returns an natsIOConn instance,
  61. // for sending notifications. Returns error if nats logger
  62. // is not enabled.
  63. func dialNATS(natsL natsNotify, testDial bool) (natsIOConn, error) {
  64. if !natsL.Enable {
  65. return natsIOConn{}, errNotifyNotEnabled
  66. }
  67. // Construct natsIOConn which holds all NATS connection information
  68. conn := natsIOConn{params: natsL}
  69. if natsL.Streaming.Enable {
  70. // Construct scheme to differentiate between clear and TLS connections
  71. scheme := "nats"
  72. if natsL.Secure {
  73. scheme = "tls"
  74. }
  75. // Construct address URL
  76. addressURL := scheme + "://" + natsL.Username + ":" + natsL.Password + "@" + natsL.Address
  77. // Fetch the user-supplied client ID and provide a random one if not provided
  78. clientID := natsL.Streaming.ClientID
  79. if clientID == "" {
  80. clientID = mustGetUUID()
  81. }
  82. // Add test suffix to clientID to avoid clientID already registered error
  83. if testDial {
  84. clientID += "-test"
  85. }
  86. connOpts := []stan.Option{
  87. stan.NatsURL(addressURL),
  88. }
  89. // Setup MaxPubAcksInflight parameter
  90. if natsL.Streaming.MaxPubAcksInflight > 0 {
  91. connOpts = append(connOpts,
  92. stan.MaxPubAcksInflight(natsL.Streaming.MaxPubAcksInflight))
  93. }
  94. // Do the real connection to the NATS server
  95. sc, err := stan.Connect(natsL.Streaming.ClusterID, clientID, connOpts...)
  96. if err != nil {
  97. return natsIOConn{}, err
  98. }
  99. // Save the created connection
  100. conn.stanConn = sc
  101. } else {
  102. // Configure and connect to NATS server
  103. natsC := nats.DefaultOptions
  104. natsC.Url = "nats://" + natsL.Address
  105. natsC.User = natsL.Username
  106. natsC.Password = natsL.Password
  107. natsC.Token = natsL.Token
  108. natsC.Secure = natsL.Secure
  109. // Do the real connection
  110. nc, err := natsC.Connect()
  111. if err != nil {
  112. return natsIOConn{}, err
  113. }
  114. // Save the created connection
  115. conn.natsConn = nc
  116. }
  117. return conn, nil
  118. }
  119. // closeNATS - close the underlying NATS connection
  120. func closeNATS(conn natsIOConn) {
  121. if conn.params.Streaming.Enable {
  122. conn.stanConn.Close()
  123. } else {
  124. conn.natsConn.Close()
  125. }
  126. }
  127. func newNATSNotify(accountID string) (*logrus.Logger, error) {
  128. natsL := serverConfig.Notify.GetNATSByID(accountID)
  129. // Connect to nats server.
  130. natsC, err := dialNATS(natsL, false)
  131. if err != nil {
  132. return nil, err
  133. }
  134. natsLog := logrus.New()
  135. // Disable writing to console.
  136. natsLog.Out = ioutil.Discard
  137. // Add a nats hook.
  138. natsLog.Hooks.Add(natsC)
  139. // Set default JSON formatter.
  140. natsLog.Formatter = new(logrus.JSONFormatter)
  141. // Successfully enabled all NATSs.
  142. return natsLog, nil
  143. }
  144. // Fire is called when an event should be sent to the message broker
  145. func (n natsIOConn) Fire(entry *logrus.Entry) error {
  146. body, err := entry.Reader()
  147. if err != nil {
  148. return err
  149. }
  150. if n.params.Streaming.Enable {
  151. // Streaming flag is enabled, publish the log synchronously or asynchronously
  152. // depending on the user supplied parameter
  153. if n.params.Streaming.Async {
  154. _, err = n.stanConn.PublishAsync(n.params.Subject, body.Bytes(), nil)
  155. } else {
  156. err = n.stanConn.Publish(n.params.Subject, body.Bytes())
  157. }
  158. if err != nil {
  159. return err
  160. }
  161. } else {
  162. // Publish the log
  163. err = n.natsConn.Publish(n.params.Subject, body.Bytes())
  164. if err != nil {
  165. return err
  166. }
  167. }
  168. return nil
  169. }
  170. // Levels is available logging levels.
  171. func (n natsIOConn) Levels() []logrus.Level {
  172. return []logrus.Level{
  173. logrus.InfoLevel,
  174. }
  175. }