mirror of
				https://github.com/go-gitea/gitea.git
				synced 2025-11-03 20:36:07 +01:00 
			
		
		
		
	Fix queue test (#30646)
Fix #30643 The old test code is not stable due to the data-race described in the TODO added at that time. Make it stable, and remove a debug-only field from old test code.
This commit is contained in:
		@@ -63,6 +63,8 @@ func (q *WorkerPoolQueue[T]) doDispatchBatchToWorker(wg *workerGroup[T], flushCh
 | 
				
			|||||||
	// TODO: the logic could be improved in the future, to avoid a data-race between "doStartNewWorker" and "workerNum"
 | 
						// TODO: the logic could be improved in the future, to avoid a data-race between "doStartNewWorker" and "workerNum"
 | 
				
			||||||
	// The root problem is that if we skip "doStartNewWorker" here, the "workerNum" might be decreased by other workers later
 | 
						// The root problem is that if we skip "doStartNewWorker" here, the "workerNum" might be decreased by other workers later
 | 
				
			||||||
	// So ideally, it should check whether there are enough workers by some approaches, and start new workers if necessary.
 | 
						// So ideally, it should check whether there are enough workers by some approaches, and start new workers if necessary.
 | 
				
			||||||
 | 
						// This data-race is not serious, as long as a new worker will be started soon to make sure there are enough workers,
 | 
				
			||||||
 | 
						// so no need to hugely refactor at the moment.
 | 
				
			||||||
	q.workerNumMu.Lock()
 | 
						q.workerNumMu.Lock()
 | 
				
			||||||
	noWorker := q.workerNum == 0
 | 
						noWorker := q.workerNum == 0
 | 
				
			||||||
	if full || noWorker {
 | 
						if full || noWorker {
 | 
				
			||||||
@@ -136,6 +138,14 @@ func (q *WorkerPoolQueue[T]) basePushForShutdown(items ...T) bool {
 | 
				
			|||||||
	return true
 | 
						return true
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func resetIdleTicker(t *time.Ticker, dur time.Duration) {
 | 
				
			||||||
 | 
						t.Reset(dur)
 | 
				
			||||||
 | 
						select {
 | 
				
			||||||
 | 
						case <-t.C:
 | 
				
			||||||
 | 
						default:
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// doStartNewWorker starts a new worker for the queue, the worker reads from worker's channel and handles the items.
 | 
					// doStartNewWorker starts a new worker for the queue, the worker reads from worker's channel and handles the items.
 | 
				
			||||||
func (q *WorkerPoolQueue[T]) doStartNewWorker(wp *workerGroup[T]) {
 | 
					func (q *WorkerPoolQueue[T]) doStartNewWorker(wp *workerGroup[T]) {
 | 
				
			||||||
	wp.wg.Add(1)
 | 
						wp.wg.Add(1)
 | 
				
			||||||
@@ -146,8 +156,6 @@ func (q *WorkerPoolQueue[T]) doStartNewWorker(wp *workerGroup[T]) {
 | 
				
			|||||||
		log.Debug("Queue %q starts new worker", q.GetName())
 | 
							log.Debug("Queue %q starts new worker", q.GetName())
 | 
				
			||||||
		defer log.Debug("Queue %q stops idle worker", q.GetName())
 | 
							defer log.Debug("Queue %q stops idle worker", q.GetName())
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		atomic.AddInt32(&q.workerStartedCounter, 1) // Only increase counter, used for debugging
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
		t := time.NewTicker(workerIdleDuration)
 | 
							t := time.NewTicker(workerIdleDuration)
 | 
				
			||||||
		defer t.Stop()
 | 
							defer t.Stop()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -169,11 +177,7 @@ func (q *WorkerPoolQueue[T]) doStartNewWorker(wp *workerGroup[T]) {
 | 
				
			|||||||
				}
 | 
									}
 | 
				
			||||||
				q.doWorkerHandle(batch)
 | 
									q.doWorkerHandle(batch)
 | 
				
			||||||
				// reset the idle ticker, and drain the tick after reset in case a tick is already triggered
 | 
									// reset the idle ticker, and drain the tick after reset in case a tick is already triggered
 | 
				
			||||||
				t.Reset(workerIdleDuration)
 | 
									resetIdleTicker(t, workerIdleDuration) // key code for TestWorkerPoolQueueWorkerIdleReset
 | 
				
			||||||
				select {
 | 
					 | 
				
			||||||
				case <-t.C:
 | 
					 | 
				
			||||||
				default:
 | 
					 | 
				
			||||||
				}
 | 
					 | 
				
			||||||
			case <-t.C:
 | 
								case <-t.C:
 | 
				
			||||||
				q.workerNumMu.Lock()
 | 
									q.workerNumMu.Lock()
 | 
				
			||||||
				keepWorking = q.workerNum <= 1 // keep the last worker running
 | 
									keepWorking = q.workerNum <= 1 // keep the last worker running
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -40,8 +40,6 @@ type WorkerPoolQueue[T any] struct {
 | 
				
			|||||||
	workerMaxNum    int
 | 
						workerMaxNum    int
 | 
				
			||||||
	workerActiveNum int
 | 
						workerActiveNum int
 | 
				
			||||||
	workerNumMu     sync.Mutex
 | 
						workerNumMu     sync.Mutex
 | 
				
			||||||
 | 
					 | 
				
			||||||
	workerStartedCounter int32
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type flushType chan struct{}
 | 
					type flushType chan struct{}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -5,8 +5,10 @@ package queue
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
	"context"
 | 
						"context"
 | 
				
			||||||
 | 
						"slices"
 | 
				
			||||||
	"strconv"
 | 
						"strconv"
 | 
				
			||||||
	"sync"
 | 
						"sync"
 | 
				
			||||||
 | 
						"sync/atomic"
 | 
				
			||||||
	"testing"
 | 
						"testing"
 | 
				
			||||||
	"time"
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -250,23 +252,34 @@ func TestWorkerPoolQueueShutdown(t *testing.T) {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
func TestWorkerPoolQueueWorkerIdleReset(t *testing.T) {
 | 
					func TestWorkerPoolQueueWorkerIdleReset(t *testing.T) {
 | 
				
			||||||
	defer test.MockVariableValue(&workerIdleDuration, 10*time.Millisecond)()
 | 
						defer test.MockVariableValue(&workerIdleDuration, 10*time.Millisecond)()
 | 
				
			||||||
	defer mockBackoffDuration(10 * time.Millisecond)()
 | 
						defer mockBackoffDuration(5 * time.Millisecond)()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						var q *WorkerPoolQueue[int]
 | 
				
			||||||
 | 
						var handledCount atomic.Int32
 | 
				
			||||||
 | 
						var hasOnlyOneWorkerRunning atomic.Bool
 | 
				
			||||||
	handler := func(items ...int) (unhandled []int) {
 | 
						handler := func(items ...int) (unhandled []int) {
 | 
				
			||||||
		time.Sleep(50 * time.Millisecond)
 | 
							handledCount.Add(int32(len(items)))
 | 
				
			||||||
 | 
							// make each work have different duration, and check the active worker number periodically
 | 
				
			||||||
 | 
							var activeNums []int
 | 
				
			||||||
 | 
							for i := 0; i < 5-items[0]%2; i++ {
 | 
				
			||||||
 | 
								time.Sleep(workerIdleDuration * 2)
 | 
				
			||||||
 | 
								activeNums = append(activeNums, q.GetWorkerActiveNumber())
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							// When the queue never becomes empty, the existing workers should keep working
 | 
				
			||||||
 | 
							// It is not 100% true at the moment because the data-race in workergroup.go is not resolved, see that TODO */
 | 
				
			||||||
 | 
							// If the "active worker numbers" is like [2 2 ... 1 1], it means that an existing worker exited and the no new worker is started.
 | 
				
			||||||
 | 
							if slices.Equal([]int{1, 1}, activeNums[len(activeNums)-2:]) {
 | 
				
			||||||
 | 
								hasOnlyOneWorkerRunning.Store(true)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
		return nil
 | 
							return nil
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
						q, _ = newWorkerPoolQueueForTest("test-workpoolqueue", setting.QueueSettings{Type: "channel", BatchLength: 1, MaxWorkers: 2, Length: 100}, handler, false)
 | 
				
			||||||
	q, _ := newWorkerPoolQueueForTest("test-workpoolqueue", setting.QueueSettings{Type: "channel", BatchLength: 1, MaxWorkers: 2, Length: 100}, handler, false)
 | 
					 | 
				
			||||||
	stop := runWorkerPoolQueue(q)
 | 
						stop := runWorkerPoolQueue(q)
 | 
				
			||||||
	for i := 0; i < 20; i++ {
 | 
						for i := 0; i < 100; i++ {
 | 
				
			||||||
		assert.NoError(t, q.Push(i))
 | 
							assert.NoError(t, q.Push(i))
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					 | 
				
			||||||
	time.Sleep(500 * time.Millisecond)
 | 
						time.Sleep(500 * time.Millisecond)
 | 
				
			||||||
	assert.EqualValues(t, 2, q.GetWorkerNumber())
 | 
						assert.Greater(t, int(handledCount.Load()), 4) // make sure there are enough items handled during the test
 | 
				
			||||||
	assert.EqualValues(t, 2, q.GetWorkerActiveNumber())
 | 
						assert.False(t, hasOnlyOneWorkerRunning.Load(), "a slow handler should not block other workers from starting")
 | 
				
			||||||
	// when the queue never becomes empty, the existing workers should keep working
 | 
					 | 
				
			||||||
	assert.EqualValues(t, 2, q.workerStartedCounter)
 | 
					 | 
				
			||||||
	stop()
 | 
						stop()
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user