You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

839 lines
23 KiB

  1. // Copyright (c) 2015-2023 MinIO, Inc.
  2. //
  3. // This file is part of MinIO Object Storage stack
  4. //
  5. // This program is free software: you can redistribute it and/or modify
  6. // it under the terms of the GNU Affero General Public License as published by
  7. // the Free Software Foundation, either version 3 of the License, or
  8. // (at your option) any later version.
  9. //
  10. // This program is distributed in the hope that it will be useful
  11. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. // GNU Affero General Public License for more details.
  14. //
  15. // You should have received a copy of the GNU Affero General Public License
  16. // along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. package cmd
  18. import (
  19. "bytes"
  20. "context"
  21. "encoding/json"
  22. "errors"
  23. "fmt"
  24. "io"
  25. "net/http"
  26. "runtime"
  27. "strconv"
  28. "time"
  29. "github.com/minio/minio-go/v7/pkg/tags"
  30. "github.com/minio/minio/internal/bucket/versioning"
  31. xhttp "github.com/minio/minio/internal/http"
  32. xioutil "github.com/minio/minio/internal/ioutil"
  33. "github.com/minio/pkg/v3/env"
  34. "github.com/minio/pkg/v3/wildcard"
  35. "github.com/minio/pkg/v3/workers"
  36. "github.com/minio/pkg/v3/xtime"
  37. "gopkg.in/yaml.v3"
  38. )
// expire: # Expire objects that match a condition
//   apiVersion: v1
//   bucket: mybucket # Bucket where this batch job will expire matching objects from
//   prefix: myprefix # (Optional) Prefix under which this job will expire objects matching the rules below.
//   rules:
//     - type: object # regular objects with zero or more older versions
//       name: NAME # match object names that satisfy the wildcard expression.
//       olderThan: 70h # match objects older than this value
//       createdBefore: "2006-01-02T15:04:05.00Z" # match objects created before "date"
//       tags:
//         - key: name
//           value: pick* # match objects with tag 'name', all values starting with 'pick'
//       metadata:
//         - key: content-type
//           value: image/* # match objects with 'content-type', all values starting with 'image/'
//       size:
//         lessThan: "10MiB" # match objects with size less than this value (e.g. 10MiB)
//         greaterThan: 1MiB # match objects with size greater than this value (e.g. 1MiB)
//       purge:
//         # retainVersions: 0 # (default) delete all versions of the object. This option is the fastest.
//         # retainVersions: 5 # keep the latest 5 versions of the object.
//
//     - type: deleted # objects with delete marker as their latest version
//       name: NAME # match object names that satisfy the wildcard expression.
//       olderThan: 10h # match objects older than this value (e.g. 7d10h31s)
//       createdBefore: "2006-01-02T15:04:05.00Z" # match objects created before "date"
//       purge:
//         # retainVersions: 0 # (default) delete all versions of the object. This option is the fastest.
//         # retainVersions: 5 # keep the latest 5 versions of the object including delete markers.
//
//   notify:
//     endpoint: https://notify.endpoint # notification endpoint to receive job completion status
//     token: Bearer xxxxx # optional authentication token for the notification endpoint
//
//   retry:
//     attempts: 10 # number of retries for the job before giving up
//     delay: 500ms # least amount of delay between each retry
  76. //go:generate msgp -file $GOFILE
  77. // BatchJobExpirePurge type accepts non-negative versions to be retained
  78. type BatchJobExpirePurge struct {
  79. line, col int
  80. RetainVersions int `yaml:"retainVersions" json:"retainVersions"`
  81. }
  82. var _ yaml.Unmarshaler = &BatchJobExpirePurge{}
  83. // UnmarshalYAML - BatchJobExpirePurge extends unmarshal to extract line, col
  84. func (p *BatchJobExpirePurge) UnmarshalYAML(val *yaml.Node) error {
  85. type purge BatchJobExpirePurge
  86. var tmp purge
  87. err := val.Decode(&tmp)
  88. if err != nil {
  89. return err
  90. }
  91. *p = BatchJobExpirePurge(tmp)
  92. p.line, p.col = val.Line, val.Column
  93. return nil
  94. }
  95. // Validate returns nil if value is valid, ie > 0.
  96. func (p BatchJobExpirePurge) Validate() error {
  97. if p.RetainVersions < 0 {
  98. return BatchJobYamlErr{
  99. line: p.line,
  100. col: p.col,
  101. msg: "retainVersions must be >= 0",
  102. }
  103. }
  104. return nil
  105. }
  106. // BatchJobExpireFilter holds all the filters currently supported for batch replication
  107. type BatchJobExpireFilter struct {
  108. line, col int
  109. OlderThan xtime.Duration `yaml:"olderThan,omitempty" json:"olderThan"`
  110. CreatedBefore *time.Time `yaml:"createdBefore,omitempty" json:"createdBefore"`
  111. Tags []BatchJobKV `yaml:"tags,omitempty" json:"tags"`
  112. Metadata []BatchJobKV `yaml:"metadata,omitempty" json:"metadata"`
  113. Size BatchJobSizeFilter `yaml:"size" json:"size"`
  114. Type string `yaml:"type" json:"type"`
  115. Name string `yaml:"name" json:"name"`
  116. Purge BatchJobExpirePurge `yaml:"purge" json:"purge"`
  117. }
  118. var _ yaml.Unmarshaler = &BatchJobExpireFilter{}
  119. // UnmarshalYAML - BatchJobExpireFilter extends unmarshal to extract line, col
  120. // information
  121. func (ef *BatchJobExpireFilter) UnmarshalYAML(value *yaml.Node) error {
  122. type expFilter BatchJobExpireFilter
  123. var tmp expFilter
  124. err := value.Decode(&tmp)
  125. if err != nil {
  126. return err
  127. }
  128. *ef = BatchJobExpireFilter(tmp)
  129. ef.line, ef.col = value.Line, value.Column
  130. return err
  131. }
  132. // Matches returns true if obj matches the filter conditions specified in ef.
  133. func (ef BatchJobExpireFilter) Matches(obj ObjectInfo, now time.Time) bool {
  134. switch ef.Type {
  135. case BatchJobExpireObject:
  136. if obj.DeleteMarker {
  137. return false
  138. }
  139. case BatchJobExpireDeleted:
  140. if !obj.DeleteMarker {
  141. return false
  142. }
  143. default:
  144. // we should never come here, Validate should have caught this.
  145. batchLogOnceIf(context.Background(), fmt.Errorf("invalid filter type: %s", ef.Type), ef.Type)
  146. return false
  147. }
  148. if len(ef.Name) > 0 && !wildcard.Match(ef.Name, obj.Name) {
  149. return false
  150. }
  151. if ef.OlderThan > 0 && now.Sub(obj.ModTime) <= ef.OlderThan.D() {
  152. return false
  153. }
  154. if ef.CreatedBefore != nil && !obj.ModTime.Before(*ef.CreatedBefore) {
  155. return false
  156. }
  157. if len(ef.Tags) > 0 && !obj.DeleteMarker {
  158. // Only parse object tags if tags filter is specified.
  159. var tagMap map[string]string
  160. if len(obj.UserTags) != 0 {
  161. t, err := tags.ParseObjectTags(obj.UserTags)
  162. if err != nil {
  163. return false
  164. }
  165. tagMap = t.ToMap()
  166. }
  167. for _, kv := range ef.Tags {
  168. // Object (version) must match all tags specified in
  169. // the filter
  170. var match bool
  171. for t, v := range tagMap {
  172. if kv.Match(BatchJobKV{Key: t, Value: v}) {
  173. match = true
  174. }
  175. }
  176. if !match {
  177. return false
  178. }
  179. }
  180. }
  181. if len(ef.Metadata) > 0 && !obj.DeleteMarker {
  182. for _, kv := range ef.Metadata {
  183. // Object (version) must match all x-amz-meta and
  184. // standard metadata headers
  185. // specified in the filter
  186. var match bool
  187. for k, v := range obj.UserDefined {
  188. if !stringsHasPrefixFold(k, "x-amz-meta-") && !isStandardHeader(k) {
  189. continue
  190. }
  191. // We only need to match x-amz-meta or standardHeaders
  192. if kv.Match(BatchJobKV{Key: k, Value: v}) {
  193. match = true
  194. }
  195. }
  196. if !match {
  197. return false
  198. }
  199. }
  200. }
  201. return ef.Size.InRange(obj.Size)
  202. }
