When looking at the pyjuju debug-log, there was a replay
function that allowed getting the log from the start.
There was no way to tell the tailer to start from the
beginning of the file.
I changed the meaning of NewTailer to be one that doesn't
do any seeking. The NewTailerBacktrack does seek to the end,
and then goes back a certain number of lines.
There was a bug in the backtrack tailer when zero lines
were passed in. This is fixed and a test added.
In order to provide a synchronization mechanism for other
tests, the Tailer has a public function param that is called
after the backtracking, and prior to starting tailing.
The number of lines to look back is now a uint, as negative
values make no sense at all.
Index: utils/tailer/tailer.go
=== modified file 'utils/tailer/tailer.go'
--- utils/tailer/tailer.go 2013-12-13 11:19:56 +0000
+++ utils/tailer/tailer.go 2014-04-04 03:20:53 +0000
@@ -35,25 +35,37 @@
reader *bufio.Reader
writeCloser io.WriteCloser
writer *bufio.Writer
- lines int
+ lines uint
filter TailerFilterFunc
bufferSize int
polltime time.Duration
+ lookBack bool
+ // StartedListening is purely a hook point for tests. It is a function
+ // that is called after the initial seek prior to reading.
+ StartedTailing func()
}
-// NewTailer starts a Tailer which reads strings from the passed
+// NewTailerBacktrack starts a Tailer which reads strings from the passed
// ReadSeeker line by line. If a filter function is specified the read
// lines are filtered. The matching lines are written to the passed
// Writer. The reading begins the specified number of matching lines
// from the end.
-func NewTailer(readSeeker io.ReadSeeker, writer io.Writer, lines int,
filter TailerFilterFunc) *Tailer {
- return newTailer(readSeeker, writer, lines, filter, bufferSize, polltime)
+func NewTailerBacktrack(readSeeker io.ReadSeeker, writer io.Writer, lines
uint, filter TailerFilterFunc) *Tailer {
+ return newTailer(readSeeker, writer, lines, filter, bufferSize, polltime,
true)
+}
+
+// NewTailer starts a Tailer which reads strings from the passed
+// ReadSeeker line by line. If a filter function is specified the read
+// lines are filtered. The matching lines are written to the passed
+// Writer.
+func NewTailer(readSeeker io.ReadSeeker, writer io.Writer, filter
TailerFilterFunc) *Tailer {
+ return newTailer(readSeeker, writer, 0, filter, bufferSize, polltime,
false)
}
// newTailer starts a Tailer like NewTailer but allows the setting of
// the read buffer size and the time between pollings for testing.
-func newTailer(readSeeker io.ReadSeeker, writer io.Writer, lines int,
filter TailerFilterFunc,
- bufferSize int, polltime time.Duration) *Tailer {
+func newTailer(readSeeker io.ReadSeeker, writer io.Writer, lines uint,
filter TailerFilterFunc,
+ bufferSize int, polltime time.Duration, lookBack bool) *Tailer {
t := &Tailer{
readSeeker: readSeeker,
reader: bufio.NewReaderSize(readSeeker, bufferSize),
@@ -62,6 +74,7 @@
filter: filter,
bufferSize: bufferSize,
polltime: polltime,
+ lookBack: lookBack,
}
go func() {
defer t.tomb.Done()
@@ -98,8 +111,13 @@
// writer too.
func (t *Tailer) loop() error {
// Position the readSeeker.
- if err := t.seekLastLines(); err != nil {
- return err
+ if t.lookBack {
+ if err := t.seekLastLines(); err != nil {
+ return err
+ }
+ }
+ if t.StartedTailing != nil {
+ t.StartedTailing()
}
// Start polling.
// TODO(mue) 2013-12-06
@@ -139,8 +157,12 @@
if err != nil {
return err
}
+ if t.lines == 0 {
+ // We are done, just seeking to the end is sufficient.
+ return nil
+ }
seekPos := int64(0)
- found := 0
+ found := uint(0)
buffer := make([]byte, t.bufferSize)
SeekLoop:
for offset > 0 {
// Collect initial data.
@@ -386,6 +400,9 @@
// linec is closed due to stopping or an error only the values so far care
// compared. Checking the reason for termination is done in the test.
func assertCollected(c *gc.C, linec chan string, compare []string,
injection func([]string)) {
+ if len(compare) == 0 {
+ return
+ }
timeout := time.After(testing.LongWait)
lines := []string{}
for {
Reviewers: mp+214150_code.launchpad.net,
Message:
Please take a look.
Description:
Extend the tailer utility.
When looking at the pyjuju debug-log, there was a replay
function that allowed getting the log from the start.
There was no way to tell the tailer to start from the
beginning of the file.
I changed the meaning of NewTailer to be one that doesn't
do any seeking. The NewTailerBacktrack does seek to the end,
and then goes back a certain number of lines.
There was a bug in the backtrack tailer when zero lines
were passed in. This is fixed and a test added.
In order to provide a synchronization mechanism for other
tests, the Tailer has a public function param that is called
after the backtracking, and prior to starting tailing.
The number of lines to look back is now a uint, as negative
values make no sense at all.
https://code.launchpad.net/~thumper/juju-core/tailer-tweaks/+merge/214150
(do not edit description out of merge proposal)
Please review this at https://codereview.appspot.com/84290045/
Affected files (+53, -12 lines):
A [revision details]
M container/lxc/clonetemplate.go
M utils/tailer/tailer.go
M utils/tailer/tailer_test.go
Index: [revision details]
=== added file '[revision details]'
--- [revision details] 2012-01-01 00:00:00 +0000
+++ [revision details] 2012-01-01 00:00:00 +0000
@@ -0,0 +1,2 @@
+Old revision: tarmac-20140404003547-v7dzlndz9bq9h4qd
+New revision: <email address hidden>
Index: container/lxc/clonetemplate.go
=== modified file 'container/lxc/clonetemplate.go'
--- container/lxc/clonetemplate.go 2014-03-12 04:32:02 +0000
+++ container/lxc/clonetemplate.go 2014-04-04 03:33:01 +0000
@@ -179,7 +179,7 @@
}
tailWriter := &logTail{tick: time.Now()}
- consoleTailer := tailer.NewTailer(console, tailWriter, 0, nil)
+ consoleTailer := tailer.NewTailer(console, tailWriter, nil)
defer consoleTailer.Stop()
// We should wait maybe 1 minute between output?
Index: utils/tailer/tailer.go
=== modified file 'utils/tailer/tailer.go'
--- utils/tailer/tailer.go 2013-12-13 11:19:56 +0000
+++ utils/tailer/tailer.go 2014-04-04 03:20:53 +0000
@@ -35,25 +35,37 @@
reader *bufio.Reader
writeCloser io.WriteCloser
writer *bufio.Writer
- lines int
+ lines uint
filter TailerFilterFunc
bufferSize int
polltime time.Duration
+ lookBack bool
+ // StartedListening is purely a hook point for tests. It is a function
+ // that is called after the initial seek prior to reading.
+ StartedTailing func()
}
-// NewTailer starts a Tailer which reads strings from the passed
+// NewTailerBacktrack starts a Tailer which reads strings from the passed
// ReadSeeker line by line. If a filter function is specified the read
// lines are filtered. The matching lines are written to the passed
// Writer. The reading begins the specified number of matching lines
// from the end.
-func NewTailer(readSeeker io.ReadSeeker, writer io.Writer, lines int,
filter TailerFilterFunc) *Tailer {
- return newTailer(readSeeker, writer, lines, filter, bufferSize, polltime)
+func NewTailerBacktrack(readSeeker io.ReadSeeker, writer io.Writer, lines
uint, filter TailerFilterFunc) *Tailer {
+ return newTailer(readSeeker, writer, lines, filter, bufferSize, polltime,
true)
+}
+
+// NewTailer starts a Tailer which reads strings from the passed
+// ReadSeeker line by line. If a filter function is specified the read
+// lines are filtered. The matching lines are written to the passed
+// Writer.
+func NewTailer(readSeeker io.ReadSeeker, writer io.Writer, filter
TailerFilterFunc) *Tailer {
+ return newTailer(readSeeker, writer, 0, filter, bufferSize, polltime,
false)
}
// newTailer starts a Tailer like NewTailer but allows the setting of
// the read buffer size and the time between pollings for testing.
-func newTailer(readSeeker io.ReadSeeker, writer io.Writer, lines int,
filter TailerFilterFunc,
- bufferSize int, polltime time.Duration) *Tailer {
+func newTailer(readSeeker io.ReadSeeker, writer io.Writer, lines uint,
filter TailerFilterFunc,
+ bufferSize int, polltime time.Duration, lookBack bool) *Tailer {
t := &Tailer{
readSeeker: readSeeker,
reader: bufio.NewReaderSize(readSeeker, bufferSize),
@@ -62,6 +74,7 @@
filter: filter,
bufferSize: bufferSize,
polltime: polltime,
+ lookBack: lookBack,
}
go func() {
defer t.tomb.Done()
@@ -98,8 +111,13 @@
// writer too.
func (t *Tailer) loop() error {
// Position the readSeeker.
- if err := t.seekLastLines(); err != nil {
- return err
+ if t.lookBack {
+ if err := t.seekLastLines(); err != nil {
+ return err
+ }
+ }
+ if t.StartedTailing != nil {
+ t.StartedTailing()
}
// Start polling.
// TODO(mue) 2013-12-06
@@ -139,8 +157,12 @@
if err != nil {
return err
}
+ if t.lines == 0 {
+ // We are done, just seeking to the end is sufficient.
+ return nil
+ }
seekPos := int64(0)
- found := 0
+ found := uint(0)
buffer := make([]byte, t.bufferSize)
SeekLoop:
for offset > 0 {
Index: utils/tailer/tailer_test.go
=== modified file 'utils/tailer/tailer_test.go'
--- utils/tailer/tailer_test.go 2013-12-13 13:21:44 +0000
+++ utils/tailer/tailer_test.go 2014-04-04 03:20:53 +0000
@@ -59,12 +59,13 @@
description string
data []string
initialLinesWritten int
- initialLinesRequested int
+ initialLinesRequested uint
bufferSize int
filter tailer.TailerFilterFunc
injector func(*tailer.Tailer, *readSeeker) func([]string)
initialCollectedData []string
appendedCollectedData []string
+ fromStart bool
err string
}{{
description: "lines are longer than buffer size",
@@ -182,6 +183,19 @@
},
appendedCollectedData: alphabetData[5:],
}, {
+ description: "ignore current lines",
+ data: alphabetData,
+ initialLinesWritten: 5,
+ bufferSize: 5,
+ appendedCollectedData: alphabetData[5:],
+}, {
+ description: "start from the start",
+ data: alphabetData,
+ initialLinesWritten: 5,
+ bufferSize: 5,
+ appendedCollectedData: alphabetData,
+ fromStart: true,
+}, {
description: "lines are longer than buffer size, less lines
already written than initially requested",
data: alphabetData,
initialLinesWritten: 3,
@@ -323,7 +337,7 @@
reader, writer := io.Pipe()
sigc := make(chan struct{}, 1)
rs := startReadSeeker(c, test.data, test.initialLinesWritten, sigc)
- tailer := tailer.NewTestTailer(rs, writer, test.initialLinesRequested,
test.filter, bufferSize, 2*time.Millisecond)
+ tailer := tailer.NewTestTailer(rs, writer, test.initialLinesRequested,
test.filter, bufferSize, 2*time.Millisecond, !test.fromStart)
linec := startReading(c, tailer, reader, writer)
// Collect initial data.
@@ -386,6 +400,9 @@
// linec is closed due to stopping or an error only the values so far care
// compared. Checking the reason for termination is done in the test.
func assertCollected(c *gc.C, linec chan string, compare []string,
injection func([]string)) {
+ if len(compare) == 0 {
+ return
+ }
timeout := time.After(testing.LongWait)
lines := []string{}
for {