diff --git a/cmd/browser-peer-rpc.go b/cmd/browser-peer-rpc.go index 240eb8d29..b83200ef6 100644 --- a/cmd/browser-peer-rpc.go +++ b/cmd/browser-peer-rpc.go @@ -78,8 +78,11 @@ func (br *browserPeerAPIHandlers) SetAuthPeer(args SetAuthPeerArgs, reply *Gener // Sends SetAuthPeer RPCs to all peers in the Minio cluster func updateCredsOnPeers(creds credential) map[string]error { - // Get list of peers (from globalS3Peers) - peers := globalS3Peers.GetPeers() + // Get list of peer addresses (from globalS3Peers) + peers := []string{} + for _, p := range globalS3Peers { + peers = append(peers, p.addr) + } // Array of errors for each peer errs := make([]error, len(peers)) diff --git a/cmd/bucket-metadata.go b/cmd/bucket-metadata.go new file mode 100644 index 000000000..10d1d7cf2 --- /dev/null +++ b/cmd/bucket-metadata.go @@ -0,0 +1,152 @@ +/* + * Minio Cloud Storage, (C) 2014-2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd + +import ( + "encoding/json" + "net/rpc" +) + +// BucketMetaState - Interface to update bucket metadata in-memory +// state. +type BucketMetaState interface { + // Updates bucket notification + UpdateBucketNotification(args *SetBNPArgs) error + + // Updates bucket listener + UpdateBucketListener(args *SetBLPArgs) error + + // Updates bucket policy + UpdateBucketPolicy(args *SetBPPArgs) error + + // Sends event + SendEvent(args *EventArgs) error +} + +// Type that implements BucketMetaState for local node. +type localBMS struct { + ObjectAPI func() ObjectLayer +} + +// localBMS.UpdateBucketNotification - updates in-memory global bucket +// notification info. +func (lc *localBMS) UpdateBucketNotification(args *SetBNPArgs) error { + // check if object layer is available. + objAPI := lc.ObjectAPI() + if objAPI == nil { + return errServerNotInitialized + } + + globalEventNotifier.SetBucketNotificationConfig(args.Bucket, args.NCfg) + + return nil +} + +// localBMS.UpdateBucketListener - updates in-memory global bucket +// listeners info. +func (lc *localBMS) UpdateBucketListener(args *SetBLPArgs) error { + // check if object layer is available. + objAPI := lc.ObjectAPI() + if objAPI == nil { + return errServerNotInitialized + } + + // Update in-memory notification config. + return globalEventNotifier.SetBucketListenerConfig(args.Bucket, args.LCfg) +} + +// localBMS.UpdateBucketPolicy - updates in-memory global bucket +// policy info. +func (lc *localBMS) UpdateBucketPolicy(args *SetBPPArgs) error { + // check if object layer is available. + objAPI := lc.ObjectAPI() + if objAPI == nil { + return errServerNotInitialized + } + + var pCh policyChange + if err := json.Unmarshal(args.PChBytes, &pCh); err != nil { + return err + } + + return globalBucketPolicies.SetBucketPolicy(args.Bucket, pCh) +} + +// localBMS.SendEvent - sends event to local event notifier via +// `globalEventNotifier` +func (lc *localBMS) SendEvent(args *EventArgs) error { + // check if object layer is available. + objAPI := lc.ObjectAPI() + if objAPI == nil { + return errServerNotInitialized + } + + return globalEventNotifier.SendListenerEvent(args.Arn, args.Event) +} + +// Type that implements BucketMetaState for remote node. +type remoteBMS struct { + *AuthRPCClient +} + +// remoteBMS.UpdateBucketNotification - sends bucket notification +// change to remote peer via RPC call. +func (rc *remoteBMS) UpdateBucketNotification(args *SetBNPArgs) error { + reply := GenericReply{} + err := rc.Call("S3.SetBucketNotificationPeer", args, &reply) + // Check for network error and retry once. + if err != nil && err.Error() == rpc.ErrShutdown.Error() { + err = rc.Call("S3.SetBucketNotificationPeer", args, &reply) + } + return err +} + +// remoteBMS.UpdateBucketListener - sends bucket listener change to +// remote peer via RPC call. +func (rc *remoteBMS) UpdateBucketListener(args *SetBLPArgs) error { + reply := GenericReply{} + err := rc.Call("S3.SetBucketListenerPeer", args, &reply) + // Check for network error and retry once. + if err != nil && err.Error() == rpc.ErrShutdown.Error() { + err = rc.Call("S3.SetBucketListenerPeer", args, &reply) + } + return err +} + +// remoteBMS.UpdateBucketPolicy - sends bucket policy change to remote +// peer via RPC call. +func (rc *remoteBMS) UpdateBucketPolicy(args *SetBPPArgs) error { + reply := GenericReply{} + err := rc.Call("S3.SetBucketPolicyPeer", args, &reply) + // Check for network error and retry once. + if err != nil && err.Error() == rpc.ErrShutdown.Error() { + err = rc.Call("S3.SetBucketPolicyPeer", args, &reply) + } + return err +} + +// remoteBMS.SendEvent - sends event for bucket listener to remote +// peer via RPC call. +func (rc *remoteBMS) SendEvent(args *EventArgs) error { + reply := GenericReply{} + err := rc.Call("S3.Event", args, &reply) + // Check for network error and retry once. + if err != nil && err.Error() == rpc.ErrShutdown.Error() { + err = rc.Call("S3.Event", args, &reply) + } + return err +} diff --git a/cmd/bucket-notification-handlers.go b/cmd/bucket-notification-handlers.go index b4f1e7960..8068ef2ae 100644 --- a/cmd/bucket-notification-handlers.go +++ b/cmd/bucket-notification-handlers.go @@ -380,7 +380,7 @@ func AddBucketListenerConfig(bucket string, lcfg *listenerConfig, objAPI ObjectL defer nsMutex.Unlock(bucket, "", opsID) // update persistent config if dist XL - if globalS3Peers.isDistXL { + if globalIsDistXL { err := persistListenerConfig(bucket, listenerCfgs, objAPI) if err != nil { errorIf(err, "Error persisting listener config when adding a listener.") @@ -422,7 +422,7 @@ func RemoveBucketListenerConfig(bucket string, lcfg *listenerConfig, objAPI Obje defer nsMutex.Unlock(bucket, "", opsID) // update persistent config if dist XL - if globalS3Peers.isDistXL { + if globalIsDistXL { err := persistListenerConfig(bucket, updatedLcfgs, objAPI) if err != nil { errorIf(err, "Error persisting listener config when removing a listener.") diff --git a/cmd/control-router.go b/cmd/control-router.go index e7730b323..8db496a23 100644 --- a/cmd/control-router.go +++ b/cmd/control-router.go @@ -30,7 +30,7 @@ const ( // Initializes remote control clients for making remote requests. func initRemoteControlClients(srvCmdConfig serverCmdConfig) []*AuthRPCClient { - if !srvCmdConfig.isDistXL { + if !globalIsDistXL { return nil } // Initialize auth rpc clients. @@ -72,7 +72,7 @@ func registerControlRPCRouter(mux *router.Router, srvCmdConfig serverCmdConfig) // Initialize Control. ctrlHandlers := &controlAPIHandlers{ ObjectAPI: newObjectLayerFn, - IsXL: srvCmdConfig.isDistXL || len(srvCmdConfig.storageDisks) > 1, + IsXL: globalIsDistXL || len(srvCmdConfig.storageDisks) > 1, RemoteControls: initRemoteControlClients(srvCmdConfig), LocalNode: getLocalAddress(srvCmdConfig), StorageDisks: srvCmdConfig.storageDisks, diff --git a/cmd/control-router_test.go b/cmd/control-router_test.go index 2d46b8f77..2ded2acee 100644 --- a/cmd/control-router_test.go +++ b/cmd/control-router_test.go @@ -30,20 +30,20 @@ func TestInitRemoteControlClients(t *testing.T) { defer removeAll(rootPath) testCases := []struct { + isDistXL bool srvCmdConfig serverCmdConfig totalClients int }{ // Test - 1 no allocation if server config is not distributed XL. { - srvCmdConfig: serverCmdConfig{ - isDistXL: false, - }, + isDistXL: false, + srvCmdConfig: serverCmdConfig{}, totalClients: 0, }, // Test - 2 two clients allocated with 4 disks with 2 disks on same node each. { + isDistXL: true, srvCmdConfig: serverCmdConfig{ - isDistXL: true, endpoints: []*url.URL{{ Scheme: "http", Host: "10.1.10.1:9000", @@ -63,8 +63,8 @@ func TestInitRemoteControlClients(t *testing.T) { }, // Test - 3 4 clients allocated with 4 disks with 1 disk on each node. { + isDistXL: true, srvCmdConfig: serverCmdConfig{ - isDistXL: true, endpoints: []*url.URL{{ Scheme: "http", Host: "10.1.10.1:9000", Path: "/mnt/disk1", @@ -85,6 +85,7 @@ func TestInitRemoteControlClients(t *testing.T) { // Evaluate and validate all test cases. for i, testCase := range testCases { + globalIsDistXL = testCase.isDistXL rclients := initRemoteControlClients(testCase.srvCmdConfig) if len(rclients) != testCase.totalClients { t.Errorf("Test %d, Expected %d, got %d RPC clients.", i+1, testCase.totalClients, len(rclients)) diff --git a/cmd/event-notifier.go b/cmd/event-notifier.go index d648afd9b..be3612b7f 100644 --- a/cmd/event-notifier.go +++ b/cmd/event-notifier.go @@ -350,7 +350,7 @@ func loadListenerConfig(bucket string, objAPI ObjectLayer) ([]listenerConfig, er // in single node mode, there are no peers, so in this case // there is no configuration to load, as any previously // connected listen clients have been disconnected - if !globalS3Peers.isDistXL { + if !globalIsDistXL { return nil, nil } diff --git a/cmd/event-notifier_test.go b/cmd/event-notifier_test.go index 420e77164..97062b27f 100644 --- a/cmd/event-notifier_test.go +++ b/cmd/event-notifier_test.go @@ -291,7 +291,7 @@ func TestInitEventNotifier(t *testing.T) { // needed to load listener config from disk for testing (in // single peer mode, the listener config is ingored, but here // we want to test the loading from disk too.) - globalS3Peers.isDistXL = true + globalIsDistXL = true // test event notifier init if err := initEventNotifier(obj); err != nil { @@ -366,7 +366,7 @@ func TestListenBucketNotification(t *testing.T) { // needed to load listener config from disk for testing (in // single peer mode, the listener config is ingored, but here // we want to test the loading from disk too.) - globalS3Peers.isDistXL = true + globalIsDistXL = true // Init event notifier if err := initEventNotifier(obj); err != nil { diff --git a/cmd/globals.go b/cmd/globals.go index 3a1f4451d..c307b2eaa 100644 --- a/cmd/globals.go +++ b/cmd/globals.go @@ -41,7 +41,8 @@ const ( ) var ( - globalQuiet = false // Quiet flag set via command line + globalQuiet = false // Quiet flag set via command line + globalIsDistXL = false // "Is Distributed?" flag. // Add new global flags here. diff --git a/cmd/lock-rpc-server_test.go b/cmd/lock-rpc-server_test.go index 2109fac6f..230f6bb02 100644 --- a/cmd/lock-rpc-server_test.go +++ b/cmd/lock-rpc-server_test.go @@ -447,13 +447,14 @@ func TestLockServers(t *testing.T) { } globalMinioHost = "" testCases := []struct { + isDistXL bool srvCmdConfig serverCmdConfig totalLockServers int }{ // Test - 1 one lock server initialized. { + isDistXL: true, srvCmdConfig: serverCmdConfig{ - isDistXL: true, endpoints: []*url.URL{{ Scheme: "http", Host: "localhost:9000", @@ -476,8 +477,8 @@ func TestLockServers(t *testing.T) { }, // Test - 2 two servers possible, 1 ignored. { + isDistXL: true, srvCmdConfig: serverCmdConfig{ - isDistXL: true, endpoints: []*url.URL{{ Scheme: "http", Host: "localhost:9000", @@ -507,6 +508,7 @@ func TestLockServers(t *testing.T) { // Validates lock server initialization. for i, testCase := range testCases { + globalIsDistXL = testCase.isDistXL lockServers := newLockServers(testCase.srvCmdConfig) if len(lockServers) != testCase.totalLockServers { t.Fatalf("Test %d: Expected total %d, got %d", i+1, testCase.totalLockServers, len(lockServers)) diff --git a/cmd/notify-listener.go b/cmd/notify-listener.go index d0183fbe1..2079e7eff 100644 --- a/cmd/notify-listener.go +++ b/cmd/notify-listener.go @@ -27,6 +27,7 @@ import ( type listenerConn struct { TargetAddr string ListenerARN string + BMSClient BucketMetaState } type listenerLogger struct { @@ -35,7 +36,8 @@ type listenerLogger struct { } func newListenerLogger(listenerArn, targetAddr string) (*listenerLogger, error) { - if globalS3Peers.GetPeerClient(targetAddr) == nil { + bmsClient := globalS3Peers.GetPeerClient(targetAddr) + if bmsClient == nil { return nil, fmt.Errorf( "Peer %s was not initialized - bug!", targetAddr, @@ -44,6 +46,7 @@ func newListenerLogger(listenerArn, targetAddr string) (*listenerLogger, error) lc := listenerConn{ TargetAddr: targetAddr, ListenerARN: listenerArn, + BMSClient: bmsClient, } lcLog := logrus.New() @@ -66,21 +69,14 @@ func (lc listenerConn) Fire(entry *logrus.Entry) error { return nil } - // Fetch peer client object - client := globalS3Peers.GetPeerClient(lc.TargetAddr) - if client == nil { - return fmt.Errorf("Target %s client RPC object not available!", lc.TargetAddr) - } - // Send Event RPC call and return error arg := EventArgs{Event: notificationEvent, Arn: lc.ListenerARN} - reply := GenericReply{} - err := client.Call("S3.Event", &arg, &reply) + err := lc.BMSClient.SendEvent(&arg) // In case connection is shutdown, retry once. if err != nil { if err.Error() == rpc.ErrShutdown.Error() { - err = client.Call("S3.Event", &arg, &reply) + err = lc.BMSClient.SendEvent(&arg) } } return err diff --git a/cmd/routers.go b/cmd/routers.go index 5d4585eeb..5ce1c5fed 100644 --- a/cmd/routers.go +++ b/cmd/routers.go @@ -83,7 +83,7 @@ func configureServerHandler(srvCmdConfig serverCmdConfig) (http.Handler, error) mux := router.NewRouter() // Initialize distributed NS lock. - if srvCmdConfig.isDistXL { + if globalIsDistXL { // Register storage rpc router only if its a distributed setup. err := registerStorageRPCRouters(mux, srvCmdConfig) if err != nil { diff --git a/cmd/s3-peer-client.go b/cmd/s3-peer-client.go index 719bbe239..2bdd809ca 100644 --- a/cmd/s3-peer-client.go +++ b/cmd/s3-peer-client.go @@ -19,179 +19,168 @@ package cmd import ( "encoding/json" "fmt" - "net/rpc" "net/url" "path" "sync" - "time" ) -type s3Peers struct { - // A map of peer server address (in `host:port` format) to RPC - // client connections. - rpcClients map[string]*AuthRPCClient - - mutex *sync.RWMutex - - // Is single-node? - isDistXL bool - - // Slice of all peer addresses (in `host:port` format). - peers []string +// s3Peer structs contains the address of a peer in the cluster, and +// its BucketMetaState interface objects. +type s3Peer struct { + // address in `host:port` format + addr string + // BucketMetaState client interface + bmsClient BucketMetaState } -func initGlobalS3Peers(eps []*url.URL) { - // Get list of de-duplicated peers. - peers := getAllPeers(eps) +// type representing all peers in the cluster +type s3Peers []s3Peer - // Initialize global state. - globalS3Peers = s3Peers{ - rpcClients: make(map[string]*AuthRPCClient), - mutex: &sync.RWMutex{}, - } +// makeS3Peers makes an s3Peers struct value from the given urls +// slice. The urls slice is assumed to be non-empty and free of nil +// values. +func makeS3Peers(eps []*url.URL) s3Peers { + var ret []s3Peer - // Initialize each peer connection. - for _, peer := range peers { - globalS3Peers.InitS3PeerClient(peer) - } - - // Save new peers - globalS3Peers.peers = peers + // map to store peers that are already added to ret + seenAddr := make(map[string]bool) - // store if this is a distributed setup or not. - globalS3Peers.isDistXL = len(globalS3Peers.peers) > 1 -} + // add local (self) as peer in the array + ret = append(ret, s3Peer{ + globalMinioAddr, + &localBMS{ObjectAPI: newObjectLayerFn}, + }) + seenAddr[globalMinioAddr] = true -func (s3p *s3Peers) GetPeers() []string { - return s3p.peers -} + // iterate over endpoints to find new remote peers and add + // them to ret. + for _, ep := range eps { + if ep.Host == "" { + continue + } -func (s3p *s3Peers) GetPeerClient(peer string) *AuthRPCClient { - // Take a read lock - s3p.mutex.RLock() - defer s3p.mutex.RUnlock() - return s3p.rpcClients[peer] -} + // Check if the remote host has been added already + if !seenAddr[ep.Host] { + cfg := authConfig{ + accessKey: serverConfig.GetCredential().AccessKeyID, + secretKey: serverConfig.GetCredential().SecretAccessKey, + address: ep.Host, + secureConn: isSSL(), + path: path.Join(reservedBucket, s3Path), + loginMethod: "S3.LoginHandler", + } -// Initializes a new RPC connection (or closes and re-opens if it -// already exists) to a peer. Note that peer address is in `host:port` -// format. -func (s3p *s3Peers) InitS3PeerClient(peer string) { - // Take a write lock - s3p.mutex.Lock() - defer s3p.mutex.Unlock() - - if s3p.rpcClients[peer] != nil { - _ = s3p.rpcClients[peer].Close() - delete(s3p.rpcClients, peer) - } - authCfg := &authConfig{ - accessKey: serverConfig.GetCredential().AccessKeyID, - secretKey: serverConfig.GetCredential().SecretAccessKey, - address: peer, - secureConn: isSSL(), - path: path.Join(reservedBucket, s3Path), - loginMethod: "S3.LoginHandler", + ret = append(ret, s3Peer{ + ep.Host, + &remoteBMS{newAuthClient(&cfg)}, + }) + seenAddr[ep.Host] = true + } } - s3p.rpcClients[peer] = newAuthClient(authCfg) -} -func (s3p *s3Peers) Close() error { - // Take a write lock - s3p.mutex.Lock() - defer s3p.mutex.Unlock() + return ret +} - for _, v := range s3p.rpcClients { - if err := v.Close(); err != nil { - return err - } - } - s3p.rpcClients = nil - s3p.peers = nil - return nil +// initGlobalS3Peers - initialize globalS3Peers by passing in +// endpoints - intended to be called early in program start-up. +func initGlobalS3Peers(eps []*url.URL) { + globalS3Peers = makeS3Peers(eps) } -// Returns the network addresses of all Minio servers in the cluster in `host:port` format. -func getAllPeers(eps []*url.URL) (peers []string) { - if eps == nil { - return nil - } - peers = []string{globalMinioAddr} // Starts with a default peer. - for _, ep := range eps { - if ep == nil { - return nil +// GetPeerClient - fetch BucketMetaState interface by peer address +func (s3p s3Peers) GetPeerClient(peer string) BucketMetaState { + for _, p := range s3p { + if p.addr == peer { + return p.bmsClient } - // Rest of the peers configured. - peers = append(peers, ep.Host) } - return peers + return nil } -// Make RPC calls with the given method and arguments to all the given -// peers (in parallel), and collects the results. Since the methods -// intended for use here, have only a success or failure response, we -// do not return/inspect the `reply` parameter in the RPC call. The -// function attempts to connect to a peer only once, and returns a map -// of peer address to error response. If the error is nil, it means -// the RPC succeeded. -func (s3p *s3Peers) SendRPC(peers []string, method string, args interface { - SetToken(token string) - SetTimestamp(tstamp time.Time) -}) map[string]error { - - // peer error responses array - errArr := make([]error, len(peers)) +// SendUpdate sends bucket metadata updates to all given peer +// indices. The update calls are sent in parallel, and errors are +// returned per peer in an array. The returned error arrayslice is +// always as long as s3p.peers.addr. +// +// The input peerIndex slice can be nil if the update is to be sent to +// all peers. This is the common case. +// +// The updates are sent via a type implementing the BucketMetaState +// interface. This makes sure that the local node is directly updated, +// and remote nodes are updated via RPC calls. +func (s3p s3Peers) SendUpdate(peerIndex []int, args interface{}) []error { + + // peer error array + errs := make([]error, len(s3p)) // Start a wait group and make RPC requests to peers. var wg sync.WaitGroup - for i, target := range peers { - wg.Add(1) - go func(ix int, target string) { - defer wg.Done() - reply := &GenericReply{} - // Get RPC client object safely. - client := s3p.GetPeerClient(target) - var err error - if client == nil { - err = fmt.Errorf("Requested client was not initialized - %v", - target) - } else { - err = client.Call(method, args, reply) - // Check for network errors and try - // again just once. - if err != nil { - if err.Error() == rpc.ErrShutdown.Error() { - err = client.Call(method, args, reply) - } - } - } - errArr[ix] = err - }(i, target) - } - // Wait for requests to complete. - wg.Wait() + // Function that sends update to peer at `index` + sendUpdateToPeer := func(index int) { + defer wg.Done() + var err error + // Get BMS client for peer at `index`. The index is + // already checked for being within array bounds. + client := s3p[index].bmsClient - // Map of errors - errsMap := make(map[string]error) - for i, errVal := range errArr { - if errVal != nil { - errsMap[peers[i]] = errVal + // Make the appropriate bucket metadata update + // according to the argument type + switch v := args.(type) { + case *SetBNPArgs: + err = client.UpdateBucketNotification(v) + + case *SetBLPArgs: + err = client.UpdateBucketListener(v) + + case *SetBPPArgs: + err = client.UpdateBucketPolicy(v) + + default: + err = fmt.Errorf("Unknown arg in BucketMetaState updater - %v", args) } + errs[index] = err } - return errsMap + // Special (but common) case of peerIndex == nil, implies send + // update to all peers. + if peerIndex == nil { + for idx := 0; idx < len(s3p); idx++ { + wg.Add(1) + go sendUpdateToPeer(idx) + } + } else { + // Send update only to given peer indices. + for _, idx := range peerIndex { + // check idx is in array bounds. + if !(idx >= 0 && idx < len(s3p)) { + errorIf( + fmt.Errorf("Bad peer index %d input to SendUpdate()", idx), + "peerIndex out of bounds", + ) + continue + } + wg.Add(1) + go sendUpdateToPeer(idx) + } + } + + // Wait for requests to complete and return + wg.Wait() + return errs } // S3PeersUpdateBucketNotification - Sends Update Bucket notification // request to all peers. Currently we log an error and continue. func S3PeersUpdateBucketNotification(bucket string, ncfg *notificationConfig) { setBNPArgs := &SetBNPArgs{Bucket: bucket, NCfg: ncfg} - peers := globalS3Peers.GetPeers() - errsMap := globalS3Peers.SendRPC(peers, "S3.SetBucketNotificationPeer", - setBNPArgs) - for peer, err := range errsMap { - errorIf(err, "Error sending peer update bucket notification to %s - %v", peer, err) + errs := globalS3Peers.SendUpdate(nil, setBNPArgs) + for idx, err := range errs { + errorIf( + err, + "Error sending update bucket notification to %s - %v", + globalS3Peers[idx].addr, err, + ) } } @@ -199,11 +188,13 @@ func S3PeersUpdateBucketNotification(bucket string, ncfg *notificationConfig) { // to all peers. Currently we log an error and continue. func S3PeersUpdateBucketListener(bucket string, lcfg []listenerConfig) { setBLPArgs := &SetBLPArgs{Bucket: bucket, LCfg: lcfg} - peers := globalS3Peers.GetPeers() - errsMap := globalS3Peers.SendRPC(peers, "S3.SetBucketListenerPeer", - setBLPArgs) - for peer, err := range errsMap { - errorIf(err, "Error sending peer update bucket listener to %s - %v", peer, err) + errs := globalS3Peers.SendUpdate(nil, setBLPArgs) + for idx, err := range errs { + errorIf( + err, + "Error sending update bucket listener to %s - %v", + globalS3Peers[idx].addr, err, + ) } } @@ -216,9 +207,12 @@ func S3PeersUpdateBucketPolicy(bucket string, pCh policyChange) { return } setBPPArgs := &SetBPPArgs{Bucket: bucket, PChBytes: byts} - peers := globalS3Peers.GetPeers() - errsMap := globalS3Peers.SendRPC(peers, "S3.SetBucketPolicyPeer", setBPPArgs) - for peer, err := range errsMap { - errorIf(err, "Error sending peer update bucket policy to %s - %v", peer, err) + errs := globalS3Peers.SendUpdate(nil, setBPPArgs) + for idx, err := range errs { + errorIf( + err, + "Error sending update bucket policy to %s - %v", + globalS3Peers[idx].addr, err, + ) } } diff --git a/cmd/s3-peer-client_test.go b/cmd/s3-peer-client_test.go index 31f471c5e..023bb619a 100644 --- a/cmd/s3-peer-client_test.go +++ b/cmd/s3-peer-client_test.go @@ -22,24 +22,42 @@ import ( "testing" ) -// Validates getAllPeers, fetches all peers based on list of storage endpoints. -func TestGetAllPeers(t *testing.T) { +// Validates makeS3Peers, fetches all peers based on list of storage +// endpoints. +func TestMakeS3Peers(t *testing.T) { + // Initialize configuration + root, err := newTestConfig("us-east-1") + if err != nil { + t.Fatalf("%s", err) + } + defer removeAll(root) + + // test cases testCases := []struct { - eps []*url.URL - peers []string + gMinioAddr string + eps []*url.URL + peers []string }{ - {nil, nil}, - {[]*url.URL{nil}, nil}, - {[]*url.URL{{Path: "/mnt/disk1"}}, []string{globalMinioAddr, ""}}, - {[]*url.URL{{Host: "localhost:9001"}}, []string{globalMinioAddr, - "localhost:9001", - }}, + {":9000", []*url.URL{{Path: "/mnt/disk1"}}, []string{":9000"}}, + {":9000", []*url.URL{{Host: "localhost:9001"}}, []string{":9000", "localhost:9001"}}, + {"m1:9000", []*url.URL{{Host: "m1:9000"}, {Host: "m2:9000"}, {Host: "m3:9000"}}, []string{"m1:9000", "m2:9000", "m3:9000"}}, + } + + getPeersHelper := func(s3p s3Peers) []string { + r := []string{} + for _, p := range s3p { + r = append(r, p.addr) + } + return r } + // execute tests for i, testCase := range testCases { - peers := getAllPeers(testCase.eps) - if !reflect.DeepEqual(testCase.peers, peers) { - t.Errorf("Test %d: Expected %s, got %s", i+1, testCase.peers, peers) + globalMinioAddr = testCase.gMinioAddr + s3peers := makeS3Peers(testCase.eps) + referencePeers := getPeersHelper(s3peers) + if !reflect.DeepEqual(testCase.peers, referencePeers) { + t.Errorf("Test %d: Expected %v, got %v", i+1, testCase.peers, referencePeers) } } } diff --git a/cmd/s3-peer-router.go b/cmd/s3-peer-router.go index dbf9d83d7..4d54f64ee 100644 --- a/cmd/s3-peer-router.go +++ b/cmd/s3-peer-router.go @@ -27,12 +27,14 @@ const ( ) type s3PeerAPIHandlers struct { - ObjectAPI func() ObjectLayer + *localBMS } func registerS3PeerRPCRouter(mux *router.Router) error { s3PeerHandlers := &s3PeerAPIHandlers{ - ObjectAPI: newObjectLayerFn, + &localBMS{ + ObjectAPI: newObjectLayerFn, + }, } s3PeerRPCServer := rpc.NewServer() diff --git a/cmd/s3-peer-rpc-handlers.go b/cmd/s3-peer-rpc-handlers.go index e412738d7..7eb85722d 100644 --- a/cmd/s3-peer-rpc-handlers.go +++ b/cmd/s3-peer-rpc-handlers.go @@ -16,10 +16,7 @@ package cmd -import ( - "encoding/json" - "time" -) +import "time" func (s3 *s3PeerAPIHandlers) LoginHandler(args *RPCLoginArgs, reply *RPCLoginReply) error { jwt, err := newJWT(defaultInterNodeJWTExpiry) @@ -57,16 +54,7 @@ func (s3 *s3PeerAPIHandlers) SetBucketNotificationPeer(args *SetBNPArgs, reply * return errInvalidToken } - // check if object layer is available. - objAPI := s3.ObjectAPI() - if objAPI == nil { - return errServerNotInitialized - } - - // Update in-memory notification config. - globalEventNotifier.SetBucketNotificationConfig(args.Bucket, args.NCfg) - - return nil + return s3.UpdateBucketNotification(args) } // SetBLPArgs - Arguments collection to SetBucketListenerPeer RPC call @@ -80,20 +68,13 @@ type SetBLPArgs struct { LCfg []listenerConfig } -func (s3 *s3PeerAPIHandlers) SetBucketListenerPeer(args SetBLPArgs, reply *GenericReply) error { +func (s3 *s3PeerAPIHandlers) SetBucketListenerPeer(args *SetBLPArgs, reply *GenericReply) error { // check auth if !isRPCTokenValid(args.Token) { return errInvalidToken } - // check if object layer is available. - objAPI := s3.ObjectAPI() - if objAPI == nil { - return errServerNotInitialized - } - - // Update in-memory notification config. - return globalEventNotifier.SetBucketListenerConfig(args.Bucket, args.LCfg) + return s3.UpdateBucketListener(args) } // EventArgs - Arguments collection for Event RPC call @@ -115,13 +96,7 @@ func (s3 *s3PeerAPIHandlers) Event(args *EventArgs, reply *GenericReply) error { return errInvalidToken } - // check if object layer is available. - objAPI := s3.ObjectAPI() - if objAPI == nil { - return errServerNotInitialized - } - - return globalEventNotifier.SendListenerEvent(args.Arn, args.Event) + return s3.SendEvent(args) } // SetBPPArgs - Arguments collection for SetBucketPolicyPeer RPC call @@ -136,22 +111,11 @@ type SetBPPArgs struct { } // tell receiving server to update a bucket policy -func (s3 *s3PeerAPIHandlers) SetBucketPolicyPeer(args SetBPPArgs, reply *GenericReply) error { +func (s3 *s3PeerAPIHandlers) SetBucketPolicyPeer(args *SetBPPArgs, reply *GenericReply) error { // check auth if !isRPCTokenValid(args.Token) { return errInvalidToken } - // check if object layer is available. - objAPI := s3.ObjectAPI() - if objAPI == nil { - return errServerNotInitialized - } - - var pCh policyChange - if err := json.Unmarshal(args.PChBytes, &pCh); err != nil { - return err - } - - return globalBucketPolicies.SetBucketPolicy(args.Bucket, pCh) + return s3.UpdateBucketPolicy(args) } diff --git a/cmd/server-main.go b/cmd/server-main.go index f0204e2d9..4a476da1a 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -103,7 +103,6 @@ type serverCmdConfig struct { serverAddr string endpoints []*url.URL ignoredEndpoints []*url.URL - isDistXL bool // True only if its distributed XL. storageDisks []StorageAPI } @@ -266,17 +265,16 @@ func checkSufficientDisks(eps []*url.URL) error { } // Returns if slice of disks is a distributed setup. -func isDistributedSetup(eps []*url.URL) (isDist bool) { +func isDistributedSetup(eps []*url.URL) bool { // Validate if one the disks is not local. for _, ep := range eps { if !isLocalStorage(ep) { - // One or more disks supplied as arguments are not - // attached to the local node. - isDist = true - break + // One or more disks supplied as arguments are + // not attached to the local node. + return true } } - return isDist + return false } // We just exit for invalid endpoints. @@ -446,7 +444,7 @@ func serverMain(c *cli.Context) { firstDisk := isLocalStorage(endpoints[0]) // Check if endpoints are part of distributed setup. - isDistXL := isDistributedSetup(endpoints) + globalIsDistXL = isDistributedSetup(endpoints) // Configure server. srvConfig := serverCmdConfig{ @@ -454,7 +452,6 @@ func serverMain(c *cli.Context) { endpoints: endpoints, ignoredEndpoints: ignoredEndpoints, storageDisks: storageDisks, - isDistXL: isDistXL, } // Configure server. @@ -462,12 +459,12 @@ func serverMain(c *cli.Context) { fatalIf(err, "Unable to configure one of server's RPC services.") // Set nodes for dsync for distributed setup. - if isDistXL { + if globalIsDistXL { fatalIf(initDsyncNodes(endpoints), "Unable to initialize distributed locking") } // Initialize name space lock. - initNSLock(isDistXL) + initNSLock(globalIsDistXL) // Initialize a new HTTP server. apiServer := NewServerMux(serverAddr, handler) diff --git a/cmd/test-utils_test.go b/cmd/test-utils_test.go index bb00a945e..7cd910999 100644 --- a/cmd/test-utils_test.go +++ b/cmd/test-utils_test.go @@ -411,8 +411,10 @@ func StartTestPeersRPCServer(t TestErrHandler, instanceType string) TestServer { // Run TestServer. testRPCServer.Server = httptest.NewServer(mux) + // Set as non-distributed. + globalIsDistXL = false + // initialize remainder of serverCmdConfig - srvCfg.isDistXL = false testRPCServer.SrvCmdCfg = srvCfg return testRPCServer diff --git a/cmd/utils.go b/cmd/utils.go index 9c8c9a9e4..d6a26be14 100644 --- a/cmd/utils.go +++ b/cmd/utils.go @@ -81,7 +81,7 @@ func checkDuplicateEndpoints(endpoints []*url.URL) error { // Find local node through the command line arguments. Returns in `host:port` format. func getLocalAddress(srvCmdConfig serverCmdConfig) string { - if !srvCmdConfig.isDistXL { + if !globalIsDistXL { return srvCmdConfig.serverAddr } for _, ep := range srvCmdConfig.endpoints { diff --git a/cmd/utils_test.go b/cmd/utils_test.go index 07899d140..5fb6be69c 100644 --- a/cmd/utils_test.go +++ b/cmd/utils_test.go @@ -229,13 +229,14 @@ func TestLocalAddress(t *testing.T) { globalMinioPort = "9000" globalMinioHost = "" testCases := []struct { + isDistXL bool srvCmdConfig serverCmdConfig localAddr string }{ // Test 1 - local address is found. { + isDistXL: true, srvCmdConfig: serverCmdConfig{ - isDistXL: true, endpoints: []*url.URL{{ Scheme: "http", Host: "localhost:9000", @@ -258,9 +259,9 @@ func TestLocalAddress(t *testing.T) { }, // Test 2 - local address is everything. { + isDistXL: false, srvCmdConfig: serverCmdConfig{ serverAddr: net.JoinHostPort("", globalMinioPort), - isDistXL: false, endpoints: []*url.URL{{ Path: "/mnt/disk1", }, { @@ -275,8 +276,8 @@ func TestLocalAddress(t *testing.T) { }, // Test 3 - local address is not found. { + isDistXL: true, srvCmdConfig: serverCmdConfig{ - isDistXL: true, endpoints: []*url.URL{{ Scheme: "http", Host: "1.1.1.1:9000", @@ -301,9 +302,9 @@ func TestLocalAddress(t *testing.T) { // name is specified in the --address option on the // server command line. { + isDistXL: false, srvCmdConfig: serverCmdConfig{ serverAddr: "play.minio.io:9000", - isDistXL: false, endpoints: []*url.URL{{ Path: "/mnt/disk1", }, { @@ -320,6 +321,7 @@ func TestLocalAddress(t *testing.T) { // Validates fetching local address. for i, testCase := range testCases { + globalIsDistXL = testCase.isDistXL localAddr := getLocalAddress(testCase.srvCmdConfig) if localAddr != testCase.localAddr { t.Fatalf("Test %d: Expected %s, got %s", i+1, testCase.localAddr, localAddr)