// Values accepted for BatchJobExpireFilter.Type.
const (
	// BatchJobExpireObject - object type; a rule of this type does not
	// match versions that are delete markers.
	BatchJobExpireObject string = "object"
	// BatchJobExpireDeleted - delete marker type; a rule of this type only
	// matches versions that are delete markers.
	BatchJobExpireDeleted string = "deleted"
)
  209. // Validate returns nil if ef has valid fields, validation error otherwise.
  210. func (ef BatchJobExpireFilter) Validate() error {
  211. switch ef.Type {
  212. case BatchJobExpireObject:
  213. case BatchJobExpireDeleted:
  214. if len(ef.Tags) > 0 || len(ef.Metadata) > 0 {
  215. return BatchJobYamlErr{
  216. line: ef.line,
  217. col: ef.col,
  218. msg: "delete type filter can't have tags or metadata",
  219. }
  220. }
  221. default:
  222. return BatchJobYamlErr{
  223. line: ef.line,
  224. col: ef.col,
  225. msg: "invalid batch-expire type",
  226. }
  227. }
  228. for _, tag := range ef.Tags {
  229. if err := tag.Validate(); err != nil {
  230. return err
  231. }
  232. }
  233. for _, meta := range ef.Metadata {
  234. if err := meta.Validate(); err != nil {
  235. return err
  236. }
  237. }
  238. if err := ef.Purge.Validate(); err != nil {
  239. return err
  240. }
  241. if err := ef.Size.Validate(); err != nil {
  242. return err
  243. }
  244. if ef.CreatedBefore != nil && !ef.CreatedBefore.Before(time.Now()) {
  245. return BatchJobYamlErr{
  246. line: ef.line,
  247. col: ef.col,
  248. msg: "CreatedBefore is in the future",
  249. }
  250. }
  251. return nil
  252. }
  253. // BatchJobExpire represents configuration parameters for a batch expiration
  254. // job typically supplied in yaml form
  255. type BatchJobExpire struct {
  256. line, col int
  257. APIVersion string `yaml:"apiVersion" json:"apiVersion"`
  258. Bucket string `yaml:"bucket" json:"bucket"`
  259. Prefix BatchJobPrefix `yaml:"prefix" json:"prefix"`
  260. NotificationCfg BatchJobNotification `yaml:"notify" json:"notify"`
  261. Retry BatchJobRetry `yaml:"retry" json:"retry"`
  262. Rules []BatchJobExpireFilter `yaml:"rules" json:"rules"`
  263. }
  264. var _ yaml.Unmarshaler = &BatchJobExpire{}
  265. // RedactSensitive will redact any sensitive information in b.
  266. func (r *BatchJobExpire) RedactSensitive() {
  267. if r == nil {
  268. return
  269. }
  270. if r.NotificationCfg.Token != "" {
  271. r.NotificationCfg.Token = redactedText
  272. }
  273. }
  274. // UnmarshalYAML - BatchJobExpire extends default unmarshal to extract line, col information.
  275. func (r *BatchJobExpire) UnmarshalYAML(val *yaml.Node) error {
  276. type expireJob BatchJobExpire
  277. var tmp expireJob
  278. err := val.Decode(&tmp)
  279. if err != nil {
  280. return err
  281. }
  282. *r = BatchJobExpire(tmp)
  283. r.line, r.col = val.Line, val.Column
  284. return nil
  285. }
  286. // Notify notifies notification endpoint if configured regarding job failure or success.
  287. func (r BatchJobExpire) Notify(ctx context.Context, body io.Reader) error {
  288. if r.NotificationCfg.Endpoint == "" {
  289. return nil
  290. }
  291. ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
  292. defer cancel()
  293. req, err := http.NewRequestWithContext(ctx, http.MethodPost, r.NotificationCfg.Endpoint, body)
  294. if err != nil {
  295. return err
  296. }
  297. if r.NotificationCfg.Token != "" {
  298. req.Header.Set("Authorization", r.NotificationCfg.Token)
  299. }
  300. clnt := http.Client{Transport: getRemoteInstanceTransport()}
  301. resp, err := clnt.Do(req)
  302. if err != nil {
  303. return err
  304. }
  305. xhttp.DrainBody(resp.Body)
  306. if resp.StatusCode != http.StatusOK {
  307. return errors.New(resp.Status)
  308. }
  309. return nil
  310. }
  311. // Expire expires object versions which have already matched supplied filter conditions
  312. func (r *BatchJobExpire) Expire(ctx context.Context, api ObjectLayer, vc *versioning.Versioning, objsToDel []ObjectToDelete) []error {
  313. opts := ObjectOptions{
  314. PrefixEnabledFn: vc.PrefixEnabled,
  315. VersionSuspended: vc.Suspended(),
  316. }
  317. allErrs := make([]error, 0, len(objsToDel))
  318. for {
  319. count := len(objsToDel)
  320. if count == 0 {
  321. break
  322. }
  323. if count > maxDeleteList {
  324. count = maxDeleteList
  325. }
  326. _, errs := api.DeleteObjects(ctx, r.Bucket, objsToDel[:count], opts)
  327. allErrs = append(allErrs, errs...)
  328. // Next batch of deletion
  329. objsToDel = objsToDel[count:]
  330. }
  331. return allErrs
  332. }
