forked from forgejo/forgejo
Rewrite queue (#24505)
# ⚠️ Breaking Many deprecated queue config options are removed (actually, they should have been removed in 1.18/1.19). If you see the fatal message when starting Gitea: "Please update your app.ini to remove deprecated config options", please follow the error messages to remove these options from your app.ini. Example: ``` 2023/05/06 19:39:22 [E] Removed queue option: `[indexer].ISSUE_INDEXER_QUEUE_TYPE`. Use new options in `[queue.issue_indexer]` 2023/05/06 19:39:22 [E] Removed queue option: `[indexer].UPDATE_BUFFER_LEN`. Use new options in `[queue.issue_indexer]` 2023/05/06 19:39:22 [F] Please update your app.ini to remove deprecated config options ``` Many options in `[queue]` are are dropped, including: `WRAP_IF_NECESSARY`, `MAX_ATTEMPTS`, `TIMEOUT`, `WORKERS`, `BLOCK_TIMEOUT`, `BOOST_TIMEOUT`, `BOOST_WORKERS`, they can be removed from app.ini. # The problem The old queue package has some legacy problems: * complexity: I doubt few people could tell how it works. * maintainability: Too many channels and mutex/cond are mixed together, too many different structs/interfaces depends each other. * stability: due to the complexity & maintainability, sometimes there are strange bugs and difficult to debug, and some code doesn't have test (indeed some code is difficult to test because a lot of things are mixed together). * general applicability: although it is called "queue", its behavior is not a well-known queue. * scalability: it doesn't seem easy to make it work with a cluster without breaking its behaviors. It came from some very old code to "avoid breaking", however, its technical debt is too heavy now. It's a good time to introduce a better "queue" package. # The new queue package It keeps using old config and concept as much as possible. * It only contains two major kinds of concepts: * The "base queue": channel, levelqueue, redis * They have the same abstraction, the same interface, and they are tested by the same testing code. * The "WokerPoolQueue", it uses the "base queue" to provide "worker pool" function, calls the "handler" to process the data in the base queue. * The new code doesn't do "PushBack" * Think about a queue with many workers, the "PushBack" can't guarantee the order for re-queued unhandled items, so in new code it just does "normal push" * The new code doesn't do "pause/resume" * The "pause/resume" was designed to handle some handler's failure: eg: document indexer (elasticsearch) is down * If a queue is paused for long time, either the producers blocks or the new items are dropped. * The new code doesn't do such "pause/resume" trick, it's not a common queue's behavior and it doesn't help much. * If there are unhandled items, the "push" function just blocks for a few seconds and then re-queue them and retry. * The new code doesn't do "worker booster" * Gitea's queue's handlers are light functions, the cost is only the go-routine, so it doesn't make sense to "boost" them. * The new code only use "max worker number" to limit the concurrent workers. * The new "Push" never blocks forever * Instead of creating more and more blocking goroutines, return an error is more friendly to the server and to the end user. There are more details in code comments: eg: the "Flush" problem, the strange "code.index" hanging problem, the "immediate" queue problem. Almost ready for review. TODO: * [x] add some necessary comments during review * [x] add some more tests if necessary * [x] update documents and config options * [x] test max worker / active worker * [x] re-run the CI tasks to see whether any test is flaky * [x] improve the `handleOldLengthConfiguration` to provide more friendly messages * [x] fine tune default config values (eg: length?) ## Code coverage: 
This commit is contained in:
parent
cb700aedd1
commit
6f9c278559
100 changed files with 2496 additions and 6858 deletions
|
@ -5,198 +5,109 @@ package setting
|
|||
|
||||
import (
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"code.gitea.io/gitea/modules/container"
|
||||
"code.gitea.io/gitea/modules/json"
|
||||
"code.gitea.io/gitea/modules/log"
|
||||
)
|
||||
|
||||
// QueueSettings represent the settings for a queue from the ini
|
||||
type QueueSettings struct {
|
||||
Name string
|
||||
DataDir string
|
||||
QueueLength int `ini:"LENGTH"`
|
||||
BatchLength int
|
||||
ConnectionString string
|
||||
Type string
|
||||
QueueName string
|
||||
SetName string
|
||||
WrapIfNecessary bool
|
||||
MaxAttempts int
|
||||
Timeout time.Duration
|
||||
Workers int
|
||||
MaxWorkers int
|
||||
BlockTimeout time.Duration
|
||||
BoostTimeout time.Duration
|
||||
BoostWorkers int
|
||||
Name string // not an INI option, it is the name for [queue.the-name] section
|
||||
|
||||
Type string
|
||||
Datadir string
|
||||
ConnStr string // for leveldb or redis
|
||||
Length int // max queue length before blocking
|
||||
|
||||
QueueName, SetName string // the name suffix for storage (db key, redis key), "set" is for unique queue
|
||||
|
||||
BatchLength int
|
||||
MaxWorkers int
|
||||
}
|
||||
|
||||
// Queue settings
|
||||
var Queue = QueueSettings{}
|
||||
var queueSettingsDefault = QueueSettings{
|
||||
Type: "level", // dummy, channel, level, redis
|
||||
Datadir: "queues/common", // relative to AppDataPath
|
||||
Length: 100, // queue length before a channel queue will block
|
||||
|
||||
// GetQueueSettings returns the queue settings for the appropriately named queue
|
||||
func GetQueueSettings(name string) QueueSettings {
|
||||
return getQueueSettings(CfgProvider, name)
|
||||
QueueName: "_queue",
|
||||
SetName: "_unique",
|
||||
BatchLength: 20,
|
||||
MaxWorkers: 10,
|
||||
}
|
||||
|
||||
func getQueueSettings(rootCfg ConfigProvider, name string) QueueSettings {
|
||||
q := QueueSettings{}
|
||||
sec := rootCfg.Section("queue." + name)
|
||||
q.Name = name
|
||||
func GetQueueSettings(rootCfg ConfigProvider, name string) (QueueSettings, error) {
|
||||
// deep copy default settings
|
||||
cfg := QueueSettings{}
|
||||
if cfgBs, err := json.Marshal(queueSettingsDefault); err != nil {
|
||||
return cfg, err
|
||||
} else if err = json.Unmarshal(cfgBs, &cfg); err != nil {
|
||||
return cfg, err
|
||||
}
|
||||
|
||||
// DataDir is not directly inheritable
|
||||
q.DataDir = filepath.ToSlash(filepath.Join(Queue.DataDir, "common"))
|
||||
// QueueName is not directly inheritable either
|
||||
q.QueueName = name + Queue.QueueName
|
||||
for _, key := range sec.Keys() {
|
||||
switch key.Name() {
|
||||
case "DATADIR":
|
||||
q.DataDir = key.MustString(q.DataDir)
|
||||
case "QUEUE_NAME":
|
||||
q.QueueName = key.MustString(q.QueueName)
|
||||
case "SET_NAME":
|
||||
q.SetName = key.MustString(q.SetName)
|
||||
cfg.Name = name
|
||||
if sec, err := rootCfg.GetSection("queue"); err == nil {
|
||||
if err = sec.MapTo(&cfg); err != nil {
|
||||
log.Error("Failed to map queue common config for %q: %v", name, err)
|
||||
return cfg, nil
|
||||
}
|
||||
}
|
||||
if len(q.SetName) == 0 && len(Queue.SetName) > 0 {
|
||||
q.SetName = q.QueueName + Queue.SetName
|
||||
if sec, err := rootCfg.GetSection("queue." + name); err == nil {
|
||||
if err = sec.MapTo(&cfg); err != nil {
|
||||
log.Error("Failed to map queue spec config for %q: %v", name, err)
|
||||
return cfg, nil
|
||||
}
|
||||
if sec.HasKey("CONN_STR") {
|
||||
cfg.ConnStr = sec.Key("CONN_STR").String()
|
||||
}
|
||||
}
|
||||
if !filepath.IsAbs(q.DataDir) {
|
||||
q.DataDir = filepath.ToSlash(filepath.Join(AppDataPath, q.DataDir))
|
||||
|
||||
if cfg.Datadir == "" {
|
||||
cfg.Datadir = queueSettingsDefault.Datadir
|
||||
}
|
||||
_, _ = sec.NewKey("DATADIR", q.DataDir)
|
||||
if !filepath.IsAbs(cfg.Datadir) {
|
||||
cfg.Datadir = filepath.Join(AppDataPath, cfg.Datadir)
|
||||
}
|
||||
cfg.Datadir = filepath.ToSlash(cfg.Datadir)
|
||||
|
||||
// The rest are...
|
||||
q.QueueLength = sec.Key("LENGTH").MustInt(Queue.QueueLength)
|
||||
q.BatchLength = sec.Key("BATCH_LENGTH").MustInt(Queue.BatchLength)
|
||||
q.ConnectionString = sec.Key("CONN_STR").MustString(Queue.ConnectionString)
|
||||
q.Type = sec.Key("TYPE").MustString(Queue.Type)
|
||||
q.WrapIfNecessary = sec.Key("WRAP_IF_NECESSARY").MustBool(Queue.WrapIfNecessary)
|
||||
q.MaxAttempts = sec.Key("MAX_ATTEMPTS").MustInt(Queue.MaxAttempts)
|
||||
q.Timeout = sec.Key("TIMEOUT").MustDuration(Queue.Timeout)
|
||||
q.Workers = sec.Key("WORKERS").MustInt(Queue.Workers)
|
||||
q.MaxWorkers = sec.Key("MAX_WORKERS").MustInt(Queue.MaxWorkers)
|
||||
q.BlockTimeout = sec.Key("BLOCK_TIMEOUT").MustDuration(Queue.BlockTimeout)
|
||||
q.BoostTimeout = sec.Key("BOOST_TIMEOUT").MustDuration(Queue.BoostTimeout)
|
||||
q.BoostWorkers = sec.Key("BOOST_WORKERS").MustInt(Queue.BoostWorkers)
|
||||
if cfg.Type == "redis" && cfg.ConnStr == "" {
|
||||
cfg.ConnStr = "redis://127.0.0.1:6379/0"
|
||||
}
|
||||
|
||||
return q
|
||||
if cfg.Length <= 0 {
|
||||
cfg.Length = queueSettingsDefault.Length
|
||||
}
|
||||
if cfg.MaxWorkers <= 0 {
|
||||
cfg.MaxWorkers = queueSettingsDefault.MaxWorkers
|
||||
}
|
||||
if cfg.BatchLength <= 0 {
|
||||
cfg.BatchLength = queueSettingsDefault.BatchLength
|
||||
}
|
||||
|
||||
return cfg, nil
|
||||
}
|
||||
|
||||
// LoadQueueSettings sets up the default settings for Queues
|
||||
// This is exported for tests to be able to use the queue
|
||||
func LoadQueueSettings() {
|
||||
loadQueueFrom(CfgProvider)
|
||||
}
|
||||
|
||||
func loadQueueFrom(rootCfg ConfigProvider) {
|
||||
sec := rootCfg.Section("queue")
|
||||
Queue.DataDir = filepath.ToSlash(sec.Key("DATADIR").MustString("queues/"))
|
||||
if !filepath.IsAbs(Queue.DataDir) {
|
||||
Queue.DataDir = filepath.ToSlash(filepath.Join(AppDataPath, Queue.DataDir))
|
||||
}
|
||||
Queue.QueueLength = sec.Key("LENGTH").MustInt(20)
|
||||
Queue.BatchLength = sec.Key("BATCH_LENGTH").MustInt(20)
|
||||
Queue.ConnectionString = sec.Key("CONN_STR").MustString("")
|
||||
defaultType := sec.Key("TYPE").String()
|
||||
Queue.Type = sec.Key("TYPE").MustString("persistable-channel")
|
||||
Queue.WrapIfNecessary = sec.Key("WRAP_IF_NECESSARY").MustBool(true)
|
||||
Queue.MaxAttempts = sec.Key("MAX_ATTEMPTS").MustInt(10)
|
||||
Queue.Timeout = sec.Key("TIMEOUT").MustDuration(GracefulHammerTime + 30*time.Second)
|
||||
Queue.Workers = sec.Key("WORKERS").MustInt(0)
|
||||
Queue.MaxWorkers = sec.Key("MAX_WORKERS").MustInt(10)
|
||||
Queue.BlockTimeout = sec.Key("BLOCK_TIMEOUT").MustDuration(1 * time.Second)
|
||||
Queue.BoostTimeout = sec.Key("BOOST_TIMEOUT").MustDuration(5 * time.Minute)
|
||||
Queue.BoostWorkers = sec.Key("BOOST_WORKERS").MustInt(1)
|
||||
Queue.QueueName = sec.Key("QUEUE_NAME").MustString("_queue")
|
||||
Queue.SetName = sec.Key("SET_NAME").MustString("")
|
||||
|
||||
// Now handle the old issue_indexer configuration
|
||||
// FIXME: DEPRECATED to be removed in v1.18.0
|
||||
section := rootCfg.Section("queue.issue_indexer")
|
||||
directlySet := toDirectlySetKeysSet(section)
|
||||
if !directlySet.Contains("TYPE") && defaultType == "" {
|
||||
switch typ := rootCfg.Section("indexer").Key("ISSUE_INDEXER_QUEUE_TYPE").MustString(""); typ {
|
||||
case "levelqueue":
|
||||
_, _ = section.NewKey("TYPE", "level")
|
||||
case "channel":
|
||||
_, _ = section.NewKey("TYPE", "persistable-channel")
|
||||
case "redis":
|
||||
_, _ = section.NewKey("TYPE", "redis")
|
||||
case "":
|
||||
_, _ = section.NewKey("TYPE", "level")
|
||||
default:
|
||||
log.Fatal("Unsupported indexer queue type: %v", typ)
|
||||
hasOld := false
|
||||
handleOldLengthConfiguration := func(rootCfg ConfigProvider, newQueueName, oldSection, oldKey string) {
|
||||
if rootCfg.Section(oldSection).HasKey(oldKey) {
|
||||
hasOld = true
|
||||
log.Error("Removed queue option: `[%s].%s`. Use new options in `[queue.%s]`", oldSection, oldKey, newQueueName)
|
||||
}
|
||||
}
|
||||
if !directlySet.Contains("LENGTH") {
|
||||
length := rootCfg.Section("indexer").Key("UPDATE_BUFFER_LEN").MustInt(0)
|
||||
if length != 0 {
|
||||
_, _ = section.NewKey("LENGTH", strconv.Itoa(length))
|
||||
}
|
||||
}
|
||||
if !directlySet.Contains("BATCH_LENGTH") {
|
||||
fallback := rootCfg.Section("indexer").Key("ISSUE_INDEXER_QUEUE_BATCH_NUMBER").MustInt(0)
|
||||
if fallback != 0 {
|
||||
_, _ = section.NewKey("BATCH_LENGTH", strconv.Itoa(fallback))
|
||||
}
|
||||
}
|
||||
if !directlySet.Contains("DATADIR") {
|
||||
queueDir := filepath.ToSlash(rootCfg.Section("indexer").Key("ISSUE_INDEXER_QUEUE_DIR").MustString(""))
|
||||
if queueDir != "" {
|
||||
_, _ = section.NewKey("DATADIR", queueDir)
|
||||
}
|
||||
}
|
||||
if !directlySet.Contains("CONN_STR") {
|
||||
connStr := rootCfg.Section("indexer").Key("ISSUE_INDEXER_QUEUE_CONN_STR").MustString("")
|
||||
if connStr != "" {
|
||||
_, _ = section.NewKey("CONN_STR", connStr)
|
||||
}
|
||||
}
|
||||
|
||||
// FIXME: DEPRECATED to be removed in v1.18.0
|
||||
// - will need to set default for [queue.*)] LENGTH appropriately though though
|
||||
|
||||
// Handle the old mailer configuration
|
||||
handleOldLengthConfiguration(rootCfg, "mailer", "mailer", "SEND_BUFFER_LEN", 100)
|
||||
|
||||
// Handle the old test pull requests configuration
|
||||
// Please note this will be a unique queue
|
||||
handleOldLengthConfiguration(rootCfg, "pr_patch_checker", "repository", "PULL_REQUEST_QUEUE_LENGTH", 1000)
|
||||
|
||||
// Handle the old mirror queue configuration
|
||||
// Please note this will be a unique queue
|
||||
handleOldLengthConfiguration(rootCfg, "mirror", "repository", "MIRROR_QUEUE_LENGTH", 1000)
|
||||
}
|
||||
|
||||
// handleOldLengthConfiguration allows fallback to older configuration. `[queue.name]` `LENGTH` will override this configuration, but
|
||||
// if that is left unset then we should fallback to the older configuration. (Except where the new length woul be <=0)
|
||||
func handleOldLengthConfiguration(rootCfg ConfigProvider, queueName, oldSection, oldKey string, defaultValue int) {
|
||||
if rootCfg.Section(oldSection).HasKey(oldKey) {
|
||||
log.Error("Deprecated fallback for %s queue length `[%s]` `%s` present. Use `[queue.%s]` `LENGTH`. This will be removed in v1.18.0", queueName, queueName, oldSection, oldKey)
|
||||
}
|
||||
value := rootCfg.Section(oldSection).Key(oldKey).MustInt(defaultValue)
|
||||
|
||||
// Don't override with 0
|
||||
if value <= 0 {
|
||||
return
|
||||
}
|
||||
|
||||
section := rootCfg.Section("queue." + queueName)
|
||||
directlySet := toDirectlySetKeysSet(section)
|
||||
if !directlySet.Contains("LENGTH") {
|
||||
_, _ = section.NewKey("LENGTH", strconv.Itoa(value))
|
||||
handleOldLengthConfiguration(rootCfg, "issue_indexer", "indexer", "ISSUE_INDEXER_QUEUE_TYPE")
|
||||
handleOldLengthConfiguration(rootCfg, "issue_indexer", "indexer", "ISSUE_INDEXER_QUEUE_BATCH_NUMBER")
|
||||
handleOldLengthConfiguration(rootCfg, "issue_indexer", "indexer", "ISSUE_INDEXER_QUEUE_DIR")
|
||||
handleOldLengthConfiguration(rootCfg, "issue_indexer", "indexer", "ISSUE_INDEXER_QUEUE_CONN_STR")
|
||||
handleOldLengthConfiguration(rootCfg, "issue_indexer", "indexer", "UPDATE_BUFFER_LEN")
|
||||
handleOldLengthConfiguration(rootCfg, "mailer", "mailer", "SEND_BUFFER_LEN")
|
||||
handleOldLengthConfiguration(rootCfg, "pr_patch_checker", "repository", "PULL_REQUEST_QUEUE_LENGTH")
|
||||
handleOldLengthConfiguration(rootCfg, "mirror", "repository", "MIRROR_QUEUE_LENGTH")
|
||||
if hasOld {
|
||||
log.Fatal("Please update your app.ini to remove deprecated config options")
|
||||
}
|
||||
}
|
||||
|
||||
// toDirectlySetKeysSet returns a set of keys directly set by this section
|
||||
// Note: we cannot use section.HasKey(...) as that will immediately set the Key if a parent section has the Key
|
||||
// but this section does not.
|
||||
func toDirectlySetKeysSet(section ConfigSection) container.Set[string] {
|
||||
sections := make(container.Set[string])
|
||||
for _, key := range section.Keys() {
|
||||
sections.Add(key.Name())
|
||||
}
|
||||
return sections
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue