Merge lp:~thumper/juju-core/fix-backlog-limits into lp:~go-bot/juju-core/trunk

Proposed by Tim Penhey
Status: Merged
Approved by: Tim Penhey
Approved revision: no longer in the source branch.
Merged at revision: 2623
Proposed branch: lp:~thumper/juju-core/fix-backlog-limits
Merge into: lp:~go-bot/juju-core/trunk
Prerequisite: lp:~thumper/juju-core/debug-log-api
Diff against target: 424 lines (+100/-84)
6 files modified
state/apiserver/debuglog.go (+24/-6)
state/apiserver/debuglog_internal_test.go (+30/-35)
state/apiserver/debuglog_test.go (+12/-0)
utils/tailer/export_test.go (+4/-1)
utils/tailer/tailer.go (+18/-38)
utils/tailer/tailer_test.go (+12/-4)
To merge this branch: bzr merge lp:~thumper/juju-core/fix-backlog-limits
Reviewer Review Type Date Requested Status
Juju Engineering Pending
Review via email: mp+215333@code.launchpad.net

Commit message

Fix the limit for the debug-log calls.

Unbeknownst to me, the filter method was being called
during the backlog iteration as well. This fix introduces
a callback that the tailer calls when it starts the forward
filtering.

This allows us to count the filter calls only when the
filter is being called to write out the results. We cannot
count the write calls because the tailer uses buffered i/o
there.

https://codereview.appspot.com/85570045/

Description of the change

Fix the limit for the debug-log calls.

Unbeknownst to me, the filter method was being called
during the backlog iteration as well. This fix introduces
a callback that the tailer calls when it starts the forward
filtering.

This allows us to count the filter calls only when the
filter is being called to write out the results. We cannot
count the write calls because the tailer uses buffered i/o
there.

https://codereview.appspot.com/85570045/
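
As an illustration of the approach described above, here is a minimal, self-contained sketch (not the actual juju-core code; logStream is heavily simplified and the match check stands in for the real entity/module/level filtering). The tailer invokes a "started" callback once it moves from backlog positioning to forward filtering, and only lines filtered after that point count against maxLines:

package main

import "fmt"

type logStream struct {
    started   bool
    maxLines  int
    lineCount int
}

// tailStarted is the callback the tailer would invoke when it switches
// from scanning the backlog to forward filtering.
func (s *logStream) tailStarted() { s.started = true }

// filterLine counts a matching line against maxLines only once forward
// filtering has started, so the backlog scan never consumes the limit.
func (s *logStream) filterLine(line string) bool {
    match := len(line) > 0 // stand-in for the real filter checks
    if s.started && match && s.maxLines > 0 {
        s.lineCount++
        return s.lineCount <= s.maxLines
    }
    return match
}

func main() {
    s := &logStream{maxLines: 2}
    fmt.Println(s.filterLine("backlog line")) // true; backlog lines are not counted
    s.tailStarted()
    fmt.Println(s.filterLine("live 1")) // true
    fmt.Println(s.filterLine("live 2")) // true
    fmt.Println(s.filterLine("live 3")) // false: maxLines reached
}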

Revision history for this message
Tim Penhey (thumper) wrote :

Reviewers: mp+215333_code.launchpad.net,

Message:
Please take a look.

Description:
Fix the limit for the debug-log calls.

Unbeknownst to me, the filter method was being called
during the backlog iteration as well. This fix introduces
a callback that the tailer calls when it starts the forward
filtering.

This allows us to count the filter calls only when the
filter is being called to write out the results. We cannot
count the write calls because the tailer uses buffered i/o
there.

https://code.launchpad.net/~thumper/juju-core/fix-backlog-limits/+merge/215333

Requires:
https://code.launchpad.net/~thumper/juju-core/debug-log-api/+merge/215323

(do not edit description out of merge proposal)

Please review this at https://codereview.appspot.com/85570045/

Affected files (+42, -10 lines):
   A [revision details]
   M container/lxc/clonetemplate.go
   M state/apiserver/debuglog.go
   M state/apiserver/debuglog_internal_test.go
   M state/apiserver/debuglog_test.go
   M utils/tailer/tailer.go
   M utils/tailer/tailer_test.go

Index: [revision details]
=== added file '[revision details]'
--- [revision details] 2012-01-01 00:00:00 +0000
+++ [revision details] 2012-01-01 00:00:00 +0000
@@ -0,0 +1,2 @@
+Old revision: <email address hidden>
+New revision: <email address hidden>

Index: container/lxc/clonetemplate.go
=== modified file 'container/lxc/clonetemplate.go'
--- container/lxc/clonetemplate.go 2014-04-04 03:46:13 +0000
+++ container/lxc/clonetemplate.go 2014-04-11 01:57:45 +0000
@@ -179,7 +179,7 @@
   }

   tailWriter := &logTail{tick: time.Now()}
- consoleTailer := tailer.NewTailer(console, tailWriter, nil)
+ consoleTailer := tailer.NewTailer(console, tailWriter, nil, nil)
   defer consoleTailer.Stop()

   // We should wait maybe 1 minute between output?

Index: state/apiserver/debuglog.go
=== modified file 'state/apiserver/debuglog.go'
--- state/apiserver/debuglog.go 2014-04-10 09:43:51 +0000
+++ state/apiserver/debuglog.go 2014-04-11 01:54:49 +0000
@@ -209,18 +209,23 @@
   maxLines uint
   lineCount uint
   fromTheStart bool
+ started bool
  }

  // start the tailer listening to the logFile, and sending the matching
  // lines to the writer.
  func (stream *logStream) start(logFile io.ReadSeeker, writer io.Writer) {
   if stream.fromTheStart {
- stream.logTailer = tailer.NewTailer(logFile, writer, stream.filterLine)
+ stream.logTailer = tailer.NewTailer(logFile, writer, stream.filterLine, stream.tailStarted)
   } else {
- stream.logTailer = tailer.NewTailerBacktrack(logFile, writer, stream.backlog, stream.filterLine)
+ stream.logTailer = tailer.NewTailerBacktrack(logFile, writer, stream.backlog, stream.filterLine, stream.tailStarted)
   }
  }

+func (stream *logStream) tailStarted() {
+ stream.started = true
+}
+
  // loop starts the tailer with the log file and the web socket.
  func (stream *logStream) loop() error {
   select {
@@ -239,7 +244,7 @@
    stream.checkIncludeModule(log) &&
    !stream.exclude(log) &&
    stream.checkLevel(log)
- if result && stream.maxLines > 0 {
+ if stream.started && result && stream.maxLines > 0 {
    stream...


Revision history for this message
Andrew Wilkins (axwalk) wrote :

https://codereview.appspot.com/85570045/diff/1/utils/tailer/tailer.go
File utils/tailer/tailer.go (right):

https://codereview.appspot.com/85570045/diff/1/utils/tailer/tailer.go#newcode56
utils/tailer/tailer.go:56: filter TailerFilterFunc, callback TailerFilterStartedFunc) *Tailer {
Rather than adding *another* parameter that's only relevant to
backtracking, I really think it would be best to ditch
NewTailerBacktrack and create a separate function that does the
backtracking.

https://codereview.appspot.com/85570045/diff/1/utils/tailer/tailer.go#newcode65
utils/tailer/tailer.go:65: filter TailerFilterFunc, callback TailerFilterStartedFunc) *Tailer {
Isn't having a callback fairly pointless here? It's going to be called
immediately.

https://codereview.appspot.com/85570045/
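
For reference, a caller-side sketch of that suggestion (TailerFilterFunc, SeekLastLines and NewTailer are the names the branch ends up exporting, per the preview diff below; the followLastLines helper itself is hypothetical):

package example

import (
    "io"

    "launchpad.net/juju-core/utils/tailer"
)

// followLastLines positions the reader the given number of matching lines
// before the end of the file, then starts a plain forward tailer. This is
// the split suggested here, instead of a lookBack flag inside the Tailer.
func followLastLines(logFile io.ReadSeeker, w io.Writer, backlog uint, filter tailer.TailerFilterFunc) (*tailer.Tailer, error) {
    if err := tailer.SeekLastLines(logFile, backlog, filter); err != nil {
        return nil, err
    }
    return tailer.NewTailer(logFile, w, filter), nil
}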

Revision history for this message
Roger Peppe (rogpeppe) wrote :

As far as I can see, this makes a nice Tailer API into
a not-so-nice one because you've made a stateful
filter function and you want it to be called in just
the way you expect.

I don't think the filter function should be stateful.

I can see a couple of solutions that might be better:

1) (my preferred option) ditch the server-side maxLines
functionality completely. It could be implemented client-side by
simply closing the connection when the right number
of lines is received, but we've always got head(1),
so I'd suggest just losing it entirely.
2) add maxLines functionality to the Tailer itself.

1) might use a little extra bandwidth,
but I can't see that that would be a great problem
in practice. If it is, the second option could
be implemented later.

https://codereview.appspot.com/85570045/
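
A rough sketch of option 1, with maxLines enforced purely on the client (the connection is shown as a plain io.ReadCloser; the websocket plumbing and the readMaxLines name are illustrative, not juju-core API):

package main

import (
    "bufio"
    "fmt"
    "io"
    "io/ioutil"
    "strings"
)

// readMaxLines prints at most maxLines lines from the stream and then
// closes it; closing the connection is the entire "limit" mechanism.
func readMaxLines(conn io.ReadCloser, maxLines int) error {
    defer conn.Close()
    scanner := bufio.NewScanner(conn)
    for n := 0; n < maxLines && scanner.Scan(); n++ {
        fmt.Println(scanner.Text())
    }
    return scanner.Err()
}

func main() {
    stream := ioutil.NopCloser(strings.NewReader("line 1\nline 2\nline 3\n"))
    _ = readMaxLines(stream, 2) // prints only the first two lines
}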

Revision history for this message
Tim Penhey (thumper) wrote :

Due to the buffered i/o in the tailer, I'd prefer to keep the line
parsing functionality in the server side.

However I think that axw's approach of breaking out the backtracking is
a good one, and that is the approach I'll use.

https://codereview.appspot.com/85570045/

Revision history for this message
Tim Penhey (thumper) wrote :
Revision history for this message
Andrew Wilkins (axwalk) wrote :

LGTM with a few suggestions

https://codereview.appspot.com/85570045/diff/20001/state/apiserver/debuglog.go
File state/apiserver/debuglog.go (right):

https://codereview.appspot.com/85570045/diff/20001/state/apiserver/debuglog.go#newcode225
state/apiserver/debuglog.go:225: err := tailer.SeekLastLines(logFile, stream.backlog, stream.filterLine)
return tailer.SeekLastLines(...

https://codereview.appspot.com/85570045/diff/20001/state/apiserver/debuglog.go#newcode258
state/apiserver/debuglog.go:258: if stream.started && result && stream.maxLines > 0 {
It would be better to just split filterLine into something non-counting
and counting, where the latter calls the former and adds the linecount
check. Then you don't need the boolean.

https://codereview.appspot.com/85570045/diff/20001/utils/tailer/tailer.go
File utils/tailer/tailer.go (right):

https://codereview.appspot.com/85570045/diff/20001/utils/tailer/tailer.go#newcode31
utils/tailer/tailer.go:31: // TailerFilterStartedFunc is a callback that is called when the filtering is
Delete this?

https://codereview.appspot.com/85570045/

Revision history for this message
Roger Peppe (rogpeppe) wrote :

> Due to the buffered i/o in the tailer, I'd prefer to keep the line
> parsing functionality in the server side.

I don't quite understand this remark (the buffer is flushed whenever we
get to the end of the file, so it shouldn't make any significant
difference), but splitting out the backtracking code makes me much
happier.

LGTM with a couple of trivial suggestions below.

https://codereview.appspot.com/85570045/diff/20001/state/apiserver/debuglog.go
File state/apiserver/debuglog.go (right):

https://codereview.appspot.com/85570045/diff/20001/state/apiserver/debuglog.go#newcode224
state/apiserver/debuglog.go:224: if !stream.fromTheStart {
if stream.fromTheStart {
     return nil
}
...

saves a negative and a level of indentation.

https://codereview.appspot.com/85570045/diff/20001/state/apiserver/debuglog.go#newcode258
state/apiserver/debuglog.go:258: if stream.started && result && stream.maxLines > 0 {
On 2014/04/14 03:52:40, axw wrote:
> It would be better to just split filterLine into something non-counting
> and counting, where the latter calls the former and adds the linecount
> check. Then you don't need the boolean.

+1

https://codereview.appspot.com/85570045/diff/20001/utils/tailer/tailer.go
File utils/tailer/tailer.go (right):

https://codereview.appspot.com/85570045/diff/20001/utils/tailer/tailer.go#newcode31
utils/tailer/tailer.go:31: // TailerFilterStartedFunc is a callback that is called when the filtering is
On 2014/04/14 03:52:40, axw wrote:
> Delete this?

+1

https://codereview.appspot.com/85570045/diff/20001/utils/tailer/tailer.go#newcode132
utils/tailer/tailer.go:132: // wanted number of filtered lines before the end.
// If filter is non-nil, only lines for which filter returns
// true will be counted.

?

https://codereview.appspot.com/85570045/

Preview Diff

=== modified file 'state/apiserver/debuglog.go'
--- state/apiserver/debuglog.go 2014-04-10 09:43:51 +0000
+++ state/apiserver/debuglog.go 2014-04-14 04:14:21 +0000
@@ -70,6 +70,11 @@
         return
     }
     defer logFile.Close()
+    if err := stream.positionLogFile(logFile); err != nil {
+        h.sendError(socket, fmt.Errorf("cannot position log file: %v", err))
+        socket.Close()
+        return
+    }
 
     // If we get to here, no more errors to report, so we report a nil
     // error. This way the first line of the socket is always a json
@@ -211,14 +216,20 @@
     fromTheStart bool
 }
 
+// positionLogFile will update the internal read position of the logFile to be
+// at the end of the file or somewhere in the middle if backlog has been specified.
+func (stream *logStream) positionLogFile(logFile io.ReadSeeker) error {
+    // Seek to the end, or lines back from the end if we need to.
+    if !stream.fromTheStart {
+        return tailer.SeekLastLines(logFile, stream.backlog, stream.filterLine)
+    }
+    return nil
+}
+
 // start the tailer listening to the logFile, and sending the matching
 // lines to the writer.
 func (stream *logStream) start(logFile io.ReadSeeker, writer io.Writer) {
-    if stream.fromTheStart {
-        stream.logTailer = tailer.NewTailer(logFile, writer, stream.filterLine)
-    } else {
-        stream.logTailer = tailer.NewTailerBacktrack(logFile, writer, stream.backlog, stream.filterLine)
-    }
+    stream.logTailer = tailer.NewTailer(logFile, writer, stream.countedFilterLine)
 }
 
 // loop starts the tailer with the log file and the web socket.
@@ -235,10 +246,17 @@
 // filterLine checks the received line for one of the confgured tags.
 func (stream *logStream) filterLine(line []byte) bool {
     log := parseLogLine(string(line))
-    result := stream.checkIncludeEntity(log) &&
+    return stream.checkIncludeEntity(log) &&
         stream.checkIncludeModule(log) &&
         !stream.exclude(log) &&
         stream.checkLevel(log)
+}
+
+// countedFilterLine checks the received line for one of the confgured tags,
+// and also checks to make sure the stream doesn't send more than the
+// specified number of lines.
+func (stream *logStream) countedFilterLine(line []byte) bool {
+    result := stream.filterLine(line)
     if result && stream.maxLines > 0 {
         stream.lineCount++
         result = stream.lineCount <= stream.maxLines
=== modified file 'state/apiserver/debuglog_internal_test.go'
--- state/apiserver/debuglog_internal_test.go 2014-04-07 05:11:48 +0000
+++ state/apiserver/debuglog_internal_test.go 2014-04-14 04:14:21 +0000
@@ -7,7 +7,6 @@
 
 import (
     "bytes"
-    "io"
     "net/url"
     "os"
     "path/filepath"
@@ -179,36 +178,22 @@
         "machine-0: date time WARNING juju.foo.bar")), jc.IsFalse)
 }
 
-func (s *debugInternalSuite) TestFilterLineWithLimit(c *gc.C) {
+func (s *debugInternalSuite) TestCountedFilterLineWithLimit(c *gc.C) {
     stream := &logStream{
         filterLevel: loggo.INFO,
         maxLines:    5,
     }
     line := []byte("machine-0: date time WARNING juju")
-    c.Check(stream.filterLine(line), jc.IsTrue)
-    c.Check(stream.filterLine(line), jc.IsTrue)
-    c.Check(stream.filterLine(line), jc.IsTrue)
-    c.Check(stream.filterLine(line), jc.IsTrue)
-    c.Check(stream.filterLine(line), jc.IsTrue)
-    c.Check(stream.filterLine(line), jc.IsFalse)
-    c.Check(stream.filterLine(line), jc.IsFalse)
+    c.Check(stream.countedFilterLine(line), jc.IsTrue)
+    c.Check(stream.countedFilterLine(line), jc.IsTrue)
+    c.Check(stream.countedFilterLine(line), jc.IsTrue)
+    c.Check(stream.countedFilterLine(line), jc.IsTrue)
+    c.Check(stream.countedFilterLine(line), jc.IsTrue)
+    c.Check(stream.countedFilterLine(line), jc.IsFalse)
+    c.Check(stream.countedFilterLine(line), jc.IsFalse)
 }
 
-type seekWaitReader struct {
-    io.ReadSeeker
-    wait chan struct{}
-}
-
-func (w *seekWaitReader) Seek(offset int64, whence int) (int64, error) {
-    pos, err := w.ReadSeeker.Seek(offset, whence)
-    if w.wait != nil {
-        close(w.wait)
-        w.wait = nil
-    }
-    return pos, err
-}
-
-func (s *debugInternalSuite) testStreamInternal(c *gc.C, fromTheStart bool, maxLines uint, expected, errMatch string) {
+func (s *debugInternalSuite) testStreamInternal(c *gc.C, fromTheStart bool, backlog, maxLines uint, expected, errMatch string) {
 
     dir := c.MkDir()
     logPath := filepath.Join(dir, "logfile.txt")
@@ -223,17 +208,20 @@
 line 2
 line 3
 `)
-    stream := &logStream{fromTheStart: fromTheStart, maxLines: maxLines}
+    stream := &logStream{
+        fromTheStart: fromTheStart,
+        backlog:      backlog,
+        maxLines:     maxLines,
+    }
+    err = stream.positionLogFile(logFileReader)
+    c.Assert(err, gc.IsNil)
     output := &bytes.Buffer{}
-    waitReader := &seekWaitReader{logFileReader, make(chan struct{})}
-    stream.start(waitReader, output)
+    stream.start(logFileReader, output)
 
     go func() {
         defer stream.tomb.Done()
         stream.tomb.Kill(stream.loop())
     }()
-    // wait for the tailer to have started tailing before writing more
-    <-waitReader.wait
 
     logFile.WriteString("line 4\n")
     logFile.WriteString("line 5\n")
@@ -265,7 +253,7 @@
 line 4
 line 5
 `
-    s.testStreamInternal(c, true, 0, expected, "")
+    s.testStreamInternal(c, true, 0, 0, expected, "")
 }
 
 func (s *debugInternalSuite) TestLogStreamLoopFromTheStartMaxLines(c *gc.C) {
@@ -273,21 +261,28 @@
 line 2
 line 3
 `
-    s.testStreamInternal(c, true, 3, expected, "max lines reached")
+    s.testStreamInternal(c, true, 0, 3, expected, "max lines reached")
 }
 
 func (s *debugInternalSuite) TestLogStreamLoopJustTail(c *gc.C) {
     expected := `line 4
 line 5
 `
-    s.testStreamInternal(c, false, 0, expected, "")
+    s.testStreamInternal(c, false, 0, 0, expected, "")
+}
+
+func (s *debugInternalSuite) TestLogStreamLoopBackOneLimitTwo(c *gc.C) {
+    expected := `line 3
+line 4
+`
+    s.testStreamInternal(c, false, 1, 2, expected, "max lines reached")
 }
 
 func (s *debugInternalSuite) TestLogStreamLoopTailMaxLinesNotYetReached(c *gc.C) {
     expected := `line 4
 line 5
 `
-    s.testStreamInternal(c, false, 3, expected, "")
+    s.testStreamInternal(c, false, 0, 3, expected, "")
 }
 
 func assertStreamParams(c *gc.C, obtained, expected *logStream) {
=== modified file 'state/apiserver/debuglog_test.go'
--- state/apiserver/debuglog_test.go 2014-04-10 09:43:51 +0000
+++ state/apiserver/debuglog_test.go 2014-04-14 04:14:21 +0000
@@ -122,6 +122,18 @@
     s.assertWebsocketClosed(c, reader)
 }
 
+func (s *debugLogSuite) TestBacklogWithMaxLines(c *gc.C) {
+    s.writeLogLines(c, 10)
+
+    reader := s.openWebsocket(c, url.Values{"backlog": {"5"}, "maxLines": {"10"}})
+    s.assertLogFollowing(c, reader)
+    s.writeLogLines(c, logLineCount)
+
+    linesRead := s.readLogLines(c, reader, 10)
+    c.Assert(linesRead, jc.DeepEquals, logLines[5:15])
+    s.assertWebsocketClosed(c, reader)
+}
+
 func (s *debugLogSuite) TestFilter(c *gc.C) {
     s.ensureLogFile(c)
 
=== modified file 'utils/tailer/export_test.go'
--- utils/tailer/export_test.go 2013-12-11 17:11:30 +0000
+++ utils/tailer/export_test.go 2014-04-14 04:14:21 +0000
@@ -3,4 +3,7 @@
 
 package tailer
 
-var NewTestTailer = newTailer
+var (
+    BufferSize    = &bufferSize
+    NewTestTailer = newTailer
+)
=== modified file 'utils/tailer/tailer.go'
--- utils/tailer/tailer.go 2014-04-04 04:35:44 +0000
+++ utils/tailer/tailer.go 2014-04-14 04:14:21 +0000
@@ -14,12 +14,13 @@
 )
 
 const (
-    bufferSize = 4096
+    defaultBufferSize = 4096
     polltime = time.Second
     delimiter = '\n'
 )
 
 var (
+    bufferSize = defaultBufferSize
     delimiters = []byte{delimiter}
 )
 
@@ -35,20 +36,8 @@
     reader      *bufio.Reader
     writeCloser io.WriteCloser
     writer      *bufio.Writer
-    lines       uint
     filter      TailerFilterFunc
-    bufferSize  int
     polltime    time.Duration
-    lookBack    bool
-}
-
-// NewTailerBacktrack starts a Tailer which reads strings from the passed
-// ReadSeeker line by line. If a filter function is specified the read
-// lines are filtered. The matching lines are written to the passed
-// Writer. The reading begins the specified number of matching lines
-// from the end.
-func NewTailerBacktrack(readSeeker io.ReadSeeker, writer io.Writer, lines uint, filter TailerFilterFunc) *Tailer {
-    return newTailer(readSeeker, writer, lines, filter, bufferSize, polltime, true)
 }
 
 // NewTailer starts a Tailer which reads strings from the passed
@@ -56,22 +45,19 @@
 // lines are filtered. The matching lines are written to the passed
 // Writer.
 func NewTailer(readSeeker io.ReadSeeker, writer io.Writer, filter TailerFilterFunc) *Tailer {
-    return newTailer(readSeeker, writer, 0, filter, bufferSize, polltime, false)
+    return newTailer(readSeeker, writer, filter, polltime)
 }
 
 // newTailer starts a Tailer like NewTailer but allows the setting of
 // the read buffer size and the time between pollings for testing.
-func newTailer(readSeeker io.ReadSeeker, writer io.Writer, lines uint, filter TailerFilterFunc,
-    bufferSize int, polltime time.Duration, lookBack bool) *Tailer {
+func newTailer(readSeeker io.ReadSeeker, writer io.Writer,
+    filter TailerFilterFunc, polltime time.Duration) *Tailer {
     t := &Tailer{
         readSeeker: readSeeker,
         reader:     bufio.NewReaderSize(readSeeker, bufferSize),
         writer:     bufio.NewWriter(writer),
-        lines:      lines,
         filter:     filter,
-        bufferSize: bufferSize,
         polltime:   polltime,
-        lookBack:   lookBack,
     }
     go func() {
         defer t.tomb.Done()
@@ -107,12 +93,6 @@
 // writer and then polls for more data to write it to the
 // writer too.
 func (t *Tailer) loop() error {
-    // Position the readSeeker.
-    if t.lookBack {
-        if err := t.seekLastLines(); err != nil {
-            return err
-        }
-    }
     // Start polling.
     // TODO(mue) 2013-12-06
     // Handling of read-seeker/files being truncated during
@@ -144,26 +124,26 @@
     }
 }
 
-// seekLastLines sets the read position of the ReadSeeker to the
+// SeekLastLines sets the read position of the ReadSeeker to the
 // wanted number of filtered lines before the end.
-func (t *Tailer) seekLastLines() error {
-    offset, err := t.readSeeker.Seek(0, os.SEEK_END)
+func SeekLastLines(readSeeker io.ReadSeeker, lines uint, filter TailerFilterFunc) error {
+    offset, err := readSeeker.Seek(0, os.SEEK_END)
     if err != nil {
         return err
     }
-    if t.lines == 0 {
+    if lines == 0 {
         // We are done, just seeking to the end is sufficient.
         return nil
     }
     seekPos := int64(0)
     found := uint(0)
-    buffer := make([]byte, t.bufferSize)
+    buffer := make([]byte, bufferSize)
 SeekLoop:
     for offset > 0 {
         // buffer contains the data left over from the
         // previous iteration.
         space := cap(buffer) - len(buffer)
-        if space < t.bufferSize {
+        if space < bufferSize {
             // Grow buffer.
             newBuffer := make([]byte, len(buffer), cap(buffer)*2)
             copy(newBuffer, buffer)
@@ -180,11 +160,11 @@
         copy(buffer[space:cap(buffer)], buffer)
         buffer = buffer[0 : len(buffer)+space]
         offset -= int64(space)
-        _, err := t.readSeeker.Seek(offset, os.SEEK_SET)
+        _, err := readSeeker.Seek(offset, os.SEEK_SET)
         if err != nil {
             return err
         }
-        _, err = io.ReadFull(t.readSeeker, buffer[0:space])
+        _, err = io.ReadFull(readSeeker, buffer[0:space])
         if err != nil {
             return err
         }
@@ -207,9 +187,9 @@
                 break
             }
             start++
-            if t.isValid(buffer[start:end]) {
+            if filter == nil || filter(buffer[start:end]) {
                 found++
-                if found >= t.lines {
+                if found >= lines {
                     seekPos = offset + int64(start)
                     break SeekLoop
                 }
@@ -221,7 +201,7 @@
         buffer = buffer[0:end]
     }
     // Final positioning.
-    t.readSeeker.Seek(seekPos, os.SEEK_SET)
+    readSeeker.Seek(seekPos, os.SEEK_SET)
     return nil
 }
 
=== modified file 'utils/tailer/tailer_test.go'
--- utils/tailer/tailer_test.go 2014-04-04 03:46:13 +0000
+++ utils/tailer/tailer_test.go 2014-04-14 04:14:21 +0000
@@ -15,6 +15,7 @@
     gc "launchpad.net/gocheck"
 
     "launchpad.net/juju-core/testing"
+    "launchpad.net/juju-core/testing/testbase"
     "launchpad.net/juju-core/utils/tailer"
 )
 
@@ -22,9 +23,11 @@
     gc.TestingT(t)
 }
 
-type tailerSuite struct{}
+type tailerSuite struct {
+    testbase.LoggingSuite
+}
 
-var _ = gc.Suite(tailerSuite{})
+var _ = gc.Suite(&tailerSuite{})
 
 var alphabetData = []string{
     "alpha alpha\n",
@@ -326,7 +329,7 @@
     },
 }}
 
-func (tailerSuite) TestTailer(c *gc.C) {
+func (s *tailerSuite) TestTailer(c *gc.C) {
     for i, test := range tests {
         c.Logf("Test #%d) %s", i, test.description)
         bufferSize := test.bufferSize
@@ -334,10 +337,15 @@
             // Default value.
             bufferSize = 4096
         }
+        s.PatchValue(tailer.BufferSize, bufferSize)
         reader, writer := io.Pipe()
         sigc := make(chan struct{}, 1)
         rs := startReadSeeker(c, test.data, test.initialLinesWritten, sigc)
-        tailer := tailer.NewTestTailer(rs, writer, test.initialLinesRequested, test.filter, bufferSize, 2*time.Millisecond, !test.fromStart)
+        if !test.fromStart {
+            err := tailer.SeekLastLines(rs, test.initialLinesRequested, test.filter)
+            c.Assert(err, gc.IsNil)
+        }
+        tailer := tailer.NewTestTailer(rs, writer, test.filter, 2*time.Millisecond)
         linec := startReading(c, tailer, reader, writer)
 
         // Collect initial data.
