// Copyright (c) 2015-2022 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package cmd

import (
    "context"
    "encoding/binary"
    "errors"
    "fmt"
    "io"
    "math"
    "math/rand"
    "net/http"
    "strings"
    "time"

    "github.com/dustin/go-humanize"
    "github.com/lithammer/shortuuid/v4"
    "github.com/minio/madmin-go/v3"
    "github.com/minio/minio/internal/bucket/lifecycle"
    objectlock "github.com/minio/minio/internal/bucket/object/lock"
    "github.com/minio/minio/internal/bucket/replication"
    "github.com/minio/minio/internal/bucket/versioning"
    "github.com/minio/minio/internal/hash"
    xioutil "github.com/minio/minio/internal/ioutil"
    "github.com/minio/minio/internal/logger"
    "github.com/minio/pkg/v3/env"
    "github.com/minio/pkg/v3/workers"
)
//go:generate msgp -file $GOFILE -unexported

// rebalanceStats contains per-pool rebalance statistics like number of objects,
// versions and bytes rebalanced out of a pool
type rebalanceStats struct {
    InitFreeSpace uint64 `json:"initFreeSpace" msg:"ifs"` // Pool free space at the start of rebalance
    InitCapacity  uint64 `json:"initCapacity" msg:"ic"`   // Pool capacity at the start of rebalance

    Buckets           []string      `json:"buckets" msg:"bus"`           // buckets being rebalanced or to be rebalanced
    RebalancedBuckets []string      `json:"rebalancedBuckets" msg:"rbs"` // buckets rebalanced
    Bucket            string        `json:"bucket" msg:"bu"`             // Last rebalanced bucket
    Object            string        `json:"object" msg:"ob"`             // Last rebalanced object
    NumObjects        uint64        `json:"numObjects" msg:"no"`         // Number of objects rebalanced
    NumVersions       uint64        `json:"numVersions" msg:"nv"`        // Number of versions rebalanced
    Bytes             uint64        `json:"bytes" msg:"bs"`              // Number of bytes rebalanced
    Participating     bool          `json:"participating" msg:"par"`
    Info              rebalanceInfo `json:"info" msg:"inf"`
}
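// update accounts one rebalanced version against the pool's running totals.
// Bytes are counted as on-disk (erasure-coded) size rather than logical size:
// for example, with an EC layout of 8 data + 4 parity blocks, a 100 MiB
// version adds 100 MiB * 12/8 = 150 MiB to Bytes. Delete markers carry no
// data and add nothing.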
func (rs *rebalanceStats) update(bucket string, fi FileInfo) {
    if fi.IsLatest {
        rs.NumObjects++
    }

    rs.NumVersions++
    onDiskSz := int64(0)
    if !fi.Deleted {
        onDiskSz = fi.Size * int64(fi.Erasure.DataBlocks+fi.Erasure.ParityBlocks) / int64(fi.Erasure.DataBlocks)
    }
    rs.Bytes += uint64(onDiskSz)
    rs.Bucket = bucket
    rs.Object = fi.Name
}
type rstats []*rebalanceStats

//go:generate stringer -type=rebalStatus -trimprefix=rebal $GOFILE
type rebalStatus uint8

const (
    rebalNone rebalStatus = iota
    rebalStarted
    rebalCompleted
    rebalStopped
    rebalFailed
)

type rebalanceInfo struct {
    StartTime time.Time   `msg:"startTs"` // Time at which rebalance-start was issued
    EndTime   time.Time   `msg:"stopTs"`  // Time at which rebalance operation completed or rebalance-stop was called
    Status    rebalStatus `msg:"status"`  // Current state of rebalance operation. One of Started|Stopped|Completed|Failed.
}

// rebalanceMeta contains information pertaining to an ongoing rebalance operation.
type rebalanceMeta struct {
    cancel          context.CancelFunc `msg:"-"` // to be invoked on rebalance-stop
    lastRefreshedAt time.Time          `msg:"-"`

    StoppedAt       time.Time         `msg:"stopTs"` // Time when rebalance-stop was issued.
    ID              string            `msg:"id"`     // ID of the ongoing rebalance operation
    PercentFreeGoal float64           `msg:"pf"`     // Computed from total free space and capacity at the start of rebalance
    PoolStats       []*rebalanceStats `msg:"rss"`    // Per-pool rebalance stats keyed by pool index
}
var errRebalanceNotStarted = errors.New("rebalance not started")

func (z *erasureServerPools) loadRebalanceMeta(ctx context.Context) error {
    r := &rebalanceMeta{}
    if err := r.load(ctx, z.serverPools[0]); err == nil {
        z.rebalMu.Lock()
        z.rebalMeta = r
        z.updateRebalanceStats(ctx)
        z.rebalMu.Unlock()
    } else if !errors.Is(err, errConfigNotFound) {
        rebalanceLogIf(ctx, fmt.Errorf("failed to load rebalance metadata, continue to restart rebalance as needed: %w", err))
    }
    return nil
}
// updateRebalanceStats reconciles rebalance.bin with the current pool layout.
// If a cluster was expanded while a rebalance was in progress (say, a 2-pool
// setup grown to 3 pools), z.rebalMeta loaded from disk will be missing an
// entry for the new pool. Expansion during a rebalance is not explicitly
// disallowed, and although rare it has occurred in the wild, so this function
// appends empty stats for any missing pool (marking it as able to receive
// rebalanced data) and persists the updated metadata.
func (z *erasureServerPools) updateRebalanceStats(ctx context.Context) error {
    var ok bool
    for i := range z.serverPools {
        if z.findIndex(i) == -1 {
            // Also ensure to initialize rebalanceStats to indicate
            // it's a new pool that can receive rebalanced data.
            z.rebalMeta.PoolStats = append(z.rebalMeta.PoolStats, &rebalanceStats{})
            ok = true
        }
    }
    if ok {
        return z.rebalMeta.save(ctx, z.serverPools[0])
    }

    return nil
}
func (z *erasureServerPools) findIndex(index int) int {
    if z.rebalMeta == nil {
        return 0
    }
    for i := range len(z.rebalMeta.PoolStats) {
        if i == index {
            return index
        }
    }
    return -1
}
// initRebalanceMeta initializes rebalance metadata for a new rebalance
// operation and saves it in the object store.
func (z *erasureServerPools) initRebalanceMeta(ctx context.Context, buckets []string) (arn string, err error) {
    r := &rebalanceMeta{
        ID:        shortuuid.New(),
        PoolStats: make([]*rebalanceStats, len(z.serverPools)),
    }

    // Fetch disk capacity and available space.
    si := z.StorageInfo(ctx, true)
    diskStats := make([]struct {
        AvailableSpace uint64
        TotalSpace     uint64
    }, len(z.serverPools))
    var totalCap, totalFree uint64
    for _, disk := range si.Disks {
        // Ignore invalid.
        if disk.PoolIndex < 0 || len(diskStats) <= disk.PoolIndex {
            // https://github.com/minio/minio/issues/16500
            continue
        }
        totalCap += disk.TotalSpace
        totalFree += disk.AvailableSpace

        diskStats[disk.PoolIndex].AvailableSpace += disk.AvailableSpace
        diskStats[disk.PoolIndex].TotalSpace += disk.TotalSpace
    }
    r.PercentFreeGoal = float64(totalFree) / float64(totalCap)

    now := time.Now()
    for idx := range z.serverPools {
        r.PoolStats[idx] = &rebalanceStats{
            Buckets:           make([]string, len(buckets)),
            RebalancedBuckets: make([]string, 0, len(buckets)),
            InitFreeSpace:     diskStats[idx].AvailableSpace,
            InitCapacity:      diskStats[idx].TotalSpace,
        }
        copy(r.PoolStats[idx].Buckets, buckets)

        if pfi := float64(diskStats[idx].AvailableSpace) / float64(diskStats[idx].TotalSpace); pfi < r.PercentFreeGoal {
            r.PoolStats[idx].Participating = true
            r.PoolStats[idx].Info = rebalanceInfo{
                StartTime: now,
                Status:    rebalStarted,
            }
        }
    }

    err = r.save(ctx, z.serverPools[0])
    if err != nil {
        return arn, err
    }

    z.rebalMeta = r
    return r.ID, nil
}
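// Participation rule used above, by way of example: PercentFreeGoal is the
// cluster-wide free/capacity ratio at rebalance start. With two pools of
// 100 TiB each, pool 1 at 40 TiB free and pool 2 at 80 TiB free, the goal is
// (40+80)/200 = 0.60; pool 1 (0.40 < 0.60) participates and drains objects,
// while pool 2 (0.80 >= 0.60) only receives data.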
func (z *erasureServerPools) updatePoolStats(poolIdx int, bucket string, fi FileInfo) {
    z.rebalMu.Lock()
    defer z.rebalMu.Unlock()

    r := z.rebalMeta
    if r == nil {
        return
    }

    r.PoolStats[poolIdx].update(bucket, fi)
}
const (
    rebalMetaName = "rebalance.bin"
    rebalMetaFmt  = 1
    rebalMetaVer  = 1
)
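// On disk, rebalance.bin is a 4-byte header followed by the msgp-encoded
// rebalanceMeta (see saveWithOpts/loadWithOpts below): bytes [0:2] hold
// rebalMetaFmt and bytes [2:4] hold rebalMetaVer, both little-endian uint16.
// Unknown format or version values are rejected on load.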
func (z *erasureServerPools) nextRebalBucket(poolIdx int) (string, bool) {
    z.rebalMu.RLock()
    defer z.rebalMu.RUnlock()

    r := z.rebalMeta
    if r == nil {
        return "", false
    }

    ps := r.PoolStats[poolIdx]
    if ps == nil {
        return "", false
    }

    if ps.Info.Status == rebalCompleted || !ps.Participating {
        return "", false
    }

    if len(ps.Buckets) == 0 {
        return "", false
    }

    return ps.Buckets[0], true
}
func (z *erasureServerPools) bucketRebalanceDone(bucket string, poolIdx int) {
    z.rebalMu.Lock()
    defer z.rebalMu.Unlock()

    if z.rebalMeta == nil {
        return
    }

    ps := z.rebalMeta.PoolStats[poolIdx]
    if ps == nil {
        return
    }

    for i, b := range ps.Buckets {
        if b == bucket {
            ps.Buckets = append(ps.Buckets[:i], ps.Buckets[i+1:]...)
            ps.RebalancedBuckets = append(ps.RebalancedBuckets, bucket)
            break
        }
    }
}
func (r *rebalanceMeta) load(ctx context.Context, store objectIO) error {
    return r.loadWithOpts(ctx, store, ObjectOptions{})
}

func (r *rebalanceMeta) loadWithOpts(ctx context.Context, store objectIO, opts ObjectOptions) error {
    data, _, err := readConfigWithMetadata(ctx, store, rebalMetaName, opts)
    if err != nil {
        return err
    }

    if len(data) == 0 {
        return nil
    }
    if len(data) <= 4 {
        return fmt.Errorf("rebalanceMeta: no data")
    }

    // Read header
    switch binary.LittleEndian.Uint16(data[0:2]) {
    case rebalMetaFmt:
    default:
        return fmt.Errorf("rebalanceMeta: unknown format: %d", binary.LittleEndian.Uint16(data[0:2]))
    }
    switch binary.LittleEndian.Uint16(data[2:4]) {
    case rebalMetaVer:
    default:
        return fmt.Errorf("rebalanceMeta: unknown version: %d", binary.LittleEndian.Uint16(data[2:4]))
    }

    // OK, parse data.
    if _, err = r.UnmarshalMsg(data[4:]); err != nil {
        return err
    }

    r.lastRefreshedAt = time.Now()

    return nil
}

func (r *rebalanceMeta) saveWithOpts(ctx context.Context, store objectIO, opts ObjectOptions) error {
    if r == nil {
        return nil
    }

    data := make([]byte, 4, r.Msgsize()+4)

    // Initialize the header.
    binary.LittleEndian.PutUint16(data[0:2], rebalMetaFmt)
    binary.LittleEndian.PutUint16(data[2:4], rebalMetaVer)

    buf, err := r.MarshalMsg(data)
    if err != nil {
        return err
    }

    return saveConfigWithOpts(ctx, store, rebalMetaName, buf, opts)
}

func (r *rebalanceMeta) save(ctx context.Context, store objectIO) error {
    return r.saveWithOpts(ctx, store, ObjectOptions{})
}
func (z *erasureServerPools) IsRebalanceStarted() bool {
    z.rebalMu.RLock()
    defer z.rebalMu.RUnlock()

    r := z.rebalMeta
    if r == nil {
        return false
    }
    if !r.StoppedAt.IsZero() {
        return false
    }
    for _, ps := range r.PoolStats {
        if ps.Participating && ps.Info.Status != rebalCompleted {
            return true
        }
    }
    return false
}

func (z *erasureServerPools) IsPoolRebalancing(poolIndex int) bool {
    z.rebalMu.RLock()
    defer z.rebalMu.RUnlock()

    if r := z.rebalMeta; r != nil {
        if !r.StoppedAt.IsZero() {
            return false
        }
        ps := r.PoolStats[poolIndex]
        return ps.Participating && ps.Info.Status == rebalStarted
    }
    return false
}
func (z *erasureServerPools) rebalanceBuckets(ctx context.Context, poolIdx int) (err error) {
    doneCh := make(chan error, 1)
    defer xioutil.SafeClose(doneCh)

    // Save rebalance.bin periodically.
    go func() {
        // Update rebalance.bin periodically once every 5-10s, chosen randomly
        // to avoid multiple pool leaders herding to update around the same
        // time.
        r := rand.New(rand.NewSource(time.Now().UnixNano()))
        randSleepFor := func() time.Duration {
            return 5*time.Second + time.Duration(float64(5*time.Second)*r.Float64())
        }

        timer := time.NewTimer(randSleepFor())
        defer timer.Stop()

        var (
            quit     bool
            traceMsg string
        )

        for {
            select {
            case rebalErr := <-doneCh:
                quit = true
                now := time.Now()
                var status rebalStatus

                switch {
                case errors.Is(rebalErr, context.Canceled):
                    status = rebalStopped
                    traceMsg = fmt.Sprintf("stopped at %s", now)
                case rebalErr == nil:
                    status = rebalCompleted
                    traceMsg = fmt.Sprintf("completed at %s", now)
                default:
                    status = rebalFailed
                    traceMsg = fmt.Sprintf("stopped at %s with err: %v", now, rebalErr)
                }

                z.rebalMu.Lock()
                z.rebalMeta.PoolStats[poolIdx].Info.Status = status
                z.rebalMeta.PoolStats[poolIdx].Info.EndTime = now
                z.rebalMu.Unlock()

            case <-timer.C:
                traceMsg = fmt.Sprintf("saved at %s", time.Now())
            }

            stopFn := globalRebalanceMetrics.log(rebalanceMetricSaveMetadata, poolIdx, traceMsg)
            err := z.saveRebalanceStats(GlobalContext, poolIdx, rebalSaveStats)
            stopFn(0, err)
            rebalanceLogIf(GlobalContext, err)

            if quit {
                return
            }

            timer.Reset(randSleepFor())
        }
    }()

    rebalanceLogEvent(ctx, "Pool %d rebalancing is started", poolIdx+1)

    for {
        select {
        case <-ctx.Done():
            doneCh <- ctx.Err()
            return
        default:
        }

        bucket, ok := z.nextRebalBucket(poolIdx)
        if !ok {
            // no more buckets to rebalance or target free_space/capacity reached
            break
        }

        stopFn := globalRebalanceMetrics.log(rebalanceMetricRebalanceBucket, poolIdx, bucket)
        if err = z.rebalanceBucket(ctx, bucket, poolIdx); err != nil {
            stopFn(0, err)
            if errors.Is(err, errServerNotInitialized) || errors.Is(err, errBucketMetadataNotInitialized) {
                continue
            }
            rebalanceLogIf(GlobalContext, err)
            doneCh <- err
            return
        }

        stopFn(0, nil)
        z.bucketRebalanceDone(bucket, poolIdx)
    }

    rebalanceLogEvent(GlobalContext, "Pool %d rebalancing is done", poolIdx+1)

    return err
}
func (z *erasureServerPools) checkIfRebalanceDone(poolIdx int) bool {
    z.rebalMu.Lock()
    defer z.rebalMu.Unlock()

    // check if enough objects have been rebalanced
    r := z.rebalMeta
    poolStats := r.PoolStats[poolIdx]
    if poolStats.Info.Status == rebalCompleted {
        return true
    }

    pfi := float64(poolStats.InitFreeSpace+poolStats.Bytes) / float64(poolStats.InitCapacity)
    // Mark pool rebalance as done if within 5% from PercentFreeGoal.
    if diff := math.Abs(pfi - r.PercentFreeGoal); diff <= 0.05 {
        r.PoolStats[poolIdx].Info.Status = rebalCompleted
        r.PoolStats[poolIdx].Info.EndTime = time.Now()
        return true
    }

    return false
}
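// Worked example for the check above: a pool that started with 100 TiB
// capacity and 40 TiB free, and has since rebalanced 15 TiB out, projects
// pfi = (40+15)/100 = 0.55. With a cluster-wide PercentFreeGoal of 0.60 the
// difference is 0.05, so the pool is marked rebalCompleted. Note the 5% is an
// absolute difference in the free-space ratio, not a relative error.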
func (set *erasureObjects) listObjectsToRebalance(ctx context.Context, bucketName string, fn func(entry metaCacheEntry)) error {
    disks, _ := set.getOnlineDisksWithHealing(false)
    if len(disks) == 0 {
        return fmt.Errorf("no online drives found for set with endpoints %s", set.getEndpoints())
    }

    // However many we ask, versions must exist on ~50%
    listingQuorum := (set.setDriveCount + 1) / 2

    // How to resolve partial results.
    resolver := metadataResolutionParams{
        dirQuorum: listingQuorum, // make sure to capture all quorum ratios
        objQuorum: listingQuorum, // make sure to capture all quorum ratios
        bucket:    bucketName,
    }

    err := listPathRaw(ctx, listPathRawOptions{
        disks:          disks,
        bucket:         bucketName,
        recursive:      true,
        forwardTo:      "",
        minDisks:       listingQuorum,
        reportNotFound: false,
        agreed:         fn,
        partial: func(entries metaCacheEntries, _ []error) {
            entry, ok := entries.resolve(&resolver)
            if ok {
                fn(*entry)
            }
        },
        finished: nil,
    })
    return err
}
// rebalanceBucket rebalances objects under bucket in poolIdx pool
func (z *erasureServerPools) rebalanceBucket(ctx context.Context, bucket string, poolIdx int) (err error) {
    ctx = logger.SetReqInfo(ctx, &logger.ReqInfo{})

    var vc *versioning.Versioning
    var lc *lifecycle.Lifecycle
    var lr objectlock.Retention
    var rcfg *replication.Config
    if bucket != minioMetaBucket {
        vc, err = globalBucketVersioningSys.Get(bucket)
        if err != nil {
            return err
        }

        // Check if the current bucket has a configured lifecycle policy
        lc, err = globalLifecycleSys.Get(bucket)
        if err != nil && !errors.Is(err, BucketLifecycleNotFound{Bucket: bucket}) {
            return err
        }

        // Check if bucket is object locked.
        lr, err = globalBucketObjectLockSys.Get(bucket)
        if err != nil {
            return err
        }

        rcfg, err = getReplicationConfig(ctx, bucket)
        if err != nil {
            return err
        }
    }

    pool := z.serverPools[poolIdx]

    const envRebalanceWorkers = "_MINIO_REBALANCE_WORKERS"
    workerSize, err := env.GetInt(envRebalanceWorkers, len(pool.sets))
    if err != nil {
        rebalanceLogIf(ctx, fmt.Errorf("invalid workers value err: %v, defaulting to %d", err, len(pool.sets)))
        workerSize = len(pool.sets)
    }

    // Each decom worker needs one List() goroutine/worker
    // add that many extra workers.
    workerSize += len(pool.sets)

    wk, err := workers.New(workerSize)
    if err != nil {
        return err
    }

    for setIdx, set := range pool.sets {
        set := set

        filterLifecycle := func(bucket, object string, fi FileInfo) bool {
            if lc == nil {
                return false
            }
            versioned := vc != nil && vc.Versioned(object)
            objInfo := fi.ToObjectInfo(bucket, object, versioned)

            evt := evalActionFromLifecycle(ctx, *lc, lr, rcfg, objInfo)
            if evt.Action.Delete() {
                globalExpiryState.enqueueByDays(objInfo, evt, lcEventSrc_Rebal)
                return true
            }

            return false
        }

        rebalanceEntry := func(entry metaCacheEntry) {
            defer wk.Give()

            if entry.isDir() {
                return
            }

            // rebalance on poolIdx has reached its goal
            if z.checkIfRebalanceDone(poolIdx) {
                return
            }

            fivs, err := entry.fileInfoVersions(bucket)
            if err != nil {
                return
            }

            // We need a reversed order for rebalance,
            // to create the appropriate stack.
            versionsSorter(fivs.Versions).reverse()

            var rebalanced, expired int
            for _, version := range fivs.Versions {
                stopFn := globalRebalanceMetrics.log(rebalanceMetricRebalanceObject, poolIdx, bucket, version.Name, version.VersionID)

                // Skip transitioned objects for now. TBD
                if version.IsRemote() {
                    stopFn(version.Size, errors.New("ILM Tiered version will be skipped for now"))
                    continue
                }

                // Apply lifecycle rules on the objects that are expired.
                if filterLifecycle(bucket, version.Name, version) {
                    expired++
                    stopFn(version.Size, errors.New("ILM expired object/version will be skipped"))
                    continue
                }

                // any object with only single DEL marker we don't need
                // to rebalance, just skip it, this also includes
                // any other versions that have already expired.
                remainingVersions := len(fivs.Versions) - expired
                if version.Deleted && remainingVersions == 1 {
                    rebalanced++
                    stopFn(version.Size, errors.New("DELETE marked object with no other non-current versions will be skipped"))
                    continue
                }

                versionID := version.VersionID
                if versionID == "" {
                    versionID = nullVersionID
                }

                var failure, ignore bool
                if version.Deleted {
                    _, err := z.DeleteObject(ctx,
                        bucket,
                        version.Name,
                        ObjectOptions{
                            Versioned:         true,
                            VersionID:         versionID,
                            MTime:             version.ModTime,
                            DeleteReplication: version.ReplicationState,
                            SrcPoolIdx:        poolIdx,
                            DataMovement:      true,
                            DeleteMarker:      true, // make sure we create a delete marker
                            SkipRebalancing:   true, // make sure we skip the decommissioned pool
                            NoAuditLog:        true,
                        })
                    if err != nil {
                        // This can happen when rebalance stop races with ongoing rebalance workers.
                        // These rebalance failures can be ignored.
                        if isErrObjectNotFound(err) || isErrVersionNotFound(err) || isDataMovementOverWriteErr(err) {
                            ignore = true
                            stopFn(0, nil)
                            continue
                        }
                    }
                    stopFn(version.Size, err)
                    rebalanceLogIf(ctx, err)
                    failure = err != nil
                    if !failure {
                        z.updatePoolStats(poolIdx, bucket, version)
                        rebalanced++
                    }
                    auditLogRebalance(ctx, "Rebalance:DeleteMarker", bucket, version.Name, versionID, err)
                    continue
                }

                for try := 0; try < 3; try++ {
                    // GetObjectReader.Close is called by rebalanceObject
                    gr, err := set.GetObjectNInfo(ctx,
                        bucket,
                        encodeDirObject(version.Name),
                        nil,
                        http.Header{},
                        ObjectOptions{
                            VersionID:    versionID,
                            NoDecryption: true,
                            NoLock:       true,
                            NoAuditLog:   true,
                        })
                    if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
                        // object deleted by the application, nothing to do here we move on.
                        ignore = true
                        stopFn(0, nil)
                        break
                    }
                    if err != nil {
                        failure = true
                        rebalanceLogIf(ctx, err)
                        stopFn(0, err)
                        continue
                    }

                    if err = z.rebalanceObject(ctx, poolIdx, bucket, gr); err != nil {
                        // This can happen when rebalance stop races with ongoing rebalance workers.
                        // These rebalance failures can be ignored.
                        if isErrObjectNotFound(err) || isErrVersionNotFound(err) || isDataMovementOverWriteErr(err) {
                            ignore = true
                            stopFn(0, nil)
                            break
                        }
                        failure = true
                        rebalanceLogIf(ctx, err)
                        stopFn(version.Size, err)
                        continue
                    }

                    stopFn(version.Size, nil)
                    failure = false
                    break
                }
                if ignore {
                    continue
                }
                if failure {
                    break // break out on first error
                }
                z.updatePoolStats(poolIdx, bucket, version)
                rebalanced++
            }

            // if all versions were rebalanced, we can delete the object versions.
            if rebalanced == len(fivs.Versions) {
                stopFn := globalRebalanceMetrics.log(rebalanceMetricRebalanceRemoveObject, poolIdx, bucket, entry.name)
                _, err := set.DeleteObject(ctx,
                    bucket,
                    encodeDirObject(entry.name),
                    ObjectOptions{
                        DeletePrefix:       true, // use prefix delete to delete all versions at once.
                        DeletePrefixObject: true, // use prefix delete on exact object (this is an optimization to avoid fan-out calls)
                        NoAuditLog:         true,
                    },
                )
                stopFn(0, err)
                auditLogRebalance(ctx, "Rebalance:DeleteObject", bucket, entry.name, "", err)
                if err != nil {
                    rebalanceLogIf(ctx, err)
                }
            }
        }

        wk.Take()
        go func(setIdx int) {
            defer wk.Give()
            err := set.listObjectsToRebalance(ctx, bucket,
                func(entry metaCacheEntry) {
                    wk.Take()
                    go rebalanceEntry(entry)
                },
            )
            if err == nil || errors.Is(err, context.Canceled) {
                return
            }
            setN := humanize.Ordinal(setIdx + 1)
            rebalanceLogIf(ctx, fmt.Errorf("listing objects from %s set failed with %v", setN, err), "rebalance-listing-failed"+setN)
        }(setIdx)
    }

    wk.Wait()
    return nil
}
type rebalSaveOpts uint8

const (
    rebalSaveStats rebalSaveOpts = iota
    rebalSaveStoppedAt
)
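// saveRebalanceStats persists rebalance.bin as a read-modify-write cycle under
// a cluster-wide namespace lock: it reloads the latest on-disk metadata,
// overlays only this caller's change (either this pool's stats or the
// StoppedAt timestamp, per opts), and writes the merged result back, so
// concurrent pool leaders do not clobber each other's progress.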
func (z *erasureServerPools) saveRebalanceStats(ctx context.Context, poolIdx int, opts rebalSaveOpts) error {
    lock := z.serverPools[0].NewNSLock(minioMetaBucket, rebalMetaName)
    lkCtx, err := lock.GetLock(ctx, globalOperationTimeout)
    if err != nil {
        rebalanceLogIf(ctx, fmt.Errorf("failed to acquire write lock on %s/%s: %w", minioMetaBucket, rebalMetaName, err))
        return err
    }
    defer lock.Unlock(lkCtx)

    ctx = lkCtx.Context()
    noLockOpts := ObjectOptions{NoLock: true}
    r := &rebalanceMeta{}
    if err := r.loadWithOpts(ctx, z.serverPools[0], noLockOpts); err != nil {
        return err
    }

    z.rebalMu.Lock()
    defer z.rebalMu.Unlock()

    switch opts {
    case rebalSaveStoppedAt:
        r.StoppedAt = time.Now()
    case rebalSaveStats:
        if z.rebalMeta != nil {
            r.PoolStats[poolIdx] = z.rebalMeta.PoolStats[poolIdx]
        }
    }

    z.rebalMeta = r
    return z.rebalMeta.saveWithOpts(ctx, z.serverPools[0], noLockOpts)
}
func auditLogRebalance(ctx context.Context, apiName, bucket, object, versionID string, err error) {
    errStr := ""
    if err != nil {
        errStr = err.Error()
    }
    auditLogInternal(ctx, AuditLogOptions{
        Event:     "rebalance",
        APIName:   apiName,
        Bucket:    bucket,
        Object:    object,
        VersionID: versionID,
        Error:     errStr,
    })
}
func (z *erasureServerPools) rebalanceObject(ctx context.Context, poolIdx int, bucket string, gr *GetObjectReader) (err error) {
    oi := gr.ObjInfo

    defer func() {
        gr.Close()
        auditLogRebalance(ctx, "RebalanceCopyData", oi.Bucket, oi.Name, oi.VersionID, err)
    }()

    actualSize, err := oi.GetActualSize()
    if err != nil {
        return err
    }

    if oi.isMultipart() {
        res, err := z.NewMultipartUpload(ctx, bucket, oi.Name, ObjectOptions{
            VersionID:    oi.VersionID,
            UserDefined:  oi.UserDefined,
            NoAuditLog:   true,
            DataMovement: true,
            SrcPoolIdx:   poolIdx,
        })
        if err != nil {
            return fmt.Errorf("rebalanceObject: NewMultipartUpload() %w", err)
        }
        defer z.AbortMultipartUpload(ctx, bucket, oi.Name, res.UploadID, ObjectOptions{NoAuditLog: true})

        parts := make([]CompletePart, len(oi.Parts))
        for i, part := range oi.Parts {
            hr, err := hash.NewReader(ctx, io.LimitReader(gr, part.Size), part.Size, "", "", part.ActualSize)
            if err != nil {
                return fmt.Errorf("rebalanceObject: hash.NewReader() %w", err)
            }
            pi, err := z.PutObjectPart(ctx, bucket, oi.Name, res.UploadID,
                part.Number,
                NewPutObjReader(hr),
                ObjectOptions{
                    PreserveETag: part.ETag, // Preserve original ETag to ensure same metadata.
                    IndexCB: func() []byte {
                        return part.Index // Preserve part Index to ensure decompression works.
                    },
                    NoAuditLog: true,
                })
            if err != nil {
                return fmt.Errorf("rebalanceObject: PutObjectPart() %w", err)
            }
            parts[i] = CompletePart{
                ETag:       pi.ETag,
                PartNumber: pi.PartNumber,
            }
        }
        _, err = z.CompleteMultipartUpload(ctx, bucket, oi.Name, res.UploadID, parts, ObjectOptions{
            DataMovement: true,
            MTime:        oi.ModTime,
            NoAuditLog:   true,
        })
        if err != nil {
            err = fmt.Errorf("rebalanceObject: CompleteMultipartUpload() %w", err)
        }
        return err
    }

    hr, err := hash.NewReader(ctx, gr, oi.Size, "", "", actualSize)
    if err != nil {
        return fmt.Errorf("rebalanceObject: hash.NewReader() %w", err)
    }
    _, err = z.PutObject(ctx,
        bucket,
        oi.Name,
        NewPutObjReader(hr),
        ObjectOptions{
            SrcPoolIdx:   poolIdx,
            DataMovement: true,
            VersionID:    oi.VersionID,
            MTime:        oi.ModTime,
            UserDefined:  oi.UserDefined,
            PreserveETag: oi.ETag, // Preserve original ETag to ensure same metadata.
            IndexCB: func() []byte {
                return oi.Parts[0].Index // Preserve part Index to ensure decompression works.
            },
            NoAuditLog: true,
        })
    if err != nil {
        err = fmt.Errorf("rebalanceObject: PutObject() %w", err)
    }
    return err
}
func (z *erasureServerPools) StartRebalance() {
    z.rebalMu.Lock()
    if z.rebalMeta == nil || !z.rebalMeta.StoppedAt.IsZero() { // rebalance not running, nothing to do
        z.rebalMu.Unlock()
        return
    }
    ctx, cancel := context.WithCancel(GlobalContext)
    z.rebalMeta.cancel = cancel // to be used when rebalance-stop is called
    z.rebalMu.Unlock()

    z.rebalMu.RLock()
    participants := make([]bool, len(z.rebalMeta.PoolStats))
    for i, ps := range z.rebalMeta.PoolStats {
        // skip pools which have completed rebalancing
        if ps.Info.Status != rebalStarted {
            continue
        }

        participants[i] = ps.Participating
    }
    z.rebalMu.RUnlock()

    for poolIdx, doRebalance := range participants {
        if !doRebalance {
            continue
        }
        // nothing to do if this node is not pool's first node (i.e pool's rebalance 'leader').
        if !globalEndpoints[poolIdx].Endpoints[0].IsLocal {
            continue
        }

        go func(idx int) {
            stopfn := globalRebalanceMetrics.log(rebalanceMetricRebalanceBuckets, idx)
            err := z.rebalanceBuckets(ctx, idx)
            stopfn(0, err)
        }(poolIdx)
    }
}
// StopRebalance signals the rebalance goroutine running on this node (if any)
// to stop, using the context.CancelFunc(s) saved at the time of StartRebalance.
func (z *erasureServerPools) StopRebalance() error {
    z.rebalMu.Lock()
    defer z.rebalMu.Unlock()

    r := z.rebalMeta
    if r == nil { // rebalance not running in this node, nothing to do
        return nil
    }

    if cancel := r.cancel; cancel != nil {
        // cancel != nil only on pool leaders
        r.cancel = nil
        cancel()
    }
    return nil
}
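// Illustrative sketch (not the upstream control flow, which is wired up in the
// admin handlers) of how the pieces in this file fit together on a node,
// assuming an initialized *erasureServerPools `z` and a bucket list:
//
//	id, err := z.initRebalanceMeta(ctx, buckets) // compute PercentFreeGoal, persist rebalance.bin
//	if err == nil {
//		z.StartRebalance() // each participating pool's local leader runs rebalanceBuckets()
//	}
//	_ = id
//	// later, to halt:
//	_ = z.StopRebalance()                                // cancel local rebalance workers
//	_ = z.saveRebalanceStats(ctx, 0, rebalSaveStoppedAt) // record StoppedAt in rebalance.bin
//
// IsRebalanceStarted() and IsPoolRebalancing() report progress while the
// background goroutines run.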
// for rebalance trace support
type rebalanceMetrics struct{}

var globalRebalanceMetrics rebalanceMetrics

//go:generate stringer -type=rebalanceMetric -trimprefix=rebalanceMetric $GOFILE
type rebalanceMetric uint8

const (
    rebalanceMetricRebalanceBuckets rebalanceMetric = iota
    rebalanceMetricRebalanceBucket
    rebalanceMetricRebalanceObject
    rebalanceMetricRebalanceRemoveObject
    rebalanceMetricSaveMetadata
)

var errDataMovementSrcDstPoolSame = errors.New("source and destination pool are the same")

func rebalanceTrace(r rebalanceMetric, poolIdx int, startTime time.Time, duration time.Duration, err error, path string, sz int64) madmin.TraceInfo {
    var errStr string
    if err != nil {
        errStr = err.Error()
    }
    return madmin.TraceInfo{
        TraceType: madmin.TraceRebalance,
        Time:      startTime,
        NodeName:  globalLocalNodeName,
        FuncName:  fmt.Sprintf("rebalance.%s (pool-id=%d)", r.String(), poolIdx),
        Duration:  duration,
        Path:      path,
        Error:     errStr,
        Bytes:     sz,
    }
}

func (p *rebalanceMetrics) log(r rebalanceMetric, poolIdx int, paths ...string) func(sz int64, err error) {
    startTime := time.Now()
    return func(sz int64, err error) {
        duration := time.Since(startTime)
        if globalTrace.NumSubscribers(madmin.TraceRebalance) > 0 {
            globalTrace.Publish(rebalanceTrace(r, poolIdx, startTime, duration, err, strings.Join(paths, " "), sz))
        }
    }
}