forked from forgejo/forgejo
New push to head repo of head branch: regenerate patch and retest apply
commit 0fbb8c8826
parent e0aab4a7f6
20 changed files with 475 additions and 154 deletions
@@ -13,6 +13,7 @@ import (
 	"sync"
 	"time"
 
+	"github.com/Unknwon/com"
 	"github.com/go-xorm/xorm"
 
 	api "github.com/gogits/go-gogs-client"
@@ -435,39 +436,58 @@ func PrepareWebhooks(repo *Repository, event HookEventType, p api.Payloader) err
 	return nil
 }
 
-type hookQueue struct {
-	// Make sure one repository only occur once in the queue.
-	lock    sync.Mutex
-	repoIDs map[int64]bool
+// UniqueQueue represents a queue that guarantees only one instance of same ID is in the line.
+type UniqueQueue struct {
+	lock sync.Mutex
+	ids  map[string]bool
 
-	queue chan int64
+	queue chan string
 }
 
-func (q *hookQueue) removeRepoID(id int64) {
+func (q *UniqueQueue) Queue() <-chan string {
+	return q.queue
+}
+
+func NewUniqueQueue(queueLength int) *UniqueQueue {
+	if queueLength <= 0 {
+		queueLength = 100
+	}
+
+	return &UniqueQueue{
+		ids:   make(map[string]bool),
+		queue: make(chan string, queueLength),
+	}
+}
+
+func (q *UniqueQueue) Remove(id interface{}) {
 	q.lock.Lock()
 	defer q.lock.Unlock()
-	delete(q.repoIDs, id)
+	delete(q.ids, com.ToStr(id))
 }
 
-func (q *hookQueue) addRepoID(id int64) {
-	q.lock.Lock()
-	if q.repoIDs[id] {
-		q.lock.Unlock()
+func (q *UniqueQueue) Add(id interface{}) {
+	newid := com.ToStr(id)
+
+	if q.Exist(id) {
 		return
 	}
-	q.repoIDs[id] = true
+
+	q.lock.Lock()
+	q.ids[newid] = true
 	q.lock.Unlock()
-	q.queue <- id
+	q.queue <- newid
 }
 
-// AddRepoID adds repository ID to hook delivery queue.
-func (q *hookQueue) AddRepoID(id int64) {
-	go q.addRepoID(id)
+func (q *UniqueQueue) Exist(id interface{}) bool {
+	q.lock.Lock()
+	defer q.lock.Unlock()
+
+	return q.ids[com.ToStr(id)]
 }
 
-var HookQueue *hookQueue
+var HookQueue = NewUniqueQueue(setting.Webhook.QueueLength)
 
-func deliverHook(t *HookTask) {
+func (t *HookTask) deliver() {
 	t.IsDelivered = true
 
 	timeout := time.Duration(setting.Webhook.DeliverTimeout) * time.Second
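For context, a minimal, self-contained sketch of the dedup-by-ID pattern the UniqueQueue hunk above implements. This is not the Gogs code itself: fmt.Sprint stands in for com.ToStr, the lowercase names are illustrative, and the existence check is folded under the same lock for brevity.

// Sketch only: illustrates the UniqueQueue pattern, not the real implementation.
package main

import (
	"fmt"
	"sync"
)

type uniqueQueue struct {
	lock sync.Mutex
	ids  map[string]bool
	ch   chan string
}

func newUniqueQueue(length int) *uniqueQueue {
	if length <= 0 {
		length = 100
	}
	return &uniqueQueue{ids: make(map[string]bool), ch: make(chan string, length)}
}

// Queue exposes the receive-only side to the consumer.
func (q *uniqueQueue) Queue() <-chan string { return q.ch }

// Add enqueues id unless an equal ID is already waiting.
func (q *uniqueQueue) Add(id interface{}) {
	key := fmt.Sprint(id) // stand-in for com.ToStr(id)
	q.lock.Lock()
	if q.ids[key] {
		q.lock.Unlock()
		return
	}
	q.ids[key] = true
	q.lock.Unlock()
	q.ch <- key // send outside the lock so a full channel never blocks other callers
}

// Remove forgets id so it can be enqueued again later.
func (q *uniqueQueue) Remove(id interface{}) {
	q.lock.Lock()
	defer q.lock.Unlock()
	delete(q.ids, fmt.Sprint(id))
}

func main() {
	q := newUniqueQueue(10)
	q.Add(int64(42))
	q.Add(int64(42)) // dropped: "42" is still in the queue
	q.Add(int64(7))

	for i := 0; i < 2; i++ {
		id := <-q.Queue()
		q.Remove(id) // the same ID may now be queued again
		fmt.Println("processing", id)
	}
}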
@@ -549,12 +569,13 @@ func deliverHook(t *HookTask) {
 }
 
 // DeliverHooks checks and delivers undelivered hooks.
 // TODO: shoot more hooks at same time.
 func DeliverHooks() {
 	tasks := make([]*HookTask, 0, 10)
 	x.Where("is_delivered=?", false).Iterate(new(HookTask),
 		func(idx int, bean interface{}) error {
 			t := bean.(*HookTask)
-			deliverHook(t)
+			t.deliver()
 			tasks = append(tasks, t)
 			return nil
 		})
@@ -566,15 +587,10 @@ func DeliverHooks() {
 		}
 	}
 
-	HookQueue = &hookQueue{
-		lock:    sync.Mutex{},
-		repoIDs: make(map[int64]bool),
-		queue:   make(chan int64, setting.Webhook.QueueLength),
-	}
-
 	// Start listening on new hook requests.
-	for repoID := range HookQueue.queue {
-		HookQueue.removeRepoID(repoID)
+	for repoID := range HookQueue.Queue() {
+		log.Trace("DeliverHooks[%v]: processing delivery hooks", repoID)
+		HookQueue.Remove(repoID)
 
 		tasks = make([]*HookTask, 0, 5)
 		if err := x.Where("repo_id=? AND is_delivered=?", repoID, false).Find(&tasks); err != nil {
@@ -582,9 +598,10 @@ func DeliverHooks() {
 			continue
 		}
 		for _, t := range tasks {
-			deliverHook(t)
+			t.deliver()
 			if err := UpdateHookTask(t); err != nil {
-				log.Error(4, "UpdateHookTask(%d): %v", t.ID, err)
+				log.Error(4, "UpdateHookTask[%d]: %v", t.ID, err)
+				continue
 			}
 		}
 	}
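The DeliverHooks hunks above follow a simple dispatch pattern: producers push a repository ID into HookQueue, the single delivery loop pops the ID, removes it from the dedup set, and then delivers every pending task for that repository. Below is a rough, self-contained sketch of that flow; enqueue, loadPendingTasks, and the printed delivery are hypothetical stand-ins for the real HookQueue.Add, database lookup, and t.deliver() calls.

// Sketch only: illustrates the dispatch loop, not the real delivery code.
package main

import (
	"fmt"
	"sync"
	"time"
)

// Stand-ins for HookQueue: a dedup set plus a buffered channel of repo IDs.
var (
	mu      sync.Mutex
	pending = make(map[int64]bool)
	queue   = make(chan int64, 100)
)

// enqueue mirrors HookQueue.Add: IDs already waiting are skipped.
func enqueue(repoID int64) {
	mu.Lock()
	if pending[repoID] {
		mu.Unlock()
		return
	}
	pending[repoID] = true
	mu.Unlock()
	queue <- repoID
}

// loadPendingTasks is a hypothetical stand-in for the
// x.Where("repo_id=? AND is_delivered=?", repoID, false).Find(&tasks) lookup.
func loadPendingTasks(repoID int64) []string {
	return []string{fmt.Sprintf("hook task for repo %d", repoID)}
}

func main() {
	go func() {
		enqueue(1)
		enqueue(1) // deduplicated while repo 1 is still queued
		enqueue(2)
	}()

	deadline := time.After(200 * time.Millisecond)
	for {
		select {
		case repoID := <-queue:
			mu.Lock()
			delete(pending, repoID) // like HookQueue.Remove(repoID)
			mu.Unlock()
			for _, t := range loadPendingTasks(repoID) {
				fmt.Println("delivering:", t) // like t.deliver() + UpdateHookTask(t)
			}
		case <-deadline:
			return
		}
	}
}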