You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

614 lines
16 KiB

fs: Break fs package to top-level and introduce ObjectAPI interface. ObjectAPI interface brings in changes needed for XL ObjectAPI layer. The new interface for any ObjectAPI layer is as below ``` // ObjectAPI interface. type ObjectAPI interface { // Bucket resource API. DeleteBucket(bucket string) *probe.Error ListBuckets() ([]BucketInfo, *probe.Error) MakeBucket(bucket string) *probe.Error GetBucketInfo(bucket string) (BucketInfo, *probe.Error) // Bucket query API. ListObjects(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsResult, *probe.Error) ListMultipartUploads(bucket string, resources BucketMultipartResourcesMetadata) (BucketMultipartResourcesMetadata, *probe.Error) // Object resource API. GetObject(bucket, object string, startOffset int64) (io.ReadCloser, *probe.Error) GetObjectInfo(bucket, object string) (ObjectInfo, *probe.Error) PutObject(bucket string, object string, size int64, data io.Reader, metadata map[string]string) (ObjectInfo, *probe.Error) DeleteObject(bucket, object string) *probe.Error // Object query API. NewMultipartUpload(bucket, object string) (string, *probe.Error) PutObjectPart(bucket, object, uploadID string, partID int, size int64, data io.Reader, md5Hex string) (string, *probe.Error) ListObjectParts(bucket, object string, resources ObjectResourcesMetadata) (ObjectResourcesMetadata, *probe.Error) CompleteMultipartUpload(bucket string, object string, uploadID string, parts []CompletePart) (ObjectInfo, *probe.Error) AbortMultipartUpload(bucket, object, uploadID string) *probe.Error } ```
9 years ago
  1. /*
  2. * Minio Cloud Storage, (C) 2016 Minio, Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. package main
  17. import (
  18. "io"
  19. "os"
  20. slashpath "path"
  21. "strings"
  22. "syscall"
  23. "github.com/Sirupsen/logrus"
  24. "github.com/minio/minio/pkg/disk"
  25. "github.com/minio/minio/pkg/safe"
  26. )
  // fsMinSpacePercent - minimum percentage of free disk space; newPosix
  // stores it in fsStorage.minFreeDisk.
  const fsMinSpacePercent = 5
  // fsStorage - implements StorageAPI interface on top of a local
  // directory tree.
  type fsStorage struct {
  	// diskPath is the directory under which every volume directory
  	// is resolved.
  	diskPath string

  	// minFreeDisk is the free-space percentage handed to
  	// checkDiskFree before volumes or files are created.
  	minFreeDisk int64
  }
  35. // isDirEmpty - returns whether given directory is empty or not.
  36. func isDirEmpty(dirname string) bool {
  37. f, err := os.Open(dirname)
  38. if err != nil {
  39. log.Errorf("Unable to access directory %s, failed with %s", dirname, err)
  40. return false
  41. }
  42. defer f.Close()
  43. // List one entry.
  44. _, err = f.Readdirnames(1)
  45. if err != nil {
  46. if err == io.EOF {
  47. // Returns true if we have reached EOF, directory is
  48. // indeed empty.
  49. return true
  50. }
  51. log.Errorf("Unable to list directory %s, failed with %s", dirname, err)
  52. return false
  53. }
  54. // Directory is not empty.
  55. return false
  56. }
  57. // Initialize a new storage disk.
  58. func newPosix(diskPath string) (StorageAPI, error) {
  59. if diskPath == "" {
  60. log.Error("Disk cannot be empty")
  61. return nil, errInvalidArgument
  62. }
  63. st, err := os.Stat(diskPath)
  64. if err != nil {
  65. log.WithFields(logrus.Fields{
  66. "diskPath": diskPath,
  67. }).Debugf("Stat failed, with error %s.", err)
  68. return nil, err
  69. }
  70. if !st.IsDir() {
  71. log.WithFields(logrus.Fields{
  72. "diskPath": diskPath,
  73. }).Debugf("Disk %s.", syscall.ENOTDIR)
  74. return nil, syscall.ENOTDIR
  75. }
  76. fs := fsStorage{
  77. diskPath: diskPath,
  78. minFreeDisk: fsMinSpacePercent, // Minimum 5% disk should be free.
  79. }
  80. log.WithFields(logrus.Fields{
  81. "diskPath": diskPath,
  82. "minFreeDisk": fsMinSpacePercent,
  83. }).Debugf("Successfully configured FS storage API.")
  84. return fs, nil
  85. }
  86. // checkDiskFree verifies if disk path has sufficient minium free disk space.
  87. func checkDiskFree(diskPath string, minFreeDisk int64) (err error) {
  88. di, err := disk.GetInfo(diskPath)
  89. if err != nil {
  90. log.WithFields(logrus.Fields{
  91. "diskPath": diskPath,
  92. }).Debugf("Failed to get disk info, %s", err)
  93. return err
  94. }
  95. // Remove 5% from total space for cumulative disk
  96. // space used for journalling, inodes etc.
  97. availableDiskSpace := (float64(di.Free) / (float64(di.Total) - (0.05 * float64(di.Total)))) * 100
  98. if int64(availableDiskSpace) <= minFreeDisk {
  99. log.WithFields(logrus.Fields{
  100. "availableDiskSpace": int64(availableDiskSpace),
  101. "minFreeDiskSpace": minFreeDisk,
  102. }).Debugf("Disk free space has reached its limit.")
  103. return errDiskFull
  104. }
  105. // Success.
  106. return nil
  107. }
  108. func removeDuplicateVols(volsInfo []VolInfo) []VolInfo {
  109. // Use map to record duplicates as we find them.
  110. result := []VolInfo{}
  111. m := make(map[string]VolInfo)
  112. for _, v := range volsInfo {
  113. if _, found := m[v.Name]; !found {
  114. m[v.Name] = v
  115. }
  116. }
  117. result = make([]VolInfo, 0, len(m))
  118. for _, v := range m {
  119. result = append(result, v)
  120. }
  121. // Return the new slice.
  122. return result
  123. }
  124. // gets all the unique directories from diskPath.
  125. func getAllUniqueVols(dirPath string) ([]VolInfo, error) {
  126. entries, err := readDir(dirPath)
  127. if err != nil {
  128. log.WithFields(logrus.Fields{
  129. "dirPath": dirPath,
  130. }).Debugf("readDir failed with error %s", err)
  131. return nil, err
  132. }
  133. var volsInfo []VolInfo
  134. for _, entry := range entries {
  135. if !strings.HasSuffix(entry, slashSeparator) || !isValidVolname(slashpath.Clean(entry)) {
  136. // Skip if entry is neither a directory not a valid volume name.
  137. continue
  138. }
  139. fi, err := os.Stat(pathJoin(dirPath, entry))
  140. if err != nil {
  141. log.WithFields(logrus.Fields{
  142. "path": pathJoin(dirPath, entry),
  143. }).Debugf("Stat failed with error %s", err)
  144. return nil, err
  145. }
  146. volsInfo = append(volsInfo, VolInfo{
  147. Name: fi.Name(),
  148. // As os.Stat() doesn't carry other than ModTime(), use
  149. // ModTime() as CreatedTime.
  150. Created: fi.ModTime(),
  151. })
  152. }
  153. return removeDuplicateVols(volsInfo), nil
  154. }
  155. // getVolumeDir - will convert incoming volume names to
  156. // corresponding valid volume names on the backend in a platform
  157. // compatible way for all operating systems. If volume is not found
  158. // an error is generated.
  159. func (s fsStorage) getVolumeDir(volume string) (string, error) {
  160. if !isValidVolname(volume) {
  161. return "", errInvalidArgument
  162. }
  163. volumeDir := pathJoin(s.diskPath, volume)
  164. _, err := os.Stat(volumeDir)
  165. if err == nil {
  166. return volumeDir, nil
  167. }
  168. if os.IsNotExist(err) {
  169. var volsInfo []VolInfo
  170. volsInfo, err = getAllUniqueVols(s.diskPath)
  171. if err != nil {
  172. return volumeDir, errVolumeNotFound
  173. }
  174. for _, vol := range volsInfo {
  175. // Verify if lowercase version of the volume is equal to
  176. // the incoming volume, then use the proper name.
  177. if strings.ToLower(vol.Name) == volume {
  178. volumeDir = pathJoin(s.diskPath, vol.Name)
  179. return volumeDir, nil
  180. }
  181. }
  182. return volumeDir, errVolumeNotFound
  183. } else if os.IsPermission(err) {
  184. log.WithFields(logrus.Fields{
  185. "diskPath": s.diskPath,
  186. }).Debugf("Stat failed with error %s", err)
  187. return volumeDir, errVolumeAccessDenied
  188. }
  189. log.WithFields(logrus.Fields{
  190. "diskPath": s.diskPath,
  191. }).Debugf("Stat failed with error %s", err)
  192. return volumeDir, err
  193. }
  194. // Make a volume entry.
  195. func (s fsStorage) MakeVol(volume string) (err error) {
  196. // Validate if disk is free.
  197. if err = checkDiskFree(s.diskPath, s.minFreeDisk); err != nil {
  198. return err
  199. }
  200. volumeDir, err := s.getVolumeDir(volume)
  201. if err == nil {
  202. // Volume already exists, return error.
  203. return errVolumeExists
  204. }
  205. // If volume not found create it.
  206. if err == errVolumeNotFound {
  207. // Make a volume entry.
  208. return os.Mkdir(volumeDir, 0700)
  209. }
  210. log.WithFields(logrus.Fields{
  211. "diskPath": s.diskPath,
  212. "volume": volume,
  213. }).Debugf("MakeVol failed with %s", err)
  214. // For all other errors return here.
  215. return err
  216. }
  217. // ListVols - list volumes.
  218. func (s fsStorage) ListVols() (volsInfo []VolInfo, err error) {
  219. // Get disk info to be populated for VolInfo.
  220. var diskInfo disk.Info
  221. diskInfo, err = disk.GetInfo(s.diskPath)
  222. if err != nil {
  223. log.WithFields(logrus.Fields{
  224. "diskPath": s.diskPath,
  225. }).Debugf("Failed to get disk info, %s", err)
  226. return nil, err
  227. }
  228. volsInfo, err = getAllUniqueVols(s.diskPath)
  229. if err != nil {
  230. log.WithFields(logrus.Fields{
  231. "diskPath": s.diskPath,
  232. }).Debugf("getAllUniqueVols failed with %s", err)
  233. return nil, err
  234. }
  235. for i, vol := range volsInfo {
  236. // Volname on case sensitive fs backends can come in as
  237. // capitalized, but object layer cannot consume it
  238. // directly. Convert it as we see fit.
  239. volName := strings.ToLower(vol.Name)
  240. volInfo := VolInfo{
  241. Name: volName,
  242. Created: vol.Created,
  243. Total: diskInfo.Total,
  244. Free: diskInfo.Free,
  245. FSType: diskInfo.FSType,
  246. }
  247. volsInfo[i] = volInfo
  248. }
  249. return volsInfo, nil
  250. }
  251. // StatVol - get volume info.
  252. func (s fsStorage) StatVol(volume string) (volInfo VolInfo, err error) {
  253. // Verify if volume is valid and it exists.
  254. volumeDir, err := s.getVolumeDir(volume)
  255. if err != nil {
  256. log.WithFields(logrus.Fields{
  257. "diskPath": s.diskPath,
  258. "volume": volume,
  259. }).Debugf("getVolumeDir failed with %s", err)
  260. return VolInfo{}, err
  261. }
  262. // Stat a volume entry.
  263. var st os.FileInfo
  264. st, err = os.Stat(volumeDir)
  265. if err != nil {
  266. log.WithFields(logrus.Fields{
  267. "diskPath": s.diskPath,
  268. "volume": volume,
  269. }).Debugf("Stat on the volume failed with %s", err)
  270. if os.IsNotExist(err) {
  271. return VolInfo{}, errVolumeNotFound
  272. }
  273. return VolInfo{}, err
  274. }
  275. // Get disk info, to be returned back along with volume info.
  276. var diskInfo disk.Info
  277. diskInfo, err = disk.GetInfo(s.diskPath)
  278. if err != nil {
  279. log.WithFields(logrus.Fields{
  280. "diskPath": s.diskPath,
  281. "volume": volume,
  282. }).Debugf("Failed to get disk info, %s", err)
  283. return VolInfo{}, err
  284. }
  285. // As os.Stat() doesn't carry other than ModTime(), use ModTime()
  286. // as CreatedTime.
  287. createdTime := st.ModTime()
  288. return VolInfo{
  289. Name: volume,
  290. Created: createdTime,
  291. Free: diskInfo.Free,
  292. Total: diskInfo.Total,
  293. FSType: diskInfo.FSType,
  294. }, nil
  295. }
  296. // DeleteVol - delete a volume.
  297. func (s fsStorage) DeleteVol(volume string) error {
  298. // Verify if volume is valid and it exists.
  299. volumeDir, err := s.getVolumeDir(volume)
  300. if err != nil {
  301. log.WithFields(logrus.Fields{
  302. "diskPath": s.diskPath,
  303. "volume": volume,
  304. }).Debugf("getVolumeDir failed with %s", err)
  305. return err
  306. }
  307. err = os.Remove(volumeDir)
  308. if err != nil {
  309. log.WithFields(logrus.Fields{
  310. "diskPath": s.diskPath,
  311. "volume": volume,
  312. }).Debugf("Volume remove failed with %s", err)
  313. if os.IsNotExist(err) {
  314. return errVolumeNotFound
  315. } else if strings.Contains(err.Error(), "directory is not empty") {
  316. // On windows the string is slightly different, handle it here.
  317. return errVolumeNotEmpty
  318. } else if strings.Contains(err.Error(), "directory not empty") {
  319. // Hopefully for all other
  320. // operating systems, this is
  321. // assumed to be consistent.
  322. return errVolumeNotEmpty
  323. }
  324. return err
  325. }
  326. return nil
  327. }
  328. // ListDir - return all the entries at the given directory path.
  329. // If an entry is a directory it will be returned with a trailing "/".
  330. func (s fsStorage) ListDir(volume, dirPath string) ([]string, error) {
  331. // Verify if volume is valid and it exists.
  332. volumeDir, err := s.getVolumeDir(volume)
  333. if err != nil {
  334. log.WithFields(logrus.Fields{
  335. "diskPath": s.diskPath,
  336. "volume": volume,
  337. }).Debugf("getVolumeDir failed with %s", err)
  338. return nil, err
  339. }
  340. // Stat a volume entry.
  341. _, err = os.Stat(volumeDir)
  342. if err != nil {
  343. log.WithFields(logrus.Fields{
  344. "diskPath": s.diskPath,
  345. "volume": volume,
  346. }).Debugf("Stat on the volume failed with %s", err)
  347. if os.IsNotExist(err) {
  348. return nil, errVolumeNotFound
  349. }
  350. return nil, err
  351. }
  352. return readDir(pathJoin(volumeDir, dirPath))
  353. }
  354. // ReadFile - read a file at a given offset.
  355. func (s fsStorage) ReadFile(volume string, path string, offset int64) (readCloser io.ReadCloser, err error) {
  356. volumeDir, err := s.getVolumeDir(volume)
  357. if err != nil {
  358. log.WithFields(logrus.Fields{
  359. "diskPath": s.diskPath,
  360. "volume": volume,
  361. }).Debugf("getVolumeDir failed with %s", err)
  362. return nil, err
  363. }
  364. filePath := pathJoin(volumeDir, path)
  365. file, err := os.Open(filePath)
  366. if err != nil {
  367. if os.IsNotExist(err) {
  368. return nil, errFileNotFound
  369. } else if os.IsPermission(err) {
  370. return nil, errFileAccessDenied
  371. }
  372. log.WithFields(logrus.Fields{
  373. "diskPath": s.diskPath,
  374. "filePath": filePath,
  375. }).Debugf("Opening a file failed with %s", err)
  376. return nil, err
  377. }
  378. st, err := file.Stat()
  379. if err != nil {
  380. log.WithFields(logrus.Fields{
  381. "diskPath": s.diskPath,
  382. "filePath": filePath,
  383. }).Debugf("Stat failed with %s", err)
  384. return nil, err
  385. }
  386. // Verify if its not a regular file, since subsequent Seek is undefined.
  387. if !st.Mode().IsRegular() {
  388. log.WithFields(logrus.Fields{
  389. "diskPath": s.diskPath,
  390. "filePath": filePath,
  391. }).Debugf("Unexpected type %s", errIsNotRegular)
  392. return nil, errFileNotFound
  393. }
  394. // Seek to requested offset.
  395. _, err = file.Seek(offset, os.SEEK_SET)
  396. if err != nil {
  397. log.WithFields(logrus.Fields{
  398. "diskPath": s.diskPath,
  399. "filePath": filePath,
  400. "offset": offset,
  401. }).Debugf("Seek failed with %s", err)
  402. return nil, err
  403. }
  404. return file, nil
  405. }
  406. // CreateFile - create a file at path.
  407. func (s fsStorage) CreateFile(volume, path string) (writeCloser io.WriteCloser, err error) {
  408. volumeDir, err := s.getVolumeDir(volume)
  409. if err != nil {
  410. log.WithFields(logrus.Fields{
  411. "diskPath": s.diskPath,
  412. "volume": volume,
  413. }).Debugf("getVolumeDir failed with %s", err)
  414. return nil, err
  415. }
  416. if err = checkDiskFree(s.diskPath, s.minFreeDisk); err != nil {
  417. return nil, err
  418. }
  419. filePath := pathJoin(volumeDir, path)
  420. // Verify if the file already exists and is not of regular type.
  421. var st os.FileInfo
  422. if st, err = os.Stat(filePath); err == nil {
  423. if st.IsDir() {
  424. log.WithFields(logrus.Fields{
  425. "diskPath": s.diskPath,
  426. "filePath": filePath,
  427. }).Debugf("Unexpected type %s", errIsNotRegular)
  428. return nil, errIsNotRegular
  429. }
  430. }
  431. w, err := safe.CreateFileWithPrefix(filePath, "$tmpfile")
  432. if err != nil {
  433. // File path cannot be verified since one of the parents is a file.
  434. if strings.Contains(err.Error(), "not a directory") {
  435. return nil, errFileAccessDenied
  436. }
  437. return nil, err
  438. }
  439. return w, nil
  440. }
  441. // StatFile - get file info.
  442. func (s fsStorage) StatFile(volume, path string) (file FileInfo, err error) {
  443. volumeDir, err := s.getVolumeDir(volume)
  444. if err != nil {
  445. log.WithFields(logrus.Fields{
  446. "diskPath": s.diskPath,
  447. "volume": volume,
  448. }).Debugf("getVolumeDir failed with %s", err)
  449. return FileInfo{}, err
  450. }
  451. filePath := slashpath.Join(volumeDir, path)
  452. st, err := os.Stat(filePath)
  453. if err != nil {
  454. log.WithFields(logrus.Fields{
  455. "diskPath": s.diskPath,
  456. "filePath": filePath,
  457. }).Debugf("Stat failed with %s", err)
  458. // File is really not found.
  459. if os.IsNotExist(err) {
  460. return FileInfo{}, errFileNotFound
  461. }
  462. // File path cannot be verified since one of the parents is a file.
  463. if strings.Contains(err.Error(), "not a directory") {
  464. return FileInfo{}, errFileNotFound
  465. }
  466. // Return all errors here.
  467. return FileInfo{}, err
  468. }
  469. // If its a directory its not a regular file.
  470. if st.Mode().IsDir() {
  471. log.WithFields(logrus.Fields{
  472. "diskPath": s.diskPath,
  473. "filePath": filePath,
  474. }).Debugf("File is %s.", errIsNotRegular)
  475. return FileInfo{}, errFileNotFound
  476. }
  477. return FileInfo{
  478. Volume: volume,
  479. Name: path,
  480. ModTime: st.ModTime(),
  481. Size: st.Size(),
  482. Mode: st.Mode(),
  483. }, nil
  484. }
  485. // deleteFile - delete file path if its empty.
  486. func deleteFile(basePath, deletePath string) error {
  487. if basePath == deletePath {
  488. return nil
  489. }
  490. // Verify if the path exists.
  491. pathSt, err := os.Stat(deletePath)
  492. if err != nil {
  493. log.WithFields(logrus.Fields{
  494. "deletePath": deletePath,
  495. }).Debugf("Stat failed with %s", err)
  496. if os.IsNotExist(err) {
  497. return errFileNotFound
  498. } else if os.IsPermission(err) {
  499. return errFileAccessDenied
  500. }
  501. return err
  502. }
  503. if pathSt.IsDir() && !isDirEmpty(deletePath) {
  504. // Verify if directory is empty.
  505. return nil
  506. }
  507. // Attempt to remove path.
  508. if err := os.Remove(deletePath); err != nil {
  509. log.WithFields(logrus.Fields{
  510. "deletePath": deletePath,
  511. }).Debugf("Remove failed with %s", err)
  512. return err
  513. }
  514. // Recursively go down the next path and delete again.
  515. if err := deleteFile(basePath, slashpath.Dir(deletePath)); err != nil {
  516. log.WithFields(logrus.Fields{
  517. "basePath": basePath,
  518. "deleteDir": slashpath.Dir(deletePath),
  519. }).Debugf("deleteFile failed with %s", err)
  520. return err
  521. }
  522. return nil
  523. }
  524. // DeleteFile - delete a file at path.
  525. func (s fsStorage) DeleteFile(volume, path string) error {
  526. volumeDir, err := s.getVolumeDir(volume)
  527. if err != nil {
  528. log.WithFields(logrus.Fields{
  529. "diskPath": s.diskPath,
  530. "volume": volume,
  531. }).Debugf("getVolumeDir failed with %s", err)
  532. return err
  533. }
  534. // Following code is needed so that we retain "/" suffix if any in
  535. // path argument.
  536. filePath := pathJoin(volumeDir, path)
  537. // Delete file and delete parent directory as well if its empty.
  538. return deleteFile(volumeDir, filePath)
  539. }
  540. // RenameFile - rename file.
  541. func (s fsStorage) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) error {
  542. srcVolumeDir, err := s.getVolumeDir(srcVolume)
  543. if err != nil {
  544. log.WithFields(logrus.Fields{
  545. "diskPath": s.diskPath,
  546. "volume": srcVolume,
  547. }).Errorf("getVolumeDir failed with %s", err)
  548. return err
  549. }
  550. dstVolumeDir, err := s.getVolumeDir(dstVolume)
  551. if err != nil {
  552. log.WithFields(logrus.Fields{
  553. "diskPath": s.diskPath,
  554. "volume": dstVolume,
  555. }).Errorf("getVolumeDir failed with %s", err)
  556. return err
  557. }
  558. if err = os.MkdirAll(slashpath.Join(dstVolumeDir, slashpath.Dir(dstPath)), 0755); err != nil {
  559. // File path cannot be verified since one of the parents is a file.
  560. if strings.Contains(err.Error(), "not a directory") {
  561. return errFileAccessDenied
  562. }
  563. log.Errorf("os.MkdirAll failed with %s", err)
  564. return err
  565. }
  566. err = os.Rename(slashpath.Join(srcVolumeDir, srcPath), slashpath.Join(dstVolumeDir, dstPath))
  567. if err != nil {
  568. if os.IsNotExist(err) {
  569. return errFileNotFound
  570. }
  571. log.Errorf("os.Rename failed with %s", err)
  572. return err
  573. }
  574. return nil
  575. }