Bucket log audit outline design specification
reference resources
1. Source code analysis of Minio event notification module
1. Event notification module NotificationSys
// NotificationSys - notification system. type NotificationSys struct { sync.RWMutex targetList *event.TargetList targetResCh chan event.TargetIDResult bucketRulesMap map[string]event.RulesMap bucketRemoteTargetRulesMap map[string]map[event.TargetID]event.RulesMap peerClients []*peerRESTClient } // Send - sends event data to all matching targets. func (sys *NotificationSys) Send(args eventArgs) { sys.RLock() targetIDSet := sys.bucketRulesMap[args.BucketName].Match(args.EventName, args.Object.Name) sys.RUnlock() if len(targetIDSet) == 0 { return } sys.targetList.Send(args.ToEvent(true), targetIDSet, sys.targetResCh) }
2. target.Target
The notification targets to which bucket events can be published include Redis, MySQL, PostgreSQL, Kafka, Elasticsearch, etc.
For multiple target s, use sync Waitgroup sends concurrently.
// Target - event target interface type Target interface { ID() TargetID IsActive() (bool, error) Save(Event) error Send(string) error Close() error HasQueueStore() bool } // TargetList - holds list of targets indexed by target ID. type TargetList struct { sync.RWMutex targets map[TargetID]Target } // Send - sends events to targets identified by target IDs. func (list *TargetList) Send(event Event, targetIDset TargetIDSet, resCh chan<- TargetIDResult) { go func() { var wg sync.WaitGroup for id := range targetIDset { list.RLock() target, ok := list.targets[id] list.RUnlock() if ok { wg.Add(1) go func(id TargetID, target Target) { defer wg.Done() tgtRes := TargetIDResult{ID: id} if err := target.Save(event); err != nil { tgtRes.Err = err } resCh <- tgtRes }(id, target) } else { resCh <- TargetIDResult{ID: id} } } wg.Wait() }() }
3. target.RedisTarget
The notification target supports two formats: namespace and access.
If the namespacee format is used, MinIO synchronizes the objects in the bucket into entries in Redis hash. For each entry, corresponding to an object in a bucket, its key is set to "bucket name / object name", and value is an event data in JSON format about the MinIO object. If the object is updated or deleted, the entry of the object in the hash will be updated or deleted accordingly.
If access is used, Minio uses RPUSH Add the event to the list. Each element in this list is a list in JSON format. There are two elements in this list. The first element is a timestamp string, and the second element is a JSON object containing event data for operation on this bucket. In this format, the elements in the list will not be updated or deleted.
// RedisTarget - Redis target. type RedisTarget struct { id event.TargetID args RedisArgs pool *redis.Pool store Store firstPing bool loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{}) } // Save - saves the events to the store if questore is configured, which will be replayed when the redis connection is active. func (target *RedisTarget) Save(eventData event.Event) error { if target.store != nil { return target.store.Put(eventData) } _, err := target.IsActive() if err != nil { return err } return target.send(eventData) } // send - sends an event to the redis. func (target *RedisTarget) send(eventData event.Event) error { conn := target.pool.Get() defer func() { cErr := conn.Close() target.loggerOnce(context.Background(), cErr, target.ID()) }() if target.args.Format == event.NamespaceFormat { objectName, err := url.QueryUnescape(eventData.S3.Object.Key) if err != nil { return err } key := eventData.S3.Bucket.Name + "/" + objectName if eventData.EventName == event.ObjectRemovedDelete { _, err = conn.Do("HDEL", target.args.Key, key) } else { var data []byte if data, err = json.Marshal(struct{ Records []event.Event }{[]event.Event{eventData}}); err != nil { return err } _, err = conn.Do("HSET", target.args.Key, key, data) } if err != nil { return err } } if target.args.Format == event.AccessFormat { data, err := json.Marshal([]RedisAccessEvent{{Event: []event.Event{eventData}, EventTime: eventData.EventTime}}) if err != nil { return err } if _, err := conn.Do("RPUSH", target.args.Key, data); err != nil { return err } } return nil }
4. target.MySQLTarget
// MySQLTarget - MySQL target. type MySQLTarget struct { id event.TargetID args MySQLArgs updateStmt *sql.Stmt deleteStmt *sql.Stmt insertStmt *sql.Stmt db *sql.DB store Store firstPing bool loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{}) } // Save - saves the events to the store which will be replayed when the SQL connection is active. func (target *MySQLTarget) Save(eventData event.Event) error { if target.store != nil { return target.store.Put(eventData) } _, err := target.IsActive() if err != nil { return err } return target.send(eventData) } // send - sends an event to the mysql. func (target *MySQLTarget) send(eventData event.Event) error { if target.args.Format == event.NamespaceFormat { objectName, err := url.QueryUnescape(eventData.S3.Object.Key) if err != nil { return err } key := eventData.S3.Bucket.Name + "/" + objectName if eventData.EventName == event.ObjectRemovedDelete { _, err = target.deleteStmt.Exec(key) } else { var data []byte if data, err = json.Marshal(struct{ Records []event.Event }{[]event.Event{eventData}}); err != nil { return err } _, err = target.updateStmt.Exec(key, data) } return err } if target.args.Format == event.AccessFormat { eventTime, err := time.Parse(event.AMZTimeFormat, eventData.EventTime) if err != nil { return err } data, err := json.Marshal(struct{ Records []event.Event }{[]event.Event{eventData}}) if err != nil { return err } _, err = target.insertStmt.Exec(eventTime, data) return err } return nil }
5. target.KafkaTarget
// KafkaTarget - Kafka target. type KafkaTarget struct { id event.TargetID args KafkaArgs producer sarama.SyncProducer config *sarama.Config store Store loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{}) } // Save - saves the events to the store which will be replayed when the Kafka connection is active. func (target *KafkaTarget) Save(eventData event.Event) error { if target.store != nil { return target.store.Put(eventData) } _, err := target.IsActive() if err != nil { return err } return target.send(eventData) } // send - sends an event to the kafka. func (target *KafkaTarget) send(eventData event.Event) error { objectName, err := url.QueryUnescape(eventData.S3.Object.Key) if err != nil { return err } key := eventData.S3.Bucket.Name + "/" + objectName data, err := json.Marshal(event.Log{EventName: eventData.EventName, Key: key, Records: []event.Event{eventData}}) if err != nil { return err } msg := sarama.ProducerMessage{ Topic: target.args.Topic, Key: sarama.StringEncoder(key), Value: sarama.ByteEncoder(data), } _, _, err = target.producer.SendMessage(&msg) return err }
6. target.Store
// Store - To persist the events. type Store interface { Put(event event.Event) error Get(key string) (event.Event, error) List() ([]string, error) Del(key string) error Open() error } // QueueStore - Filestore for persisting events. type QueueStore struct { sync.RWMutex currentEntries uint64 entryLimit uint64 directory string } // NewQueueStore - Creates an instance for QueueStore. func NewQueueStore(directory string, limit uint64) Store { if limit == 0 { limit = defaultLimit _, maxRLimit, err := sys.GetMaxOpenFileLimit() if err == nil { // Limit the maximum number of entries // to maximum open file limit if maxRLimit < limit { limit = maxRLimit } } } return &QueueStore{ directory: directory, entryLimit: limit, } } // write - writes event to the directory. func (store *QueueStore) write(key string, e event.Event) error { // Marshalls the event. eventData, err := json.Marshal(e) if err != nil { return err } path := filepath.Join(store.directory, key+eventExt) if err := ioutil.WriteFile(path, eventData, os.FileMode(0770)); err != nil { return err } // Increment the event count. store.currentEntries++ return nil } // Put - puts a event to the store. func (store *QueueStore) Put(e event.Event) error { store.Lock() defer store.Unlock() if store.currentEntries >= store.entryLimit { return errLimitExceeded } key, err := getNewUUID() if err != nil { return err } return store.write(key, e) }
7. Event type Name
Event types supported by MinIO:
// Name - event type enum. // Refer http://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html#notification-how-to-event-types-and-destinations type Name int // Values of Name const ( ObjectAccessedAll Name = 1 + iota ObjectAccessedGet ObjectAccessedGetRetention ObjectAccessedGetLegalHold ObjectAccessedHead ObjectCreatedAll ObjectCreatedCompleteMultipartUpload ObjectCreatedCopy ObjectCreatedPost ObjectCreatedPut ObjectCreatedPutRetention ObjectCreatedPutLegalHold ObjectRemovedAll ObjectRemovedDelete )
8. Event Event
The notification message sent by MinIO for publishing events is in JSON format
// Identity represents access key who caused the event. type Identity struct { PrincipalID string `json:"principalId"` } // Bucket represents bucket metadata of the event. type Bucket struct { Name string `json:"name"` OwnerIdentity Identity `json:"ownerIdentity"` ARN string `json:"arn"` } // Object represents object metadata of the event. type Object struct { Key string `json:"key"` Size int64 `json:"size,omitempty"` ETag string `json:"eTag,omitempty"` ContentType string `json:"contentType,omitempty"` UserMetadata map[string]string `json:"userMetadata,omitempty"` VersionID string `json:"versionId,omitempty"` Sequencer string `json:"sequencer"` } // Metadata represents event metadata. type Metadata struct { SchemaVersion string `json:"s3SchemaVersion"` ConfigurationID string `json:"configurationId"` Bucket Bucket `json:"bucket"` Object Object `json:"object"` } // Source represents client information who triggered the event. type Source struct { Host string `json:"host"` Port string `json:"port"` UserAgent string `json:"userAgent"` } // Event represents event notification information defined in // http://docs.aws.amazon.com/AmazonS3/latest/dev/notification-content-structure.html. type Event struct { EventVersion string `json:"eventVersion"` EventSource string `json:"eventSource"` AwsRegion string `json:"awsRegion"` EventTime string `json:"eventTime"` EventName Name `json:"eventName"` UserIdentity Identity `json:"userIdentity"` RequestParameters map[string]string `json:"requestParameters"` ResponseElements map[string]string `json:"responseElements"` S3 Metadata `json:"s3"` Source Source `json:"source"` }
9. Event log Log
// Log represents event information for some event targets. type Log struct { EventName Name Key string Records []Event }
2. Source code analysis of Minio gateway
Start minio gateway:
[root@node8217 install]# cat start_minio.sh #!/bin/bash export MINIO_ACCESS_KEY=ak_123456 export MINIO_SECRET_KEY=sk_123456 #export MINIO_PROMETHEUS_AUTH_TYPE="public" #export MINIO_REGION_NAME="wuhan" #./minio server --address :9009 /BigData/minio & ./minio gateway s3 http://127.0.0.1:9009 &
After startup, you can use the minio gateway as you use the minio server.
1. minio startup inlet
mino/main.go
package main // import "github.com/minio/minio" import ( "os" minio "github.com/minio/minio/cmd" // Import gateway _ "github.com/minio/minio/cmd/gateway" ) func main() { minio.Main(os.Args) }
The package is introduced here_ "github.com/minio/minio/cmd/gateway" will call GitHub init function of the supported gateway in COM / Minio / Minio / CMD / gateway package to initialize the supported gateway.
minio/cmd/main.go
func newApp(name string) *cli.App { // Collection of minio commands currently supported are. commands := []cli.Command{} // registerCommand registers a cli command. registerCommand := func(command cli.Command) { commands = append(commands, command) } // Register all commands. registerCommand(serverCmd) registerCommand(gatewayCmd) app := cli.NewApp() app.Name = name app.Commands = commands return app } // Main main for minio server. func Main(args []string) { // Set the minio app name. appName := filepath.Base(args[0]) // Run the app - exit on error. if err := newApp(appName).Run(args); err != nil { os.Exit(1) } }
Two commands are registered, the global variables serverCmd and gatewayCmd
2. Register S3 gateway
gatewayCmd
minio/cmd/gateway-main.go
var ( gatewayCmd = cli.Command{ Name: "gateway", Usage: "start object storage gateway", Flags: append(ServerFlags, GlobalFlags...), HideHelpCommand: true, } ) // RegisterGatewayCommand registers a new command for gateway. func RegisterGatewayCommand(cmd cli.Command) error { cmd.Flags = append(append(cmd.Flags, ServerFlags...), GlobalFlags...) gatewayCmd.Subcommands = append(gatewayCmd.Subcommands, cmd) return nil }
Register S3 gateway
minio/cmd/gateway/s3/gateway-s3.go
const ( s3Backend = "s3" ) func init() { minio.RegisterGatewayCommand(cli.Command{ Name: s3Backend, Usage: "Amazon Simple Storage Service (S3)", Action: s3GatewayMain, HideHelpCommand: true, }) } // Handler for 'minio gateway s3' command line. func s3GatewayMain(ctx *cli.Context) { args := ctx.Args() if !ctx.Args().Present() { args = cli.Args{"https://s3.amazonaws.com"} } serverAddr := ctx.GlobalString("address") if serverAddr == "" || serverAddr == ":"+minio.GlobalMinioDefaultPort { serverAddr = ctx.String("address") } // Validate gateway arguments. logger.FatalIf(minio.ValidateGatewayArguments(serverAddr, args.First()), "Invalid argument") // Start the gateway.. minio.StartGateway(ctx, &S3{args.First()}) }
The focus is on Minio Startgateway (CTX, & S3 {args. First()}), start the corresponding gateway according to the parameters.
3. S3 implements the Gateway interface
minio/cmd/gateway-main.go
// StartGateway - handler for 'minio gateway <name>'. func StartGateway(ctx *cli.Context, gw Gateway) { // Gateway startup }
Visible, S3 S3 implements CMD Gateway interface,
// Gateway represents a gateway backend. type Gateway interface { // Name returns the unique name of the gateway. Name() string // NewGatewayLayer returns a new ObjectLayer. NewGatewayLayer(creds auth.Credentials) (ObjectLayer, error) // Returns true if gateway is ready for production. Production() bool }
S3 gateway implements Gateway Interface
minio/cmd/gateway/s3/gateway-s3.go
// S3 implements Gateway. type S3 struct { host string } // Name implements Gateway interface. func (g *S3) Name() string { return s3Backend } // NewGatewayLayer returns s3 ObjectLayer. func (g *S3) NewGatewayLayer(creds auth.Credentials) (minio.ObjectLayer, error) { // creds are ignored here, since S3 gateway implements chaining // all credentials. clnt, err := newS3(g.host) if err != nil { return nil, err } metrics := minio.NewMetrics() t := &minio.MetricsTransport{ Transport: minio.NewGatewayHTTPTransport(), Metrics: metrics, } // Set custom transport clnt.SetCustomTransport(t) probeBucketName := randString(60, rand.NewSource(time.Now().UnixNano()), "probe-bucket-sign-") // Check if the provided keys are valid. if _, err = clnt.BucketExists(probeBucketName); err != nil { if miniogo.ToErrorResponse(err).Code != "AccessDenied" { return nil, err } } s := s3Objects{ Client: clnt, Metrics: metrics, HTTPClient: &http.Client{ Transport: t, }, } // Enables single encryption of KMS is configured. if minio.GlobalKMS != nil { encS := s3EncObjects{s} // Start stale enc multipart uploads cleanup routine. go encS.cleanupStaleEncMultipartUploads(minio.GlobalContext, minio.GlobalMultipartCleanupInterval, minio.GlobalMultipartExpiry) return &encS, nil } return &s, nil } // Production - s3 gateway is production ready. func (g *S3) Production() bool { return true }
4. Gateway startup
minio/cmd/gateway-main.go
// StartGateway - handler for 'minio gateway <name>'. func StartGateway(ctx *cli.Context, gw Gateway) { if gw == nil { logger.FatalIf(errUnexpected, "Gateway implementation not initialized") } // Handle common command args. handleCommonCmdArgs(ctx) // Handle gateway specific env gatewayHandleEnvVars() // Set when gateway is enabled globalIsGateway = true router := mux.NewRouter().SkipClean(true).UseEncodedPath() // Enable IAM admin APIs if etcd is enabled, if not just enable basic // operations such as profiling, server info etc. registerAdminRouter(router, enableConfigOps, enableIAMOps) // Add healthcheck router registerHealthCheckRouter(router) // Add server metrics router registerMetricsRouter(router) // Register web router when its enabled. if globalBrowserEnabled { logger.FatalIf(registerWebRouter(router), "Unable to configure web browser") } // Currently only NAS and S3 gateway support encryption headers. encryptionEnabled := gatewayName == "s3" || gatewayName == "nas" allowSSEKMS := gatewayName == "s3" // Only S3 can support SSE-KMS (as pass-through) // Add API router. registerAPIRouter(router, encryptionEnabled, allowSSEKMS) httpServer := xhttp.NewServer([]string{globalCLIContext.Addr}, criticalErrorHandler{registerHandlers(router, globalHandlers...)}, getCert) httpServer.BaseContext = func(listener net.Listener) context.Context { return GlobalContext } go func() { globalHTTPServerErrorCh <- httpServer.Start() }() globalObjLayerMutex.Lock() globalHTTPServer = httpServer globalObjLayerMutex.Unlock() newObject, err := gw.NewGatewayLayer(globalActiveCred) newObject = NewGatewayLayerWithLocker(newObject) // Once endpoints are finalized, initialize the new object api in safe mode. globalObjLayerMutex.Lock() globalObjectAPI = newObject globalObjLayerMutex.Unlock() // Calls all New() for all sub-systems. newAllSubsystems() if enableIAMOps { // Initialize IAM sys. logger.FatalIf(globalIAMSys.Init(GlobalContext, newObject), "Unable to initialize IAM system") } if globalCacheConfig.Enabled { // initialize the new disk cache objects. var cacheAPI CacheObjectLayer cacheAPI, err = newServerCacheObjects(GlobalContext, globalCacheConfig) logger.FatalIf(err, "Unable to initialize disk caching") globalObjLayerMutex.Lock() globalCacheObjectAPI = cacheAPI globalObjLayerMutex.Unlock() } }
3. Source code analysis of Minio gateway disk cache
1. disk cache usage
#!/bin/bash export MINIO_ACCESS_KEY=ak_123456 export MINIO_SECRET_KEY=sk_123456 #export MINIO_PROMETHEUS_AUTH_TYPE="public" #export MINIO_REGION_NAME="wuhan" #./minio server --address :9009 /BigData/minio & export MINIO_CACHE="on" export MINIO_CACHE_DRIVES="/mnt/drive1,/mnt/drive2" export MINIO_CACHE_EXCLUDE="*.pdf,mybucket/*" export MINIO_CACHE_QUOTA=80 export MINIO_CACHE_AFTER=3 export MINIO_CACHE_WATERMARK_LOW=70 export MINIO_CACHE_WATERMARK_HIGH=90 ./minio gateway s3 http://127.0.0.1:9009 &
2. disk cache implementation
func StartGateway(ctx *cli.Context, gw Gateway) { // ...... if globalCacheConfig.Enabled { // initialize the new disk cache objects. var cacheAPI CacheObjectLayer cacheAPI, err = newServerCacheObjects(GlobalContext, globalCacheConfig) logger.FatalIf(err, "Unable to initialize disk caching") globalObjLayerMutex.Lock() globalCacheObjectAPI = cacheAPI globalObjLayerMutex.Unlock() } // ...... }
minio/cmd/disk-cache.go
// CacheObjectLayer implements primitives for cache object API layer. type CacheObjectLayer interface { // Object operations. GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error) GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) DeleteObject(ctx context.Context, bucket, object string) error DeleteObjects(ctx context.Context, bucket string, objects []string) ([]error, error) PutObject(ctx context.Context, bucket, object string, data *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) CopyObject(ctx context.Context, srcBucket, srcObject, destBucket, destObject string, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (objInfo ObjectInfo, err error) // Storage operations. StorageInfo(ctx context.Context) CacheStorageInfo CacheStats() *CacheStats }
The cacheObjects structure implements the CacheObjectLayer interface:
// Returns cacheObjects for use by Server. func newServerCacheObjects(ctx context.Context, config cache.Config) (CacheObjectLayer, error) { // list of disk caches for cache "drives" specified in config.json or MINIO_CACHE_DRIVES env var. cache, migrateSw, err := newCache(config) if err != nil { return nil, err } c := &cacheObjects{ cache: cache, exclude: config.Exclude, after: config.After, migrating: migrateSw, migMutex: sync.Mutex{}, cacheStats: newCacheStats(), GetObjectInfoFn: func(ctx context.Context, bucket, object string, opts ObjectOptions) (ObjectInfo, error) { return newObjectLayerFn().GetObjectInfo(ctx, bucket, object, opts) }, GetObjectNInfoFn: func(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error) { return newObjectLayerFn().GetObjectNInfo(ctx, bucket, object, rs, h, lockType, opts) }, DeleteObjectFn: func(ctx context.Context, bucket, object string) error { return newObjectLayerFn().DeleteObject(ctx, bucket, object) }, DeleteObjectsFn: func(ctx context.Context, bucket string, objects []string) ([]error, error) { errs := make([]error, len(objects)) for idx, object := range objects { errs[idx] = newObjectLayerFn().DeleteObject(ctx, bucket, object) } return errs, nil }, PutObjectFn: func(ctx context.Context, bucket, object string, data *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) { return newObjectLayerFn().PutObject(ctx, bucket, object, data, opts) }, CopyObjectFn: func(ctx context.Context, srcBucket, srcObject, destBucket, destObject string, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (objInfo ObjectInfo, err error) { return newObjectLayerFn().CopyObject(ctx, srcBucket, srcObject, destBucket, destObject, srcInfo, srcOpts, dstOpts) }, } if migrateSw { go c.migrateCacheFromV1toV2(ctx) } go c.gc(ctx) return c, nil }
// Abstracts disk caching - used by the S3 layer type cacheObjects struct { // slice of cache drives cache []*diskCache // file path patterns to exclude from cache exclude []string // number of accesses after which to cache an object after int // if true migration is in progress from v1 to v2 migrating bool // mutex to protect migration bool migMutex sync.Mutex // Cache stats cacheStats *CacheStats GetObjectNInfoFn func(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error) GetObjectInfoFn func(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) DeleteObjectFn func(ctx context.Context, bucket, object string) error DeleteObjectsFn func(ctx context.Context, bucket string, objects []string) ([]error, error) PutObjectFn func(ctx context.Context, bucket, object string, data *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) CopyObjectFn func(ctx context.Context, srcBucket, srcObject, destBucket, destObject string, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (objInfo ObjectInfo, err error) }
The real cache layer is [] * diskCache implementation:
// newCache initializes the cacheFSObjects for the "drives" specified in config.json // or the global env overrides. func newCache(config cache.Config) ([]*diskCache, bool, error) { var caches []*diskCache ctx := logger.SetReqInfo(GlobalContext, &logger.ReqInfo{}) formats, migrating, err := loadAndValidateCacheFormat(ctx, config.Drives) if err != nil { return nil, false, err } for i, dir := range config.Drives { // skip diskCache creation for cache drives missing a format.json if formats[i] == nil { caches = append(caches, nil) continue } if err := checkAtimeSupport(dir); err != nil { return nil, false, errors.New("Atime support required for disk caching") } quota := config.MaxUse if quota == 0 { quota = config.Quota } cache, err := newDiskCache(dir, quota, config.After, config.WatermarkLow, config.WatermarkHigh) if err != nil { return nil, false, err } caches = append(caches, cache) } return caches, migrating, nil }
4. MinIO IAM module
1. User authentication
Each operation of S3 protocol requires IAMSys to perform permission verification. The main codes are as follows:
if globalIAMSys.IsAllowed(iampolicy.Args{ AccountName: cred.AccessKey, Action: iampolicy.Action(action), BucketName: bucketName, ConditionValues: getConditionValues(r, "", cred.AccessKey, claims), ObjectName: objectName, IsOwner: owner, Claims: claims, }) { // Request is allowed return the appropriate access key. return cred.AccessKey, owner, ErrNone } return cred.AccessKey, owner, ErrAccessDenied
2. globalIAMSys
minio/cmd/globals.go
var ( globalIAMSys *IAMSys )
cmd/server-main.go
func newAllSubsystems() { // Create new notification system and initialize notification targets globalNotificationSys = NewNotificationSys(globalEndpoints) // Create new bucket metadata system. globalBucketMetadataSys = NewBucketMetadataSys() // Create a new config system. globalConfigSys = NewConfigSys() // Create new IAM system. globalIAMSys = NewIAMSys() // Create new policy system. globalPolicySys = NewPolicySys() // Create new lifecycle system. globalLifecycleSys = NewLifecycleSys() // Create new bucket object lock subsystem globalBucketObjectLockSys = NewBucketObjectLockSys() // Create new bucket quota subsystem globalBucketQuotaSys = NewBucketQuotaSys() }
minio/cmd/iam.go
package cmd // IAMSys - config system. type IAMSys struct { usersSysType UsersSysType // map of policy names to policy definitions iamPolicyDocsMap map[string]iampolicy.Policy // map of usernames to credentials iamUsersMap map[string]auth.Credentials // map of group names to group info iamGroupsMap map[string]GroupInfo // map of user names to groups they are a member of iamUserGroupMemberships map[string]set.StringSet // map of usernames/temporary access keys to policy names iamUserPolicyMap map[string]MappedPolicy // map of group names to policy names iamGroupPolicyMap map[string]MappedPolicy // Persistence layer for IAM subsystem store IAMStorageAPI } // NewIAMSys - creates new config system object. func NewIAMSys() *IAMSys { return &IAMSys{ usersSysType: MinIOUsersSysType, iamUsersMap: make(map[string]auth.Credentials), iamPolicyDocsMap: make(map[string]iampolicy.Policy), iamUserPolicyMap: make(map[string]MappedPolicy), iamGroupsMap: make(map[string]GroupInfo), iamUserGroupMemberships: make(map[string]set.StringSet), } }
-
iamUsersMap map[string]auth.Credentials saves user information;
-
Store iamstorage API is the persistent storage layer of IAM system, which can be etcd / mini;
3. globalIAMSys initialization
minio/cmd/iam.go
// Init - initializes config system from iam.json func (sys *IAMSys) Init(ctx context.Context, objAPI ObjectLayer) error { if globalEtcdClient == nil { sys.store = newIAMObjectStore(ctx, objAPI) } else { sys.store = newIAMEtcdStore(ctx) } // Migrate IAM configuration if err := sys.doIAMConfigMigration(ctx); err != nil { return err } err := sys.store.loadAll(ctx, sys) go sys.store.watch(ctx, sys) return err }
If the globalEtcdClient is not empty, etcd is preferred as the storage layer.
sys.store.loadAll(ctx, sys) loads all IAM data;
go sys.store.watch(ctx, sys) background monitors all updates;
4. IAMEtcdStore implements IAMStorageAPI
- During initialization, load all data in etcd;
- Background watch and reload in time;
minio/cmd/iam-etcd-store.go
// IAMEtcdStore implements IAMStorageAPI type IAMEtcdStore struct { sync.RWMutex ctx context.Context client *etcd.Client } func newIAMEtcdStore(ctx context.Context) *IAMEtcdStore { return &IAMEtcdStore{client: globalEtcdClient, ctx: ctx} }
func (ies *IAMEtcdStore) loadAll(ctx context.Context, sys *IAMSys) error load all IAM data into memory:
func (ies *IAMEtcdStore) loadAll(ctx context.Context, sys *IAMSys) error { iamUsersMap := make(map[string]auth.Credentials) iamGroupsMap := make(map[string]GroupInfo) iamPolicyDocsMap := make(map[string]iampolicy.Policy) iamUserPolicyMap := make(map[string]MappedPolicy) iamGroupPolicyMap := make(map[string]MappedPolicy) isMinIOUsersSys := false ies.rlock() if sys.usersSysType == MinIOUsersSysType { isMinIOUsersSys = true } ies.runlock() if err := ies.loadPolicyDocs(ctx, iamPolicyDocsMap); err != nil { return err } // load STS temp users if err := ies.loadUsers(ctx, stsUser, iamUsersMap); err != nil { return err } if isMinIOUsersSys { // load long term users if err := ies.loadUsers(ctx, regularUser, iamUsersMap); err != nil { return err } if err := ies.loadUsers(ctx, srvAccUser, iamUsersMap); err != nil { return err } if err := ies.loadGroups(ctx, iamGroupsMap); err != nil { return err } if err := ies.loadMappedPolicies(ctx, regularUser, false, iamUserPolicyMap); err != nil { return err } } // load STS policy mappings into the same map if err := ies.loadMappedPolicies(ctx, stsUser, false, iamUserPolicyMap); err != nil { return err } // load policies mapped to groups if err := ies.loadMappedPolicies(ctx, regularUser, true, iamGroupPolicyMap); err != nil { return err } ies.lock() defer ies.Unlock() // Merge the new reloaded entries into global map. // See issue https://github.com/minio/minio/issues/9651 // where the present list of entries on disk are not yet // latest, there is a small window where this can make // valid users invalid. for k, v := range iamUsersMap { sys.iamUsersMap[k] = v } for k, v := range iamPolicyDocsMap { sys.iamPolicyDocsMap[k] = v } // Sets default canned policies, if none are set. setDefaultCannedPolicies(sys.iamPolicyDocsMap) for k, v := range iamUserPolicyMap { sys.iamUserPolicyMap[k] = v } // purge any expired entries which became expired now. for k, v := range sys.iamUsersMap { if v.IsExpired() { delete(sys.iamUsersMap, k) delete(sys.iamUserPolicyMap, k) // Deleting on the etcd is taken care of in the next cycle } } for k, v := range iamGroupPolicyMap { sys.iamGroupPolicyMap[k] = v } for k, v := range iamGroupsMap { sys.iamGroupsMap[k] = v } sys.buildUserGroupMemberships() return nil }
func (ies *IAMEtcdStore) watch(ctx context.Context, sys *IAMSys) background monitoring data update:
func (ies *IAMEtcdStore) watch(ctx context.Context, sys *IAMSys) { for { outerLoop: // Refresh IAMSys with etcd watch. watchCh := ies.client.Watch(ctx, iamConfigPrefix, etcd.WithPrefix(), etcd.WithKeysOnly()) for { select { case <-ctx.Done(): return case watchResp, ok := <-watchCh: if !ok { time.Sleep(1 * time.Second) // Upon an error on watch channel // re-init the watch channel. goto outerLoop } if err := watchResp.Err(); err != nil { logger.LogIf(ctx, err) // log and retry. time.Sleep(1 * time.Second) // Upon an error on watch channel // re-init the watch channel. goto outerLoop } for _, event := range watchResp.Events { ies.lock() ies.reloadFromEvent(sys, event) ies.unlock() } } } } }
// sys.RLock is held by caller. func (ies *IAMEtcdStore) reloadFromEvent(sys *IAMSys, event *etcd.Event) { eventCreate := event.IsModify() || event.IsCreate() eventDelete := event.Type == etcd.EventTypeDelete usersPrefix := strings.HasPrefix(string(event.Kv.Key), iamConfigUsersPrefix) groupsPrefix := strings.HasPrefix(string(event.Kv.Key), iamConfigGroupsPrefix) stsPrefix := strings.HasPrefix(string(event.Kv.Key), iamConfigSTSPrefix) policyPrefix := strings.HasPrefix(string(event.Kv.Key), iamConfigPoliciesPrefix) policyDBUsersPrefix := strings.HasPrefix(string(event.Kv.Key), iamConfigPolicyDBUsersPrefix) policyDBSTSUsersPrefix := strings.HasPrefix(string(event.Kv.Key), iamConfigPolicyDBSTSUsersPrefix) policyDBGroupsPrefix := strings.HasPrefix(string(event.Kv.Key), iamConfigPolicyDBGroupsPrefix) switch { case eventCreate: switch { case usersPrefix: accessKey := path.Dir(strings.TrimPrefix(string(event.Kv.Key), iamConfigUsersPrefix)) ies.loadUser(accessKey, regularUser, sys.iamUsersMap) case stsPrefix: accessKey := path.Dir(strings.TrimPrefix(string(event.Kv.Key), iamConfigSTSPrefix)) ies.loadUser(accessKey, stsUser, sys.iamUsersMap) case groupsPrefix: group := path.Dir(strings.TrimPrefix(string(event.Kv.Key), iamConfigGroupsPrefix)) ies.loadGroup(group, sys.iamGroupsMap) gi := sys.iamGroupsMap[group] sys.removeGroupFromMembershipsMap(group) sys.updateGroupMembershipsMap(group, &gi) case policyPrefix: policyName := path.Dir(strings.TrimPrefix(string(event.Kv.Key), iamConfigPoliciesPrefix)) ies.loadPolicyDoc(policyName, sys.iamPolicyDocsMap) case policyDBUsersPrefix: policyMapFile := strings.TrimPrefix(string(event.Kv.Key), iamConfigPolicyDBUsersPrefix) user := strings.TrimSuffix(policyMapFile, ".json") ies.loadMappedPolicy(user, regularUser, false, sys.iamUserPolicyMap) case policyDBSTSUsersPrefix: policyMapFile := strings.TrimPrefix(string(event.Kv.Key), iamConfigPolicyDBSTSUsersPrefix) user := strings.TrimSuffix(policyMapFile, ".json") ies.loadMappedPolicy(user, stsUser, false, sys.iamUserPolicyMap) case policyDBGroupsPrefix: policyMapFile := strings.TrimPrefix(string(event.Kv.Key), iamConfigPolicyDBGroupsPrefix) user := strings.TrimSuffix(policyMapFile, ".json") ies.loadMappedPolicy(user, regularUser, true, sys.iamGroupPolicyMap) } case eventDelete: switch { case usersPrefix: accessKey := path.Dir(strings.TrimPrefix(string(event.Kv.Key), iamConfigUsersPrefix)) delete(sys.iamUsersMap, accessKey) case stsPrefix: accessKey := path.Dir(strings.TrimPrefix(string(event.Kv.Key), iamConfigSTSPrefix)) delete(sys.iamUsersMap, accessKey) case groupsPrefix: group := path.Dir(strings.TrimPrefix(string(event.Kv.Key), iamConfigGroupsPrefix)) sys.removeGroupFromMembershipsMap(group) delete(sys.iamGroupsMap, group) delete(sys.iamGroupPolicyMap, group) case policyPrefix: policyName := path.Dir(strings.TrimPrefix(string(event.Kv.Key), iamConfigPoliciesPrefix)) delete(sys.iamPolicyDocsMap, policyName) case policyDBUsersPrefix: policyMapFile := strings.TrimPrefix(string(event.Kv.Key), iamConfigPolicyDBUsersPrefix) user := strings.TrimSuffix(policyMapFile, ".json") delete(sys.iamUserPolicyMap, user) case policyDBSTSUsersPrefix: policyMapFile := strings.TrimPrefix(string(event.Kv.Key), iamConfigPolicyDBSTSUsersPrefix) user := strings.TrimSuffix(policyMapFile, ".json") delete(sys.iamUserPolicyMap, user) case policyDBGroupsPrefix: policyMapFile := strings.TrimPrefix(string(event.Kv.Key), iamConfigPolicyDBGroupsPrefix) user := strings.TrimSuffix(policyMapFile, ".json") delete(sys.iamGroupPolicyMap, user) } } }
Addition, deletion, modification and query of user information
func (ies *IAMEtcdStore) saveUserIdentity(name string, userType IAMUserType, u UserIdentity) error { return ies.saveIAMConfig(u, getUserIdentityPath(name, userType)) } func (ies *IAMEtcdStore) saveIAMConfig(item interface{}, path string) error { data, err := json.Marshal(item) if err != nil { return err } if globalConfigEncrypted { data, err = madmin.EncryptData(globalActiveCred.String(), data) if err != nil { return err } } return saveKeyEtcd(ies.ctx, ies.client, path, data) }
func (ies *IAMEtcdStore) deleteUserIdentity(name string, userType IAMUserType) error { err := ies.deleteIAMConfig(getUserIdentityPath(name, userType)) if err == errConfigNotFound { err = errNoSuchUser } return err } func (ies *IAMEtcdStore) deleteIAMConfig(path string) error { return deleteKeyEtcd(ies.ctx, ies.client, path) }
5. Analysis of Minio configuration management source code
reference resources:
Profile: ${home} / minio/config. JSON, in release After 2018-08-18t03-49-57z, the configuration file is saved in the back-end storage.
1. Global variable globalConfigSys
minio/cmd/server-main.go
func serverMain(ctx *cli.Context) { // ... // Initialize all subsystems newAllSubsystems() // ... } func newAllSubsystems() { // Create new notification system and initialize notification targets globalNotificationSys = NewNotificationSys(globalEndpoints) // Create new bucket metadata system. globalBucketMetadataSys = NewBucketMetadataSys() // Create a new config system. globalConfigSys = NewConfigSys() // Create new IAM system. globalIAMSys = NewIAMSys() // Create new policy system. globalPolicySys = NewPolicySys() // Create new lifecycle system. globalLifecycleSys = NewLifecycleSys() // Create new bucket encryption subsystem globalBucketSSEConfigSys = NewBucketSSEConfigSys() // Create new bucket object lock subsystem globalBucketObjectLockSys = NewBucketObjectLockSys() // Create new bucket quota subsystem globalBucketQuotaSys = NewBucketQuotaSys() } func initAllSubsystems(newObject ObjectLayer) (err error) { // ... // Initialize config system. if err = globalConfigSys.Init(newObject); err != nil { return fmt.Errorf("Unable to initialize config system: %w", err) } // ... }
2. ConfigSys
minio/cmd/config.go
package cmd // ConfigSys - config system. type ConfigSys struct{} // NewConfigSys - creates new config system object. func NewConfigSys() *ConfigSys { return &ConfigSys{} } // Init - initializes config system from config.json. func (sys *ConfigSys) Init(objAPI ObjectLayer) error { if objAPI == nil { return errInvalidArgument } return initConfig(objAPI) } // Initialize and load config from remote etcd or local config directory func initConfig(objAPI ObjectLayer) error { if objAPI == nil { return errServerNotInitialized } if isFile(getConfigFile()) { if err := migrateConfig(); err != nil { return err } } // Migrates ${HOME}/.minio/config.json or config.json.deprecated // to '<export_path>/.minio.sys/config/config.json' // ignore if the file doesn't exist. // If etcd is set then migrates /config/config.json // to '<export_path>/.minio.sys/config/config.json' if err := migrateConfigToMinioSys(objAPI); err != nil { return err } // Migrates backend '<export_path>/.minio.sys/config/config.json' to latest version. if err := migrateMinioSysConfig(objAPI); err != nil { return err } // Migrates backend '<export_path>/.minio.sys/config/config.json' to // latest config format. if err := migrateMinioSysConfigToKV(objAPI); err != nil { return err } return loadConfig(objAPI) }
3. globalServerConfig
minio/cmd/config-current.go
package cmd var ( // globalServerConfig server config. globalServerConfig config.Config globalServerConfigMu sync.RWMutex ) // loadConfig - loads a new config from disk, overrides params // from env if found and valid func loadConfig(objAPI ObjectLayer) error { srvCfg, err := getValidConfig(objAPI) if err != nil { return err } // Override any values from ENVs. lookupConfigs(srvCfg) // hold the mutex lock before a new config is assigned. globalServerConfigMu.Lock() globalServerConfig = srvCfg globalServerConfigMu.Unlock() return nil }
4. Addition, deletion, modification and query of configuration
minio/cmd/admin-router.go
package cmd // adminAPIHandlers provides HTTP handlers for MinIO admin API. type adminAPIHandlers struct{} // registerAdminRouter - Add handler functions for each service REST API routes. func registerAdminRouter(router *mux.Router, enableConfigOps, enableIAMOps bool) { adminAPI := adminAPIHandlers{} // Admin router adminRouter := router.PathPrefix(adminPathPrefix).Subrouter() /// Service operations adminVersions := []string{ adminAPIVersionPrefix, adminAPIVersionV2Prefix, } for _, adminVersion := range adminVersions { // Config KV operations. if enableConfigOps { adminRouter.Methods(http.MethodGet).Path(adminVersion+"/get-config-kv").HandlerFunc(httpTraceHdrs(adminAPI.GetConfigKVHandler)).Queries("key", "{key:.*}") adminRouter.Methods(http.MethodPut).Path(adminVersion + "/set-config-kv").HandlerFunc(httpTraceHdrs(adminAPI.SetConfigKVHandler)) adminRouter.Methods(http.MethodDelete).Path(adminVersion + "/del-config-kv").HandlerFunc(httpTraceHdrs(adminAPI.DelConfigKVHandler)) } // If none of the routes match add default error handler routes adminRouter.NotFoundHandler = http.HandlerFunc(httpTraceAll(errorResponseHandler)) adminRouter.MethodNotAllowedHandler = http.HandlerFunc(httpTraceAll(errorResponseHandler)) }
minio/cmd/admin-handlers-config-kv.go
// SetConfigKVHandler - PUT /minio/admin/v3/set-config-kv func (a adminAPIHandlers) SetConfigKVHandler(w http.ResponseWriter, r *http.Request) { ctx := newContext(r, w, "SetConfigKV") defer logger.AuditLog(w, r, "SetConfigKV", mustGetClaimsFromToken(r)) cred, objectAPI := validateAdminReqConfigKV(ctx, w, r) if objectAPI == nil { return } if r.ContentLength > maxEConfigJSONSize || r.ContentLength == -1 { // More than maxConfigSize bytes were available writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrAdminConfigTooLarge), r.URL) return } password := cred.SecretKey kvBytes, err := madmin.DecryptData(password, io.LimitReader(r.Body, r.ContentLength)) if err != nil { logger.LogIf(ctx, err, logger.Application) writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrAdminConfigBadJSON), r.URL) return } cfg, err := readServerConfig(ctx, objectAPI) if err != nil { writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) return } if _, err = cfg.ReadFrom(bytes.NewReader(kvBytes)); err != nil { writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) return } if err = validateConfig(cfg); err != nil { writeCustomErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrAdminConfigBadJSON), err.Error(), r.URL) return } // Update the actual server config on disk. if err = saveServerConfig(ctx, objectAPI, cfg); err != nil { writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) return } // Write to the config input KV to history. if err = saveServerConfigHistory(ctx, objectAPI, kvBytes); err != nil { writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) return } // Make sure to write backend is encrypted if globalConfigEncrypted { saveConfig(GlobalContext, objectAPI, backendEncryptedFile, backendEncryptedMigrationComplete) } writeSuccessResponseHeadersOnly(w) }
6. MinIO Bucket quota
There are two types of Bucket quotas: Hard + FIFO
- Hard quota disallows writes to the bucket after configured quota limit is reached.
- FIFO quota automatically deletes oldest content until bucket usage falls within configured limit while permitting writes.
1. Verify the bucket capacity when putobject is used
func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Request) { // ... if err := enforceBucketQuota(ctx, bucket, size); err != nil { writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) return } // ... }
2. BucketQuotaSys sub module
minio/cmd/bucket-quota.go
func enforceBucketQuota(ctx context.Context, bucket string, size int64) error { if size < 0 { return nil } return globalBucketQuotaSys.check(ctx, bucket, size) }
minio/cmd/bucket-quota.go
// BucketQuotaSys - map of bucket and quota configuration. type BucketQuotaSys struct { bucketStorageCache timedValue } // Get - Get quota configuration. func (sys *BucketQuotaSys) Get(bucketName string) (*madmin.BucketQuota, error) { if globalIsGateway { objAPI := newObjectLayerFn() if objAPI == nil { return nil, errServerNotInitialized } return &madmin.BucketQuota{}, nil } return globalBucketMetadataSys.GetQuotaConfig(bucketName) } // NewBucketQuotaSys returns initialized BucketQuotaSys func NewBucketQuotaSys() *BucketQuotaSys { return &BucketQuotaSys{} } func (sys *BucketQuotaSys) check(ctx context.Context, bucket string, size int64) error { objAPI := newObjectLayerWithoutSafeModeFn() if objAPI == nil { return errServerNotInitialized } q, err := sys.Get(bucket) if err != nil { return nil } if q.Type == madmin.FIFOQuota { return nil } if q.Quota == 0 { // No quota set return quickly. return nil } sys.bucketStorageCache.Once.Do(func() { sys.bucketStorageCache.TTL = 10 * time.Second sys.bucketStorageCache.Update = func() (interface{}, error) { return loadDataUsageFromBackend(ctx, objAPI) } }) v, err := sys.bucketStorageCache.Get() if err != nil { return err } dui := v.(DataUsageInfo) bui, ok := dui.BucketsUsage[bucket] if !ok { // bucket not found, cannot enforce quota // call will fail anyways later. return nil } if (bui.Size + uint64(size)) > q.Quota { return BucketQuotaExceeded{Bucket: bucket} } return nil }
3. Background task: periodically delete object s to free up space
const ( bgQuotaInterval = 1 * time.Hour ) // initQuotaEnforcement starts the routine that deletes objects in bucket // that exceeds the FIFO quota func initQuotaEnforcement(ctx context.Context, objAPI ObjectLayer) { if env.Get(envDataUsageCrawlConf, config.EnableOn) == config.EnableOn { go startBucketQuotaEnforcement(ctx, objAPI) } } func startBucketQuotaEnforcement(ctx context.Context, objAPI ObjectLayer) { for { select { case <-ctx.Done(): return case <-time.NewTimer(bgQuotaInterval).C: logger.LogIf(ctx, enforceFIFOQuota(ctx, objAPI)) } } } // enforceFIFOQuota deletes objects in FIFO order until sufficient objects // have been deleted so as to bring bucket usage within quota func enforceFIFOQuota(ctx context.Context, objectAPI ObjectLayer) error { // Turn off quota enforcement if data usage info is unavailable. if env.Get(envDataUsageCrawlConf, config.EnableOn) == config.EnableOff { return nil } buckets, err := objectAPI.ListBuckets(ctx) if err != nil { return err } dataUsageInfo, err := loadDataUsageFromBackend(ctx, objectAPI) if err != nil { return err } for _, binfo := range buckets { bucket := binfo.Name bui, ok := dataUsageInfo.BucketsUsage[bucket] if !ok { // bucket doesn't exist anymore, or we // do not have any information to proceed. continue } // Check if the current bucket has quota restrictions, if not skip it cfg, err := globalBucketQuotaSys.Get(bucket) if err != nil { continue } if cfg.Type != madmin.FIFOQuota { continue } var toFree uint64 if bui.Size > cfg.Quota && cfg.Quota > 0 { toFree = bui.Size - cfg.Quota } if toFree == 0 { continue } // Allocate new results channel to receive ObjectInfo. objInfoCh := make(chan ObjectInfo) // Walk through all objects if err := objectAPI.Walk(ctx, bucket, "", objInfoCh); err != nil { return err } // reuse the fileScorer used by disk cache to score entries by // ModTime to find the oldest objects in bucket to delete. In // the context of bucket quota enforcement - number of hits are // irrelevant. scorer, err := newFileScorer(toFree, time.Now().Unix(), 1) if err != nil { return err } rcfg, _ := globalBucketObjectLockSys.Get(bucket) for obj := range objInfoCh { // skip objects currently under retention if rcfg.LockEnabled && enforceRetentionForDeletion(ctx, obj) { continue } scorer.addFile(obj.Name, obj.ModTime, obj.Size, 1) } var objects []string numKeys := len(scorer.fileNames()) for i, key := range scorer.fileNames() { objects = append(objects, key) if len(objects) < maxDeleteList && (i < numKeys-1) { // skip deletion until maxObjectList or end of slice continue } if len(objects) == 0 { break } // Deletes a list of objects. deleteErrs, err := objectAPI.DeleteObjects(ctx, bucket, objects) if err != nil { logger.LogIf(ctx, err) } else { for i := range deleteErrs { if deleteErrs[i] != nil { logger.LogIf(ctx, deleteErrs[i]) continue } // Notify object deleted event. sendEvent(eventArgs{ EventName: event.ObjectRemovedDelete, BucketName: bucket, Object: ObjectInfo{ Name: objects[i], }, Host: "Internal: [FIFO-QUOTA-EXPIRY]", }) } objects = nil } } } return nil }