Merge lp:~themue/juju-core/057-tailer into lp:~go-bot/juju-core/trunk

Proposed by Frank Mueller
Status: Merged
Approved by: Frank Mueller
Approved revision: no longer in the source branch.
Merged at revision: 2152
Proposed branch: lp:~themue/juju-core/057-tailer
Merge into: lp:~go-bot/juju-core/trunk
Diff against target: 767 lines (+752/-0)
3 files modified
utils/tailer/export_test.go (+6/-0)
utils/tailer/tailer.go (+250/-0)
utils/tailer/tailer_test.go (+496/-0)
To merge this branch: bzr merge lp:~themue/juju-core/057-tailer
Reviewer: Juju Engineering, Status: Pending
Review via email: mp+197522@code.launchpad.net

Commit message

utils: added Tailer for tailing of logs in API

The Tailer is the initial component of the debug logging command
of the API. It allows the filtered tailing of any ReadSeeker.
If no filter is passed, all lines are written to the passed
Writer; otherwise only those for which the filter function returns
true. The initial number of lines can also be specified, and the
filter applies there as well: if a File (which is a ReadSeeker)
contains 100 lines, 10 lines are requested, and only 5 of those
match the filter, just those 5 lines are returned.

https://codereview.appspot.com/36540043/
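The "last n filtered lines" behaviour described above can be sketched as a small standalone function. This is a hypothetical helper for illustration only, not the Tailer's actual API; the name `lastMatchingLines` and its signature are assumptions:

```go
package main

import (
	"bytes"
	"fmt"
)

// lastMatchingLines is a hypothetical helper (not the Tailer API)
// returning up to n of the final lines in data for which filter
// returns true. A nil filter matches every line. An unterminated
// trailing line is ignored, mirroring the Tailer's handling of
// partially written files.
func lastMatchingLines(data []byte, n int, filter func([]byte) bool) [][]byte {
	// Drop anything after the last newline (an unterminated line).
	i := bytes.LastIndexByte(data, '\n')
	if i < 0 {
		return nil
	}
	data = data[:i+1]
	lines := bytes.SplitAfter(data, []byte("\n"))
	lines = lines[:len(lines)-1] // SplitAfter leaves a trailing empty slice
	var matched [][]byte
	// Walk backwards, collecting matches until n lines are found.
	for j := len(lines) - 1; j >= 0 && len(matched) < n; j-- {
		if filter == nil || filter(lines[j]) {
			matched = append([][]byte{lines[j]}, matched...)
		}
	}
	return matched
}

func main() {
	data := []byte("alpha one\nbeta\nalpha two\ngamma\nalpha three\n")
	isAlpha := func(line []byte) bool { return bytes.HasPrefix(line, []byte("alpha")) }
	for _, line := range lastMatchingLines(data, 2, isAlpha) {
		fmt.Printf("%s", line) // prints "alpha two" and "alpha three"
	}
}
```

So from five lines, two matching lines are requested and only the last two `alpha` lines come back, as in the 100/10/5 example in the commit message.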

Description of the change

utils: added Tailer for tailing of logs in API

The Tailer is the initial component of the debug logging command
of the API. It allows the filtered tailing of any ReadSeeker.
If no filter is passed, all lines are written to the passed
Writer; otherwise only those for which the filter function returns
true. The initial number of lines can also be specified, and the
filter applies there as well: if a File (which is a ReadSeeker)
contains 100 lines, 10 lines are requested, and only 5 of those
match the filter, just those 5 lines are returned.

https://codereview.appspot.com/36540043/

Revision history for this message
Frank Mueller (themue) wrote :

Reviewers: mp+197522_code.launchpad.net,

Message:
Please take a look.

Description:
utils: added Tailer for tailing of logs in API

The Tailer is the initial component of the debug logging command
of the API. It allows the filtered tailing of any ReadSeeker.
If no filter is passed, all lines are written to the passed
Writer; otherwise only those for which the filter function returns
true. The initial number of lines can also be specified, and the
filter applies there as well: if a File (which is a ReadSeeker)
contains 100 lines, 10 lines are requested, and only 5 of those
match the filter, just those 5 lines are returned.

https://code.launchpad.net/~themue/juju-core/057-tailer/+merge/197522

(do not edit description out of merge proposal)

Please review this at https://codereview.appspot.com/36540043/

Affected files (+412, -0 lines):
   A [revision details]
   A utils/tailer.go
   A utils/tailer_test.go

Revision history for this message
Roger Peppe (rogpeppe) wrote :

This is a good start, but I have a few comments and suggestions.

I wonder if it might sit well inside its own package rather than being
added to the grab-bag of stuff in utils.

https://codereview.appspot.com/36540043/diff/20001/utils/tailer.go
File utils/tailer.go (right):

https://codereview.appspot.com/36540043/diff/20001/utils/tailer.go#newcode30
utils/tailer.go:30: func StartFileTailer(filename string, lines int,
filter TailerFilterFunc,
I think this function is unnecessary. It's trivial for other code to
open a file.

https://codereview.appspot.com/36540043/diff/20001/utils/tailer.go#newcode40
utils/tailer.go:40: func StartTailer(readSeeker io.ReadSeeker, lines
int, filter TailerFilterFunc,
Please document this function properly.

https://codereview.appspot.com/36540043/diff/20001/utils/tailer.go#newcode87
utils/tailer.go:87: println("> error:", err.Error())
d

https://codereview.appspot.com/36540043/diff/20001/utils/tailer.go#newcode98
utils/tailer.go:98: if buffer[i] == '\n' {
This isn't strictly accurate (what happens when we have an unterminated
line at the end of the file?) but it's probably ok if we never filter
unterminated lines. A comment about why it's ok might prevent future
puzzling over the correctness of the code.

https://codereview.appspot.com/36540043/diff/20001/utils/tailer.go#newcode100
utils/tailer.go:100: if foundNewlines-1 == t.lines {
s/==/>=/ defensively.

Also, if we move the if statement before the increment,
then you can lose the "-1".

https://codereview.appspot.com/36540043/diff/20001/utils/tailer.go#newcode127
utils/tailer.go:127: line, err := reader.ReadString('\n')
I think I'd be tempted to use ReadSlice here (changing the type of
filter to func([]byte) bool).
That way the tailer won't need to generate any garbage at all AFAICS.

Unfortunately that limits the line size to the size of the bufio buffer,
which may be acceptable. An alternative is to use ReadLine and allocate
only if the line is too long.

Here's some code I wrote a while ago to do that, in case it might be
useful:

// readLine reads a line from r.
// The returned byte slice is only valid until the
// next read call on r.
func readLine(r *bufio.Reader) ([]byte, error) {
 line, isPrefix, err := r.ReadLine()
 if !isPrefix {
  return line, err
 }
 buf := append([]byte(nil), line...)
 for isPrefix && err == nil {
  line, isPrefix, err = r.ReadLine()
  buf = append(buf, line...)
 }
 return buf, err
}

https://codereview.appspot.com/36540043/diff/20001/utils/tailer.go#newcode147
utils/tailer.go:147: for {
This code is exactly the same as the code above.
Can't we do with just this? (if we just delete the
loop above and use NewTimer(0), I think it might
just work).

https://codereview.appspot.com/36540043/diff/20001/utils/tailer.go#newcode149
utils/tailer.go:149: if len(line) > 0 {
I think we should ignore the line at this point if it is unterminated,
something that's quite possible if the file is being written in
arbitrary chunks. If we find an unterminated line, we should seek back
to the start of the line before we start reading again.

That means we could pass the line to filter without the trailing \n,
meaning we can be more resilient if something s...


Revision history for this message
William Reade (fwereade) wrote :

WIP in response to rog's review

Revision history for this message
Frank Mueller (themue) wrote :

Please take a look.

https://codereview.appspot.com/36540043/diff/20001/utils/tailer.go
File utils/tailer.go (right):

https://codereview.appspot.com/36540043/diff/20001/utils/tailer.go#newcode30
utils/tailer.go:30: func StartFileTailer(filename string, lines int,
filter TailerFilterFunc,
On 2013/12/05 16:20:37, rog wrote:
> I think this function is unnecessary. It's trivial for other code to
open a
> file.

Done.

https://codereview.appspot.com/36540043/diff/20001/utils/tailer.go#newcode40
utils/tailer.go:40: func StartTailer(readSeeker io.ReadSeeker, lines
int, filter TailerFilterFunc,
On 2013/12/05 16:20:37, rog wrote:
> Please document this function properly.

Done.

https://codereview.appspot.com/36540043/diff/20001/utils/tailer.go#newcode87
utils/tailer.go:87: println("> error:", err.Error())
On 2013/12/05 16:20:37, rog wrote:
> d

Done.

https://codereview.appspot.com/36540043/diff/20001/utils/tailer.go#newcode98
utils/tailer.go:98: if buffer[i] == '\n' {
On 2013/12/05 16:20:37, rog wrote:
> This isn't strictly accurate (what happens when we have an
unterminated line at
> the end of the file?) but it's probably ok if we never filter
unterminated
> lines. A comment about why it's ok might prevent future puzzling over
the
> correctness of the code.

Added a comment at the type declaration.

https://codereview.appspot.com/36540043/diff/20001/utils/tailer.go#newcode100
utils/tailer.go:100: if foundNewlines-1 == t.lines {
On 2013/12/05 16:20:37, rog wrote:
> s/==/>=/

> defensively.

> Also, if we move the if statement before the increment,
> then you can lose the "-1".

Done.

https://codereview.appspot.com/36540043/diff/20001/utils/tailer.go#newcode127
utils/tailer.go:127: line, err := reader.ReadString('\n')
On 2013/12/05 16:20:37, rog wrote:
> I think I'd be tempted to use ReadSlice here (changing the type of
filter to
> func([]byte)bool.
> That way the tailer won't need to generate any garbage at all AFAICS.

> Unfortunately that limits the line size to the size of the bufio
buffer, which
> may be acceptable. An alternative is to use ReadLine and allocate only
if the
> line is too long.

> Here's some code I wrote a while ago to do that, in case it might be
useful:

> // readLine reads a line from r.
> // The returned byte slice is only valid until the
> // next read call on r.
> func readLine(r *bufio.Reader) ([]byte, error) {
> line, isPrefix, err := r.ReadLine()
> if !isPrefix {
> return line, err
> }
> buf := append([]byte(nil), line...)
> for isPrefix && err == nil {
> line, isPrefix, err = r.ReadLine()
> buf = append(buf, line...)
> }
> return buf, err
> }

Good hint, using it.

https://codereview.appspot.com/36540043/diff/20001/utils/tailer.go#newcode147
utils/tailer.go:147: for {
On 2013/12/05 16:20:37, rog wrote:
> This code is exactly the same as the code above.
> Can't we do with just this? (if we just delete the
> loop above and use NewTimer(0), I think it might
> just work).

Yep, the split was due to the initial approach of scanning everything up
to the end first. Removed. Thanks.

https://codereview.appspot.com/36540043/diff/20001/utils/tailer.go#newcode149
utils/tailer.go:149: if len(line) > 0 {
On 2013/12/0...


Revision history for this message
Roger Peppe (rogpeppe) wrote :

Getting there! A few more thoughts and suggestions below.

https://codereview.appspot.com/36540043/diff/40001/utils/tailer/tailer.go
File utils/tailer/tailer.go (right):

https://codereview.appspot.com/36540043/diff/40001/utils/tailer/tailer.go#newcode41
utils/tailer/tailer.go:41: // Writer. The reading beginns the specified
number of matching lines
s/beginns/begins/

https://codereview.appspot.com/36540043/diff/40001/utils/tailer/tailer.go#newcode43
utils/tailer/tailer.go:43: func NewStandardTailer(readSeeker
io.ReadSeeker, writer io.Writer, lines int, filter TailerFilterFunc)
*Tailer {
Given that this function is going to be the one that everyone calls, I
think I'd name it NewTailer, but do we actually need to expose buffer
size and poll interval publicly except to the tailer tests?

https://codereview.appspot.com/36540043/diff/40001/utils/tailer/tailer.go#newcode126
utils/tailer/tailer.go:126: readBuffer := make([]byte, t.bufsize)
This seems rather complex to me - I don't fully trust myself to vet the
logic. Also, I don't think we need to have a separate line buffer and
read buffer - we can use a single buffer and read directly into it. That
way we can avoid allocating a new buffer every time round the loop too,
although it is necessary to copy something from the start to the end.

I started trying to explain a better possibility, but got carried away...
Here's the kind of thing I mean. Still somewhat complex, but I've tried
to keep the invariants simple. It needs testing (I've left in
one deliberate mistake that I hope to see a test for). I suggest
some internal tests that test this function in isolation with
quite a few different inputs.

// seekLastLines sets the read position of the ReadSeeker to the
// wanted number of filtered lines before the end.
func (t *Tailer) seekLastLines() error {
 offset, err := t.readSeeker.Seek(0, os.SEEK_END)
 if err != nil {
  return err
 }
 seekPos := int64(0)
 found := 0
 buf := make([]byte, minRead)
SeekLoop:
 for offset > 0 {
  // buf contains the data left over from the
  // previous iteration.
  space := cap(buf) - len(buf)
  if space < minRead {
   // grow buffer
   newBuf := make([]byte, len(buf), cap(buf)*2)
   copy(newBuf, buf)
   buf = newBuf
   space = cap(buf) - len(buf)

  }
  if int64(space) > offset {
   // Use exactly the right amount of space if there's
   // only a small amount remaining.
   space = int(offset)
  }
  // copy data remaining from last time to the end of the buffer,
  // so we can read into the right place.
  copy(buf[space:cap(buf)], buf)
  buf = buf[0 : len(buf)+space]

  offset -= int64(space)
  _, err := t.readSeeker.Seek(offset, os.SEEK_SET)
  if err != nil {
   return err
  }
  _, err = io.ReadFull(t.readSeeker, buf[0:space])
  if err != nil {
   return err
  }
  // Find the end of the last line in the buffer.
  // This will discard any unterminated line at the end
  // of the file.
  end := bytes.LastIndex(buf, delim)
  if end == -1 {
   // No end of line found - discard incomplete
   // line and continue looking. If this happens
   // at the beginning of the file, we don't care
   // because we're going to stop anyway.
   buf = buf[:0]
   continue
  }
  end++
  fo...


Revision history for this message
William Reade (fwereade) wrote :

WIPping to address rog's review

Revision history for this message
Frank Mueller (themue) wrote :

Please take a look.

https://codereview.appspot.com/36540043/diff/40001/utils/tailer/tailer.go
File utils/tailer/tailer.go (right):

https://codereview.appspot.com/36540043/diff/40001/utils/tailer/tailer.go#newcode41
utils/tailer/tailer.go:41: // Writer. The reading beginns the specified
number of matching lines
On 2013/12/10 15:14:25, rog wrote:
> s/beginns/begins/

Done.

https://codereview.appspot.com/36540043/diff/40001/utils/tailer/tailer.go#newcode43
utils/tailer/tailer.go:43: func NewStandardTailer(readSeeker
io.ReadSeeker, writer io.Writer, lines int, filter TailerFilterFunc)
*Tailer {
On 2013/12/10 15:14:25, rog wrote:
> Given that this function is going to be the one that everyone calls, I
think I'd
> name it NewTailer, but do we actually need to expose buffer size and
poll
> interval publicly except to the tailer tests?

Done.

https://codereview.appspot.com/36540043/diff/40001/utils/tailer/tailer.go#newcode126
utils/tailer/tailer.go:126: readBuffer := make([]byte, t.bufsize)
On 2013/12/10 15:14:25, rog wrote:
> This seems rather complex to me - I don't fully trust myself to vet
the logic.

Used it, even if I had the same trouble following your array logic and
the needed debugging. Thankfully the comments helped to follow the idea.
And at least it has fewer lines. ;) Thx.

https://codereview.appspot.com/36540043/diff/40001/utils/tailer/tailer.go#newcode198
utils/tailer/tailer.go:198: // Reached beginnig of data.
On 2013/12/10 15:14:25, rog wrote:
> s/beginnig/beginning/

Not needed anymore due to the new function.

https://codereview.appspot.com/36540043/diff/40001/utils/tailer/tailer.go#newcode212
utils/tailer/tailer.go:212: buffer, err := t.reader.ReadBytes(delimiter)
On 2013/12/10 15:14:25, rog wrote:
> ReadBytes can't return ErrBufferFull.

> I think you want to use ReadSlice.
> Also, if the first call succeeds, we should just return the slice that
we got
> from ReadSlice - that way in the usual case that the line is shorter
than the
> bufio buffer we can avoid any allocation.

Done.

https://codereview.appspot.com/36540043/diff/40001/utils/tailer/tailer.go#newcode226
utils/tailer/tailer.go:226: if len(line) == 0 || line[len(line)-1] ==
delimiter {
On 2013/12/10 15:14:25, rog wrote:
> This can't happen. (from the docs: "returns err != nil if and only if
the
> returned data does not end in delim")

Done.

https://codereview.appspot.com/36540043/diff/40001/utils/tailer/tailer.go#newcode234
utils/tailer/tailer.go:234: t.readSeeker.Seek(offset, os.SEEK_END)
On 2013/12/10 15:14:25, rog wrote:
> This is racy. We can't seek from the end because the end might be
constantly
> changing. I think we need to keep track of where we're reading in the
file and
> seek back to the absolute offset of the start of the partial line.

Done.

https://codereview.appspot.com/36540043/diff/40001/utils/tailer/tailer.go#newcode247
utils/tailer/tailer.go:247: return false
On 2013/12/10 15:14:25, rog wrote:
> Why?

Yeah, too much automatic filtering here. The receiver of the data should
decide. Removed it.

https://codereview.appspot.com/36540043/diff/40001/utils/tailer/tailer.go#newcode252
utils/tailer/tailer.go:252: return t.filter(line)
On 2013/12/10 15:14...


Revision history for this message
Roger Peppe (rogpeppe) wrote :

Another round of suggestions, with one or two things still to fix.
Thanks for bearing with me!

https://codereview.appspot.com/36540043/diff/40001/utils/tailer/tailer.go
File utils/tailer/tailer.go (right):

https://codereview.appspot.com/36540043/diff/40001/utils/tailer/tailer.go#newcode252
utils/tailer/tailer.go:252: return t.filter(line)
On 2013/12/11 17:13:05, mue wrote:
> On 2013/12/10 15:14:25, rog wrote:
> > Consider trimming \r?\n from the end of the line before calling
filter?

> Why?

Then the filter functions don't need to worry about line termination
characters at all and we know that we're safe even when we have windows
stuff generating \r\n lines.
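The trimming rog suggests can be sketched in a few lines. This is an illustrative helper, not the merged Tailer code; the name `trimLine` is an assumption:

```go
package main

import (
	"bytes"
	"fmt"
)

// trimLine strips a trailing "\n" or "\r\n" before a line reaches
// the filter, so filter functions never see line termination
// characters, even for Windows-generated CRLF lines.
func trimLine(line []byte) []byte {
	line = bytes.TrimSuffix(line, []byte("\n"))
	return bytes.TrimSuffix(line, []byte("\r"))
}

func main() {
	fmt.Printf("%q\n", trimLine([]byte("hello\r\n"))) // "hello"
	fmt.Printf("%q\n", trimLine([]byte("world\n")))   // "world"
}
```

Stripping "\r" only after "\n" has been removed means a lone carriage return inside a line is left untouched.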

https://codereview.appspot.com/36540043/diff/60001/utils/tailer/tailer.go
File utils/tailer/tailer.go (right):

https://codereview.appspot.com/36540043/diff/60001/utils/tailer/tailer.go#newcode131
utils/tailer/tailer.go:131: buffer := make([]byte, t.bufsize)
If we've got t.bufsize, "buf" would seem logical as a name.
Not that it matters much though.

https://codereview.appspot.com/36540043/diff/60001/utils/tailer/tailer.go#newcode211
utils/tailer/tailer.go:211: line = append(line, slice...)
Unfortunately I think this is wrong, as we may be appending to the
internal bufio.Reader buffer, which may be overwritten by the ReadSlice
call.

How about separating the first-slice case from the appending logic?

Something like this, perhaps?

// readLine reads the next valid line from the reader, even if it is
// larger than the reader buffer.
func (t *Tailer) readLine() ([]byte, error) {
 for {
  slice, err := t.reader.ReadSlice(delimiter)
  if err == nil {
   if t.isValid(slice) {
    return slice, nil
   }
   continue
  }
  line := append([]byte(nil), slice...)
  for err == bufio.ErrBufferFull {
   slice, err = t.reader.ReadSlice(delimiter)
   line = append(line, slice...)
  }
  switch err {
  case nil:
   if t.isValid(line) {
    return line, nil
   }
  case io.EOF:
   // EOF without delimiter, step back.
   t.readSeeker.Seek(-int64(len(line)), os.SEEK_CUR)
   return nil, err
  default:
   return nil, err
  }
 }
}

https://codereview.appspot.com/36540043/diff/60001/utils/tailer/tailer.go#newcode224
utils/tailer/tailer.go:224: t.readSeeker.Seek(-offset, os.SEEK_CUR)
Ah, that's nicer than my suggestion, thanks.

https://codereview.appspot.com/36540043/diff/60001/utils/tailer/tailer_test.go
File utils/tailer/tailer_test.go (right):

https://codereview.appspot.com/36540043/diff/60001/utils/tailer/tailer_test.go#newcode30
utils/tailer/tailer_test.go:30: buffer := bytes.NewBuffer(nil)
I'm afraid the way you're using this is racy (and in all the other tests
too) because you're reading from the buffer (with assertCollected)
concurrently with the tailer writing to it.

You could use io.Pipe instead of bytes.Buffer to avoid the problem.

BTW whenever you've got goroutine-based code, it's worth running go test
-race to check this kind of stuff.

https://codereview.appspot.com/36540043/diff/60001/utils/tailer/tailer_test.go#newcode47
utils/tailer/tailer_test.go:47: func (tailerSuite)
TestLaggedTermination(c *gc.C) {
I see 10 of these tests are almost identical.
This suggests to me that a table-based te...


Revision history for this message
Frank Mueller (themue) wrote :

Please take a look.

https://codereview.appspot.com/36540043/diff/60001/utils/tailer/tailer.go
File utils/tailer/tailer.go (right):

https://codereview.appspot.com/36540043/diff/60001/utils/tailer/tailer.go#newcode131
utils/tailer/tailer.go:131: buffer := make([]byte, t.bufsize)
On 2013/12/11 18:27:41, rog wrote:
> If we've got t.bufsize, "buf" would seem logical as a name.
> Not that it matters much though.

Ack, but in the other direction. ;)

https://codereview.appspot.com/36540043/diff/60001/utils/tailer/tailer.go#newcode211
utils/tailer/tailer.go:211: line = append(line, slice...)
On 2013/12/11 18:27:41, rog wrote:
> Unfortunately I think this is wrong, as we may be appending to the
internal
> bufio.Reader buffer, which may be overwritten by the ReadSlice call.

> How about separating the first-slice case from the appending logic?

> Something like this, perhaps?

> // readLine reads the next valid line from the reader, even if it is
> // larger than the reader buffer.
> func (t *Tailer) readLine() ([]byte, error) {
> for {
> slice, err := t.reader.ReadSlice(delimiter)
> if err == nil {
> if t.isValid(slice) {
> return slice, nil
> }
> continue
> }
> line := append([]byte(nil), slice...)
> for err == bufio.ErrBufferFull {
> slice, err = t.reader.ReadSlice(delimiter)
> line = append(line, slice...)
> }
> switch err {
> case nil:
> if t.isValid(line) {
> return line, nil
> }
> case io.EOF:
> // EOF without delimiter, step back.
> t.readSeeker.Seek(-int64(len(line)), os.SEEK_CUR)
> return nil, err
> default:
> return nil, err
> }
> }
> }

Yeah, that's better, thx.

https://codereview.appspot.com/36540043/diff/60001/utils/tailer/tailer_test.go
File utils/tailer/tailer_test.go (right):

https://codereview.appspot.com/36540043/diff/60001/utils/tailer/tailer_test.go#newcode30
utils/tailer/tailer_test.go:30: buffer := bytes.NewBuffer(nil)
On 2013/12/11 18:27:41, rog wrote:
> I'm afraid the way you're using this is racy (and in all the other
tests too)
> because you're reading from the buffer (with assertCollected)
concurrently with
> the tailer writing to it.

> You could use io.Pipe instead of bytes.Buffer to avoid the problem.

> BTW whenever you've got goroutine-based code, it's worth running go
test -race
> to check this kind of stuff.

Done.

https://codereview.appspot.com/36540043/diff/60001/utils/tailer/tailer_test.go#newcode47
utils/tailer/tailer_test.go:47: func (tailerSuite)
TestLaggedTermination(c *gc.C) {
On 2013/12/11 18:27:41, rog wrote:
> I see 10 of these tests are almost identical.
> This suggests to me that a table-based test might work well here, and
make it
> easier to see what's actually being tested.

> Then we could easily add quite a few more tests (for example, I'd like
to see
> tests with blank lines in various places, and probably some more too).

Done.

https://codereview.appspot.com/36540043/diff/60001/utils/tailer/tailer_test.go#newcode264
utils/tailer/tailer_test.go:264: disturber := func(lines []string) {
On 2013/12/11 18:27:41, rog wrote:
> This should probably be defined just before it's used.

Now using a different approach with table-driven tests.

http...


Revision history for this message
Roger Peppe (rogpeppe) wrote :

Another round. Hopefully done after this one!

https://codereview.appspot.com/36540043/diff/80001/utils/tailer/tailer.go
File utils/tailer/tailer.go (right):

https://codereview.appspot.com/36540043/diff/80001/utils/tailer/tailer.go#newcode49
utils/tailer/tailer.go:49: func NewTailer(readSeeker io.ReadSeeker,
writeCloser io.WriteCloser, lines int, filter TailerFilterFunc) *Tailer
{
I don't think there's any particular reason we want to give this code
the responsibility for closing the writer. It's quite possible we might
have several tailers writing to the same writer, for example.

Better would be a Wait method (and perhaps a Dead method) to wait until
it's finished. Then the caller has the option of closing the writer.

https://codereview.appspot.com/36540043/diff/80001/utils/tailer/tailer_test.go
File utils/tailer/tailer_test.go (right):

https://codereview.appspot.com/36540043/diff/80001/utils/tailer/tailer_test.go#newcode40
utils/tailer/tailer_test.go:40: {
If we move this brace onto the previous line, we can save a level of
indentation in all these tests.

https://codereview.appspot.com/36540043/diff/80001/utils/tailer/tailer_test.go#newcode42
utils/tailer/tailer_test.go:42: data: data[26:29],
I like the new tests much better now. One thought though:

I'm not convinced that referring to "data" in all these tests makes them
more readable - I have no idea what's in data[26:29] without counting or
using grep -n.

Something like the below shows me exactly what's going
on without me needing to refer to multiple places,
and is independent of any changes else in the code.

It would make it considerably easier for me to scan down
the tests to ensure that each one represents reasonable
behaviour.

{
     description: "lines are longer than buffer size",
     data: []string{
         "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz\n",
         "0123456789012345678901234567890123456789012345678901\n",
         "the quick brown fox ",
     },
     initialLinesWritten: 1,
     initialLinesRequested: 1,
     bufferSize: 5,
     initialCollectedData: []string{
         "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz\n",
     },
     appendedCollectedData: []string{
         "0123456789012345678901234567890123456789012345678901\n",
     },
},

https://codereview.appspot.com/36540043/diff/80001/utils/tailer/tailer_test.go#newcode215
utils/tailer/tailer_test.go:215: line, err := buffer.ReadString('\n')
If the tailer doesn't produce enough data, we'll block here forever I
think, regardless of the timeout.

I think you probably want something more like the below:

func assertCollected(c *gc.C, reader io.Reader, compare []string, injection func([]string)) {
    lineChan := make(chan string)
    go func() {
        defer close(lineChan)
        reader := bufio.NewReader(reader)
        for {
            line, err := reader.ReadString('\n')
            if !c.Check(err, gc.IsNil) {
                return
            }
            lineChan <- line
        }
    }()
    timeout := time.After(testing.LongWait)
    for {
         select {
         case line, ok := <-lineChan:
              c.Assert(ok, jc.IsTrue)
...


Revision history for this message
Frank Mueller (themue) wrote :

Please take a look.

https://codereview.appspot.com/36540043/diff/80001/utils/tailer/tailer.go
File utils/tailer/tailer.go (right):

https://codereview.appspot.com/36540043/diff/80001/utils/tailer/tailer.go#newcode49
utils/tailer/tailer.go:49: func NewTailer(readSeeker io.ReadSeeker,
writeCloser io.WriteCloser, lines int, filter TailerFilterFunc) *Tailer
{
On 2013/12/12 14:46:39, rog wrote:
> I don't think there's any particular reason we want to give this code
the
> responsibility for closing the writer. It's quite possible we might
have several
> tailers writing to the same writer, for example.

> Better would be a Wait method (and perhaps a Dead method) to wait
until
> it's finished. Then the caller has the option of closing the writer.

Done as discussed.

https://codereview.appspot.com/36540043/diff/80001/utils/tailer/tailer_test.go
File utils/tailer/tailer_test.go (right):

https://codereview.appspot.com/36540043/diff/80001/utils/tailer/tailer_test.go#newcode40
utils/tailer/tailer_test.go:40: {
On 2013/12/12 14:46:39, rog wrote:
> If we move this brace onto the previous line, we can save a level of
indentation
> in all these tests.

Done.

https://codereview.appspot.com/36540043/diff/80001/utils/tailer/tailer_test.go#newcode42
utils/tailer/tailer_test.go:42: data: data[26:29],
On 2013/12/12 14:46:39, rog wrote:
> I like the new tests much better now. One thought though:

> I'm not convinced that referring to "data" in all these tests makes
them more
> readable - I have no idea what's in data[26:29] without counting or
using grep
> -n.

> Something like the below shows me exactly what's going
> on without me needing to refer to multiple places,
> and is independent of any changes else in the code.

> It would make it considerably easier for me to scan down
> the tests to ensure that each one represents reasonable
> behaviour.

> {
> description: "lines are longer than buffer size",
> data: []string{
> "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz\n",
> "0123456789012345678901234567890123456789012345678901\n",
> "the quick brown fox ",
> },
> initialLinesWritten: 1,
> initialLinesRequested: 1,
> bufferSize: 5,
> initialCollectedData: []string{
> "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz\n",
> },
> appendedCollectedData: []string{
> "0123456789012345678901234567890123456789012345678901\n",
> },
> },

In this case the test table would blow up again and would also be
inconsistent (e.g. when using the standard, larger group of lines). So
I'm now using three different data variables with more descriptive names
as a compromise, and located them near the table.

https://codereview.appspot.com/36540043/diff/80001/utils/tailer/tailer_test.go#newcode215
utils/tailer/tailer_test.go:215: line, err := buffer.ReadString('\n')
On 2013/12/12 14:46:39, rog wrote:
> If the tailer doesn't produce enough data, we'll block here forever I
think,
> regardless of the timeout.

> I think you probably want something more like the below:

> func assertCollected(c *gc.C, reader io.Reader, compare []string,
injection
> func([]st...


Revision history for this message
Roger Peppe (rogpeppe) wrote :

LGTM with the tests fixed, thanks!

https://codereview.appspot.com/36540043/diff/80001/utils/tailer/tailer_test.go
File utils/tailer/tailer_test.go (right):

https://codereview.appspot.com/36540043/diff/80001/utils/tailer/tailer_test.go#newcode42
utils/tailer/tailer_test.go:42: data: data[26:29],
On 2013/12/13 11:23:12, mue wrote:
> On 2013/12/12 14:46:39, rog wrote:
> > I like the new tests much better now. One thought though:
> >
> > I'm not convinced that referring to "data" in all these tests makes
them more
> > readable - I have no idea what's in data[26:29] without counting or
using grep
> > -n.
> >
> > Something like the below shows me exactly what's going
> > on without me needing to refer to multiple places,
> > and is independent of any changes else in the code.
> >
> > It would make it considerably easier for me to scan down
> > the tests to ensure that each one represents reasonable
> > behaviour.
> >
> > {
> > description: "lines are longer than buffer size",
> > data: []string{
> > "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz\n",
> > "0123456789012345678901234567890123456789012345678901\n",
> > "the quick brown fox ",
> > },
> > initialLinesWritten: 1,
> > initialLinesRequested: 1,
> > bufferSize: 5,
> > initialCollectedData: []string{
> > "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz\n",
> > },
> > appendedCollectedData: []string{
> > "0123456789012345678901234567890123456789012345678901\n",
> > },
> > },
> >

> This case the test table would blow up again and also would be
inconsistent
> (e.g. when using the standard larger group of lines). So now using
three
> different data variables with more speaking names as a compromise.
Also located
> them near to the table.

I'm with you about alphabetData (which is also nicely memorable, which
helps), but most of the tests really do benefit from having the data
visible in the test.

I processed my local copy to move much of the test data into the tests,
and two issues became immediately obvious, though I'd missed them when
cross-referencing, which I think is a reasonable indication that the
tests are not currently that clear.

Here's my modified version of the test. Only alphabetData remains. I'd
prefer it if we could use this form.

http://paste.ubuntu.com/6566524/

https://codereview.appspot.com/36540043/diff/100001/utils/tailer/tailer_test.go
File utils/tailer/tailer_test.go (right):

https://codereview.appspot.com/36540043/diff/100001/utils/tailer/tailer_test.go#newcode100
utils/tailer/tailer_test.go:100: description: "lines are
longer than buffer size, missing termination of last line",
How is the last line missing termination here?

https://codereview.appspot.com/36540043/diff/100001/utils/tailer/tailer_test.go#newcode117
utils/tailer/tailer_test.go:117: data:
unterminatedData[0:2],
The last line doesn't seem to be missing termination here.

https://codereview.appspot.com/36540043/

Revision history for this message
Frank Mueller (themue) wrote :

Please take a look.

https://codereview.appspot.com/36540043/diff/100001/utils/tailer/tailer_test.go
File utils/tailer/tailer_test.go (right):

https://codereview.appspot.com/36540043/diff/100001/utils/tailer/tailer_test.go#newcode100
utils/tailer/tailer_test.go:100: description: "lines are
longer than buffer size, missing termination of last line",
On 2013/12/13 12:52:14, rog wrote:
> How is the last line missing termination here?

Done.

https://codereview.appspot.com/36540043/diff/100001/utils/tailer/tailer_test.go#newcode117
utils/tailer/tailer_test.go:117: data:
unterminatedData[0:2],
On 2013/12/13 12:52:14, rog wrote:
> The last line doesn't seem to be missing termination here.

Done.

https://codereview.appspot.com/36540043/

Preview Diff

1=== added directory 'utils/tailer'
2=== added file 'utils/tailer/export_test.go'
3--- utils/tailer/export_test.go 1970-01-01 00:00:00 +0000
4+++ utils/tailer/export_test.go 2013-12-13 13:24:18 +0000
5@@ -0,0 +1,6 @@
6+// Copyright 2013 Canonical Ltd.
7+// Licensed under the AGPLv3, see LICENCE file for details.
8+
9+package tailer
10+
11+var NewTestTailer = newTailer
12
13=== added file 'utils/tailer/tailer.go'
14--- utils/tailer/tailer.go 1970-01-01 00:00:00 +0000
15+++ utils/tailer/tailer.go 2013-12-13 13:24:18 +0000
16@@ -0,0 +1,250 @@
17+// Copyright 2013 Canonical Ltd.
18+// Licensed under the AGPLv3, see LICENCE file for details.
19+
20+package tailer
21+
22+import (
23+ "bufio"
24+ "bytes"
25+ "io"
26+ "os"
27+ "time"
28+
29+ "launchpad.net/tomb"
30+)
31+
32+const (
33+ bufferSize = 4096
34+ polltime = time.Second
35+ delimiter = '\n'
36+)
37+
38+var (
39+ delimiters = []byte{delimiter}
40+)
41+
42+// TailerFilterFunc decides if a line shall be tailed (func is nil or
43+// returns true) or shall be omitted (func returns false).
44+type TailerFilterFunc func(line []byte) bool
45+
46+// Tailer reads an input line by line and tails them into the passed Writer.
47+// The lines have to be terminated with a newline.
48+type Tailer struct {
49+ tomb tomb.Tomb
50+ readSeeker io.ReadSeeker
51+ reader *bufio.Reader
52+ writeCloser io.WriteCloser
53+ writer *bufio.Writer
54+ lines int
55+ filter TailerFilterFunc
56+ bufferSize int
57+ polltime time.Duration
58+}
59+
60+// NewTailer starts a Tailer which reads strings from the passed
61+// ReadSeeker line by line. If a filter function is specified the read
62+// lines are filtered. The matching lines are written to the passed
63+// Writer. The reading begins the specified number of matching lines
64+// from the end.
65+func NewTailer(readSeeker io.ReadSeeker, writer io.Writer, lines int, filter TailerFilterFunc) *Tailer {
66+ return newTailer(readSeeker, writer, lines, filter, bufferSize, polltime)
67+}
68+
69+// newTailer starts a Tailer like NewTailer but allows the setting of
70+// the read buffer size and the time between pollings for testing.
71+func newTailer(readSeeker io.ReadSeeker, writer io.Writer, lines int, filter TailerFilterFunc,
72+ bufferSize int, polltime time.Duration) *Tailer {
73+ t := &Tailer{
74+ readSeeker: readSeeker,
75+ reader: bufio.NewReaderSize(readSeeker, bufferSize),
76+ writer: bufio.NewWriter(writer),
77+ lines: lines,
78+ filter: filter,
79+ bufferSize: bufferSize,
80+ polltime: polltime,
81+ }
82+ go func() {
83+ defer t.tomb.Done()
84+ t.tomb.Kill(t.loop())
85+ }()
86+ return t
87+}
88+
89+// Stop tells the tailer to stop working.
90+func (t *Tailer) Stop() error {
91+ t.tomb.Kill(nil)
92+ return t.tomb.Wait()
93+}
94+
95+// Wait waits until the tailer is stopped due to command
96+// or an error. In case of an error it returns the reason.
97+func (t *Tailer) Wait() error {
98+ return t.tomb.Wait()
99+}
100+
101+// Dead returns the channel that can be used to wait until
102+// the tailer is stopped.
103+func (t *Tailer) Dead() <-chan struct{} {
104+ return t.tomb.Dead()
105+}
106+
107+// Err returns the error that caused the tailer to stop, if any.
108+func (t *Tailer) Err() error {
109+ return t.tomb.Err()
110+}
111+
112+// loop writes the requested number of last lines to the
113+// writer and then polls for more data, writing it to the
114+// writer too.
115+func (t *Tailer) loop() error {
116+ // Position the readSeeker.
117+ if err := t.seekLastLines(); err != nil {
118+ return err
119+ }
120+ // Start polling.
121+ // TODO(mue) 2013-12-06
122+ // Handling of read-seeker/files being truncated during
123+ // tailing is currently missing!
124+ timer := time.NewTimer(0)
125+ for {
126+ select {
127+ case <-t.tomb.Dying():
128+ return nil
129+ case <-timer.C:
130+ for {
131+ line, readErr := t.readLine()
132+ _, writeErr := t.writer.Write(line)
133+ if writeErr != nil {
134+ return writeErr
135+ }
136+ if readErr != nil {
137+ if readErr != io.EOF {
138+ return readErr
139+ }
140+ break
141+ }
142+ }
143+ if writeErr := t.writer.Flush(); writeErr != nil {
144+ return writeErr
145+ }
146+ timer.Reset(t.polltime)
147+ }
148+ }
149+}
150+
151+// seekLastLines sets the read position of the ReadSeeker to the
152+// wanted number of filtered lines before the end.
153+func (t *Tailer) seekLastLines() error {
154+ offset, err := t.readSeeker.Seek(0, os.SEEK_END)
155+ if err != nil {
156+ return err
157+ }
158+ seekPos := int64(0)
159+ found := 0
160+ buffer := make([]byte, t.bufferSize)
161+SeekLoop:
162+ for offset > 0 {
163+ // buffer contains the data left over from the
164+ // previous iteration.
165+ space := cap(buffer) - len(buffer)
166+ if space < t.bufferSize {
167+ // Grow buffer.
168+ newBuffer := make([]byte, len(buffer), cap(buffer)*2)
169+ copy(newBuffer, buffer)
170+ buffer = newBuffer
171+ space = cap(buffer) - len(buffer)
172+ }
173+ if int64(space) > offset {
174+ // Use exactly the right amount of space if there's
175+ // only a small amount remaining.
176+ space = int(offset)
177+ }
178+ // Copy data remaining from last time to the end of the buffer,
179+ // so we can read into the right place.
180+ copy(buffer[space:cap(buffer)], buffer)
181+ buffer = buffer[0 : len(buffer)+space]
182+ offset -= int64(space)
183+ _, err := t.readSeeker.Seek(offset, os.SEEK_SET)
184+ if err != nil {
185+ return err
186+ }
187+ _, err = io.ReadFull(t.readSeeker, buffer[0:space])
188+ if err != nil {
189+ return err
190+ }
191+ // Find the end of the last line in the buffer.
192+ // This will discard any unterminated line at the end
193+ // of the file.
194+ end := bytes.LastIndex(buffer, delimiters)
195+ if end == -1 {
196+ // No end of line found - discard incomplete
197+ // line and continue looking. If this happens
198+ // at the beginning of the file, we don't care
199+ // because we're going to stop anyway.
200+ buffer = buffer[:0]
201+ continue
202+ }
203+ end++
204+ for {
205+ start := bytes.LastIndex(buffer[0:end-1], delimiters)
206+ if start == -1 && offset >= 0 {
207+ break
208+ }
209+ start++
210+ if t.isValid(buffer[start:end]) {
211+ found++
212+ if found >= t.lines {
213+ seekPos = offset + int64(start)
214+ break SeekLoop
215+ }
216+ }
217+ end = start
218+ }
219+ // Leave the last line in buffer, as we don't know whether
220+ // it's complete or not.
221+ buffer = buffer[0:end]
222+ }
223+ // Final positioning.
224+ t.readSeeker.Seek(seekPos, os.SEEK_SET)
225+ return nil
226+}
227+
228+// readLine reads the next valid line from the reader, even if it is
229+// larger than the reader buffer.
230+func (t *Tailer) readLine() ([]byte, error) {
231+ for {
232+ slice, err := t.reader.ReadSlice(delimiter)
233+ if err == nil {
234+ if t.isValid(slice) {
235+ return slice, nil
236+ }
237+ continue
238+ }
239+ line := append([]byte(nil), slice...)
240+ for err == bufio.ErrBufferFull {
241+ slice, err = t.reader.ReadSlice(delimiter)
242+ line = append(line, slice...)
243+ }
244+ switch err {
245+ case nil:
246+ if t.isValid(line) {
247+ return line, nil
248+ }
249+ case io.EOF:
250+ // EOF without delimiter, step back.
251+ t.readSeeker.Seek(-int64(len(line)), os.SEEK_CUR)
252+ return nil, err
253+ default:
254+ return nil, err
255+ }
256+ }
257+}
258+
259+// isValid reports whether the passed line is valid, i.e. the
260+// filter function is nil or returns true for the line.
261+func (t *Tailer) isValid(line []byte) bool {
262+ if t.filter == nil {
263+ return true
264+ }
265+ return t.filter(line)
266+}
267
268=== added file 'utils/tailer/tailer_test.go'
269--- utils/tailer/tailer_test.go 1970-01-01 00:00:00 +0000
270+++ utils/tailer/tailer_test.go 2013-12-13 13:24:18 +0000
271@@ -0,0 +1,496 @@
272+// Copyright 2013 Canonical Ltd.
273+// Licensed under the AGPLv3, see LICENCE file for details.
274+
275+package tailer_test
276+
277+import (
278+ "bufio"
279+ "bytes"
280+ "fmt"
281+ "io"
282+ "sync"
283+ stdtesting "testing"
284+ "time"
285+
286+ gc "launchpad.net/gocheck"
287+
288+ "launchpad.net/juju-core/testing"
289+ "launchpad.net/juju-core/utils/tailer"
290+)
291+
292+func Test(t *stdtesting.T) {
293+ gc.TestingT(t)
294+}
295+
296+type tailerSuite struct{}
297+
298+var _ = gc.Suite(tailerSuite{})
299+
300+var alphabetData = []string{
301+ "alpha alpha\n",
302+ "bravo bravo\n",
303+ "charlie charlie\n",
304+ "delta delta\n",
305+ "echo echo\n",
306+ "foxtrott foxtrott\n",
307+ "golf golf\n",
308+ "hotel hotel\n",
309+ "india india\n",
310+ "juliet juliet\n",
311+ "kilo kilo\n",
312+ "lima lima\n",
313+ "mike mike\n",
314+ "november november\n",
315+ "oscar oscar\n",
316+ "papa papa\n",
317+ "quebec quebec\n",
318+ "romeo romeo\n",
319+ "sierra sierra\n",
320+ "tango tango\n",
321+ "uniform uniform\n",
322+ "victor victor\n",
323+ "whiskey whiskey\n",
324+ "x-ray x-ray\n",
325+ "yankee yankee\n",
326+ "zulu zulu\n",
327+}
328+
329+var tests = []struct {
330+ description string
331+ data []string
332+ initialLinesWritten int
333+ initialLinesRequested int
334+ bufferSize int
335+ filter tailer.TailerFilterFunc
336+ injector func(*tailer.Tailer, *readSeeker) func([]string)
337+ initialCollectedData []string
338+ appendedCollectedData []string
339+ err string
340+}{{
341+ description: "lines are longer than buffer size",
342+ data: []string{
343+ "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz\n",
344+ "0123456789012345678901234567890123456789012345678901\n",
345+ },
346+ initialLinesWritten: 1,
347+ initialLinesRequested: 1,
348+ bufferSize: 5,
349+ initialCollectedData: []string{
350+ "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz\n",
351+ },
352+ appendedCollectedData: []string{
353+ "0123456789012345678901234567890123456789012345678901\n",
354+ },
355+}, {
356+ description: "lines are longer than buffer size, missing termination of last line",
357+ data: []string{
358+ "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz\n",
359+ "0123456789012345678901234567890123456789012345678901\n",
360+ "the quick brown fox ",
361+ },
362+ initialLinesWritten: 1,
363+ initialLinesRequested: 1,
364+ bufferSize: 5,
365+ initialCollectedData: []string{
366+ "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz\n",
367+ },
368+ appendedCollectedData: []string{
369+ "0123456789012345678901234567890123456789012345678901\n",
370+ },
371+}, {
372+ description: "lines are longer than buffer size, last line is terminated later",
373+ data: []string{
374+ "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz\n",
375+ "0123456789012345678901234567890123456789012345678901\n",
376+ "the quick brown fox ",
377+ "jumps over the lazy dog\n",
378+ },
379+ initialLinesWritten: 1,
380+ initialLinesRequested: 1,
381+ bufferSize: 5,
382+ initialCollectedData: []string{
383+ "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz\n",
384+ },
385+ appendedCollectedData: []string{
386+ "0123456789012345678901234567890123456789012345678901\n",
387+ "the quick brown fox jumps over the lazy dog\n",
388+ },
389+}, {
390+ description: "missing termination of last line",
391+ data: []string{
392+ "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz\n",
393+ "0123456789012345678901234567890123456789012345678901\n",
394+ "the quick brown fox ",
395+ },
396+ initialLinesWritten: 1,
397+ initialLinesRequested: 1,
398+ initialCollectedData: []string{
399+ "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz\n",
400+ },
401+ appendedCollectedData: []string{
402+ "0123456789012345678901234567890123456789012345678901\n",
403+ },
404+}, {
405+ description: "last line is terminated later",
406+ data: []string{
407+ "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz\n",
408+ "0123456789012345678901234567890123456789012345678901\n",
409+ "the quick brown fox ",
410+ "jumps over the lazy dog\n",
411+ },
412+ initialLinesWritten: 1,
413+ initialLinesRequested: 1,
414+ initialCollectedData: []string{
415+ "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz\n",
416+ },
417+ appendedCollectedData: []string{
418+ "0123456789012345678901234567890123456789012345678901\n",
419+ "the quick brown fox jumps over the lazy dog\n",
420+ },
421+}, {
422+ description: "more lines already written than initially requested",
423+ data: alphabetData,
424+ initialLinesWritten: 5,
425+ initialLinesRequested: 3,
426+ initialCollectedData: []string{
427+ "charlie charlie\n",
428+ "delta delta\n",
429+ "echo echo\n",
430+ },
431+ appendedCollectedData: alphabetData[5:],
432+}, {
433+ description: "less lines already written than initially requested",
434+ data: alphabetData,
435+ initialLinesWritten: 3,
436+ initialLinesRequested: 5,
437+ initialCollectedData: []string{
438+ "alpha alpha\n",
439+ "bravo bravo\n",
440+ "charlie charlie\n",
441+ },
442+ appendedCollectedData: alphabetData[3:],
443+}, {
444+ description: "lines are longer than buffer size, more lines already written than initially requested",
445+ data: alphabetData,
446+ initialLinesWritten: 5,
447+ initialLinesRequested: 3,
448+ bufferSize: 5,
449+ initialCollectedData: []string{
450+ "charlie charlie\n",
451+ "delta delta\n",
452+ "echo echo\n",
453+ },
454+ appendedCollectedData: alphabetData[5:],
455+}, {
456+ description: "lines are longer than buffer size, less lines already written than initially requested",
457+ data: alphabetData,
458+ initialLinesWritten: 3,
459+ initialLinesRequested: 5,
460+ bufferSize: 5,
461+ initialCollectedData: []string{
462+ "alpha alpha\n",
463+ "bravo bravo\n",
464+ "charlie charlie\n",
465+ },
466+ appendedCollectedData: alphabetData[3:],
467+}, {
468+ description: "filter lines which contain the char 'e'",
469+ data: alphabetData,
470+ initialLinesWritten: 10,
471+ initialLinesRequested: 3,
472+ filter: func(line []byte) bool {
473+ return bytes.Contains(line, []byte{'e'})
474+ },
475+ initialCollectedData: []string{
476+ "echo echo\n",
477+ "hotel hotel\n",
478+ "juliet juliet\n",
479+ },
480+ appendedCollectedData: []string{
481+ "mike mike\n",
482+ "november november\n",
483+ "quebec quebec\n",
484+ "romeo romeo\n",
485+ "sierra sierra\n",
486+ "whiskey whiskey\n",
487+ "yankee yankee\n",
488+ },
489+}, {
490+ description: "stop tailing after 10 collected lines",
491+ data: alphabetData,
492+ initialLinesWritten: 5,
493+ initialLinesRequested: 3,
494+ injector: func(t *tailer.Tailer, rs *readSeeker) func([]string) {
495+ return func(lines []string) {
496+ if len(lines) == 10 {
497+ t.Stop()
498+ }
499+ }
500+ },
501+ initialCollectedData: []string{
502+ "charlie charlie\n",
503+ "delta delta\n",
504+ "echo echo\n",
505+ },
506+ appendedCollectedData: alphabetData[5:],
507+}, {
508+ description: "generate an error after 10 collected lines",
509+ data: alphabetData,
510+ initialLinesWritten: 5,
511+ initialLinesRequested: 3,
512+ injector: func(t *tailer.Tailer, rs *readSeeker) func([]string) {
513+ return func(lines []string) {
514+ if len(lines) == 10 {
515+ rs.setError(fmt.Errorf("ouch after 10 lines"))
516+ }
517+ }
518+ },
519+ initialCollectedData: []string{
520+ "charlie charlie\n",
521+ "delta delta\n",
522+ "echo echo\n",
523+ },
524+ appendedCollectedData: alphabetData[5:],
525+ err: "ouch after 10 lines",
526+}, {
527+ description: "more lines already written than initially requested, some empty, unfiltered",
528+ data: []string{
529+ "one one\n",
530+ "two two\n",
531+ "\n",
532+ "\n",
533+ "three three\n",
534+ "four four\n",
535+ "\n",
536+ "\n",
537+ "five five\n",
538+ "six six\n",
539+ },
540+ initialLinesWritten: 3,
541+ initialLinesRequested: 2,
542+ initialCollectedData: []string{
543+ "two two\n",
544+ "\n",
545+ },
546+ appendedCollectedData: []string{
547+ "\n",
548+ "three three\n",
549+ "four four\n",
550+ "\n",
551+ "\n",
552+ "five five\n",
553+ "six six\n",
554+ },
555+}, {
556+ description: "more lines already written than initially requested, some empty, those filtered",
557+ data: []string{
558+ "one one\n",
559+ "two two\n",
560+ "\n",
561+ "\n",
562+ "three three\n",
563+ "four four\n",
564+ "\n",
565+ "\n",
566+ "five five\n",
567+ "six six\n",
568+ },
569+ initialLinesWritten: 3,
570+ initialLinesRequested: 2,
571+ filter: func(line []byte) bool {
572+ return len(bytes.TrimSpace(line)) > 0
573+ },
574+ initialCollectedData: []string{
575+ "one one\n",
576+ "two two\n",
577+ },
578+ appendedCollectedData: []string{
579+ "three three\n",
580+ "four four\n",
581+ "five five\n",
582+ "six six\n",
583+ },
584+}}
585+
586+func (tailerSuite) TestTailer(c *gc.C) {
587+ for i, test := range tests {
588+ c.Logf("Test #%d) %s", i, test.description)
589+ bufferSize := test.bufferSize
590+ if bufferSize == 0 {
591+ // Default value.
592+ bufferSize = 4096
593+ }
594+ reader, writer := io.Pipe()
595+ sigc := make(chan struct{}, 1)
596+ rs := startReadSeeker(c, test.data, test.initialLinesWritten, sigc)
597+ tailer := tailer.NewTestTailer(rs, writer, test.initialLinesRequested, test.filter, bufferSize, 2*time.Millisecond)
598+ linec := startReading(c, tailer, reader, writer)
599+
600+ // Collect initial data.
601+ assertCollected(c, linec, test.initialCollectedData, nil)
602+
603+ sigc <- struct{}{}
604+
605+ // Collect remaining data, possibly with injection to stop
606+ // earlier or generate an error.
607+ var injection func([]string)
608+ if test.injector != nil {
609+ injection = test.injector(tailer, rs)
610+ }
611+
612+ assertCollected(c, linec, test.appendedCollectedData, injection)
613+
614+ if test.err == "" {
615+ c.Assert(tailer.Stop(), gc.IsNil)
616+ } else {
617+ c.Assert(tailer.Err(), gc.ErrorMatches, test.err)
618+ }
619+ }
620+}
621+
622+// startReading starts a goroutine receiving the lines out of the reader
623+// in the background and passing them to a created string channel. This
624+// channel will be used in the assertions.
625+func startReading(c *gc.C, tailer *tailer.Tailer, reader *io.PipeReader, writer *io.PipeWriter) chan string {
626+ linec := make(chan string)
627+ // Start goroutine for reading.
628+ go func() {
629+ defer close(linec)
630+ reader := bufio.NewReader(reader)
631+ for {
632+ line, err := reader.ReadString('\n')
633+ switch err {
634+ case nil:
635+ linec <- line
636+ case io.EOF:
637+ return
638+ default:
639+ c.Fail()
640+ }
641+ }
642+ }()
643+	// Close writer when the tailer is stopped or has an error. Components
644+	// using the Tailer can do it the same way.
645+ go func() {
646+ tailer.Wait()
647+ writer.Close()
648+ }()
649+ return linec
650+}
651+
652+// assertCollected reads lines from the string channel linec and compares
653+// them with the lines passed in compare, until a timeout. If the timeout
654+// is reached before all lines are collected, the assertion fails. The
655+// injection function allows the processing to be interrupted by a function
656+// generating an error, or by a regular stop during the tailing. If linec
657+// is closed due to a stop or an error, only the values received so far are
658+// compared. Checking the reason for termination is done in the test.
659+func assertCollected(c *gc.C, linec chan string, compare []string, injection func([]string)) {
660+ timeout := time.After(testing.LongWait)
661+ lines := []string{}
662+ for {
663+ select {
664+ case line, ok := <-linec:
665+ if ok {
666+ lines = append(lines, line)
667+ if injection != nil {
668+ injection(lines)
669+ }
670+ if len(lines) == len(compare) {
671+ // All data received.
672+ c.Assert(lines, gc.DeepEquals, compare)
673+ return
674+ }
675+ } else {
676+ // linec closed after stopping or error.
677+ c.Assert(lines, gc.DeepEquals, compare[:len(lines)])
678+ return
679+ }
680+ case <-timeout:
681+ if injection == nil {
682+ c.Fatalf("timeout during tailer collection")
683+ }
684+ return
685+ }
686+ }
687+}
688+
689+// startReadSeeker returns a ReadSeeker for the Tailer. It simulates
690+// reading and seeking inside a file and can also simulate an error.
691+// The goroutine waits for a signal that it can start writing the
692+// appended lines.
693+func startReadSeeker(c *gc.C, data []string, initialLeg int, sigc chan struct{}) *readSeeker {
694+ // Write initial lines into the buffer.
695+ var rs readSeeker
696+ var i int
697+ for i = 0; i < initialLeg; i++ {
698+ rs.write(data[i])
699+ }
700+
701+ go func() {
702+ <-sigc
703+
704+ for ; i < len(data); i++ {
705+ time.Sleep(5 * time.Millisecond)
706+ rs.write(data[i])
707+ }
708+ }()
709+ return &rs
710+}
711+
712+type readSeeker struct {
713+ mux sync.Mutex
714+ buffer []byte
715+ pos int
716+ err error
717+}
718+
719+func (r *readSeeker) write(s string) {
720+ r.mux.Lock()
721+ defer r.mux.Unlock()
722+ r.buffer = append(r.buffer, []byte(s)...)
723+}
724+
725+func (r *readSeeker) setError(err error) {
726+ r.mux.Lock()
727+ defer r.mux.Unlock()
728+ r.err = err
729+}
730+
731+func (r *readSeeker) Read(p []byte) (n int, err error) {
732+ r.mux.Lock()
733+ defer r.mux.Unlock()
734+ if r.err != nil {
735+ return 0, r.err
736+ }
737+ if r.pos >= len(r.buffer) {
738+ return 0, io.EOF
739+ }
740+ n = copy(p, r.buffer[r.pos:])
741+ r.pos += n
742+ return n, nil
743+}
744+
745+func (r *readSeeker) Seek(offset int64, whence int) (ret int64, err error) {
746+ r.mux.Lock()
747+ defer r.mux.Unlock()
748+ var newPos int64
749+ switch whence {
750+ case 0:
751+ newPos = offset
752+ case 1:
753+ newPos = int64(r.pos) + offset
754+ case 2:
755+ newPos = int64(len(r.buffer)) + offset
756+ default:
757+ return 0, fmt.Errorf("invalid whence: %d", whence)
758+ }
759+ if newPos < 0 {
760+ return 0, fmt.Errorf("negative position: %d", newPos)
761+ }
762+ if newPos >= 1<<31 {
763+ return 0, fmt.Errorf("position out of range: %d", newPos)
764+ }
765+ r.pos = int(newPos)
766+ return newPos, nil
767+}
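The core idea of seekLastLines in the diff above — scan backwards from the end, discard an unterminated final line, and collect the last n lines that pass the filter — can be sketched in isolation. The following is a minimal, self-contained illustration (the function name lastLines and the in-memory byte slice are hypothetical, not part of the branch; the real Tailer works on an io.ReadSeeker with a bounded buffer):

```go
package main

import (
	"bytes"
	"fmt"
)

// lastLines returns the last n newline-terminated lines of data that
// match filter (a nil filter matches everything). A trailing line
// without a final '\n' is ignored, mirroring seekLastLines.
func lastLines(data []byte, n int, filter func([]byte) bool) [][]byte {
	// Find the end of the last complete line; discard anything after it.
	end := bytes.LastIndexByte(data, '\n')
	if end == -1 {
		return nil
	}
	end++
	var out [][]byte
	// Walk backwards, one line at a time, until enough lines match.
	for end > 0 && len(out) < n {
		start := bytes.LastIndexByte(data[:end-1], '\n') + 1
		line := data[start:end]
		if filter == nil || filter(line) {
			out = append(out, line)
		}
		end = start
	}
	// Lines were collected back to front; reverse into file order.
	for i, j := 0, len(out)-1; i < j; i, j = i+1, j-1 {
		out[i], out[j] = out[j], out[i]
	}
	return out
}

func main() {
	// "echo" lacks a trailing newline, so it is ignored.
	data := []byte("alpha\nbravo\ncharlie\ndelta\necho")
	for _, line := range lastLines(data, 2, nil) {
		fmt.Print(string(line))
	}
}
```

Unlike this sketch, the real implementation cannot index the whole file: it seeks backwards in bufferSize chunks and grows its buffer only when a single line spans more than one chunk.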
