
17 changed files with 476 additions and 483 deletions
-
8api-router.go
-
18bucket-handlers.go
-
6bucket-policy-handlers.go
-
7fs.go
-
4generic-handlers.go
-
239network-fs.go
-
33object-api-interface.go
-
24object-handlers.go
-
182object_api_suite_test.go
-
31routers.go
-
38server-main.go
-
10server_test.go
-
178storage-network.go
-
23storage-rpc-datatypes.go
-
126storage-rpc-server.go
-
24web-handlers.go
-
8web-router.go
@ -0,0 +1,239 @@ |
|||
/* |
|||
* Minio Cloud Storage, (C) 2016 Minio, Inc. |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
|
|||
package main |
|||
|
|||
import ( |
|||
"errors" |
|||
"fmt" |
|||
"io" |
|||
"net/http" |
|||
"net/rpc" |
|||
"net/url" |
|||
urlpath "path" |
|||
"strconv" |
|||
"strings" |
|||
"time" |
|||
) |
|||
|
|||
type networkFS struct { |
|||
netAddr string |
|||
netPath string |
|||
rpcClient *rpc.Client |
|||
httpClient *http.Client |
|||
} |
|||
|
|||
const ( |
|||
connected = "200 Connected to Go RPC" |
|||
dialTimeoutSecs = 30 // 30 seconds.
|
|||
) |
|||
|
|||
// splits network path into its components Address and Path.
|
|||
func splitNetPath(networkPath string) (netAddr, netPath string) { |
|||
index := strings.LastIndex(networkPath, ":") |
|||
netAddr = networkPath[:index] |
|||
netPath = networkPath[index+1:] |
|||
return netAddr, netPath |
|||
} |
|||
|
|||
// Initialize new network file system.
|
|||
func newNetworkFS(networkPath string) (StorageAPI, error) { |
|||
// Input validation.
|
|||
if networkPath == "" && strings.LastIndex(networkPath, ":") != -1 { |
|||
return nil, errInvalidArgument |
|||
} |
|||
|
|||
// TODO validate netAddr and netPath.
|
|||
netAddr, netPath := splitNetPath(networkPath) |
|||
|
|||
// Dial minio rpc storage http path.
|
|||
rpcClient, err := rpc.DialHTTPPath("tcp", netAddr, "/minio/rpc/storage") |
|||
if err != nil { |
|||
return nil, err |
|||
} |
|||
|
|||
// Initialize http client.
|
|||
httpClient := &http.Client{ |
|||
// Setting a sensible time out of 6minutes to wait for
|
|||
// response headers. Request is pro-actively cancelled
|
|||
// after 6minutes if no response was received from server.
|
|||
Timeout: 6 * time.Minute, |
|||
Transport: http.DefaultTransport, |
|||
} |
|||
|
|||
// Initialize network storage.
|
|||
ndisk := &networkFS{ |
|||
netAddr: netAddr, |
|||
netPath: netPath, |
|||
rpcClient: rpcClient, |
|||
httpClient: httpClient, |
|||
} |
|||
|
|||
// Returns successfully here.
|
|||
return ndisk, nil |
|||
} |
|||
|
|||
// MakeVol - make a volume.
|
|||
func (n networkFS) MakeVol(volume string) error { |
|||
reply := GenericReply{} |
|||
if err := n.rpcClient.Call("Storage.MakeVolHandler", volume, &reply); err != nil { |
|||
if err.Error() == errVolumeExists.Error() { |
|||
return errVolumeExists |
|||
} |
|||
return err |
|||
} |
|||
return nil |
|||
} |
|||
|
|||
// ListVols - List all volumes.
|
|||
func (n networkFS) ListVols() (vols []VolInfo, err error) { |
|||
ListVols := ListVolsReply{} |
|||
err = n.rpcClient.Call("Storage.ListVolsHandler", "", &ListVols) |
|||
if err != nil { |
|||
return nil, err |
|||
} |
|||
return ListVols.Vols, nil |
|||
} |
|||
|
|||
// StatVol - get current Stat volume info.
|
|||
func (n networkFS) StatVol(volume string) (volInfo VolInfo, err error) { |
|||
if err = n.rpcClient.Call("Storage.StatVolHandler", volume, &volInfo); err != nil { |
|||
if err.Error() == errVolumeNotFound.Error() { |
|||
return VolInfo{}, errVolumeNotFound |
|||
} |
|||
return VolInfo{}, err |
|||
} |
|||
return volInfo, nil |
|||
} |
|||
|
|||
// DeleteVol - Delete a volume.
|
|||
func (n networkFS) DeleteVol(volume string) error { |
|||
reply := GenericReply{} |
|||
if err := n.rpcClient.Call("Storage.DeleteVolHandler", volume, &reply); err != nil { |
|||
if err.Error() == errVolumeNotFound.Error() { |
|||
return errVolumeNotFound |
|||
} |
|||
return err |
|||
} |
|||
return nil |
|||
} |
|||
|
|||
// File operations.
|
|||
|
|||
// CreateFile - create file.
|
|||
func (n networkFS) CreateFile(volume, path string) (writeCloser io.WriteCloser, err error) { |
|||
writeURL := new(url.URL) |
|||
writeURL.Scheme = "http" // TODO fix this.
|
|||
writeURL.Host = n.netAddr |
|||
writeURL.Path = fmt.Sprintf("/minio/rpc/storage/upload/%s", urlpath.Join(volume, path)) |
|||
|
|||
contentType := "application/octet-stream" |
|||
readCloser, writeCloser := io.Pipe() |
|||
go func() { |
|||
resp, err := n.httpClient.Post(writeURL.String(), contentType, readCloser) |
|||
if err != nil { |
|||
readCloser.CloseWithError(err) |
|||
return |
|||
} |
|||
if resp != nil { |
|||
if resp.StatusCode != http.StatusNotFound { |
|||
readCloser.CloseWithError(errFileNotFound) |
|||
return |
|||
} |
|||
readCloser.CloseWithError(errors.New("Invalid response.")) |
|||
} |
|||
}() |
|||
return writeCloser, nil |
|||
} |
|||
|
|||
// StatFile - get latest Stat information for a file at path.
|
|||
func (n networkFS) StatFile(volume, path string) (fileInfo FileInfo, err error) { |
|||
if err = n.rpcClient.Call("Storage.StatFileHandler", StatFileArgs{ |
|||
Vol: volume, |
|||
Path: path, |
|||
}, &fileInfo); err != nil { |
|||
if err.Error() == errVolumeNotFound.Error() { |
|||
return FileInfo{}, errVolumeNotFound |
|||
} else if err.Error() == errFileNotFound.Error() { |
|||
return FileInfo{}, errFileNotFound |
|||
} else if err.Error() == errIsNotRegular.Error() { |
|||
return FileInfo{}, errFileNotFound |
|||
} |
|||
return FileInfo{}, err |
|||
} |
|||
return fileInfo, nil |
|||
} |
|||
|
|||
// ReadFile - reads a file.
|
|||
func (n networkFS) ReadFile(volume string, path string, offset int64) (reader io.ReadCloser, err error) { |
|||
readURL := new(url.URL) |
|||
readURL.Scheme = "http" // TODO fix this.
|
|||
readURL.Host = n.netAddr |
|||
readURL.Path = fmt.Sprintf("/minio/rpc/storage/download/%s", urlpath.Join(volume, path)) |
|||
readQuery := make(url.Values) |
|||
readQuery.Set("offset", strconv.FormatInt(offset, 10)) |
|||
readURL.RawQuery = readQuery.Encode() |
|||
resp, err := n.httpClient.Get(readURL.String()) |
|||
if err != nil { |
|||
return nil, err |
|||
} |
|||
if resp.StatusCode != http.StatusOK { |
|||
if resp.StatusCode == http.StatusNotFound { |
|||
return nil, errFileNotFound |
|||
} |
|||
return nil, errors.New("Invalid response") |
|||
} |
|||
return resp.Body, nil |
|||
} |
|||
|
|||
// ListFiles - List all files in a volume.
|
|||
func (n networkFS) ListFiles(volume, prefix, marker string, recursive bool, count int) (files []FileInfo, eof bool, err error) { |
|||
listFilesReply := ListFilesReply{} |
|||
if err = n.rpcClient.Call("Storage.ListFilesHandler", ListFilesArgs{ |
|||
Vol: volume, |
|||
Prefix: prefix, |
|||
Marker: marker, |
|||
Recursive: recursive, |
|||
Count: count, |
|||
}, &listFilesReply); err != nil { |
|||
if err.Error() == errVolumeNotFound.Error() { |
|||
return nil, true, errVolumeNotFound |
|||
} |
|||
return nil, true, err |
|||
} |
|||
// List of files.
|
|||
files = listFilesReply.Files |
|||
// EOF.
|
|||
eof = listFilesReply.EOF |
|||
return files, eof, nil |
|||
} |
|||
|
|||
// DeleteFile - Delete a file at path.
|
|||
func (n networkFS) DeleteFile(volume, path string) (err error) { |
|||
reply := GenericReply{} |
|||
if err = n.rpcClient.Call("Storage.DeleteFileHandler", DeleteFileArgs{ |
|||
Vol: volume, |
|||
Path: path, |
|||
}, &reply); err != nil { |
|||
if err.Error() == errVolumeNotFound.Error() { |
|||
return errVolumeNotFound |
|||
} else if err.Error() == errFileNotFound.Error() { |
|||
return errFileNotFound |
|||
} |
|||
return err |
|||
} |
|||
return nil |
|||
} |
@ -1,33 +0,0 @@ |
|||
package main |
|||
|
|||
import ( |
|||
"io" |
|||
|
|||
"github.com/minio/minio/pkg/probe" |
|||
) |
|||
|
|||
// ObjectAPI interface.
|
|||
type ObjectAPI interface { |
|||
// Bucket resource API.
|
|||
DeleteBucket(bucket string) *probe.Error |
|||
ListBuckets() ([]BucketInfo, *probe.Error) |
|||
MakeBucket(bucket string) *probe.Error |
|||
GetBucketInfo(bucket string) (BucketInfo, *probe.Error) |
|||
|
|||
// Bucket query API.
|
|||
ListObjects(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, *probe.Error) |
|||
ListMultipartUploads(bucket, objectPrefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (ListMultipartsInfo, *probe.Error) |
|||
|
|||
// Object resource API.
|
|||
GetObject(bucket, object string, startOffset int64) (io.ReadCloser, *probe.Error) |
|||
GetObjectInfo(bucket, object string) (ObjectInfo, *probe.Error) |
|||
PutObject(bucket string, object string, size int64, data io.Reader, metadata map[string]string) (ObjectInfo, *probe.Error) |
|||
DeleteObject(bucket, object string) *probe.Error |
|||
|
|||
// Object query API.
|
|||
NewMultipartUpload(bucket, object string) (string, *probe.Error) |
|||
PutObjectPart(bucket, object, uploadID string, partID int, size int64, data io.Reader, md5Hex string) (string, *probe.Error) |
|||
ListObjectParts(bucket, object, uploadID string, partNumberMarker, maxParts int) (ListPartsInfo, *probe.Error) |
|||
CompleteMultipartUpload(bucket string, object string, uploadID string, parts []completePart) (ObjectInfo, *probe.Error) |
|||
AbortMultipartUpload(bucket, object, uploadID string) *probe.Error |
|||
} |
@ -1,178 +0,0 @@ |
|||
/* |
|||
* Minio Cloud Storage, (C) 2016 Minio, Inc. |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
|
|||
package main |
|||
|
|||
import ( |
|||
"errors" |
|||
"io" |
|||
"net" |
|||
"net/http" |
|||
"net/rpc" |
|||
"time" |
|||
) |
|||
|
|||
type networkStorage struct { |
|||
address string |
|||
connection *rpc.Client |
|||
httpClient *http.Client |
|||
} |
|||
|
|||
const ( |
|||
connected = "200 Connected to Go RPC" |
|||
dialTimeoutSecs = 30 // 30 seconds.
|
|||
) |
|||
|
|||
// Initialize new network storage.
|
|||
func newNetworkStorage(address string) (StorageAPI, error) { |
|||
// Dial to the address with timeout of 30secs, this includes DNS resolution.
|
|||
conn, err := net.DialTimeout("tcp", address, dialTimeoutSecs*time.Second) |
|||
if err != nil { |
|||
return nil, err |
|||
} |
|||
|
|||
// Initialize rpc client with dialed connection.
|
|||
rpcClient := rpc.NewClient(conn) |
|||
|
|||
// Initialize http client.
|
|||
httpClient := &http.Client{ |
|||
// Setting a sensible time out of 2minutes to wait for
|
|||
// response headers. Request is pro-actively cancelled
|
|||
// after 2minutes if no response was received from server.
|
|||
Timeout: 2 * time.Minute, |
|||
Transport: http.DefaultTransport, |
|||
} |
|||
|
|||
// Initialize network storage.
|
|||
ndisk := &networkStorage{ |
|||
address: address, |
|||
connection: rpcClient, |
|||
httpClient: httpClient, |
|||
} |
|||
|
|||
// Returns successfully here.
|
|||
return ndisk, nil |
|||
} |
|||
|
|||
// MakeVol - make a volume.
|
|||
func (n networkStorage) MakeVol(volume string) error { |
|||
reply := GenericReply{} |
|||
return n.connection.Call("Storage.MakeVolHandler", volume, &reply) |
|||
} |
|||
|
|||
// ListVols - List all volumes.
|
|||
func (n networkStorage) ListVols() (vols []VolInfo, err error) { |
|||
ListVols := ListVolsReply{} |
|||
err = n.connection.Call("Storage.ListVolsHandler", "", &ListVols) |
|||
if err != nil { |
|||
return nil, err |
|||
} |
|||
return ListVols.Vols, nil |
|||
} |
|||
|
|||
// StatVol - get current Stat volume info.
|
|||
func (n networkStorage) StatVol(volume string) (volInfo VolInfo, err error) { |
|||
if err = n.connection.Call("Storage.StatVolHandler", volume, &volInfo); err != nil { |
|||
return VolInfo{}, err |
|||
} |
|||
return volInfo, nil |
|||
} |
|||
|
|||
// DeleteVol - Delete a volume.
|
|||
func (n networkStorage) DeleteVol(volume string) error { |
|||
reply := GenericReply{} |
|||
return n.connection.Call("Storage.DeleteVolHandler", volume, &reply) |
|||
} |
|||
|
|||
// File operations.
|
|||
|
|||
// CreateFile - create file.
|
|||
func (n networkStorage) CreateFile(volume, path string) (writeCloser io.WriteCloser, err error) { |
|||
createFileReply := CreateFileReply{} |
|||
if err = n.connection.Call("Storage.CreateFileHandler", CreateFileArgs{ |
|||
Vol: volume, |
|||
Path: path, |
|||
}, &createFileReply); err != nil { |
|||
return nil, err |
|||
} |
|||
contentType := "application/octet-stream" |
|||
readCloser, writeCloser := io.Pipe() |
|||
defer readCloser.Close() |
|||
go n.httpClient.Post(createFileReply.URL, contentType, readCloser) |
|||
return writeCloser, nil |
|||
} |
|||
|
|||
// StatFile - get latest Stat information for a file at path.
|
|||
func (n networkStorage) StatFile(volume, path string) (fileInfo FileInfo, err error) { |
|||
if err = n.connection.Call("Storage.StatFileHandler", StatFileArgs{ |
|||
Vol: volume, |
|||
Path: path, |
|||
}, &fileInfo); err != nil { |
|||
return FileInfo{}, err |
|||
} |
|||
return fileInfo, nil |
|||
} |
|||
|
|||
// ReadFile - reads a file.
|
|||
func (n networkStorage) ReadFile(volume string, path string, offset int64) (reader io.ReadCloser, err error) { |
|||
readFileReply := ReadFileReply{} |
|||
if err = n.connection.Call("Storage.ReadFileHandler", ReadFileArgs{ |
|||
Vol: volume, |
|||
Path: path, |
|||
Offset: offset, |
|||
}, &readFileReply); err != nil { |
|||
return nil, err |
|||
} |
|||
resp, err := n.httpClient.Get(readFileReply.URL) |
|||
if err != nil { |
|||
return nil, err |
|||
} |
|||
if resp.StatusCode != http.StatusOK { |
|||
return nil, errors.New("Invalid response") |
|||
} |
|||
return resp.Body, nil |
|||
} |
|||
|
|||
// ListFiles - List all files in a volume.
|
|||
func (n networkStorage) ListFiles(volume, prefix, marker string, recursive bool, count int) (files []FileInfo, eof bool, err error) { |
|||
listFilesReply := ListFilesReply{} |
|||
if err = n.connection.Call("Storage.ListFilesHandler", ListFilesArgs{ |
|||
Vol: volume, |
|||
Prefix: prefix, |
|||
Marker: marker, |
|||
Recursive: recursive, |
|||
Count: count, |
|||
}, &listFilesReply); err != nil { |
|||
return nil, true, err |
|||
} |
|||
// List of files.
|
|||
files = listFilesReply.Files |
|||
// EOF.
|
|||
eof = listFilesReply.EOF |
|||
return files, eof, nil |
|||
} |
|||
|
|||
// DeleteFile - Delete a file at path.
|
|||
func (n networkStorage) DeleteFile(volume, path string) (err error) { |
|||
reply := GenericReply{} |
|||
if err = n.connection.Call("Storage.DeleteFileHandler", DeleteFileArgs{ |
|||
Vol: volume, |
|||
Path: path, |
|||
}, &reply); err != nil { |
|||
return err |
|||
} |
|||
return nil |
|||
} |
Write
Preview
Loading…
Cancel
Save
Reference in new issue