1
0
Fork 0
forked from forgejo/forgejo

Queue: Add name variable to queues

This commit is contained in:
Andrew Thornton 2019-12-07 16:46:36 +00:00
parent e6ebb47299
commit 85d1a7f7d2
No known key found for this signature in database
GPG key ID: 3CDE74631F13A748
6 changed files with 52 additions and 21 deletions

View file

@ -28,6 +28,7 @@ type LevelQueueConfiguration struct {
BlockTimeout time.Duration
BoostTimeout time.Duration
BoostWorkers int
Name string
}
// LevelQueue implements a disk library queue
@ -38,6 +39,7 @@ type LevelQueue struct {
terminated chan struct{}
exemplar interface{}
workers int
name string
}
// NewLevelQueue creates a ledis local queue
@ -72,6 +74,7 @@ func NewLevelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
closed: make(chan struct{}),
terminated: make(chan struct{}),
workers: config.Workers,
name: config.Name,
}, nil
}
@ -84,16 +87,16 @@ func (l *LevelQueue) Run(atShutdown, atTerminate func(context.Context, func()))
go l.readToChan()
log.Trace("Waiting til closed")
log.Trace("%s Waiting til closed", l.name)
<-l.closed
log.Trace("Waiting til done")
log.Trace("%s Waiting til done", l.name)
l.pool.Wait()
// FIXME: graceful: Needs HammerContext
log.Trace("Waiting til cleaned")
log.Trace("%s Waiting til cleaned", l.name)
l.pool.CleanUp(context.TODO())
log.Trace("cleaned")
log.Trace("%s cleaned", l.name)
}
@ -108,7 +111,7 @@ func (l *LevelQueue) readToChan() {
bs, err := l.queue.RPop()
if err != nil {
if err != levelqueue.ErrNotFound {
log.Error("RPop: %v", err)
log.Error("%s RPop: %v", l.name, err)
}
time.Sleep(time.Millisecond * 100)
continue
@ -130,12 +133,12 @@ func (l *LevelQueue) readToChan() {
err = json.Unmarshal(bs, &data)
}
if err != nil {
log.Error("LevelQueue failed to unmarshal: %v", err)
log.Error("LevelQueue: %s failed to unmarshal: %v", l.name, err)
time.Sleep(time.Millisecond * 10)
continue
}
log.Trace("LevelQueue: task found: %#v", data)
log.Trace("LevelQueue %s: task found: %#v", l.name, data)
l.pool.Push(data)
time.Sleep(time.Millisecond * 10)
@ -163,6 +166,7 @@ func (l *LevelQueue) Push(data Data) error {
// Shutdown this queue and stop processing
func (l *LevelQueue) Shutdown() {
log.Trace("Shutdown: %s", l.name)
select {
case <-l.closed:
default:
@ -172,12 +176,11 @@ func (l *LevelQueue) Shutdown() {
// Terminate this queue and close the queue
func (l *LevelQueue) Terminate() {
log.Trace("Terminating")
log.Trace("Terminating: %s", l.name)
l.Shutdown()
if err := l.queue.Close(); err != nil && err.Error() != "leveldb: closed" {
log.Error("Error whilst closing internal queue: %v", err)
log.Error("Error whilst closing internal queue in %s: %v", l.name, err)
}
}
func init() {