MinIO source code analysis

Bucket log audit outline design specification

reference resources

1. Source code analysis of Minio event notification module

1. Event notification module NotificationSys

// NotificationSys - notification system.
// The embedded RWMutex guards the rule maps; Send takes the read lock while
// it consults bucketRulesMap.
type NotificationSys struct {
	sync.RWMutex
	// targetList is the collection of targets that Send dispatches events to.
	targetList                 *event.TargetList
	// targetResCh is passed to targetList.Send to receive per-target results.
	targetResCh                chan event.TargetIDResult
	// bucketRulesMap maps a bucket name to the rules matched in Send.
	bucketRulesMap             map[string]event.RulesMap
	// bucketRemoteTargetRulesMap maps a bucket name to rules keyed by target ID.
	bucketRemoteTargetRulesMap map[string]map[event.TargetID]event.RulesMap
	// NOTE(review): presumably REST clients for peer nodes; unused in this excerpt.
	peerClients                []*peerRESTClient
}

// Send dispatches the event described by args to every target whose bucket
// rules match the event name and object name. It is a no-op when no target
// matches.
func (sys *NotificationSys) Send(args eventArgs) {
	// Only the rule lookup needs the lock; dispatch happens outside it.
	sys.RLock()
	matched := sys.bucketRulesMap[args.BucketName].Match(args.EventName, args.Object.Name)
	sys.RUnlock()

	if len(matched) > 0 {
		sys.targetList.Send(args.ToEvent(true), matched, sys.targetResCh)
	}
}

2. target.Target

The notification targets to which bucket events can be published include Redis, MySQL, PostgreSQL, Kafka, Elasticsearch, etc.

When an event matches multiple targets, it is sent to them concurrently, using a sync.WaitGroup to wait for all sends to finish.

// Target - event target interface
// TargetList stores values of this type keyed by TargetID and calls Save on
// them when dispatching an event.
type Target interface {
	// ID returns the target's identifier.
	ID() TargetID
	// IsActive is called by the Save implementations in this document before
	// sending directly; a non-nil error aborts the send.
	IsActive() (bool, error)
	// Save delivers (or queues) one event; it is the method TargetList.Send invokes.
	Save(Event) error
	// NOTE(review): presumably sends a previously stored event identified by
	// its store key — confirm against the implementations.
	Send(string) error
	Close() error
	// NOTE(review): presumably reports whether a queue Store is configured — confirm.
	HasQueueStore() bool
}

// TargetList - holds list of targets indexed by target ID.
// The embedded RWMutex guards the targets map; Send takes the read lock for
// each lookup.
type TargetList struct {
	sync.RWMutex
	// targets maps a target ID to the target registered under it.
	targets map[TargetID]Target
}

// Send delivers event to every target named in targetIDset and reports one
// TargetIDResult per ID on resCh.
//
// The call returns immediately: the work runs in a background goroutine that
// starts one goroutine per known target and waits for all of them. An ID with
// no registered target yields a result with no error.
func (list *TargetList) Send(event Event, targetIDset TargetIDSet, resCh chan<- TargetIDResult) {
	go func() {
		var inflight sync.WaitGroup
		for id := range targetIDset {
			// Hold the read lock only for the map lookup.
			list.RLock()
			tgt, found := list.targets[id]
			list.RUnlock()

			if !found {
				resCh <- TargetIDResult{ID: id}
				continue
			}

			inflight.Add(1)
			// id and tgt are passed as arguments so each goroutine gets its own copy.
			go func(id TargetID, tgt Target) {
				defer inflight.Done()
				resCh <- TargetIDResult{ID: id, Err: tgt.Save(event)}
			}(id, tgt)
		}
		inflight.Wait()
	}()
}

3. target.RedisTarget

The notification target supports two formats: namespace and access.

If the namespace format is used, MinIO synchronizes the objects in the bucket into entries of a Redis hash. Each entry corresponds to one object in a bucket: its key is "bucket name/object name", and its value is the JSON-formatted event data for that MinIO object. If the object is updated or deleted, its entry in the hash is updated or deleted accordingly.

If the access format is used, MinIO uses RPUSH to append the event to a Redis list. Each element of this list is itself a JSON-formatted list with two elements: the first is a timestamp string, and the second is a JSON object containing the event data for the operation on this bucket. In this format, elements in the list are never updated or deleted.

// RedisTarget - Redis target.
type RedisTarget struct {
	id         event.TargetID
	// args carries the target configuration; send reads args.Format and args.Key.
	args       RedisArgs
	// pool supplies the Redis connections used by send.
	pool       *redis.Pool
	// store, when non-nil, makes Save queue events instead of sending them directly.
	store      Store
	firstPing  bool
	// loggerOnce is used by send to report the error from closing a connection.
	loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{})
}


// Save queues eventData in the queue store when one is configured, to be
// replayed once the Redis connection is active. Without a store it checks
// that the target is active and sends the event directly.
func (target *RedisTarget) Save(eventData event.Event) error {
	if target.store == nil {
		// No queue store: deliver synchronously, but only to an active target.
		if _, err := target.IsActive(); err != nil {
			return err
		}
		return target.send(eventData)
	}
	return target.store.Put(eventData)
}


// send writes one event to Redis over a pooled connection.
//
// In namespace format the hash at args.Key is kept in sync with the bucket:
// the field "<bucket>/<object>" is removed on ObjectRemovedDelete and set to
// the JSON-encoded event otherwise. In access format the JSON-encoded event
// is appended to the list at args.Key with RPUSH.
func (target *RedisTarget) send(eventData event.Event) error {
	conn := target.pool.Get()
	defer func() {
		// The close error is always handed to the logger, nil or not.
		closeErr := conn.Close()
		target.loggerOnce(context.Background(), closeErr, target.ID())
	}()

	if target.args.Format == event.NamespaceFormat {
		objectName, err := url.QueryUnescape(eventData.S3.Object.Key)
		if err != nil {
			return err
		}
		field := eventData.S3.Bucket.Name + "/" + objectName

		if eventData.EventName != event.ObjectRemovedDelete {
			payload, marshalErr := json.Marshal(struct{ Records []event.Event }{[]event.Event{eventData}})
			if marshalErr != nil {
				return marshalErr
			}
			_, err = conn.Do("HSET", target.args.Key, field, payload)
		} else {
			_, err = conn.Do("HDEL", target.args.Key, field)
		}
		if err != nil {
			return err
		}
	}

	if target.args.Format == event.AccessFormat {
		entry := []RedisAccessEvent{{Event: []event.Event{eventData}, EventTime: eventData.EventTime}}
		payload, err := json.Marshal(entry)
		if err != nil {
			return err
		}
		_, err = conn.Do("RPUSH", target.args.Key, payload)
		return err
	}

	return nil
}

4. target.MySQLTarget

// MySQLTarget - MySQL target.
type MySQLTarget struct {
	id         event.TargetID
	// args carries the target configuration; send reads args.Format.
	args       MySQLArgs
	// updateStmt is executed with (key, data) for non-delete events in namespace format.
	updateStmt *sql.Stmt
	// deleteStmt is executed with (key) for ObjectRemovedDelete in namespace format.
	deleteStmt *sql.Stmt
	// insertStmt is executed with (eventTime, data) in access format.
	insertStmt *sql.Stmt
	db         *sql.DB
	// store, when non-nil, makes Save queue events instead of sending them directly.
	store      Store
	firstPing  bool
	loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{})
}

// Save queues eventData in the store when one is configured, to be replayed
// once the SQL connection is active. Without a store it checks that the
// target is active and sends the event directly.
func (target *MySQLTarget) Save(eventData event.Event) error {
	if queue := target.store; queue != nil {
		return queue.Put(eventData)
	}

	if _, activeErr := target.IsActive(); activeErr != nil {
		return activeErr
	}
	return target.send(eventData)
}

// send writes one event to MySQL using the prepared statements.
//
// In namespace format the row keyed "<bucket>/<object>" is deleted on
// ObjectRemovedDelete and updated with the JSON-encoded event otherwise. In
// access format a row holding the parsed event time and the JSON-encoded
// event is inserted. Any other format is a no-op.
func (target *MySQLTarget) send(eventData event.Event) error {
	switch {
	case target.args.Format == event.NamespaceFormat:
		objectName, err := url.QueryUnescape(eventData.S3.Object.Key)
		if err != nil {
			return err
		}
		rowKey := eventData.S3.Bucket.Name + "/" + objectName

		if eventData.EventName == event.ObjectRemovedDelete {
			_, err = target.deleteStmt.Exec(rowKey)
			return err
		}

		payload, err := json.Marshal(struct{ Records []event.Event }{[]event.Event{eventData}})
		if err != nil {
			return err
		}
		_, err = target.updateStmt.Exec(rowKey, payload)
		return err

	case target.args.Format == event.AccessFormat:
		// The event time must parse before anything is written.
		eventTime, err := time.Parse(event.AMZTimeFormat, eventData.EventTime)
		if err != nil {
			return err
		}

		payload, err := json.Marshal(struct{ Records []event.Event }{[]event.Event{eventData}})
		if err != nil {
			return err
		}
		_, err = target.insertStmt.Exec(eventTime, payload)
		return err
	}

	return nil
}

