Merge branch 'main' into feature/bots
This commit is contained in:
@@ -109,32 +109,6 @@ func (q *ChannelQueue) Flush(timeout time.Duration) error {
|
||||
return q.FlushWithContext(ctx)
|
||||
}
|
||||
|
||||
// FlushWithContext is very similar to CleanUp but it will return as soon as the dataChan is empty
|
||||
func (q *ChannelQueue) FlushWithContext(ctx context.Context) error {
|
||||
log.Trace("ChannelQueue: %d Flush", q.qid)
|
||||
paused, _ := q.IsPausedIsResumed()
|
||||
for {
|
||||
select {
|
||||
case <-paused:
|
||||
return nil
|
||||
case data, ok := <-q.dataChan:
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
if unhandled := q.handle(data); unhandled != nil {
|
||||
log.Error("Unhandled Data whilst flushing queue %d", q.qid)
|
||||
}
|
||||
atomic.AddInt64(&q.numInQueue, -1)
|
||||
case <-q.baseCtx.Done():
|
||||
return q.baseCtx.Err()
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Shutdown processing from this queue
|
||||
func (q *ChannelQueue) Shutdown() {
|
||||
q.lock.Lock()
|
||||
|
||||
@@ -8,7 +8,6 @@ import (
|
||||
"fmt"
|
||||
"runtime/pprof"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"code.gitea.io/gitea/modules/container"
|
||||
@@ -167,35 +166,6 @@ func (q *ChannelUniqueQueue) Flush(timeout time.Duration) error {
|
||||
return q.FlushWithContext(ctx)
|
||||
}
|
||||
|
||||
// FlushWithContext is very similar to CleanUp but it will return as soon as the dataChan is empty
|
||||
func (q *ChannelUniqueQueue) FlushWithContext(ctx context.Context) error {
|
||||
log.Trace("ChannelUniqueQueue: %d Flush", q.qid)
|
||||
paused, _ := q.IsPausedIsResumed()
|
||||
for {
|
||||
select {
|
||||
case <-paused:
|
||||
return nil
|
||||
default:
|
||||
}
|
||||
select {
|
||||
case data, ok := <-q.dataChan:
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
if unhandled := q.handle(data); unhandled != nil {
|
||||
log.Error("Unhandled Data whilst flushing queue %d", q.qid)
|
||||
}
|
||||
atomic.AddInt64(&q.numInQueue, -1)
|
||||
case <-q.baseCtx.Done():
|
||||
return q.baseCtx.Err()
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Shutdown processing from this queue
|
||||
func (q *ChannelUniqueQueue) Shutdown() {
|
||||
log.Trace("ChannelUniqueQueue: %s Shutting down", q.name)
|
||||
|
||||
@@ -463,13 +463,43 @@ func (p *WorkerPool) IsEmpty() bool {
|
||||
return atomic.LoadInt64(&p.numInQueue) == 0
|
||||
}
|
||||
|
||||
// contextError returns either ctx.Done(), the base context's error or nil
|
||||
func (p *WorkerPool) contextError(ctx context.Context) error {
|
||||
select {
|
||||
case <-p.baseCtx.Done():
|
||||
return p.baseCtx.Err()
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// FlushWithContext is very similar to CleanUp but it will return as soon as the dataChan is empty
|
||||
// NB: The worker will not be registered with the manager.
|
||||
func (p *WorkerPool) FlushWithContext(ctx context.Context) error {
|
||||
log.Trace("WorkerPool: %d Flush", p.qid)
|
||||
paused, _ := p.IsPausedIsResumed()
|
||||
for {
|
||||
// Because select will return any case that is satisified at random we precheck here before looking at dataChan.
|
||||
select {
|
||||
case data := <-p.dataChan:
|
||||
case <-paused:
|
||||
// Ensure that even if paused that the cancelled error is still sent
|
||||
return p.contextError(ctx)
|
||||
case <-p.baseCtx.Done():
|
||||
return p.baseCtx.Err()
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
select {
|
||||
case <-paused:
|
||||
return p.contextError(ctx)
|
||||
case data, ok := <-p.dataChan:
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
if unhandled := p.handle(data); unhandled != nil {
|
||||
log.Error("Unhandled Data whilst flushing queue %d", p.qid)
|
||||
}
|
||||
@@ -495,6 +525,7 @@ func (p *WorkerPool) doWork(ctx context.Context) {
|
||||
paused, _ := p.IsPausedIsResumed()
|
||||
data := make([]Data, 0, p.batchLength)
|
||||
for {
|
||||
// Because select will return any case that is satisified at random we precheck here before looking at dataChan.
|
||||
select {
|
||||
case <-paused:
|
||||
log.Trace("Worker for Queue %d Pausing", p.qid)
|
||||
@@ -515,8 +546,19 @@ func (p *WorkerPool) doWork(ctx context.Context) {
|
||||
log.Trace("Worker shutting down")
|
||||
return
|
||||
}
|
||||
case <-ctx.Done():
|
||||
if len(data) > 0 {
|
||||
log.Trace("Handling: %d data, %v", len(data), data)
|
||||
if unhandled := p.handle(data...); unhandled != nil {
|
||||
log.Error("Unhandled Data in queue %d", p.qid)
|
||||
}
|
||||
atomic.AddInt64(&p.numInQueue, -1*int64(len(data)))
|
||||
}
|
||||
log.Trace("Worker shutting down")
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
select {
|
||||
case <-paused:
|
||||
// go back around
|
||||
|
||||
Reference in New Issue
Block a user