

crypto: add support for decrypting SSE-KMS metadata (#11415) This commit refactors the SSE implementation and adds S3-compatible SSE-KMS context handling. SSE-KMS differs from SSE-S3 in two main aspects: 1. The client can request a particular key and specify a KMS context as part of the request. 2. The ETag of an SSE-KMS encrypted object is not the MD5 sum of the object content. This commit only focuses on the 1st aspect. A client can send an optional SSE context when using SSE-KMS. This context is remembered by the S3 server such that the client does not have to specify the context again (during multipart PUT / GET / HEAD ...). The crypto context also includes the bucket/object name to prevent renaming objects at the backend. Now, AWS S3 behaves as follows: - If the user does not provide an SSE-KMS context, it does not store one and does not include the SSE-KMS context header in the response (e.g. HEAD). - If the user specifies an SSE-KMS context without the bucket/object name, then AWS stores the exact context the client provided but adds the bucket/object name internally. The response contains the KMS context without the bucket/object name. - If the user specifies an SSE-KMS context with the bucket/object name, then AWS again stores the exact context provided by the client. The response contains the KMS context with the bucket/object name. This commit implements this behavior w.r.t. SSE-KMS. However, as of now, no such object can be created since the server rejects SSE-KMS encryption requests. This commit is one stepping stone for SSE-KMS support. Co-authored-by: Harshavardhana <harsha@minio.io>
5 years ago
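As a rough illustration of the SSE-KMS context handling described above, here is a minimal, hypothetical sketch (the helper name and the internal key used to bind the object path are assumptions, not MinIO's actual implementation) of how a client-supplied KMS context could be persisted with the bucket/object name added internally, while the context echoed back to the client stays exactly what the client sent:

// normalizeKMSContext is a hypothetical helper: it returns the context to
// persist (always bound to the bucket/object path) and the context to echo
// back to the client (unchanged, i.e. containing the object path only if the
// client included it).
func normalizeKMSContext(userCtx map[string]string, bucket, object string) (stored, echoed map[string]string) {
	echoed = userCtx // respond with exactly what the client provided
	stored = make(map[string]string, len(userCtx)+1)
	for k, v := range userCtx {
		stored[k] = v
	}
	// Always bind the stored context to the object location so that the
	// object cannot simply be renamed at the backend (assumed key name).
	stored["x-assumed-object-path"] = bucket + "/" + object
	return stored, echoed
}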
  1. // Copyright (c) 2015-2021 MinIO, Inc.
  2. //
  3. // This file is part of MinIO Object Storage stack
  4. //
  5. // This program is free software: you can redistribute it and/or modify
  6. // it under the terms of the GNU Affero General Public License as published by
  7. // the Free Software Foundation, either version 3 of the License, or
  8. // (at your option) any later version.
  9. //
  10. // This program is distributed in the hope that it will be useful
  11. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. // GNU Affero General Public License for more details.
  14. //
  15. // You should have received a copy of the GNU Affero General Public License
  16. // along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. package cmd
  18. import (
  19. "bytes"
  20. "context"
  21. "encoding/hex"
  22. "errors"
  23. "fmt"
  24. "io"
  25. "math/rand"
  26. "net"
  27. "net/http"
  28. "path"
  29. "runtime"
  30. "strconv"
  31. "strings"
  32. "sync"
  33. "time"
  34. "unicode/utf8"
  35. "github.com/google/uuid"
  36. "github.com/klauspost/compress/s2"
  37. "github.com/klauspost/readahead"
  38. "github.com/minio/minio-go/v7/pkg/s3utils"
  39. "github.com/minio/minio/internal/config/compress"
  40. "github.com/minio/minio/internal/config/dns"
  41. "github.com/minio/minio/internal/config/storageclass"
  42. "github.com/minio/minio/internal/crypto"
  43. "github.com/minio/minio/internal/hash"
  44. xhttp "github.com/minio/minio/internal/http"
  45. "github.com/minio/minio/internal/ioutil"
  46. xioutil "github.com/minio/minio/internal/ioutil"
  47. "github.com/minio/minio/internal/logger"
  48. "github.com/minio/pkg/v3/trie"
  49. "github.com/minio/pkg/v3/wildcard"
  50. "github.com/valyala/bytebufferpool"
  51. "golang.org/x/exp/slices"
  52. )
  53. const (
  54. // MinIO meta bucket.
  55. minioMetaBucket = ".minio.sys"
  56. // Multipart meta prefix.
  57. mpartMetaPrefix = "multipart"
  58. // MinIO Multipart meta prefix.
  59. minioMetaMultipartBucket = minioMetaBucket + SlashSeparator + mpartMetaPrefix
  60. // MinIO tmp meta prefix.
  61. minioMetaTmpBucket = minioMetaBucket + "/tmp"
  62. // MinIO tmp meta prefix for deleted objects.
  63. minioMetaTmpDeletedBucket = minioMetaTmpBucket + "/.trash"
  64. // DNS separator (period), used for bucket name validation.
  65. dnsDelimiter = "."
  66. // On compressed files bigger than this;
  67. compReadAheadSize = 100 << 20
  68. // Read this many buffers ahead.
  69. compReadAheadBuffers = 5
  70. // Size of each buffer.
  71. compReadAheadBufSize = 1 << 20
  72. // Pad Encrypted+Compressed files to a multiple of this.
  73. compPadEncrypted = 256
  74. // Disable compressed file indices below this size
  75. compMinIndexSize = 8 << 20
  76. )
77. // getKeySeparator - returns the separator to be used for
  78. // persisting on drive.
  79. //
  80. // - ":" is used on non-windows platforms
  81. // - "_" is used on windows platforms
  82. func getKeySeparator() string {
  83. if runtime.GOOS == globalWindowsOSName {
  84. return "_"
  85. }
  86. return ":"
  87. }
88. // isMinioMetaBucketName returns true if the given bucket is a MinIO internal
  89. // bucket and false otherwise.
  90. func isMinioMetaBucketName(bucket string) bool {
  91. return strings.HasPrefix(bucket, minioMetaBucket)
  92. }
  93. // IsValidBucketName verifies that a bucket name is in accordance with
  94. // Amazon's requirements (i.e. DNS naming conventions). It must be 3-63
  95. // characters long, and it must be a sequence of one or more labels
  96. // separated by periods. Each label can contain lowercase ascii
  97. // letters, decimal digits and hyphens, but must not begin or end with
  98. // a hyphen. See:
  99. // http://docs.aws.amazon.com/AmazonS3/latest/dev/BucketRestrictions.html
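// For example, "my-bucket" and "bucket.01" are valid, while "Bucket" (uppercase),
// "-bucket" (leading hyphen), "ab" (too short) and "192.168.5.4" (looks like an
// IP address) are rejected.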
  100. func IsValidBucketName(bucket string) bool {
  101. // Special case when bucket is equal to one of the meta buckets.
  102. if isMinioMetaBucketName(bucket) {
  103. return true
  104. }
  105. if len(bucket) < 3 || len(bucket) > 63 {
  106. return false
  107. }
  108. // Split on dot and check each piece conforms to rules.
  109. allNumbers := true
  110. pieces := strings.Split(bucket, dnsDelimiter)
  111. for _, piece := range pieces {
  112. if len(piece) == 0 || piece[0] == '-' ||
  113. piece[len(piece)-1] == '-' {
  114. // Current piece has 0-length or starts or
  115. // ends with a hyphen.
  116. return false
  117. }
  118. // Now only need to check if each piece is a valid
  119. // 'label' in AWS terminology and if the bucket looks
  120. // like an IP address.
  121. isNotNumber := false
  122. for i := 0; i < len(piece); i++ {
  123. switch {
  124. case (piece[i] >= 'a' && piece[i] <= 'z' ||
  125. piece[i] == '-'):
  126. // Found a non-digit character, so
  127. // this piece is not a number.
  128. isNotNumber = true
  129. case piece[i] >= '0' && piece[i] <= '9':
  130. // Nothing to do.
  131. default:
  132. // Found invalid character.
  133. return false
  134. }
  135. }
  136. allNumbers = allNumbers && !isNotNumber
  137. }
  138. // Does the bucket name look like an IP address?
  139. return !(len(pieces) == 4 && allNumbers)
  140. }
  141. // IsValidObjectName verifies an object name in accordance with Amazon's
  142. // requirements. It cannot exceed 1024 characters and must be a valid UTF8
  143. // string.
  144. //
  145. // See:
  146. // http://docs.aws.amazon.com/AmazonS3/latest/dev/UsingMetadata.html
  147. //
  148. // You should avoid the following characters in a key name because of
  149. // significant special handling for consistency across all
  150. // applications.
  151. //
  152. // Rejects strings with following characters.
  153. //
  154. // - Backslash ("\")
  155. //
156. // Additionally, MinIO does not support object names with a trailing SlashSeparator.
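// For example, "photos/2021/a.jpg" is a valid object name, while "photos/2021/"
// (trailing slash) and "photos//a.jpg" (double slash) are rejected.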
  157. func IsValidObjectName(object string) bool {
  158. if len(object) == 0 {
  159. return false
  160. }
  161. if HasSuffix(object, SlashSeparator) {
  162. return false
  163. }
  164. return IsValidObjectPrefix(object)
  165. }
  166. // IsValidObjectPrefix verifies whether the prefix is a valid object name.
167. // It's valid to have an empty prefix.
  168. func IsValidObjectPrefix(object string) bool {
  169. if hasBadPathComponent(object) {
  170. return false
  171. }
  172. if !utf8.ValidString(object) {
  173. return false
  174. }
  175. if strings.Contains(object, `//`) {
  176. return false
  177. }
178. // This is valid for AWS S3, but it will never
179. // work with file systems, so we reject it here
180. // to return "object name invalid" rather than
181. // a cryptic error from the file system.
  182. return !strings.ContainsRune(object, 0)
  183. }
184. // checkObjectNameForLengthAndSlash - checks the validity of the object name length and whether it begins with a slash
  185. func checkObjectNameForLengthAndSlash(bucket, object string) error {
  186. // Check for the length of object name
  187. if len(object) > 1024 {
  188. return ObjectNameTooLong{
  189. Bucket: bucket,
  190. Object: object,
  191. }
  192. }
  193. // Check for slash as prefix in object name
  194. if HasPrefix(object, SlashSeparator) {
  195. return ObjectNamePrefixAsSlash{
  196. Bucket: bucket,
  197. Object: object,
  198. }
  199. }
  200. if runtime.GOOS == globalWindowsOSName {
  201. // Explicitly disallowed characters on windows.
  202. // Avoids most problematic names.
  203. if strings.ContainsAny(object, `\:*?"|<>`) {
  204. return ObjectNameInvalid{
  205. Bucket: bucket,
  206. Object: object,
  207. }
  208. }
  209. }
  210. return nil
  211. }
  212. // SlashSeparator - slash separator.
  213. const SlashSeparator = "/"
  214. // SlashSeparatorChar - slash separator.
  215. const SlashSeparatorChar = '/'
  216. // retainSlash - retains slash from a path.
  217. func retainSlash(s string) string {
  218. if s == "" {
  219. return s
  220. }
  221. return strings.TrimSuffix(s, SlashSeparator) + SlashSeparator
  222. }
223. // pathsJoinPrefix - like pathJoin, retains the trailing SlashSeparator
224. // for all elements and prepends each of them with 'prefix'.
  225. func pathsJoinPrefix(prefix string, elem ...string) (paths []string) {
  226. paths = make([]string, len(elem))
  227. for i, e := range elem {
  228. paths[i] = pathJoin(prefix, e)
  229. }
  230. return paths
  231. }
  232. // pathJoin - like path.Join() but retains trailing SlashSeparator of the last element
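// For example, pathJoin("a", "b/") returns "a/b/", whereas path.Join("a", "b/")
// would return "a/b".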
  233. func pathJoin(elem ...string) string {
  234. sb := bytebufferpool.Get()
  235. defer func() {
  236. sb.Reset()
  237. bytebufferpool.Put(sb)
  238. }()
  239. return pathJoinBuf(sb, elem...)
  240. }
  241. // pathJoinBuf - like path.Join() but retains trailing SlashSeparator of the last element.
  242. // Provide a string builder to reduce allocation.
  243. func pathJoinBuf(dst *bytebufferpool.ByteBuffer, elem ...string) string {
  244. trailingSlash := len(elem) > 0 && hasSuffixByte(elem[len(elem)-1], SlashSeparatorChar)
  245. dst.Reset()
  246. added := 0
  247. for _, e := range elem {
  248. if added > 0 || e != "" {
  249. if added > 0 {
  250. dst.WriteByte(SlashSeparatorChar)
  251. }
  252. dst.WriteString(e)
  253. added += len(e)
  254. }
  255. }
  256. if pathNeedsClean(dst.Bytes()) {
  257. s := path.Clean(dst.String())
  258. if trailingSlash {
  259. return s + SlashSeparator
  260. }
  261. return s
  262. }
  263. if trailingSlash {
  264. dst.WriteByte(SlashSeparatorChar)
  265. }
  266. return dst.String()
  267. }
  268. // hasSuffixByte returns true if the last byte of s is 'suffix'
  269. func hasSuffixByte(s string, suffix byte) bool {
  270. return len(s) > 0 && s[len(s)-1] == suffix
  271. }
  272. // pathNeedsClean returns whether path.Clean may change the path.
  273. // Will detect all cases that will be cleaned,
  274. // but may produce false positives on non-trivial paths.
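// For example, it returns false for "a/b" but true for "a//b", "./a", "a/.."
// and any path containing non-ASCII bytes (a conservative false positive).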
  275. func pathNeedsClean(path []byte) bool {
  276. if len(path) == 0 {
  277. return true
  278. }
  279. rooted := path[0] == '/'
  280. n := len(path)
  281. r, w := 0, 0
  282. if rooted {
  283. r, w = 1, 1
  284. }
  285. for r < n {
  286. switch {
  287. case path[r] > 127:
  288. // Non ascii.
  289. return true
  290. case path[r] == '/':
  291. // multiple / elements
  292. return true
  293. case path[r] == '.' && (r+1 == n || path[r+1] == '/'):
  294. // . element - assume it has to be cleaned.
  295. return true
  296. case path[r] == '.' && path[r+1] == '.' && (r+2 == n || path[r+2] == '/'):
  297. // .. element: remove to last / - assume it has to be cleaned.
  298. return true
  299. default:
  300. // real path element.
  301. // add slash if needed
  302. if rooted && w != 1 || !rooted && w != 0 {
  303. w++
  304. }
  305. // copy element
  306. for ; r < n && path[r] != '/'; r++ {
  307. w++
  308. }
  309. // allow one slash, not at end
  310. if r < n-1 && path[r] == '/' {
  311. r++
  312. }
  313. }
  314. }
  315. // Turn empty string into "."
  316. if w == 0 {
  317. return true
  318. }
  319. return false
  320. }
  321. // mustGetUUID - get a random UUID.
  322. func mustGetUUID() string {
  323. u, err := uuid.NewRandom()
  324. if err != nil {
  325. logger.CriticalIf(GlobalContext, err)
  326. }
  327. return u.String()
  328. }
  329. // mustGetUUIDBytes - get a random UUID as 16 bytes unencoded.
  330. func mustGetUUIDBytes() []byte {
  331. u, err := uuid.NewRandom()
  332. if err != nil {
  333. logger.CriticalIf(GlobalContext, err)
  334. }
  335. return u[:]
  336. }
337. // Creates an S3-compatible MD5 sum for a complete multipart transaction.
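// For example, for a two-part upload the result is
// hex(md5(md5Bytes(part1) || md5Bytes(part2))) + "-2", which matches the
// <hex>-N ETag format S3 returns for multipart uploads.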
  338. func getCompleteMultipartMD5(parts []CompletePart) string {
  339. var finalMD5Bytes []byte
  340. for _, part := range parts {
  341. md5Bytes, err := hex.DecodeString(canonicalizeETag(part.ETag))
  342. if err != nil {
  343. finalMD5Bytes = append(finalMD5Bytes, []byte(part.ETag)...)
  344. } else {
  345. finalMD5Bytes = append(finalMD5Bytes, md5Bytes...)
  346. }
  347. }
  348. s3MD5 := fmt.Sprintf("%s-%d", getMD5Hash(finalMD5Bytes), len(parts))
  349. return s3MD5
  350. }
  351. // Clean unwanted fields from metadata
  352. func cleanMetadata(metadata map[string]string) map[string]string {
  353. // Remove STANDARD StorageClass
  354. metadata = removeStandardStorageClass(metadata)
  355. // Clean meta etag keys 'md5Sum', 'etag', "expires", "x-amz-tagging".
  356. return cleanMetadataKeys(metadata, "md5Sum", "etag", "expires", xhttp.AmzObjectTagging, "last-modified", VersionPurgeStatusKey)
  357. }
  358. // Filter X-Amz-Storage-Class field only if it is set to STANDARD.
359. // This is done since AWS S3 doesn't return the STANDARD storage class as a response header.
  360. func removeStandardStorageClass(metadata map[string]string) map[string]string {
  361. if metadata[xhttp.AmzStorageClass] == storageclass.STANDARD {
  362. delete(metadata, xhttp.AmzStorageClass)
  363. }
  364. return metadata
  365. }
  366. // cleanMetadataKeys takes keyNames to be filtered
  367. // and returns a new map with all the entries with keyNames removed.
  368. func cleanMetadataKeys(metadata map[string]string, keyNames ...string) map[string]string {
  369. newMeta := make(map[string]string, len(metadata))
  370. for k, v := range metadata {
  371. if slices.Contains(keyNames, k) {
  372. continue
  373. }
  374. newMeta[k] = v
  375. }
  376. return newMeta
  377. }
  378. // Extracts etag value from the metadata.
  379. func extractETag(metadata map[string]string) string {
  380. etag, ok := metadata["etag"]
  381. if !ok {
  382. // md5Sum tag is kept for backward compatibility.
  383. etag = metadata["md5Sum"]
  384. }
  385. // Success.
  386. return etag
  387. }
  388. // HasPrefix - Prefix matcher string matches prefix in a platform specific way.
389. // For example, on Windows, since it is case-insensitive, we are supposed
390. // to do case-insensitive checks.
  391. func HasPrefix(s string, prefix string) bool {
  392. if runtime.GOOS == globalWindowsOSName {
  393. return stringsHasPrefixFold(s, prefix)
  394. }
  395. return strings.HasPrefix(s, prefix)
  396. }
  397. // HasSuffix - Suffix matcher string matches suffix in a platform specific way.
398. // For example, on Windows, since it is case-insensitive, we are supposed
399. // to do case-insensitive checks.
  400. func HasSuffix(s string, suffix string) bool {
  401. if runtime.GOOS == globalWindowsOSName {
  402. return strings.HasSuffix(strings.ToLower(s), strings.ToLower(suffix))
  403. }
  404. return strings.HasSuffix(s, suffix)
  405. }
406. // Returns true if the two strings are equal (case-insensitively on Windows).
  407. func isStringEqual(s1 string, s2 string) bool {
  408. if runtime.GOOS == globalWindowsOSName {
  409. return strings.EqualFold(s1, s2)
  410. }
  411. return s1 == s2
  412. }
  413. // Ignores all reserved bucket names or invalid bucket names.
  414. func isReservedOrInvalidBucket(bucketEntry string, strict bool) bool {
  415. if bucketEntry == "" {
  416. return true
  417. }
  418. bucketEntry = strings.TrimSuffix(bucketEntry, SlashSeparator)
  419. if strict {
  420. if err := s3utils.CheckValidBucketNameStrict(bucketEntry); err != nil {
  421. return true
  422. }
  423. } else {
  424. if err := s3utils.CheckValidBucketName(bucketEntry); err != nil {
  425. return true
  426. }
  427. }
  428. return isMinioMetaBucket(bucketEntry) || isMinioReservedBucket(bucketEntry)
  429. }
  430. // Returns true if input bucket is a reserved minio meta bucket '.minio.sys'.
  431. func isMinioMetaBucket(bucketName string) bool {
  432. return bucketName == minioMetaBucket
  433. }
  434. // Returns true if input bucket is a reserved minio bucket 'minio'.
  435. func isMinioReservedBucket(bucketName string) bool {
  436. return bucketName == minioReservedBucket
  437. }
  438. // returns a slice of hosts by reading a slice of DNS records
  439. func getHostsSlice(records []dns.SrvRecord) []string {
  440. hosts := make([]string, len(records))
  441. for i, r := range records {
  442. hosts[i] = net.JoinHostPort(r.Host, string(r.Port))
  443. }
  444. return hosts
  445. }
  446. // returns an online host (and corresponding port) from a slice of DNS records
  447. func getHostFromSrv(records []dns.SrvRecord) (host string) {
  448. hosts := getHostsSlice(records)
  449. rng := rand.New(rand.NewSource(time.Now().UTC().UnixNano()))
  450. var d net.Dialer
  451. var retry int
  452. for retry < len(hosts) {
  453. ctx, cancel := context.WithTimeout(GlobalContext, 300*time.Millisecond)
  454. host = hosts[rng.Intn(len(hosts))]
  455. conn, err := d.DialContext(ctx, "tcp", host)
  456. cancel()
  457. if err != nil {
  458. retry++
  459. continue
  460. }
  461. conn.Close()
  462. break
  463. }
  464. return host
  465. }
  466. // IsCompressed returns true if the object is marked as compressed.
  467. func (o *ObjectInfo) IsCompressed() bool {
  468. _, ok := o.UserDefined[ReservedMetadataPrefix+"compression"]
  469. return ok
  470. }
  471. // IsCompressedOK returns whether the object is compressed and can be decompressed.
  472. func (o *ObjectInfo) IsCompressedOK() (bool, error) {
  473. scheme, ok := o.UserDefined[ReservedMetadataPrefix+"compression"]
  474. if !ok {
  475. return false, nil
  476. }
  477. switch scheme {
  478. case compressionAlgorithmV1, compressionAlgorithmV2:
  479. return true, nil
  480. }
  481. return true, fmt.Errorf("unknown compression scheme: %s", scheme)
  482. }
  483. // GetActualSize - returns the actual size of the stored object
  484. func (o ObjectInfo) GetActualSize() (int64, error) {
  485. if o.ActualSize != nil {
  486. return *o.ActualSize, nil
  487. }
  488. if o.IsCompressed() {
  489. sizeStr, ok := o.UserDefined[ReservedMetadataPrefix+"actual-size"]
  490. if !ok {
  491. return -1, errInvalidDecompressedSize
  492. }
  493. size, err := strconv.ParseInt(sizeStr, 10, 64)
  494. if err != nil {
  495. return -1, errInvalidDecompressedSize
  496. }
  497. return size, nil
  498. }
  499. if _, ok := crypto.IsEncrypted(o.UserDefined); ok {
  500. sizeStr, ok := o.UserDefined[ReservedMetadataPrefix+"actual-size"]
  501. if ok {
  502. size, err := strconv.ParseInt(sizeStr, 10, 64)
  503. if err != nil {
  504. return -1, errObjectTampered
  505. }
  506. return size, nil
  507. }
  508. return o.DecryptedSize()
  509. }
  510. return o.Size, nil
  511. }
512. // Disable compression for encrypted requests (unless explicitly allowed).
513. // Using compression and encryption together leaves room for side-channel attacks.
514. // Eliminate non-compressible objects by extensions/content-types.
  515. func isCompressible(header http.Header, object string) bool {
  516. globalCompressConfigMu.Lock()
  517. cfg := globalCompressConfig
  518. globalCompressConfigMu.Unlock()
  519. return !excludeForCompression(header, object, cfg)
  520. }
  521. // Eliminate the non-compressible objects.
  522. func excludeForCompression(header http.Header, object string, cfg compress.Config) bool {
  523. objStr := object
  524. contentType := header.Get(xhttp.ContentType)
  525. if !cfg.Enabled {
  526. return true
  527. }
  528. if crypto.Requested(header) && !cfg.AllowEncrypted {
  529. return true
  530. }
  531. // We strictly disable compression for standard extensions/content-types (`compressed`).
  532. if hasStringSuffixInSlice(objStr, standardExcludeCompressExtensions) || hasPattern(standardExcludeCompressContentTypes, contentType) {
  533. return true
  534. }
  535. // Filter compression includes.
  536. if len(cfg.Extensions) == 0 && len(cfg.MimeTypes) == 0 {
  537. // Nothing to filter, include everything.
  538. return false
  539. }
  540. if len(cfg.Extensions) > 0 && hasStringSuffixInSlice(objStr, cfg.Extensions) {
  541. // Matched an extension to compress, do not exclude.
  542. return false
  543. }
  544. if len(cfg.MimeTypes) > 0 && hasPattern(cfg.MimeTypes, contentType) {
545. // Matched a MIME type to compress, do not exclude.
  546. return false
  547. }
  548. // Did not match any inclusion filters, exclude from compression.
  549. return true
  550. }
551. // Utility which returns true if the string ends with any suffix in the list.
552. // Comparison is case-insensitive. Explicit short-circuit if
553. // the list contains the wildcard "*".
  554. func hasStringSuffixInSlice(str string, list []string) bool {
  555. str = strings.ToLower(str)
  556. for _, v := range list {
  557. if v == "*" {
  558. return true
  559. }
  560. if strings.HasSuffix(str, strings.ToLower(v)) {
  561. return true
  562. }
  563. }
  564. return false
  565. }
  566. // Returns true if any of the given wildcard patterns match the matchStr.
  567. func hasPattern(patterns []string, matchStr string) bool {
  568. for _, pattern := range patterns {
  569. if ok := wildcard.MatchSimple(pattern, matchStr); ok {
  570. return true
  571. }
  572. }
  573. return false
  574. }
  575. // Returns the part file name which matches the partNumber and etag.
  576. func getPartFile(entriesTrie *trie.Trie, partNumber int, etag string) (partFile string) {
  577. for _, match := range entriesTrie.PrefixMatch(fmt.Sprintf("%.5d.%s.", partNumber, etag)) {
  578. partFile = match
  579. break
  580. }
  581. return partFile
  582. }
  583. func partNumberToRangeSpec(oi ObjectInfo, partNumber int) *HTTPRangeSpec {
  584. if oi.Size == 0 || len(oi.Parts) == 0 {
  585. return nil
  586. }
  587. var start int64
  588. end := int64(-1)
  589. for i := 0; i < len(oi.Parts) && i < partNumber; i++ {
  590. start = end + 1
  591. end = start + oi.Parts[i].ActualSize - 1
  592. }
  593. return &HTTPRangeSpec{Start: start, End: end}
  594. }
595. // Returns the compressed offset which should be skipped.
596. // If encrypted, offsets are adjusted for encrypted block headers/trailers.
597. // Since decompression happens after decryption, the encryption overhead is only added to compressedOffset.
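// For example, with two parts of ActualSize 10 MiB each and a requested offset
// of 15 MiB, compressedOffset accounts for the entire (compressed) first part,
// firstPartIdx is 1 and partSkip is 5 MiB, before any index-based skipping.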
  598. func getCompressedOffsets(oi ObjectInfo, offset int64, decrypt func([]byte) ([]byte, error)) (compressedOffset int64, partSkip int64, firstPart int, decryptSkip int64, seqNum uint32) {
  599. var skipLength int64
  600. var cumulativeActualSize int64
  601. var firstPartIdx int
  602. for i, part := range oi.Parts {
  603. cumulativeActualSize += part.ActualSize
  604. if cumulativeActualSize <= offset {
  605. compressedOffset += part.Size
  606. } else {
  607. firstPartIdx = i
  608. skipLength = cumulativeActualSize - part.ActualSize
  609. break
  610. }
  611. }
  612. partSkip = offset - skipLength
  613. // Load index and skip more if feasible.
  614. if partSkip > 0 && len(oi.Parts) > firstPartIdx && len(oi.Parts[firstPartIdx].Index) > 0 {
  615. _, isEncrypted := crypto.IsEncrypted(oi.UserDefined)
  616. if isEncrypted {
  617. dec, err := decrypt(oi.Parts[firstPartIdx].Index)
  618. if err == nil {
  619. // Load Index
  620. var idx s2.Index
  621. _, err := idx.Load(s2.RestoreIndexHeaders(dec))
  622. // Find compressed/uncompressed offsets of our partskip
  623. compOff, uCompOff, err2 := idx.Find(partSkip)
  624. if err == nil && err2 == nil && compOff > 0 {
  625. // Encrypted.
  626. const sseDAREEncPackageBlockSize = SSEDAREPackageBlockSize + SSEDAREPackageMetaSize
  627. // Number of full blocks in skipped area
  628. seqNum = uint32(compOff / SSEDAREPackageBlockSize)
  629. // Skip this many inside a decrypted block to get to compression block start
  630. decryptSkip = compOff % SSEDAREPackageBlockSize
  631. // Skip this number of full blocks.
  632. skipEnc := compOff / SSEDAREPackageBlockSize
  633. skipEnc *= sseDAREEncPackageBlockSize
  634. compressedOffset += skipEnc
  635. // Skip this number of uncompressed bytes.
  636. partSkip -= uCompOff
  637. }
  638. }
  639. } else {
  640. // Not encrypted
  641. var idx s2.Index
  642. _, err := idx.Load(s2.RestoreIndexHeaders(oi.Parts[firstPartIdx].Index))
  643. // Find compressed/uncompressed offsets of our partskip
  644. compOff, uCompOff, err2 := idx.Find(partSkip)
  645. if err == nil && err2 == nil && compOff > 0 {
  646. compressedOffset += compOff
  647. partSkip -= uCompOff
  648. }
  649. }
  650. }
  651. return compressedOffset, partSkip, firstPartIdx, decryptSkip, seqNum
  652. }
653. // GetObjectReader is a type that wraps a reader together with its cleanup
654. // functions to provide a ReadCloser interface; the cleanup runs on Close().
  655. type GetObjectReader struct {
  656. io.Reader
  657. ObjInfo ObjectInfo
  658. cleanUpFns []func()
  659. once sync.Once
  660. }
  661. // WithCleanupFuncs sets additional cleanup functions to be called when closing
  662. // the GetObjectReader.
  663. func (g *GetObjectReader) WithCleanupFuncs(fns ...func()) *GetObjectReader {
  664. g.cleanUpFns = append(g.cleanUpFns, fns...)
  665. return g
  666. }
  667. // NewGetObjectReaderFromReader sets up a GetObjectReader with a given
  668. // reader. This ignores any object properties.
  669. func NewGetObjectReaderFromReader(r io.Reader, oi ObjectInfo, opts ObjectOptions, cleanupFns ...func()) (*GetObjectReader, error) {
  670. if opts.CheckPrecondFn != nil && opts.CheckPrecondFn(oi) {
  671. // Call the cleanup funcs
  672. for i := len(cleanupFns) - 1; i >= 0; i-- {
  673. cleanupFns[i]()
  674. }
  675. return nil, PreConditionFailed{}
  676. }
  677. return &GetObjectReader{
  678. ObjInfo: oi,
  679. Reader: r,
  680. cleanUpFns: cleanupFns,
  681. }, nil
  682. }
  683. // ObjReaderFn is a function type that takes a reader and returns
  684. // GetObjectReader and an error. Request headers are passed to provide
  685. // encryption parameters. cleanupFns allow cleanup funcs to be
  686. // registered for calling after usage of the reader.
  687. type ObjReaderFn func(inputReader io.Reader, h http.Header, cleanupFns ...func()) (r *GetObjectReader, err error)
688. // NewGetObjectReader creates a new GetObjectReader. The cleanUpFns
689. // are called on Close() in reverse order of how they were passed in ObjReadFn(). NOTE: It is
690. // assumed that clean up functions do not panic (otherwise, they may
691. // not all run!).
  692. func NewGetObjectReader(rs *HTTPRangeSpec, oi ObjectInfo, opts ObjectOptions, h http.Header) (
  693. fn ObjReaderFn, off, length int64, err error,
  694. ) {
  695. if opts.CheckPrecondFn != nil && opts.CheckPrecondFn(oi) {
  696. return nil, 0, 0, PreConditionFailed{}
  697. }
  698. if rs == nil && opts.PartNumber > 0 {
  699. rs = partNumberToRangeSpec(oi, opts.PartNumber)
  700. }
  701. _, isEncrypted := crypto.IsEncrypted(oi.UserDefined)
  702. isCompressed, err := oi.IsCompressedOK()
  703. if err != nil {
  704. return nil, 0, 0, err
  705. }
706. // If the object is encrypted and this is a restore request, or if NoDecryption
707. // was requested, fetch the content without decrypting.
  708. if opts.Transition.RestoreRequest != nil || opts.NoDecryption {
  709. isEncrypted = false
  710. isCompressed = false
  711. }
  712. // Calculate range to read (different for encrypted/compressed objects)
  713. switch {
  714. case isCompressed:
  715. var firstPart int
  716. if opts.PartNumber > 0 {
  717. // firstPart is an index to Parts slice,
  718. // make sure that PartNumber uses the
  719. // index value properly.
  720. firstPart = opts.PartNumber - 1
  721. }
  722. // If compressed, we start from the beginning of the part.
  723. // Read the decompressed size from the meta.json.
  724. actualSize, err := oi.GetActualSize()
  725. if err != nil {
  726. return nil, 0, 0, err
  727. }
  728. var decryptSkip int64
  729. var seqNum uint32
  730. off, length = int64(0), oi.Size
  731. decOff, decLength := int64(0), actualSize
  732. if rs != nil {
  733. off, length, err = rs.GetOffsetLength(actualSize)
  734. if err != nil {
  735. return nil, 0, 0, err
  736. }
  737. decrypt := func(b []byte) ([]byte, error) {
  738. return b, nil
  739. }
  740. if isEncrypted {
  741. decrypt = func(b []byte) ([]byte, error) {
  742. return oi.compressionIndexDecrypt(b, h)
  743. }
  744. }
  745. // In case of range based queries on multiparts, the offset and length are reduced.
  746. off, decOff, firstPart, decryptSkip, seqNum = getCompressedOffsets(oi, off, decrypt)
  747. decLength = length
  748. length = oi.Size - off
  749. // For negative length we read everything.
  750. if decLength < 0 {
  751. decLength = actualSize - decOff
  752. }
  753. // Reply back invalid range if the input offset and length fall out of range.
  754. if decOff > actualSize || decOff+decLength > actualSize {
  755. return nil, 0, 0, errInvalidRange
  756. }
  757. }
  758. fn = func(inputReader io.Reader, h http.Header, cFns ...func()) (r *GetObjectReader, err error) {
  759. if isEncrypted {
  760. copySource := h.Get(xhttp.AmzServerSideEncryptionCopyCustomerAlgorithm) != ""
  761. // Attach decrypter on inputReader
  762. inputReader, err = DecryptBlocksRequestR(inputReader, h, seqNum, firstPart, oi, copySource)
  763. if err != nil {
  764. // Call the cleanup funcs
  765. for i := len(cFns) - 1; i >= 0; i-- {
  766. cFns[i]()
  767. }
  768. return nil, err
  769. }
  770. if decryptSkip > 0 {
  771. inputReader = ioutil.NewSkipReader(inputReader, decryptSkip)
  772. }
  773. oi.Size = decLength
  774. }
  775. // Decompression reader.
  776. var dopts []s2.ReaderOption
  777. if off > 0 || decOff > 0 {
  778. // We are not starting at the beginning, so ignore stream identifiers.
  779. dopts = append(dopts, s2.ReaderIgnoreStreamIdentifier())
  780. }
  781. s2Reader := s2.NewReader(inputReader, dopts...)
  782. // Apply the skipLen and limit on the decompressed stream.
  783. if decOff > 0 {
  784. if err = s2Reader.Skip(decOff); err != nil {
  785. // Call the cleanup funcs
  786. for i := len(cFns) - 1; i >= 0; i-- {
  787. cFns[i]()
  788. }
  789. return nil, err
  790. }
  791. }
  792. decReader := io.LimitReader(s2Reader, decLength)
  793. if decLength > compReadAheadSize {
  794. rah, err := readahead.NewReaderSize(decReader, compReadAheadBuffers, compReadAheadBufSize)
  795. if err == nil {
  796. decReader = rah
  797. cFns = append([]func(){func() {
  798. rah.Close()
  799. }}, cFns...)
  800. }
  801. }
  802. oi.Size = decLength
  803. // Assemble the GetObjectReader
  804. r = &GetObjectReader{
  805. ObjInfo: oi,
  806. Reader: decReader,
  807. cleanUpFns: cFns,
  808. }
  809. return r, nil
  810. }
  811. case isEncrypted:
  812. var seqNumber uint32
  813. var partStart int
  814. var skipLen int64
  815. off, length, skipLen, seqNumber, partStart, err = oi.GetDecryptedRange(rs)
  816. if err != nil {
  817. return nil, 0, 0, err
  818. }
  819. var decSize int64
  820. decSize, err = oi.DecryptedSize()
  821. if err != nil {
  822. return nil, 0, 0, err
  823. }
  824. var decRangeLength int64
  825. decRangeLength, err = rs.GetLength(decSize)
  826. if err != nil {
  827. return nil, 0, 0, err
  828. }
  829. // We define a closure that performs decryption given
  830. // a reader that returns the desired range of
  831. // encrypted bytes. The header parameter is used to
  832. // provide encryption parameters.
  833. fn = func(inputReader io.Reader, h http.Header, cFns ...func()) (r *GetObjectReader, err error) {
  834. copySource := h.Get(xhttp.AmzServerSideEncryptionCopyCustomerAlgorithm) != ""
  835. // Attach decrypter on inputReader
  836. var decReader io.Reader
  837. decReader, err = DecryptBlocksRequestR(inputReader, h, seqNumber, partStart, oi, copySource)
  838. if err != nil {
  839. // Call the cleanup funcs
  840. for i := len(cFns) - 1; i >= 0; i-- {
  841. cFns[i]()
  842. }
  843. return nil, err
  844. }
  845. oi.ETag = getDecryptedETag(h, oi, false)
  846. // Apply the skipLen and limit on the
  847. // decrypted stream
  848. decReader = io.LimitReader(ioutil.NewSkipReader(decReader, skipLen), decRangeLength)
  849. // Assemble the GetObjectReader
  850. r = &GetObjectReader{
  851. ObjInfo: oi,
  852. Reader: decReader,
  853. cleanUpFns: cFns,
  854. }
  855. return r, nil
  856. }
  857. default:
  858. off, length, err = rs.GetOffsetLength(oi.Size)
  859. if err != nil {
  860. return nil, 0, 0, err
  861. }
  862. fn = func(inputReader io.Reader, _ http.Header, cFns ...func()) (r *GetObjectReader, err error) {
  863. r = &GetObjectReader{
  864. ObjInfo: oi,
  865. Reader: inputReader,
  866. cleanUpFns: cFns,
  867. }
  868. return r, nil
  869. }
  870. }
  871. return fn, off, length, nil
  872. }
  873. // Close - calls the cleanup actions in reverse order
  874. func (g *GetObjectReader) Close() error {
  875. if g == nil {
  876. return nil
  877. }
  878. // sync.Once is used here to ensure that Close() is
  879. // idempotent.
  880. g.once.Do(func() {
  881. for i := len(g.cleanUpFns) - 1; i >= 0; i-- {
  882. g.cleanUpFns[i]()
  883. }
  884. })
  885. return nil
  886. }
  887. // compressionIndexEncrypter returns a function that will read data from input,
  888. // encrypt it using the provided key and return the result.
  889. func compressionIndexEncrypter(key crypto.ObjectKey, input func() []byte) func() []byte {
  890. var data []byte
  891. var fetched bool
  892. return func() []byte {
  893. if !fetched {
  894. data = input()
  895. fetched = true
  896. }
  897. return metadataEncrypter(key)("compression-index", data)
  898. }
  899. }
  900. // compressionIndexDecrypt reverses compressionIndexEncrypter.
  901. func (o *ObjectInfo) compressionIndexDecrypt(input []byte, h http.Header) ([]byte, error) {
  902. return o.metadataDecrypter(h)("compression-index", input)
  903. }
  904. // SealMD5CurrFn seals md5sum with object encryption key and returns sealed
  905. // md5sum
  906. type SealMD5CurrFn func([]byte) []byte
  907. // PutObjReader is a type that wraps sio.EncryptReader and
  908. // underlying hash.Reader in a struct
  909. type PutObjReader struct {
  910. *hash.Reader // actual data stream
  911. rawReader *hash.Reader // original data stream
  912. sealMD5Fn SealMD5CurrFn
  913. }
  914. // Size returns the absolute number of bytes the Reader
  915. // will return during reading. It returns -1 for unlimited
  916. // data.
  917. func (p *PutObjReader) Size() int64 {
  918. return p.Reader.Size()
  919. }
  920. // MD5CurrentHexString returns the current MD5Sum or encrypted MD5Sum
  921. // as a hex encoded string
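// For example, when the client did not supply a Content-Md5 and the server is
// not in strict compatibility mode, a random 16-byte value is used instead and
// the result carries a "-1" suffix, mimicking a multipart-style ETag.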
  922. func (p *PutObjReader) MD5CurrentHexString() string {
  923. md5sumCurr := p.rawReader.MD5Current()
  924. var appendHyphen bool
925. // md5sumCurr is not empty in two scenarios:
926. // - the server is running in strict compatibility mode
927. // - the client set Content-Md5 during the PUT operation
  928. if len(md5sumCurr) == 0 {
  929. // md5sumCurr is only empty when we are running
  930. // in non-compatibility mode.
  931. md5sumCurr = make([]byte, 16)
  932. rand.Read(md5sumCurr)
  933. appendHyphen = true
  934. }
  935. if p.sealMD5Fn != nil {
  936. md5sumCurr = p.sealMD5Fn(md5sumCurr)
  937. }
  938. if appendHyphen {
939. // Make sure to return an ETag string of up to 32 characters; for SSE
940. // requests the ETag might be longer, and the code decrypting the
941. // ETag ignores ETags in multipart form, i.e. <hex>-N
  942. return hex.EncodeToString(md5sumCurr)[:32] + "-1"
  943. }
  944. return hex.EncodeToString(md5sumCurr)
  945. }
  946. // WithEncryption sets up encrypted reader and the sealing for content md5sum
  947. // using objEncKey. Unsealed md5sum is computed from the rawReader setup when
  948. // NewPutObjReader was called. It returns an error if called on an uninitialized
  949. // PutObjReader.
  950. func (p *PutObjReader) WithEncryption(encReader *hash.Reader, objEncKey *crypto.ObjectKey) (*PutObjReader, error) {
  951. if p.Reader == nil {
  952. return nil, errors.New("put-object reader uninitialized")
  953. }
  954. p.Reader = encReader
  955. p.sealMD5Fn = sealETagFn(*objEncKey)
  956. return p, nil
  957. }
  958. // NewPutObjReader returns a new PutObjReader. It uses given hash.Reader's
  959. // MD5Current method to construct md5sum when requested downstream.
  960. func NewPutObjReader(rawReader *hash.Reader) *PutObjReader {
  961. return &PutObjReader{Reader: rawReader, rawReader: rawReader}
  962. }
  963. func sealETag(encKey crypto.ObjectKey, md5CurrSum []byte) []byte {
  964. var emptyKey [32]byte
  965. if bytes.Equal(encKey[:], emptyKey[:]) {
  966. return md5CurrSum
  967. }
  968. return encKey.SealETag(md5CurrSum)
  969. }
  970. func sealETagFn(key crypto.ObjectKey) SealMD5CurrFn {
  971. fn := func(md5sumcurr []byte) []byte {
  972. return sealETag(key, md5sumcurr)
  973. }
  974. return fn
  975. }
  976. // compressOpts are the options for writing compressed data.
  977. var compressOpts []s2.WriterOption
  978. func init() {
  979. if runtime.GOARCH == "amd64" {
  980. // On amd64 we have assembly and can use stronger compression.
  981. compressOpts = append(compressOpts, s2.WriterBetterCompression())
  982. }
  983. }
  984. // newS2CompressReader will read data from r, compress it and return the compressed data as a Reader.
  985. // Use Close to ensure resources are released on incomplete streams.
  986. //
987. // Providing the expected input size 'on' is always recommended so that this function works
988. // properly, because we do not wish to create an object if the
989. // client closed the stream prematurely.
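// Typical usage: rc, idx := newS2CompressReader(src, srcSize, false); read the
// compressed stream from rc, then call idx() after the stream has been fully
// consumed to obtain the compression index (nil for small streams).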
  990. func newS2CompressReader(r io.Reader, on int64, encrypted bool) (rc io.ReadCloser, idx func() []byte) {
  991. pr, pw := io.Pipe()
  992. // Copy input to compressor
  993. opts := compressOpts
  994. if encrypted {
  995. // The values used for padding are not a security concern,
  996. // but we choose pseudo-random numbers instead of just zeros.
  997. rng := rand.New(rand.NewSource(time.Now().UnixNano()))
  998. opts = append([]s2.WriterOption{s2.WriterPadding(compPadEncrypted), s2.WriterPaddingSrc(rng)}, compressOpts...)
  999. }
  1000. comp := s2.NewWriter(pw, opts...)
  1001. indexCh := make(chan []byte, 1)
  1002. go func() {
  1003. defer xioutil.SafeClose(indexCh)
  1004. cn, err := io.Copy(comp, r)
  1005. if err != nil {
  1006. comp.Close()
  1007. pw.CloseWithError(err)
  1008. return
  1009. }
  1010. if on > 0 && on != cn {
1011. // If the client didn't send all the data,
1012. // abort with IncompleteBody here.
  1013. comp.Close()
  1014. pw.CloseWithError(IncompleteBody{})
  1015. return
  1016. }
  1017. // Close the stream.
  1018. // If more than compMinIndexSize was written, generate index.
  1019. if cn > compMinIndexSize {
  1020. idx, err := comp.CloseIndex()
  1021. idx = s2.RemoveIndexHeaders(idx)
  1022. indexCh <- idx
  1023. pw.CloseWithError(err)
  1024. return
  1025. }
  1026. pw.CloseWithError(comp.Close())
  1027. }()
  1028. var gotIdx []byte
  1029. return pr, func() []byte {
  1030. if gotIdx != nil {
  1031. return gotIdx
  1032. }
  1033. // Will get index or nil if closed.
  1034. gotIdx = <-indexCh
  1035. return gotIdx
  1036. }
  1037. }
1038. // compressSelfTest performs a self-test to ensure that compression
1039. // algorithms complete a roundtrip. If any algorithm
1040. // produces an incorrect checksum, it fails with a hard error.
  1041. //
  1042. // compressSelfTest tries to catch any issue in the compression implementation
  1043. // early instead of silently corrupting data.
  1044. func compressSelfTest() {
  1045. // 4 MB block.
  1046. // Approx runtime ~30ms
  1047. data := make([]byte, 4<<20)
  1048. rng := rand.New(rand.NewSource(0))
  1049. for i := range data {
  1050. // Generate compressible stream...
  1051. data[i] = byte(rng.Int63() & 3)
  1052. }
  1053. failOnErr := func(err error) {
  1054. if err != nil {
  1055. logger.Fatal(errSelfTestFailure, "compress: error on self-test: %v", err)
  1056. }
  1057. }
  1058. const skip = 2<<20 + 511
  1059. r, _ := newS2CompressReader(bytes.NewBuffer(data), int64(len(data)), true)
  1060. b, err := io.ReadAll(r)
  1061. failOnErr(err)
  1062. failOnErr(r.Close())
  1063. // Decompression reader.
  1064. s2Reader := s2.NewReader(bytes.NewBuffer(b))
  1065. // Apply the skipLen on the decompressed stream.
  1066. failOnErr(s2Reader.Skip(skip))
  1067. got, err := io.ReadAll(s2Reader)
  1068. failOnErr(err)
  1069. if !bytes.Equal(got, data[skip:]) {
  1070. logger.Fatal(errSelfTestFailure, "compress: self-test roundtrip mismatch.")
  1071. }
  1072. }
  1073. // getDiskInfos returns the disk information for the provided disks.
1074. // If a disk is nil or returns an error, the corresponding result will be nil as well.
  1075. func getDiskInfos(ctx context.Context, disks ...StorageAPI) []*DiskInfo {
  1076. res := make([]*DiskInfo, len(disks))
  1077. opts := DiskInfoOptions{}
  1078. for i, disk := range disks {
  1079. if disk == nil {
  1080. continue
  1081. }
  1082. if di, err := disk.DiskInfo(ctx, opts); err == nil {
  1083. res[i] = &di
  1084. }
  1085. }
  1086. return res
  1087. }
1088. // hasSpaceFor returns whether the disks in `di` have space for an object of a given size.
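// For example, with 4 online disks and a 100 GiB object, the erasure-adjusted
// size is 200 GiB and perDisk is 50 GiB: every disk must have more than 50 GiB
// free, and the total available space after the write must stay above
// (1 - diskFillFraction) of the total capacity.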
  1089. func hasSpaceFor(di []*DiskInfo, size int64) (bool, error) {
  1090. // We multiply the size by 2 to account for erasure coding.
  1091. size *= 2
  1092. if size < 0 {
  1093. // If no size, assume diskAssumeUnknownSize.
  1094. size = diskAssumeUnknownSize
  1095. }
  1096. var available uint64
  1097. var total uint64
  1098. var nDisks int
  1099. for _, disk := range di {
  1100. if disk == nil || disk.Total == 0 {
  1101. // Disk offline, no inodes or something else is wrong.
  1102. continue
  1103. }
  1104. nDisks++
  1105. total += disk.Total
  1106. available += disk.Total - disk.Used
  1107. }
  1108. if nDisks < len(di)/2 || nDisks <= 0 {
  1109. var errs []error
  1110. for index, disk := range di {
  1111. switch {
  1112. case disk == nil:
  1113. errs = append(errs, fmt.Errorf("disk[%d]: offline", index))
  1114. case disk.Error != "":
  1115. errs = append(errs, fmt.Errorf("disk %s: %s", disk.Endpoint, disk.Error))
  1116. case disk.Total == 0:
  1117. errs = append(errs, fmt.Errorf("disk %s: total is zero", disk.Endpoint))
  1118. }
  1119. }
  1120. // Log disk errors.
  1121. peersLogIf(context.Background(), errors.Join(errs...))
  1122. return false, fmt.Errorf("not enough online disks to calculate the available space, need %d, found %d", (len(di)/2)+1, nDisks)
  1123. }
  1124. // Check we have enough on each disk, ignoring diskFillFraction.
  1125. perDisk := size / int64(nDisks)
  1126. for _, disk := range di {
  1127. if disk == nil || disk.Total == 0 {
  1128. continue
  1129. }
  1130. if !globalIsErasureSD && disk.FreeInodes < diskMinInodes && disk.UsedInodes > 0 {
  1131. // We have an inode count, but not enough inodes.
  1132. return false, nil
  1133. }
  1134. if int64(disk.Free) <= perDisk {
  1135. return false, nil
  1136. }
  1137. }
  1138. // Make sure we can fit "size" on to the disk without getting above the diskFillFraction
  1139. if available < uint64(size) {
  1140. return false, nil
  1141. }
  1142. // How much will be left after adding the file.
  1143. available -= uint64(size)
1144. // wantLeft is how much space must at least be left.
  1145. wantLeft := uint64(float64(total) * (1.0 - diskFillFraction))
  1146. return available > wantLeft, nil
  1147. }