5. target.KafkaTarget

// KafkaTarget - Kafka target.
type KafkaTarget struct {
	id         event.TargetID
	// args carries the target configuration; send reads args.Topic.
	args       KafkaArgs
	// producer is the synchronous producer send publishes messages through.
	producer   sarama.SyncProducer
	config     *sarama.Config
	// store, when non-nil, makes Save queue events instead of sending them directly.
	store      Store
	loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{})
}

// Save queues eventData in the store when one is configured, to be replayed
// once the Kafka connection is active. Without a store it checks that the
// target is active and sends the event directly.
func (target *KafkaTarget) Save(eventData event.Event) error {
	if target.store != nil {
		return target.store.Put(eventData)
	}

	// Direct delivery: refuse to send when the liveness check fails.
	if _, err := target.IsActive(); err != nil {
		return err
	}

	return target.send(eventData)
}

// send publishes one event to the configured Kafka topic. The message key is
// "<bucket>/<object>" (object key URL-unescaped) and the value is the
// JSON-encoded event.Log wrapping the event.
func (target *KafkaTarget) send(eventData event.Event) error {
	objectName, err := url.QueryUnescape(eventData.S3.Object.Key)
	if err != nil {
		return err
	}
	msgKey := eventData.S3.Bucket.Name + "/" + objectName

	payload, err := json.Marshal(event.Log{
		EventName: eventData.EventName,
		Key:       msgKey,
		Records:   []event.Event{eventData},
	})
	if err != nil {
		return err
	}

	// Partition and offset returned by the producer are not needed here.
	_, _, err = target.producer.SendMessage(&sarama.ProducerMessage{
		Topic: target.args.Topic,
		Key:   sarama.StringEncoder(msgKey),
		Value: sarama.ByteEncoder(payload),
	})
	return err
}

6. target.Store

// Store - To persist the events.
// QueueStore (returned by NewQueueStore) is the implementation shown in this
// document.
type Store interface {
	// Put persists one event; the Save methods call it when a store is configured.
	Put(event event.Event) error
	// Get returns the event stored under key.
	Get(key string) (event.Event, error)
	// NOTE(review): presumably returns the keys of all stored events — confirm.
	List() ([]string, error)
	// Del removes the event stored under key.
	Del(key string) error
	Open() error
}


// QueueStore - Filestore for persisting events.
// The embedded RWMutex is taken by Put around the limit check and the write.
type QueueStore struct {
	sync.RWMutex
	// currentEntries counts stored events; write increments it after each file is written.
	currentEntries uint64
	// entryLimit is the maximum number of entries; Put fails once it is reached.
	entryLimit     uint64
	// directory is where write creates one file per event.
	directory      string
}

// NewQueueStore returns a QueueStore that persists events under directory.
//
// A limit of 0 selects defaultLimit, lowered to the maximum open-file limit
// when that limit can be read and is smaller.
func NewQueueStore(directory string, limit uint64) Store {
	if limit == 0 {
		limit = defaultLimit
		// Never allow more entries than the maximum open-file limit; if the
		// limit cannot be read, keep the default.
		if _, maxRLimit, err := sys.GetMaxOpenFileLimit(); err == nil && maxRLimit < limit {
			limit = maxRLimit
		}
	}

	queue := &QueueStore{}
	queue.directory = directory
	queue.entryLimit = limit
	return queue
}

// write JSON-encodes e into the file "<directory>/<key><eventExt>" and, on
// success, increments the entry counter. It takes no lock itself; Put calls
// it while holding the store lock.
func (store *QueueStore) write(key string, e event.Event) error {
	payload, err := json.Marshal(e)
	if err != nil {
		return err
	}

	target := filepath.Join(store.directory, key+eventExt)
	err = ioutil.WriteFile(target, payload, os.FileMode(0770))
	if err != nil {
		return err
	}

	// Count the entry only once the file has been written.
	store.currentEntries++
	return nil
}

// Put stores e under a freshly generated UUID key. It returns
// errLimitExceeded when the store already holds entryLimit entries.
func (store *QueueStore) Put(e event.Event) error {
	store.Lock()
	defer store.Unlock()

	if store.currentEntries >= store.entryLimit {
		return errLimitExceeded
	}

	key, err := getNewUUID()
	if err == nil {
		err = store.write(key, e)
	}
	return err
}

7. Event type Name

Event types supported by MinIO:

// Name - event type enum.
// Refer http://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html#notification-how-to-event-types-and-destinations
type Name int

// Values of Name
// Numbering starts at 1 (ObjectAccessedAll) and increases by one per line,
// so the zero value of Name is not a declared event type.
const (
	ObjectAccessedAll Name = 1 + iota
	ObjectAccessedGet
	ObjectAccessedGetRetention
	ObjectAccessedGetLegalHold
	ObjectAccessedHead
	ObjectCreatedAll
	ObjectCreatedCompleteMultipartUpload
	ObjectCreatedCopy
	ObjectCreatedPost
	ObjectCreatedPut
	ObjectCreatedPutRetention
	ObjectCreatedPutLegalHold
	ObjectRemovedAll
	// ObjectRemovedDelete is the event the targets' send methods treat as a deletion.
	ObjectRemovedDelete
)

8. Event Event

The notification message sent by MinIO for publishing events is in JSON format

// Identity represents access key who caused the event.
type Identity struct {
	// PrincipalID is serialized as "principalId".
	PrincipalID string `json:"principalId"`
}

// Bucket represents bucket metadata of the event.
type Bucket struct {
	// Name is the bucket name; the targets use it as the prefix of the event key.
	Name          string   `json:"name"`
	OwnerIdentity Identity `json:"ownerIdentity"`
	ARN           string   `json:"arn"`
}

// Object represents object metadata of the event.
// Fields tagged omitempty are left out of the JSON when they hold their zero value.
type Object struct {
	// Key is the object key; the targets pass it through url.QueryUnescape
	// before using it.
	Key          string            `json:"key"`
	Size         int64             `json:"size,omitempty"`
	ETag         string            `json:"eTag,omitempty"`
	ContentType  string            `json:"contentType,omitempty"`
	UserMetadata map[string]string `json:"userMetadata,omitempty"`
	VersionID    string            `json:"versionId,omitempty"`
	Sequencer    string            `json:"sequencer"`
}

// Metadata represents event metadata.
// It is the value of the "s3" member of Event.
type Metadata struct {
	SchemaVersion   string `json:"s3SchemaVersion"`
	ConfigurationID string `json:"configurationId"`
	// Bucket and Object identify what the event happened to.
	Bucket          Bucket `json:"bucket"`
	Object          Object `json:"object"`
}

// Source represents client information who triggered the event.
// All three fields are plain strings, including Port.
type Source struct {
	Host      string `json:"host"`
	Port      string `json:"port"`
	UserAgent string `json:"userAgent"`
}

// Event represents event notification information defined in
// http://docs.aws.amazon.com/AmazonS3/latest/dev/notification-content-structure.html.
type Event struct {
	EventVersion      string            `json:"eventVersion"`
	EventSource       string            `json:"eventSource"`
	AwsRegion         string            `json:"awsRegion"`
	// EventTime is kept as a string; the MySQL target parses it with
	// event.AMZTimeFormat.
	EventTime         string            `json:"eventTime"`
	// EventName is the event type (see Name).
	EventName         Name              `json:"eventName"`
	UserIdentity      Identity          `json:"userIdentity"`
	RequestParameters map[string]string `json:"requestParameters"`
	ResponseElements  map[string]string `json:"responseElements"`
	// S3 holds the bucket and object the event refers to.
	S3                Metadata          `json:"s3"`
	Source            Source            `json:"source"`
}

9. Event log Log

// Log represents event information for some event targets.
// The Kafka target marshals a Log as its message value. The fields carry no
// JSON tags, so they are encoded under their Go names.
type Log struct {
	EventName Name
	// Key is "<bucket>/<object>" as built by the sending target.
	Key       string
	Records   []Event
}

2. Source code analysis of Minio gateway

Start minio gateway:

[root@node8217 install]# cat start_minio.sh
#!/bin/bash
# Starts MinIO in S3 gateway mode as a background job.

# Credentials exported to the environment of the minio process started below.
export MINIO_ACCESS_KEY=ak_123456
export MINIO_SECRET_KEY=sk_123456
#export MINIO_PROMETHEUS_AUTH_TYPE="public"
#export MINIO_REGION_NAME="wuhan"

# Disabled alternative: run a standalone server on :9009 backed by /BigData/minio.
#./minio server --address :9009 /BigData/minio &
# Run the s3 gateway against the S3 endpoint at http://127.0.0.1:9009.
./minio gateway s3 http://127.0.0.1:9009 &

After startup, you can use the minio gateway as you use the minio server.

1. MinIO startup entry point

minio/main.go

package main // import "github.com/minio/minio"

import (
	"os"

	minio "github.com/minio/minio/cmd"

	// Import gateway
	_ "github.com/minio/minio/cmd/gateway"
)

// main hands the process arguments to minio.Main. The blank import of the
// gateway package above is what makes the gateway sub-commands available.
func main() {
	minio.Main(os.Args)
}

