Branch main, pkg/source/file/watch.go:321: the pre-allocation offset logic has been dropped
Opened this issue · 0 comments
pwm1992 commented
What version of Loggie?
main
Expected Behavior
Current code (main):
` // Pre-allocation offset
if e.job.task.config.ReadFromTail {
existAckOffset = fileSize
w.preAllocationOffset(existAckOffset, job)
}
// set ack offset
job.NextOffset(existAckOffset)
An earlier version, v1.4.1:
`
}
// Pre-allocation offset
if existAckOffset == 0 {
if w.config.ReadFromTail {
existAckOffset = fileSize
}
w.preAllocationOffset(existAckOffset, job)
}
// set ack offset
job.NextOffset(existAckOffset)
// set line number
In the current code, preAllocationOffset is only called when ReadFromTail is set, so new files in the pipeline are simply ignored.
Actual Behavior
The SQLite DB does not create the initial registry record for the new file.
As a result, the acks for this file are never processed by the ach.ackChan case in ack.go,
because ach.jobAckChains in ack.go does not contain the new file's jobWatchUid:
`
case ss := <-ach.appendChan:
for _, s := range ss {
jobWatchUid := s.WatchUid()
if chain, ok := ach.jobAckChains[jobWatchUid]; ok {
chain.Append(s)
} else {
// new ack chain
create := false
for _, task := range ach.ackTasks {
if task.isContain(s) {
create = true
ackChain := task.NewAckChain(jobWatchUid)
ach.jobAckChains[ackChain.Key()] = ackChain
ackChain.Append(s)
break
}
}
if !create {
log.Debug("append state of source has stopped: %+v", s)
}
}
}
case ss := <-ach.ackChan:
for _, s := range ss {
jobWatchUid := s.WatchUid()
if chain, ok := ach.jobAckChains[jobWatchUid]; ok {
chain.Ack(s)
} else {
log.Debug("ack state of source has stopped: %+v", s)
}
}