Merge lp:~themue/juju-core/057-tailer into lp:~go-bot/juju-core/trunk
- 057-tailer
- Merge into trunk
Status: | Merged |
---|---|
Approved by: | Frank Mueller |
Approved revision: | no longer in the source branch. |
Merged at revision: | 2152 |
Proposed branch: | lp:~themue/juju-core/057-tailer |
Merge into: | lp:~go-bot/juju-core/trunk |
Diff against target: |
767 lines (+752/-0) 3 files modified
utils/tailer/export_test.go (+6/-0) utils/tailer/tailer.go (+250/-0) utils/tailer/tailer_test.go (+496/-0) |
To merge this branch: | bzr merge lp:~themue/juju-core/057-tailer |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Juju Engineering | Pending | ||
Review via email: mp+197522@code.launchpad.net |
Commit message
utils: added Tailer for tailing of logs in API
The Tailer is the initial component of the debug logging command
of the API. It allows the filtered tailing of any ReaderSeeker.
If no filter is passed all lines will be written in the passed
Writer, otherwise only those where the filter function returns
true. The initial number of lines can also be specified, the
filter already works here. So if a File (which is a ReaderSeeker)
containes 100 lines, 10 lines are wanted and 5 match to the
filter only those 5 lines are returned.
Description of the change
utils: added Tailer for tailing of logs in API
The Tailer is the initial component of the debug logging command
of the API. It allows the filtered tailing of any ReaderSeeker.
If no filter is passed all lines will be written in the passed
Writer, otherwise only those where the filter function returns
true. The initial number of lines can also be specified, the
filter already works here. So if a File (which is a ReaderSeeker)
containes 100 lines, 10 lines are wanted and 5 match to the
filter only those 5 lines are returned.
Frank Mueller (themue) wrote : | # |
Frank Mueller (themue) wrote : | # |
Please take a look.
Roger Peppe (rogpeppe) wrote : | # |
This is a good start, but I have a few comments and suggestions.
I wonder if it might sit well inside its own package rather being added
to the grab-bag of stuff in utils.
https:/
File utils/tailer.go (right):
https:/
utils/tailer.go:30: func StartFileTailer
filter TailerFilterFunc,
I think this function is unnecessary. It's trivial for other code to
open a file.
https:/
utils/tailer.go:40: func StartTailer(
int, filter TailerFilterFunc,
Please document this function properly.
https:/
utils/tailer.go:87: println("> error:", err.Error())
d
https:/
utils/tailer.go:98: if buffer[i] == '\n' {
This isn't strictly accurate (what happens when we have an unterminated
line at the end of the file?) but it's probably ok if we never filter
unterminated lines. A comment about why it's ok might prevent future
puzzling over the correctness of the code.
https:/
utils/tailer.
s/==/>=/
defensively.
Also, if we move the if statement before the increment,
then you can lose the "-1".
https:/
utils/tailer.
I think I'd be tempted to use ReadSlice here (changing the type of
filter to func([]byte)bool.
That way the tailer won't need to generate any garbage at all AFAICS.
Unfortunately that limits the line size to the size of the bufio buffer,
which may be acceptable. An alternative is to use ReadLine and allocate
only if the line is too long.
Here's some code I wrote a while ago to do that, in case it might be
useful:
// readLine reads a line from r.
// The returned byte slice is only valid until the
// next read call on r.
func readLine(r *bufio.Reader) ([]byte, error) {
line, isPrefix, err := r.ReadLine()
if !isPrefix {
return line, err
}
buf := append([]byte(nil), line...)
for isPrefix && err == nil {
line, isPrefix, err = r.ReadLine()
buf = append(buf, line...)
}
return buf, err
}
https:/
utils/tailer.
This code is exactly the same as the code above.
Can't we do with just this? (if we just delete the
loop above and use NewTimer(0), I think it might
just work).
https:/
utils/tailer.
I think we should ignore the line at this point if it is unterminated,
something that's quite possible if the file is being written in
arbitrary chunks. If we find an unterminated line, we should seek back
to the start of the line before we start reading again.
That means we could pass the line to filter without the trailing \n,
meaning we can be more resilient if something s...
William Reade (fwereade) wrote : | # |
WIP in response to rog's review
Frank Mueller (themue) wrote : | # |
Please take a look.
https:/
File utils/tailer.go (right):
https:/
utils/tailer.go:30: func StartFileTailer
filter TailerFilterFunc,
On 2013/12/05 16:20:37, rog wrote:
> I think this function is unnecessary. It's trivial for other code to
open a
> file.
Done.
https:/
utils/tailer.go:40: func StartTailer(
int, filter TailerFilterFunc,
On 2013/12/05 16:20:37, rog wrote:
> Please document this function properly.
Done.
https:/
utils/tailer.go:87: println("> error:", err.Error())
On 2013/12/05 16:20:37, rog wrote:
> d
Done.
https:/
utils/tailer.go:98: if buffer[i] == '\n' {
On 2013/12/05 16:20:37, rog wrote:
> This isn't strictly accurate (what happens when we have an
unterminated line at
> the end of the file?) but it's probably ok if we never filter
unterminated
> lines. A comment about why it's ok might prevent future puzzling over
the
> correctness of the code.
Added a comment at the type declaration.
https:/
utils/tailer.
On 2013/12/05 16:20:37, rog wrote:
> s/==/>=/
> defensively.
> Also, if we move the if statement before the increment,
> then you can lose the "-1".
Done.
https:/
utils/tailer.
On 2013/12/05 16:20:37, rog wrote:
> I think I'd be tempted to use ReadSlice here (changing the type of
filter to
> func([]byte)bool.
> That way the tailer won't need to generate any garbage at all AFAICS.
> Unfortunately that limits the line size to the size of the bufio
buffer, which
> may be acceptable. An alternative is to use ReadLine and allocate only
if the
> line is too long.
> Here's some code I wrote a while ago to do that, in case it might be
useful:
> // readLine reads a line from r.
> // The returned byte slice is only valid until the
> // next read call on r.
> func readLine(r *bufio.Reader) ([]byte, error) {
> line, isPrefix, err := r.ReadLine()
> if !isPrefix {
> return line, err
> }
> buf := append([]byte(nil), line...)
> for isPrefix && err == nil {
> line, isPrefix, err = r.ReadLine()
> buf = append(buf, line...)
> }
> return buf, err
> }
Good hint, using it.
https:/
utils/tailer.
On 2013/12/05 16:20:37, rog wrote:
> This code is exactly the same as the code above.
> Can't we do with just this? (if we just delete the
> loop above and use NewTimer(0), I think it might
> just work).
Yep, the split has been due to the initial approach of scanning all up
to the end first. Removed. Thanks.
https:/
utils/tailer.
On 2013/12/0...
Roger Peppe (rogpeppe) wrote : | # |
Getting there! A few more thoughts and suggestions below.
https:/
File utils/tailer/
https:/
utils/tailer/
number of matching lines
s/beginns/begins/
https:/
utils/tailer/
io.ReadSeeker, writer io.Writer, lines int, filter TailerFilterFunc)
*Tailer {
Given that this function is going to be the one that everyone calls, I
think I'd name it NewTailer, but do we actually need to expose buffer
size and poll interval publicly except to the tailer tests?
https:/
utils/tailer/
This seems rather complex to me - I don't fully trust myself to vet the
logic. Also, I don't think we need to have a separate line buffer and
read buffer - we can use a single buffer and read directly into it. That
way we can avoid allocating a new buffer every time round the loop too,
although it is necessary to copy something from the start to the end.
I started trying to explain a better possiblity, but got carried away...
Here's the kind of thing I mean. Still somewhat complex, but I've tried
to keep the invariants simple. It needs testing (I've left in
one deliberate mistake that I hope to see a test for). I suggest
some internal tests that test this function in isolation with
quite a few different inputs.
// seekLastLines sets the read position of the ReadSeeker to the
// wanted number of filtered lines before the end.
func (t *Tailer) seekLastLines() error {
offset, err := t.readSeeker.
if err != nil {
return err
}
seekPos := int64(0)
found := 0
buf := make([]byte, minRead)
SeekLoop:
for offset > 0 {
// buf contains the data left over from the
// previous iteration.
space := cap(buf) - len(buf)
if space < minRead {
// grow buffer
newBuf := make([]byte, len(buf), cap(buf)*2)
copy(newBuf, buf)
buf = newBuf
space = cap(buf) - len(buf)
}
if int64(space) > offset {
// Use exactly the right amount of space if there's
// only a small amount remaining.
space = int(offset)
}
// copy data remaining from last time to the end of the buffer,
// so we can read into the right place.
copy(
buf = buf[0 : len(buf)+space]
offset -= int64(space)
_, err := t.readSeeker.
if err != nil {
return err
}
_, err = io.ReadFull(
if err != nil {
return err
}
// Find the end of the last line in the buffer.
// This will discard any unterminated line at the end
// of the file.
end := bytes.LastIndex
if end == -1 {
// No end of line found - discard incomplete
// line and continue looking. If this happens
// at the beginning of the file, we don't care
// because we're going to stop anyway.
buf = buf[:0]
continue
}
end++
fo...
William Reade (fwereade) wrote : | # |
WIPping to address rog's review
Frank Mueller (themue) wrote : | # |
Please take a look.
https:/
File utils/tailer/
https:/
utils/tailer/
number of matching lines
On 2013/12/10 15:14:25, rog wrote:
> s/beginns/begins/
Done.
https:/
utils/tailer/
io.ReadSeeker, writer io.Writer, lines int, filter TailerFilterFunc)
*Tailer {
On 2013/12/10 15:14:25, rog wrote:
> Given that this function is going to be the one that everyone calls, I
think I'd
> name it NewTailer, but do we actually need to expose buffer size and
poll
> interval publicly except to the tailer tests?
Done.
https:/
utils/tailer/
On 2013/12/10 15:14:25, rog wrote:
> This seems rather complex to me - I don't fully trust myself to vet
the logic.
Used it, even if I have the same troubles following your array logic and
the needed debugging. Thankfully the comments helped to follow the idea.
And at least it has less lines. ;) Thx.
https:/
utils/tailer/
On 2013/12/10 15:14:25, rog wrote:
> s/beginnig/
Not needed anymore due to new function.
https:/
utils/tailer/
On 2013/12/10 15:14:25, rog wrote:
> ReadBytes can't return ErrBufferFull.
> I think you want to use ReadSlice.
> Also, if the first call succeeds, we should just return the slice that
we got
> from ReadSlice - that way in the usual case that the line is shorter
than the
> bufio buffer we can avoid any allocation.
Done.
https:/
utils/tailer/
delimiter {
On 2013/12/10 15:14:25, rog wrote:
> This can't happen. (from the docs: "returns err != nil if and only if
the
> returned data does not end in delim")
Done.
https:/
utils/tailer/
On 2013/12/10 15:14:25, rog wrote:
> This is racy. We can't seek from the end because the end might be
constantly
> changing. I think we need to keep track of where we're reading in the
file and
> seek back to the absolute offset of the start of the partial line.
Done.
https:/
utils/tailer/
On 2013/12/10 15:14:25, rog wrote:
> Why?
Yeah, too much automatic filtering here. The receiver of the data should
decide. Removed it.
https:/
utils/tailer/
On 2013/12/10 15:14...
Roger Peppe (rogpeppe) wrote : | # |
Another round of suggestions, with one or two things still to fix.
Thanks for bearing with me!
https:/
File utils/tailer/
https:/
utils/tailer/
On 2013/12/11 17:13:05, mue wrote:
> On 2013/12/10 15:14:25, rog wrote:
> > Consider trimming \r?\n from the end of the line before calling
filter?
> Why?
Then the filter functions don't need to worry about line termination
characters at all and we know that we're safe even when we have windows
stuff generating \r\n lines.
https:/
File utils/tailer/
https:/
utils/tailer/
If we've got t.bufsize, "buf" would seem logical as a name.
Not that it matters much though.
https:/
utils/tailer/
Unfortunately I think this is wrong, as we may be appending to the
internal bufio.Reader buffer, which may be overwritten by the ReadSlice
call.
How about separating the first-slice case from the appending logic?
Something like this, perhaps?
// readLine reads the next valid line from the reader, even if it is
// larger than the reader buffer.
func (t *Tailer) readLine() ([]byte, error) {
for {
slice, err := t.reader.
if err == nil {
if t.isValid(slice) {
return slice, nil
}
continue
}
line := append([]byte(nil), slice...)
for err == bufio.ErrBufferFull {
slice, err = t.reader.
line = append(line, slice...)
}
switch err {
case nil:
if t.isValid(line) {
return line, nil
}
case io.EOF:
// EOF without delimiter, step back.
t.readSeeker
return nil, err
default:
return nil, err
}
}
}
https:/
utils/tailer/
Ah, that's nicer than my suggestion, thanks.
https:/
File utils/tailer/
https:/
utils/tailer/
I'm afraid the way you're using this is racy (and in all the other tests
too) because you're reading from the buffer (with assertCollected)
concurrently with the tailer writing to it.
You could use io.Pipe instead of bytes.Buffer to avoid the problem.
BTW whenever you've got goroutine-based code, it's worth running go test
-race to check this kind of stuff.
https:/
utils/tailer/
TestLaggedTermi
I see 10 of these tests are almost identical.
This suggests to me that a table-based te...
Frank Mueller (themue) wrote : | # |
Please take a look.
https:/
File utils/tailer/
https:/
utils/tailer/
On 2013/12/11 18:27:41, rog wrote:
> If we've got t.bufsize, "buf" would seem logical as a name.
> Not that it matters much though.
Ack, but in the other direction. ;)
https:/
utils/tailer/
On 2013/12/11 18:27:41, rog wrote:
> Unfortunately I think this is wrong, as we may be appending to the
internal
> bufio.Reader buffer, which may be overwritten by the ReadSlice call.
> How about separating the first-slice case from the appending logic?
> Something like this, perhaps?
> // readLine reads the next valid line from the reader, even if it is
> // larger than the reader buffer.
> func (t *Tailer) readLine() ([]byte, error) {
> for {
> slice, err := t.reader.
> if err == nil {
> if t.isValid(slice) {
> return slice, nil
> }
> continue
> }
> line := append([]byte(nil), slice...)
> for err == bufio.ErrBufferFull {
> slice, err = t.reader.
> line = append(line, slice...)
> }
> switch err {
> case nil:
> if t.isValid(line) {
> return line, nil
> }
> case io.EOF:
> // EOF without delimiter, step back.
> t.readSeeker.
> return nil, err
> default:
> return nil, err
> }
> }
> }
Yeah, that's better, thx.
https:/
File utils/tailer/
https:/
utils/tailer/
On 2013/12/11 18:27:41, rog wrote:
> I'm afraid the way you're using this is racy (and in all the other
tests too)
> because you're reading from the buffer (with assertCollected)
concurrently with
> the tailer writing to it.
> You could use io.Pipe instead of bytes.Buffer to avoid the problem.
> BTW whenever you've got goroutine-based code, it's worth running go
test -race
> to check this kind of stuff.
Done.
https:/
utils/tailer/
TestLaggedTermi
On 2013/12/11 18:27:41, rog wrote:
> I see 10 of these tests are almost identical.
> This suggests to me that a table-based test might work well here, and
make it
> easier to see what's actually being tested.
> Then we could easily add quite a few more tests (for example, I'd like
to see
> tests with blank lines in various places, and probably some more too).
Done.
https:/
utils/tailer/
On 2013/12/11 18:27:41, rog wrote:
> This should probably be defined just before it's used.
Now different approach with table driven tests.
http...
Roger Peppe (rogpeppe) wrote : | # |
Another round. Hopefully done after this one!
https:/
File utils/tailer/
https:/
utils/tailer/
writeCloser io.WriteCloser, lines int, filter TailerFilterFunc) *Tailer
{
I don't think there's any particular reason we want to give this code
the responsibility for closing the writer. It's quite possible we might
have several tailers writing to the same writer, for example.
Better would be a Wait method (and perhaps a Dead method) to wait until
it's finished. Then the caller has the option of closing the writer.
https:/
File utils/tailer/
https:/
utils/tailer/
If we move this brace onto the previous line, we can save a level of
indentation in all these tests.
https:/
utils/tailer/
I like the new tests much better now. One thought though:
I'm not convinced that referring to "data" in all these tests makes them
more readable - I have no idea what's in data[26:29] without counting or
using grep -n.
Something like the below shows me exactly what's going
on without me needing to refer to multiple places,
and is independent of any changes else in the code.
It would make it considerably easier for me to scan down
the tests to ensure that each one represents reasonable
behaviour.
{
description: "lines are longer than buffer size",
data: []string{
"the quick brown fox ",
},
initialLin
initialLin
bufferSize: 5,
initialCol
},
appendedCo
},
},
https:/
utils/tailer/
If the tailer doesn't produce enough data, we'll block here forever I
think, regardless of the timeout.
I think you probably want something more like the below:
func assertCollected(c *gc.C, reader io.Reader, compare []string,
injection func([]string)
) {
lineChan := make(chan string)
go func() {
defer close(lineChan)
reader := bufio.NewReader
for {
line, err := reader.
if !c.Check(err, gc.IsNil) {
}
}
}()
timeout := time.After(
for {
select {
case line, ok := <-lineChan:
...
Frank Mueller (themue) wrote : | # |
Please take a look.
https:/
File utils/tailer/
https:/
utils/tailer/
writeCloser io.WriteCloser, lines int, filter TailerFilterFunc) *Tailer
{
On 2013/12/12 14:46:39, rog wrote:
> I don't think there's any particular reason we want to give this code
the
> responsibility for closing the writer. It's quite possible we might
have several
> tailers writing to the same writer, for example.
> Better would be a Wait method (and perhaps a Dead method) to wait
until
> it's finished. Then the caller has the option of closing the writer.
Done as discussed.
https:/
File utils/tailer/
https:/
utils/tailer/
On 2013/12/12 14:46:39, rog wrote:
> If we move this brace onto the previous line, we can save a level of
indentation
> in all these tests.
Done.
https:/
utils/tailer/
On 2013/12/12 14:46:39, rog wrote:
> I like the new tests much better now. One thought though:
> I'm not convinced that referring to "data" in all these tests makes
them more
> readable - I have no idea what's in data[26:29] without counting or
using grep
> -n.
> Something like the below shows me exactly what's going
> on without me needing to refer to multiple places,
> and is independent of any changes else in the code.
> It would make it considerably easier for me to scan down
> the tests to ensure that each one represents reasonable
> behaviour.
> {
> description: "lines are longer than buffer size",
> data: []string{
> "abcdefghijklmn
> "01234567890123
> "the quick brown fox ",
> },
> initialLinesWri
> initialLinesReq
> bufferSize: 5,
> initialCollecte
> "abcdefghijklmn
> },
> appendedCollect
> "01234567890123
> },
> },
This case the test table would blow up again and also would be
inconsistent (e.g. when using the standard larger group of lines). So
now using three different data variables with more speaking names as a
compromise. Also located them near to the table.
https:/
utils/tailer/
On 2013/12/12 14:46:39, rog wrote:
> If the tailer doesn't produce enough data, we'll block here forever I
think,
> regardless of the timeout.
> I think you probably want something more like the below:
> func assertCollected(c *gc.C, reader io.Reader, compare []string,
injection
> func([]st...
Roger Peppe (rogpeppe) wrote : | # |
LGTM with the tests fixed, thanks!
https:/
File utils/tailer/
https:/
utils/tailer/
On 2013/12/13 11:23:12, mue wrote:
> On 2013/12/12 14:46:39, rog wrote:
> > I like the new tests much better now. One thought though:
> >
> > I'm not convinced that referring to "data" in all these tests makes
them more
> > readable - I have no idea what's in data[26:29] without counting or
using grep
> > -n.
> >
> > Something like the below shows me exactly what's going
> > on without me needing to refer to multiple places,
> > and is independent of any changes else in the code.
> >
> > It would make it considerably easier for me to scan down
> > the tests to ensure that each one represents reasonable
> > behaviour.
> >
> > {
> > description: "lines are longer than buffer size",
> > data: []string{
> > "abcdefghijklmn
> > "01234567890123
> > "the quick brown fox ",
> > },
> > initialLinesWri
> > initialLinesReq
> > bufferSize: 5,
> > initialCollecte
> > "abcdefghijklmn
> > },
> > appendedCollect
> > "01234567890123
> > },
> > },
> >
> This case the test table would blow up again and also would be
inconsistent
> (e.g. when using the standard larger group of lines). So now using
three
> different data variables with more speaking names as a compromise.
Also located
> them near to the table.
I'm with you about alphabetData (which is also nicely memorable, which
helps), but most of them do really benefit from having the data visible
in the test.
I processed my local copy to move much of the test data into the tests,
and two issues became immediately obvious, though I'd missed them when
cross-referencing, which I think is a reasonable indication that the
tests are not currently that clear.
Here's my modified version of the test. Only alphabetData remains. I'd
prefer it if we could use this form.
http://
https:/
File utils/tailer/
https:/
utils/tailer/
longer than buffer size, missing termination of last line",
How is the last line missing termination here?
https:/
utils/tailer/
unterminatedDat
The last line doesn't seem to be missing termination here.
Frank Mueller (themue) wrote : | # |
Please take a look.
https:/
File utils/tailer/
https:/
utils/tailer/
longer than buffer size, missing termination of last line",
On 2013/12/13 12:52:14, rog wrote:
> How is the last line missing termination here?
Done.
https:/
utils/tailer/
unterminatedDat
On 2013/12/13 12:52:14, rog wrote:
> The last line doesn't seem to be missing termination here.
Done.
Preview Diff
1 | === added directory 'utils/tailer' |
2 | === added file 'utils/tailer/export_test.go' |
3 | --- utils/tailer/export_test.go 1970-01-01 00:00:00 +0000 |
4 | +++ utils/tailer/export_test.go 2013-12-13 13:24:18 +0000 |
5 | @@ -0,0 +1,6 @@ |
6 | +// Copyright 2013 Canonical Ltd. |
7 | +// Licensed under the AGPLv3, see LICENCE file for details. |
8 | + |
9 | +package tailer |
10 | + |
11 | +var NewTestTailer = newTailer |
12 | |
13 | === added file 'utils/tailer/tailer.go' |
14 | --- utils/tailer/tailer.go 1970-01-01 00:00:00 +0000 |
15 | +++ utils/tailer/tailer.go 2013-12-13 13:24:18 +0000 |
16 | @@ -0,0 +1,250 @@ |
17 | +// Copyright 2013 Canonical Ltd. |
18 | +// Licensed under the AGPLv3, see LICENCE file for details. |
19 | + |
20 | +package tailer |
21 | + |
22 | +import ( |
23 | + "bufio" |
24 | + "bytes" |
25 | + "io" |
26 | + "os" |
27 | + "time" |
28 | + |
29 | + "launchpad.net/tomb" |
30 | +) |
31 | + |
32 | +const ( |
33 | + bufferSize = 4096 |
34 | + polltime = time.Second |
35 | + delimiter = '\n' |
36 | +) |
37 | + |
38 | +var ( |
39 | + delimiters = []byte{delimiter} |
40 | +) |
41 | + |
42 | +// TailerFilterFunc decides if a line shall be tailed (func is nil or |
43 | +// returns true) of shall be omitted (func returns false). |
44 | +type TailerFilterFunc func(line []byte) bool |
45 | + |
46 | +// Tailer reads an input line by line an tails them into the passed Writer. |
47 | +// The lines have to be terminated with a newline. |
48 | +type Tailer struct { |
49 | + tomb tomb.Tomb |
50 | + readSeeker io.ReadSeeker |
51 | + reader *bufio.Reader |
52 | + writeCloser io.WriteCloser |
53 | + writer *bufio.Writer |
54 | + lines int |
55 | + filter TailerFilterFunc |
56 | + bufferSize int |
57 | + polltime time.Duration |
58 | +} |
59 | + |
60 | +// NewTailer starts a Tailer which reads strings from the passed |
61 | +// ReadSeeker line by line. If a filter function is specified the read |
62 | +// lines are filtered. The matching lines are written to the passed |
63 | +// Writer. The reading begins the specified number of matching lines |
64 | +// from the end. |
65 | +func NewTailer(readSeeker io.ReadSeeker, writer io.Writer, lines int, filter TailerFilterFunc) *Tailer { |
66 | + return newTailer(readSeeker, writer, lines, filter, bufferSize, polltime) |
67 | +} |
68 | + |
69 | +// newTailer starts a Tailer like NewTailer but allows the setting of |
70 | +// the read buffer size and the time between pollings for testing. |
71 | +func newTailer(readSeeker io.ReadSeeker, writer io.Writer, lines int, filter TailerFilterFunc, |
72 | + bufferSize int, polltime time.Duration) *Tailer { |
73 | + t := &Tailer{ |
74 | + readSeeker: readSeeker, |
75 | + reader: bufio.NewReaderSize(readSeeker, bufferSize), |
76 | + writer: bufio.NewWriter(writer), |
77 | + lines: lines, |
78 | + filter: filter, |
79 | + bufferSize: bufferSize, |
80 | + polltime: polltime, |
81 | + } |
82 | + go func() { |
83 | + defer t.tomb.Done() |
84 | + t.tomb.Kill(t.loop()) |
85 | + }() |
86 | + return t |
87 | +} |
88 | + |
89 | +// Stop tells the tailer to stop working. |
90 | +func (t *Tailer) Stop() error { |
91 | + t.tomb.Kill(nil) |
92 | + return t.tomb.Wait() |
93 | +} |
94 | + |
95 | +// Wait waits until the tailer is stopped due to command |
96 | +// or an error. In case of an error it returns the reason. |
97 | +func (t *Tailer) Wait() error { |
98 | + return t.tomb.Wait() |
99 | +} |
100 | + |
101 | +// Dead returns the channel that can be used to wait until |
102 | +// the tailer is stopped. |
103 | +func (t *Tailer) Dead() <-chan struct{} { |
104 | + return t.tomb.Dead() |
105 | +} |
106 | + |
107 | +// Err returns a possible error. |
108 | +func (t *Tailer) Err() error { |
109 | + return t.tomb.Err() |
110 | +} |
111 | + |
112 | +// loop writes the last lines based on the buffer size to the |
113 | +// writer and then polls for more data to write it to the |
114 | +// writer too. |
115 | +func (t *Tailer) loop() error { |
116 | + // Position the readSeeker. |
117 | + if err := t.seekLastLines(); err != nil { |
118 | + return err |
119 | + } |
120 | + // Start polling. |
121 | + // TODO(mue) 2013-12-06 |
122 | + // Handling of read-seeker/files being truncated during |
123 | + // tailing is currently missing! |
124 | + timer := time.NewTimer(0) |
125 | + for { |
126 | + select { |
127 | + case <-t.tomb.Dying(): |
128 | + return nil |
129 | + case <-timer.C: |
130 | + for { |
131 | + line, readErr := t.readLine() |
132 | + _, writeErr := t.writer.Write(line) |
133 | + if writeErr != nil { |
134 | + return writeErr |
135 | + } |
136 | + if readErr != nil { |
137 | + if readErr != io.EOF { |
138 | + return readErr |
139 | + } |
140 | + break |
141 | + } |
142 | + } |
143 | + if writeErr := t.writer.Flush(); writeErr != nil { |
144 | + return writeErr |
145 | + } |
146 | + timer.Reset(t.polltime) |
147 | + } |
148 | + } |
149 | +} |
150 | + |
151 | +// seekLastLines sets the read position of the ReadSeeker to the |
152 | +// wanted number of filtered lines before the end. |
153 | +func (t *Tailer) seekLastLines() error { |
154 | + offset, err := t.readSeeker.Seek(0, os.SEEK_END) |
155 | + if err != nil { |
156 | + return err |
157 | + } |
158 | + seekPos := int64(0) |
159 | + found := 0 |
160 | + buffer := make([]byte, t.bufferSize) |
161 | +SeekLoop: |
162 | + for offset > 0 { |
163 | + // buffer contains the data left over from the |
164 | + // previous iteration. |
165 | + space := cap(buffer) - len(buffer) |
166 | + if space < t.bufferSize { |
167 | + // Grow buffer. |
168 | + newBuffer := make([]byte, len(buffer), cap(buffer)*2) |
169 | + copy(newBuffer, buffer) |
170 | + buffer = newBuffer |
171 | + space = cap(buffer) - len(buffer) |
172 | + } |
173 | + if int64(space) > offset { |
174 | + // Use exactly the right amount of space if there's |
175 | + // only a small amount remaining. |
176 | + space = int(offset) |
177 | + } |
178 | + // Copy data remaining from last time to the end of the buffer, |
179 | + // so we can read into the right place. |
180 | + copy(buffer[space:cap(buffer)], buffer) |
181 | + buffer = buffer[0 : len(buffer)+space] |
182 | + offset -= int64(space) |
183 | + _, err := t.readSeeker.Seek(offset, os.SEEK_SET) |
184 | + if err != nil { |
185 | + return err |
186 | + } |
187 | + _, err = io.ReadFull(t.readSeeker, buffer[0:space]) |
188 | + if err != nil { |
189 | + return err |
190 | + } |
191 | + // Find the end of the last line in the buffer. |
192 | + // This will discard any unterminated line at the end |
193 | + // of the file. |
194 | + end := bytes.LastIndex(buffer, delimiters) |
195 | + if end == -1 { |
196 | + // No end of line found - discard incomplete |
197 | + // line and continue looking. If this happens |
198 | + // at the beginning of the file, we don't care |
199 | + // because we're going to stop anyway. |
200 | + buffer = buffer[:0] |
201 | + continue |
202 | + } |
203 | + end++ |
204 | + for { |
205 | + start := bytes.LastIndex(buffer[0:end-1], delimiters) |
206 | + if start == -1 && offset >= 0 { |
207 | + break |
208 | + } |
209 | + start++ |
210 | + if t.isValid(buffer[start:end]) { |
211 | + found++ |
212 | + if found >= t.lines { |
213 | + seekPos = offset + int64(start) |
214 | + break SeekLoop |
215 | + } |
216 | + } |
217 | + end = start |
218 | + } |
219 | + // Leave the last line in buffer, as we don't know whether |
220 | + // it's complete or not. |
221 | + buffer = buffer[0:end] |
222 | + } |
223 | + // Final positioning. |
224 | + t.readSeeker.Seek(seekPos, os.SEEK_SET) |
225 | + return nil |
226 | +} |
227 | + |
228 | +// readLine reads the next valid line from the reader, even if it is |
229 | +// larger than the reader buffer. |
230 | +func (t *Tailer) readLine() ([]byte, error) { |
231 | + for { |
232 | + slice, err := t.reader.ReadSlice(delimiter) |
233 | + if err == nil { |
234 | + if t.isValid(slice) { |
235 | + return slice, nil |
236 | + } |
237 | + continue |
238 | + } |
239 | + line := append([]byte(nil), slice...) |
240 | + for err == bufio.ErrBufferFull { |
241 | + slice, err = t.reader.ReadSlice(delimiter) |
242 | + line = append(line, slice...) |
243 | + } |
244 | + switch err { |
245 | + case nil: |
246 | + if t.isValid(line) { |
247 | + return line, nil |
248 | + } |
249 | + case io.EOF: |
250 | + // EOF without delimiter, step back. |
251 | + t.readSeeker.Seek(-int64(len(line)), os.SEEK_CUR) |
252 | + return nil, err |
253 | + default: |
254 | + return nil, err |
255 | + } |
256 | + } |
257 | +} |
258 | + |
259 | +// isValid checks if the passed line is valid by checking if the |
260 | +// line has content, the filter function is nil or it returns true. |
261 | +func (t *Tailer) isValid(line []byte) bool { |
262 | + if t.filter == nil { |
263 | + return true |
264 | + } |
265 | + return t.filter(line) |
266 | +} |
267 | |
268 | === added file 'utils/tailer/tailer_test.go' |
269 | --- utils/tailer/tailer_test.go 1970-01-01 00:00:00 +0000 |
270 | +++ utils/tailer/tailer_test.go 2013-12-13 13:24:18 +0000 |
271 | @@ -0,0 +1,496 @@ |
272 | +// Copyright 2013 Canonical Ltd. |
273 | +// Licensed under the AGPLv3, see LICENCE file for details. |
274 | + |
275 | +package tailer_test |
276 | + |
277 | +import ( |
278 | + "bufio" |
279 | + "bytes" |
280 | + "fmt" |
281 | + "io" |
282 | + "sync" |
283 | + stdtesting "testing" |
284 | + "time" |
285 | + |
286 | + gc "launchpad.net/gocheck" |
287 | + |
288 | + "launchpad.net/juju-core/testing" |
289 | + "launchpad.net/juju-core/utils/tailer" |
290 | +) |
291 | + |
292 | +func Test(t *stdtesting.T) { |
293 | + gc.TestingT(t) |
294 | +} |
295 | + |
296 | +type tailerSuite struct{} |
297 | + |
298 | +var _ = gc.Suite(tailerSuite{}) |
299 | + |
300 | +var alphabetData = []string{ |
301 | + "alpha alpha\n", |
302 | + "bravo bravo\n", |
303 | + "charlie charlie\n", |
304 | + "delta delta\n", |
305 | + "echo echo\n", |
306 | + "foxtrott foxtrott\n", |
307 | + "golf golf\n", |
308 | + "hotel hotel\n", |
309 | + "india india\n", |
310 | + "juliet juliet\n", |
311 | + "kilo kilo\n", |
312 | + "lima lima\n", |
313 | + "mike mike\n", |
314 | + "november november\n", |
315 | + "oscar oscar\n", |
316 | + "papa papa\n", |
317 | + "quebec quebec\n", |
318 | + "romeo romeo\n", |
319 | + "sierra sierra\n", |
320 | + "tango tango\n", |
321 | + "uniform uniform\n", |
322 | + "victor victor\n", |
323 | + "whiskey whiskey\n", |
324 | + "x-ray x-ray\n", |
325 | + "yankee yankee\n", |
326 | + "zulu zulu\n", |
327 | +} |
328 | + |
329 | +var tests = []struct { |
330 | + description string |
331 | + data []string |
332 | + initialLinesWritten int |
333 | + initialLinesRequested int |
334 | + bufferSize int |
335 | + filter tailer.TailerFilterFunc |
336 | + injector func(*tailer.Tailer, *readSeeker) func([]string) |
337 | + initialCollectedData []string |
338 | + appendedCollectedData []string |
339 | + err string |
340 | +}{{ |
341 | + description: "lines are longer than buffer size", |
342 | + data: []string{ |
343 | + "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz\n", |
344 | + "0123456789012345678901234567890123456789012345678901\n", |
345 | + }, |
346 | + initialLinesWritten: 1, |
347 | + initialLinesRequested: 1, |
348 | + bufferSize: 5, |
349 | + initialCollectedData: []string{ |
350 | + "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz\n", |
351 | + }, |
352 | + appendedCollectedData: []string{ |
353 | + "0123456789012345678901234567890123456789012345678901\n", |
354 | + }, |
355 | +}, { |
356 | + description: "lines are longer than buffer size, missing termination of last line", |
357 | + data: []string{ |
358 | + "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz\n", |
359 | + "0123456789012345678901234567890123456789012345678901\n", |
360 | + "the quick brown fox ", |
361 | + }, |
362 | + initialLinesWritten: 1, |
363 | + initialLinesRequested: 1, |
364 | + bufferSize: 5, |
365 | + initialCollectedData: []string{ |
366 | + "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz\n", |
367 | + }, |
368 | + appendedCollectedData: []string{ |
369 | + "0123456789012345678901234567890123456789012345678901\n", |
370 | + }, |
371 | +}, { |
372 | + description: "lines are longer than buffer size, last line is terminated later", |
373 | + data: []string{ |
374 | + "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz\n", |
375 | + "0123456789012345678901234567890123456789012345678901\n", |
376 | + "the quick brown fox ", |
377 | + "jumps over the lazy dog\n", |
378 | + }, |
379 | + initialLinesWritten: 1, |
380 | + initialLinesRequested: 1, |
381 | + bufferSize: 5, |
382 | + initialCollectedData: []string{ |
383 | + "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz\n", |
384 | + }, |
385 | + appendedCollectedData: []string{ |
386 | + "0123456789012345678901234567890123456789012345678901\n", |
387 | + "the quick brown fox jumps over the lazy dog\n", |
388 | + }, |
389 | +}, { |
390 | + description: "missing termination of last line", |
391 | + data: []string{ |
392 | + "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz\n", |
393 | + "0123456789012345678901234567890123456789012345678901\n", |
394 | + "the quick brown fox ", |
395 | + }, |
396 | + initialLinesWritten: 1, |
397 | + initialLinesRequested: 1, |
398 | + initialCollectedData: []string{ |
399 | + "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz\n", |
400 | + }, |
401 | + appendedCollectedData: []string{ |
402 | + "0123456789012345678901234567890123456789012345678901\n", |
403 | + }, |
404 | +}, { |
405 | + description: "last line is terminated later", |
406 | + data: []string{ |
407 | + "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz\n", |
408 | + "0123456789012345678901234567890123456789012345678901\n", |
409 | + "the quick brown fox ", |
410 | + "jumps over the lazy dog\n", |
411 | + }, |
412 | + initialLinesWritten: 1, |
413 | + initialLinesRequested: 1, |
414 | + initialCollectedData: []string{ |
415 | + "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz\n", |
416 | + }, |
417 | + appendedCollectedData: []string{ |
418 | + "0123456789012345678901234567890123456789012345678901\n", |
419 | + "the quick brown fox jumps over the lazy dog\n", |
420 | + }, |
421 | +}, { |
422 | + description: "more lines already written than initially requested", |
423 | + data: alphabetData, |
424 | + initialLinesWritten: 5, |
425 | + initialLinesRequested: 3, |
426 | + initialCollectedData: []string{ |
427 | + "charlie charlie\n", |
428 | + "delta delta\n", |
429 | + "echo echo\n", |
430 | + }, |
431 | + appendedCollectedData: alphabetData[5:], |
432 | +}, { |
433 | + description: "less lines already written than initially requested", |
434 | + data: alphabetData, |
435 | + initialLinesWritten: 3, |
436 | + initialLinesRequested: 5, |
437 | + initialCollectedData: []string{ |
438 | + "alpha alpha\n", |
439 | + "bravo bravo\n", |
440 | + "charlie charlie\n", |
441 | + }, |
442 | + appendedCollectedData: alphabetData[3:], |
443 | +}, { |
444 | + description: "lines are longer than buffer size, more lines already written than initially requested", |
445 | + data: alphabetData, |
446 | + initialLinesWritten: 5, |
447 | + initialLinesRequested: 3, |
448 | + bufferSize: 5, |
449 | + initialCollectedData: []string{ |
450 | + "charlie charlie\n", |
451 | + "delta delta\n", |
452 | + "echo echo\n", |
453 | + }, |
454 | + appendedCollectedData: alphabetData[5:], |
455 | +}, { |
456 | + description: "lines are longer than buffer size, less lines already written than initially requested", |
457 | + data: alphabetData, |
458 | + initialLinesWritten: 3, |
459 | + initialLinesRequested: 5, |
460 | + bufferSize: 5, |
461 | + initialCollectedData: []string{ |
462 | + "alpha alpha\n", |
463 | + "bravo bravo\n", |
464 | + "charlie charlie\n", |
465 | + }, |
466 | + appendedCollectedData: alphabetData[3:], |
467 | +}, { |
468 | + description: "filter lines which contain the char 'e'", |
469 | + data: alphabetData, |
470 | + initialLinesWritten: 10, |
471 | + initialLinesRequested: 3, |
472 | + filter: func(line []byte) bool { |
473 | + return bytes.Contains(line, []byte{'e'}) |
474 | + }, |
475 | + initialCollectedData: []string{ |
476 | + "echo echo\n", |
477 | + "hotel hotel\n", |
478 | + "juliet juliet\n", |
479 | + }, |
480 | + appendedCollectedData: []string{ |
481 | + "mike mike\n", |
482 | + "november november\n", |
483 | + "quebec quebec\n", |
484 | + "romeo romeo\n", |
485 | + "sierra sierra\n", |
486 | + "whiskey whiskey\n", |
487 | + "yankee yankee\n", |
488 | + }, |
489 | +}, { |
490 | + description: "stop tailing after 10 collected lines", |
491 | + data: alphabetData, |
492 | + initialLinesWritten: 5, |
493 | + initialLinesRequested: 3, |
494 | + injector: func(t *tailer.Tailer, rs *readSeeker) func([]string) { |
495 | + return func(lines []string) { |
496 | + if len(lines) == 10 { |
497 | + t.Stop() |
498 | + } |
499 | + } |
500 | + }, |
501 | + initialCollectedData: []string{ |
502 | + "charlie charlie\n", |
503 | + "delta delta\n", |
504 | + "echo echo\n", |
505 | + }, |
506 | + appendedCollectedData: alphabetData[5:], |
507 | +}, { |
508 | + description: "generate an error after 10 collected lines", |
509 | + data: alphabetData, |
510 | + initialLinesWritten: 5, |
511 | + initialLinesRequested: 3, |
512 | + injector: func(t *tailer.Tailer, rs *readSeeker) func([]string) { |
513 | + return func(lines []string) { |
514 | + if len(lines) == 10 { |
515 | + rs.setError(fmt.Errorf("ouch after 10 lines")) |
516 | + } |
517 | + } |
518 | + }, |
519 | + initialCollectedData: []string{ |
520 | + "charlie charlie\n", |
521 | + "delta delta\n", |
522 | + "echo echo\n", |
523 | + }, |
524 | + appendedCollectedData: alphabetData[5:], |
525 | + err: "ouch after 10 lines", |
526 | +}, { |
527 | + description: "more lines already written than initially requested, some empty, unfiltered", |
528 | + data: []string{ |
529 | + "one one\n", |
530 | + "two two\n", |
531 | + "\n", |
532 | + "\n", |
533 | + "three three\n", |
534 | + "four four\n", |
535 | + "\n", |
536 | + "\n", |
537 | + "five five\n", |
538 | + "six six\n", |
539 | + }, |
540 | + initialLinesWritten: 3, |
541 | + initialLinesRequested: 2, |
542 | + initialCollectedData: []string{ |
543 | + "two two\n", |
544 | + "\n", |
545 | + }, |
546 | + appendedCollectedData: []string{ |
547 | + "\n", |
548 | + "three three\n", |
549 | + "four four\n", |
550 | + "\n", |
551 | + "\n", |
552 | + "five five\n", |
553 | + "six six\n", |
554 | + }, |
555 | +}, { |
556 | + description: "more lines already written than initially requested, some empty, those filtered", |
557 | + data: []string{ |
558 | + "one one\n", |
559 | + "two two\n", |
560 | + "\n", |
561 | + "\n", |
562 | + "three three\n", |
563 | + "four four\n", |
564 | + "\n", |
565 | + "\n", |
566 | + "five five\n", |
567 | + "six six\n", |
568 | + }, |
569 | + initialLinesWritten: 3, |
570 | + initialLinesRequested: 2, |
571 | + filter: func(line []byte) bool { |
572 | + return len(bytes.TrimSpace(line)) > 0 |
573 | + }, |
574 | + initialCollectedData: []string{ |
575 | + "one one\n", |
576 | + "two two\n", |
577 | + }, |
578 | + appendedCollectedData: []string{ |
579 | + "three three\n", |
580 | + "four four\n", |
581 | + "five five\n", |
582 | + "six six\n", |
583 | + }, |
584 | +}} |
585 | + |
586 | +func (tailerSuite) TestTailer(c *gc.C) { |
587 | + for i, test := range tests { |
588 | + c.Logf("Test #%d) %s", i, test.description) |
589 | + bufferSize := test.bufferSize |
590 | + if bufferSize == 0 { |
591 | + // Default value. |
592 | + bufferSize = 4096 |
593 | + } |
594 | + reader, writer := io.Pipe() |
595 | + sigc := make(chan struct{}, 1) |
596 | + rs := startReadSeeker(c, test.data, test.initialLinesWritten, sigc) |
597 | + tailer := tailer.NewTestTailer(rs, writer, test.initialLinesRequested, test.filter, bufferSize, 2*time.Millisecond) |
598 | + linec := startReading(c, tailer, reader, writer) |
599 | + |
600 | + // Collect initial data. |
601 | + assertCollected(c, linec, test.initialCollectedData, nil) |
602 | + |
603 | + sigc <- struct{}{} |
604 | + |
605 | + // Collect remaining data, possibly with injection to stop |
606 | + // earlier or generate an error. |
607 | + var injection func([]string) |
608 | + if test.injector != nil { |
609 | + injection = test.injector(tailer, rs) |
610 | + } |
611 | + |
612 | + assertCollected(c, linec, test.appendedCollectedData, injection) |
613 | + |
614 | + if test.err == "" { |
615 | + c.Assert(tailer.Stop(), gc.IsNil) |
616 | + } else { |
617 | + c.Assert(tailer.Err(), gc.ErrorMatches, test.err) |
618 | + } |
619 | + } |
620 | +} |
621 | + |
622 | +// startReading starts a goroutine receiving the lines out of the reader |
623 | +// in the background and passing them to a created string channel. This |
624 | +// will used in the assertions. |
625 | +func startReading(c *gc.C, tailer *tailer.Tailer, reader *io.PipeReader, writer *io.PipeWriter) chan string { |
626 | + linec := make(chan string) |
627 | + // Start goroutine for reading. |
628 | + go func() { |
629 | + defer close(linec) |
630 | + reader := bufio.NewReader(reader) |
631 | + for { |
632 | + line, err := reader.ReadString('\n') |
633 | + switch err { |
634 | + case nil: |
635 | + linec <- line |
636 | + case io.EOF: |
637 | + return |
638 | + default: |
639 | + c.Fail() |
640 | + } |
641 | + } |
642 | + }() |
643 | + // Close writer when tailer is stopped or has an error. Tailer using |
644 | + // components can do it the same way. |
645 | + go func() { |
646 | + tailer.Wait() |
647 | + writer.Close() |
648 | + }() |
649 | + return linec |
650 | +} |
651 | + |
652 | +// assertCollected reads lines from the string channel linec. It compares if |
653 | +// those are the one passed with compare until a timeout. If the timeout is |
654 | +// reached earlier than all lines are collected the assertion fails. The |
655 | +// injection function allows to interrupt the processing with a function |
656 | +// generating an error or a regular stopping during the tailing. In case the |
657 | +// linec is closed due to stopping or an error only the values so far care |
658 | +// compared. Checking the reason for termination is done in the test. |
659 | +func assertCollected(c *gc.C, linec chan string, compare []string, injection func([]string)) { |
660 | + timeout := time.After(testing.LongWait) |
661 | + lines := []string{} |
662 | + for { |
663 | + select { |
664 | + case line, ok := <-linec: |
665 | + if ok { |
666 | + lines = append(lines, line) |
667 | + if injection != nil { |
668 | + injection(lines) |
669 | + } |
670 | + if len(lines) == len(compare) { |
671 | + // All data received. |
672 | + c.Assert(lines, gc.DeepEquals, compare) |
673 | + return |
674 | + } |
675 | + } else { |
676 | + // linec closed after stopping or error. |
677 | + c.Assert(lines, gc.DeepEquals, compare[:len(lines)]) |
678 | + return |
679 | + } |
680 | + case <-timeout: |
681 | + if injection == nil { |
682 | + c.Fatalf("timeout during tailer collection") |
683 | + } |
684 | + return |
685 | + } |
686 | + } |
687 | +} |
688 | + |
689 | +// startReadSeeker returns a ReadSeeker for the Tailer. It simulates |
690 | +// reading and seeking inside a file and also simulating an error. |
691 | +// The goroutine waits for a signal that it can start writing the |
692 | +// appended lines. |
693 | +func startReadSeeker(c *gc.C, data []string, initialLeg int, sigc chan struct{}) *readSeeker { |
694 | + // Write initial lines into the buffer. |
695 | + var rs readSeeker |
696 | + var i int |
697 | + for i = 0; i < initialLeg; i++ { |
698 | + rs.write(data[i]) |
699 | + } |
700 | + |
701 | + go func() { |
702 | + <-sigc |
703 | + |
704 | + for ; i < len(data); i++ { |
705 | + time.Sleep(5 * time.Millisecond) |
706 | + rs.write(data[i]) |
707 | + } |
708 | + }() |
709 | + return &rs |
710 | +} |
711 | + |
712 | +type readSeeker struct { |
713 | + mux sync.Mutex |
714 | + buffer []byte |
715 | + pos int |
716 | + err error |
717 | +} |
718 | + |
719 | +func (r *readSeeker) write(s string) { |
720 | + r.mux.Lock() |
721 | + defer r.mux.Unlock() |
722 | + r.buffer = append(r.buffer, []byte(s)...) |
723 | +} |
724 | + |
725 | +func (r *readSeeker) setError(err error) { |
726 | + r.mux.Lock() |
727 | + defer r.mux.Unlock() |
728 | + r.err = err |
729 | +} |
730 | + |
731 | +func (r *readSeeker) Read(p []byte) (n int, err error) { |
732 | + r.mux.Lock() |
733 | + defer r.mux.Unlock() |
734 | + if r.err != nil { |
735 | + return 0, r.err |
736 | + } |
737 | + if r.pos >= len(r.buffer) { |
738 | + return 0, io.EOF |
739 | + } |
740 | + n = copy(p, r.buffer[r.pos:]) |
741 | + r.pos += n |
742 | + return n, nil |
743 | +} |
744 | + |
745 | +func (r *readSeeker) Seek(offset int64, whence int) (ret int64, err error) { |
746 | + r.mux.Lock() |
747 | + defer r.mux.Unlock() |
748 | + var newPos int64 |
749 | + switch whence { |
750 | + case 0: |
751 | + newPos = offset |
752 | + case 1: |
753 | + newPos = int64(r.pos) + offset |
754 | + case 2: |
755 | + newPos = int64(len(r.buffer)) + offset |
756 | + default: |
757 | + return 0, fmt.Errorf("invalid whence: %d", whence) |
758 | + } |
759 | + if newPos < 0 { |
760 | + return 0, fmt.Errorf("negative position: %d", newPos) |
761 | + } |
762 | + if newPos >= 1<<31 { |
763 | + return 0, fmt.Errorf("position out of range: %d", newPos) |
764 | + } |
765 | + r.pos = int(newPos) |
766 | + return newPos, nil |
767 | +} |
Reviewers: mp+197522_ code.launchpad. net,
Message:
Please take a look.
Description:
utils: added Tailer for tailing of logs in API
The Tailer is the initial component of the debug logging command
of the API. It allows the filtered tailing of any ReaderSeeker.
If no filter is passed all lines will be written in the passed
Writer, otherwise only those where the filter function returns
true. The initial number of lines can also be specified, the
filter already works here. So if a File (which is a ReaderSeeker)
containes 100 lines, 10 lines are wanted and 5 match to the
filter only those 5 lines are returned.
https:/ /code.launchpad .net/~themue/ juju-core/ 057-tailer/ +merge/ 197522
(do not edit description out of merge proposal)
Please review this at https:/ /codereview. appspot. com/36540043/
Affected files (+412, -0 lines): test.go
A [revision details]
A utils/tailer.go
A utils/tailer_