The blank import `_ "github.com/minio/minio/cmd/gateway"` is used here so that the init functions of the supported gateways in the github.com/minio/minio/cmd/gateway package are run, which registers each supported gateway.

minio/cmd/main.go

// newApp builds the cli application called name with the two top-level
// commands minio supports here: serverCmd and gatewayCmd, in that order.
func newApp(name string) *cli.App {
	app := cli.NewApp()
	app.Name = name
	app.Commands = []cli.Command{serverCmd, gatewayCmd}
	return app
}

// Main is the entry point of the minio binary. The application is named
// after the base name of args[0]; if running it returns an error the process
// exits with status 1.
func Main(args []string) {
	app := newApp(filepath.Base(args[0]))
	if app.Run(args) != nil {
		os.Exit(1)
	}
}

Two commands are registered, the global variables serverCmd and gatewayCmd

2. Register S3 gateway

gatewayCmd

minio/cmd/gateway-main.go

var (
	// gatewayCmd is the top-level "gateway" command. It declares no
	// sub-commands here; RegisterGatewayCommand appends one per backend.
	gatewayCmd = cli.Command{
		Name:            "gateway",
		Usage:           "start object storage gateway",
		Flags:           append(ServerFlags, GlobalFlags...),
		HideHelpCommand: true,
	}
)

// RegisterGatewayCommand adds cmd as a sub-command of the "gateway" command.
// ServerFlags and then GlobalFlags are appended to the command's own flags
// before registration. It always returns nil.
func RegisterGatewayCommand(cmd cli.Command) error {
	flags := append(cmd.Flags, ServerFlags...)
	flags = append(flags, GlobalFlags...)
	cmd.Flags = flags

	gatewayCmd.Subcommands = append(gatewayCmd.Subcommands, cmd)
	return nil
}

Register S3 gateway

minio/cmd/gateway/s3/gateway-s3.go

const (
	// s3Backend is the sub-command name of this gateway; S3.Name returns it.
	s3Backend = "s3"
)

// init registers the "s3" sub-command under "minio gateway" when the package
// is imported; s3GatewayMain handles it. The error returned by
// RegisterGatewayCommand is not checked.
func init() {
	minio.RegisterGatewayCommand(cli.Command{
		Name:               s3Backend,
		Usage:              "Amazon Simple Storage Service (S3)",
		Action:             s3GatewayMain,
		HideHelpCommand:    true,
	})
}


// s3GatewayMain handles the 'minio gateway s3' command line.
//
// The first positional argument is the backend endpoint, defaulting to
// https://s3.amazonaws.com when none is given. The listen address comes from
// the global "address" flag, falling back to the command-level flag when the
// global one is empty or still the default port.
func s3GatewayMain(ctx *cli.Context) {
	args := ctx.Args()
	if !args.Present() {
		args = cli.Args{"https://s3.amazonaws.com"}
	}
	endpoint := args.First()

	serverAddr := ctx.GlobalString("address")
	defaultAddr := ":" + minio.GlobalMinioDefaultPort
	if serverAddr == "" || serverAddr == defaultAddr {
		serverAddr = ctx.String("address")
	}

	// Validate gateway arguments.
	logger.FatalIf(minio.ValidateGatewayArguments(serverAddr, endpoint), "Invalid argument")

	// Start the gateway.
	minio.StartGateway(ctx, &S3{host: endpoint})
}

The key call is minio.StartGateway(ctx, &S3{args.First()}), which starts the corresponding gateway according to the arguments.

3. S3 implements the Gateway interface

minio/cmd/gateway-main.go

// StartGateway - handler for 'minio gateway <name>'.
// The body is elided in this excerpt. gw is the backend to start; it only
// has to satisfy the Gateway interface.
func StartGateway(ctx *cli.Context, gw Gateway) {
	// Gateway startup
}

As shown, the second argument must implement the cmd.Gateway interface, which the S3 type does:

// Gateway represents a gateway backend.
// StartGateway accepts any value implementing this interface.
type Gateway interface {
	// Name returns the unique name of the gateway.
	Name() string

	// NewGatewayLayer returns a new  ObjectLayer.
	// StartGateway calls it with globalActiveCred.
	NewGatewayLayer(creds auth.Credentials) (ObjectLayer, error)

	// Returns true if gateway is ready for production.
	Production() bool
}

S3 gateway implements Gateway Interface

minio/cmd/gateway/s3/gateway-s3.go

// S3 implements Gateway.
type S3 struct {
	// host is the backend S3 endpoint; NewGatewayLayer passes it to newS3.
	host string
}

// Name implements Gateway interface.
// It returns the s3Backend constant.
func (g *S3) Name() string {
	return s3Backend
}

// NewGatewayLayer builds the ObjectLayer for the S3 gateway.
//
// It creates a client for g.host, routes its traffic through a
// metrics-collecting transport and probes the backend with a BucketExists
// call on a random bucket name. Any probe error other than "AccessDenied"
// is returned. When a KMS is configured the layer is wrapped in
// s3EncObjects and a background routine cleaning up stale encrypted
// multipart uploads is started.
//
// creds is ignored, since the S3 gateway implements chaining all credentials.
func (g *S3) NewGatewayLayer(creds auth.Credentials) (minio.ObjectLayer, error) {
	client, err := newS3(g.host)
	if err != nil {
		return nil, err
	}

	metrics := minio.NewMetrics()
	transport := &minio.MetricsTransport{
		Transport: minio.NewGatewayHTTPTransport(),
		Metrics:   metrics,
	}

	// Set custom transport
	client.SetCustomTransport(transport)

	probeBucketName := randString(60, rand.NewSource(time.Now().UnixNano()), "probe-bucket-sign-")

	// Check if the provided keys are valid; "AccessDenied" is tolerated.
	_, err = client.BucketExists(probeBucketName)
	if err != nil && miniogo.ToErrorResponse(err).Code != "AccessDenied" {
		return nil, err
	}

	objects := s3Objects{
		Client:     client,
		Metrics:    metrics,
		HTTPClient: &http.Client{Transport: transport},
	}

	if minio.GlobalKMS == nil {
		return &objects, nil
	}

	// A KMS is configured: use the encrypting layer.
	encObjects := s3EncObjects{objects}

	// Start stale enc multipart uploads cleanup routine.
	go encObjects.cleanupStaleEncMultipartUploads(minio.GlobalContext,
		minio.GlobalMultipartCleanupInterval, minio.GlobalMultipartExpiry)

	return &encObjects, nil
}

// Production - s3 gateway is production ready.
// It always returns true.
func (g *S3) Production() bool {
	return true
}

4. Gateway startup

minio/cmd/gateway-main.go

// StartGateway - handler for 'minio gateway <name>'.
//
// It registers the admin, health-check, metrics, optional web and API
// routers, starts the HTTP server in a background goroutine, asks gw for its
// ObjectLayer and publishes it as the global object API. It then creates all
// sub-systems, initializes IAM when enableIAMOps is set, and initializes the
// disk cache when globalCacheConfig.Enabled is set.
//
// Startup is aborted through logger.FatalIf when gw is nil, when the web
// router cannot be registered, when the gateway backend cannot be
// initialized, or when IAM or disk-cache initialization fails.
//
// NOTE(review): gatewayName, enableConfigOps, enableIAMOps and getCert are
// not defined in this excerpt; they are assumed to be in scope.
func StartGateway(ctx *cli.Context, gw Gateway) {
	if gw == nil {
		logger.FatalIf(errUnexpected, "Gateway implementation not initialized")
	}

	// Handle common command args.
	handleCommonCmdArgs(ctx)

	// Handle gateway specific env
	gatewayHandleEnvVars()

	// Set when gateway is enabled
	globalIsGateway = true

	router := mux.NewRouter().SkipClean(true).UseEncodedPath()

	// Enable IAM admin APIs if etcd is enabled, if not just enable basic
	// operations such as profiling, server info etc.
	registerAdminRouter(router, enableConfigOps, enableIAMOps)

	// Add healthcheck router
	registerHealthCheckRouter(router)

	// Add server metrics router
	registerMetricsRouter(router)

	// Register web router when its enabled.
	if globalBrowserEnabled {
		logger.FatalIf(registerWebRouter(router), "Unable to configure web browser")
	}

	// Currently only NAS and S3 gateway support encryption headers.
	encryptionEnabled := gatewayName == "s3" || gatewayName == "nas"
	allowSSEKMS := gatewayName == "s3" // Only S3 can support SSE-KMS (as pass-through)

	// Add API router.
	registerAPIRouter(router, encryptionEnabled, allowSSEKMS)

	httpServer := xhttp.NewServer([]string{globalCLIContext.Addr},
		criticalErrorHandler{registerHandlers(router, globalHandlers...)}, getCert)
	httpServer.BaseContext = func(listener net.Listener) context.Context {
		return GlobalContext
	}
	// Serve in the background; the result of Start is reported on
	// globalHTTPServerErrorCh.
	go func() {
		globalHTTPServerErrorCh <- httpServer.Start()
	}()

	globalObjLayerMutex.Lock()
	globalHTTPServer = httpServer
	globalObjLayerMutex.Unlock()

	newObject, err := gw.NewGatewayLayer(globalActiveCred)
	// Do not wrap and publish an object layer the backend failed to create:
	// err would otherwise be silently overwritten further down.
	logger.FatalIf(err, "Unable to initialize gateway backend")
	newObject = NewGatewayLayerWithLocker(newObject)

	// Once endpoints are finalized, initialize the new object api in safe mode.
	globalObjLayerMutex.Lock()
	globalObjectAPI = newObject
	globalObjLayerMutex.Unlock()

	// Calls all New() for all sub-systems.
	newAllSubsystems()

	if enableIAMOps {
		// Initialize IAM sys.
		logger.FatalIf(globalIAMSys.Init(GlobalContext, newObject), "Unable to initialize IAM system")
	}

	if globalCacheConfig.Enabled {
		// initialize the new disk cache objects.
		var cacheAPI CacheObjectLayer
		cacheAPI, err = newServerCacheObjects(GlobalContext, globalCacheConfig)
		logger.FatalIf(err, "Unable to initialize disk caching")

		globalObjLayerMutex.Lock()
		globalCacheObjectAPI = cacheAPI
		globalObjLayerMutex.Unlock()
	}
}

