You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

493 lines
17 KiB

  1. // Copyright (c) 2015-2023 MinIO, Inc.
  2. //
  3. // This file is part of MinIO Object Storage stack
  4. //
  5. // This program is free software: you can redistribute it and/or modify
  6. // it under the terms of the GNU Affero General Public License as published by
  7. // the Free Software Foundation, either version 3 of the License, or
  8. // (at your option) any later version.
  9. //
  10. // This program is distributed in the hope that it will be useful
  11. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. // GNU Affero General Public License for more details.
  14. //
  15. // You should have received a copy of the GNU Affero General Public License
  16. // along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. package cmd
  import (
  	"context"
  	"fmt"
  	"math"
  	"net/http"
  	"sort"
  	"strings"
  	"sync"
  	"time"

  	"github.com/minio/madmin-go/v3"
  	"github.com/prometheus/client_golang/prometheus"
  )
  28. const (
  29. resourceMetricsCollectionInterval = time.Minute
  30. resourceMetricsCacheInterval = time.Minute
  31. // drive stats
  32. totalInodes MetricName = "total_inodes"
  33. readsPerSec MetricName = "reads_per_sec"
  34. writesPerSec MetricName = "writes_per_sec"
  35. readsKBPerSec MetricName = "reads_kb_per_sec"
  36. writesKBPerSec MetricName = "writes_kb_per_sec"
  37. readsAwait MetricName = "reads_await"
  38. writesAwait MetricName = "writes_await"
  39. percUtil MetricName = "perc_util"
  40. usedInodes MetricName = "used_inodes"
  41. // network stats
  42. interfaceRxBytes MetricName = "rx_bytes"
  43. interfaceRxErrors MetricName = "rx_errors"
  44. interfaceTxBytes MetricName = "tx_bytes"
  45. interfaceTxErrors MetricName = "tx_errors"
  46. // cpu stats
  47. cpuUser MetricName = "user"
  48. cpuSystem MetricName = "system"
  49. cpuIOWait MetricName = "iowait"
  50. cpuIdle MetricName = "idle"
  51. cpuNice MetricName = "nice"
  52. cpuSteal MetricName = "steal"
  53. cpuLoad1 MetricName = "load1"
  54. cpuLoad5 MetricName = "load5"
  55. cpuLoad15 MetricName = "load15"
  56. cpuLoad1Perc MetricName = "load1_perc"
  57. cpuLoad5Perc MetricName = "load5_perc"
  58. cpuLoad15Perc MetricName = "load15_perc"
  59. )
  60. var (
  61. resourceCollector *minioResourceCollector
  62. // resourceMetricsMap is a map of subsystem to its metrics
  63. resourceMetricsMap map[MetricSubsystem]ResourceMetrics
  64. resourceMetricsMapMu sync.RWMutex
  65. // resourceMetricsHelpMap maps metric name to its help string
  66. resourceMetricsHelpMap map[MetricName]string
  67. resourceMetricsGroups []*MetricsGroupV2
  68. // initial values for drives (at the time of server startup)
  69. // used for calculating avg values for drive metrics
  70. latestDriveStats map[string]madmin.DiskIOStats
  71. latestDriveStatsMu sync.RWMutex
  72. lastDriveStatsRefresh time.Time
  73. )
  74. // PeerResourceMetrics represents the resource metrics
  75. // retrieved from a peer, along with errors if any
  76. type PeerResourceMetrics struct {
  77. Metrics map[MetricSubsystem]ResourceMetrics
  78. Errors []string
  79. }
  80. // ResourceMetrics is a map of unique key identifying
  81. // a resource metric (e.g. reads_per_sec_{node}_{drive})
  82. // to its data
  83. type ResourceMetrics map[string]ResourceMetric
  84. // ResourceMetric represents a single resource metric
  85. // The metrics are collected from all servers periodically
  86. // and stored in the resource metrics map.
  87. // It also maintains the count of number of times this metric
  88. // was collected since the server started, and the sum,
  89. // average and max values across the same.
  90. type ResourceMetric struct {
  91. Name MetricName
  92. Labels map[string]string
  93. // value captured in current cycle
  94. Current float64
  95. // Used when system provides cumulative (since uptime) values
  96. // helps in calculating the current value by comparing the new
  97. // cumulative value with previous one
  98. Cumulative float64
  99. Max float64
  100. Avg float64
  101. Sum float64
  102. Count uint64
  103. }
  104. func init() {
  105. interval := fmt.Sprintf("%ds", int(resourceMetricsCollectionInterval.Seconds()))
  106. resourceMetricsHelpMap = map[MetricName]string{
  107. interfaceRxBytes: "Bytes received on the interface in " + interval,
  108. interfaceRxErrors: "Receive errors in " + interval,
  109. interfaceTxBytes: "Bytes transmitted in " + interval,
  110. interfaceTxErrors: "Transmit errors in " + interval,
  111. total: "Total memory on the node",
  112. memUsed: "Used memory on the node",
  113. memUsedPerc: "Used memory percentage on the node",
  114. memFree: "Free memory on the node",
  115. memShared: "Shared memory on the node",
  116. memBuffers: "Buffers memory on the node",
  117. memCache: "Cache memory on the node",
  118. memAvailable: "Available memory on the node",
  119. readsPerSec: "Reads per second on a drive",
  120. writesPerSec: "Writes per second on a drive",
  121. readsKBPerSec: "Kilobytes read per second on a drive",
  122. writesKBPerSec: "Kilobytes written per second on a drive",
  123. readsAwait: "Average time for read requests to be served on a drive",
  124. writesAwait: "Average time for write requests to be served on a drive",
  125. percUtil: "Percentage of time the disk was busy",
  126. usedBytes: "Used bytes on a drive",
  127. totalBytes: "Total bytes on a drive",
  128. usedInodes: "Total inodes used on a drive",
  129. totalInodes: "Total inodes on a drive",
  130. cpuUser: "CPU user time",
  131. cpuSystem: "CPU system time",
  132. cpuIdle: "CPU idle time",
  133. cpuIOWait: "CPU ioWait time",
  134. cpuSteal: "CPU steal time",
  135. cpuNice: "CPU nice time",
  136. cpuLoad1: "CPU load average 1min",
  137. cpuLoad5: "CPU load average 5min",
  138. cpuLoad15: "CPU load average 15min",
  139. cpuLoad1Perc: "CPU load average 1min (perentage)",
  140. cpuLoad5Perc: "CPU load average 5min (percentage)",
  141. cpuLoad15Perc: "CPU load average 15min (percentage)",
  142. }
  143. resourceMetricsGroups = []*MetricsGroupV2{
  144. getResourceMetrics(),
  145. }
  146. resourceCollector = newMinioResourceCollector(resourceMetricsGroups)
  147. }
  148. func getResourceKey(name MetricName, labels map[string]string) string {
  149. // labels are used to uniquely identify a metric
  150. // e.g. reads_per_sec_{drive} inside the map
  151. sfx := ""
  152. for _, v := range labels {
  153. if len(sfx) > 0 {
  154. sfx += "_"
  155. }
  156. sfx += v
  157. }
  158. return string(name) + "_" + sfx
  159. }
  160. func updateResourceMetrics(subSys MetricSubsystem, name MetricName, val float64, labels map[string]string, isCumulative bool) {
  161. resourceMetricsMapMu.Lock()
  162. defer resourceMetricsMapMu.Unlock()
  163. subsysMetrics, found := resourceMetricsMap[subSys]
  164. if !found {
  165. subsysMetrics = ResourceMetrics{}
  166. }
  167. key := getResourceKey(name, labels)
  168. metric, found := subsysMetrics[key]
  169. if !found {
  170. metric = ResourceMetric{
  171. Name: name,
  172. Labels: labels,
  173. }
  174. }
  175. if isCumulative {
  176. metric.Current = val - metric.Cumulative
  177. metric.Cumulative = val
  178. } else {
  179. metric.Current = val
  180. }
  181. if metric.Current > metric.Max {
  182. metric.Max = val
  183. }
  184. metric.Sum += metric.Current
  185. metric.Count++
  186. metric.Avg = metric.Sum / float64(metric.Count)
  187. subsysMetrics[key] = metric
  188. resourceMetricsMap[subSys] = subsysMetrics
  189. }
  190. // updateDriveIOStats - Updates the drive IO stats by calculating the difference between the current and latest updated values.
  191. func updateDriveIOStats(currentStats madmin.DiskIOStats, latestStats madmin.DiskIOStats, labels map[string]string) {
  192. sectorSize := uint64(512)
  193. kib := float64(1 << 10)
  194. diffInSeconds := time.Now().UTC().Sub(lastDriveStatsRefresh).Seconds()
  195. if diffInSeconds == 0 {
  196. // too soon to update the stats
  197. return
  198. }
  199. diffStats := getDiffStats(latestStats, currentStats)
  200. updateResourceMetrics(driveSubsystem, readsPerSec, float64(diffStats.ReadIOs)/diffInSeconds, labels, false)
  201. readKib := float64(diffStats.ReadSectors*sectorSize) / kib
  202. updateResourceMetrics(driveSubsystem, readsKBPerSec, readKib/diffInSeconds, labels, false)
  203. updateResourceMetrics(driveSubsystem, writesPerSec, float64(diffStats.WriteIOs)/diffInSeconds, labels, false)
  204. writeKib := float64(diffStats.WriteSectors*sectorSize) / kib
  205. updateResourceMetrics(driveSubsystem, writesKBPerSec, writeKib/diffInSeconds, labels, false)
  206. rdAwait := 0.0
  207. if diffStats.ReadIOs > 0 {
  208. rdAwait = float64(diffStats.ReadTicks) / float64(diffStats.ReadIOs)
  209. }
  210. updateResourceMetrics(driveSubsystem, readsAwait, rdAwait, labels, false)
  211. wrAwait := 0.0
  212. if diffStats.WriteIOs > 0 {
  213. wrAwait = float64(diffStats.WriteTicks) / float64(diffStats.WriteIOs)
  214. }
  215. updateResourceMetrics(driveSubsystem, writesAwait, wrAwait, labels, false)
  216. updateResourceMetrics(driveSubsystem, percUtil, float64(diffStats.TotalTicks)/(diffInSeconds*10), labels, false)
  217. }
  218. func collectDriveMetrics(m madmin.RealtimeMetrics) {
  219. latestDriveStatsMu.Lock()
  220. for d, dm := range m.ByDisk {
  221. labels := map[string]string{"drive": d}
  222. latestStats, ok := latestDriveStats[d]
  223. if !ok {
  224. latestDriveStats[d] = dm.IOStats
  225. continue
  226. }
  227. updateDriveIOStats(dm.IOStats, latestStats, labels)
  228. latestDriveStats[d] = dm.IOStats
  229. }
  230. lastDriveStatsRefresh = time.Now().UTC()
  231. latestDriveStatsMu.Unlock()
  232. globalLocalDrivesMu.RLock()
  233. localDrives := cloneDrives(globalLocalDrivesMap)
  234. globalLocalDrivesMu.RUnlock()
  235. for _, d := range localDrives {
  236. di, err := d.DiskInfo(GlobalContext, DiskInfoOptions{})
  237. labels := map[string]string{"drive": di.Endpoint}
  238. if err == nil {
  239. updateResourceMetrics(driveSubsystem, usedBytes, float64(di.Used), labels, false)
  240. updateResourceMetrics(driveSubsystem, totalBytes, float64(di.Total), labels, false)
  241. updateResourceMetrics(driveSubsystem, usedInodes, float64(di.UsedInodes), labels, false)
  242. updateResourceMetrics(driveSubsystem, totalInodes, float64(di.FreeInodes+di.UsedInodes), labels, false)
  243. }
  244. }
  245. }
  246. func collectLocalResourceMetrics() {
  247. types := madmin.MetricsDisk | madmin.MetricNet | madmin.MetricsMem | madmin.MetricsCPU
  248. m := collectLocalMetrics(types, collectMetricsOpts{})
  249. for _, hm := range m.ByHost {
  250. if hm.Net != nil && len(hm.Net.NetStats.Name) > 0 {
  251. stats := hm.Net.NetStats
  252. labels := map[string]string{"interface": stats.Name}
  253. updateResourceMetrics(interfaceSubsystem, interfaceRxBytes, float64(stats.RxBytes), labels, true)
  254. updateResourceMetrics(interfaceSubsystem, interfaceRxErrors, float64(stats.RxErrors), labels, true)
  255. updateResourceMetrics(interfaceSubsystem, interfaceTxBytes, float64(stats.TxBytes), labels, true)
  256. updateResourceMetrics(interfaceSubsystem, interfaceTxErrors, float64(stats.TxErrors), labels, true)
  257. }
  258. if hm.Mem != nil && len(hm.Mem.Info.Addr) > 0 {
  259. labels := map[string]string{}
  260. stats := hm.Mem.Info
  261. updateResourceMetrics(memSubsystem, total, float64(stats.Total), labels, false)
  262. updateResourceMetrics(memSubsystem, memUsed, float64(stats.Used), labels, false)
  263. perc := math.Round(float64(stats.Used*100*100)/float64(stats.Total)) / 100
  264. updateResourceMetrics(memSubsystem, memUsedPerc, perc, labels, false)
  265. updateResourceMetrics(memSubsystem, memFree, float64(stats.Free), labels, false)
  266. updateResourceMetrics(memSubsystem, memShared, float64(stats.Shared), labels, false)
  267. updateResourceMetrics(memSubsystem, memBuffers, float64(stats.Buffers), labels, false)
  268. updateResourceMetrics(memSubsystem, memAvailable, float64(stats.Available), labels, false)
  269. updateResourceMetrics(memSubsystem, memCache, float64(stats.Cache), labels, false)
  270. }
  271. if hm.CPU != nil {
  272. labels := map[string]string{}
  273. ts := hm.CPU.TimesStat
  274. if ts != nil {
  275. tot := ts.User + ts.System + ts.Idle + ts.Iowait + ts.Nice + ts.Steal
  276. cpuUserVal := math.Round(ts.User/tot*100*100) / 100
  277. updateResourceMetrics(cpuSubsystem, cpuUser, cpuUserVal, labels, false)
  278. cpuSystemVal := math.Round(ts.System/tot*100*100) / 100
  279. updateResourceMetrics(cpuSubsystem, cpuSystem, cpuSystemVal, labels, false)
  280. cpuIdleVal := math.Round(ts.Idle/tot*100*100) / 100
  281. updateResourceMetrics(cpuSubsystem, cpuIdle, cpuIdleVal, labels, false)
  282. cpuIOWaitVal := math.Round(ts.Iowait/tot*100*100) / 100
  283. updateResourceMetrics(cpuSubsystem, cpuIOWait, cpuIOWaitVal, labels, false)
  284. cpuNiceVal := math.Round(ts.Nice/tot*100*100) / 100
  285. updateResourceMetrics(cpuSubsystem, cpuNice, cpuNiceVal, labels, false)
  286. cpuStealVal := math.Round(ts.Steal/tot*100*100) / 100
  287. updateResourceMetrics(cpuSubsystem, cpuSteal, cpuStealVal, labels, false)
  288. }
  289. ls := hm.CPU.LoadStat
  290. if ls != nil {
  291. updateResourceMetrics(cpuSubsystem, cpuLoad1, ls.Load1, labels, false)
  292. updateResourceMetrics(cpuSubsystem, cpuLoad5, ls.Load5, labels, false)
  293. updateResourceMetrics(cpuSubsystem, cpuLoad15, ls.Load15, labels, false)
  294. if hm.CPU.CPUCount > 0 {
  295. perc := math.Round(ls.Load1*100*100/float64(hm.CPU.CPUCount)) / 100
  296. updateResourceMetrics(cpuSubsystem, cpuLoad1Perc, perc, labels, false)
  297. perc = math.Round(ls.Load5*100*100/float64(hm.CPU.CPUCount)) / 100
  298. updateResourceMetrics(cpuSubsystem, cpuLoad5Perc, perc, labels, false)
  299. perc = math.Round(ls.Load15*100*100/float64(hm.CPU.CPUCount)) / 100
  300. updateResourceMetrics(cpuSubsystem, cpuLoad15Perc, perc, labels, false)
  301. }
  302. }
  303. }
  304. break // only one host expected
  305. }
  306. collectDriveMetrics(m)
  307. }
  308. func initLatestValues() {
  309. m := collectLocalMetrics(madmin.MetricsDisk, collectMetricsOpts{})
  310. latestDriveStatsMu.Lock()
  311. latestDriveStats = map[string]madmin.DiskIOStats{}
  312. for d, dm := range m.ByDisk {
  313. latestDriveStats[d] = dm.IOStats
  314. }
  315. lastDriveStatsRefresh = time.Now().UTC()
  316. latestDriveStatsMu.Unlock()
  317. }
  318. // startResourceMetricsCollection - starts the job for collecting resource metrics
  319. func startResourceMetricsCollection() {
  320. initLatestValues()
  321. resourceMetricsMapMu.Lock()
  322. resourceMetricsMap = map[MetricSubsystem]ResourceMetrics{}
  323. resourceMetricsMapMu.Unlock()
  324. metricsTimer := time.NewTimer(resourceMetricsCollectionInterval)
  325. defer metricsTimer.Stop()
  326. collectLocalResourceMetrics()
  327. for {
  328. select {
  329. case <-GlobalContext.Done():
  330. return
  331. case <-metricsTimer.C:
  332. collectLocalResourceMetrics()
  333. // Reset the timer for next cycle.
  334. metricsTimer.Reset(resourceMetricsCollectionInterval)
  335. }
  336. }
  337. }
  338. // minioResourceCollector is the Collector for resource metrics
  339. type minioResourceCollector struct {
  340. metricsGroups []*MetricsGroupV2
  341. desc *prometheus.Desc
  342. }
  343. // Describe sends the super-set of all possible descriptors of metrics
  344. func (c *minioResourceCollector) Describe(ch chan<- *prometheus.Desc) {
  345. ch <- c.desc
  346. }
  347. // Collect is called by the Prometheus registry when collecting metrics.
  348. func (c *minioResourceCollector) Collect(out chan<- prometheus.Metric) {
  349. var wg sync.WaitGroup
  350. publish := func(in <-chan MetricV2) {
  351. defer wg.Done()
  352. for metric := range in {
  353. labels, values := getOrderedLabelValueArrays(metric.VariableLabels)
  354. collectMetric(metric, labels, values, "resource", out)
  355. }
  356. }
  357. // Call peer api to fetch metrics
  358. wg.Add(2)
  359. go publish(ReportMetrics(GlobalContext, c.metricsGroups))
  360. go publish(globalNotificationSys.GetResourceMetrics(GlobalContext))
  361. wg.Wait()
  362. }
  363. // newMinioResourceCollector describes the collector
  364. // and returns reference of minio resource Collector
  365. // It creates the Prometheus Description which is used
  366. // to define Metric and help string
  367. func newMinioResourceCollector(metricsGroups []*MetricsGroupV2) *minioResourceCollector {
  368. return &minioResourceCollector{
  369. metricsGroups: metricsGroups,
  370. desc: prometheus.NewDesc("minio_resource_stats", "Resource statistics exposed by MinIO server", nil, nil),
  371. }
  372. }
  373. func prepareResourceMetrics(rm ResourceMetric, subSys MetricSubsystem, requireAvgMax bool) []MetricV2 {
  374. help := resourceMetricsHelpMap[rm.Name]
  375. name := rm.Name
  376. metrics := make([]MetricV2, 0, 3)
  377. metrics = append(metrics, MetricV2{
  378. Description: getResourceMetricDescription(subSys, name, help),
  379. Value: rm.Current,
  380. VariableLabels: cloneMSS(rm.Labels),
  381. })
  382. if requireAvgMax {
  383. avgName := MetricName(fmt.Sprintf("%s_avg", name))
  384. avgHelp := fmt.Sprintf("%s (avg)", help)
  385. metrics = append(metrics, MetricV2{
  386. Description: getResourceMetricDescription(subSys, avgName, avgHelp),
  387. Value: math.Round(rm.Avg*100) / 100,
  388. VariableLabels: cloneMSS(rm.Labels),
  389. })
  390. maxName := MetricName(fmt.Sprintf("%s_max", name))
  391. maxHelp := fmt.Sprintf("%s (max)", help)
  392. metrics = append(metrics, MetricV2{
  393. Description: getResourceMetricDescription(subSys, maxName, maxHelp),
  394. Value: rm.Max,
  395. VariableLabels: cloneMSS(rm.Labels),
  396. })
  397. }
  398. return metrics
  399. }
  400. func getResourceMetricDescription(subSys MetricSubsystem, name MetricName, help string) MetricDescription {
  401. return MetricDescription{
  402. Namespace: nodeMetricNamespace,
  403. Subsystem: subSys,
  404. Name: name,
  405. Help: help,
  406. Type: gaugeMetric,
  407. }
  408. }
  409. func getResourceMetrics() *MetricsGroupV2 {
  410. mg := &MetricsGroupV2{
  411. cacheInterval: resourceMetricsCacheInterval,
  412. }
  413. mg.RegisterRead(func(ctx context.Context) []MetricV2 {
  414. metrics := []MetricV2{}
  415. subSystems := []MetricSubsystem{interfaceSubsystem, memSubsystem, driveSubsystem, cpuSubsystem}
  416. resourceMetricsMapMu.RLock()
  417. defer resourceMetricsMapMu.RUnlock()
  418. for _, subSys := range subSystems {
  419. stats, found := resourceMetricsMap[subSys]
  420. if found {
  421. requireAvgMax := true
  422. if subSys == driveSubsystem {
  423. requireAvgMax = false
  424. }
  425. for _, m := range stats {
  426. metrics = append(metrics, prepareResourceMetrics(m, subSys, requireAvgMax)...)
  427. }
  428. }
  429. }
  430. return metrics
  431. })
  432. return mg
  433. }
  434. // metricsResourceHandler is the prometheus handler for resource metrics
  435. func metricsResourceHandler() http.Handler {
  436. return metricsHTTPHandler(resourceCollector, "handler.MetricsResource")
  437. }