// Constants for the batch-expire job.
const (
	// NOTE(review): the name, format and version constants are not
	// referenced in this part of the file; presumably they identify the
	// persisted job state and its encoding - confirm against their users.
	batchExpireName      = "batch-expire.bin"
	batchExpireFormat    = 1
	batchExpireVersionV1 = 1
	batchExpireVersion   = batchExpireVersionV1
	// batchExpireAPIVersion is the only apiVersion accepted by
	// BatchJobExpire.Validate.
	batchExpireAPIVersion = "v1"
	// batchExpireJobDefaultRetries is the number of attempts used when the
	// job's retry attempts setting is zero or negative.
	batchExpireJobDefaultRetries = 3
	// batchExpireJobDefaultRetryDelay is the pause between retry attempts
	// used when the job's retry delay setting is zero or negative.
	batchExpireJobDefaultRetryDelay = 250 * time.Millisecond
)
  342. type objInfoCache map[string]*ObjectInfo
  343. func newObjInfoCache() objInfoCache {
  344. return objInfoCache(make(map[string]*ObjectInfo))
  345. }
  346. func (oiCache objInfoCache) Add(toDel ObjectToDelete, oi *ObjectInfo) {
  347. oiCache[fmt.Sprintf("%s-%s", toDel.ObjectName, toDel.VersionID)] = oi
  348. }
  349. func (oiCache objInfoCache) Get(toDel ObjectToDelete) (*ObjectInfo, bool) {
  350. oi, ok := oiCache[fmt.Sprintf("%s-%s", toDel.ObjectName, toDel.VersionID)]
  351. return oi, ok
  352. }
  353. func batchObjsForDelete(ctx context.Context, r *BatchJobExpire, ri *batchJobInfo, job BatchJobRequest, api ObjectLayer, wk *workers.Workers, expireCh <-chan []expireObjInfo) {
  354. vc, _ := globalBucketVersioningSys.Get(r.Bucket)
  355. retryAttempts := job.Expire.Retry.Attempts
  356. if retryAttempts <= 0 {
  357. retryAttempts = batchExpireJobDefaultRetries
  358. }
  359. delay := job.Expire.Retry.Delay
  360. if delay <= 0 {
  361. delay = batchExpireJobDefaultRetryDelay
  362. }
  363. var i int
  364. for toExpire := range expireCh {
  365. select {
  366. case <-ctx.Done():
  367. return
  368. default:
  369. }
  370. if i > 0 {
  371. if wait := globalBatchConfig.ExpirationWait(); wait > 0 {
  372. time.Sleep(wait)
  373. }
  374. }
  375. i++
  376. wk.Take()
  377. go func(toExpire []expireObjInfo) {
  378. defer wk.Give()
  379. toExpireAll := make([]expireObjInfo, 0, len(toExpire))
  380. toDel := make([]ObjectToDelete, 0, len(toExpire))
  381. oiCache := newObjInfoCache()
  382. for _, exp := range toExpire {
  383. if exp.ExpireAll {
  384. toExpireAll = append(toExpireAll, exp)
  385. continue
  386. }
  387. // Cache ObjectInfo value via pointers for
  388. // subsequent use to track objects which
  389. // couldn't be deleted.
  390. od := ObjectToDelete{
  391. ObjectV: ObjectV{
  392. ObjectName: exp.Name,
  393. VersionID: exp.VersionID,
  394. },
  395. }
  396. toDel = append(toDel, od)
  397. oiCache.Add(od, &exp.ObjectInfo)
  398. }
  399. // DeleteObject(deletePrefix: true) to expire all versions of an object
  400. for _, exp := range toExpireAll {
  401. var success bool
  402. for attempts := 1; attempts <= retryAttempts; attempts++ {
  403. select {
  404. case <-ctx.Done():
  405. ri.trackMultipleObjectVersions(exp, success)
  406. return
  407. default:
  408. }
  409. stopFn := globalBatchJobsMetrics.trace(batchJobMetricExpire, ri.JobID, attempts)
  410. _, err := api.DeleteObject(ctx, exp.Bucket, encodeDirObject(exp.Name), ObjectOptions{
  411. DeletePrefix: true,
  412. DeletePrefixObject: true, // use prefix delete on exact object (this is an optimization to avoid fan-out calls)
  413. })
  414. if err != nil {
  415. stopFn(exp, err)
  416. batchLogIf(ctx, fmt.Errorf("Failed to expire %s/%s due to %v (attempts=%d)", exp.Bucket, exp.Name, err, attempts))
  417. } else {
  418. stopFn(exp, err)
  419. success = true
  420. break
  421. }
  422. }
  423. ri.trackMultipleObjectVersions(exp, success)
  424. }
  425. // DeleteMultiple objects
  426. toDelCopy := make([]ObjectToDelete, len(toDel))
  427. for attempts := 1; attempts <= retryAttempts; attempts++ {
  428. select {
  429. case <-ctx.Done():
  430. return
  431. default:
  432. }
  433. stopFn := globalBatchJobsMetrics.trace(batchJobMetricExpire, ri.JobID, attempts)
  434. // Copying toDel to select from objects whose
  435. // deletion failed
  436. copy(toDelCopy, toDel)
  437. var failed int
  438. errs := r.Expire(ctx, api, vc, toDel)
  439. // reslice toDel in preparation for next retry attempt
  440. toDel = toDel[:0]
  441. for i, err := range errs {
  442. if err != nil {
  443. stopFn(toDelCopy[i], err)
  444. batchLogIf(ctx, fmt.Errorf("Failed to expire %s/%s versionID=%s due to %v (attempts=%d)", ri.Bucket, toDelCopy[i].ObjectName, toDelCopy[i].VersionID,
  445. err, attempts))
  446. failed++
  447. if oi, ok := oiCache.Get(toDelCopy[i]); ok {
  448. ri.trackCurrentBucketObject(r.Bucket, *oi, false, attempts)
  449. }
  450. if attempts != retryAttempts {
  451. // retry
  452. toDel = append(toDel, toDelCopy[i])
  453. }
  454. } else {
  455. stopFn(toDelCopy[i], nil)
  456. if oi, ok := oiCache.Get(toDelCopy[i]); ok {
  457. ri.trackCurrentBucketObject(r.Bucket, *oi, true, attempts)
  458. }
  459. }
  460. }
  461. globalBatchJobsMetrics.save(ri.JobID, ri)
  462. if failed == 0 {
  463. break
  464. }
  465. // Add a delay between retry attempts
  466. if attempts < retryAttempts {
  467. time.Sleep(delay)
  468. }
  469. }
  470. }(toExpire)
  471. }
  472. }