3. Source code analysis of Minio gateway disk cache

1. disk cache usage

#!/bin/bash
# Starts MinIO in S3 gateway mode with the disk cache switched on.

# Credentials exported to the environment of the minio process started below.
export MINIO_ACCESS_KEY=ak_123456
export MINIO_SECRET_KEY=sk_123456
#export MINIO_PROMETHEUS_AUTH_TYPE="public"
#export MINIO_REGION_NAME="wuhan"

# Disabled alternative: run a standalone server on :9009 backed by /BigData/minio.
#./minio server --address :9009 /BigData/minio &

# Disk cache settings.
# NOTE(review): these presumably populate the cache.Config fields read by
# newCache/newServerCacheObjects (Drives, Exclude, Quota, After,
# WatermarkLow, WatermarkHigh) — verify against the config loader.
export MINIO_CACHE="on"
export MINIO_CACHE_DRIVES="/mnt/drive1,/mnt/drive2"
export MINIO_CACHE_EXCLUDE="*.pdf,mybucket/*"
export MINIO_CACHE_QUOTA=80
export MINIO_CACHE_AFTER=3
export MINIO_CACHE_WATERMARK_LOW=70
export MINIO_CACHE_WATERMARK_HIGH=90

# Run the s3 gateway against the S3 endpoint at http://127.0.0.1:9009.
./minio gateway s3 http://127.0.0.1:9009 &

2. disk cache implementation

// StartGateway (excerpt): only the disk-cache initialization is shown; the
// rest of the function is elided with "// ......".
func StartGateway(ctx *cli.Context, gw Gateway) {
	// ......
	if globalCacheConfig.Enabled {
		// initialize the new disk cache objects.
		var cacheAPI CacheObjectLayer
		cacheAPI, err = newServerCacheObjects(GlobalContext, globalCacheConfig)
		logger.FatalIf(err, "Unable to initialize disk caching")

		// Publish the cache layer under the same mutex that guards the
		// global object API.
		globalObjLayerMutex.Lock()
		globalCacheObjectAPI = cacheAPI
		globalObjLayerMutex.Unlock()
	}
	// ......
}

minio/cmd/disk-cache.go

// CacheObjectLayer implements primitives for cache object API layer.
// newServerCacheObjects returns the implementation (cacheObjects).
type CacheObjectLayer interface {
	// Object operations.
	GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error)
	GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error)
	DeleteObject(ctx context.Context, bucket, object string) error
	// DeleteObjects returns one error slot per requested object plus an overall error.
	DeleteObjects(ctx context.Context, bucket string, objects []string) ([]error, error)
	PutObject(ctx context.Context, bucket, object string, data *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error)
	CopyObject(ctx context.Context, srcBucket, srcObject, destBucket, destObject string, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (objInfo ObjectInfo, err error)
	// Storage operations.
	StorageInfo(ctx context.Context) CacheStorageInfo
	CacheStats() *CacheStats
}

The cacheObjects structure implements the CacheObjectLayer interface:

