You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

263 lines
7.7 KiB

  1. /*
  2. * Minio Cloud Storage, (C) 2016 Minio, Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. package cmd
  17. import (
  18. "fmt"
  19. "os"
  20. "runtime/debug"
  21. "sort"
  22. "strings"
  23. "sync"
  24. humanize "github.com/dustin/go-humanize"
  25. "github.com/minio/minio/pkg/disk"
  26. "github.com/minio/minio/pkg/objcache"
  27. )
// XL constants.
const (
	// formatConfigFile is the name of the format config file, which
	// carries backend format specific details.
	formatConfigFile = "format.json"

	// formatConfigFileTmp is the name of the temporary format config
	// file, which carries the backend format.
	formatConfigFileTmp = "format.json.tmp"

	// xlMetaJSONFile is the name of the XL metadata file, which carries
	// per object metadata.
	xlMetaJSONFile = "xl.json"

	// uploadsJSONFile is the name of the uploads metadata file, which
	// carries per multipart object metadata.
	uploadsJSONFile = "uploads.json"

	// minRAMSize represents the minimum required RAM size (8 GiB)
	// before we enable caching.
	minRAMSize = 8 * humanize.GiByte

	// maxErasureBlocks is the maximum number of erasure blocks.
	maxErasureBlocks = 16

	// minErasureBlocks is the minimum number of erasure blocks.
	minErasureBlocks = 4
)
// xlObjects - Implements XL object layer.
//
// Instances are built by newXLObjects, which fills in every field below.
type xlObjects struct {
	// mutex is held by pointer, so copies of xlObjects (its methods use
	// value receivers) all refer to the same lock.
	mutex        *sync.Mutex
	storageDisks []StorageAPI // Collection of initialized backend disks; entries may be nil.
	dataBlocks   int          // dataBlocks count calculated for erasure (half the disk count).
	parityBlocks int          // parityBlocks count calculated for erasure (half the disk count).
	readQuorum   int          // readQuorum minimum required disks to read data (N/2).
	writeQuorum  int          // writeQuorum minimum required disks to write data (N/2 + 1).

	// ListObjects pool management.
	listPool *treeWalkPool

	// Object cache for caching objects. newXLObjects only assigns it
	// when objCacheEnabled is true; otherwise it stays nil.
	objCache *objcache.Cache

	// Object cache enabled.
	objCacheEnabled bool
}
  61. // list of all errors that can be ignored in tree walk operation in XL
  62. var xlTreeWalkIgnoredErrs = append(baseIgnoredErrs, errDiskAccessDenied, errVolumeNotFound, errFileNotFound)
  63. // newXLObjectLayer - initialize any object layer depending on the number of disks.
  64. func newXLObjectLayer(storageDisks []StorageAPI) (ObjectLayer, error) {
  65. // Initialize XL object layer.
  66. objAPI, err := newXLObjects(storageDisks)
  67. fatalIf(err, "Unable to initialize XL object layer.")
  68. // Initialize and load bucket policies.
  69. err = initBucketPolicies(objAPI)
  70. fatalIf(err, "Unable to load all bucket policies.")
  71. // Initialize a new event notifier.
  72. err = initEventNotifier(objAPI)
  73. fatalIf(err, "Unable to initialize event notification.")
  74. // Success.
  75. return objAPI, nil
  76. }
  77. // newXLObjects - initialize new xl object layer.
  78. func newXLObjects(storageDisks []StorageAPI) (ObjectLayer, error) {
  79. if storageDisks == nil {
  80. return nil, errInvalidArgument
  81. }
  82. readQuorum := len(storageDisks) / 2
  83. writeQuorum := len(storageDisks)/2 + 1
  84. // Load saved XL format.json and validate.
  85. newStorageDisks, err := loadFormatXL(storageDisks, readQuorum)
  86. if err != nil {
  87. return nil, fmt.Errorf("Unable to recognize backend format, %s", err)
  88. }
  89. // Calculate data and parity blocks.
  90. dataBlocks, parityBlocks := len(newStorageDisks)/2, len(newStorageDisks)/2
  91. // Initialize list pool.
  92. listPool := newTreeWalkPool(globalLookupTimeout)
  93. // Check if object cache is disabled.
  94. objCacheDisabled := strings.EqualFold(os.Getenv("_MINIO_CACHE"), "off")
  95. // Initialize xl objects.
  96. xl := &xlObjects{
  97. mutex: &sync.Mutex{},
  98. storageDisks: newStorageDisks,
  99. dataBlocks: dataBlocks,
  100. parityBlocks: parityBlocks,
  101. listPool: listPool,
  102. }
  103. // Get cache size if _MINIO_CACHE environment variable is set.
  104. var maxCacheSize uint64
  105. if !objCacheDisabled {
  106. maxCacheSize, err = GetMaxCacheSize()
  107. errorIf(err, "Unable to get maximum cache size")
  108. // Enable object cache if cache size is more than zero
  109. xl.objCacheEnabled = maxCacheSize > 0
  110. }
  111. // Check if object cache is enabled.
  112. if xl.objCacheEnabled {
  113. // Initialize object cache.
  114. objCache, oerr := objcache.New(maxCacheSize, objcache.DefaultExpiry)
  115. if oerr != nil {
  116. return nil, oerr
  117. }
  118. objCache.OnEviction = func(key string) {
  119. debug.FreeOSMemory()
  120. }
  121. xl.objCache = objCache
  122. }
  123. // Initialize meta volume, if volume already exists ignores it.
  124. if err = initMetaVolume(xl.storageDisks); err != nil {
  125. return nil, fmt.Errorf("Unable to initialize '.minio.sys' meta volume, %s", err)
  126. }
  127. // Figure out read and write quorum based on number of storage disks.
  128. // READ and WRITE quorum is always set to (N/2) number of disks.
  129. xl.readQuorum = readQuorum
  130. xl.writeQuorum = writeQuorum
  131. // Do a quick heal on the buckets themselves for any discrepancies.
  132. return xl, quickHeal(xl.storageDisks, xl.writeQuorum, xl.readQuorum)
  133. }
  134. // Shutdown function for object storage interface.
  135. func (xl xlObjects) Shutdown() error {
  136. // Add any object layer shutdown activities here.
  137. for _, disk := range xl.storageDisks {
  138. // This closes storage rpc client connections if any.
  139. // Otherwise this is a no-op.
  140. if disk == nil {
  141. continue
  142. }
  143. disk.Close()
  144. }
  145. return nil
  146. }
  147. // byDiskTotal is a collection satisfying sort.Interface.
  148. type byDiskTotal []disk.Info
  149. func (d byDiskTotal) Len() int { return len(d) }
  150. func (d byDiskTotal) Swap(i, j int) { d[i], d[j] = d[j], d[i] }
  151. func (d byDiskTotal) Less(i, j int) bool {
  152. return d[i].Total < d[j].Total
  153. }
  154. // getDisksInfo - fetch disks info across all other storage API.
  155. func getDisksInfo(disks []StorageAPI) (disksInfo []disk.Info, onlineDisks int, offlineDisks int) {
  156. disksInfo = make([]disk.Info, len(disks))
  157. for i, storageDisk := range disks {
  158. if storageDisk == nil {
  159. // Storage disk is empty, perhaps ignored disk or not available.
  160. offlineDisks++
  161. continue
  162. }
  163. info, err := storageDisk.DiskInfo()
  164. if err != nil {
  165. errorIf(err, "Unable to fetch disk info for %#v", storageDisk)
  166. if isErr(err, baseErrs...) {
  167. offlineDisks++
  168. continue
  169. }
  170. }
  171. onlineDisks++
  172. disksInfo[i] = info
  173. }
  174. // Success.
  175. return disksInfo, onlineDisks, offlineDisks
  176. }
  177. // returns sorted disksInfo slice which has only valid entries.
  178. // i.e the entries where the total size of the disk is not stated
  179. // as 0Bytes, this means that the disk is not online or ignored.
  180. func sortValidDisksInfo(disksInfo []disk.Info) []disk.Info {
  181. var validDisksInfo []disk.Info
  182. for _, diskInfo := range disksInfo {
  183. if diskInfo.Total == 0 {
  184. continue
  185. }
  186. validDisksInfo = append(validDisksInfo, diskInfo)
  187. }
  188. sort.Sort(byDiskTotal(validDisksInfo))
  189. return validDisksInfo
  190. }
  191. // Get an aggregated storage info across all disks.
  192. func getStorageInfo(disks []StorageAPI) StorageInfo {
  193. disksInfo, onlineDisks, offlineDisks := getDisksInfo(disks)
  194. // Sort so that the first element is the smallest.
  195. validDisksInfo := sortValidDisksInfo(disksInfo)
  196. if len(validDisksInfo) == 0 {
  197. return StorageInfo{
  198. Total: -1,
  199. Free: -1,
  200. }
  201. }
  202. // Return calculated storage info, choose the lowest Total and
  203. // Free as the total aggregated values. Total capacity is always
  204. // the multiple of smallest disk among the disk list.
  205. storageInfo := StorageInfo{
  206. Total: validDisksInfo[0].Total * int64(onlineDisks) / 2,
  207. Free: validDisksInfo[0].Free * int64(onlineDisks) / 2,
  208. }
  209. storageInfo.Backend.Type = Erasure
  210. storageInfo.Backend.OnlineDisks = onlineDisks
  211. storageInfo.Backend.OfflineDisks = offlineDisks
  212. return storageInfo
  213. }
  214. // StorageInfo - returns underlying storage statistics.
  215. func (xl xlObjects) StorageInfo() StorageInfo {
  216. storageInfo := getStorageInfo(xl.storageDisks)
  217. storageInfo.Backend.ReadQuorum = xl.readQuorum
  218. storageInfo.Backend.WriteQuorum = xl.writeQuorum
  219. return storageInfo
  220. }