  1. // Copyright (c) 2015-2021 MinIO, Inc.
  2. //
  3. // This file is part of MinIO Object Storage stack
  4. //
  5. // This program is free software: you can redistribute it and/or modify
  6. // it under the terms of the GNU Affero General Public License as published by
  7. // the Free Software Foundation, either version 3 of the License, or
  8. // (at your option) any later version.
  9. //
  10. // This program is distributed in the hope that it will be useful
  11. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. // GNU Affero General Public License for more details.
  14. //
  15. // You should have received a copy of the GNU Affero General Public License
  16. // along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. package cmd
  18. import (
  19. "context"
  20. "encoding/base64"
  21. "encoding/binary"
  22. "errors"
  23. "fmt"
  24. "io"
  25. "math/rand"
  26. "net/http"
  27. "net/url"
  28. "path"
  29. "reflect"
  30. "strings"
  31. "sync"
  32. "sync/atomic"
  33. "time"
  34. "github.com/dustin/go-humanize"
  35. "github.com/minio/madmin-go/v3"
  36. "github.com/minio/minio-go/v7"
  37. "github.com/minio/minio-go/v7/pkg/encrypt"
  38. "github.com/minio/minio-go/v7/pkg/tags"
  39. "github.com/minio/minio/internal/amztime"
  40. "github.com/minio/minio/internal/bucket/bandwidth"
  41. objectlock "github.com/minio/minio/internal/bucket/object/lock"
  42. "github.com/minio/minio/internal/bucket/replication"
  43. "github.com/minio/minio/internal/config/storageclass"
  44. "github.com/minio/minio/internal/crypto"
  45. "github.com/minio/minio/internal/event"
  46. "github.com/minio/minio/internal/hash"
  47. xhttp "github.com/minio/minio/internal/http"
  48. xioutil "github.com/minio/minio/internal/ioutil"
  49. "github.com/minio/minio/internal/kms"
  50. "github.com/minio/minio/internal/logger"
  51. "github.com/minio/minio/internal/once"
  52. "github.com/tinylib/msgp/msgp"
  53. "github.com/zeebo/xxh3"
  54. )
  55. const (
  56. throttleDeadline = 1 * time.Hour
  57. // ReplicationReset has reset id and timestamp of last reset operation
  58. ReplicationReset = "replication-reset"
  59. // ReplicationStatus has internal replication status - stringified representation of target's replication status for all replication
  60. // activity initiated from this cluster
  61. ReplicationStatus = "replication-status"
  62. // ReplicationTimestamp - the last time replication was initiated on this cluster for this object version
  63. ReplicationTimestamp = "replication-timestamp"
  64. // ReplicaStatus - this header is present if a replica was received by this cluster for this object version
  65. ReplicaStatus = "replica-status"
  66. // ReplicaTimestamp - the last time a replica was received by this cluster for this object version
  67. ReplicaTimestamp = "replica-timestamp"
  68. // TaggingTimestamp - the last time a tag metadata modification happened on this cluster for this object version
  69. TaggingTimestamp = "tagging-timestamp"
  71. // ObjectLockRetentionTimestamp - the last time an object lock metadata modification happened on this cluster for this object version
  71. ObjectLockRetentionTimestamp = "objectlock-retention-timestamp"
  72. // ObjectLockLegalHoldTimestamp - the last time a legal hold metadata modification happened on this cluster for this object version
  73. ObjectLockLegalHoldTimestamp = "objectlock-legalhold-timestamp"
  74. // ReplicationSsecChecksumHeader - the encrypted checksum of the SSE-C encrypted object.
  75. ReplicationSsecChecksumHeader = "X-Minio-Replication-Ssec-Crc"
  76. )
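// Illustrative sketch, not part of the original file (the function name is
// hypothetical): the constants above are suffixes that get combined with
// ReservedMetadataPrefixLower to form the internal metadata keys the
// replication code reads and writes on an object version, as done in the
// metadata updates further below.
func exampleReservedReplicationKey(oi *ObjectInfo) {
	// e.g. an internal key along the lines of "<reserved-prefix>replication-timestamp"
	key := ReservedMetadataPrefixLower + ReplicationTimestamp
	oi.UserDefined[key] = UTCNow().Format(time.RFC3339Nano)
}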
  77. // gets the replication config associated with a given bucket name.
  78. func getReplicationConfig(ctx context.Context, bucketName string) (rc *replication.Config, err error) {
  79. rCfg, _, err := globalBucketMetadataSys.GetReplicationConfig(ctx, bucketName)
  80. if err != nil && !errors.Is(err, BucketReplicationConfigNotFound{Bucket: bucketName}) {
  81. return rCfg, err
  82. }
  83. return rCfg, nil
  84. }
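// Illustrative sketch, not part of the original file (the function name is
// hypothetical): because getReplicationConfig swallows the "config not found"
// error, callers must check for a nil config in addition to the error, exactly
// as the call sites below do.
func exampleHasReplicationConfig(ctx context.Context, bucket string) bool {
	cfg, err := getReplicationConfig(ctx, bucket)
	if err != nil || cfg == nil {
		// no usable replication configuration for this bucket
		return false
	}
	return cfg.HasActiveRules("", true)
}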
  85. // validateReplicationDestination returns an error if the replication destination bucket is missing or not configured.
  86. // It also returns true if the replication destination is the same as this server.
  87. func validateReplicationDestination(ctx context.Context, bucket string, rCfg *replication.Config, opts *validateReplicationDestinationOptions) (bool, APIError) {
  88. if opts == nil {
  89. opts = &validateReplicationDestinationOptions{}
  90. }
  91. var arns []string
  92. if rCfg.RoleArn != "" {
  93. arns = append(arns, rCfg.RoleArn)
  94. } else {
  95. for _, rule := range rCfg.Rules {
  96. arns = append(arns, rule.Destination.String())
  97. }
  98. }
  99. var sameTarget bool
  100. for _, arnStr := range arns {
  101. arn, err := madmin.ParseARN(arnStr)
  102. if err != nil {
  103. return sameTarget, errorCodes.ToAPIErrWithErr(ErrBucketRemoteArnInvalid, err)
  104. }
  105. if arn.Type != madmin.ReplicationService {
  106. return sameTarget, toAPIError(ctx, BucketRemoteArnTypeInvalid{Bucket: bucket})
  107. }
  108. clnt := globalBucketTargetSys.GetRemoteTargetClient(bucket, arnStr)
  109. if clnt == nil {
  110. return sameTarget, toAPIError(ctx, BucketRemoteTargetNotFound{Bucket: bucket})
  111. }
  112. if opts.CheckRemoteBucket { // validate remote bucket
  113. found, err := clnt.BucketExists(ctx, arn.Bucket)
  114. if err != nil {
  115. return sameTarget, errorCodes.ToAPIErrWithErr(ErrRemoteDestinationNotFoundError, err)
  116. }
  117. if !found {
  118. return sameTarget, errorCodes.ToAPIErrWithErr(ErrRemoteDestinationNotFoundError, BucketRemoteTargetNotFound{Bucket: arn.Bucket})
  119. }
  120. if ret, err := globalBucketObjectLockSys.Get(bucket); err == nil {
  121. if ret.LockEnabled {
  122. lock, _, _, _, err := clnt.GetObjectLockConfig(ctx, arn.Bucket)
  123. if err != nil {
  124. return sameTarget, errorCodes.ToAPIErrWithErr(ErrReplicationDestinationMissingLock, err)
  125. }
  126. if lock != objectlock.Enabled {
  127. return sameTarget, errorCodes.ToAPIErrWithErr(ErrReplicationDestinationMissingLock, nil)
  128. }
  129. }
  130. }
  131. }
  132. // if the remote bucket was already checked above, the readiness check is unnecessary
  133. if !opts.CheckRemoteBucket && opts.CheckReady {
  134. endpoint := clnt.EndpointURL().String()
  135. if errInt, ok := opts.checkReadyErr.Load(endpoint); !ok {
  136. err = checkRemoteEndpoint(ctx, clnt.EndpointURL())
  137. opts.checkReadyErr.Store(endpoint, err)
  138. } else {
  139. if errInt == nil {
  140. err = nil
  141. } else {
  142. err, _ = errInt.(error)
  143. }
  144. }
  145. switch err.(type) {
  146. case BucketRemoteIdenticalToSource:
  147. return true, errorCodes.ToAPIErrWithErr(ErrBucketRemoteIdenticalToSource, fmt.Errorf("remote target endpoint %s is self referential", clnt.EndpointURL().String()))
  148. default:
  149. }
  150. }
  151. // validate replication ARN against target endpoint
  152. selfTarget, _ := isLocalHost(clnt.EndpointURL().Hostname(), clnt.EndpointURL().Port(), globalMinioPort)
  153. if !sameTarget {
  154. sameTarget = selfTarget
  155. }
  156. }
  157. if len(arns) == 0 {
  158. return false, toAPIError(ctx, BucketRemoteTargetNotFound{Bucket: bucket})
  159. }
  160. return sameTarget, toAPIError(ctx, nil)
  161. }
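// Illustrative sketch, not part of the original file (the function name is
// hypothetical): when a replication configuration is applied to a bucket, the
// destination is typically validated with the remote-bucket check enabled; the
// boolean result reports whether the destination resolves back to this cluster.
func exampleValidateDestination(ctx context.Context, bucket string, rCfg *replication.Config) bool {
	sameTarget, _ := validateReplicationDestination(ctx, bucket, rCfg,
		&validateReplicationDestinationOptions{CheckRemoteBucket: true})
	return sameTarget
}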
  162. // performs an HTTP request to the remote endpoint to check if the deployment id of the remote endpoint is the
  163. // same as the local cluster's deployment id. This is to prevent replication to self, especially in case of a
  164. // load balancer in front of MinIO.
  165. func checkRemoteEndpoint(ctx context.Context, epURL *url.URL) error {
  166. reqURL := &url.URL{
  167. Scheme: epURL.Scheme,
  168. Host: epURL.Host,
  169. Path: healthCheckPathPrefix + healthCheckReadinessPath,
  170. }
  171. req, err := http.NewRequestWithContext(ctx, http.MethodGet, reqURL.String(), nil)
  172. if err != nil {
  173. return err
  174. }
  175. client := &http.Client{
  176. Transport: globalRemoteTargetTransport,
  177. Timeout: 10 * time.Second,
  178. }
  179. resp, err := client.Do(req)
  180. if err != nil {
  181. return err
  182. }
  183. if err == nil {
  184. // Drain the connection.
  185. xhttp.DrainBody(resp.Body)
  186. }
  187. if resp != nil {
  188. amzid := resp.Header.Get(xhttp.AmzRequestHostID)
  189. if _, ok := globalNodeNamesHex[amzid]; ok {
  190. return BucketRemoteIdenticalToSource{
  191. Endpoint: epURL.String(),
  192. }
  193. }
  194. }
  195. return nil
  196. }
  197. type mustReplicateOptions struct {
  198. meta map[string]string
  199. status replication.StatusType
  200. opType replication.Type
  201. replicationRequest bool // incoming request is a replication request
  202. }
  203. func (o mustReplicateOptions) ReplicationStatus() (s replication.StatusType) {
  204. if rs, ok := o.meta[xhttp.AmzBucketReplicationStatus]; ok {
  205. return replication.StatusType(rs)
  206. }
  207. return s
  208. }
  209. func (o mustReplicateOptions) isExistingObjectReplication() bool {
  210. return o.opType == replication.ExistingObjectReplicationType
  211. }
  212. func (o mustReplicateOptions) isMetadataReplication() bool {
  213. return o.opType == replication.MetadataReplicationType
  214. }
  215. func (o ObjectInfo) getMustReplicateOptions(op replication.Type, opts ObjectOptions) mustReplicateOptions {
  216. return getMustReplicateOptions(o.UserDefined, o.UserTags, o.ReplicationStatus, op, opts)
  217. }
  218. func getMustReplicateOptions(userDefined map[string]string, userTags string, status replication.StatusType, op replication.Type, opts ObjectOptions) mustReplicateOptions {
  219. meta := cloneMSS(userDefined)
  220. if userTags != "" {
  221. meta[xhttp.AmzObjectTagging] = userTags
  222. }
  223. return mustReplicateOptions{
  224. meta: meta,
  225. status: status,
  226. opType: op,
  227. replicationRequest: opts.ReplicationRequest,
  228. }
  229. }
  230. // mustReplicate returns a ReplicateDecision indicating, per configured target, whether the object meets the
  231. // replication criteria and whether replication is to be done in a synchronous manner.
  232. func mustReplicate(ctx context.Context, bucket, object string, mopts mustReplicateOptions) (dsc ReplicateDecision) {
  233. // object layer not initialized we return with no decision.
  234. if newObjectLayerFn() == nil {
  235. return
  236. }
  237. // Disable server-side replication on object prefixes which are excluded
  238. // from versioning via the MinIO bucket versioning extension.
  239. if !globalBucketVersioningSys.PrefixEnabled(bucket, object) {
  240. return
  241. }
  242. replStatus := mopts.ReplicationStatus()
  243. if replStatus == replication.Replica && !mopts.isMetadataReplication() {
  244. return
  245. }
  246. if mopts.replicationRequest { // incoming replication request on target cluster
  247. return
  248. }
  249. cfg, err := getReplicationConfig(ctx, bucket)
  250. if err != nil {
  251. replLogOnceIf(ctx, err, bucket)
  252. return
  253. }
  254. if cfg == nil {
  255. return
  256. }
  257. opts := replication.ObjectOpts{
  258. Name: object,
  259. SSEC: crypto.SSEC.IsEncrypted(mopts.meta),
  260. Replica: replStatus == replication.Replica,
  261. ExistingObject: mopts.isExistingObjectReplication(),
  262. }
  263. tagStr, ok := mopts.meta[xhttp.AmzObjectTagging]
  264. if ok {
  265. opts.UserTags = tagStr
  266. }
  267. tgtArns := cfg.FilterTargetArns(opts)
  268. for _, tgtArn := range tgtArns {
  269. tgt := globalBucketTargetSys.GetRemoteTargetClient(bucket, tgtArn)
  270. // the target online status should not be used here while deciding
  271. // whether to replicate as the target could be temporarily down
  272. opts.TargetArn = tgtArn
  273. replicate := cfg.Replicate(opts)
  274. var synchronous bool
  275. if tgt != nil {
  276. synchronous = tgt.replicateSync
  277. }
  278. dsc.Set(newReplicateTargetDecision(tgtArn, replicate, synchronous))
  279. }
  280. return dsc
  281. }
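// Illustrative sketch, not part of the original file (the function name is
// hypothetical): after a PUT, a handler builds mustReplicateOptions from the
// object's metadata and asks mustReplicate for a per-target decision; each
// entry says whether that target should receive this version.
func exampleMustReplicate(ctx context.Context, bucket string, oi ObjectInfo, opts ObjectOptions) {
	mopts := oi.getMustReplicateOptions(replication.ObjectReplicationType, opts)
	dsc := mustReplicate(ctx, bucket, oi.Name, mopts)
	for _, tgt := range dsc.targetsMap {
		if tgt.Replicate {
			// this version qualifies for replication to tgt.Arn; the caller
			// would queue it (or replicate synchronously) here.
			_ = tgt.Arn
		}
	}
}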
  282. // Standard headers that need to be extracted from User metadata.
  283. var standardHeaders = []string{
  284. xhttp.ContentType,
  285. xhttp.CacheControl,
  286. xhttp.ContentEncoding,
  287. xhttp.ContentLanguage,
  288. xhttp.ContentDisposition,
  289. xhttp.AmzStorageClass,
  290. xhttp.AmzObjectTagging,
  291. xhttp.AmzBucketReplicationStatus,
  292. xhttp.AmzObjectLockMode,
  293. xhttp.AmzObjectLockRetainUntilDate,
  294. xhttp.AmzObjectLockLegalHold,
  295. xhttp.AmzTagCount,
  296. xhttp.AmzServerSideEncryption,
  297. }
  298. // returns true if any of the objects being deleted qualifies for replication.
  299. func hasReplicationRules(ctx context.Context, bucket string, objects []ObjectToDelete) bool {
  300. c, err := getReplicationConfig(ctx, bucket)
  301. if err != nil || c == nil {
  302. replLogOnceIf(ctx, err, bucket)
  303. return false
  304. }
  305. for _, obj := range objects {
  306. if c.HasActiveRules(obj.ObjectName, true) {
  307. return true
  308. }
  309. }
  310. return false
  311. }
  312. // isStandardHeader returns true if header is a supported header and not a custom header
  313. func isStandardHeader(matchHeaderKey string) bool {
  314. return equals(matchHeaderKey, standardHeaders...)
  315. }
  316. // returns a replication decision for the delete operation - i.e. whether the delete marker or versioned delete of this object version qualifies for replication
  317. func checkReplicateDelete(ctx context.Context, bucket string, dobj ObjectToDelete, oi ObjectInfo, delOpts ObjectOptions, gerr error) (dsc ReplicateDecision) {
  318. rcfg, err := getReplicationConfig(ctx, bucket)
  319. if err != nil || rcfg == nil {
  320. replLogOnceIf(ctx, err, bucket)
  321. return
  322. }
  323. // If incoming request is a replication request, it does not need to be re-replicated.
  324. if delOpts.ReplicationRequest {
  325. return
  326. }
  327. // Skip replication if this object's prefix is excluded from being
  328. // versioned.
  329. if !delOpts.Versioned {
  330. return
  331. }
  332. opts := replication.ObjectOpts{
  333. Name: dobj.ObjectName,
  334. SSEC: crypto.SSEC.IsEncrypted(oi.UserDefined),
  335. UserTags: oi.UserTags,
  336. DeleteMarker: oi.DeleteMarker,
  337. VersionID: dobj.VersionID,
  338. OpType: replication.DeleteReplicationType,
  339. }
  340. tgtArns := rcfg.FilterTargetArns(opts)
  341. dsc.targetsMap = make(map[string]replicateTargetDecision, len(tgtArns))
  342. if len(tgtArns) == 0 {
  343. return dsc
  344. }
  345. var sync, replicate bool
  346. for _, tgtArn := range tgtArns {
  347. opts.TargetArn = tgtArn
  348. replicate = rcfg.Replicate(opts)
  349. // when the incoming delete is removal of a delete marker (a.k.a. versioned delete),
  350. // GetObjectInfo returns extra information even though it returns errFileNotFound
  351. if gerr != nil {
  352. validReplStatus := false
  353. switch oi.TargetReplicationStatus(tgtArn) {
  354. case replication.Pending, replication.Completed, replication.Failed:
  355. validReplStatus = true
  356. }
  357. if oi.DeleteMarker && (validReplStatus || replicate) {
  358. dsc.Set(newReplicateTargetDecision(tgtArn, replicate, sync))
  359. continue
  360. }
  361. // can be the case that other cluster is down and duplicate `mc rm --vid`
  362. // is issued - this still needs to be replicated back to the other target
  363. if !oi.VersionPurgeStatus.Empty() {
  364. replicate = oi.VersionPurgeStatus == Pending || oi.VersionPurgeStatus == Failed
  365. dsc.Set(newReplicateTargetDecision(tgtArn, replicate, sync))
  366. }
  367. continue
  368. }
  369. tgt := globalBucketTargetSys.GetRemoteTargetClient(bucket, tgtArn)
  370. // the target online status should not be used here while deciding
  371. // whether to replicate deletes as the target could be temporarily down
  372. tgtDsc := newReplicateTargetDecision(tgtArn, false, false)
  373. if tgt != nil {
  374. tgtDsc = newReplicateTargetDecision(tgtArn, replicate, tgt.replicateSync)
  375. }
  376. dsc.Set(tgtDsc)
  377. }
  378. return dsc
  379. }
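// Illustrative sketch, not part of the original file (the function name is
// hypothetical): on DELETE, the handler consults checkReplicateDelete to learn
// whether the resulting delete marker or versioned delete must be propagated
// to any of the configured targets.
func exampleCheckReplicateDelete(ctx context.Context, bucket string, dobj ObjectToDelete, oi ObjectInfo, opts ObjectOptions, gerr error) bool {
	dsc := checkReplicateDelete(ctx, bucket, dobj, oi, opts, gerr)
	for _, tgt := range dsc.targetsMap {
		if tgt.Replicate {
			return true
		}
	}
	return false
}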
  380. // replicate deletes to the designated replication target if replication configuration
  381. // has delete marker replication or delete replication (MinIO extension to allow deletes where version id
  382. // is specified) enabled.
  383. // Similar to bucket replication for the PUT operation, soft deletes (a.k.a. setting a delete marker) and
  384. // permanent deletes (by specifying a version ID in the delete operation) have three states "Pending", "Complete"
  385. // and "Failed" to mark the status of the replication of "DELETE" operation. All failed operations can
  386. // then be retried by healing. In the case of permanent deletes, until the replication is completed on the
  387. // target cluster, the object version is marked deleted on the source and hidden from listing. It is permanently
  388. // deleted from the source when the VersionPurgeStatus changes to "Complete", i.e. after replication succeeds
  389. // on target.
  390. func replicateDelete(ctx context.Context, dobj DeletedObjectReplicationInfo, objectAPI ObjectLayer) {
  391. var replicationStatus replication.StatusType
  392. bucket := dobj.Bucket
  393. versionID := dobj.DeleteMarkerVersionID
  394. if versionID == "" {
  395. versionID = dobj.VersionID
  396. }
  397. defer func() {
  398. replStatus := string(replicationStatus)
  399. auditLogInternal(context.Background(), AuditLogOptions{
  400. Event: dobj.EventType,
  401. APIName: ReplicateDeleteAPI,
  402. Bucket: bucket,
  403. Object: dobj.ObjectName,
  404. VersionID: versionID,
  405. Status: replStatus,
  406. })
  407. }()
  408. rcfg, err := getReplicationConfig(ctx, bucket)
  409. if err != nil || rcfg == nil {
  410. replLogOnceIf(ctx, fmt.Errorf("unable to obtain replication config for bucket: %s: err: %s", bucket, err), bucket)
  411. sendEvent(eventArgs{
  412. BucketName: bucket,
  413. Object: ObjectInfo{
  414. Bucket: bucket,
  415. Name: dobj.ObjectName,
  416. VersionID: versionID,
  417. DeleteMarker: dobj.DeleteMarker,
  418. },
  419. UserAgent: "Internal: [Replication]",
  420. Host: globalLocalNodeName,
  421. EventName: event.ObjectReplicationNotTracked,
  422. })
  423. return
  424. }
  425. dsc, err := parseReplicateDecision(ctx, bucket, dobj.ReplicationState.ReplicateDecisionStr)
  426. if err != nil {
  427. replLogOnceIf(ctx, fmt.Errorf("unable to parse replication decision parameters for bucket: %s, err: %s, decision: %s",
  428. bucket, err, dobj.ReplicationState.ReplicateDecisionStr), dobj.ReplicationState.ReplicateDecisionStr)
  429. sendEvent(eventArgs{
  430. BucketName: bucket,
  431. Object: ObjectInfo{
  432. Bucket: bucket,
  433. Name: dobj.ObjectName,
  434. VersionID: versionID,
  435. DeleteMarker: dobj.DeleteMarker,
  436. },
  437. UserAgent: "Internal: [Replication]",
  438. Host: globalLocalNodeName,
  439. EventName: event.ObjectReplicationNotTracked,
  440. })
  441. return
  442. }
  443. // Lock the object name before starting replication operation.
  444. // Use separate lock that doesn't collide with regular objects.
  445. lk := objectAPI.NewNSLock(bucket, "/[replicate]/"+dobj.ObjectName)
  446. lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
  447. if err != nil {
  448. globalReplicationPool.Get().queueMRFSave(dobj.ToMRFEntry())
  449. sendEvent(eventArgs{
  450. BucketName: bucket,
  451. Object: ObjectInfo{
  452. Bucket: bucket,
  453. Name: dobj.ObjectName,
  454. VersionID: versionID,
  455. DeleteMarker: dobj.DeleteMarker,
  456. },
  457. UserAgent: "Internal: [Replication]",
  458. Host: globalLocalNodeName,
  459. EventName: event.ObjectReplicationNotTracked,
  460. })
  461. return
  462. }
  463. ctx = lkctx.Context()
  464. defer lk.Unlock(lkctx)
  465. rinfos := replicatedInfos{Targets: make([]replicatedTargetInfo, 0, len(dsc.targetsMap))}
  466. var wg sync.WaitGroup
  467. var mu sync.Mutex
  468. for _, tgtEntry := range dsc.targetsMap {
  469. if !tgtEntry.Replicate {
  470. continue
  471. }
  472. // if dobj.TargetArn is not empty string, this is a case of specific target being re-synced.
  473. if dobj.TargetArn != "" && dobj.TargetArn != tgtEntry.Arn {
  474. continue
  475. }
  476. tgtClnt := globalBucketTargetSys.GetRemoteTargetClient(bucket, tgtEntry.Arn)
  477. if tgtClnt == nil {
  478. // Skip stale targets if any and log them to be missing at least once.
  479. replLogOnceIf(ctx, fmt.Errorf("failed to get target for bucket:%s arn:%s", bucket, tgtEntry.Arn), tgtEntry.Arn)
  480. sendEvent(eventArgs{
  481. EventName: event.ObjectReplicationNotTracked,
  482. BucketName: bucket,
  483. Object: ObjectInfo{
  484. Bucket: bucket,
  485. Name: dobj.ObjectName,
  486. VersionID: versionID,
  487. DeleteMarker: dobj.DeleteMarker,
  488. },
  489. UserAgent: "Internal: [Replication]",
  490. Host: globalLocalNodeName,
  491. })
  492. continue
  493. }
  494. wg.Add(1)
  495. go func(tgt *TargetClient) {
  496. defer wg.Done()
  497. tgtInfo := replicateDeleteToTarget(ctx, dobj, tgt)
  498. mu.Lock()
  499. rinfos.Targets = append(rinfos.Targets, tgtInfo)
  500. mu.Unlock()
  501. }(tgtClnt)
  502. }
  503. wg.Wait()
  504. replicationStatus = rinfos.ReplicationStatus()
  505. prevStatus := dobj.DeleteMarkerReplicationStatus()
  506. if dobj.VersionID != "" {
  507. prevStatus = replication.StatusType(dobj.VersionPurgeStatus())
  508. replicationStatus = replication.StatusType(rinfos.VersionPurgeStatus())
  509. }
  510. // to decrement pending count later.
  511. for _, rinfo := range rinfos.Targets {
  512. if rinfo.ReplicationStatus != rinfo.PrevReplicationStatus {
  513. globalReplicationStats.Load().Update(dobj.Bucket, rinfo, replicationStatus,
  514. prevStatus)
  515. }
  516. }
  517. eventName := event.ObjectReplicationComplete
  518. if replicationStatus == replication.Failed {
  519. eventName = event.ObjectReplicationFailed
  520. globalReplicationPool.Get().queueMRFSave(dobj.ToMRFEntry())
  521. }
  522. drs := getReplicationState(rinfos, dobj.ReplicationState, dobj.VersionID)
  523. if replicationStatus != prevStatus {
  524. drs.ReplicationTimeStamp = UTCNow()
  525. }
  526. dobjInfo, err := objectAPI.DeleteObject(ctx, bucket, dobj.ObjectName, ObjectOptions{
  527. VersionID: versionID,
  528. MTime: dobj.DeleteMarkerMTime.Time,
  529. DeleteReplication: drs,
  530. Versioned: globalBucketVersioningSys.PrefixEnabled(bucket, dobj.ObjectName),
  531. // Objects matching excluded prefixes should not leave delete markers;
  532. // this dramatically reduces namespace pollution while keeping the
  533. // benefits of replication. Make sure to apply version suspension
  534. // only at the bucket level instead.
  535. VersionSuspended: globalBucketVersioningSys.Suspended(bucket),
  536. })
  537. if err != nil && !isErrVersionNotFound(err) { // VersionNotFound would be reported by pool that object version is missing on.
  538. sendEvent(eventArgs{
  539. BucketName: bucket,
  540. Object: ObjectInfo{
  541. Bucket: bucket,
  542. Name: dobj.ObjectName,
  543. VersionID: versionID,
  544. DeleteMarker: dobj.DeleteMarker,
  545. },
  546. UserAgent: "Internal: [Replication]",
  547. Host: globalLocalNodeName,
  548. EventName: eventName,
  549. })
  550. } else {
  551. sendEvent(eventArgs{
  552. BucketName: bucket,
  553. Object: dobjInfo,
  554. UserAgent: "Internal: [Replication]",
  555. Host: globalLocalNodeName,
  556. EventName: eventName,
  557. })
  558. }
  559. }
  560. func replicateDeleteToTarget(ctx context.Context, dobj DeletedObjectReplicationInfo, tgt *TargetClient) (rinfo replicatedTargetInfo) {
  561. versionID := dobj.DeleteMarkerVersionID
  562. if versionID == "" {
  563. versionID = dobj.VersionID
  564. }
  565. rinfo = dobj.ReplicationState.targetState(tgt.ARN)
  566. rinfo.OpType = dobj.OpType
  567. rinfo.endpoint = tgt.EndpointURL().Host
  568. rinfo.secure = tgt.EndpointURL().Scheme == "https"
  569. defer func() {
  570. if rinfo.ReplicationStatus == replication.Completed && tgt.ResetID != "" && dobj.OpType == replication.ExistingObjectReplicationType {
  571. rinfo.ResyncTimestamp = fmt.Sprintf("%s;%s", UTCNow().Format(http.TimeFormat), tgt.ResetID)
  572. }
  573. }()
  574. if dobj.VersionID == "" && rinfo.PrevReplicationStatus == replication.Completed && dobj.OpType != replication.ExistingObjectReplicationType {
  575. rinfo.ReplicationStatus = rinfo.PrevReplicationStatus
  576. return
  577. }
  578. if dobj.VersionID != "" && rinfo.VersionPurgeStatus == Complete {
  579. return
  580. }
  581. if globalBucketTargetSys.isOffline(tgt.EndpointURL()) {
  582. replLogOnceIf(ctx, fmt.Errorf("remote target is offline for bucket:%s arn:%s", dobj.Bucket, tgt.ARN), "replication-target-offline-delete-"+tgt.ARN)
  583. sendEvent(eventArgs{
  584. BucketName: dobj.Bucket,
  585. Object: ObjectInfo{
  586. Bucket: dobj.Bucket,
  587. Name: dobj.ObjectName,
  588. VersionID: dobj.VersionID,
  589. DeleteMarker: dobj.DeleteMarker,
  590. },
  591. UserAgent: "Internal: [Replication]",
  592. Host: globalLocalNodeName,
  593. EventName: event.ObjectReplicationNotTracked,
  594. })
  595. if dobj.VersionID == "" {
  596. rinfo.ReplicationStatus = replication.Failed
  597. } else {
  598. rinfo.VersionPurgeStatus = Failed
  599. }
  600. return
  601. }
  602. // early return if the delete marker was already replicated (existing object replication / healing of delete markers)
  603. if dobj.DeleteMarkerVersionID != "" {
  604. toi, err := tgt.StatObject(ctx, tgt.Bucket, dobj.ObjectName, minio.StatObjectOptions{
  605. VersionID: versionID,
  606. Internal: minio.AdvancedGetOptions{
  607. ReplicationProxyRequest: "false",
  608. IsReplicationReadyForDeleteMarker: true,
  609. },
  610. })
  611. serr := ErrorRespToObjectError(err, dobj.Bucket, dobj.ObjectName, dobj.VersionID)
  612. switch {
  613. case isErrMethodNotAllowed(serr):
  614. // delete marker already replicated
  615. if dobj.VersionID == "" && rinfo.VersionPurgeStatus.Empty() {
  616. rinfo.ReplicationStatus = replication.Completed
  617. return
  618. }
  619. case isErrObjectNotFound(serr), isErrVersionNotFound(serr):
  620. // version being purged is already not found on target.
  621. if !rinfo.VersionPurgeStatus.Empty() {
  622. rinfo.VersionPurgeStatus = Complete
  623. return
  624. }
  625. case isErrReadQuorum(serr), isErrWriteQuorum(serr):
  626. // destination has some quorum issues, perform removeObject() anyways
  627. // to complete the operation.
  628. default:
  629. if err != nil && minio.IsNetworkOrHostDown(err, true) && !globalBucketTargetSys.isOffline(tgt.EndpointURL()) {
  630. globalBucketTargetSys.markOffline(tgt.EndpointURL())
  631. }
  632. // mark delete marker replication as failed if target cluster not ready to receive
  633. // this request yet (object version not replicated yet)
  634. if err != nil && !toi.ReplicationReady {
  635. rinfo.ReplicationStatus = replication.Failed
  636. rinfo.Err = err
  637. return
  638. }
  639. }
  640. }
  641. rmErr := tgt.RemoveObject(ctx, tgt.Bucket, dobj.ObjectName, minio.RemoveObjectOptions{
  642. VersionID: versionID,
  643. Internal: minio.AdvancedRemoveOptions{
  644. ReplicationDeleteMarker: dobj.DeleteMarkerVersionID != "",
  645. ReplicationMTime: dobj.DeleteMarkerMTime.Time,
  646. ReplicationStatus: minio.ReplicationStatusReplica,
  647. ReplicationRequest: true, // always set this to distinguish between `mc mirror` replication and serverside
  648. },
  649. })
  650. if rmErr != nil {
  651. rinfo.Err = rmErr
  652. if dobj.VersionID == "" {
  653. rinfo.ReplicationStatus = replication.Failed
  654. } else {
  655. rinfo.VersionPurgeStatus = Failed
  656. }
  657. replLogIf(ctx, fmt.Errorf("unable to replicate delete marker to %s: %s/%s(%s): %w", tgt.EndpointURL(), tgt.Bucket, dobj.ObjectName, versionID, rmErr))
  658. if rmErr != nil && minio.IsNetworkOrHostDown(rmErr, true) && !globalBucketTargetSys.isOffline(tgt.EndpointURL()) {
  659. globalBucketTargetSys.markOffline(tgt.EndpointURL())
  660. }
  661. } else {
  662. if dobj.VersionID == "" {
  663. rinfo.ReplicationStatus = replication.Completed
  664. } else {
  665. rinfo.VersionPurgeStatus = Complete
  666. }
  667. }
  668. return
  669. }
  670. func getCopyObjMetadata(oi ObjectInfo, sc string) map[string]string {
  671. meta := make(map[string]string, len(oi.UserDefined))
  672. for k, v := range oi.UserDefined {
  673. if stringsHasPrefixFold(k, ReservedMetadataPrefixLower) {
  674. continue
  675. }
  676. if equals(k, xhttp.AmzBucketReplicationStatus) {
  677. continue
  678. }
  679. // https://github.com/google/security-research/security/advisories/GHSA-76wf-9vgp-pj7w
  680. if equals(k, xhttp.AmzMetaUnencryptedContentLength, xhttp.AmzMetaUnencryptedContentMD5) {
  681. continue
  682. }
  683. meta[k] = v
  684. }
  685. if oi.ContentEncoding != "" {
  686. meta[xhttp.ContentEncoding] = oi.ContentEncoding
  687. }
  688. if oi.ContentType != "" {
  689. meta[xhttp.ContentType] = oi.ContentType
  690. }
  691. meta[xhttp.AmzObjectTagging] = oi.UserTags
  692. meta[xhttp.AmzTagDirective] = "REPLACE"
  693. if sc == "" {
  694. sc = oi.StorageClass
  695. }
  696. // drop non standard storage classes for tiering from replication
  697. if sc != "" && (sc == storageclass.RRS || sc == storageclass.STANDARD) {
  698. meta[xhttp.AmzStorageClass] = sc
  699. }
  700. meta[xhttp.MinIOSourceETag] = oi.ETag
  701. meta[xhttp.MinIOSourceMTime] = oi.ModTime.UTC().Format(time.RFC3339Nano)
  702. meta[xhttp.AmzBucketReplicationStatus] = replication.Replica.String()
  703. return meta
  704. }
  705. type caseInsensitiveMap map[string]string
  706. // Lookup map entry case insensitively.
  707. func (m caseInsensitiveMap) Lookup(key string) (string, bool) {
  708. if len(m) == 0 {
  709. return "", false
  710. }
  711. for _, k := range []string{
  712. key,
  713. strings.ToLower(key),
  714. http.CanonicalHeaderKey(key),
  715. } {
  716. v, ok := m[k]
  717. if ok {
  718. return v, ok
  719. }
  720. }
  721. return "", false
  722. }
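// Illustrative sketch, not part of the original file (the function name is
// hypothetical): Lookup tries the key as given, lowercased, and in canonical
// header form, so metadata stored under any of those spellings is found.
func exampleCaseInsensitiveLookup(objInfo ObjectInfo) string {
	lkMap := caseInsensitiveMap(objInfo.UserDefined)
	if cc, ok := lkMap.Lookup(xhttp.CacheControl); ok {
		return cc
	}
	return ""
}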
  723. func putReplicationOpts(ctx context.Context, sc string, objInfo ObjectInfo) (putOpts minio.PutObjectOptions, isMP bool, err error) {
  724. meta := make(map[string]string)
  725. isSSEC := crypto.SSEC.IsEncrypted(objInfo.UserDefined)
  726. for k, v := range objInfo.UserDefined {
  727. _, isValidSSEHeader := validSSEReplicationHeaders[k]
  728. // In case of SSE-C objects copy the allowed internal headers as well
  729. if !isSSEC || !isValidSSEHeader {
  730. if stringsHasPrefixFold(k, ReservedMetadataPrefixLower) {
  731. continue
  732. }
  733. if isStandardHeader(k) {
  734. continue
  735. }
  736. }
  737. if isValidSSEHeader {
  738. meta[validSSEReplicationHeaders[k]] = v
  739. } else {
  740. meta[k] = v
  741. }
  742. }
  743. isMP = objInfo.isMultipart()
  744. if len(objInfo.Checksum) > 0 {
  745. // Add encrypted CRC to metadata for SSE-C objects.
  746. if isSSEC {
  747. meta[ReplicationSsecChecksumHeader] = base64.StdEncoding.EncodeToString(objInfo.Checksum)
  748. } else {
  749. cs, mp := getCRCMeta(objInfo, 0, nil)
  750. // Set object checksum.
  751. for k, v := range cs {
  752. meta[k] = v
  753. }
  754. isMP = mp
  755. if !objInfo.isMultipart() && cs[xhttp.AmzChecksumType] == xhttp.AmzChecksumTypeFullObject {
  756. // For objects where checksum is full object, it will be the same.
  757. // Therefore, we use the cheaper PutObject replication.
  758. isMP = false
  759. }
  760. }
  761. }
  762. if sc == "" && (objInfo.StorageClass == storageclass.STANDARD || objInfo.StorageClass == storageclass.RRS) {
  763. sc = objInfo.StorageClass
  764. }
  765. putOpts = minio.PutObjectOptions{
  766. UserMetadata: meta,
  767. ContentType: objInfo.ContentType,
  768. ContentEncoding: objInfo.ContentEncoding,
  769. Expires: objInfo.Expires,
  770. StorageClass: sc,
  771. Internal: minio.AdvancedPutOptions{
  772. SourceVersionID: objInfo.VersionID,
  773. ReplicationStatus: minio.ReplicationStatusReplica,
  774. SourceMTime: objInfo.ModTime,
  775. SourceETag: objInfo.ETag,
  776. ReplicationRequest: true, // always set this to distinguish between `mc mirror` replication and serverside
  777. },
  778. }
  779. if objInfo.UserTags != "" {
  780. tag, _ := tags.ParseObjectTags(objInfo.UserTags)
  781. if tag != nil {
  782. putOpts.UserTags = tag.ToMap()
  783. // set tag timestamp in opts
  784. tagTimestamp := objInfo.ModTime
  785. if tagTmstampStr, ok := objInfo.UserDefined[ReservedMetadataPrefixLower+TaggingTimestamp]; ok {
  786. tagTimestamp, err = time.Parse(time.RFC3339Nano, tagTmstampStr)
  787. if err != nil {
  788. return putOpts, false, err
  789. }
  790. }
  791. putOpts.Internal.TaggingTimestamp = tagTimestamp
  792. }
  793. }
  794. lkMap := caseInsensitiveMap(objInfo.UserDefined)
  795. if lang, ok := lkMap.Lookup(xhttp.ContentLanguage); ok {
  796. putOpts.ContentLanguage = lang
  797. }
  798. if disp, ok := lkMap.Lookup(xhttp.ContentDisposition); ok {
  799. putOpts.ContentDisposition = disp
  800. }
  801. if cc, ok := lkMap.Lookup(xhttp.CacheControl); ok {
  802. putOpts.CacheControl = cc
  803. }
  804. if mode, ok := lkMap.Lookup(xhttp.AmzObjectLockMode); ok {
  805. rmode := minio.RetentionMode(mode)
  806. putOpts.Mode = rmode
  807. }
  808. if retainDateStr, ok := lkMap.Lookup(xhttp.AmzObjectLockRetainUntilDate); ok {
  809. rdate, err := amztime.ISO8601Parse(retainDateStr)
  810. if err != nil {
  811. return putOpts, false, err
  812. }
  813. putOpts.RetainUntilDate = rdate
  814. // set retention timestamp in opts
  815. retTimestamp := objInfo.ModTime
  816. if retainTmstampStr, ok := objInfo.UserDefined[ReservedMetadataPrefixLower+ObjectLockRetentionTimestamp]; ok {
  817. retTimestamp, err = time.Parse(time.RFC3339Nano, retainTmstampStr)
  818. if err != nil {
  819. return putOpts, false, err
  820. }
  821. }
  822. putOpts.Internal.RetentionTimestamp = retTimestamp
  823. }
  824. if lhold, ok := lkMap.Lookup(xhttp.AmzObjectLockLegalHold); ok {
  825. putOpts.LegalHold = minio.LegalHoldStatus(lhold)
  826. // set legalhold timestamp in opts
  827. lholdTimestamp := objInfo.ModTime
  828. if lholdTmstampStr, ok := objInfo.UserDefined[ReservedMetadataPrefixLower+ObjectLockLegalHoldTimestamp]; ok {
  829. lholdTimestamp, err = time.Parse(time.RFC3339Nano, lholdTmstampStr)
  830. if err != nil {
  831. return putOpts, false, err
  832. }
  833. }
  834. putOpts.Internal.LegalholdTimestamp = lholdTimestamp
  835. }
  836. if crypto.S3.IsEncrypted(objInfo.UserDefined) {
  837. putOpts.ServerSideEncryption = encrypt.NewSSE()
  838. }
  839. if crypto.S3KMS.IsEncrypted(objInfo.UserDefined) {
  840. // If KMS key ID replication is enabled (as it is by default)
  841. // we include the object's KMS key ID. In any case, we
  842. // always set the SSE-KMS header. If no KMS key ID is
  843. // specified, MinIO is supposed to use whatever default
  844. // config applies on the site or bucket.
  845. var keyID string
  846. if kms.ReplicateKeyID() {
  847. keyID = objInfo.KMSKeyID()
  848. }
  849. sseEnc, err := encrypt.NewSSEKMS(keyID, nil)
  850. if err != nil {
  851. return putOpts, false, err
  852. }
  853. putOpts.ServerSideEncryption = sseEnc
  854. }
  855. return
  856. }
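// Illustrative sketch, not part of the original file (the function name is
// hypothetical): the replication worker derives the remote PUT options from
// the source object's metadata; the second return value reports whether the
// upload must take the multipart path (replicateObjectWithMultipart) instead
// of a plain PutObject.
func examplePutReplicationOpts(ctx context.Context, tgt *TargetClient, objInfo ObjectInfo) (minio.PutObjectOptions, bool, error) {
	return putReplicationOpts(ctx, tgt.StorageClass, objInfo)
}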
  857. type replicationAction string
  858. const (
  859. replicateMetadata replicationAction = "metadata"
  860. replicateNone replicationAction = "none"
  861. replicateAll replicationAction = "all"
  862. )
  863. // matches k1 with all keys, returns 'true' if one of them matches
  864. func equals(k1 string, keys ...string) bool {
  865. for _, k2 := range keys {
  866. if strings.EqualFold(k1, k2) {
  867. return true
  868. }
  869. }
  870. return false
  871. }
  872. // returns replicationAction by comparing metadata between source and target
  873. func getReplicationAction(oi1 ObjectInfo, oi2 minio.ObjectInfo, opType replication.Type) replicationAction {
  874. // Avoid resyncing null versions created prior to enabling replication if target has a newer copy
  875. if opType == replication.ExistingObjectReplicationType &&
  876. oi1.ModTime.Unix() > oi2.LastModified.Unix() && oi1.VersionID == nullVersionID {
  877. return replicateNone
  878. }
  879. sz, _ := oi1.GetActualSize()
  880. // needs full replication
  881. if oi1.ETag != oi2.ETag ||
  882. oi1.VersionID != oi2.VersionID ||
  883. sz != oi2.Size ||
  884. oi1.DeleteMarker != oi2.IsDeleteMarker ||
  885. oi1.ModTime.Unix() != oi2.LastModified.Unix() {
  886. return replicateAll
  887. }
  888. if oi1.ContentType != oi2.ContentType {
  889. return replicateMetadata
  890. }
  891. if oi1.ContentEncoding != "" {
  892. enc, ok := oi2.Metadata[xhttp.ContentEncoding]
  893. if !ok {
  894. enc, ok = oi2.Metadata[strings.ToLower(xhttp.ContentEncoding)]
  895. if !ok {
  896. return replicateMetadata
  897. }
  898. }
  899. if strings.Join(enc, ",") != oi1.ContentEncoding {
  900. return replicateMetadata
  901. }
  902. }
  903. t, _ := tags.ParseObjectTags(oi1.UserTags)
  904. oi2Map := make(map[string]string)
  905. for k, v := range oi2.UserTags {
  906. oi2Map[k] = v
  907. }
  908. if (oi2.UserTagCount > 0 && !reflect.DeepEqual(oi2Map, t.ToMap())) || (oi2.UserTagCount != len(t.ToMap())) {
  909. return replicateMetadata
  910. }
  911. // Compare only necessary headers
  912. compareKeys := []string{
  913. "Expires",
  914. "Cache-Control",
  915. "Content-Language",
  916. "Content-Disposition",
  917. "X-Amz-Object-Lock-Mode",
  918. "X-Amz-Object-Lock-Retain-Until-Date",
  919. "X-Amz-Object-Lock-Legal-Hold",
  920. "X-Amz-Website-Redirect-Location",
  921. "X-Amz-Meta-",
  922. }
  923. // compare metadata on both maps to see if meta is identical
  924. compareMeta1 := make(map[string]string)
  925. for k, v := range oi1.UserDefined {
  926. var found bool
  927. for _, prefix := range compareKeys {
  928. if !stringsHasPrefixFold(k, prefix) {
  929. continue
  930. }
  931. found = true
  932. break
  933. }
  934. if found {
  935. compareMeta1[strings.ToLower(k)] = v
  936. }
  937. }
  938. compareMeta2 := make(map[string]string)
  939. for k, v := range oi2.Metadata {
  940. var found bool
  941. for _, prefix := range compareKeys {
  942. if !stringsHasPrefixFold(k, prefix) {
  943. continue
  944. }
  945. found = true
  946. break
  947. }
  948. if found {
  949. compareMeta2[strings.ToLower(k)] = strings.Join(v, ",")
  950. }
  951. }
  952. if !reflect.DeepEqual(compareMeta1, compareMeta2) {
  953. return replicateMetadata
  954. }
  955. return replicateNone
  956. }
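// Illustrative sketch, not part of the original file (the function name is
// hypothetical): the worker stats the object on the target and uses the
// returned action to choose between a full copy, a metadata-only copy, or no
// work at all, mirroring the flow in replicateAll below.
func exampleReplicationAction(ctx context.Context, tgt *TargetClient, objInfo ObjectInfo, opType replication.Type) replicationAction {
	oi, err := tgt.StatObject(ctx, tgt.Bucket, objInfo.Name, minio.StatObjectOptions{VersionID: objInfo.VersionID})
	if err != nil {
		// no usable copy on the target (or it is unreachable): default to a full copy
		return replicateAll
	}
	return getReplicationAction(objInfo, oi, opType)
}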
  957. // replicateObject replicates the specified version of the object to the destination bucket.
  958. // The source object is then updated to reflect the replication status.
  959. func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI ObjectLayer) {
  960. var replicationStatus replication.StatusType
  961. defer func() {
  962. if replicationStatus.Empty() {
  963. // replication status is empty means
  964. // replication was not attempted for some
  965. // reason, notify the state of the object
  966. // on disk.
  967. replicationStatus = ri.ReplicationStatus
  968. }
  969. auditLogInternal(ctx, AuditLogOptions{
  970. Event: ri.EventType,
  971. APIName: ReplicateObjectAPI,
  972. Bucket: ri.Bucket,
  973. Object: ri.Name,
  974. VersionID: ri.VersionID,
  975. Status: replicationStatus.String(),
  976. })
  977. }()
  978. bucket := ri.Bucket
  979. object := ri.Name
  980. cfg, err := getReplicationConfig(ctx, bucket)
  981. if err != nil || cfg == nil {
  982. replLogOnceIf(ctx, err, "get-replication-config-"+bucket)
  983. sendEvent(eventArgs{
  984. EventName: event.ObjectReplicationNotTracked,
  985. BucketName: bucket,
  986. Object: ri.ToObjectInfo(),
  987. UserAgent: "Internal: [Replication]",
  988. Host: globalLocalNodeName,
  989. })
  990. return
  991. }
  992. tgtArns := cfg.FilterTargetArns(replication.ObjectOpts{
  993. Name: object,
  994. SSEC: ri.SSEC,
  995. UserTags: ri.UserTags,
  996. })
  997. // Lock the object name before starting replication.
  998. // Use separate lock that doesn't collide with regular objects.
  999. lk := objectAPI.NewNSLock(bucket, "/[replicate]/"+object)
  1000. lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
  1001. if err != nil {
  1002. sendEvent(eventArgs{
  1003. EventName: event.ObjectReplicationNotTracked,
  1004. BucketName: bucket,
  1005. Object: ri.ToObjectInfo(),
  1006. UserAgent: "Internal: [Replication]",
  1007. Host: globalLocalNodeName,
  1008. })
  1009. globalReplicationPool.Get().queueMRFSave(ri.ToMRFEntry())
  1010. return
  1011. }
  1012. ctx = lkctx.Context()
  1013. defer lk.Unlock(lkctx)
  1014. rinfos := replicatedInfos{Targets: make([]replicatedTargetInfo, 0, len(tgtArns))}
  1015. var wg sync.WaitGroup
  1016. var mu sync.Mutex
  1017. for _, tgtArn := range tgtArns {
  1018. tgt := globalBucketTargetSys.GetRemoteTargetClient(bucket, tgtArn)
  1019. if tgt == nil {
  1020. replLogOnceIf(ctx, fmt.Errorf("failed to get target for bucket:%s arn:%s", bucket, tgtArn), tgtArn)
  1021. sendEvent(eventArgs{
  1022. EventName: event.ObjectReplicationNotTracked,
  1023. BucketName: bucket,
  1024. Object: ri.ToObjectInfo(),
  1025. UserAgent: "Internal: [Replication]",
  1026. Host: globalLocalNodeName,
  1027. })
  1028. continue
  1029. }
  1030. wg.Add(1)
  1031. go func(tgt *TargetClient) {
  1032. defer wg.Done()
  1033. var tgtInfo replicatedTargetInfo
  1034. if ri.OpType == replication.ObjectReplicationType {
  1035. // all incoming calls go through optimized path.
  1036. tgtInfo = ri.replicateObject(ctx, objectAPI, tgt)
  1037. } else {
  1038. tgtInfo = ri.replicateAll(ctx, objectAPI, tgt)
  1039. }
  1040. mu.Lock()
  1041. rinfos.Targets = append(rinfos.Targets, tgtInfo)
  1042. mu.Unlock()
  1043. }(tgt)
  1044. }
  1045. wg.Wait()
  1046. replicationStatus = rinfos.ReplicationStatus() // used in defer function
  1047. // FIXME: add support for missing replication events
  1048. // - event.ObjectReplicationMissedThreshold
  1049. // - event.ObjectReplicationReplicatedAfterThreshold
  1050. eventName := event.ObjectReplicationComplete
  1051. if replicationStatus == replication.Failed {
  1052. eventName = event.ObjectReplicationFailed
  1053. }
  1054. newReplStatusInternal := rinfos.ReplicationStatusInternal()
  1055. // Note that internal replication status(es) may match for previously replicated objects - in such cases
  1056. // metadata should be updated with last resync timestamp.
  1057. objInfo := ri.ToObjectInfo()
  1058. if ri.ReplicationStatusInternal != newReplStatusInternal || rinfos.ReplicationResynced() {
  1059. popts := ObjectOptions{
  1060. MTime: ri.ModTime,
  1061. VersionID: ri.VersionID,
  1062. EvalMetadataFn: func(oi *ObjectInfo, gerr error) (dsc ReplicateDecision, err error) {
  1063. oi.UserDefined[ReservedMetadataPrefixLower+ReplicationStatus] = newReplStatusInternal
  1064. oi.UserDefined[ReservedMetadataPrefixLower+ReplicationTimestamp] = UTCNow().Format(time.RFC3339Nano)
  1065. oi.UserDefined[xhttp.AmzBucketReplicationStatus] = string(rinfos.ReplicationStatus())
  1066. for _, rinfo := range rinfos.Targets {
  1067. if rinfo.ResyncTimestamp != "" {
  1068. oi.UserDefined[targetResetHeader(rinfo.Arn)] = rinfo.ResyncTimestamp
  1069. }
  1070. }
  1071. if ri.UserTags != "" {
  1072. oi.UserDefined[xhttp.AmzObjectTagging] = ri.UserTags
  1073. }
  1074. return dsc, nil
  1075. },
  1076. }
  1077. uobjInfo, _ := objectAPI.PutObjectMetadata(ctx, bucket, object, popts)
  1078. if uobjInfo.Name != "" {
  1079. objInfo = uobjInfo
  1080. }
  1081. opType := replication.MetadataReplicationType
  1082. if rinfos.Action() == replicateAll {
  1083. opType = replication.ObjectReplicationType
  1084. }
  1085. for _, rinfo := range rinfos.Targets {
  1086. if rinfo.ReplicationStatus != rinfo.PrevReplicationStatus {
  1087. rinfo.OpType = opType // update optype to reflect correct operation.
  1088. globalReplicationStats.Load().Update(bucket, rinfo, rinfo.ReplicationStatus, rinfo.PrevReplicationStatus)
  1089. }
  1090. }
  1091. }
  1092. sendEvent(eventArgs{
  1093. EventName: eventName,
  1094. BucketName: bucket,
  1095. Object: objInfo,
  1096. UserAgent: "Internal: [Replication]",
  1097. Host: globalLocalNodeName,
  1098. })
  1099. // re-queue failures once more - keep a retry count to avoid flooding the queue if
  1100. // the target site is down. Leave it to scanner to catch up instead.
  1101. if rinfos.ReplicationStatus() != replication.Completed {
  1102. ri.OpType = replication.HealReplicationType
  1103. ri.EventType = ReplicateMRF
  1104. ri.ReplicationStatusInternal = rinfos.ReplicationStatusInternal()
  1105. ri.RetryCount++
  1106. globalReplicationPool.Get().queueMRFSave(ri.ToMRFEntry())
  1107. }
  1108. }
  1109. // replicateObject replicates object data for the specified version of the object to the destination bucket.
  1110. // The source object is then updated to reflect the replication status.
  1111. func (ri ReplicateObjectInfo) replicateObject(ctx context.Context, objectAPI ObjectLayer, tgt *TargetClient) (rinfo replicatedTargetInfo) {
  1112. startTime := time.Now()
  1113. bucket := ri.Bucket
  1114. object := ri.Name
  1115. rAction := replicateAll
  1116. rinfo = replicatedTargetInfo{
  1117. Size: ri.ActualSize,
  1118. Arn: tgt.ARN,
  1119. PrevReplicationStatus: ri.TargetReplicationStatus(tgt.ARN),
  1120. ReplicationStatus: replication.Failed,
  1121. OpType: ri.OpType,
  1122. ReplicationAction: rAction,
  1123. endpoint: tgt.EndpointURL().Host,
  1124. secure: tgt.EndpointURL().Scheme == "https",
  1125. }
  1126. if ri.TargetReplicationStatus(tgt.ARN) == replication.Completed && !ri.ExistingObjResync.Empty() && !ri.ExistingObjResync.mustResyncTarget(tgt.ARN) {
  1127. rinfo.ReplicationStatus = replication.Completed
  1128. rinfo.ReplicationResynced = true
  1129. return
  1130. }
  1131. if globalBucketTargetSys.isOffline(tgt.EndpointURL()) {
  1132. replLogOnceIf(ctx, fmt.Errorf("remote target is offline for bucket:%s arn:%s retry:%d", bucket, tgt.ARN, ri.RetryCount), "replication-target-offline"+tgt.ARN)
  1133. sendEvent(eventArgs{
  1134. EventName: event.ObjectReplicationNotTracked,
  1135. BucketName: bucket,
  1136. Object: ri.ToObjectInfo(),
  1137. UserAgent: "Internal: [Replication]",
  1138. Host: globalLocalNodeName,
  1139. })
  1140. return
  1141. }
  1142. versioned := globalBucketVersioningSys.PrefixEnabled(bucket, object)
  1143. versionSuspended := globalBucketVersioningSys.PrefixSuspended(bucket, object)
  1144. gr, err := objectAPI.GetObjectNInfo(ctx, bucket, object, nil, http.Header{}, ObjectOptions{
  1145. VersionID: ri.VersionID,
  1146. Versioned: versioned,
  1147. VersionSuspended: versionSuspended,
  1148. ReplicationRequest: true,
  1149. })
  1150. if err != nil {
  1151. if !isErrVersionNotFound(err) && !isErrObjectNotFound(err) {
  1152. objInfo := ri.ToObjectInfo()
  1153. sendEvent(eventArgs{
  1154. EventName: event.ObjectReplicationNotTracked,
  1155. BucketName: bucket,
  1156. Object: objInfo,
  1157. UserAgent: "Internal: [Replication]",
  1158. Host: globalLocalNodeName,
  1159. })
  1160. replLogOnceIf(ctx, fmt.Errorf("unable to read source object %s/%s(%s): %w", bucket, object, objInfo.VersionID, err), object+":"+objInfo.VersionID)
  1161. }
  1162. return
  1163. }
  1164. defer gr.Close()
  1165. objInfo := gr.ObjInfo
  1166. // make sure we have the latest metadata for metrics calculation
  1167. rinfo.PrevReplicationStatus = objInfo.TargetReplicationStatus(tgt.ARN)
  1168. // Set the encrypted size for SSE-C objects
  1169. var size int64
  1170. if crypto.SSEC.IsEncrypted(objInfo.UserDefined) {
  1171. size = objInfo.Size
  1172. } else {
  1173. size, err = objInfo.GetActualSize()
  1174. if err != nil {
  1175. replLogIf(ctx, err)
  1176. sendEvent(eventArgs{
  1177. EventName: event.ObjectReplicationNotTracked,
  1178. BucketName: bucket,
  1179. Object: objInfo,
  1180. UserAgent: "Internal: [Replication]",
  1181. Host: globalLocalNodeName,
  1182. })
  1183. return
  1184. }
  1185. }
  1186. if tgt.Bucket == "" {
  1187. replLogIf(ctx, fmt.Errorf("unable to replicate object %s(%s), bucket is empty for target %s", objInfo.Name, objInfo.VersionID, tgt.EndpointURL()))
  1188. sendEvent(eventArgs{
  1189. EventName: event.ObjectReplicationNotTracked,
  1190. BucketName: bucket,
  1191. Object: objInfo,
  1192. UserAgent: "Internal: [Replication]",
  1193. Host: globalLocalNodeName,
  1194. })
  1195. return rinfo
  1196. }
  1197. defer func() {
  1198. if rinfo.ReplicationStatus == replication.Completed && ri.OpType == replication.ExistingObjectReplicationType && tgt.ResetID != "" {
  1199. rinfo.ResyncTimestamp = fmt.Sprintf("%s;%s", UTCNow().Format(http.TimeFormat), tgt.ResetID)
  1200. rinfo.ReplicationResynced = true
  1201. }
  1202. rinfo.Duration = time.Since(startTime)
  1203. }()
  1204. rinfo.ReplicationStatus = replication.Completed
  1205. rinfo.Size = size
  1206. rinfo.ReplicationAction = rAction
  1207. // use core client to avoid doing multipart on PUT
  1208. c := &minio.Core{Client: tgt.Client}
  1209. putOpts, isMP, err := putReplicationOpts(ctx, tgt.StorageClass, objInfo)
  1210. if err != nil {
  1211. replLogIf(ctx, fmt.Errorf("failure setting options for replication bucket:%s err:%w", bucket, err))
  1212. sendEvent(eventArgs{
  1213. EventName: event.ObjectReplicationNotTracked,
  1214. BucketName: bucket,
  1215. Object: objInfo,
  1216. UserAgent: "Internal: [Replication]",
  1217. Host: globalLocalNodeName,
  1218. })
  1219. return
  1220. }
  1221. var headerSize int
  1222. for k, v := range putOpts.Header() {
  1223. headerSize += len(k) + len(v)
  1224. }
  1225. opts := &bandwidth.MonitorReaderOptions{
  1226. BucketOptions: bandwidth.BucketOptions{
  1227. Name: ri.Bucket,
  1228. ReplicationARN: tgt.ARN,
  1229. },
  1230. HeaderSize: headerSize,
  1231. }
  1232. newCtx := ctx
  1233. if globalBucketMonitor.IsThrottled(bucket, tgt.ARN) && objInfo.Size < minLargeObjSize {
  1234. var cancel context.CancelFunc
  1235. newCtx, cancel = context.WithTimeout(ctx, throttleDeadline)
  1236. defer cancel()
  1237. }
  1238. r := bandwidth.NewMonitoredReader(newCtx, globalBucketMonitor, gr, opts)
  1239. if isMP {
  1240. rinfo.Err = replicateObjectWithMultipart(ctx, c, tgt.Bucket, object, r, objInfo, putOpts)
  1241. } else {
  1242. _, rinfo.Err = c.PutObject(ctx, tgt.Bucket, object, r, size, "", "", putOpts)
  1243. }
  1244. if rinfo.Err != nil {
  1245. if minio.ToErrorResponse(rinfo.Err).Code != "PreconditionFailed" {
  1246. rinfo.ReplicationStatus = replication.Failed
  1247. replLogIf(ctx, fmt.Errorf("unable to replicate for object %s/%s(%s): to (target: %s): %w",
  1248. bucket, objInfo.Name, objInfo.VersionID, tgt.EndpointURL(), rinfo.Err))
  1249. }
  1250. if minio.IsNetworkOrHostDown(rinfo.Err, true) && !globalBucketTargetSys.isOffline(tgt.EndpointURL()) {
  1251. globalBucketTargetSys.markOffline(tgt.EndpointURL())
  1252. }
  1253. }
  1254. return
  1255. }
  1256. // replicateAll replicates metadata for the specified version of the object to the destination bucket;
  1257. // if the destination version is missing it automatically does a full copy as well.
  1258. // The source object is then updated to reflect the replication status.
  1259. func (ri ReplicateObjectInfo) replicateAll(ctx context.Context, objectAPI ObjectLayer, tgt *TargetClient) (rinfo replicatedTargetInfo) {
  1260. startTime := time.Now()
  1261. bucket := ri.Bucket
  1262. object := ri.Name
  1263. // set defaults for replication action based on operation being performed - actual
  1264. // replication action can only be determined after stat on remote. This default is
  1265. // needed for updating replication metrics correctly when target is offline.
  1266. rAction := replicateMetadata
  1267. rinfo = replicatedTargetInfo{
  1268. Size: ri.ActualSize,
  1269. Arn: tgt.ARN,
  1270. PrevReplicationStatus: ri.TargetReplicationStatus(tgt.ARN),
  1271. ReplicationStatus: replication.Failed,
  1272. OpType: ri.OpType,
  1273. ReplicationAction: rAction,
  1274. endpoint: tgt.EndpointURL().Host,
  1275. secure: tgt.EndpointURL().Scheme == "https",
  1276. }
  1277. if globalBucketTargetSys.isOffline(tgt.EndpointURL()) {
  1278. replLogOnceIf(ctx, fmt.Errorf("remote target is offline for bucket:%s arn:%s retry:%d", bucket, tgt.ARN, ri.RetryCount), "replication-target-offline-heal"+tgt.ARN)
  1279. sendEvent(eventArgs{
  1280. EventName: event.ObjectReplicationNotTracked,
  1281. BucketName: bucket,
  1282. Object: ri.ToObjectInfo(),
  1283. UserAgent: "Internal: [Replication]",
  1284. Host: globalLocalNodeName,
  1285. })
  1286. return
  1287. }
  1288. versioned := globalBucketVersioningSys.PrefixEnabled(bucket, object)
  1289. versionSuspended := globalBucketVersioningSys.PrefixSuspended(bucket, object)
  1290. gr, err := objectAPI.GetObjectNInfo(ctx, bucket, object, nil, http.Header{},
  1291. ObjectOptions{
  1292. VersionID: ri.VersionID,
  1293. Versioned: versioned,
  1294. VersionSuspended: versionSuspended,
  1295. ReplicationRequest: true,
  1296. })
  1297. if err != nil {
  1298. if !isErrVersionNotFound(err) && !isErrObjectNotFound(err) {
  1299. objInfo := ri.ToObjectInfo()
  1300. sendEvent(eventArgs{
  1301. EventName: event.ObjectReplicationNotTracked,
  1302. BucketName: bucket,
  1303. Object: objInfo,
  1304. UserAgent: "Internal: [Replication]",
  1305. Host: globalLocalNodeName,
  1306. })
  1307. replLogIf(ctx, fmt.Errorf("unable to replicate to target %s for %s/%s(%s): %w", tgt.EndpointURL(), bucket, object, objInfo.VersionID, err))
  1308. }
  1309. return
  1310. }
  1311. defer gr.Close()
  1312. objInfo := gr.ObjInfo
  1313. // make sure we have the latest metadata for metrics calculation
  1314. rinfo.PrevReplicationStatus = objInfo.TargetReplicationStatus(tgt.ARN)
  1315. // use latest ObjectInfo to check if previous replication attempt succeeded
  1316. if objInfo.TargetReplicationStatus(tgt.ARN) == replication.Completed && !ri.ExistingObjResync.Empty() && !ri.ExistingObjResync.mustResyncTarget(tgt.ARN) {
  1317. rinfo.ReplicationStatus = replication.Completed
  1318. rinfo.ReplicationResynced = true
  1319. return
  1320. }
  1321. size, err := objInfo.GetActualSize()
  1322. if err != nil {
  1323. replLogIf(ctx, err)
  1324. sendEvent(eventArgs{
  1325. EventName: event.ObjectReplicationNotTracked,
  1326. BucketName: bucket,
  1327. Object: objInfo,
  1328. UserAgent: "Internal: [Replication]",
  1329. Host: globalLocalNodeName,
  1330. })
  1331. return
  1332. }
  1333. // Set the encrypted size for SSE-C objects
  1334. isSSEC := crypto.SSEC.IsEncrypted(objInfo.UserDefined)
  1335. if isSSEC {
  1336. size = objInfo.Size
  1337. }
  1338. if tgt.Bucket == "" {
  1339. replLogIf(ctx, fmt.Errorf("unable to replicate object %s(%s) to %s, target bucket is missing", objInfo.Name, objInfo.VersionID, tgt.EndpointURL()))
  1340. sendEvent(eventArgs{
  1341. EventName: event.ObjectReplicationNotTracked,
  1342. BucketName: bucket,
  1343. Object: objInfo,
  1344. UserAgent: "Internal: [Replication]",
  1345. Host: globalLocalNodeName,
  1346. })
  1347. return rinfo
  1348. }
  1349. defer func() {
  1350. if rinfo.ReplicationStatus == replication.Completed && ri.OpType == replication.ExistingObjectReplicationType && tgt.ResetID != "" {
  1351. rinfo.ResyncTimestamp = fmt.Sprintf("%s;%s", UTCNow().Format(http.TimeFormat), tgt.ResetID)
  1352. rinfo.ReplicationResynced = true
  1353. }
  1354. rinfo.Duration = time.Since(startTime)
  1355. }()
  1356. sOpts := minio.StatObjectOptions{
  1357. VersionID: objInfo.VersionID,
  1358. Internal: minio.AdvancedGetOptions{
  1359. ReplicationProxyRequest: "false",
  1360. },
  1361. }
  1362. sOpts.Set(xhttp.AmzTagDirective, "ACCESS")
  1363. oi, cerr := tgt.StatObject(ctx, tgt.Bucket, object, sOpts)
  1364. if cerr == nil {
  1365. rAction = getReplicationAction(objInfo, oi, ri.OpType)
  1366. rinfo.ReplicationStatus = replication.Completed
  1367. if rAction == replicateNone {
  1368. if ri.OpType == replication.ExistingObjectReplicationType &&
  1369. objInfo.ModTime.Unix() > oi.LastModified.Unix() && objInfo.VersionID == nullVersionID {
  1370. replLogIf(ctx, fmt.Errorf("unable to replicate %s/%s (null). Newer version exists on target %s", bucket, object, tgt.EndpointURL()))
  1371. sendEvent(eventArgs{
  1372. EventName: event.ObjectReplicationNotTracked,
  1373. BucketName: bucket,
  1374. Object: objInfo,
  1375. UserAgent: "Internal: [Replication]",
  1376. Host: globalLocalNodeName,
  1377. })
  1378. }
  1379. // object with same VersionID already exists, replication kicked off by
  1380. // PutObject might have completed
  1381. if objInfo.TargetReplicationStatus(tgt.ARN) == replication.Pending ||
  1382. objInfo.TargetReplicationStatus(tgt.ARN) == replication.Failed ||
  1383. ri.OpType == replication.ExistingObjectReplicationType {
  1384. // if metadata is not updated for some reason after replication, such as
  1385. // 503 encountered while updating metadata - make sure to set ReplicationStatus
  1386. // as Completed.
  1387. //
  1388. // Note: Replication Stats would have been updated despite metadata update failure.
  1389. rinfo.ReplicationAction = rAction
  1390. rinfo.ReplicationStatus = replication.Completed
  1391. }
  1392. return
  1393. }
  1394. } else {
  1395. // SSEC objects will refuse HeadObject without the decryption key.
  1396. // Ignore the error, since we know the object exists and versioning prevents overwriting existing versions.
  1397. if isSSEC && strings.Contains(cerr.Error(), errorCodes[ErrSSEEncryptedObject].Description) {
  1398. rinfo.ReplicationStatus = replication.Completed
  1399. rinfo.ReplicationAction = replicateNone
  1400. goto applyAction
  1401. }
  1402. // if target returns error other than NoSuchKey, defer replication attempt
  1403. if minio.IsNetworkOrHostDown(cerr, true) && !globalBucketTargetSys.isOffline(tgt.EndpointURL()) {
  1404. globalBucketTargetSys.markOffline(tgt.EndpointURL())
  1405. }
  1406. serr := ErrorRespToObjectError(cerr, bucket, object, objInfo.VersionID)
  1407. switch {
  1408. case isErrMethodNotAllowed(serr):
  1409. rAction = replicateAll
  1410. case isErrObjectNotFound(serr), isErrVersionNotFound(serr):
  1411. rAction = replicateAll
  1412. case isErrReadQuorum(serr), isErrWriteQuorum(serr):
  1413. rAction = replicateAll
  1414. default:
  1415. rinfo.Err = cerr
  1416. replLogIf(ctx, fmt.Errorf("unable to replicate %s/%s (%s). Target (%s) returned %s error on HEAD",
  1417. bucket, object, objInfo.VersionID, tgt.EndpointURL(), cerr))
  1418. sendEvent(eventArgs{
  1419. EventName: event.ObjectReplicationNotTracked,
  1420. BucketName: bucket,
  1421. Object: objInfo,
  1422. UserAgent: "Internal: [Replication]",
  1423. Host: globalLocalNodeName,
  1424. })
  1425. return
  1426. }
  1427. }
  1428. applyAction:
  1429. rinfo.ReplicationStatus = replication.Completed
  1430. rinfo.Size = size
  1431. rinfo.ReplicationAction = rAction
  1432. // use core client to avoid doing multipart on PUT
  1433. c := &minio.Core{Client: tgt.Client}
  1434. if rAction != replicateAll {
  1435. // replicate metadata for object tagging/copy with metadata replacement
  1436. srcOpts := minio.CopySrcOptions{
  1437. Bucket: tgt.Bucket,
  1438. Object: object,
  1439. VersionID: objInfo.VersionID,
  1440. }
  1441. dstOpts := minio.PutObjectOptions{
  1442. Internal: minio.AdvancedPutOptions{
  1443. SourceVersionID: objInfo.VersionID,
1444. ReplicationRequest: true, // always set this to distinguish between `mc mirror` and server-side replication
  1445. },
  1446. }
  1447. // default timestamps to ModTime unless present in metadata
  1448. lkMap := caseInsensitiveMap(objInfo.UserDefined)
  1449. if _, ok := lkMap.Lookup(xhttp.AmzObjectLockLegalHold); ok {
  1450. dstOpts.Internal.LegalholdTimestamp = objInfo.ModTime
  1451. }
  1452. if _, ok := lkMap.Lookup(xhttp.AmzObjectLockRetainUntilDate); ok {
  1453. dstOpts.Internal.RetentionTimestamp = objInfo.ModTime
  1454. }
  1455. if objInfo.UserTags != "" {
  1456. dstOpts.Internal.TaggingTimestamp = objInfo.ModTime
  1457. }
  1458. if tagTmStr, ok := lkMap.Lookup(ReservedMetadataPrefixLower + TaggingTimestamp); ok {
  1459. ondiskTimestamp, err := time.Parse(time.RFC3339, tagTmStr)
  1460. if err == nil {
  1461. dstOpts.Internal.TaggingTimestamp = ondiskTimestamp
  1462. }
  1463. }
  1464. if retTmStr, ok := lkMap.Lookup(ReservedMetadataPrefixLower + ObjectLockRetentionTimestamp); ok {
  1465. ondiskTimestamp, err := time.Parse(time.RFC3339, retTmStr)
  1466. if err == nil {
  1467. dstOpts.Internal.RetentionTimestamp = ondiskTimestamp
  1468. }
  1469. }
  1470. if lholdTmStr, ok := lkMap.Lookup(ReservedMetadataPrefixLower + ObjectLockLegalHoldTimestamp); ok {
  1471. ondiskTimestamp, err := time.Parse(time.RFC3339, lholdTmStr)
  1472. if err == nil {
  1473. dstOpts.Internal.LegalholdTimestamp = ondiskTimestamp
  1474. }
  1475. }
  1476. if _, rinfo.Err = c.CopyObject(ctx, tgt.Bucket, object, tgt.Bucket, object, getCopyObjMetadata(objInfo, tgt.StorageClass), srcOpts, dstOpts); rinfo.Err != nil {
  1477. rinfo.ReplicationStatus = replication.Failed
  1478. replLogIf(ctx, fmt.Errorf("unable to replicate metadata for object %s/%s(%s) to target %s: %w", bucket, objInfo.Name, objInfo.VersionID, tgt.EndpointURL(), rinfo.Err))
  1479. }
  1480. } else {
  1481. putOpts, isMP, err := putReplicationOpts(ctx, tgt.StorageClass, objInfo)
  1482. if err != nil {
  1483. replLogIf(ctx, fmt.Errorf("failed to set replicate options for object %s/%s(%s) (target %s) err:%w", bucket, objInfo.Name, objInfo.VersionID, tgt.EndpointURL(), err))
  1484. sendEvent(eventArgs{
  1485. EventName: event.ObjectReplicationNotTracked,
  1486. BucketName: bucket,
  1487. Object: objInfo,
  1488. UserAgent: "Internal: [Replication]",
  1489. Host: globalLocalNodeName,
  1490. })
  1491. return
  1492. }
  1493. var headerSize int
  1494. for k, v := range putOpts.Header() {
  1495. headerSize += len(k) + len(v)
  1496. }
  1497. opts := &bandwidth.MonitorReaderOptions{
  1498. BucketOptions: bandwidth.BucketOptions{
  1499. Name: objInfo.Bucket,
  1500. ReplicationARN: tgt.ARN,
  1501. },
  1502. HeaderSize: headerSize,
  1503. }
  1504. newCtx := ctx
  1505. if globalBucketMonitor.IsThrottled(bucket, tgt.ARN) && objInfo.Size < minLargeObjSize {
  1506. var cancel context.CancelFunc
  1507. newCtx, cancel = context.WithTimeout(ctx, throttleDeadline)
  1508. defer cancel()
  1509. }
  1510. r := bandwidth.NewMonitoredReader(newCtx, globalBucketMonitor, gr, opts)
  1511. if isMP {
  1512. rinfo.Err = replicateObjectWithMultipart(ctx, c, tgt.Bucket, object, r, objInfo, putOpts)
  1513. } else {
  1514. _, rinfo.Err = c.PutObject(ctx, tgt.Bucket, object, r, size, "", "", putOpts)
  1515. }
  1516. if rinfo.Err != nil {
  1517. if minio.ToErrorResponse(rinfo.Err).Code != "PreconditionFailed" {
  1518. rinfo.ReplicationStatus = replication.Failed
  1519. replLogIf(ctx, fmt.Errorf("unable to replicate for object %s/%s(%s) to target %s: %w",
  1520. bucket, objInfo.Name, objInfo.VersionID, tgt.EndpointURL(), rinfo.Err))
  1521. }
  1522. if minio.IsNetworkOrHostDown(rinfo.Err, true) && !globalBucketTargetSys.isOffline(tgt.EndpointURL()) {
  1523. globalBucketTargetSys.markOffline(tgt.EndpointURL())
  1524. }
  1525. }
  1526. }
  1527. return
  1528. }
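// In summary, the replication path above proceeds in three broad steps: (1) HEAD the
// object on the target (StatObject with tag directive ACCESS) to decide the replication
// action; (2) if only metadata/tags changed, issue a server-side CopyObject onto the
// target bucket itself with the source version pinned; (3) otherwise stream the full
// object via PutObject (or part-by-part for multipart uploads), wrapping the reader in a
// bandwidth monitor and, for throttled small objects, a one-hour deadline context.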
  1529. func replicateObjectWithMultipart(ctx context.Context, c *minio.Core, bucket, object string, r io.Reader, objInfo ObjectInfo, opts minio.PutObjectOptions) (err error) {
  1530. var uploadedParts []minio.CompletePart
  1531. // new multipart must not set mtime as it may lead to erroneous cleanups at various intervals.
  1532. opts.Internal.SourceMTime = time.Time{} // this value is saved properly in CompleteMultipartUpload()
  1533. var uploadID string
  1534. attempts := 1
  1535. for attempts <= 3 {
  1536. nctx, cancel := context.WithTimeout(ctx, time.Minute)
  1537. uploadID, err = c.NewMultipartUpload(nctx, bucket, object, opts)
  1538. cancel()
  1539. if err == nil {
  1540. break
  1541. }
  1542. if minio.ToErrorResponse(err).Code == "PreconditionFailed" {
  1543. return nil
  1544. }
  1545. attempts++
  1546. time.Sleep(time.Duration(rand.Int63n(int64(time.Second))))
  1547. }
  1548. if err != nil {
  1549. return err
  1550. }
  1551. defer func() {
  1552. if err != nil {
  1553. // block and abort remote upload upon failure.
  1554. attempts := 1
  1555. for attempts <= 3 {
  1556. actx, acancel := context.WithTimeout(ctx, time.Minute)
  1557. aerr := c.AbortMultipartUpload(actx, bucket, object, uploadID)
  1558. acancel()
  1559. if aerr == nil {
  1560. return
  1561. }
  1562. attempts++
  1563. time.Sleep(time.Duration(rand.Int63n(int64(time.Second))))
  1564. }
  1565. }
  1566. }()
  1567. var (
  1568. hr *hash.Reader
  1569. isSSEC = crypto.SSEC.IsEncrypted(objInfo.UserDefined)
  1570. )
  1571. var objectSize int64
  1572. for _, partInfo := range objInfo.Parts {
  1573. if isSSEC {
  1574. hr, err = hash.NewReader(ctx, io.LimitReader(r, partInfo.Size), partInfo.Size, "", "", partInfo.ActualSize)
  1575. } else {
  1576. hr, err = hash.NewReader(ctx, io.LimitReader(r, partInfo.ActualSize), partInfo.ActualSize, "", "", partInfo.ActualSize)
  1577. }
  1578. if err != nil {
  1579. return err
  1580. }
  1581. cHeader := http.Header{}
  1582. cHeader.Add(xhttp.MinIOSourceReplicationRequest, "true")
  1583. if !isSSEC {
  1584. cs, _ := getCRCMeta(objInfo, partInfo.Number, nil)
  1585. for k, v := range cs {
  1586. cHeader.Add(k, v)
  1587. }
  1588. }
  1589. popts := minio.PutObjectPartOptions{
  1590. SSE: opts.ServerSideEncryption,
  1591. CustomHeader: cHeader,
  1592. }
  1593. var size int64
  1594. if isSSEC {
  1595. size = partInfo.Size
  1596. } else {
  1597. size = partInfo.ActualSize
  1598. }
  1599. objectSize += size
  1600. pInfo, err := c.PutObjectPart(ctx, bucket, object, uploadID, partInfo.Number, hr, size, popts)
  1601. if err != nil {
  1602. return err
  1603. }
  1604. if pInfo.Size != size {
  1605. return fmt.Errorf("ssec(%t): Part size mismatch: got %d, want %d", isSSEC, pInfo.Size, size)
  1606. }
  1607. uploadedParts = append(uploadedParts, minio.CompletePart{
  1608. PartNumber: pInfo.PartNumber,
  1609. ETag: pInfo.ETag,
  1610. ChecksumCRC32: pInfo.ChecksumCRC32,
  1611. ChecksumCRC32C: pInfo.ChecksumCRC32C,
  1612. ChecksumSHA1: pInfo.ChecksumSHA1,
  1613. ChecksumSHA256: pInfo.ChecksumSHA256,
  1614. ChecksumCRC64NVME: pInfo.ChecksumCRC64NVME,
  1615. })
  1616. }
  1617. userMeta := map[string]string{
  1618. xhttp.MinIOReplicationActualObjectSize: objInfo.UserDefined[ReservedMetadataPrefix+"actual-size"],
  1619. }
  1620. if isSSEC && objInfo.UserDefined[ReplicationSsecChecksumHeader] != "" {
  1621. userMeta[ReplicationSsecChecksumHeader] = objInfo.UserDefined[ReplicationSsecChecksumHeader]
  1622. }
1623. // A generous timeout, but that is acceptable on heavily loaded systems; it only bounds the tail end of the upload.
  1624. cctx, ccancel := context.WithTimeout(ctx, 10*time.Minute)
  1625. defer ccancel()
  1626. if len(objInfo.Checksum) > 0 {
  1627. cs, _ := getCRCMeta(objInfo, 0, nil)
  1628. for k, v := range cs {
  1629. userMeta[k] = strings.Split(v, "-")[0]
  1630. }
  1631. }
  1632. _, err = c.CompleteMultipartUpload(cctx, bucket, object, uploadID, uploadedParts, minio.PutObjectOptions{
  1633. UserMetadata: userMeta,
  1634. Internal: minio.AdvancedPutOptions{
  1635. SourceMTime: objInfo.ModTime,
  1636. SourceETag: objInfo.ETag,
1637. // always set this to distinguish between `mc mirror` and server-side replication
  1638. ReplicationRequest: true,
  1639. },
  1640. })
  1641. return err
  1642. }
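// A minimal sketch (not part of the original source) of the bounded retry-with-jitter
// pattern used above for NewMultipartUpload and AbortMultipartUpload: up to 3 attempts,
// each under a one-minute timeout, with a random sleep of up to one second between
// attempts. The helper name retryWithJitterSketch is hypothetical.
func retryWithJitterSketch(ctx context.Context, op func(context.Context) error) (err error) {
	for attempts := 1; attempts <= 3; attempts++ {
		nctx, cancel := context.WithTimeout(ctx, time.Minute)
		err = op(nctx)
		cancel()
		if err == nil {
			return nil
		}
		// random backoff below one second, mirroring the loops above
		time.Sleep(time.Duration(rand.Int63n(int64(time.Second))))
	}
	return err
}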
  1643. // filterReplicationStatusMetadata filters replication status metadata for COPY
  1644. func filterReplicationStatusMetadata(metadata map[string]string) map[string]string {
  1645. // Copy on write
  1646. dst := metadata
  1647. var copied bool
  1648. delKey := func(key string) {
  1649. if _, ok := metadata[key]; !ok {
  1650. return
  1651. }
  1652. if !copied {
  1653. dst = make(map[string]string, len(metadata))
  1654. for k, v := range metadata {
  1655. dst[k] = v
  1656. }
  1657. copied = true
  1658. }
  1659. delete(dst, key)
  1660. }
  1661. delKey(xhttp.AmzBucketReplicationStatus)
  1662. return dst
  1663. }
  1664. // DeletedObjectReplicationInfo has info on deleted object
  1665. type DeletedObjectReplicationInfo struct {
  1666. DeletedObject
  1667. Bucket string
  1668. EventType string
  1669. OpType replication.Type
  1670. ResetID string
  1671. TargetArn string
  1672. }
  1673. // ToMRFEntry returns the relevant info needed by MRF
  1674. func (di DeletedObjectReplicationInfo) ToMRFEntry() MRFReplicateEntry {
  1675. versionID := di.DeleteMarkerVersionID
  1676. if versionID == "" {
  1677. versionID = di.VersionID
  1678. }
  1679. return MRFReplicateEntry{
  1680. Bucket: di.Bucket,
  1681. Object: di.ObjectName,
  1682. versionID: versionID,
  1683. }
  1684. }
  1685. // Replication specific APIName
  1686. const (
  1687. ReplicateObjectAPI = "ReplicateObject"
  1688. ReplicateDeleteAPI = "ReplicateDelete"
  1689. )
  1690. const (
  1691. // ReplicateQueued - replication being queued trail
  1692. ReplicateQueued = "replicate:queue"
  1693. // ReplicateExisting - audit trail for existing objects replication
  1694. ReplicateExisting = "replicate:existing"
  1695. // ReplicateExistingDelete - audit trail for delete replication triggered for existing delete markers
  1696. ReplicateExistingDelete = "replicate:existing:delete"
  1697. // ReplicateMRF - audit trail for replication from Most Recent Failures (MRF) queue
  1698. ReplicateMRF = "replicate:mrf"
  1699. // ReplicateIncoming - audit trail of inline replication
  1700. ReplicateIncoming = "replicate:incoming"
  1701. // ReplicateIncomingDelete - audit trail of inline replication of deletes.
  1702. ReplicateIncomingDelete = "replicate:incoming:delete"
  1703. // ReplicateHeal - audit trail for healing of failed/pending replications
  1704. ReplicateHeal = "replicate:heal"
  1705. // ReplicateHealDelete - audit trail of healing of failed/pending delete replications.
  1706. ReplicateHealDelete = "replicate:heal:delete"
  1707. )
  1708. var (
  1709. globalReplicationPool = once.NewSingleton[ReplicationPool]()
  1710. globalReplicationStats atomic.Pointer[ReplicationStats]
  1711. )
  1712. // ReplicationPool describes replication pool
  1713. type ReplicationPool struct {
  1714. // atomic ops:
  1715. activeWorkers int32
  1716. activeLrgWorkers int32
  1717. activeMRFWorkers int32
  1718. objLayer ObjectLayer
  1719. ctx context.Context
  1720. priority string
  1721. maxWorkers int
  1722. maxLWorkers int
  1723. stats *ReplicationStats
  1724. mu sync.RWMutex
  1725. mrfMU sync.Mutex
  1726. resyncer *replicationResyncer
  1727. // workers:
  1728. workers []chan ReplicationWorkerOperation
  1729. lrgworkers []chan ReplicationWorkerOperation
  1730. // mrf:
  1731. mrfWorkerKillCh chan struct{}
  1732. mrfReplicaCh chan ReplicationWorkerOperation
  1733. mrfSaveCh chan MRFReplicateEntry
  1734. mrfStopCh chan struct{}
  1735. mrfWorkerSize int
  1736. }
  1737. // ReplicationWorkerOperation is a shared interface of replication operations.
  1738. type ReplicationWorkerOperation interface {
  1739. ToMRFEntry() MRFReplicateEntry
  1740. }
  1741. const (
  1742. // WorkerMaxLimit max number of workers per node for "fast" mode
  1743. WorkerMaxLimit = 500
  1744. // WorkerMinLimit min number of workers per node for "slow" mode
  1745. WorkerMinLimit = 50
  1746. // WorkerAutoDefault is default number of workers for "auto" mode
  1747. WorkerAutoDefault = 100
  1748. // MRFWorkerMaxLimit max number of mrf workers per node for "fast" mode
  1749. MRFWorkerMaxLimit = 8
  1750. // MRFWorkerMinLimit min number of mrf workers per node for "slow" mode
  1751. MRFWorkerMinLimit = 2
  1752. // MRFWorkerAutoDefault is default number of mrf workers for "auto" mode
  1753. MRFWorkerAutoDefault = 4
1754. // LargeWorkerCount is the default number of workers assigned to large uploads (>= 128MiB)
  1755. LargeWorkerCount = 10
  1756. )
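// A minimal sketch (not part of the original source) of how the priority setting maps
// onto the limits above, mirroring the switch in NewReplicationPool below; the helper
// name workersForPrioritySketch is hypothetical.
func workersForPrioritySketch(priority string) (workers, mrfWorkers int) {
	switch priority {
	case "fast":
		return WorkerMaxLimit, MRFWorkerMaxLimit // 500 / 8
	case "slow":
		return WorkerMinLimit, MRFWorkerMinLimit // 50 / 2
	default: // "auto"
		return WorkerAutoDefault, MRFWorkerAutoDefault // 100 / 4
	}
}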
  1757. // NewReplicationPool creates a pool of replication workers of specified size
  1758. func NewReplicationPool(ctx context.Context, o ObjectLayer, opts replicationPoolOpts, stats *ReplicationStats) *ReplicationPool {
  1759. var workers, failedWorkers int
  1760. priority := "auto"
  1761. maxWorkers := WorkerMaxLimit
  1762. if opts.Priority != "" {
  1763. priority = opts.Priority
  1764. }
  1765. if opts.MaxWorkers > 0 {
  1766. maxWorkers = opts.MaxWorkers
  1767. }
  1768. switch priority {
  1769. case "fast":
  1770. workers = WorkerMaxLimit
  1771. failedWorkers = MRFWorkerMaxLimit
  1772. case "slow":
  1773. workers = WorkerMinLimit
  1774. failedWorkers = MRFWorkerMinLimit
  1775. default:
  1776. workers = WorkerAutoDefault
  1777. failedWorkers = MRFWorkerAutoDefault
  1778. }
  1779. if maxWorkers > 0 && workers > maxWorkers {
  1780. workers = maxWorkers
  1781. }
  1782. if maxWorkers > 0 && failedWorkers > maxWorkers {
  1783. failedWorkers = maxWorkers
  1784. }
  1785. maxLWorkers := LargeWorkerCount
  1786. if opts.MaxLWorkers > 0 {
  1787. maxLWorkers = opts.MaxLWorkers
  1788. }
  1789. pool := &ReplicationPool{
  1790. workers: make([]chan ReplicationWorkerOperation, 0, workers),
  1791. lrgworkers: make([]chan ReplicationWorkerOperation, 0, maxLWorkers),
  1792. mrfReplicaCh: make(chan ReplicationWorkerOperation, 100000),
  1793. mrfWorkerKillCh: make(chan struct{}, failedWorkers),
  1794. resyncer: newresyncer(),
  1795. mrfSaveCh: make(chan MRFReplicateEntry, 100000),
  1796. mrfStopCh: make(chan struct{}, 1),
  1797. ctx: ctx,
  1798. objLayer: o,
  1799. stats: stats,
  1800. priority: priority,
  1801. maxWorkers: maxWorkers,
  1802. maxLWorkers: maxLWorkers,
  1803. }
  1804. pool.ResizeLrgWorkers(maxLWorkers, 0)
  1805. pool.ResizeWorkers(workers, 0)
  1806. pool.ResizeFailedWorkers(failedWorkers)
  1807. go pool.resyncer.PersistToDisk(ctx, o)
  1808. go pool.processMRF()
  1809. go pool.persistMRF()
  1810. return pool
  1811. }
  1812. // AddMRFWorker adds a pending/failed replication worker to handle requests that could not be queued
  1813. // to the other workers
  1814. func (p *ReplicationPool) AddMRFWorker() {
  1815. for {
  1816. select {
  1817. case <-p.ctx.Done():
  1818. return
  1819. case oi, ok := <-p.mrfReplicaCh:
  1820. if !ok {
  1821. return
  1822. }
  1823. switch v := oi.(type) {
  1824. case ReplicateObjectInfo:
  1825. p.stats.incQ(v.Bucket, v.Size, v.DeleteMarker, v.OpType)
  1826. atomic.AddInt32(&p.activeMRFWorkers, 1)
  1827. replicateObject(p.ctx, v, p.objLayer)
  1828. atomic.AddInt32(&p.activeMRFWorkers, -1)
  1829. p.stats.decQ(v.Bucket, v.Size, v.DeleteMarker, v.OpType)
  1830. default:
  1831. bugLogIf(p.ctx, fmt.Errorf("unknown mrf replication type: %T", oi), "unknown-mrf-replicate-type")
  1832. }
  1833. case <-p.mrfWorkerKillCh:
  1834. return
  1835. }
  1836. }
  1837. }
  1838. // AddWorker adds a replication worker to the pool.
  1839. // An optional pointer to a tracker that will be atomically
  1840. // incremented when operations are running can be provided.
  1841. func (p *ReplicationPool) AddWorker(input <-chan ReplicationWorkerOperation, opTracker *int32) {
  1842. for {
  1843. select {
  1844. case <-p.ctx.Done():
  1845. return
  1846. case oi, ok := <-input:
  1847. if !ok {
  1848. return
  1849. }
  1850. switch v := oi.(type) {
  1851. case ReplicateObjectInfo:
  1852. if opTracker != nil {
  1853. atomic.AddInt32(opTracker, 1)
  1854. }
  1855. p.stats.incQ(v.Bucket, v.Size, v.DeleteMarker, v.OpType)
  1856. replicateObject(p.ctx, v, p.objLayer)
  1857. p.stats.decQ(v.Bucket, v.Size, v.DeleteMarker, v.OpType)
  1858. if opTracker != nil {
  1859. atomic.AddInt32(opTracker, -1)
  1860. }
  1861. case DeletedObjectReplicationInfo:
  1862. if opTracker != nil {
  1863. atomic.AddInt32(opTracker, 1)
  1864. }
  1865. p.stats.incQ(v.Bucket, 0, true, v.OpType)
  1866. replicateDelete(p.ctx, v, p.objLayer)
  1867. p.stats.decQ(v.Bucket, 0, true, v.OpType)
  1868. if opTracker != nil {
  1869. atomic.AddInt32(opTracker, -1)
  1870. }
  1871. default:
  1872. bugLogIf(p.ctx, fmt.Errorf("unknown replication type: %T", oi), "unknown-replicate-type")
  1873. }
  1874. }
  1875. }
  1876. }
  1877. // AddLargeWorker adds a replication worker to the static pool for large uploads.
  1878. func (p *ReplicationPool) AddLargeWorker(input <-chan ReplicationWorkerOperation, opTracker *int32) {
  1879. for {
  1880. select {
  1881. case <-p.ctx.Done():
  1882. return
  1883. case oi, ok := <-input:
  1884. if !ok {
  1885. return
  1886. }
  1887. switch v := oi.(type) {
  1888. case ReplicateObjectInfo:
  1889. if opTracker != nil {
  1890. atomic.AddInt32(opTracker, 1)
  1891. }
  1892. p.stats.incQ(v.Bucket, v.Size, v.DeleteMarker, v.OpType)
  1893. replicateObject(p.ctx, v, p.objLayer)
  1894. p.stats.decQ(v.Bucket, v.Size, v.DeleteMarker, v.OpType)
  1895. if opTracker != nil {
  1896. atomic.AddInt32(opTracker, -1)
  1897. }
  1898. case DeletedObjectReplicationInfo:
  1899. if opTracker != nil {
  1900. atomic.AddInt32(opTracker, 1)
  1901. }
  1902. replicateDelete(p.ctx, v, p.objLayer)
  1903. if opTracker != nil {
  1904. atomic.AddInt32(opTracker, -1)
  1905. }
  1906. default:
  1907. bugLogIf(p.ctx, fmt.Errorf("unknown replication type: %T", oi), "unknown-replicate-type")
  1908. }
  1909. }
  1910. }
  1911. }
1912. // ResizeLrgWorkers resizes the replication worker pool for large transfers (>=128MiB) to n.
1913. // checkOld can be set to the expected current size; if the worker count changed
1914. // while waiting for the lock (i.e. no longer matches checkOld), the resize is skipped.
  1915. func (p *ReplicationPool) ResizeLrgWorkers(n, checkOld int) {
  1916. p.mu.Lock()
  1917. defer p.mu.Unlock()
  1918. if (checkOld > 0 && len(p.lrgworkers) != checkOld) || n == len(p.lrgworkers) || n < 1 {
  1919. // Either already satisfied or worker count changed while we waited for the lock.
  1920. return
  1921. }
  1922. for len(p.lrgworkers) < n {
  1923. input := make(chan ReplicationWorkerOperation, 100000)
  1924. p.lrgworkers = append(p.lrgworkers, input)
  1925. go p.AddLargeWorker(input, &p.activeLrgWorkers)
  1926. }
  1927. for len(p.lrgworkers) > n {
  1928. worker := p.lrgworkers[len(p.lrgworkers)-1]
  1929. p.lrgworkers = p.lrgworkers[:len(p.lrgworkers)-1]
  1930. xioutil.SafeClose(worker)
  1931. }
  1932. }
  1933. // ActiveWorkers returns the number of active workers handling replication traffic.
  1934. func (p *ReplicationPool) ActiveWorkers() int {
  1935. return int(atomic.LoadInt32(&p.activeWorkers))
  1936. }
  1937. // ActiveMRFWorkers returns the number of active workers handling replication failures.
  1938. func (p *ReplicationPool) ActiveMRFWorkers() int {
  1939. return int(atomic.LoadInt32(&p.activeMRFWorkers))
  1940. }
1941. // ActiveLrgWorkers returns the number of active workers handling objects >= 128MiB in size.
  1942. func (p *ReplicationPool) ActiveLrgWorkers() int {
  1943. return int(atomic.LoadInt32(&p.activeLrgWorkers))
  1944. }
1945. // ResizeWorkers resizes the replication worker pool to n.
1946. // checkOld can be set to the expected current size; if the worker count changed
1947. // while waiting for the lock (i.e. no longer matches checkOld), the resize is skipped.
  1948. func (p *ReplicationPool) ResizeWorkers(n, checkOld int) {
  1949. p.mu.Lock()
  1950. defer p.mu.Unlock()
  1951. if (checkOld > 0 && len(p.workers) != checkOld) || n == len(p.workers) || n < 1 {
  1952. // Either already satisfied or worker count changed while we waited for the lock.
  1953. return
  1954. }
  1955. for len(p.workers) < n {
  1956. input := make(chan ReplicationWorkerOperation, 10000)
  1957. p.workers = append(p.workers, input)
  1958. go p.AddWorker(input, &p.activeWorkers)
  1959. }
  1960. for len(p.workers) > n {
  1961. worker := p.workers[len(p.workers)-1]
  1962. p.workers = p.workers[:len(p.workers)-1]
  1963. xioutil.SafeClose(worker)
  1964. }
  1965. }
1966. // ResizeWorkerPriority resizes the regular, MRF and large-object replication worker pools according to the given priority.
  1967. func (p *ReplicationPool) ResizeWorkerPriority(pri string, maxWorkers, maxLWorkers int) {
  1968. var workers, mrfWorkers int
  1969. p.mu.Lock()
  1970. switch pri {
  1971. case "fast":
  1972. workers = WorkerMaxLimit
  1973. mrfWorkers = MRFWorkerMaxLimit
  1974. case "slow":
  1975. workers = WorkerMinLimit
  1976. mrfWorkers = MRFWorkerMinLimit
  1977. default:
  1978. workers = WorkerAutoDefault
  1979. mrfWorkers = MRFWorkerAutoDefault
  1980. if len(p.workers) < WorkerAutoDefault {
  1981. workers = min(len(p.workers)+1, WorkerAutoDefault)
  1982. }
  1983. if p.mrfWorkerSize < MRFWorkerAutoDefault {
  1984. mrfWorkers = min(p.mrfWorkerSize+1, MRFWorkerAutoDefault)
  1985. }
  1986. }
  1987. if maxWorkers > 0 && workers > maxWorkers {
  1988. workers = maxWorkers
  1989. }
  1990. if maxWorkers > 0 && mrfWorkers > maxWorkers {
  1991. mrfWorkers = maxWorkers
  1992. }
  1993. if maxLWorkers <= 0 {
  1994. maxLWorkers = LargeWorkerCount
  1995. }
  1996. p.priority = pri
  1997. p.maxWorkers = maxWorkers
  1998. p.mu.Unlock()
  1999. p.ResizeWorkers(workers, 0)
  2000. p.ResizeFailedWorkers(mrfWorkers)
  2001. p.ResizeLrgWorkers(maxLWorkers, 0)
  2002. }
  2003. // ResizeFailedWorkers sets replication failed workers pool size
  2004. func (p *ReplicationPool) ResizeFailedWorkers(n int) {
  2005. p.mu.Lock()
  2006. defer p.mu.Unlock()
  2007. for p.mrfWorkerSize < n {
  2008. p.mrfWorkerSize++
  2009. go p.AddMRFWorker()
  2010. }
  2011. for p.mrfWorkerSize > n {
  2012. p.mrfWorkerSize--
  2013. go func() { p.mrfWorkerKillCh <- struct{}{} }()
  2014. }
  2015. }
  2016. const (
  2017. minLargeObjSize = 128 * humanize.MiByte // 128MiB
  2018. )
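// Objects at or above minLargeObjSize are routed by queueReplicaTask below to the
// dedicated large-object worker pool; smaller objects go to the regular worker
// channels selected by getWorkerCh.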
  2019. // getWorkerCh gets a worker channel deterministically based on bucket and object names.
2020. // Note: this acquires a read lock on p, so the caller must not hold p's write lock.
  2021. func (p *ReplicationPool) getWorkerCh(bucket, object string, sz int64) chan<- ReplicationWorkerOperation {
  2022. h := xxh3.HashString(bucket + object)
  2023. p.mu.RLock()
  2024. defer p.mu.RUnlock()
  2025. if len(p.workers) == 0 {
  2026. return nil
  2027. }
  2028. return p.workers[h%uint64(len(p.workers))]
  2029. }
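// A minimal sketch (not part of the original source) showing how the xxh3 hash above
// pins a bucket/object pair to a single worker channel, so queued operations for the
// same object are always handled by the same goroutine; the helper name
// workerIndexSketch is hypothetical.
func workerIndexSketch(bucket, object string, numWorkers int) int {
	// numWorkers must be > 0, as guaranteed by the length check in getWorkerCh.
	h := xxh3.HashString(bucket + object)
	return int(h % uint64(numWorkers))
}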
  2030. func (p *ReplicationPool) queueReplicaTask(ri ReplicateObjectInfo) {
  2031. if p == nil {
  2032. return
  2033. }
  2034. // if object is large, queue it to a static set of large workers
  2035. if ri.Size >= int64(minLargeObjSize) {
  2036. h := xxh3.HashString(ri.Bucket + ri.Name)
  2037. select {
  2038. case <-p.ctx.Done():
  2039. case p.lrgworkers[h%uint64(len(p.lrgworkers))] <- ri:
  2040. default:
  2041. p.queueMRFSave(ri.ToMRFEntry())
  2042. p.mu.RLock()
  2043. maxLWorkers := p.maxLWorkers
  2044. existing := len(p.lrgworkers)
  2045. p.mu.RUnlock()
  2046. maxLWorkers = min(maxLWorkers, LargeWorkerCount)
  2047. if p.ActiveLrgWorkers() < maxLWorkers {
  2048. workers := min(existing+1, maxLWorkers)
  2049. p.ResizeLrgWorkers(workers, existing)
  2050. }
  2051. }
  2052. return
  2053. }
  2054. var ch, healCh chan<- ReplicationWorkerOperation
  2055. switch ri.OpType {
  2056. case replication.HealReplicationType, replication.ExistingObjectReplicationType:
  2057. ch = p.mrfReplicaCh
  2058. healCh = p.getWorkerCh(ri.Name, ri.Bucket, ri.Size)
  2059. default:
  2060. ch = p.getWorkerCh(ri.Name, ri.Bucket, ri.Size)
  2061. }
  2062. if ch == nil && healCh == nil {
  2063. return
  2064. }
  2065. select {
  2066. case <-p.ctx.Done():
  2067. case healCh <- ri:
  2068. case ch <- ri:
  2069. default:
  2070. globalReplicationPool.Get().queueMRFSave(ri.ToMRFEntry())
  2071. p.mu.RLock()
  2072. prio := p.priority
  2073. maxWorkers := p.maxWorkers
  2074. p.mu.RUnlock()
  2075. switch prio {
  2076. case "fast":
  2077. replLogOnceIf(GlobalContext, fmt.Errorf("Unable to keep up with incoming traffic"), string(replicationSubsystem), logger.WarningKind)
  2078. case "slow":
  2079. replLogOnceIf(GlobalContext, fmt.Errorf("Unable to keep up with incoming traffic - we recommend increasing replication priority with `mc admin config set api replication_priority=auto`"), string(replicationSubsystem), logger.WarningKind)
  2080. default:
  2081. maxWorkers = min(maxWorkers, WorkerMaxLimit)
  2082. if p.ActiveWorkers() < maxWorkers {
  2083. p.mu.RLock()
  2084. workers := min(len(p.workers)+1, maxWorkers)
  2085. existing := len(p.workers)
  2086. p.mu.RUnlock()
  2087. p.ResizeWorkers(workers, existing)
  2088. }
  2089. maxMRFWorkers := min(maxWorkers, MRFWorkerMaxLimit)
  2090. if p.ActiveMRFWorkers() < maxMRFWorkers {
  2091. p.mu.RLock()
  2092. workers := min(p.mrfWorkerSize+1, maxMRFWorkers)
  2093. p.mu.RUnlock()
  2094. p.ResizeFailedWorkers(workers)
  2095. }
  2096. }
  2097. }
  2098. }
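// When the selected worker channel above is full, the task is not dropped: it is
// persisted to the MRF queue via queueMRFSave, and under "auto" priority the worker
// pools are grown one worker at a time up to their configured maximums.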
  2099. func queueReplicateDeletesWrapper(doi DeletedObjectReplicationInfo, existingObjectResync ResyncDecision) {
  2100. for k, v := range existingObjectResync.targets {
  2101. if v.Replicate {
  2102. doi.ResetID = v.ResetID
  2103. doi.TargetArn = k
  2104. globalReplicationPool.Get().queueReplicaDeleteTask(doi)
  2105. }
  2106. }
  2107. }
  2108. func (p *ReplicationPool) queueReplicaDeleteTask(doi DeletedObjectReplicationInfo) {
  2109. if p == nil {
  2110. return
  2111. }
  2112. var ch chan<- ReplicationWorkerOperation
  2113. switch doi.OpType {
  2114. case replication.HealReplicationType, replication.ExistingObjectReplicationType:
  2115. fallthrough
  2116. default:
  2117. ch = p.getWorkerCh(doi.Bucket, doi.ObjectName, 0)
  2118. }
  2119. select {
  2120. case <-p.ctx.Done():
  2121. case ch <- doi:
  2122. default:
  2123. p.queueMRFSave(doi.ToMRFEntry())
  2124. p.mu.RLock()
  2125. prio := p.priority
  2126. maxWorkers := p.maxWorkers
  2127. p.mu.RUnlock()
  2128. switch prio {
  2129. case "fast":
  2130. replLogOnceIf(GlobalContext, fmt.Errorf("Unable to keep up with incoming deletes"), string(replicationSubsystem), logger.WarningKind)
  2131. case "slow":
  2132. replLogOnceIf(GlobalContext, fmt.Errorf("Unable to keep up with incoming deletes - we recommend increasing replication priority with `mc admin config set api replication_priority=auto`"), string(replicationSubsystem), logger.WarningKind)
  2133. default:
  2134. maxWorkers = min(maxWorkers, WorkerMaxLimit)
  2135. if p.ActiveWorkers() < maxWorkers {
  2136. p.mu.RLock()
  2137. workers := min(len(p.workers)+1, maxWorkers)
  2138. existing := len(p.workers)
  2139. p.mu.RUnlock()
  2140. p.ResizeWorkers(workers, existing)
  2141. }
  2142. }
  2143. }
  2144. }
  2145. type replicationPoolOpts struct {
  2146. Priority string
  2147. MaxWorkers int
  2148. MaxLWorkers int
  2149. }
  2150. func initBackgroundReplication(ctx context.Context, objectAPI ObjectLayer) {
  2151. stats := NewReplicationStats(ctx, objectAPI)
  2152. globalReplicationPool.Set(NewReplicationPool(ctx, objectAPI, globalAPIConfig.getReplicationOpts(), stats))
  2153. globalReplicationStats.Store(stats)
  2154. go stats.trackEWMA()
  2155. }
  2156. type proxyResult struct {
  2157. Proxy bool
  2158. Err error
  2159. }
  2160. // get Reader from replication target if active-active replication is in place and
  2161. // this node returns a 404
  2162. func proxyGetToReplicationTarget(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, opts ObjectOptions, proxyTargets *madmin.BucketTargets) (gr *GetObjectReader, proxy proxyResult, err error) {
  2163. tgt, oi, proxy := proxyHeadToRepTarget(ctx, bucket, object, rs, opts, proxyTargets)
  2164. if !proxy.Proxy {
  2165. return nil, proxy, nil
  2166. }
  2167. fn, _, _, err := NewGetObjectReader(nil, oi, opts, h)
  2168. if err != nil {
  2169. return nil, proxy, err
  2170. }
  2171. gopts := minio.GetObjectOptions{
  2172. VersionID: opts.VersionID,
  2173. ServerSideEncryption: opts.ServerSideEncryption,
  2174. Internal: minio.AdvancedGetOptions{
  2175. ReplicationProxyRequest: "true",
  2176. },
  2177. PartNumber: opts.PartNumber,
  2178. }
  2179. // get correct offsets for encrypted object
  2180. if rs != nil {
  2181. h, err := rs.ToHeader()
  2182. if err != nil {
  2183. return nil, proxy, err
  2184. }
  2185. gopts.Set(xhttp.Range, h)
  2186. }
  2187. // Make sure to match ETag when proxying.
  2188. if err = gopts.SetMatchETag(oi.ETag); err != nil {
  2189. return nil, proxy, err
  2190. }
  2191. c := minio.Core{Client: tgt.Client}
  2192. obj, _, h, err := c.GetObject(ctx, tgt.Bucket, object, gopts)
  2193. if err != nil {
  2194. return nil, proxy, err
  2195. }
  2196. closeReader := func() { obj.Close() }
  2197. reader, err := fn(obj, h, closeReader)
  2198. if err != nil {
  2199. return nil, proxy, err
  2200. }
  2201. reader.ObjInfo = oi.Clone()
  2202. if rs != nil {
  2203. contentSize, err := parseSizeFromContentRange(h)
  2204. if err != nil {
  2205. return nil, proxy, err
  2206. }
  2207. reader.ObjInfo.Size = contentSize
  2208. }
  2209. return reader, proxyResult{Proxy: true}, nil
  2210. }
  2211. func getProxyTargets(ctx context.Context, bucket, object string, opts ObjectOptions) (tgts *madmin.BucketTargets) {
  2212. if opts.VersionSuspended {
  2213. return &madmin.BucketTargets{}
  2214. }
  2215. if opts.ProxyRequest || (opts.ProxyHeaderSet && !opts.ProxyRequest) {
  2216. return &madmin.BucketTargets{}
  2217. }
  2218. cfg, err := getReplicationConfig(ctx, bucket)
  2219. if err != nil || cfg == nil {
  2220. replLogOnceIf(ctx, err, bucket)
  2221. return &madmin.BucketTargets{}
  2222. }
  2223. topts := replication.ObjectOpts{Name: object}
  2224. tgtArns := cfg.FilterTargetArns(topts)
  2225. tgts = &madmin.BucketTargets{Targets: make([]madmin.BucketTarget, len(tgtArns))}
  2226. for i, tgtArn := range tgtArns {
  2227. tgt := globalBucketTargetSys.GetRemoteBucketTargetByArn(ctx, bucket, tgtArn)
  2228. tgts.Targets[i] = tgt
  2229. }
  2230. return tgts
  2231. }
  2232. func proxyHeadToRepTarget(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, opts ObjectOptions, proxyTargets *madmin.BucketTargets) (tgt *TargetClient, oi ObjectInfo, proxy proxyResult) {
  2233. // this option is set when active-active replication is in place between site A -> B,
  2234. // and site B does not have the object yet.
  2235. if opts.ProxyRequest || (opts.ProxyHeaderSet && !opts.ProxyRequest) { // true only when site B sets MinIOSourceProxyRequest header
  2236. return nil, oi, proxy
  2237. }
  2238. var perr error
  2239. for _, t := range proxyTargets.Targets {
  2240. tgt = globalBucketTargetSys.GetRemoteTargetClient(bucket, t.Arn)
  2241. if tgt == nil || globalBucketTargetSys.isOffline(tgt.EndpointURL()) {
  2242. continue
  2243. }
  2244. // if proxying explicitly disabled on remote target
  2245. if tgt.disableProxy {
  2246. continue
  2247. }
  2248. gopts := minio.GetObjectOptions{
  2249. VersionID: opts.VersionID,
  2250. ServerSideEncryption: opts.ServerSideEncryption,
  2251. Internal: minio.AdvancedGetOptions{
  2252. ReplicationProxyRequest: "true",
  2253. },
  2254. PartNumber: opts.PartNumber,
  2255. }
  2256. if rs != nil {
  2257. h, err := rs.ToHeader()
  2258. if err != nil {
  2259. replLogIf(ctx, fmt.Errorf("invalid range header for %s/%s(%s) - %w", bucket, object, opts.VersionID, err))
  2260. continue
  2261. }
  2262. gopts.Set(xhttp.Range, h)
  2263. }
  2264. objInfo, err := tgt.StatObject(ctx, t.TargetBucket, object, gopts)
  2265. if err != nil {
  2266. perr = err
  2267. if isErrInvalidRange(ErrorRespToObjectError(err, bucket, object)) {
  2268. return nil, oi, proxyResult{Err: err}
  2269. }
  2270. continue
  2271. }
  2272. tags, _ := tags.MapToObjectTags(objInfo.UserTags)
  2273. oi = ObjectInfo{
  2274. Bucket: bucket,
  2275. Name: object,
  2276. ModTime: objInfo.LastModified,
  2277. Size: objInfo.Size,
  2278. ETag: objInfo.ETag,
  2279. VersionID: objInfo.VersionID,
  2280. IsLatest: objInfo.IsLatest,
  2281. DeleteMarker: objInfo.IsDeleteMarker,
  2282. ContentType: objInfo.ContentType,
  2283. Expires: objInfo.Expires,
  2284. StorageClass: objInfo.StorageClass,
  2285. ReplicationStatusInternal: objInfo.ReplicationStatus,
  2286. UserTags: tags.String(),
  2287. ReplicationStatus: replication.StatusType(objInfo.ReplicationStatus),
  2288. }
  2289. oi.UserDefined = make(map[string]string, len(objInfo.Metadata))
  2290. for k, v := range objInfo.Metadata {
  2291. oi.UserDefined[k] = v[0]
  2292. }
  2293. ce, ok := oi.UserDefined[xhttp.ContentEncoding]
  2294. if !ok {
  2295. ce, ok = oi.UserDefined[strings.ToLower(xhttp.ContentEncoding)]
  2296. }
  2297. if ok {
  2298. oi.ContentEncoding = ce
  2299. }
  2300. return tgt, oi, proxyResult{Proxy: true}
  2301. }
  2302. proxy.Err = perr
  2303. return nil, oi, proxy
  2304. }
  2305. // get object info from replication target if active-active replication is in place and
  2306. // this node returns a 404
  2307. func proxyHeadToReplicationTarget(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, opts ObjectOptions, proxyTargets *madmin.BucketTargets) (oi ObjectInfo, proxy proxyResult) {
  2308. _, oi, proxy = proxyHeadToRepTarget(ctx, bucket, object, rs, opts, proxyTargets)
  2309. return oi, proxy
  2310. }
  2311. func scheduleReplication(ctx context.Context, oi ObjectInfo, o ObjectLayer, dsc ReplicateDecision, opType replication.Type) {
  2312. tgtStatuses := replicationStatusesMap(oi.ReplicationStatusInternal)
  2313. purgeStatuses := versionPurgeStatusesMap(oi.VersionPurgeStatusInternal)
  2314. tm, _ := time.Parse(time.RFC3339Nano, oi.UserDefined[ReservedMetadataPrefixLower+ReplicationTimestamp])
  2315. rstate := oi.ReplicationState()
  2316. rstate.ReplicateDecisionStr = dsc.String()
  2317. asz, _ := oi.GetActualSize()
  2318. ri := ReplicateObjectInfo{
  2319. Name: oi.Name,
  2320. Size: oi.Size,
  2321. ActualSize: asz,
  2322. Bucket: oi.Bucket,
  2323. VersionID: oi.VersionID,
  2324. ETag: oi.ETag,
  2325. ModTime: oi.ModTime,
  2326. ReplicationStatus: oi.ReplicationStatus,
  2327. ReplicationStatusInternal: oi.ReplicationStatusInternal,
  2328. DeleteMarker: oi.DeleteMarker,
  2329. VersionPurgeStatusInternal: oi.VersionPurgeStatusInternal,
  2330. VersionPurgeStatus: oi.VersionPurgeStatus,
  2331. ReplicationState: rstate,
  2332. OpType: opType,
  2333. Dsc: dsc,
  2334. TargetStatuses: tgtStatuses,
  2335. TargetPurgeStatuses: purgeStatuses,
  2336. ReplicationTimestamp: tm,
  2337. SSEC: crypto.SSEC.IsEncrypted(oi.UserDefined),
  2338. UserTags: oi.UserTags,
  2339. }
  2340. if ri.SSEC {
  2341. ri.Checksum = oi.Checksum
  2342. }
  2343. if dsc.Synchronous() {
  2344. replicateObject(ctx, ri, o)
  2345. } else {
  2346. globalReplicationPool.Get().queueReplicaTask(ri)
  2347. }
  2348. }
  2349. // proxyTaggingToRepTarget proxies tagging requests to remote targets for
  2350. // active-active replicated setups
  2351. func proxyTaggingToRepTarget(ctx context.Context, bucket, object string, tags *tags.Tags, opts ObjectOptions, proxyTargets *madmin.BucketTargets) (proxy proxyResult) {
  2352. // this option is set when active-active replication is in place between site A -> B,
  2353. // and request hits site B that does not have the object yet.
  2354. if opts.ProxyRequest || (opts.ProxyHeaderSet && !opts.ProxyRequest) { // true only when site B sets MinIOSourceProxyRequest header
  2355. return proxy
  2356. }
  2357. var wg sync.WaitGroup
  2358. errs := make([]error, len(proxyTargets.Targets))
  2359. for idx, t := range proxyTargets.Targets {
  2360. tgt := globalBucketTargetSys.GetRemoteTargetClient(bucket, t.Arn)
  2361. if tgt == nil || globalBucketTargetSys.isOffline(tgt.EndpointURL()) {
  2362. continue
  2363. }
  2364. // if proxying explicitly disabled on remote target
  2365. if tgt.disableProxy {
  2366. continue
  2367. }
  2368. idx := idx
  2369. wg.Add(1)
  2370. go func(idx int, tgt *TargetClient) {
  2371. defer wg.Done()
  2372. var err error
  2373. if tags != nil {
  2374. popts := minio.PutObjectTaggingOptions{
  2375. VersionID: opts.VersionID,
  2376. Internal: minio.AdvancedObjectTaggingOptions{
  2377. ReplicationProxyRequest: "true",
  2378. },
  2379. }
  2380. err = tgt.PutObjectTagging(ctx, tgt.Bucket, object, tags, popts)
  2381. } else {
  2382. dopts := minio.RemoveObjectTaggingOptions{
  2383. VersionID: opts.VersionID,
  2384. Internal: minio.AdvancedObjectTaggingOptions{
  2385. ReplicationProxyRequest: "true",
  2386. },
  2387. }
  2388. err = tgt.RemoveObjectTagging(ctx, tgt.Bucket, object, dopts)
  2389. }
  2390. if err != nil {
  2391. errs[idx] = err
  2392. }
  2393. }(idx, tgt)
  2394. }
  2395. wg.Wait()
  2396. var (
  2397. terr error
  2398. taggedCount int
  2399. )
  2400. for _, err := range errs {
  2401. if err == nil {
  2402. taggedCount++
  2403. continue
  2404. }
  2405. if err != nil {
  2406. terr = err
  2407. }
  2408. }
  2409. // don't return error if at least one target was tagged successfully
  2410. if taggedCount == 0 && terr != nil {
  2411. proxy.Err = terr
  2412. }
  2413. return proxy
  2414. }
  2415. // proxyGetTaggingToRepTarget proxies get tagging requests to remote targets for
  2416. // active-active replicated setups
  2417. func proxyGetTaggingToRepTarget(ctx context.Context, bucket, object string, opts ObjectOptions, proxyTargets *madmin.BucketTargets) (tgs *tags.Tags, proxy proxyResult) {
  2418. // this option is set when active-active replication is in place between site A -> B,
  2419. // and request hits site B that does not have the object yet.
  2420. if opts.ProxyRequest || (opts.ProxyHeaderSet && !opts.ProxyRequest) { // true only when site B sets MinIOSourceProxyRequest header
  2421. return nil, proxy
  2422. }
  2423. var wg sync.WaitGroup
  2424. errs := make([]error, len(proxyTargets.Targets))
  2425. tagSlc := make([]map[string]string, len(proxyTargets.Targets))
  2426. for idx, t := range proxyTargets.Targets {
  2427. tgt := globalBucketTargetSys.GetRemoteTargetClient(bucket, t.Arn)
  2428. if tgt == nil || globalBucketTargetSys.isOffline(tgt.EndpointURL()) {
  2429. continue
  2430. }
  2431. // if proxying explicitly disabled on remote target
  2432. if tgt.disableProxy {
  2433. continue
  2434. }
  2435. idx := idx
  2436. wg.Add(1)
  2437. go func(idx int, tgt *TargetClient) {
  2438. defer wg.Done()
  2439. var err error
  2440. gopts := minio.GetObjectTaggingOptions{
  2441. VersionID: opts.VersionID,
  2442. Internal: minio.AdvancedObjectTaggingOptions{
  2443. ReplicationProxyRequest: "true",
  2444. },
  2445. }
  2446. tgs, err = tgt.GetObjectTagging(ctx, tgt.Bucket, object, gopts)
  2447. if err != nil {
  2448. errs[idx] = err
  2449. } else {
  2450. tagSlc[idx] = tgs.ToMap()
  2451. }
  2452. }(idx, tgt)
  2453. }
  2454. wg.Wait()
  2455. for idx, err := range errs {
  2456. errCode := minio.ToErrorResponse(err).Code
  2457. if err != nil && errCode != "NoSuchKey" && errCode != "NoSuchVersion" {
  2458. return nil, proxyResult{Err: err}
  2459. }
  2460. if err == nil {
  2461. tgs, _ = tags.MapToObjectTags(tagSlc[idx])
  2462. }
  2463. }
  2464. if len(errs) == 1 {
  2465. proxy.Err = errs[0]
  2466. }
  2467. return tgs, proxy
  2468. }
  2469. func scheduleReplicationDelete(ctx context.Context, dv DeletedObjectReplicationInfo, o ObjectLayer) {
  2470. globalReplicationPool.Get().queueReplicaDeleteTask(dv)
  2471. for arn := range dv.ReplicationState.Targets {
  2472. globalReplicationStats.Load().Update(dv.Bucket, replicatedTargetInfo{Arn: arn, Size: 0, Duration: 0, OpType: replication.DeleteReplicationType}, replication.Pending, replication.StatusType(""))
  2473. }
  2474. }
  2475. type replicationConfig struct {
  2476. Config *replication.Config
  2477. remotes *madmin.BucketTargets
  2478. }
  2479. func (c replicationConfig) Empty() bool {
  2480. return c.Config == nil
  2481. }
  2482. func (c replicationConfig) Replicate(opts replication.ObjectOpts) bool {
  2483. return c.Config.Replicate(opts)
  2484. }
2485. // Resync returns the resync decision for each target when a replication reset has been requested
  2486. func (c replicationConfig) Resync(ctx context.Context, oi ObjectInfo, dsc ReplicateDecision, tgtStatuses map[string]replication.StatusType) (r ResyncDecision) {
  2487. if c.Empty() {
  2488. return
  2489. }
  2490. // Now overlay existing object replication choices for target
  2491. if oi.DeleteMarker {
  2492. opts := replication.ObjectOpts{
  2493. Name: oi.Name,
  2494. DeleteMarker: oi.DeleteMarker,
  2495. VersionID: oi.VersionID,
  2496. OpType: replication.DeleteReplicationType,
  2497. ExistingObject: true,
  2498. }
  2499. tgtArns := c.Config.FilterTargetArns(opts)
  2500. // indicates no matching target with Existing object replication enabled.
  2501. if len(tgtArns) == 0 {
  2502. return
  2503. }
  2504. for _, t := range tgtArns {
  2505. opts.TargetArn = t
2506. // Update replication decision for target based on existing object replication rules.
  2507. dsc.Set(newReplicateTargetDecision(t, c.Replicate(opts), false))
  2508. }
  2509. return c.resync(oi, dsc, tgtStatuses)
  2510. }
  2511. // Ignore previous replication status when deciding if object can be re-replicated
  2512. userDefined := cloneMSS(oi.UserDefined)
  2513. delete(userDefined, xhttp.AmzBucketReplicationStatus)
  2514. rdsc := mustReplicate(ctx, oi.Bucket, oi.Name, getMustReplicateOptions(userDefined, oi.UserTags, "", replication.ExistingObjectReplicationType, ObjectOptions{}))
  2515. return c.resync(oi, rdsc, tgtStatuses)
  2516. }
2517. // wrapper function for testability. Marks a target for resync if a new reset is requested on
2518. // already replicated objects OR the object qualifies for existing object replication
2519. // and no reset was requested.
  2520. func (c replicationConfig) resync(oi ObjectInfo, dsc ReplicateDecision, tgtStatuses map[string]replication.StatusType) (r ResyncDecision) {
  2521. r = ResyncDecision{
  2522. targets: make(map[string]ResyncTargetDecision, len(dsc.targetsMap)),
  2523. }
  2524. if c.remotes == nil {
  2525. return
  2526. }
  2527. for _, tgt := range c.remotes.Targets {
  2528. d, ok := dsc.targetsMap[tgt.Arn]
  2529. if !ok {
  2530. continue
  2531. }
  2532. if !d.Replicate {
  2533. continue
  2534. }
  2535. r.targets[d.Arn] = resyncTarget(oi, tgt.Arn, tgt.ResetID, tgt.ResetBeforeDate, tgtStatuses[tgt.Arn])
  2536. }
  2537. return
  2538. }
  2539. func targetResetHeader(arn string) string {
  2540. return fmt.Sprintf("%s-%s", ReservedMetadataPrefixLower+ReplicationReset, arn)
  2541. }
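// A minimal sketch (not part of the original source) of how the reset marker is keyed
// and consumed; the ARN and reset id values below are hypothetical. The per-target
// metadata key comes from targetResetHeader(arn), and its value is expected in the
// form "<timestamp>;<reset-id>" (resyncTarget below splits it on ";"), e.g.:
//
//	key := targetResetHeader("arn:minio:replication::1234:dest-bucket")
//	val := fmt.Sprintf("%s;%s", UTCNow().Format(http.TimeFormat), "reset-id-1")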
  2542. func resyncTarget(oi ObjectInfo, arn string, resetID string, resetBeforeDate time.Time, tgtStatus replication.StatusType) (rd ResyncTargetDecision) {
  2543. rd = ResyncTargetDecision{
  2544. ResetID: resetID,
  2545. ResetBeforeDate: resetBeforeDate,
  2546. }
  2547. rs, ok := oi.UserDefined[targetResetHeader(arn)]
  2548. if !ok {
  2549. rs, ok = oi.UserDefined[xhttp.MinIOReplicationResetStatus] // for backward compatibility
  2550. }
  2551. if !ok { // existing object replication is enabled and object version is unreplicated so far.
  2552. if resetID != "" && oi.ModTime.Before(resetBeforeDate) { // trigger replication if `mc replicate reset` requested
  2553. rd.Replicate = true
  2554. return
  2555. }
2556. // Existing-object replication without a reset: replicate only if the target has no replication status yet.
  2557. rd.Replicate = tgtStatus == ""
  2558. return
  2559. }
  2560. if resetID == "" || resetBeforeDate.Equal(timeSentinel) { // no reset in progress
  2561. return
  2562. }
2563. // if already replicated, resync only if a new reset was requested.
  2564. splits := strings.SplitN(rs, ";", 2)
  2565. if len(splits) != 2 {
  2566. return
  2567. }
  2568. newReset := splits[1] != resetID
  2569. if !newReset && tgtStatus == replication.Completed {
  2570. // already replicated and no reset requested
  2571. return
  2572. }
  2573. rd.Replicate = newReset && oi.ModTime.Before(resetBeforeDate)
  2574. return
  2575. }
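// In plain terms, resyncTarget above marks a version for resync when either (a) it
// carries no reset marker yet and a reset was requested with a ResetBeforeDate later
// than its mod time (or the target has no replication status for it at all), or (b) it
// carries a marker from a different reset id and its mod time falls before the
// requested ResetBeforeDate.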
  2576. const resyncTimeInterval = time.Minute * 1
  2577. // PersistToDisk persists in-memory resync metadata stats to disk at periodic intervals
  2578. func (s *replicationResyncer) PersistToDisk(ctx context.Context, objectAPI ObjectLayer) {
  2579. resyncTimer := time.NewTimer(resyncTimeInterval)
  2580. defer resyncTimer.Stop()
  2581. // For each bucket name, store the last timestamp of the
  2582. // successful save of replication status in the backend disks.
  2583. lastResyncStatusSave := make(map[string]time.Time)
  2584. for {
  2585. select {
  2586. case <-resyncTimer.C:
  2587. s.RLock()
  2588. for bucket, brs := range s.statusMap {
  2589. var updt bool
  2590. // Save the replication status if one resync to any bucket target is still not finished
  2591. for _, st := range brs.TargetsMap {
  2592. if st.LastUpdate.Equal(timeSentinel) {
  2593. updt = true
  2594. break
  2595. }
  2596. }
  2597. // Save the replication status if a new stats update is found and not saved in the backend yet
  2598. if brs.LastUpdate.After(lastResyncStatusSave[bucket]) {
  2599. updt = true
  2600. }
  2601. if updt {
  2602. if err := saveResyncStatus(ctx, bucket, brs, objectAPI); err != nil {
  2603. replLogIf(ctx, fmt.Errorf("could not save resync metadata to drive for %s - %w", bucket, err))
  2604. } else {
  2605. lastResyncStatusSave[bucket] = brs.LastUpdate
  2606. }
  2607. }
  2608. }
  2609. s.RUnlock()
  2610. resyncTimer.Reset(resyncTimeInterval)
  2611. case <-ctx.Done():
  2612. // server could be restarting - need
  2613. // to exit immediately
  2614. return
  2615. }
  2616. }
  2617. }
  2618. const (
2619. resyncWorkerCnt = 10 // limit on the number of bucket resyncs in progress at any given time
  2620. resyncParallelRoutines = 10 // number of parallel resync ops per bucket
  2621. )
  2622. func newresyncer() *replicationResyncer {
  2623. rs := replicationResyncer{
  2624. statusMap: make(map[string]BucketReplicationResyncStatus),
  2625. workerSize: resyncWorkerCnt,
  2626. resyncCancelCh: make(chan struct{}, resyncWorkerCnt),
  2627. workerCh: make(chan struct{}, resyncWorkerCnt),
  2628. }
  2629. for i := 0; i < rs.workerSize; i++ {
  2630. rs.workerCh <- struct{}{}
  2631. }
  2632. return &rs
  2633. }
  2634. // mark status of replication resync on remote target for the bucket
  2635. func (s *replicationResyncer) markStatus(status ResyncStatusType, opts resyncOpts, objAPI ObjectLayer) {
  2636. s.Lock()
  2637. defer s.Unlock()
  2638. m := s.statusMap[opts.bucket]
  2639. st := m.TargetsMap[opts.arn]
  2640. st.LastUpdate = UTCNow()
  2641. st.ResyncStatus = status
  2642. m.TargetsMap[opts.arn] = st
  2643. m.LastUpdate = UTCNow()
  2644. s.statusMap[opts.bucket] = m
  2645. ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  2646. defer cancel()
  2647. saveResyncStatus(ctx, opts.bucket, m, objAPI)
  2648. }
  2649. // update replication resync stats for bucket's remote target
  2650. func (s *replicationResyncer) incStats(ts TargetReplicationResyncStatus, opts resyncOpts) {
  2651. s.Lock()
  2652. defer s.Unlock()
  2653. m := s.statusMap[opts.bucket]
  2654. st := m.TargetsMap[opts.arn]
  2655. st.Object = ts.Object
  2656. st.ReplicatedCount += ts.ReplicatedCount
  2657. st.FailedCount += ts.FailedCount
  2658. st.ReplicatedSize += ts.ReplicatedSize
  2659. st.FailedSize += ts.FailedSize
  2660. m.TargetsMap[opts.arn] = st
  2661. m.LastUpdate = UTCNow()
  2662. s.statusMap[opts.bucket] = m
  2663. }
  2664. // resyncBucket resyncs all qualifying objects as per replication rules for the target
  2665. // ARN
  2666. func (s *replicationResyncer) resyncBucket(ctx context.Context, objectAPI ObjectLayer, heal bool, opts resyncOpts) {
  2667. select {
  2668. case <-s.workerCh: // block till a worker is available
  2669. case <-ctx.Done():
  2670. return
  2671. }
  2672. resyncStatus := ResyncFailed
  2673. defer func() {
  2674. s.markStatus(resyncStatus, opts, objectAPI)
  2675. globalSiteResyncMetrics.incBucket(opts, resyncStatus)
  2676. s.workerCh <- struct{}{}
  2677. }()
  2678. // Allocate new results channel to receive ObjectInfo.
  2679. objInfoCh := make(chan itemOrErr[ObjectInfo])
  2680. cfg, err := getReplicationConfig(ctx, opts.bucket)
  2681. if err != nil {
  2682. replLogIf(ctx, fmt.Errorf("replication resync of %s for arn %s failed with %w", opts.bucket, opts.arn, err))
  2683. return
  2684. }
  2685. tgts, err := globalBucketTargetSys.ListBucketTargets(ctx, opts.bucket)
  2686. if err != nil {
  2687. replLogIf(ctx, fmt.Errorf("replication resync of %s for arn %s failed %w", opts.bucket, opts.arn, err))
  2688. return
  2689. }
  2690. rcfg := replicationConfig{
  2691. Config: cfg,
  2692. remotes: tgts,
  2693. }
  2694. tgtArns := cfg.FilterTargetArns(
  2695. replication.ObjectOpts{
  2696. OpType: replication.ResyncReplicationType,
  2697. TargetArn: opts.arn,
  2698. })
  2699. if len(tgtArns) != 1 {
  2700. replLogIf(ctx, fmt.Errorf("replication resync failed for %s - arn specified %s is missing in the replication config", opts.bucket, opts.arn))
  2701. return
  2702. }
  2703. tgt := globalBucketTargetSys.GetRemoteTargetClient(opts.bucket, opts.arn)
  2704. if tgt == nil {
  2705. replLogIf(ctx, fmt.Errorf("replication resync failed for %s - target could not be created for arn %s", opts.bucket, opts.arn))
  2706. return
  2707. }
  2708. // mark resync status as resync started
  2709. if !heal {
  2710. s.markStatus(ResyncStarted, opts, objectAPI)
  2711. }
2712. // Walk through all object versions - Walk() always lists in ascending order, which ensures
2713. // a delete marker is replicated to the target only after the object version itself has been created.
  2714. if err := objectAPI.Walk(ctx, opts.bucket, "", objInfoCh, WalkOptions{}); err != nil {
  2715. replLogIf(ctx, err)
  2716. return
  2717. }
  2718. s.RLock()
  2719. m := s.statusMap[opts.bucket]
  2720. st := m.TargetsMap[opts.arn]
  2721. s.RUnlock()
  2722. var lastCheckpoint string
  2723. if st.ResyncStatus == ResyncStarted || st.ResyncStatus == ResyncFailed {
  2724. lastCheckpoint = st.Object
  2725. }
  2726. workers := make([]chan ReplicateObjectInfo, resyncParallelRoutines)
  2727. resultCh := make(chan TargetReplicationResyncStatus, 1)
  2728. defer xioutil.SafeClose(resultCh)
  2729. go func() {
  2730. for r := range resultCh {
  2731. s.incStats(r, opts)
  2732. globalSiteResyncMetrics.updateMetric(r, opts.resyncID)
  2733. }
  2734. }()
  2735. var wg sync.WaitGroup
  2736. for i := 0; i < resyncParallelRoutines; i++ {
  2737. wg.Add(1)
  2738. workers[i] = make(chan ReplicateObjectInfo, 100)
  2739. i := i
  2740. go func(ctx context.Context, idx int) {
  2741. defer wg.Done()
  2742. for roi := range workers[idx] {
  2743. select {
  2744. case <-ctx.Done():
  2745. return
  2746. case <-s.resyncCancelCh:
  2747. default:
  2748. }
  2749. traceFn := s.trace(tgt.ResetID, fmt.Sprintf("%s/%s (%s)", opts.bucket, roi.Name, roi.VersionID))
  2750. if roi.DeleteMarker || !roi.VersionPurgeStatus.Empty() {
  2751. versionID := ""
  2752. dmVersionID := ""
  2753. if roi.VersionPurgeStatus.Empty() {
  2754. dmVersionID = roi.VersionID
  2755. } else {
  2756. versionID = roi.VersionID
  2757. }
  2758. doi := DeletedObjectReplicationInfo{
  2759. DeletedObject: DeletedObject{
  2760. ObjectName: roi.Name,
  2761. DeleteMarkerVersionID: dmVersionID,
  2762. VersionID: versionID,
  2763. ReplicationState: roi.ReplicationState,
  2764. DeleteMarkerMTime: DeleteMarkerMTime{roi.ModTime},
  2765. DeleteMarker: roi.DeleteMarker,
  2766. },
  2767. Bucket: roi.Bucket,
  2768. OpType: replication.ExistingObjectReplicationType,
  2769. EventType: ReplicateExistingDelete,
  2770. }
  2771. replicateDelete(ctx, doi, objectAPI)
  2772. } else {
  2773. roi.OpType = replication.ExistingObjectReplicationType
  2774. roi.EventType = ReplicateExisting
  2775. replicateObject(ctx, roi, objectAPI)
  2776. }
  2777. st := TargetReplicationResyncStatus{
  2778. Object: roi.Name,
  2779. Bucket: roi.Bucket,
  2780. }
  2781. _, err := tgt.StatObject(ctx, tgt.Bucket, roi.Name, minio.StatObjectOptions{
  2782. VersionID: roi.VersionID,
  2783. Internal: minio.AdvancedGetOptions{
  2784. ReplicationProxyRequest: "false",
  2785. },
  2786. })
  2787. sz := roi.Size
  2788. if err != nil {
  2789. if roi.DeleteMarker && isErrMethodNotAllowed(ErrorRespToObjectError(err, opts.bucket, roi.Name)) {
  2790. st.ReplicatedCount++
  2791. } else {
  2792. st.FailedCount++
  2793. }
  2794. sz = 0
  2795. } else {
  2796. st.ReplicatedCount++
  2797. st.ReplicatedSize += roi.Size
  2798. }
  2799. traceFn(sz, err)
  2800. select {
  2801. case <-ctx.Done():
  2802. return
  2803. case <-s.resyncCancelCh:
  2804. return
  2805. case resultCh <- st:
  2806. }
  2807. }
  2808. }(ctx, i)
  2809. }
  2810. for res := range objInfoCh {
  2811. if res.Err != nil {
  2812. resyncStatus = ResyncFailed
  2813. replLogIf(ctx, res.Err)
  2814. return
  2815. }
  2816. select {
  2817. case <-s.resyncCancelCh:
  2818. resyncStatus = ResyncCanceled
  2819. return
  2820. case <-ctx.Done():
  2821. return
  2822. default:
  2823. }
  2824. if heal && lastCheckpoint != "" && lastCheckpoint != res.Item.Name {
  2825. continue
  2826. }
  2827. lastCheckpoint = ""
  2828. roi := getHealReplicateObjectInfo(res.Item, rcfg)
  2829. if !roi.ExistingObjResync.mustResync() {
  2830. continue
  2831. }
  2832. select {
  2833. case <-s.resyncCancelCh:
  2834. return
  2835. case <-ctx.Done():
  2836. return
  2837. default:
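// Hash bucket+object so that all versions of the same object land on the same worker,
// preserving per-object ordering during resync.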
  2838. h := xxh3.HashString(roi.Bucket + roi.Name)
  2839. workers[h%uint64(resyncParallelRoutines)] <- roi
  2840. }
  2841. }
  2842. for i := 0; i < resyncParallelRoutines; i++ {
  2843. xioutil.SafeClose(workers[i])
  2844. }
  2845. wg.Wait()
  2846. resyncStatus = ResyncCompleted
  2847. }
  2848. // start replication resync for the remote target ARN specified
  2849. func (s *replicationResyncer) start(ctx context.Context, objAPI ObjectLayer, opts resyncOpts) error {
  2850. if opts.bucket == "" {
  2851. return fmt.Errorf("bucket name is empty")
  2852. }
  2853. if opts.arn == "" {
  2854. return fmt.Errorf("target ARN specified for resync is empty")
  2855. }
2856. // Load the bucket's replication configuration; resync cannot proceed without one.
  2857. cfg, err := getReplicationConfig(ctx, opts.bucket)
  2858. if err != nil {
  2859. return err
  2860. }
  2861. tgtArns := cfg.FilterTargetArns(
  2862. replication.ObjectOpts{
  2863. OpType: replication.ResyncReplicationType,
  2864. TargetArn: opts.arn,
  2865. })
  2866. if len(tgtArns) == 0 {
  2867. return fmt.Errorf("arn %s specified for resync not found in replication config", opts.arn)
  2868. }
  2869. globalReplicationPool.Get().resyncer.RLock()
  2870. data, ok := globalReplicationPool.Get().resyncer.statusMap[opts.bucket]
  2871. globalReplicationPool.Get().resyncer.RUnlock()
  2872. if !ok {
  2873. data, err = loadBucketResyncMetadata(ctx, opts.bucket, objAPI)
  2874. if err != nil {
  2875. return err
  2876. }
  2877. }
  2878. // validate if resync is in progress for this arn
  2879. for tArn, st := range data.TargetsMap {
  2880. if opts.arn == tArn && (st.ResyncStatus == ResyncStarted || st.ResyncStatus == ResyncPending) {
  2881. return fmt.Errorf("Resync of bucket %s is already in progress for remote bucket %s", opts.bucket, opts.arn)
  2882. }
  2883. }
  2884. status := TargetReplicationResyncStatus{
  2885. ResyncID: opts.resyncID,
  2886. ResyncBeforeDate: opts.resyncBefore,
  2887. StartTime: UTCNow(),
  2888. ResyncStatus: ResyncPending,
  2889. Bucket: opts.bucket,
  2890. }
  2891. data.TargetsMap[opts.arn] = status
  2892. if err = saveResyncStatus(ctx, opts.bucket, data, objAPI); err != nil {
  2893. return err
  2894. }
  2895. globalReplicationPool.Get().resyncer.Lock()
  2896. defer globalReplicationPool.Get().resyncer.Unlock()
  2897. brs, ok := globalReplicationPool.Get().resyncer.statusMap[opts.bucket]
  2898. if !ok {
  2899. brs = BucketReplicationResyncStatus{
  2900. Version: resyncMetaVersion,
  2901. TargetsMap: make(map[string]TargetReplicationResyncStatus),
  2902. }
  2903. }
  2904. brs.TargetsMap[opts.arn] = status
  2905. globalReplicationPool.Get().resyncer.statusMap[opts.bucket] = brs
  2906. go globalReplicationPool.Get().resyncer.resyncBucket(GlobalContext, objAPI, false, opts)
  2907. return nil
  2908. }
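// trace returns a callback that, when invoked with the replicated size and error, publishes a
// replication-resync trace event for the given path if any trace subscribers are listening.
// Rough usage, mirroring resyncBucket above:
//
//	traceFn := s.trace(resetID, "bucket/object (version-id)")
//	// ... replicate the object ...
//	traceFn(size, err)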
  2909. func (s *replicationResyncer) trace(resyncID string, path string) func(sz int64, err error) {
  2910. startTime := time.Now()
  2911. return func(sz int64, err error) {
  2912. duration := time.Since(startTime)
  2913. if globalTrace.NumSubscribers(madmin.TraceReplicationResync) > 0 {
  2914. globalTrace.Publish(replicationResyncTrace(resyncID, startTime, duration, path, err, sz))
  2915. }
  2916. }
  2917. }
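// replicationResyncTrace packages a single resynced object into the madmin.TraceInfo shape
// expected by trace subscribers.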
  2918. func replicationResyncTrace(resyncID string, startTime time.Time, duration time.Duration, path string, err error, sz int64) madmin.TraceInfo {
  2919. var errStr string
  2920. if err != nil {
  2921. errStr = err.Error()
  2922. }
  2923. funcName := fmt.Sprintf("replication.(resyncID=%s)", resyncID)
  2924. return madmin.TraceInfo{
  2925. TraceType: madmin.TraceReplicationResync,
  2926. Time: startTime,
  2927. NodeName: globalLocalNodeName,
  2928. FuncName: funcName,
  2929. Duration: duration,
  2930. Path: path,
  2931. Error: errStr,
  2932. Bytes: sz,
  2933. }
  2934. }
  2935. // delete resync metadata from replication resync state in memory
  2936. func (p *ReplicationPool) deleteResyncMetadata(ctx context.Context, bucket string) {
  2937. if p == nil {
  2938. return
  2939. }
2940. p.resyncer.Lock()
2941. defer p.resyncer.Unlock()
2942. delete(p.resyncer.statusMap, bucket)
  2943. globalSiteResyncMetrics.deleteBucket(bucket)
  2944. }
  2945. // initResync - initializes bucket replication resync for all buckets.
  2946. func (p *ReplicationPool) initResync(ctx context.Context, buckets []string, objAPI ObjectLayer) error {
  2947. if objAPI == nil {
  2948. return errServerNotInitialized
  2949. }
2950. // Start the resync routine in the background.
  2951. go p.startResyncRoutine(ctx, buckets, objAPI)
  2952. return nil
  2953. }
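// startResyncRoutine retries loadResync with a randomized delay (between one second and one
// minute) until it succeeds, then simply waits for shutdown.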
  2954. func (p *ReplicationPool) startResyncRoutine(ctx context.Context, buckets []string, objAPI ObjectLayer) {
  2955. r := rand.New(rand.NewSource(time.Now().UnixNano()))
  2956. // Run the replication resync in a loop
  2957. for {
  2958. if err := p.loadResync(ctx, buckets, objAPI); err == nil {
  2959. <-ctx.Done()
  2960. return
  2961. }
  2962. duration := time.Duration(r.Float64() * float64(time.Minute))
  2963. if duration < time.Second {
  2964. // Make sure to sleep at least a second to avoid high CPU ticks.
  2965. duration = time.Second
  2966. }
  2967. time.Sleep(duration)
  2968. }
  2969. }
  2970. // Loads bucket replication resync statuses into memory.
  2971. func (p *ReplicationPool) loadResync(ctx context.Context, buckets []string, objAPI ObjectLayer) error {
2972. // Make sure only one node runs resync on the cluster.
  2973. ctx, cancel := globalLeaderLock.GetLock(ctx)
  2974. defer cancel()
  2975. for index := range buckets {
  2976. bucket := buckets[index]
  2977. meta, err := loadBucketResyncMetadata(ctx, bucket, objAPI)
  2978. if err != nil {
  2979. if !errors.Is(err, errVolumeNotFound) {
  2980. replLogIf(ctx, err)
  2981. }
  2982. continue
  2983. }
  2984. p.resyncer.Lock()
  2985. p.resyncer.statusMap[bucket] = meta
  2986. p.resyncer.Unlock()
  2987. tgts := meta.cloneTgtStats()
  2988. for arn, st := range tgts {
  2989. switch st.ResyncStatus {
  2990. case ResyncFailed, ResyncStarted, ResyncPending:
  2991. go p.resyncer.resyncBucket(ctx, objAPI, true, resyncOpts{
  2992. bucket: bucket,
  2993. arn: arn,
  2994. resyncID: st.ResyncID,
  2995. resyncBefore: st.ResyncBeforeDate,
  2996. })
  2997. }
  2998. }
  2999. }
  3000. return nil
  3001. }
  3002. // load bucket resync metadata from disk
  3003. func loadBucketResyncMetadata(ctx context.Context, bucket string, objAPI ObjectLayer) (brs BucketReplicationResyncStatus, e error) {
  3004. brs = newBucketResyncStatus(bucket)
  3005. resyncDirPath := path.Join(bucketMetaPrefix, bucket, replicationDir)
  3006. data, err := readConfig(GlobalContext, objAPI, pathJoin(resyncDirPath, resyncFileName))
  3007. if err != nil && err != errConfigNotFound {
  3008. return brs, err
  3009. }
  3010. if len(data) == 0 {
  3011. // Seems to be empty.
  3012. return brs, nil
  3013. }
  3014. if len(data) <= 4 {
  3015. return brs, fmt.Errorf("replication resync: no data")
  3016. }
  3017. // Read resync meta header
  3018. switch binary.LittleEndian.Uint16(data[0:2]) {
  3019. case resyncMetaFormat:
  3020. default:
  3021. return brs, fmt.Errorf("resyncMeta: unknown format: %d", binary.LittleEndian.Uint16(data[0:2]))
  3022. }
  3023. switch binary.LittleEndian.Uint16(data[2:4]) {
  3024. case resyncMetaVersion:
  3025. default:
  3026. return brs, fmt.Errorf("resyncMeta: unknown version: %d", binary.LittleEndian.Uint16(data[2:4]))
  3027. }
  3028. // OK, parse data.
  3029. if _, err = brs.UnmarshalMsg(data[4:]); err != nil {
  3030. return brs, err
  3031. }
  3032. switch brs.Version {
  3033. case resyncMetaVersionV1:
  3034. default:
  3035. return brs, fmt.Errorf("unexpected resync meta version: %d", brs.Version)
  3036. }
  3037. return brs, nil
  3038. }
  3039. // save resync status to resync.bin
  3040. func saveResyncStatus(ctx context.Context, bucket string, brs BucketReplicationResyncStatus, objectAPI ObjectLayer) error {
  3041. data := make([]byte, 4, brs.Msgsize()+4)
  3042. // Initialize the resync meta header.
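// Header layout (little-endian): bytes [0:2] hold resyncMetaFormat and bytes [2:4] hold
// resyncMetaVersion; the msgp-encoded BucketReplicationResyncStatus is appended after these
// four bytes by MarshalMsg below.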
  3043. binary.LittleEndian.PutUint16(data[0:2], resyncMetaFormat)
  3044. binary.LittleEndian.PutUint16(data[2:4], resyncMetaVersion)
  3045. buf, err := brs.MarshalMsg(data)
  3046. if err != nil {
  3047. return err
  3048. }
  3049. configFile := path.Join(bucketMetaPrefix, bucket, replicationDir, resyncFileName)
  3050. return saveConfig(ctx, objectAPI, configFile, buf)
  3051. }
  3052. // getReplicationDiff returns un-replicated objects in a channel.
  3053. // If a non-nil channel is returned it must be consumed fully or
  3054. // the provided context must be canceled.
  3055. func getReplicationDiff(ctx context.Context, objAPI ObjectLayer, bucket string, opts madmin.ReplDiffOpts) (chan madmin.DiffInfo, error) {
  3056. cfg, err := getReplicationConfig(ctx, bucket)
  3057. if err != nil {
  3058. replLogOnceIf(ctx, err, bucket)
  3059. return nil, err
  3060. }
  3061. tgts, err := globalBucketTargetSys.ListBucketTargets(ctx, bucket)
  3062. if err != nil {
  3063. replLogIf(ctx, err)
  3064. return nil, err
  3065. }
  3066. objInfoCh := make(chan itemOrErr[ObjectInfo], 10)
  3067. if err := objAPI.Walk(ctx, bucket, opts.Prefix, objInfoCh, WalkOptions{}); err != nil {
  3068. replLogIf(ctx, err)
  3069. return nil, err
  3070. }
  3071. rcfg := replicationConfig{
  3072. Config: cfg,
  3073. remotes: tgts,
  3074. }
  3075. diffCh := make(chan madmin.DiffInfo, 4000)
  3076. go func() {
  3077. defer xioutil.SafeClose(diffCh)
  3078. for res := range objInfoCh {
  3079. if res.Err != nil {
  3080. diffCh <- madmin.DiffInfo{Err: res.Err}
  3081. return
  3082. }
  3083. if contextCanceled(ctx) {
  3084. // Just consume input...
  3085. continue
  3086. }
  3087. obj := res.Item
  3088. // Ignore object prefixes which are excluded
  3089. // from versioning via the MinIO bucket versioning extension.
  3090. if globalBucketVersioningSys.PrefixSuspended(bucket, obj.Name) {
  3091. continue
  3092. }
  3093. roi := getHealReplicateObjectInfo(obj, rcfg)
  3094. switch roi.ReplicationStatus {
  3095. case replication.Completed, replication.Replica:
  3096. if !opts.Verbose {
  3097. continue
  3098. }
  3099. fallthrough
  3100. default:
  3101. // ignore pre-existing objects that don't satisfy replication rule(s)
  3102. if roi.ReplicationStatus.Empty() && !roi.ExistingObjResync.mustResync() {
  3103. continue
  3104. }
  3105. tgtsMap := make(map[string]madmin.TgtDiffInfo)
  3106. for arn, st := range roi.TargetStatuses {
  3107. if opts.ARN == "" || opts.ARN == arn {
  3108. if !opts.Verbose && (st == replication.Completed || st == replication.Replica) {
  3109. continue
  3110. }
  3111. tgtsMap[arn] = madmin.TgtDiffInfo{
  3112. ReplicationStatus: st.String(),
  3113. }
  3114. }
  3115. }
  3116. for arn, st := range roi.TargetPurgeStatuses {
  3117. if opts.ARN == "" || opts.ARN == arn {
  3118. if !opts.Verbose && st == Complete {
  3119. continue
  3120. }
  3121. t, ok := tgtsMap[arn]
  3122. if !ok {
  3123. t = madmin.TgtDiffInfo{}
  3124. }
  3125. t.DeleteReplicationStatus = string(st)
  3126. tgtsMap[arn] = t
  3127. }
  3128. }
  3129. select {
  3130. case diffCh <- madmin.DiffInfo{
  3131. Object: obj.Name,
  3132. VersionID: obj.VersionID,
  3133. LastModified: obj.ModTime,
  3134. IsDeleteMarker: obj.DeleteMarker,
  3135. ReplicationStatus: string(roi.ReplicationStatus),
  3136. DeleteReplicationStatus: string(roi.VersionPurgeStatus),
  3137. ReplicationTimestamp: roi.ReplicationTimestamp,
  3138. Targets: tgtsMap,
  3139. }:
  3140. case <-ctx.Done():
  3141. continue
  3142. }
  3143. }
  3144. }
  3145. }()
  3146. return diffCh, nil
  3147. }
  3148. // QueueReplicationHeal is a wrapper for queueReplicationHeal
  3149. func QueueReplicationHeal(ctx context.Context, bucket string, oi ObjectInfo, retryCount int) {
  3150. // ignore modtime zero objects
  3151. if oi.ModTime.IsZero() {
  3152. return
  3153. }
  3154. rcfg, err := getReplicationConfig(ctx, bucket)
  3155. if err != nil {
  3156. replLogOnceIf(ctx, err, bucket)
  3157. return
  3158. }
  3159. tgts, _ := globalBucketTargetSys.ListBucketTargets(ctx, bucket)
  3160. queueReplicationHeal(ctx, bucket, oi, replicationConfig{
  3161. Config: rcfg,
  3162. remotes: tgts,
  3163. }, retryCount)
  3164. }
3165. // queueReplicationHeal enqueues objects that failed replication or are eligible for resync,
3166. // either through an ongoing resync operation or via the existing-objects replication configuration.
  3167. func queueReplicationHeal(ctx context.Context, bucket string, oi ObjectInfo, rcfg replicationConfig, retryCount int) (roi ReplicateObjectInfo) {
  3168. // ignore modtime zero objects
  3169. if oi.ModTime.IsZero() {
  3170. return roi
  3171. }
  3172. if isVeeamSOSAPIObject(oi.Name) {
  3173. return roi
  3174. }
  3175. if rcfg.Config == nil || rcfg.remotes == nil {
  3176. return roi
  3177. }
  3178. roi = getHealReplicateObjectInfo(oi, rcfg)
  3179. roi.RetryCount = uint32(retryCount)
  3180. if !roi.Dsc.ReplicateAny() {
  3181. return
  3182. }
  3183. // early return if replication already done, otherwise we need to determine if this
  3184. // version is an existing object that needs healing.
  3185. if oi.ReplicationStatus == replication.Completed && oi.VersionPurgeStatus.Empty() && !roi.ExistingObjResync.mustResync() {
  3186. return
  3187. }
  3188. if roi.DeleteMarker || !roi.VersionPurgeStatus.Empty() {
  3189. versionID := ""
  3190. dmVersionID := ""
  3191. if roi.VersionPurgeStatus.Empty() {
  3192. dmVersionID = roi.VersionID
  3193. } else {
  3194. versionID = roi.VersionID
  3195. }
  3196. dv := DeletedObjectReplicationInfo{
  3197. DeletedObject: DeletedObject{
  3198. ObjectName: roi.Name,
  3199. DeleteMarkerVersionID: dmVersionID,
  3200. VersionID: versionID,
  3201. ReplicationState: roi.ReplicationState,
  3202. DeleteMarkerMTime: DeleteMarkerMTime{roi.ModTime},
  3203. DeleteMarker: roi.DeleteMarker,
  3204. },
  3205. Bucket: roi.Bucket,
  3206. OpType: replication.HealReplicationType,
  3207. EventType: ReplicateHealDelete,
  3208. }
  3209. // heal delete marker replication failure or versioned delete replication failure
  3210. if roi.ReplicationStatus == replication.Pending ||
  3211. roi.ReplicationStatus == replication.Failed ||
  3212. roi.VersionPurgeStatus == Failed || roi.VersionPurgeStatus == Pending {
  3213. globalReplicationPool.Get().queueReplicaDeleteTask(dv)
  3214. return
  3215. }
3216. // if replication status is Completed on the DeleteMarker and existing-object resync is required
  3217. if roi.ExistingObjResync.mustResync() && (roi.ReplicationStatus == replication.Completed || roi.ReplicationStatus.Empty()) {
  3218. queueReplicateDeletesWrapper(dv, roi.ExistingObjResync)
  3219. return
  3220. }
  3221. return
  3222. }
  3223. if roi.ExistingObjResync.mustResync() {
  3224. roi.OpType = replication.ExistingObjectReplicationType
  3225. }
  3226. switch roi.ReplicationStatus {
  3227. case replication.Pending, replication.Failed:
  3228. roi.EventType = ReplicateHeal
  3229. globalReplicationPool.Get().queueReplicaTask(roi)
  3230. return
  3231. }
  3232. if roi.ExistingObjResync.mustResync() {
  3233. roi.EventType = ReplicateExisting
  3234. globalReplicationPool.Get().queueReplicaTask(roi)
  3235. }
  3236. return
  3237. }
  3238. const (
  3239. mrfSaveInterval = 5 * time.Minute
  3240. mrfQueueInterval = mrfSaveInterval + time.Minute // A minute higher than save interval
  3241. mrfRetryLimit = 3 // max number of retries before letting scanner catch up on this object version
  3242. mrfMaxEntries = 1000000
  3243. )
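// persistMRF accumulates failed-replication (MRF) entries from mrfSaveCh and flushes them to disk
// on every mrfSaveInterval tick, once mrfMaxEntries is reached, or on shutdown.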
  3244. func (p *ReplicationPool) persistMRF() {
  3245. if !p.initialized() {
  3246. return
  3247. }
  3248. entries := make(map[string]MRFReplicateEntry)
  3249. mTimer := time.NewTimer(mrfSaveInterval)
  3250. defer mTimer.Stop()
  3251. saveMRFToDisk := func() {
  3252. if len(entries) == 0 {
  3253. return
  3254. }
  3255. // queue all entries for healing before overwriting the node mrf file
  3256. if !contextCanceled(p.ctx) {
  3257. p.queueMRFHeal()
  3258. }
  3259. p.saveMRFEntries(p.ctx, entries)
  3260. entries = make(map[string]MRFReplicateEntry)
  3261. }
  3262. for {
  3263. select {
  3264. case <-mTimer.C:
  3265. saveMRFToDisk()
  3266. mTimer.Reset(mrfSaveInterval)
  3267. case <-p.ctx.Done():
  3268. p.mrfStopCh <- struct{}{}
  3269. xioutil.SafeClose(p.mrfSaveCh)
  3270. // We try to save if possible, but we don't care beyond that.
  3271. saveMRFToDisk()
  3272. return
  3273. case e, ok := <-p.mrfSaveCh:
  3274. if !ok {
  3275. return
  3276. }
  3277. entries[e.versionID] = e
  3278. if len(entries) >= mrfMaxEntries {
  3279. saveMRFToDisk()
  3280. }
  3281. }
  3282. }
  3283. }
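// queueMRFSave hands a failed-replication entry to the MRF persistence routine. Entries past the
// retry limit, or entries that cannot be queued without blocking, are counted as dropped instead.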
  3284. func (p *ReplicationPool) queueMRFSave(entry MRFReplicateEntry) {
  3285. if !p.initialized() {
  3286. return
  3287. }
  3288. if entry.RetryCount > mrfRetryLimit { // let scanner catch up if retry count exceeded
  3289. atomic.AddUint64(&p.stats.mrfStats.TotalDroppedCount, 1)
  3290. atomic.AddUint64(&p.stats.mrfStats.TotalDroppedBytes, uint64(entry.sz))
  3291. return
  3292. }
  3293. select {
  3294. case <-GlobalContext.Done():
  3295. return
  3296. case <-p.mrfStopCh:
  3297. return
  3298. default:
  3299. select {
  3300. case p.mrfSaveCh <- entry:
  3301. default:
  3302. atomic.AddUint64(&p.stats.mrfStats.TotalDroppedCount, 1)
  3303. atomic.AddUint64(&p.stats.mrfStats.TotalDroppedBytes, uint64(entry.sz))
  3304. }
  3305. }
  3306. }
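// persistToDrive streams the MRF header and msgp-encoded entries, via an io.Pipe, to the first
// local drive that accepts the write.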
  3307. func (p *ReplicationPool) persistToDrive(ctx context.Context, v MRFReplicateEntries) {
  3308. newReader := func() io.ReadCloser {
  3309. r, w := io.Pipe()
  3310. go func() {
  3311. // Initialize MRF meta header.
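// The 4-byte header mirrors the resync metadata header: bytes [0:2] mrfMetaFormat and
// bytes [2:4] mrfMetaVersion, little-endian, followed by the msgp-encoded entries.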
  3312. var data [4]byte
  3313. binary.LittleEndian.PutUint16(data[0:2], mrfMetaFormat)
  3314. binary.LittleEndian.PutUint16(data[2:4], mrfMetaVersion)
  3315. mw := msgp.NewWriter(w)
  3316. n, err := mw.Write(data[:])
  3317. if err != nil {
  3318. w.CloseWithError(err)
  3319. return
  3320. }
  3321. if n != len(data) {
  3322. w.CloseWithError(io.ErrShortWrite)
  3323. return
  3324. }
  3325. err = v.EncodeMsg(mw)
  3326. mw.Flush()
  3327. w.CloseWithError(err)
  3328. }()
  3329. return r
  3330. }
  3331. globalLocalDrivesMu.RLock()
  3332. localDrives := cloneDrives(globalLocalDrivesMap)
  3333. globalLocalDrivesMu.RUnlock()
  3334. for _, localDrive := range localDrives {
  3335. r := newReader()
  3336. err := localDrive.CreateFile(ctx, "", minioMetaBucket, pathJoin(replicationMRFDir, globalLocalNodeNameHex+".bin"), -1, r)
  3337. r.Close()
  3338. if err == nil {
  3339. break
  3340. }
  3341. }
  3342. }
3343. // saveMRFEntries persists the MRF entries to <globalLocalNodeNameHex>.bin
  3344. func (p *ReplicationPool) saveMRFEntries(ctx context.Context, entries map[string]MRFReplicateEntry) {
  3345. if !p.initialized() {
  3346. return
  3347. }
  3348. atomic.StoreUint64(&p.stats.mrfStats.LastFailedCount, uint64(len(entries)))
  3349. if len(entries) == 0 {
  3350. return
  3351. }
  3352. v := MRFReplicateEntries{
  3353. Entries: entries,
  3354. Version: mrfMetaVersion,
  3355. }
  3356. p.persistToDrive(ctx, v)
  3357. }
  3358. // load mrf entries from disk
  3359. func (p *ReplicationPool) loadMRF() (mrfRec MRFReplicateEntries, err error) {
  3360. loadMRF := func(rc io.ReadCloser) (re MRFReplicateEntries, err error) {
  3361. defer rc.Close()
  3362. if !p.initialized() {
  3363. return re, nil
  3364. }
  3365. var data [4]byte
  3366. n, err := rc.Read(data[:])
  3367. if err != nil {
  3368. return re, err
  3369. }
  3370. if n != len(data) {
  3371. return re, errors.New("replication mrf: no data")
  3372. }
3373. // Read MRF meta header
  3374. switch binary.LittleEndian.Uint16(data[0:2]) {
  3375. case mrfMetaFormat:
  3376. default:
  3377. return re, fmt.Errorf("replication mrf: unknown format: %d", binary.LittleEndian.Uint16(data[0:2]))
  3378. }
  3379. switch binary.LittleEndian.Uint16(data[2:4]) {
  3380. case mrfMetaVersion:
  3381. default:
  3382. return re, fmt.Errorf("replication mrf: unknown version: %d", binary.LittleEndian.Uint16(data[2:4]))
  3383. }
  3384. // OK, parse data.
3385. // Ignore any parsing errors; this file gets regenerated anyway.
  3386. re.DecodeMsg(msgp.NewReader(rc))
  3387. return re, nil
  3388. }
  3389. globalLocalDrivesMu.RLock()
  3390. localDrives := cloneDrives(globalLocalDrivesMap)
  3391. globalLocalDrivesMu.RUnlock()
  3392. for _, localDrive := range localDrives {
  3393. rc, err := localDrive.ReadFileStream(p.ctx, minioMetaBucket, pathJoin(replicationMRFDir, globalLocalNodeNameHex+".bin"), 0, -1)
  3394. if err != nil {
  3395. continue
  3396. }
  3397. mrfRec, err = loadMRF(rc)
  3398. if err != nil {
  3399. continue
  3400. }
  3401. // finally delete the file after processing mrf entries
  3402. localDrive.Delete(p.ctx, minioMetaBucket, pathJoin(replicationMRFDir, globalLocalNodeNameHex+".bin"), DeleteOptions{})
  3403. break
  3404. }
  3405. return mrfRec, nil
  3406. }
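// processMRF periodically re-queues persisted MRF entries for replication healing, skipping a
// cycle entirely when every configured remote target is offline.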
  3407. func (p *ReplicationPool) processMRF() {
  3408. if !p.initialized() {
  3409. return
  3410. }
  3411. pTimer := time.NewTimer(mrfQueueInterval)
  3412. defer pTimer.Stop()
  3413. for {
  3414. select {
  3415. case <-pTimer.C:
  3416. // skip healing if all targets are offline
  3417. var offlineCnt int
  3418. tgts := globalBucketTargetSys.ListTargets(p.ctx, "", "")
  3419. for _, tgt := range tgts {
  3420. if globalBucketTargetSys.isOffline(tgt.URL()) {
  3421. offlineCnt++
  3422. }
  3423. }
  3424. if len(tgts) == offlineCnt {
  3425. pTimer.Reset(mrfQueueInterval)
  3426. continue
  3427. }
  3428. if err := p.queueMRFHeal(); err != nil && !osIsNotExist(err) {
  3429. replLogIf(p.ctx, err)
  3430. }
  3431. pTimer.Reset(mrfQueueInterval)
  3432. case <-p.ctx.Done():
  3433. return
  3434. }
  3435. }
  3436. }
3437. // queueMRFHeal re-queues previously failed replication entries, loaded from the MRF file, for healing.
  3438. func (p *ReplicationPool) queueMRFHeal() error {
  3439. p.mrfMU.Lock()
  3440. defer p.mrfMU.Unlock()
  3441. if !p.initialized() {
  3442. return errServerNotInitialized
  3443. }
  3444. mrfRec, err := p.loadMRF()
  3445. if err != nil {
  3446. return err
  3447. }
  3448. // queue replication heal in a goroutine to avoid holding up mrf save routine
  3449. go func() {
  3450. for vID, e := range mrfRec.Entries {
  3451. ctx, cancel := context.WithTimeout(p.ctx, time.Second) // Do not waste more than a second on this.
  3452. oi, err := p.objLayer.GetObjectInfo(ctx, e.Bucket, e.Object, ObjectOptions{
  3453. VersionID: vID,
  3454. })
  3455. cancel()
  3456. if err != nil {
  3457. continue
  3458. }
  3459. QueueReplicationHeal(p.ctx, e.Bucket, oi, e.RetryCount)
  3460. }
  3461. }()
  3462. return nil
  3463. }
  3464. func (p *ReplicationPool) initialized() bool {
  3465. return !(p == nil || p.objLayer == nil)
  3466. }
  3467. // getMRF returns MRF entries for this node.
  3468. func (p *ReplicationPool) getMRF(ctx context.Context, bucket string) (ch <-chan madmin.ReplicationMRF, err error) {
  3469. mrfRec, err := p.loadMRF()
  3470. if err != nil {
  3471. return nil, err
  3472. }
  3473. mrfCh := make(chan madmin.ReplicationMRF, 100)
  3474. go func() {
  3475. defer xioutil.SafeClose(mrfCh)
  3476. for vID, e := range mrfRec.Entries {
  3477. if bucket != "" && e.Bucket != bucket {
  3478. continue
  3479. }
  3480. select {
  3481. case mrfCh <- madmin.ReplicationMRF{
  3482. NodeName: globalLocalNodeName,
  3483. Object: e.Object,
  3484. VersionID: vID,
  3485. Bucket: e.Bucket,
  3486. RetryCount: e.RetryCount,
  3487. }:
  3488. case <-ctx.Done():
  3489. return
  3490. }
  3491. }
  3492. }()
  3493. return mrfCh, nil
  3494. }
3495. // validateReplicationDestinationOptions configures how validateReplicationDestination
3496. // validates the replication destination.
  3497. type validateReplicationDestinationOptions struct {
  3498. CheckRemoteBucket bool
  3499. CheckReady bool
  3500. checkReadyErr sync.Map
  3501. }
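// getCRCMeta converts the object's decrypted checksum values into replication metadata headers
// (checksum value, checksum type and algorithm); isMP reports whether the checksums refer to a
// multipart object.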
  3502. func getCRCMeta(oi ObjectInfo, partNum int, h http.Header) (cs map[string]string, isMP bool) {
  3503. meta := make(map[string]string)
  3504. cs, isMP = oi.decryptChecksums(partNum, h)
  3505. for k, v := range cs {
  3506. cksum := hash.NewChecksumString(k, v)
  3507. if cksum == nil {
  3508. continue
  3509. }
  3510. if cksum.Valid() {
  3511. meta[cksum.Type.Key()] = v
  3512. meta[xhttp.AmzChecksumType] = cs[xhttp.AmzChecksumType]
  3513. meta[xhttp.AmzChecksumAlgo] = cksum.Type.String()
  3514. }
  3515. }
  3516. return meta, isMP
  3517. }