// newServerCacheObjects returns the cacheObjects layer for use by the server.
//
// It creates one disk cache per configured cache drive and wires every
// backend callback to the object layer returned by newObjectLayerFn, which is
// looked up again on each call. It then starts the v1-to-v2 migration (when
// newCache reports one is needed) and the garbage-collection routine in the
// background.
func newServerCacheObjects(ctx context.Context, config cache.Config) (CacheObjectLayer, error) {
	// list of disk caches for cache "drives" specified in config.json or MINIO_CACHE_DRIVES env var.
	drives, migrateSw, err := newCache(config)
	if err != nil {
		return nil, err
	}

	layer := &cacheObjects{
		cache:      drives,
		exclude:    config.Exclude,
		after:      config.After,
		migrating:  migrateSw,
		migMutex:   sync.Mutex{},
		cacheStats: newCacheStats(),
	}

	// Backend callbacks: each one forwards to the current object layer.
	layer.GetObjectInfoFn = func(ctx context.Context, bucket, object string, opts ObjectOptions) (ObjectInfo, error) {
		return newObjectLayerFn().GetObjectInfo(ctx, bucket, object, opts)
	}
	layer.GetObjectNInfoFn = func(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error) {
		return newObjectLayerFn().GetObjectNInfo(ctx, bucket, object, rs, h, lockType, opts)
	}
	layer.DeleteObjectFn = func(ctx context.Context, bucket, object string) error {
		return newObjectLayerFn().DeleteObject(ctx, bucket, object)
	}
	layer.DeleteObjectsFn = func(ctx context.Context, bucket string, objects []string) ([]error, error) {
		// Objects are deleted one at a time; each slot holds that object's result.
		errs := make([]error, len(objects))
		for idx := range objects {
			errs[idx] = newObjectLayerFn().DeleteObject(ctx, bucket, objects[idx])
		}
		return errs, nil
	}
	layer.PutObjectFn = func(ctx context.Context, bucket, object string, data *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) {
		return newObjectLayerFn().PutObject(ctx, bucket, object, data, opts)
	}
	layer.CopyObjectFn = func(ctx context.Context, srcBucket, srcObject, destBucket, destObject string, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (objInfo ObjectInfo, err error) {
		return newObjectLayerFn().CopyObject(ctx, srcBucket, srcObject, destBucket, destObject, srcInfo, srcOpts, dstOpts)
	}

	if migrateSw {
		go layer.migrateCacheFromV1toV2(ctx)
	}
	go layer.gc(ctx)
	return layer, nil
}
// Abstracts disk caching - used by the S3 layer
// The *Fn fields are the backend callbacks; newServerCacheObjects points each
// of them at the current object layer.
type cacheObjects struct {
	// slice of cache drives
	// (entries may be nil: newCache appends nil for a drive without a format)
	cache []*diskCache
	// file path patterns to exclude from cache
	exclude []string
	// number of accesses after which to cache an object
	after int
	// if true migration is in progress from v1 to v2
	migrating bool
	// mutex to protect migration bool
	migMutex sync.Mutex

	// Cache stats
	cacheStats *CacheStats

	GetObjectNInfoFn func(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error)
	GetObjectInfoFn  func(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error)
	DeleteObjectFn   func(ctx context.Context, bucket, object string) error
	DeleteObjectsFn  func(ctx context.Context, bucket string, objects []string) ([]error, error)
	PutObjectFn      func(ctx context.Context, bucket, object string, data *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error)
	CopyObjectFn     func(ctx context.Context, srcBucket, srcObject, destBucket, destObject string, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (objInfo ObjectInfo, err error)
}

The actual caching is implemented by the []*diskCache slice:

// newCache creates one diskCache per drive listed in config.Drives (from
// config.json or the global env overrides).
//
// The returned slice is index-aligned with config.Drives: a drive without a
// loaded format gets a nil entry. The boolean reports whether a cache format
// migration is needed. Every drive with a format must support atime.
func newCache(config cache.Config) ([]*diskCache, bool, error) {
	ctx := logger.SetReqInfo(GlobalContext, &logger.ReqInfo{})
	formats, migrating, err := loadAndValidateCacheFormat(ctx, config.Drives)
	if err != nil {
		return nil, false, err
	}

	var caches []*diskCache
	for i, dir := range config.Drives {
		// skip diskCache creation for cache drives missing a format.json,
		// keeping a nil placeholder so indexes stay aligned.
		if formats[i] == nil {
			caches = append(caches, nil)
			continue
		}
		if checkAtimeSupport(dir) != nil {
			return nil, false, errors.New("Atime support required for disk caching")
		}

		// MaxUse takes precedence; Quota is used only when MaxUse is zero.
		quota := config.MaxUse
		if quota == 0 {
			quota = config.Quota
		}

		drive, err := newDiskCache(dir, quota, config.After, config.WatermarkLow, config.WatermarkHigh)
		if err != nil {
			return nil, false, err
		}
		caches = append(caches, drive)
	}
	return caches, migrating, nil
}

4. MinIO IAM module

1. User authentication

Each operation of S3 protocol requires IAMSys to perform permission verification. The main codes are as follows:

	// Excerpt from a request-authorization function (its signature is not
	// shown): ask the IAM system whether the credential may perform action
	// on bucketName/objectName.
	if globalIAMSys.IsAllowed(iampolicy.Args{
		AccountName:     cred.AccessKey,
		Action:          iampolicy.Action(action),
		BucketName:      bucketName,
		ConditionValues: getConditionValues(r, "", cred.AccessKey, claims),
		ObjectName:      objectName,
		IsOwner:         owner,
		Claims:          claims,
	}) {
		// Request is allowed return the appropriate access key.
		return cred.AccessKey, owner, ErrNone
	}
	// Denied: the access key and owner flag are still returned alongside the error code.
	return cred.AccessKey, owner, ErrAccessDenied

2. globalIAMSys

minio/cmd/globals.go

var (
	// globalIAMSys is the process-wide IAM system; newAllSubsystems assigns it.
	globalIAMSys            *IAMSys
)

cmd/server-main.go

// newAllSubsystems creates every global sub-system object by calling its
// New* constructor. StartGateway calls it once the object layer has been
// published; IAM is initialized separately through globalIAMSys.Init.
func newAllSubsystems() {
	// Create new notification system and initialize notification targets
	globalNotificationSys = NewNotificationSys(globalEndpoints)

	// Create new bucket metadata system.
	globalBucketMetadataSys = NewBucketMetadataSys()

	// Create a new config system.
	globalConfigSys = NewConfigSys()

	// Create new IAM system.
	globalIAMSys = NewIAMSys()

	// Create new policy system.
	globalPolicySys = NewPolicySys()

	// Create new lifecycle system.
	globalLifecycleSys = NewLifecycleSys()

	// Create new bucket object lock subsystem
	globalBucketObjectLockSys = NewBucketObjectLockSys()

	// Create new bucket quota subsystem
	globalBucketQuotaSys = NewBucketQuotaSys()
}

minio/cmd/iam.go

package cmd


// IAMSys - config system.
// It holds the in-memory IAM state. The maps are filled by the store's
// loadAll and kept current by its watch.
type IAMSys struct {
	// usersSysType selects the user directory; loadAll loads regular users,
	// service accounts and groups only for MinIOUsersSysType.
	usersSysType UsersSysType

	// map of policy names to policy definitions
	iamPolicyDocsMap map[string]iampolicy.Policy
	// map of usernames to credentials
	iamUsersMap map[string]auth.Credentials
	// map of group names to group info
	iamGroupsMap map[string]GroupInfo
	// map of user names to groups they are a member of
	iamUserGroupMemberships map[string]set.StringSet
	// map of usernames/temporary access keys to policy names
	iamUserPolicyMap map[string]MappedPolicy
	// map of group names to policy names
	iamGroupPolicyMap map[string]MappedPolicy

	// Persistence layer for IAM subsystem
	// (set by Init: etcd-backed when globalEtcdClient is non-nil, object-layer-backed otherwise)
	store IAMStorageAPI
}

// NewIAMSys - creates new config system object.
//
// The returned IAMSys uses MinIOUsersSysType and has every in-memory map
// allocated, so the store can merge loaded entries into it directly. The
// persistence layer (store) is left unset; Init assigns it.
func NewIAMSys() *IAMSys {
	return &IAMSys{
		usersSysType:            MinIOUsersSysType,
		iamUsersMap:             make(map[string]auth.Credentials),
		iamPolicyDocsMap:        make(map[string]iampolicy.Policy),
		iamUserPolicyMap:        make(map[string]MappedPolicy),
		// loadAll assigns into this map; leaving it nil would panic as soon
		// as a group policy mapping is loaded.
		iamGroupPolicyMap:       make(map[string]MappedPolicy),
		iamGroupsMap:            make(map[string]GroupInfo),
		iamUserGroupMemberships: make(map[string]set.StringSet),
	}
}
  • iamUsersMap map[string]auth.Credentials saves user information;

  • store IAMStorageAPI is the persistent storage layer of the IAM system; it is backed either by etcd or by MinIO's own object layer;

3. globalIAMSys initialization

minio/cmd/iam.go

// Init initializes the IAM system.
//
// It selects the persistence layer (etcd when globalEtcdClient is set,
// otherwise the object layer objAPI), runs the IAM configuration migration
// and loads all IAM data. The background watcher is started even when the
// load fails; the load error is returned to the caller.
func (sys *IAMSys) Init(ctx context.Context, objAPI ObjectLayer) error {
	if globalEtcdClient != nil {
		sys.store = newIAMEtcdStore(ctx)
	} else {
		sys.store = newIAMObjectStore(ctx, objAPI)
	}

	// Migrate IAM configuration
	if migrateErr := sys.doIAMConfigMigration(ctx); migrateErr != nil {
		return migrateErr
	}

	loadErr := sys.store.loadAll(ctx, sys)

	// Keep the in-memory state current from here on.
	go sys.store.watch(ctx, sys)

	return loadErr
}

If the globalEtcdClient is not empty, etcd is preferred as the storage layer.

sys.store.loadAll(ctx, sys) loads all IAM data;

go sys.store.watch(ctx, sys) background monitors all updates;

4. IAMEtcdStore implements IAMStorageAPI

  1. During initialization, load all data in etcd;
  2. Background watch and reload in time;

minio/cmd/iam-etcd-store.go

// IAMEtcdStore implements IAMStorageAPI
// The embedded RWMutex is the lock loadAll and watch take while they read or
// modify the IAMSys maps.
type IAMEtcdStore struct {
	sync.RWMutex

	// ctx is the context handed to newIAMEtcdStore.
	ctx context.Context

	// client is the etcd client (globalEtcdClient) that watch uses.
	client *etcd.Client
}

// newIAMEtcdStore returns an IAM store backed by the global etcd client and
// bound to ctx.
func newIAMEtcdStore(ctx context.Context) *IAMEtcdStore {
	store := &IAMEtcdStore{ctx: ctx}
	store.client = globalEtcdClient
	return store
}

func (ies *IAMEtcdStore) loadAll(ctx context.Context, sys *IAMSys) error loads all IAM data into memory:

// loadAll loads the IAM data from the store into fresh local maps and then
// merges them into sys while holding the store's write lock.
//
// Policy documents, STS users and the STS and group policy mappings are
// always loaded. Regular users, service accounts, groups and regular-user
// policy mappings are loaded only when sys uses MinIOUsersSysType. Loading
// stops at the first error, before sys is touched.
//
// The merge only adds or overwrites entries; the one removal is of users
// whose credentials have expired.
//
// NOTE(review): rlock/runlock/lock are helpers not shown in this excerpt,
// presumably thin wrappers over the embedded RWMutex. The merge assumes every
// sys map it assigns into has already been allocated.
func (ies *IAMEtcdStore) loadAll(ctx context.Context, sys *IAMSys) error {
	iamUsersMap := make(map[string]auth.Credentials)
	iamGroupsMap := make(map[string]GroupInfo)
	iamPolicyDocsMap := make(map[string]iampolicy.Policy)
	iamUserPolicyMap := make(map[string]MappedPolicy)
	iamGroupPolicyMap := make(map[string]MappedPolicy)

	// Snapshot the users-system type under the read lock.
	isMinIOUsersSys := false
	ies.rlock()
	if sys.usersSysType == MinIOUsersSysType {
		isMinIOUsersSys = true
	}
	ies.runlock()

	if err := ies.loadPolicyDocs(ctx, iamPolicyDocsMap); err != nil {
		return err
	}

	// load STS temp users
	if err := ies.loadUsers(ctx, stsUser, iamUsersMap); err != nil {
		return err
	}

	if isMinIOUsersSys {
		// load long term users
		if err := ies.loadUsers(ctx, regularUser, iamUsersMap); err != nil {
			return err
		}
		if err := ies.loadUsers(ctx, srvAccUser, iamUsersMap); err != nil {
			return err
		}
		if err := ies.loadGroups(ctx, iamGroupsMap); err != nil {
			return err
		}
		if err := ies.loadMappedPolicies(ctx, regularUser, false, iamUserPolicyMap); err != nil {
			return err
		}
	}

	// load STS policy mappings into the same map
	if err := ies.loadMappedPolicies(ctx, stsUser, false, iamUserPolicyMap); err != nil {
		return err
	}
	// load policies mapped to groups
	if err := ies.loadMappedPolicies(ctx, regularUser, true, iamGroupPolicyMap); err != nil {
		return err
	}

	// Everything below mutates sys, so hold the write lock until return.
	ies.lock()
	defer ies.Unlock()

	// Merge the new reloaded entries into global map.
	// See issue https://github.com/minio/minio/issues/9651
	// where the present list of entries on disk are not yet
	// latest, there is a small window where this can make
	// valid users invalid.
	for k, v := range iamUsersMap {
		sys.iamUsersMap[k] = v
	}

	for k, v := range iamPolicyDocsMap {
		sys.iamPolicyDocsMap[k] = v
	}

	// Sets default canned policies, if none are set.
	setDefaultCannedPolicies(sys.iamPolicyDocsMap)

	for k, v := range iamUserPolicyMap {
		sys.iamUserPolicyMap[k] = v
	}

	// purge any expired entries which became expired now.
	for k, v := range sys.iamUsersMap {
		if v.IsExpired() {
			delete(sys.iamUsersMap, k)
			delete(sys.iamUserPolicyMap, k)
			// Deleting on the etcd is taken care of in the next cycle
		}
	}

	for k, v := range iamGroupPolicyMap {
		sys.iamGroupPolicyMap[k] = v
	}

	for k, v := range iamGroupsMap {
		sys.iamGroupsMap[k] = v
	}

	// Rebuild the user -> groups index from the merged group data.
	sys.buildUserGroupMemberships()

	return nil
}

`func (ies *IAMEtcdStore) watch(ctx context.Context, sys *IAMSys)` runs in the background, watches etcd for changes, and applies the resulting IAM data updates:

// watch keeps sys up to date from etcd. It subscribes to key-only change
// notifications for every key under iamConfigPrefix and applies each
// received event through reloadFromEvent, taking the store lock around
// every single event. Whenever the watch channel is closed or a response
// carries an error, the function pauses for one second and creates a fresh
// watch. It returns only once ctx is done.
func (ies *IAMEtcdStore) watch(ctx context.Context, sys *IAMSys) {
	for {
		// (Re)create the etcd watch for the IAM config prefix.
		updates := ies.client.Watch(ctx,
			iamConfigPrefix, etcd.WithPrefix(), etcd.WithKeysOnly())

	consume:
		for {
			select {
			case <-ctx.Done():
				return
			case resp, open := <-updates:
				if !open {
					// Channel closed: back off briefly, then
					// leave the inner loop to re-init the watch.
					time.Sleep(1 * time.Second)
					break consume
				}
				if err := resp.Err(); err != nil {
					// Log, back off briefly, then re-init the watch.
					logger.LogIf(ctx, err)
					time.Sleep(1 * time.Second)
					break consume
				}
				for _, ev := range resp.Events {
					ies.lock()
					ies.reloadFromEvent(sys, ev)
					ies.unlock()
				}
			}
		}
	}
}
// reloadFromEvent applies one etcd watch event to the in-memory IAM maps
// of sys. The event key's prefix selects the kind of entry (user, STS user,
// group, policy document, or one of the three policy mappings). Create and
// modify events reload that entry from the store into the matching map;
// delete events remove it from the map. Events that are neither are
// ignored. Results of the load calls are not inspected here.
//
// The maps are mutated without any locking inside this function; the caller
// visible in this file (watch) invokes it between ies.lock() and
// ies.unlock().
func (ies *IAMEtcdStore) reloadFromEvent(sys *IAMSys, event *etcd.Event) {
	upsert := event.IsModify() || event.IsCreate()
	removal := event.Type == etcd.EventTypeDelete
	key := string(event.Kv.Key)
	if !upsert && !removal {
		return
	}

	// entryName drops prefix from the key and keeps the directory part of
	// what remains; used for users, STS users, groups and policy documents.
	entryName := func(prefix string) string {
		return path.Dir(strings.TrimPrefix(key, prefix))
	}
	// mappingName drops prefix and a trailing ".json" from the key; used
	// for the policy mapping entries.
	mappingName := func(prefix string) string {
		return strings.TrimSuffix(strings.TrimPrefix(key, prefix), ".json")
	}

	// Prefixes are tested in a fixed order; the first match wins. When an
	// event qualifies as both upsert and removal, upsert takes precedence.
	switch {
	case strings.HasPrefix(key, iamConfigUsersPrefix):
		accessKey := entryName(iamConfigUsersPrefix)
		if upsert {
			ies.loadUser(accessKey, regularUser, sys.iamUsersMap)
		} else {
			delete(sys.iamUsersMap, accessKey)
		}
	case strings.HasPrefix(key, iamConfigSTSPrefix):
		accessKey := entryName(iamConfigSTSPrefix)
		if upsert {
			ies.loadUser(accessKey, stsUser, sys.iamUsersMap)
		} else {
			delete(sys.iamUsersMap, accessKey)
		}
	case strings.HasPrefix(key, iamConfigGroupsPrefix):
		group := entryName(iamConfigGroupsPrefix)
		if upsert {
			// Reload the group, then rebuild its membership entries
			// from the reloaded group info.
			ies.loadGroup(group, sys.iamGroupsMap)
			info := sys.iamGroupsMap[group]
			sys.removeGroupFromMembershipsMap(group)
			sys.updateGroupMembershipsMap(group, &info)
		} else {
			sys.removeGroupFromMembershipsMap(group)
			delete(sys.iamGroupsMap, group)
			delete(sys.iamGroupPolicyMap, group)
		}
	case strings.HasPrefix(key, iamConfigPoliciesPrefix):
		policyName := entryName(iamConfigPoliciesPrefix)
		if upsert {
			ies.loadPolicyDoc(policyName, sys.iamPolicyDocsMap)
		} else {
			delete(sys.iamPolicyDocsMap, policyName)
		}
	case strings.HasPrefix(key, iamConfigPolicyDBUsersPrefix):
		user := mappingName(iamConfigPolicyDBUsersPrefix)
		if upsert {
			ies.loadMappedPolicy(user, regularUser, false, sys.iamUserPolicyMap)
		} else {
			delete(sys.iamUserPolicyMap, user)
		}
	case strings.HasPrefix(key, iamConfigPolicyDBSTSUsersPrefix):
		user := mappingName(iamConfigPolicyDBSTSUsersPrefix)
		if upsert {
			ies.loadMappedPolicy(user, stsUser, false, sys.iamUserPolicyMap)
		} else {
			delete(sys.iamUserPolicyMap, user)
		}
	case strings.HasPrefix(key, iamConfigPolicyDBGroupsPrefix):
		group := mappingName(iamConfigPolicyDBGroupsPrefix)
		if upsert {
			ies.loadMappedPolicy(group, regularUser, true, sys.iamGroupPolicyMap)
		} else {
			delete(sys.iamGroupPolicyMap, group)
		}
	}
}

Addition, deletion, modification and query of user information

// saveUserIdentity stores the identity u of the user called name. The
// storage path is computed by getUserIdentityPath from name and userType,
// and the actual write is delegated to saveIAMConfig, whose error is
// returned unchanged.
func (ies *IAMEtcdStore) saveUserIdentity(name string, userType IAMUserType, u UserIdentity) error {
	return ies.saveIAMConfig(u, getUserIdentityPath(name, userType))
}

// saveIAMConfig serializes item as JSON and writes it to etcd under path.
// When globalConfigEncrypted is set, the JSON is first encrypted with
// madmin.EncryptData using globalActiveCred.String() as the password.
// It returns the marshalling, encryption, or saveKeyEtcd error, whichever
// occurs first.
func (ies *IAMEtcdStore) saveIAMConfig(item interface{}, path string) error {
	payload, err := json.Marshal(item)
	if err != nil {
		return err
	}

	// Unencrypted configuration: store the JSON as is.
	if !globalConfigEncrypted {
		return saveKeyEtcd(ies.ctx, ies.client, path, payload)
	}

	payload, err = madmin.EncryptData(globalActiveCred.String(), payload)
	if err != nil {
		return err
	}
	return saveKeyEtcd(ies.ctx, ies.client, path, payload)
}
// deleteUserIdentity removes the stored identity of the user called name.
// The path is computed by getUserIdentityPath from name and userType. An
// errConfigNotFound result from deleteIAMConfig is reported to the caller
// as errNoSuchUser; every other result (including nil) is returned as is.
func (ies *IAMEtcdStore) deleteUserIdentity(name string, userType IAMUserType) error {
	identityPath := getUserIdentityPath(name, userType)
	if err := ies.deleteIAMConfig(identityPath); err != errConfigNotFound {
		return err
	}
	return errNoSuchUser
}

// deleteIAMConfig deletes the entry stored under path by calling
// deleteKeyEtcd with the store's context and etcd client, and returns
// that call's error unchanged.
func (ies *IAMEtcdStore) deleteIAMConfig(path string) error {
	return deleteKeyEtcd(ies.ctx, ies.client, path)
}

5. Analysis of Minio configuration management source code

reference resources:

Configuration file: ${HOME}/.minio/config.json. Since release RELEASE.2018-08-18T03-49-57Z, the configuration file is stored in the backend storage.

1. Global variable globalConfigSys

minio/cmd/server-main.go

// serverMain is shown here only as an excerpt: the "// ..." markers stand
// for code omitted by the article. The part kept shows that the subsystem
// objects are created through newAllSubsystems.
func serverMain(ctx *cli.Context) {
	// ...
	
	// Initialize all subsystems
	newAllSubsystems()
	
	// ...
}

// newAllSubsystems constructs every subsystem object and stores each one
// in its package-level global variable. It only calls the New* constructors;
// for example, the config system created here is initialized separately
// through globalConfigSys.Init in initAllSubsystems.
func newAllSubsystems() {
	// Create new notification system and initialize notification targets
	globalNotificationSys = NewNotificationSys(globalEndpoints)

	// Create new bucket metadata system.
	globalBucketMetadataSys = NewBucketMetadataSys()

	// Create a new config system.
	globalConfigSys = NewConfigSys()

	// Create new IAM system.
	globalIAMSys = NewIAMSys()

	// Create new policy system.
	globalPolicySys = NewPolicySys()

	// Create new lifecycle system.
	globalLifecycleSys = NewLifecycleSys()

	// Create new bucket encryption subsystem
	globalBucketSSEConfigSys = NewBucketSSEConfigSys()

	// Create new bucket object lock subsystem
	globalBucketObjectLockSys = NewBucketObjectLockSys()

	// Create new bucket quota subsystem
	globalBucketQuotaSys = NewBucketQuotaSys()
}


// initAllSubsystems is shown here only as an excerpt: the "// ..." markers
// stand for code omitted by the article. The part kept initializes the
// config system from newObject and, on failure, returns the error wrapped
// with a short description.
func initAllSubsystems(newObject ObjectLayer) (err error) {
	// ...
	
	// Initialize config system.
	if err = globalConfigSys.Init(newObject); err != nil {
		return fmt.Errorf("Unable to initialize config system: %w", err)
	}
	
	// ...
}

2. ConfigSys

minio/cmd/config.go

package cmd

// ConfigSys is the config system. The type carries no fields; it exists as
// the receiver for the config system's methods.
type ConfigSys struct{}

// NewConfigSys returns a pointer to a new, empty ConfigSys.
func NewConfigSys() *ConfigSys {
	return new(ConfigSys)
}

// Init initializes the config system by delegating to initConfig.
// A nil objAPI is rejected with errInvalidArgument before anything else
// is attempted.
func (sys *ConfigSys) Init(objAPI ObjectLayer) error {
	if objAPI != nil {
		return initConfig(objAPI)
	}
	return errInvalidArgument
}

// initConfig initializes and loads the server config from remote etcd or
// the local config directory. It runs the config migrations in a fixed
// order, stops at the first one that fails, and finally loads the config
// through loadConfig. A nil objAPI yields errServerNotInitialized.
func initConfig(objAPI ObjectLayer) error {
	if objAPI == nil {
		return errServerNotInitialized
	}

	// A config file that exists at getConfigFile() is migrated first.
	if cfgFile := getConfigFile(); isFile(cfgFile) {
		if err := migrateConfig(); err != nil {
			return err
		}
	}

	// Backend migrations, executed strictly in the order listed.
	migrations := []func() error{
		// Migrates ${HOME}/.minio/config.json or config.json.deprecated
		// to '<export_path>/.minio.sys/config/config.json'
		// ignore if the file doesn't exist.
		// If etcd is set then migrates /config/config.json
		// to '<export_path>/.minio.sys/config/config.json'
		func() error { return migrateConfigToMinioSys(objAPI) },

		// Migrates backend '<export_path>/.minio.sys/config/config.json' to latest version.
		func() error { return migrateMinioSysConfig(objAPI) },

		// Migrates backend '<export_path>/.minio.sys/config/config.json' to
		// latest config format.
		func() error { return migrateMinioSysConfigToKV(objAPI) },
	}
	for _, migrate := range migrations {
		if err := migrate(); err != nil {
			return err
		}
	}

	return loadConfig(objAPI)
}

3. globalServerConfig

minio/cmd/config-current.go

package cmd

var (
	// globalServerConfig is the server config currently in effect.
	// loadConfig replaces it after reading and validating a new config.
	globalServerConfig   config.Config
	// globalServerConfigMu guards globalServerConfig; loadConfig takes the
	// write lock while assigning a new value.
	globalServerConfigMu sync.RWMutex
)

// loadConfig reads a valid config through getValidConfig, lets
// lookupConfigs apply overrides from the environment, and then installs
// the result as globalServerConfig while holding globalServerConfigMu.
// The error from getValidConfig, if any, is returned and nothing is
// changed in that case.
func loadConfig(objAPI ObjectLayer) error {
	cfg, err := getValidConfig(objAPI)
	if err != nil {
		return err
	}

	// Override any values from ENVs.
	lookupConfigs(cfg)

	// The new config is assigned only under the write lock.
	globalServerConfigMu.Lock()
	defer globalServerConfigMu.Unlock()
	globalServerConfig = cfg

	return nil
}

4. Addition, deletion, modification and query of configuration

minio/cmd/admin-router.go

package cmd

// adminAPIHandlers provides HTTP handlers for MinIO admin API.
// The type has no fields; it only serves as the receiver for the handler
// methods (for example SetConfigKVHandler).
type adminAPIHandlers struct{}

// registerAdminRouter - Add handler functions for each service REST API routes.
func registerAdminRouter(router *mux.Router, enableConfigOps, enableIAMOps bool) {

	adminAPI := adminAPIHandlers{}
	// Admin router
	adminRouter := router.PathPrefix(adminPathPrefix).Subrouter()

	/// Service operations

	adminVersions := []string{
		adminAPIVersionPrefix,
		adminAPIVersionV2Prefix,
	}

	for _, adminVersion := range adminVersions {		
		
		// Config KV operations.
		if enableConfigOps {
			adminRouter.Methods(http.MethodGet).Path(adminVersion+"/get-config-kv").HandlerFunc(httpTraceHdrs(adminAPI.GetConfigKVHandler)).Queries("key", "{key:.*}")
			adminRouter.Methods(http.MethodPut).Path(adminVersion + "/set-config-kv").HandlerFunc(httpTraceHdrs(adminAPI.SetConfigKVHandler))
			adminRouter.Methods(http.MethodDelete).Path(adminVersion + "/del-config-kv").HandlerFunc(httpTraceHdrs(adminAPI.DelConfigKVHandler))
		}
		
	// If none of the routes match add default error handler routes
	adminRouter.NotFoundHandler = http.HandlerFunc(httpTraceAll(errorResponseHandler))
	adminRouter.MethodNotAllowedHandler = http.HandlerFunc(httpTraceAll(errorResponseHandler))
}		

minio/cmd/admin-handlers-config-kv.go

// SetConfigKVHandler - PUT /minio/admin/v3/set-config-kv
//
// The handler validates the admin request, rejects bodies of unknown
// length or larger than maxEConfigJSONSize, decrypts the body with the
// requester's secret key, merges the decrypted key/values into the
// current server config, validates the result, and then persists both the
// new config and the submitted key/values (as history). Each failing step
// writes a JSON error response and ends the request. On success only
// response headers are written.
func (a adminAPIHandlers) SetConfigKVHandler(w http.ResponseWriter, r *http.Request) {
	ctx := newContext(r, w, "SetConfigKV")

	defer logger.AuditLog(w, r, "SetConfigKV", mustGetClaimsFromToken(r))

	cred, objectAPI := validateAdminReqConfigKV(ctx, w, r)
	if objectAPI == nil {
		return
	}

	// respondErr reports err to the client as an admin API error.
	respondErr := func(err error) {
		writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
	}

	if r.ContentLength == -1 || r.ContentLength > maxEConfigJSONSize {
		// Length unknown, or more than the allowed number of bytes.
		writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrAdminConfigTooLarge), r.URL)
		return
	}

	// The body is encrypted with the requester's secret key.
	body := io.LimitReader(r.Body, r.ContentLength)
	kvData, err := madmin.DecryptData(cred.SecretKey, body)
	if err != nil {
		logger.LogIf(ctx, err, logger.Application)
		writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrAdminConfigBadJSON), r.URL)
		return
	}

	srvCfg, err := readServerConfig(ctx, objectAPI)
	if err != nil {
		respondErr(err)
		return
	}

	// Apply the submitted key/values on top of the current config.
	if _, err := srvCfg.ReadFrom(bytes.NewReader(kvData)); err != nil {
		respondErr(err)
		return
	}

	if err := validateConfig(srvCfg); err != nil {
		writeCustomErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrAdminConfigBadJSON), err.Error(), r.URL)
		return
	}

	// Update the actual server config on disk.
	if err := saveServerConfig(ctx, objectAPI, srvCfg); err != nil {
		respondErr(err)
		return
	}

	// Write to the config input KV to history.
	if err := saveServerConfigHistory(ctx, objectAPI, kvData); err != nil {
		respondErr(err)
		return
	}

	// Make sure to write backend is encrypted
	if globalConfigEncrypted {
		saveConfig(GlobalContext, objectAPI, backendEncryptedFile, backendEncryptedMigrationComplete)
	}

	writeSuccessResponseHeadersOnly(w)
}