// expireObjInfo is an object version selected for expiry, together with the
// bookkeeping the expiry workers need.
type expireObjInfo struct {
	ObjectInfo
	// ExpireAll is set for the latest version of an object whose matching
	// rule retains no versions; such entries are expired with a single
	// prefix delete that removes every version of the object.
	ExpireAll bool
	// DeleteMarkerCount is the number of delete markers seen for the
	// object during the walk; it is filled in only for ExpireAll entries.
	DeleteMarkerCount int64
}
  478. // Start the batch expiration job, resumes if there was a pending job via "job.ID"
  479. func (r *BatchJobExpire) Start(ctx context.Context, api ObjectLayer, job BatchJobRequest) error {
  480. ri := &batchJobInfo{
  481. JobID: job.ID,
  482. JobType: string(job.Type()),
  483. StartTime: job.Started,
  484. }
  485. if err := ri.loadOrInit(ctx, api, job); err != nil {
  486. return err
  487. }
  488. globalBatchJobsMetrics.save(job.ID, ri)
  489. lastObject := ri.Object
  490. now := time.Now().UTC()
  491. workerSize, err := strconv.Atoi(env.Get("_MINIO_BATCH_EXPIRATION_WORKERS", strconv.Itoa(runtime.GOMAXPROCS(0)/2)))
  492. if err != nil {
  493. return err
  494. }
  495. wk, err := workers.New(workerSize)
  496. if err != nil {
  497. // invalid worker size.
  498. return err
  499. }
  500. ctx, cancelCause := context.WithCancelCause(ctx)
  501. defer cancelCause(nil)
  502. results := make(chan itemOrErr[ObjectInfo], workerSize)
  503. go func() {
  504. prefixes := r.Prefix.F()
  505. if len(prefixes) == 0 {
  506. prefixes = []string{""}
  507. }
  508. for _, prefix := range prefixes {
  509. prefixResultCh := make(chan itemOrErr[ObjectInfo], workerSize)
  510. err := api.Walk(ctx, r.Bucket, prefix, prefixResultCh, WalkOptions{
  511. Marker: lastObject,
  512. LatestOnly: false, // we need to visit all versions of the object to implement purge: retainVersions
  513. VersionsSort: WalkVersionsSortDesc,
  514. })
  515. if err != nil {
  516. cancelCause(err)
  517. xioutil.SafeClose(results)
  518. return
  519. }
  520. for result := range prefixResultCh {
  521. results <- result
  522. }
  523. }
  524. xioutil.SafeClose(results)
  525. }()
  526. // Goroutine to periodically save batch-expire job's in-memory state
  527. saverQuitCh := make(chan struct{})
  528. go func() {
  529. saveTicker := time.NewTicker(10 * time.Second)
  530. defer saveTicker.Stop()
  531. quit := false
  532. after := time.Minute
  533. for !quit {
  534. select {
  535. case <-saveTicker.C:
  536. case <-ctx.Done():
  537. quit = true
  538. case <-saverQuitCh:
  539. quit = true
  540. }
  541. if quit {
  542. // save immediately if we are quitting
  543. after = 0
  544. }
  545. ctx, cancel := context.WithTimeout(GlobalContext, 30*time.Second) // independent context
  546. batchLogIf(ctx, ri.updateAfter(ctx, api, after, job))
  547. cancel()
  548. }
  549. }()
  550. expireCh := make(chan []expireObjInfo, workerSize)
  551. expireDoneCh := make(chan struct{})
  552. go func() {
  553. defer close(expireDoneCh)
  554. batchObjsForDelete(ctx, r, ri, job, api, wk, expireCh)
  555. }()
  556. var (
  557. prevObj ObjectInfo
  558. matchedFilter BatchJobExpireFilter
  559. versionsCount int
  560. toDel []expireObjInfo
  561. failed bool
  562. done bool
  563. )
  564. deleteMarkerCountMap := map[string]int64{}
  565. pushToExpire := func() {
  566. // set preObject deleteMarkerCount
  567. if len(toDel) > 0 {
  568. lastDelIndex := len(toDel) - 1
  569. lastDel := toDel[lastDelIndex]
  570. if lastDel.ExpireAll {
  571. toDel[lastDelIndex].DeleteMarkerCount = deleteMarkerCountMap[lastDel.Name]
  572. // delete the key
  573. delete(deleteMarkerCountMap, lastDel.Name)
  574. }
  575. }
  576. // send down filtered entries to be deleted using
  577. // DeleteObjects method
  578. if len(toDel) > 10 { // batch up to 10 objects/versions to be expired simultaneously.
  579. xfer := make([]expireObjInfo, len(toDel))
  580. copy(xfer, toDel)
  581. select {
  582. case expireCh <- xfer:
  583. toDel = toDel[:0] // resetting toDel
  584. case <-ctx.Done():
  585. done = true
  586. }
  587. }
  588. }
  589. for {
  590. select {
  591. case result, ok := <-results:
  592. if !ok {
  593. done = true
  594. break
  595. }
  596. if result.Err != nil {
  597. failed = true
  598. batchLogIf(ctx, result.Err)
  599. continue
  600. }
  601. if result.Item.DeleteMarker {
  602. deleteMarkerCountMap[result.Item.Name]++
  603. }
  604. // Apply filter to find the matching rule to apply expiry
  605. // actions accordingly.
  606. // nolint:gocritic
  607. if result.Item.IsLatest {
  608. var match BatchJobExpireFilter
  609. var found bool
  610. for _, rule := range r.Rules {
  611. if rule.Matches(result.Item, now) {
  612. match = rule
  613. found = true
  614. break
  615. }
  616. }
  617. if !found {
  618. continue
  619. }
  620. if prevObj.Name != result.Item.Name {
  621. // switch the object
  622. pushToExpire()
  623. }
  624. prevObj = result.Item
  625. matchedFilter = match
  626. versionsCount = 1
  627. // Include the latest version
  628. if matchedFilter.Purge.RetainVersions == 0 {
  629. toDel = append(toDel, expireObjInfo{
  630. ObjectInfo: result.Item,
  631. ExpireAll: true,
  632. })
  633. continue
  634. }
  635. } else if prevObj.Name == result.Item.Name {
  636. if matchedFilter.Purge.RetainVersions == 0 {
  637. continue // including latest version in toDel suffices, skipping other versions
  638. }
  639. versionsCount++
  640. } else {
  641. // switch the object
  642. pushToExpire()
  643. // a file switched with no LatestVersion, logging it
  644. batchLogIf(ctx, fmt.Errorf("skipping object %s, no latest version found", result.Item.Name))
  645. continue
  646. }
  647. if versionsCount <= matchedFilter.Purge.RetainVersions {
  648. continue // retain versions
  649. }
  650. toDel = append(toDel, expireObjInfo{
  651. ObjectInfo: result.Item,
  652. })
  653. pushToExpire()
  654. case <-ctx.Done():
  655. done = true
  656. }
  657. if done {
  658. break
  659. }
  660. }
  661. if context.Cause(ctx) != nil {
  662. xioutil.SafeClose(expireCh)
  663. return context.Cause(ctx)
  664. }
  665. pushToExpire()
  666. // Send any remaining objects downstream
  667. if len(toDel) > 0 {
  668. select {
  669. case <-ctx.Done():
  670. case expireCh <- toDel:
  671. }
  672. }
  673. xioutil.SafeClose(expireCh)
  674. <-expireDoneCh // waits for the expire goroutine to complete
  675. wk.Wait() // waits for all expire workers to retire
  676. ri.Complete = !failed && ri.ObjectsFailed == 0
  677. ri.Failed = failed || ri.ObjectsFailed > 0
  678. globalBatchJobsMetrics.save(job.ID, ri)
  679. // Close the saverQuitCh - this also triggers saving in-memory state
  680. // immediately one last time before we exit this method.
  681. xioutil.SafeClose(saverQuitCh)
  682. // Notify expire jobs final status to the configured endpoint
  683. buf, _ := json.Marshal(ri)
  684. if err := r.Notify(context.Background(), bytes.NewReader(buf)); err != nil {
  685. batchLogIf(context.Background(), fmt.Errorf("unable to notify %v", err))
  686. }
  687. return nil
  688. }
  689. //msgp:ignore batchExpireJobError
  690. type batchExpireJobError struct {
  691. Code string
  692. Description string
  693. HTTPStatusCode int
  694. }
  695. func (e batchExpireJobError) Error() string {
  696. return e.Description
  697. }
  698. // maxBatchRules maximum number of rules a batch-expiry job supports
  699. const maxBatchRules = 50
  700. // Validate validates the job definition input
  701. func (r *BatchJobExpire) Validate(ctx context.Context, job BatchJobRequest, o ObjectLayer) error {
  702. if r == nil {
  703. return nil
  704. }
  705. if r.APIVersion != batchExpireAPIVersion {
  706. return batchExpireJobError{
  707. Code: "InvalidArgument",
  708. Description: "Unsupported batch expire API version",
  709. HTTPStatusCode: http.StatusBadRequest,
  710. }
  711. }
  712. if r.Bucket == "" {
  713. return batchExpireJobError{
  714. Code: "InvalidArgument",
  715. Description: "Bucket argument missing",
  716. HTTPStatusCode: http.StatusBadRequest,
  717. }
  718. }
  719. if _, err := o.GetBucketInfo(ctx, r.Bucket, BucketOptions{}); err != nil {
  720. if isErrBucketNotFound(err) {
  721. return batchExpireJobError{
  722. Code: "NoSuchSourceBucket",
  723. Description: "The specified source bucket does not exist",
  724. HTTPStatusCode: http.StatusNotFound,
  725. }
  726. }
  727. return err
  728. }
  729. if len(r.Rules) > maxBatchRules {
  730. return batchExpireJobError{
  731. Code: "InvalidArgument",
  732. Description: "Too many rules. Batch expire job can't have more than 100 rules",
  733. HTTPStatusCode: http.StatusBadRequest,
  734. }
  735. }
  736. for _, rule := range r.Rules {
  737. if err := rule.Validate(); err != nil {
  738. return batchExpireJobError{
  739. Code: "InvalidArgument",
  740. Description: fmt.Sprintf("Invalid batch expire rule: %s", err),
  741. HTTPStatusCode: http.StatusBadRequest,
  742. }
  743. }
  744. }
  745. if err := r.Retry.Validate(); err != nil {
  746. return batchExpireJobError{
  747. Code: "InvalidArgument",
  748. Description: fmt.Sprintf("Invalid batch expire retry configuration: %s", err),
  749. HTTPStatusCode: http.StatusBadRequest,
  750. }
  751. }
  752. return nil
  753. }