avoid data race of LogBuffer isStopping (#3859)
This commit is contained in:
committed by
GitHub
parent
7836f7574e
commit
ee38ab8581
@@ -118,6 +118,13 @@ func (m *LogBuffer) AddToBuffer(partitionKey, data []byte, processingTsNs int64)
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *LogBuffer) IsStopping() bool {
|
||||||
|
m.RLock()
|
||||||
|
defer m.RUnlock()
|
||||||
|
|
||||||
|
return m.isStopping
|
||||||
|
}
|
||||||
|
|
||||||
func (m *LogBuffer) Shutdown() {
|
func (m *LogBuffer) Shutdown() {
|
||||||
m.Lock()
|
m.Lock()
|
||||||
defer m.Unlock()
|
defer m.Unlock()
|
||||||
@@ -144,13 +151,12 @@ func (m *LogBuffer) loopFlush() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m *LogBuffer) loopInterval() {
|
func (m *LogBuffer) loopInterval() {
|
||||||
for !m.isStopping {
|
for !m.IsStopping() {
|
||||||
time.Sleep(m.flushInterval)
|
time.Sleep(m.flushInterval)
|
||||||
m.Lock()
|
if m.IsStopping() {
|
||||||
if m.isStopping {
|
|
||||||
m.Unlock()
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
m.Lock()
|
||||||
toFlush := m.copyToFlush()
|
toFlush := m.copyToFlush()
|
||||||
m.Unlock()
|
m.Unlock()
|
||||||
if toFlush != nil {
|
if toFlush != nil {
|
||||||
|
|||||||
Reference in New Issue
Block a user