6. MinIO Bucket quota

There are two types of bucket quota: Hard and FIFO.

  • Hard quota disallows writes to the bucket after configured quota limit is reached.
  • FIFO quota automatically deletes oldest content until bucket usage falls within configured limit while permitting writes.

1. Checking the bucket quota when PutObject is called

// PutObjectHandler is shown here only as an excerpt: the "// ..." markers
// stand for code omitted by the article. The part kept shows the quota
// check: enforceBucketQuota is called with the bucket and the upload size,
// and an error from it is written to the client as the response, ending
// the request.
func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Request) {

	// ...
	
	if err := enforceBucketQuota(ctx, bucket, size); err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
		return
	}
	
	// ...
}

2. BucketQuotaSys sub module

minio/cmd/bucket-quota.go

// enforceBucketQuota checks whether adding size bytes to bucket is allowed
// by delegating to globalBucketQuotaSys.check. A negative size skips the
// check entirely and returns nil.
func enforceBucketQuota(ctx context.Context, bucket string, size int64) error {
	if size >= 0 {
		return globalBucketQuotaSys.check(ctx, bucket, size)
	}
	return nil
}

minio/cmd/bucket-quota.go

// BucketQuotaSys - map of bucket and quota configuration.
type BucketQuotaSys struct {
	// bucketStorageCache caches the data usage info that check compares
	// against the quota; check sets its TTL and Update function on first
	// use.
	bucketStorageCache timedValue
}

