read will block if no items

This commit is contained in:
chrislu
2024-01-28 13:10:34 -08:00
parent 0bf5424a2e
commit b6c5e57c30
3 changed files with 44 additions and 21 deletions

View File

@@ -23,13 +23,12 @@ type BufferedQueue[T any] struct {
count int // Total number of items in the queue
mutex sync.Mutex
nodeCounter int
waitOnRead bool
waitCond *sync.Cond
isClosed bool
}
// NewBufferedQueue creates a new buffered queue with the specified chunk size
func NewBufferedQueue[T any](chunkSize int, waitOnRead bool) *BufferedQueue[T] {
func NewBufferedQueue[T any](chunkSize int) *BufferedQueue[T] {
// Create an empty chunk to initialize head and tail
chunk := &ItemChunkNode[T]{items: make([]T, chunkSize), nodeId: 0}
bq := &BufferedQueue[T]{
@@ -39,7 +38,6 @@ func NewBufferedQueue[T any](chunkSize int, waitOnRead bool) *BufferedQueue[T] {
last: chunk,
count: 0,
mutex: sync.Mutex{},
waitOnRead: waitOnRead,
}
bq.waitCond = sync.NewCond(&bq.mutex)
return bq
@@ -77,7 +75,7 @@ func (q *BufferedQueue[T]) Enqueue(job T) error {
q.tail.items[q.tail.tailIndex] = job
q.tail.tailIndex++
q.count++
if q.waitOnRead {
if q.count == 1 {
q.waitCond.Signal()
}
@@ -89,19 +87,12 @@ func (q *BufferedQueue[T]) Dequeue() (T, bool) {
q.mutex.Lock()
defer q.mutex.Unlock()
if q.waitOnRead {
for q.count <= 0 && !q.isClosed {
q.waitCond.Wait()
}
if q.isClosed {
var a T
return a, false
}
} else {
if q.count == 0 {
var a T
return a, false
}
for q.count <= 0 && !q.isClosed {
q.waitCond.Wait()
}
if q.count <= 0 && q.isClosed {
var a T
return a, false
}
job := q.head.items[q.head.headIndex]