Move admin APIs to new path and add redesigned heal APIs (#5351)

- Changes related to moving admin APIs
  - admin APIs now have an endpoint under /minio/admin
  - admin APIs are now versioned - a new API to serve the version is added at "GET /minio/admin/version", and all API operations have the path prefix /minio/admin/v1/<operation>
  - new service stop API added
  - credentials change API is moved to /minio/admin/v1/config/credential
  - credentials change API and configuration get/set API now require TLS so that credentials are protected
  - all API requests now receive JSON
  - heal APIs are disabled as they will be changed substantially

- Heal API changes

  The Heal API is now provided at a single endpoint, with the ability for a client to start a heal sequence on all the data in the server, a single bucket, or under a prefix within a bucket.

  When a heal sequence is started, the server returns a unique token that needs to be used for subsequent 'status' requests to fetch heal results. On each status request from the client, the server returns the heal result records it has accumulated since the previous status request. The server accumulates up to 1000 records and pauses healing further objects until the client requests status. If the client does not request any further records for a long time, the server aborts the heal sequence automatically.

  A heal result record is returned for each entity healed on the server, such as system metadata, object metadata, buckets and objects, and has information about the before and after states on each disk.

  A client may request to force restart a heal sequence - this causes the running heal sequence to be aborted at the next safe spot and a new heal sequence to be started.

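A rough client-side sketch of the heal flow described above, using the madmin-go module that this file already imports. The endpoint, credentials, bucket and prefix are placeholders, and the exact HealOpts fields and status summary strings may vary between madmin-go releases; treat this as an illustration of the start-token-poll protocol, not a definitive admin API reference.

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/minio/madmin-go/v3"
)

func main() {
	// Placeholder endpoint and credentials - replace with real values.
	adm, err := madmin.New("localhost:9000", "ACCESS-KEY", "SECRET-KEY", false)
	if err != nil {
		log.Fatal(err)
	}
	ctx := context.Background()

	opts := madmin.HealOpts{
		Recursive: true,                  // heal everything under the prefix
		ScanMode:  madmin.HealNormalScan, // switch to HealDeepScan to also verify bitrot
	}

	// Start a heal sequence on a bucket/prefix; the server replies with a client token.
	start, _, err := adm.Heal(ctx, "mybucket", "myprefix/", opts, "", false, false)
	if err != nil {
		log.Fatal(err)
	}

	// Poll for status with the token; each call drains the heal result records
	// accumulated on the server since the previous status request.
	for {
		_, status, err := adm.Heal(ctx, "mybucket", "myprefix/", opts, start.ClientToken, false, false)
		if err != nil {
			log.Fatal(err)
		}
		for _, item := range status.Items {
			fmt.Printf("healed %s/%s (%s)\n", item.Bucket, item.Object, item.Type)
		}
		// Summary strings are server-defined; "finished"/"stopped" are assumed here.
		if status.Summary == "finished" || status.Summary == "stopped" {
			break
		}
		time.Sleep(time.Second)
	}
}

The file below implements the server-side healing that such a sequence drives.
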
// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package cmd

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/minio/madmin-go/v3"
	"github.com/minio/minio/internal/grid"
	"github.com/minio/minio/internal/logger"
	"github.com/minio/pkg/v3/sync/errgroup"
	"golang.org/x/exp/slices"
)

//go:generate stringer -type=healingMetric -trimprefix=healingMetric $GOFILE

type healingMetric uint8

const (
	healingMetricBucket healingMetric = iota
	healingMetricObject
	healingMetricCheckAbandonedParts
)

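// listAndHeal walks the namespace under prefix on a subset of online,
// non-healing drives and invokes healEntry for every entry found; a
// healEntry failure or listing error aborts the walk.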
func (er erasureObjects) listAndHeal(ctx context.Context, bucket, prefix string, scanMode madmin.HealScanMode, healEntry func(string, metaCacheEntry, madmin.HealScanMode) error) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	disks, _ := er.getOnlineDisksWithHealing(false)
	if len(disks) == 0 {
		return errors.New("listAndHeal: No non-healing drives found")
	}

	expectedDisks := len(disks)/2 + 1
	fallbackDisks := disks[expectedDisks:]
	disks = disks[:expectedDisks]

	// How to resolve partial results.
	resolver := metadataResolutionParams{
		dirQuorum: 1,
		objQuorum: 1,
		bucket:    bucket,
		strict:    false, // Allow less strict matching.
	}

	path := baseDirFromPrefix(prefix)
	filterPrefix := strings.Trim(strings.TrimPrefix(prefix, path), slashSeparator)
	if path == prefix {
		filterPrefix = ""
	}

	lopts := listPathRawOptions{
		disks:          disks,
		fallbackDisks:  fallbackDisks,
		bucket:         bucket,
		path:           path,
		filterPrefix:   filterPrefix,
		recursive:      true,
		forwardTo:      "",
		minDisks:       1,
		reportNotFound: false,
		agreed: func(entry metaCacheEntry) {
			if err := healEntry(bucket, entry, scanMode); err != nil {
				cancel()
			}
		},
		partial: func(entries metaCacheEntries, _ []error) {
			entry, ok := entries.resolve(&resolver)
			if !ok {
				// check if we can get one entry at least
				// proceed to heal nonetheless.
				entry, _ = entries.firstFound()
			}

			if err := healEntry(bucket, *entry, scanMode); err != nil {
				cancel()
				return
			}
		},
		finished: nil,
	}

	if err := listPathRaw(ctx, lopts); err != nil {
		return fmt.Errorf("listPathRaw returned %w: opts(%#v)", err, lopts)
	}

	return nil
}

// listAllBuckets lists all buckets from all disks. It also
// returns the occurrence of each bucket across all disks.
func listAllBuckets(ctx context.Context, storageDisks []StorageAPI, healBuckets map[string]VolInfo, readQuorum int) error {
	g := errgroup.WithNErrs(len(storageDisks))
	var mu sync.Mutex
	for index := range storageDisks {
		index := index
		g.Go(func() error {
			if storageDisks[index] == nil {
				// we ignore disk not found errors
				return nil
			}
			volsInfo, err := storageDisks[index].ListVols(ctx)
			if err != nil {
				return err
			}
			for _, volInfo := range volsInfo {
				// StorageAPI can send volume names which are
				// incompatible with buckets - these are
				// skipped, like the meta-bucket.
				if isReservedOrInvalidBucket(volInfo.Name, false) {
					continue
				}
				mu.Lock()
				if _, ok := healBuckets[volInfo.Name]; !ok {
					healBuckets[volInfo.Name] = volInfo
				}
				mu.Unlock()
			}
			return nil
		}, index)
	}
	return reduceReadQuorumErrs(ctx, g.Wait(), bucketMetadataOpIgnoredErrs, readQuorum)
}

var errLegacyXLMeta = errors.New("legacy XL meta")

var errOutdatedXLMeta = errors.New("outdated XL meta")

var errPartMissingOrCorrupt = errors.New("part missing or corrupt")

// Only heal on disks where we are sure that healing is needed. We can expand
// this list as and when we figure out that more errors can be added to this list safely.
func shouldHealObjectOnDisk(erErr error, partsErrs []int, meta FileInfo, latestMeta FileInfo) (bool, error) {
	if errors.Is(erErr, errFileNotFound) || errors.Is(erErr, errFileVersionNotFound) || errors.Is(erErr, errFileCorrupt) {
		return true, erErr
	}
	if erErr == nil {
		if meta.XLV1 {
			// Legacy means heal always
			// always check first.
			return true, errLegacyXLMeta
		}
		if !latestMeta.Equals(meta) {
			return true, errOutdatedXLMeta
		}
		if !meta.Deleted && !meta.IsRemote() {
			// xl.meta was read fine, but there may be a problem with the part.N files.
			for _, partErr := range partsErrs {
				if slices.Contains([]int{
					checkPartFileNotFound,
					checkPartFileCorrupt,
				}, partErr) {
					return true, errPartMissingOrCorrupt
				}
			}
		}
		return false, nil
	}
	return false, erErr
}

const (
	xMinIOHealing = ReservedMetadataPrefix + "healing"
	xMinIODataMov = ReservedMetadataPrefix + "data-mov"
)

// SetHealing marks object (version) as being healed.
// Note: this is to be used only from healObject
func (fi *FileInfo) SetHealing() {
	if fi.Metadata == nil {
		fi.Metadata = make(map[string]string)
	}
	fi.Metadata[xMinIOHealing] = "true"
}

// Healing returns true if the object is being healed (i.e. fi is being passed down
// from healObject)
func (fi FileInfo) Healing() bool {
	_, ok := fi.Metadata[xMinIOHealing]
	return ok
}

// SetDataMov marks object (version) as being currently
// in movement, such as decommissioning or rebalance.
func (fi *FileInfo) SetDataMov() {
	if fi.Metadata == nil {
		fi.Metadata = make(map[string]string)
	}
	fi.Metadata[xMinIODataMov] = "true"
}

// DataMov returns true if the object is in movement.
func (fi FileInfo) DataMov() bool {
	_, ok := fi.Metadata[xMinIODataMov]
	return ok
}

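// auditHealObject emits a healing audit event for the object, including the
// heal result and the pool/set/drive location, when audit targets are configured.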
func (er *erasureObjects) auditHealObject(ctx context.Context, bucket, object, versionID string, result madmin.HealResultItem, err error) {
	if len(logger.AuditTargets()) == 0 {
		return
	}

	opts := AuditLogOptions{
		Event:     "HealObject",
		Bucket:    bucket,
		Object:    object,
		VersionID: versionID,
	}
	if err != nil {
		opts.Error = err.Error()
	}

	opts.Tags = map[string]interface{}{
		"healResult": result,
		"objectLocation": auditObjectOp{
			Name:   decodeDirObject(object),
			Pool:   er.poolIndex + 1,
			Set:    er.setIndex + 1,
			Drives: er.getEndpointStrings(),
		},
	}

	auditLogInternal(ctx, opts)
}

// Heals an object by re-writing corrupt/missing erasure blocks.
func (er *erasureObjects) healObject(ctx context.Context, bucket string, object string, versionID string, opts madmin.HealOpts) (result madmin.HealResultItem, err error) {
	dryRun := opts.DryRun
	scanMode := opts.ScanMode

	storageDisks := er.getDisks()
	storageEndpoints := er.getEndpoints()

	defer func() {
		er.auditHealObject(ctx, bucket, object, versionID, result, err)
	}()

	if globalTrace.NumSubscribers(madmin.TraceHealing) > 0 {
		startTime := time.Now()
		defer func() {
			healTrace(healingMetricObject, startTime, bucket, object, &opts, err, &result)
		}()
	}

	// Initialize heal result object
	result = madmin.HealResultItem{
		Type:      madmin.HealItemObject,
		Bucket:    bucket,
		Object:    object,
		VersionID: versionID,
		DiskCount: len(storageDisks),
	}

	if !opts.NoLock {
		lk := er.NewNSLock(bucket, object)
		lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
		if err != nil {
			return result, err
		}
		ctx = lkctx.Context()
		defer lk.Unlock(lkctx)
	}

	// Re-read when we have lock...
	partsMetadata, errs := readAllFileInfo(ctx, storageDisks, "", bucket, object, versionID, true, true)
	if isAllNotFound(errs) {
		err := errFileNotFound
		if versionID != "" {
			err = errFileVersionNotFound
		}
		// Nothing to do, file is already gone.
		return er.defaultHealResult(FileInfo{}, storageDisks, storageEndpoints,
			errs, bucket, object, versionID), err
	}

	readQuorum, _, err := objectQuorumFromMeta(ctx, partsMetadata, errs, er.defaultParityCount)
	if err != nil {
		m, derr := er.deleteIfDangling(ctx, bucket, object, partsMetadata, errs, nil, ObjectOptions{
			VersionID: versionID,
		})
		errs = make([]error, len(errs))
		if derr == nil {
			derr = errFileNotFound
			if versionID != "" {
				derr = errFileVersionNotFound
			}
			// We did find a new dangling object
			return er.defaultHealResult(m, storageDisks, storageEndpoints,
				errs, bucket, object, versionID), derr
		}
		return er.defaultHealResult(m, storageDisks, storageEndpoints,
			errs, bucket, object, versionID), err
	}
	result.ParityBlocks = result.DiskCount - readQuorum
	result.DataBlocks = readQuorum

	// List of disks having latest version of the object xl.meta
	// (by modtime).
	onlineDisks, modTime, etag := listOnlineDisks(storageDisks, partsMetadata, errs, readQuorum)

	// Latest FileInfo for reference. If a valid metadata is not
	// present, it is as good as object not found.
	latestMeta, err := pickValidFileInfo(ctx, partsMetadata, modTime, etag, readQuorum)
	if err != nil {
		return result, err
	}

	// List of disks having all parts as per latest metadata.
	// NOTE: do not pass in latestDisks to disksWithAllParts, since
	// disksWithAllParts needs to reach the drive to ensure the
	// validity of the metadata content; we should pass in the disks
	// as is for it to be verified. Once verified, disksWithAllParts()
	// returns the actual disks that can be used here for
	// reconstruction. This is done so that drives with inconsistent
	// metadata are not skipped from purging when they are stale.
	availableDisks, dataErrsByDisk, dataErrsByPart := disksWithAllParts(ctx, onlineDisks, partsMetadata,
		errs, latestMeta, bucket, object, scanMode)

	var erasure Erasure
	if !latestMeta.Deleted && !latestMeta.IsRemote() {
		// Initialize erasure coding
		erasure, err = NewErasure(ctx, latestMeta.Erasure.DataBlocks,
			latestMeta.Erasure.ParityBlocks, latestMeta.Erasure.BlockSize)
		if err != nil {
			return result, err
		}
	}

	result.ObjectSize, err = latestMeta.ToObjectInfo(bucket, object, true).GetActualSize()
	if err != nil {
		return result, err
	}

	// Loop to find number of disks with valid data, per-drive
	// data state and a list of outdated disks on which data needs
	// to be healed.
	outDatedDisks := make([]StorageAPI, len(storageDisks))
	disksToHealCount := 0
	for i := range availableDisks {
		yes, reason := shouldHealObjectOnDisk(errs[i], dataErrsByDisk[i], partsMetadata[i], latestMeta)
		if yes {
			outDatedDisks[i] = storageDisks[i]
			disksToHealCount++
		}

		driveState := ""
		switch {
		case reason == nil:
			driveState = madmin.DriveStateOk
		case IsErr(reason, errDiskNotFound):
			driveState = madmin.DriveStateOffline
		case IsErr(reason, errFileNotFound, errFileVersionNotFound, errVolumeNotFound, errPartMissingOrCorrupt, errOutdatedXLMeta, errLegacyXLMeta):
			driveState = madmin.DriveStateMissing
		default:
			// all remaining cases imply corrupt data/metadata
			driveState = madmin.DriveStateCorrupt
		}

		result.Before.Drives = append(result.Before.Drives, madmin.HealDriveInfo{
			UUID:     "",
			Endpoint: storageEndpoints[i].String(),
			State:    driveState,
		})
		result.After.Drives = append(result.After.Drives, madmin.HealDriveInfo{
			UUID:     "",
			Endpoint: storageEndpoints[i].String(),
			State:    driveState,
		})
	}
	if isAllNotFound(errs) {
		// File is fully gone, fileInfo is empty.
		err := errFileNotFound
		if versionID != "" {
			err = errFileVersionNotFound
		}
		return er.defaultHealResult(FileInfo{}, storageDisks, storageEndpoints, errs,
			bucket, object, versionID), err
	}

	if disksToHealCount == 0 {
		// Nothing to heal!
		return result, nil
	}

	// After this point, only have to repair data on disk - so
	// return if it is a dry-run
	if dryRun {
		return result, nil
	}

	if !latestMeta.XLV1 && !latestMeta.Deleted && disksToHealCount > latestMeta.Erasure.ParityBlocks {
		// Allow for dangling deletes, on versions that have DataDir missing etc.
		// this would end up restoring the correct readable versions.
		m, err := er.deleteIfDangling(ctx, bucket, object, partsMetadata, errs, dataErrsByPart, ObjectOptions{
			VersionID: versionID,
		})
		errs = make([]error, len(errs))
		if err == nil {
			err = errFileNotFound
			if versionID != "" {
				err = errFileVersionNotFound
			}
			// We did find a new dangling object
			return er.defaultHealResult(m, storageDisks, storageEndpoints,
				errs, bucket, object, versionID), err
		}
		for i := range errs {
			errs[i] = err
		}
		return er.defaultHealResult(m, storageDisks, storageEndpoints,
			errs, bucket, object, versionID), err
	}
	cleanFileInfo := func(fi FileInfo) FileInfo {
		// Returns a copy of the 'fi' with erasure index, checksums and inline data niled.
		nfi := fi
		if !nfi.IsRemote() {
			nfi.Data = nil
			nfi.Erasure.Index = 0
			nfi.Erasure.Checksums = nil
		}
		return nfi
	}

	// We write at temporary location and then rename to final location.
	tmpID := mustGetUUID()
	migrateDataDir := mustGetUUID()

	// Reorder so that we have data disks first and parity disks next.
	if !latestMeta.Deleted && len(latestMeta.Erasure.Distribution) != len(availableDisks) {
		err := fmt.Errorf("unexpected file distribution (%v) from available disks (%v), looks like backend disks have been manually modified refusing to heal %s/%s(%s)",
			latestMeta.Erasure.Distribution, availableDisks, bucket, object, versionID)
		healingLogOnceIf(ctx, err, "heal-object-available-disks")
		return er.defaultHealResult(latestMeta, storageDisks, storageEndpoints, errs,
			bucket, object, versionID), err
	}

	latestDisks := shuffleDisks(availableDisks, latestMeta.Erasure.Distribution)

	if !latestMeta.Deleted && len(latestMeta.Erasure.Distribution) != len(outDatedDisks) {
		err := fmt.Errorf("unexpected file distribution (%v) from outdated disks (%v), looks like backend disks have been manually modified refusing to heal %s/%s(%s)",
			latestMeta.Erasure.Distribution, outDatedDisks, bucket, object, versionID)
		healingLogOnceIf(ctx, err, "heal-object-outdated-disks")
		return er.defaultHealResult(latestMeta, storageDisks, storageEndpoints, errs,
			bucket, object, versionID), err
	}

	outDatedDisks = shuffleDisks(outDatedDisks, latestMeta.Erasure.Distribution)

	if !latestMeta.Deleted && len(latestMeta.Erasure.Distribution) != len(partsMetadata) {
		err := fmt.Errorf("unexpected file distribution (%v) from metadata entries (%v), looks like backend disks have been manually modified refusing to heal %s/%s(%s)",
			latestMeta.Erasure.Distribution, len(partsMetadata), bucket, object, versionID)
		healingLogOnceIf(ctx, err, "heal-object-metadata-entries")
		return er.defaultHealResult(latestMeta, storageDisks, storageEndpoints, errs,
			bucket, object, versionID), err
	}

	partsMetadata = shufflePartsMetadata(partsMetadata, latestMeta.Erasure.Distribution)
	copyPartsMetadata := make([]FileInfo, len(partsMetadata))
	for i := range latestDisks {
		if latestDisks[i] == nil {
			continue
		}
		copyPartsMetadata[i] = partsMetadata[i]
	}

	for i := range outDatedDisks {
		if outDatedDisks[i] == nil {
			continue
		}
		// Make sure to write the FileInfo information
		// that is expected to be in quorum.
		partsMetadata[i] = cleanFileInfo(latestMeta)
	}

	// source data dir shall be empty in case of XLV1
	// differentiate it with dstDataDir for readability
	// srcDataDir is the one used with newBitrotReader()
	// to read existing content.
	srcDataDir := latestMeta.DataDir
	dstDataDir := latestMeta.DataDir
	if latestMeta.XLV1 {
		dstDataDir = migrateDataDir
	}
	var inlineBuffers []*bytes.Buffer
	if !latestMeta.Deleted && !latestMeta.IsRemote() {
		if latestMeta.InlineData() {
			inlineBuffers = make([]*bytes.Buffer, len(outDatedDisks))
		}

		erasureInfo := latestMeta.Erasure
		for partIndex := 0; partIndex < len(latestMeta.Parts); partIndex++ {
			partSize := latestMeta.Parts[partIndex].Size
			partActualSize := latestMeta.Parts[partIndex].ActualSize
			partModTime := latestMeta.Parts[partIndex].ModTime
			partNumber := latestMeta.Parts[partIndex].Number
			partIdx := latestMeta.Parts[partIndex].Index
			partChecksums := latestMeta.Parts[partIndex].Checksums
			tillOffset := erasure.ShardFileOffset(0, partSize, partSize)
			readers := make([]io.ReaderAt, len(latestDisks))
			prefer := make([]bool, len(latestDisks))
			checksumAlgo := erasureInfo.GetChecksumInfo(partNumber).Algorithm
			for i, disk := range latestDisks {
				if disk == OfflineDisk {
					continue
				}
				checksumInfo := copyPartsMetadata[i].Erasure.GetChecksumInfo(partNumber)
				partPath := pathJoin(object, srcDataDir, fmt.Sprintf("part.%d", partNumber))
				readers[i] = newBitrotReader(disk, copyPartsMetadata[i].Data, bucket, partPath, tillOffset, checksumAlgo,
					checksumInfo.Hash, erasure.ShardSize())
				prefer[i] = disk.Hostname() == ""
			}

			writers := make([]io.Writer, len(outDatedDisks))
			for i, disk := range outDatedDisks {
				if disk == OfflineDisk {
					continue
				}
				partPath := pathJoin(tmpID, dstDataDir, fmt.Sprintf("part.%d", partNumber))
				if len(inlineBuffers) > 0 {
					buf := grid.GetByteBufferCap(int(erasure.ShardFileSize(latestMeta.Size)) + 64)
					inlineBuffers[i] = bytes.NewBuffer(buf[:0])
					defer grid.PutByteBuffer(buf)

					writers[i] = newStreamingBitrotWriterBuffer(inlineBuffers[i], DefaultBitrotAlgorithm, erasure.ShardSize())
				} else {
					writers[i] = newBitrotWriter(disk, bucket, minioMetaTmpBucket, partPath,
						tillOffset, DefaultBitrotAlgorithm, erasure.ShardSize())
				}
			}

			// Heal each part. erasure.Heal() will write the healed
			// part to .minio/tmp/uuid/ which needs to be renamed
			// later to the final location.
			err = erasure.Heal(ctx, writers, readers, partSize, prefer)
			closeBitrotReaders(readers)
			closeBitrotWriters(writers)
			if err != nil {
				return result, err
			}

			// outDatedDisks that had write errors should not be
			// written to for remaining parts, so we nil it out.
			for i, disk := range outDatedDisks {
				if disk == OfflineDisk {
					continue
				}

				// A non-nil stale disk which did not receive
				// a healed part checksum had a write error.
				if writers[i] == nil {
					outDatedDisks[i] = nil
					disksToHealCount--
					continue
				}

				partsMetadata[i].DataDir = dstDataDir
				partsMetadata[i].AddObjectPart(partNumber, "", partSize, partActualSize, partModTime, partIdx, partChecksums)
				if len(inlineBuffers) > 0 && inlineBuffers[i] != nil {
					partsMetadata[i].Data = inlineBuffers[i].Bytes()
					partsMetadata[i].SetInlineData()
				} else {
					partsMetadata[i].Data = nil
				}
			}

			// If all disks are having errors, we give up.
			if disksToHealCount == 0 {
				return result, fmt.Errorf("all drives had write errors, unable to heal %s/%s", bucket, object)
			}
		}
	}
	defer er.deleteAll(context.Background(), minioMetaTmpBucket, tmpID)

	// Rename from tmp location to the actual location.
	for i, disk := range outDatedDisks {
		if disk == OfflineDisk {
			continue
		}

		// record the index of the updated disks
		partsMetadata[i].Erasure.Index = i + 1

		// Attempt a rename now from healed data to final location.
		partsMetadata[i].SetHealing()

		if _, err = disk.RenameData(ctx, minioMetaTmpBucket, tmpID, partsMetadata[i], bucket, object, RenameOptions{}); err != nil {
			return result, err
		}

		// - Remove any remaining parts from outdated disks from before transition.
		if partsMetadata[i].IsRemote() {
			rmDataDir := partsMetadata[i].DataDir
			disk.Delete(ctx, bucket, pathJoin(encodeDirObject(object), rmDataDir), DeleteOptions{
				Immediate: true,
				Recursive: true,
			})
		}

		for i, v := range result.Before.Drives {
			if v.Endpoint == disk.Endpoint().String() {
				result.After.Drives[i].State = madmin.DriveStateOk
			}
		}
	}

	return result, nil
}

// checkAbandonedParts will check if an object has abandoned parts,
// meaning data-dirs or inlined data that are no longer referenced by the xl.meta
// Errors are generally ignored by this function.
func (er *erasureObjects) checkAbandonedParts(ctx context.Context, bucket string, object string, opts madmin.HealOpts) (err error) {
	if !opts.Remove || opts.DryRun {
		return nil
	}

	if globalTrace.NumSubscribers(madmin.TraceHealing) > 0 {
		startTime := time.Now()
		defer func() {
			healTrace(healingMetricCheckAbandonedParts, startTime, bucket, object, nil, err, nil)
		}()
	}

	if !opts.NoLock {
		lk := er.NewNSLock(bucket, object)
		lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
		if err != nil {
			return err
		}
		ctx = lkctx.Context()
		defer lk.Unlock(lkctx)
	}

	var wg sync.WaitGroup
	for _, disk := range er.getDisks() {
		if disk != nil {
			wg.Add(1)
			go func(disk StorageAPI) {
				defer wg.Done()
				_ = disk.CleanAbandonedData(ctx, bucket, object)
			}(disk)
		}
	}
	wg.Wait()
	return nil
}

// healObjectDir - heals object directory specifically, this special call
// is needed since we do not have a special backend format for directories.
func (er *erasureObjects) healObjectDir(ctx context.Context, bucket, object string, dryRun bool, remove bool) (hr madmin.HealResultItem, err error) {
	storageDisks := er.getDisks()
	storageEndpoints := er.getEndpoints()

	// Initialize heal result object
	hr = madmin.HealResultItem{
		Type:         madmin.HealItemObject,
		Bucket:       bucket,
		Object:       object,
		DiskCount:    len(storageDisks),
		ParityBlocks: er.defaultParityCount,
		DataBlocks:   len(storageDisks) - er.defaultParityCount,
		ObjectSize:   0,
	}

	hr.Before.Drives = make([]madmin.HealDriveInfo, len(storageDisks))
	hr.After.Drives = make([]madmin.HealDriveInfo, len(storageDisks))

	errs := statAllDirs(ctx, storageDisks, bucket, object)
	danglingObject := isObjectDirDangling(errs)
	if danglingObject {
		if !dryRun && remove {
			var wg sync.WaitGroup
			// Remove versions in bulk for each disk
			for index, disk := range storageDisks {
				if disk == nil {
					continue
				}
				wg.Add(1)
				go func(index int, disk StorageAPI) {
					defer wg.Done()
					_ = disk.Delete(ctx, bucket, object, DeleteOptions{
						Recursive: false,
						Immediate: false,
					})
				}(index, disk)
			}
			wg.Wait()
		}
	}

	// Prepare object creation on all disks
	for i, err := range errs {
		drive := storageEndpoints[i].String()
		switch err {
		case nil:
			hr.Before.Drives[i] = madmin.HealDriveInfo{Endpoint: drive, State: madmin.DriveStateOk}
			hr.After.Drives[i] = madmin.HealDriveInfo{Endpoint: drive, State: madmin.DriveStateOk}
		case errDiskNotFound:
			hr.Before.Drives[i] = madmin.HealDriveInfo{State: madmin.DriveStateOffline}
			hr.After.Drives[i] = madmin.HealDriveInfo{State: madmin.DriveStateOffline}
		case errVolumeNotFound, errFileNotFound:
			// Bucket or prefix/directory not found
			hr.Before.Drives[i] = madmin.HealDriveInfo{Endpoint: drive, State: madmin.DriveStateMissing}
			hr.After.Drives[i] = madmin.HealDriveInfo{Endpoint: drive, State: madmin.DriveStateMissing}
		default:
			hr.Before.Drives[i] = madmin.HealDriveInfo{Endpoint: drive, State: madmin.DriveStateCorrupt}
			hr.After.Drives[i] = madmin.HealDriveInfo{Endpoint: drive, State: madmin.DriveStateCorrupt}
		}
	}
	if danglingObject || isAllNotFound(errs) {
		// Nothing to do, file is already gone.
		return hr, errFileNotFound
	}

	if dryRun {
		// Quit without trying to heal the object dir
		return hr, nil
	}

	for i, err := range errs {
		if err == errVolumeNotFound || err == errFileNotFound {
			// Bucket or prefix/directory not found
			merr := storageDisks[i].MakeVol(ctx, pathJoin(bucket, object))
			switch merr {
			case nil, errVolumeExists:
				hr.After.Drives[i].State = madmin.DriveStateOk
			case errDiskNotFound:
				hr.After.Drives[i].State = madmin.DriveStateOffline
			default:
				hr.After.Drives[i].State = madmin.DriveStateCorrupt
			}
		}
	}
	return hr, nil
}

// Populates default heal result item entries with possible values when we are returning prematurely.
// This is to ensure that in any circumstance we are not returning empty arrays with wrong values.
func (er *erasureObjects) defaultHealResult(lfi FileInfo, storageDisks []StorageAPI, storageEndpoints []Endpoint, errs []error, bucket, object, versionID string) madmin.HealResultItem {
	// Initialize heal result object
	result := madmin.HealResultItem{
		Type:       madmin.HealItemObject,
		Bucket:     bucket,
		Object:     object,
		ObjectSize: lfi.Size,
		VersionID:  versionID,
		DiskCount:  len(storageDisks),
	}

	if lfi.IsValid() {
		result.ParityBlocks = lfi.Erasure.ParityBlocks
	} else {
		// Default to most common configuration for erasure blocks.
		result.ParityBlocks = er.defaultParityCount
	}
	result.DataBlocks = len(storageDisks) - result.ParityBlocks

	for index, disk := range storageDisks {
		if disk == nil {
			result.Before.Drives = append(result.Before.Drives, madmin.HealDriveInfo{
				UUID:     "",
				Endpoint: storageEndpoints[index].String(),
				State:    madmin.DriveStateOffline,
			})
			result.After.Drives = append(result.After.Drives, madmin.HealDriveInfo{
				UUID:     "",
				Endpoint: storageEndpoints[index].String(),
				State:    madmin.DriveStateOffline,
			})
			continue
		}
		driveState := madmin.DriveStateCorrupt
		switch errs[index] {
		case errFileNotFound, errVolumeNotFound:
			driveState = madmin.DriveStateMissing
		case nil:
			driveState = madmin.DriveStateOk
		}
		result.Before.Drives = append(result.Before.Drives, madmin.HealDriveInfo{
			UUID:     "",
			Endpoint: storageEndpoints[index].String(),
			State:    driveState,
		})
		result.After.Drives = append(result.After.Drives, madmin.HealDriveInfo{
			UUID:     "",
			Endpoint: storageEndpoints[index].String(),
			State:    driveState,
		})
	}

	return result
}

// Stat all directories.
func statAllDirs(ctx context.Context, storageDisks []StorageAPI, bucket, prefix string) []error {
	g := errgroup.WithNErrs(len(storageDisks))
	for index, disk := range storageDisks {
		if disk == nil {
			continue
		}
		index := index
		g.Go(func() error {
			entries, err := storageDisks[index].ListDir(ctx, "", bucket, prefix, 1)
			if err != nil {
				return err
			}
			if len(entries) > 0 {
				return errVolumeNotEmpty
			}
			return nil
		}, index)
	}

	return g.Wait()
}

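// isAllVolumeNotFound returns true only when every error in errs is errVolumeNotFound.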
func isAllVolumeNotFound(errs []error) bool {
	return countErrs(errs, errVolumeNotFound) == len(errs)
}

// isAllNotFound returns true only when every element of the error slice is
// errFileNotFound, errFileVersionNotFound or errVolumeNotFound.
// A 0 length slice will always return false.
func isAllNotFound(errs []error) bool {
	for _, err := range errs {
		if err != nil {
			switch err.Error() {
			case errFileNotFound.Error():
				fallthrough
			case errVolumeNotFound.Error():
				fallthrough
			case errFileVersionNotFound.Error():
				continue
			}
		}
		return false
	}
	return len(errs) > 0
}

// isAllBucketsNotFound will return true if all the errors are either
// errVolumeNotFound or a bucket-not-found error.
// A 0 length slice will always return false.
func isAllBucketsNotFound(errs []error) bool {
	if len(errs) == 0 {
		return false
	}
	notFoundCount := 0
	for _, err := range errs {
		if err != nil {
			if errors.Is(err, errVolumeNotFound) {
				notFoundCount++
			} else if isErrBucketNotFound(err) {
				notFoundCount++
			}
		}
	}
	return len(errs) == notFoundCount
}

// ObjectDir is considered dangling/corrupted if and only
// if total disks - a combination of corrupted and missing
// files is less than N/2+1 number of disks.
// If no files were found false will be returned.
func isObjectDirDangling(errs []error) (ok bool) {
	var found int
	var notFound int
	var foundNotEmpty int
	var otherFound int
	for _, readErr := range errs {
		switch {
		case readErr == nil:
			found++
		case readErr == errFileNotFound || readErr == errVolumeNotFound:
			notFound++
		case readErr == errVolumeNotEmpty:
			foundNotEmpty++
		default:
			otherFound++
		}
	}
	found = found + foundNotEmpty + otherFound
	return found < notFound && found > 0
}

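// danglingMetaErrsCount tallies xl.meta read errors: not-found errors versus
// all other (non-actionable) errors.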
func danglingMetaErrsCount(cerrs []error) (notFoundCount int, nonActionableCount int) {
	for _, readErr := range cerrs {
		if readErr == nil {
			continue
		}
		switch {
		case errors.Is(readErr, errFileNotFound) || errors.Is(readErr, errFileVersionNotFound):
			notFoundCount++
		default:
			// All other errors are non-actionable
			nonActionableCount++
		}
	}
	return
}

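// danglingPartErrsCount tallies per-part check results: parts reported missing
// versus all other (non-actionable) failures.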
func danglingPartErrsCount(results []int) (notFoundCount int, nonActionableCount int) {
	for _, partResult := range results {
		switch partResult {
		case checkPartSuccess:
			continue
		case checkPartFileNotFound:
			notFoundCount++
		default:
			// All other errors are non-actionable
			nonActionableCount++
		}
	}
	return
}

// Object is considered dangling/corrupted if and only
// if total disks - a combination of corrupted and missing
// files is less than the number of data blocks.
func isObjectDangling(metaArr []FileInfo, errs []error, dataErrsByPart map[int][]int) (validMeta FileInfo, ok bool) {
	// We can consider an object's data not reliable
	// when xl.meta is not found in read quorum disks,
	// or when xl.meta is not readable in read quorum disks.
	notFoundMetaErrs, nonActionableMetaErrs := danglingMetaErrsCount(errs)

	notFoundPartsErrs, nonActionablePartsErrs := 0, 0
	for _, dataErrs := range dataErrsByPart {
		if nf, na := danglingPartErrsCount(dataErrs); nf > notFoundPartsErrs {
			notFoundPartsErrs, nonActionablePartsErrs = nf, na
		}
	}

	for _, m := range metaArr {
		if m.IsValid() {
			validMeta = m
			break
		}
	}

	if !validMeta.IsValid() {
		// validMeta is invalid because all xl.meta is missing apparently
		// we should figure out if dataDirs are also missing > dataBlocks.
		dataBlocks := (len(metaArr) + 1) / 2
		if notFoundPartsErrs > dataBlocks {
			// Not using parity to ensure that we do not delete
			// any valid content, if any is recoverable. But if
			// notFoundDataDirs are already greater than the data
			// blocks all bets are off and it is safe to purge.
			//
			// This is purely a defensive code, ideally parityBlocks
			// is sufficient, however we can't know that since we
			// do have the FileInfo{}.
			return validMeta, true
		}

		// We have no idea what this file is, leave it as is.
		return validMeta, false
	}

	if nonActionableMetaErrs > 0 || nonActionablePartsErrs > 0 {
		return validMeta, false
	}

	if validMeta.Deleted {
		// notFoundPartsErrs is ignored since
		// - delete marker does not have any parts
		dataBlocks := (len(errs) + 1) / 2
		return validMeta, notFoundMetaErrs > dataBlocks
	}

	// TODO: It is possible to replay the object via just single
	// xl.meta file, considering quorum number of data-dirs are still
	// present on other drives.
	//
	// However this requires a bit of a rewrite, leave this up for
	// future work.
	if notFoundMetaErrs > 0 && notFoundMetaErrs > validMeta.Erasure.ParityBlocks {
		// All xl.meta is beyond parity blocks missing, this is dangling
		return validMeta, true
	}

	if !validMeta.IsRemote() && notFoundPartsErrs > 0 && notFoundPartsErrs > validMeta.Erasure.ParityBlocks {
		// All data-dir is beyond parity blocks missing, this is dangling
		return validMeta, true
	}

	return validMeta, false
}

// HealObject - heals the given object, automatically deleting the object if it is
// stale/corrupted and `remove` is true.
func (er erasureObjects) HealObject(ctx context.Context, bucket, object, versionID string, opts madmin.HealOpts) (hr madmin.HealResultItem, err error) {
	// Create context that also contains information about the object and bucket.
	// The top level handler might not have this information.
	reqInfo := logger.GetReqInfo(ctx)
	var newReqInfo *logger.ReqInfo
	if reqInfo != nil {
		newReqInfo = logger.NewReqInfo(reqInfo.RemoteHost, reqInfo.UserAgent, reqInfo.DeploymentID, reqInfo.RequestID, reqInfo.API, bucket, object)
	} else {
		newReqInfo = logger.NewReqInfo("", "", globalDeploymentID(), "", "Heal", bucket, object)
	}
	healCtx := logger.SetReqInfo(GlobalContext, newReqInfo)

	// Healing of directories is handled separately.
	if HasSuffix(object, SlashSeparator) {
		hr, err := er.healObjectDir(healCtx, bucket, object, opts.DryRun, opts.Remove)
		return hr, toObjectErr(err, bucket, object)
	}

	storageDisks := er.getDisks()
	storageEndpoints := er.getEndpoints()

	// When versionID is empty, we read directly from the `null` versionID for healing.
	if versionID == "" {
		versionID = nullVersionID
	}

	// Perform quick read without lock.
	// This allows us to quickly check if all is ok or all are missing.
	_, errs := readAllFileInfo(healCtx, storageDisks, "", bucket, object, versionID, false, false)
	if isAllNotFound(errs) {
		err := errFileNotFound
		if versionID != "" {
			err = errFileVersionNotFound
		}
		// Nothing to do, file is already gone.
		return er.defaultHealResult(FileInfo{}, storageDisks, storageEndpoints,
			errs, bucket, object, versionID), toObjectErr(err, bucket, object, versionID)
	}

	// Heal the object.
	hr, err = er.healObject(healCtx, bucket, object, versionID, opts)
	if errors.Is(err, errFileCorrupt) && opts.ScanMode != madmin.HealDeepScan {
		// Instead of returning an error when a bitrot error is detected
		// during a normal heal scan, heal again with bitrot flag enabled.
		opts.ScanMode = madmin.HealDeepScan
		hr, err = er.healObject(healCtx, bucket, object, versionID, opts)
	}

	return hr, toObjectErr(err, bucket, object, versionID)
}

// healTrace sends healing results to trace output.
func healTrace(funcName healingMetric, startTime time.Time, bucket, object string, opts *madmin.HealOpts, err error, result *madmin.HealResultItem) {
	tr := madmin.TraceInfo{
		TraceType: madmin.TraceHealing,
		Time:      startTime,
		NodeName:  globalLocalNodeName,
		FuncName:  "heal." + funcName.String(),
		Duration:  time.Since(startTime),
		Path:      pathJoin(bucket, decodeDirObject(object)),
	}
	if opts != nil {
		tr.Custom = map[string]string{
			"dry":    fmt.Sprint(opts.DryRun),
			"remove": fmt.Sprint(opts.Remove),
			"mode":   fmt.Sprint(opts.ScanMode),
		}
		if result != nil {
			tr.Custom["version-id"] = result.VersionID
			tr.Custom["disks"] = strconv.Itoa(result.DiskCount)
			tr.Bytes = result.ObjectSize
		}
	}
	if err != nil {
		tr.Error = err.Error()
	}
	tr.HealResult = result
	globalTrace.Publish(tr)
}