// Get returns the quota configuration of bucketName.
//
// Outside gateway mode the configuration comes from
// globalBucketMetadataSys.GetQuotaConfig. In gateway mode
// (globalIsGateway) an empty quota configuration is returned, or
// errServerNotInitialized when no object layer is available yet.
func (sys *BucketQuotaSys) Get(bucketName string) (*madmin.BucketQuota, error) {
	if !globalIsGateway {
		return globalBucketMetadataSys.GetQuotaConfig(bucketName)
	}

	if newObjectLayerFn() == nil {
		return nil, errServerNotInitialized
	}
	return &madmin.BucketQuota{}, nil
}

// NewBucketQuotaSys returns a pointer to a new, zero-valued BucketQuotaSys.
func NewBucketQuotaSys() *BucketQuotaSys {
	return new(BucketQuotaSys)
}

// check reports whether writing size more bytes to bucket would exceed the
// bucket's quota.
//
// It returns errServerNotInitialized when no object layer is available,
// BucketQuotaExceeded when the cached bucket usage plus size is larger
// than the configured quota, the cache error when data usage cannot be
// loaded, and nil otherwise. nil is also returned, without any usage
// lookup, when the quota configuration cannot be read, when the quota is
// of FIFO type (handled by the background enforcement instead), when no
// quota is set, or when there is no usage entry for the bucket.
func (sys *BucketQuotaSys) check(ctx context.Context, bucket string, size int64) error {
	objAPI := newObjectLayerWithoutSafeModeFn()
	if objAPI == nil {
		return errServerNotInitialized
	}

	q, err := sys.Get(bucket)
	if err != nil {
		// Best effort: without a readable quota config nothing is enforced.
		return nil
	}

	if q.Type == madmin.FIFOQuota {
		return nil
	}

	if q.Quota == 0 {
		// No quota set return quickly.
		return nil
	}

	sys.bucketStorageCache.Once.Do(func() {
		sys.bucketStorageCache.TTL = 10 * time.Second
		sys.bucketStorageCache.Update = func() (interface{}, error) {
			// This function is installed once and reused for every later
			// cache refresh, so it must not capture the ctx of the request
			// that happened to call check first: that context ends with
			// its request and would make subsequent refreshes run under
			// an already finished context.
			return loadDataUsageFromBackend(context.Background(), objAPI)
		}
	})

	v, err := sys.bucketStorageCache.Get()
	if err != nil {
		return err
	}

	dui := v.(DataUsageInfo)

	bui, ok := dui.BucketsUsage[bucket]
	if !ok {
		// bucket not found, cannot enforce quota
		// call will fail anyways later.
		return nil
	}

	// size is non-negative here when called through enforceBucketQuota,
	// which skips the check for negative sizes.
	if (bui.Size + uint64(size)) > q.Quota {
		return BucketQuotaExceeded{Bucket: bucket}
	}

	return nil
}

3. Background task: periodically delete objects to free up space

const (
	// bgQuotaInterval is how long startBucketQuotaEnforcement waits
	// before each FIFO quota enforcement run.
	bgQuotaInterval = 1 * time.Hour
)

// initQuotaEnforcement launches, in its own goroutine, the routine that
// deletes objects from buckets exceeding their FIFO quota. Nothing is
// started unless the envDataUsageCrawlConf setting (which defaults to
// config.EnableOn) equals config.EnableOn.
func initQuotaEnforcement(ctx context.Context, objAPI ObjectLayer) {
	if env.Get(envDataUsageCrawlConf, config.EnableOn) != config.EnableOn {
		return
	}
	go startBucketQuotaEnforcement(ctx, objAPI)
}

// startBucketQuotaEnforcement runs enforceFIFOQuota repeatedly until ctx
// is done. Each run starts bgQuotaInterval after the previous run has
// finished (the first one bgQuotaInterval after the call). Errors from
// enforceFIFOQuota are logged and do not stop the loop.
//
// A single timer is reused across iterations and stopped on return, so no
// pending timer is left behind when ctx is cancelled.
func startBucketQuotaEnforcement(ctx context.Context, objAPI ObjectLayer) {
	timer := time.NewTimer(bgQuotaInterval)
	defer timer.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-timer.C:
			logger.LogIf(ctx, enforceFIFOQuota(ctx, objAPI))
			// The timer has fired and its channel has been drained, so
			// Reset can be called directly to schedule the next run.
			timer.Reset(bgQuotaInterval)
		}
	}
}

// enforceFIFOQuota deletes objects in FIFO order until sufficient objects
// have been deleted so as to bring bucket usage within quota.
//
// For every bucket returned by ListBuckets that has a FIFO quota and whose
// recorded usage is above that quota, the bucket's objects are walked and
// scored by modification time, and the selected objects are deleted in
// batches of at most maxDeleteList names. An object-removed event is sent
// for each object deleted successfully. Objects under retention in buckets
// with object locking enabled are never considered.
//
// It returns nil without doing anything when envDataUsageCrawlConf is set
// to config.EnableOff. Errors from listing buckets, loading data usage,
// starting the walk, or creating the scorer abort the run and are
// returned; deletion errors are only logged.
func enforceFIFOQuota(ctx context.Context, objectAPI ObjectLayer) error {
	// Turn off quota enforcement if data usage info is unavailable.
	if env.Get(envDataUsageCrawlConf, config.EnableOn) == config.EnableOff {
		return nil
	}

	buckets, err := objectAPI.ListBuckets(ctx)
	if err != nil {
		return err
	}

	dataUsageInfo, err := loadDataUsageFromBackend(ctx, objectAPI)
	if err != nil {
		return err
	}

	for _, binfo := range buckets {
		bucket := binfo.Name

		bui, ok := dataUsageInfo.BucketsUsage[bucket]
		if !ok {
			// bucket doesn't exist anymore, or we
			// do not have any information to proceed.
			continue
		}

		// Check if the current bucket has quota restrictions, if not skip it
		cfg, err := globalBucketQuotaSys.Get(bucket)
		if err != nil {
			continue
		}

		if cfg.Type != madmin.FIFOQuota {
			continue
		}

		// Number of bytes by which the bucket exceeds its quota.
		var toFree uint64
		if bui.Size > cfg.Quota && cfg.Quota > 0 {
			toFree = bui.Size - cfg.Quota
		}

		if toFree == 0 {
			continue
		}

		// Allocate new results channel to receive ObjectInfo.
		objInfoCh := make(chan ObjectInfo)

		// Walk through all objects
		if err := objectAPI.Walk(ctx, bucket, "", objInfoCh); err != nil {
			return err
		}

		// reuse the fileScorer used by disk cache to score entries by
		// ModTime to find the oldest objects in bucket to delete. In
		// the context of bucket quota enforcement - number of hits are
		// irrelevant.
		scorer, err := newFileScorer(toFree, time.Now().Unix(), 1)
		if err != nil {
			return err
		}

		// The lookup error is deliberately ignored here, as before; only
		// rcfg.LockEnabled is consulted below.
		rcfg, _ := globalBucketObjectLockSys.Get(bucket)

		for obj := range objInfoCh {
			// skip objects currently under retention
			if rcfg.LockEnabled && enforceRetentionForDeletion(ctx, obj) {
				continue
			}
			scorer.addFile(obj.Name, obj.ModTime, obj.Size, 1)
		}

		keys := scorer.fileNames()
		numKeys := len(keys)

		var objects []string
		for i, key := range keys {
			objects = append(objects, key)
			if len(objects) < maxDeleteList && (i < numKeys-1) {
				// keep collecting until the batch is full or the
				// last key has been reached
				continue
			}

			// Deletes a list of objects.
			deleteErrs, err := objectAPI.DeleteObjects(ctx, bucket, objects)
			if err != nil {
				logger.LogIf(ctx, err)
			} else {
				for j := range deleteErrs {
					if deleteErrs[j] != nil {
						logger.LogIf(ctx, deleteErrs[j])
						continue
					}
					// Notify object deleted event.
					sendEvent(eventArgs{
						EventName:  event.ObjectRemovedDelete,
						BucketName: bucket,
						Object: ObjectInfo{
							Name: objects[j],
						},
						Host: "Internal: [FIFO-QUOTA-EXPIRY]",
					})
				}
			}

			// Always start a fresh batch, even after a failed call.
			// Keeping a failed batch would let it grow beyond
			// maxDeleteList and be resubmitted for every following key;
			// anything left undeleted is picked up by a later run while
			// the bucket is still over quota.
			objects = nil
		}
	}
	return nil
}

Added by penguinboy on Mon, 07 Mar 2022 21:07:55 +0200