Merge lp:~thumper/juju-core/fix-backlog-limits into lp:~go-bot/juju-core/trunk
Status: Merged
Approved by: Tim Penhey
Approved revision: no longer in the source branch.
Merged at revision: 2623
Proposed branch: lp:~thumper/juju-core/fix-backlog-limits
Merge into: lp:~go-bot/juju-core/trunk
Prerequisite: lp:~thumper/juju-core/debug-log-api
Diff against target: 424 lines (+100/-84), 6 files modified:
  state/apiserver/debuglog.go (+24/-6)
  state/apiserver/debuglog_internal_test.go (+30/-35)
  state/apiserver/debuglog_test.go (+12/-0)
  utils/tailer/export_test.go (+4/-1)
  utils/tailer/tailer.go (+18/-38)
  utils/tailer/tailer_test.go (+12/-4)
To merge this branch: bzr merge lp:~thumper/juju-core/fix-backlog-limits
Related bugs: none
Reviewer: Juju Engineering (status: Pending)
Review via email: mp+215333@code.launchpad.net
Commit message
Fix the limit for the debug-log calls.

Unbeknownst to me, the filter method was being called during the backlog iteration as well. This fix introduces a callback that the tailer calls when it starts the forward filtering.

This allows us to count the filter calls only when the filter is being called to write out the results. We cannot count the write calls because the tailer uses buffered i/o there.
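The counting approach described in the commit message can be sketched in isolation: a plain line filter, plus a counting wrapper that stops accepting lines once a limit is hit. This is an illustrative standalone example, not the juju-core code itself; `filterFunc` and `newCountedFilter` are invented names.

```go
package main

import "fmt"

// filterFunc reports whether a log line should be forwarded.
type filterFunc func(line string) bool

// newCountedFilter wraps a plain filter and stops matching once
// maxLines lines have been accepted. Counting happens at filter
// time rather than at write time, since writes may sit in a
// buffer and cannot be counted reliably.
func newCountedFilter(filter filterFunc, maxLines int) filterFunc {
	count := 0
	return func(line string) bool {
		if !filter(line) {
			return false
		}
		count++
		return count <= maxLines
	}
}

func main() {
	all := func(string) bool { return true }
	limited := newCountedFilter(all, 2)
	for _, line := range []string{"a", "b", "c", "d"} {
		fmt.Println(line, limited(line))
	}
}
```

The key point is that the wrapper only increments its counter for lines the inner filter accepts, which mirrors counting filter calls "when the filter is being called to write out the results".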
Description of the change
Fix the limit for the debug-log calls.

Unbeknownst to me, the filter method was being called during the backlog iteration as well. This fix introduces a callback that the tailer calls when it starts the forward filtering.

This allows us to count the filter calls only when the filter is being called to write out the results. We cannot count the write calls because the tailer uses buffered i/o there.
Tim Penhey (thumper) wrote:
Andrew Wilkins (axwalk) wrote:

File utils/tailer/

> TailerFilterSta

Rather than adding *another* parameter that's only relevant to backtracking, I really think it would be best to ditch NewTailerBacktrack and create a separate function that does the backtracking.

File utils/tailer/

> TailerFilterSta

Isn't having a callback fairly pointless here? It's going to be called immediately.
Roger Peppe (rogpeppe) wrote:

As far as I can see, this makes a nice Tailer API into a not-so-nice one, because you've made a stateful filter function and you want it to be called in just the way you expect. I don't think the filter function should be stateful.

I can see a couple of solutions that might be better:

1) (my preferred option) Ditch the server-side maxLines functionality completely. It could be implemented client-side by simply closing the connection when the right number of lines is received, but we've always got head(1), so I'd suggest just losing it entirely.

2) Add maxLines functionality to the Tailer itself.

Option 1) might use a little extra bandwidth, but I can't see that that would be a great problem in practice. If it is, the second option could be implemented later.
Tim Penhey (thumper) wrote:

Due to the buffered i/o in the tailer, I'd prefer to keep the line parsing functionality in the server side. However I think that axw's approach of breaking out the backtracking is a good one, and that is the approach I'll use.

Tim Penhey (thumper) wrote:

Please take a look.
Andrew Wilkins (axwalk) wrote:

LGTM with a few suggestions.

File state/apiserver

> return tailer.SeekLastLines(logFile, stream.backlog, stream.filterLine)

File state/apiserver

> stream.maxLines > 0 {

It would be better to just split filterLine into something non-counting and counting, where the latter calls the former and adds the linecount check. Then you don't need the boolean.

File utils/tailer/

> is called when the filtering is

Delete this?
Roger Peppe (rogpeppe) wrote:

> Due to the buffered i/o in the tailer, I'd prefer to keep the line parsing
> functionality in the server side.

I don't quite understand this remark (the buffer is flushed whenever we get to the end of the file, so it shouldn't make any significant difference), but splitting out the backtracking code makes me much happier.

LGTM with a couple of trivial suggestions below.

File state/apiserver

    if stream.fromTheStart {
        return nil
    }
    ...

saves a negative and a level of indentation.

File state/apiserver

> stream.maxLines > 0 {

On 2014/04/14 03:52:40, axw wrote:
> It would be better to just split filterLine into something non-counting and
> counting, where the latter calls the former and adds the linecount check.
> Then you don't need the boolean.

+1

File utils/tailer/

> is called when the filtering is

On 2014/04/14 03:52:40, axw wrote:
> Delete this?

+1

File utils/tailer/

> the end.

    // If filter is non-nil, only lines for which filter returns
    // true will be counted.

?
Preview Diff
=== modified file 'state/apiserver/debuglog.go'
--- state/apiserver/debuglog.go	2014-04-10 09:43:51 +0000
+++ state/apiserver/debuglog.go	2014-04-14 04:14:21 +0000
@@ -70,6 +70,11 @@
 		return
 	}
 	defer logFile.Close()
+	if err := stream.positionLogFile(logFile); err != nil {
+		h.sendError(socket, fmt.Errorf("cannot position log file: %v", err))
+		socket.Close()
+		return
+	}
 
 	// If we get to here, no more errors to report, so we report a nil
 	// error. This way the first line of the socket is always a json
@@ -211,14 +216,20 @@
 	fromTheStart bool
 }
 
+// positionLogFile will update the internal read position of the logFile to be
+// at the end of the file or somewhere in the middle if backlog has been specified.
+func (stream *logStream) positionLogFile(logFile io.ReadSeeker) error {
+	// Seek to the end, or lines back from the end if we need to.
+	if !stream.fromTheStart {
+		return tailer.SeekLastLines(logFile, stream.backlog, stream.filterLine)
+	}
+	return nil
+}
+
 // start the tailer listening to the logFile, and sending the matching
 // lines to the writer.
 func (stream *logStream) start(logFile io.ReadSeeker, writer io.Writer) {
-	if stream.fromTheStart {
-		stream.logTailer = tailer.NewTailer(logFile, writer, stream.filterLine)
-	} else {
-		stream.logTailer = tailer.NewTailerBacktrack(logFile, writer, stream.backlog, stream.filterLine)
-	}
+	stream.logTailer = tailer.NewTailer(logFile, writer, stream.countedFilterLine)
 }
 
 // loop starts the tailer with the log file and the web socket.
@@ -235,10 +246,17 @@
 // filterLine checks the received line for one of the confgured tags.
 func (stream *logStream) filterLine(line []byte) bool {
 	log := parseLogLine(string(line))
-	result := stream.checkIncludeEntity(log) &&
+	return stream.checkIncludeEntity(log) &&
 		stream.checkIncludeModule(log) &&
 		!stream.exclude(log) &&
 		stream.checkLevel(log)
+}
+
+// countedFilterLine checks the received line for one of the confgured tags,
+// and also checks to make sure the stream doesn't send more than the
+// specified number of lines.
+func (stream *logStream) countedFilterLine(line []byte) bool {
+	result := stream.filterLine(line)
 	if result && stream.maxLines > 0 {
 		stream.lineCount++
 		result = stream.lineCount <= stream.maxLines
 
=== modified file 'state/apiserver/debuglog_internal_test.go'
--- state/apiserver/debuglog_internal_test.go	2014-04-07 05:11:48 +0000
+++ state/apiserver/debuglog_internal_test.go	2014-04-14 04:14:21 +0000
@@ -7,7 +7,6 @@
 
 import (
 	"bytes"
-	"io"
 	"net/url"
 	"os"
 	"path/filepath"
@@ -179,36 +178,22 @@
 		"machine-0: date time WARNING juju.foo.bar")), jc.IsFalse)
 }
 
-func (s *debugInternalSuite) TestFilterLineWithLimit(c *gc.C) {
+func (s *debugInternalSuite) TestCountedFilterLineWithLimit(c *gc.C) {
 	stream := &logStream{
 		filterLevel: loggo.INFO,
 		maxLines:    5,
 	}
 	line := []byte("machine-0: date time WARNING juju")
-	c.Check(stream.filterLine(line), jc.IsTrue)
-	c.Check(stream.filterLine(line), jc.IsTrue)
-	c.Check(stream.filterLine(line), jc.IsTrue)
-	c.Check(stream.filterLine(line), jc.IsTrue)
-	c.Check(stream.filterLine(line), jc.IsTrue)
-	c.Check(stream.filterLine(line), jc.IsFalse)
-	c.Check(stream.filterLine(line), jc.IsFalse)
+	c.Check(stream.countedFilterLine(line), jc.IsTrue)
+	c.Check(stream.countedFilterLine(line), jc.IsTrue)
+	c.Check(stream.countedFilterLine(line), jc.IsTrue)
+	c.Check(stream.countedFilterLine(line), jc.IsTrue)
+	c.Check(stream.countedFilterLine(line), jc.IsTrue)
+	c.Check(stream.countedFilterLine(line), jc.IsFalse)
+	c.Check(stream.countedFilterLine(line), jc.IsFalse)
 }
 
-type seekWaitReader struct {
-	io.ReadSeeker
-	wait chan struct{}
-}
-
-func (w *seekWaitReader) Seek(offset int64, whence int) (int64, error) {
-	pos, err := w.ReadSeeker.Seek(offset, whence)
-	if w.wait != nil {
-		close(w.wait)
-		w.wait = nil
-	}
-	return pos, err
-}
-
-func (s *debugInternalSuite) testStreamInternal(c *gc.C, fromTheStart bool, maxLines uint, expected, errMatch string) {
+func (s *debugInternalSuite) testStreamInternal(c *gc.C, fromTheStart bool, backlog, maxLines uint, expected, errMatch string) {
 
 	dir := c.MkDir()
 	logPath := filepath.Join(dir, "logfile.txt")
@@ -223,17 +208,20 @@
 line 2
 line 3
 `)
-	stream := &logStream{fromTheStart: fromTheStart, maxLines: maxLines}
+	stream := &logStream{
+		fromTheStart: fromTheStart,
+		backlog:      backlog,
+		maxLines:     maxLines,
+	}
+	err = stream.positionLogFile(logFileReader)
+	c.Assert(err, gc.IsNil)
 	output := &bytes.Buffer{}
-	waitReader := &seekWaitReader{logFileReader, make(chan struct{})}
-	stream.start(waitReader, output)
+	stream.start(logFileReader, output)
 
 	go func() {
 		defer stream.tomb.Done()
 		stream.tomb.Kill(stream.loop())
 	}()
-	// wait for the tailer to have started tailing before writing more
-	<-waitReader.wait
 
 	logFile.WriteString("line 4\n")
 	logFile.WriteString("line 5\n")
@@ -265,7 +253,7 @@
 line 4
 line 5
 `
-	s.testStreamInternal(c, true, 0, expected, "")
+	s.testStreamInternal(c, true, 0, 0, expected, "")
 }
 
 func (s *debugInternalSuite) TestLogStreamLoopFromTheStartMaxLines(c *gc.C) {
@@ -273,21 +261,28 @@
 line 2
 line 3
 `
-	s.testStreamInternal(c, true, 3, expected, "max lines reached")
+	s.testStreamInternal(c, true, 0, 3, expected, "max lines reached")
 }
 
 func (s *debugInternalSuite) TestLogStreamLoopJustTail(c *gc.C) {
 	expected := `line 4
 line 5
 `
-	s.testStreamInternal(c, false, 0, expected, "")
+	s.testStreamInternal(c, false, 0, 0, expected, "")
+}
+
+func (s *debugInternalSuite) TestLogStreamLoopBackOneLimitTwo(c *gc.C) {
+	expected := `line 3
+line 4
+`
+	s.testStreamInternal(c, false, 1, 2, expected, "max lines reached")
 }
 
 func (s *debugInternalSuite) TestLogStreamLoopTailMaxLinesNotYetReached(c *gc.C) {
 	expected := `line 4
 line 5
 `
-	s.testStreamInternal(c, false, 3, expected, "")
+	s.testStreamInternal(c, false, 0, 3, expected, "")
 }
 
 func assertStreamParams(c *gc.C, obtained, expected *logStream) {
 
=== modified file 'state/apiserver/debuglog_test.go'
--- state/apiserver/debuglog_test.go	2014-04-10 09:43:51 +0000
+++ state/apiserver/debuglog_test.go	2014-04-14 04:14:21 +0000
@@ -122,6 +122,18 @@
 	s.assertWebsocketClosed(c, reader)
 }
 
+func (s *debugLogSuite) TestBacklogWithMaxLines(c *gc.C) {
+	s.writeLogLines(c, 10)
+
+	reader := s.openWebsocket(c, url.Values{"backlog": {"5"}, "maxLines": {"10"}})
+	s.assertLogFollowing(c, reader)
+	s.writeLogLines(c, logLineCount)
+
+	linesRead := s.readLogLines(c, reader, 10)
+	c.Assert(linesRead, jc.DeepEquals, logLines[5:15])
+	s.assertWebsocketClosed(c, reader)
+}
+
 func (s *debugLogSuite) TestFilter(c *gc.C) {
 	s.ensureLogFile(c)
 
=== modified file 'utils/tailer/export_test.go'
--- utils/tailer/export_test.go	2013-12-11 17:11:30 +0000
+++ utils/tailer/export_test.go	2014-04-14 04:14:21 +0000
@@ -3,4 +3,7 @@
 
 package tailer
 
-var NewTestTailer = newTailer
+var (
+	BufferSize    = &bufferSize
+	NewTestTailer = newTailer
+)
 
=== modified file 'utils/tailer/tailer.go'
--- utils/tailer/tailer.go	2014-04-04 04:35:44 +0000
+++ utils/tailer/tailer.go	2014-04-14 04:14:21 +0000
@@ -14,12 +14,13 @@
 )
 
 const (
-	bufferSize = 4096
+	defaultBufferSize = 4096
 	polltime   = time.Second
 	delimiter  = '\n'
 )
 
 var (
+	bufferSize = defaultBufferSize
 	delimiters = []byte{delimiter}
 )
 
@@ -35,20 +36,8 @@
 	reader      *bufio.Reader
 	writeCloser io.WriteCloser
 	writer      *bufio.Writer
-	lines       uint
 	filter      TailerFilterFunc
-	bufferSize  int
 	polltime    time.Duration
-	lookBack    bool
-}
-
-// NewTailerBacktrack starts a Tailer which reads strings from the passed
-// ReadSeeker line by line. If a filter function is specified the read
-// lines are filtered. The matching lines are written to the passed
-// Writer. The reading begins the specified number of matching lines
-// from the end.
-func NewTailerBacktrack(readSeeker io.ReadSeeker, writer io.Writer, lines uint, filter TailerFilterFunc) *Tailer {
-	return newTailer(readSeeker, writer, lines, filter, bufferSize, polltime, true)
 }
 
 // NewTailer starts a Tailer which reads strings from the passed
@@ -56,22 +45,19 @@
 // lines are filtered. The matching lines are written to the passed
 // Writer.
 func NewTailer(readSeeker io.ReadSeeker, writer io.Writer, filter TailerFilterFunc) *Tailer {
-	return newTailer(readSeeker, writer, 0, filter, bufferSize, polltime, false)
+	return newTailer(readSeeker, writer, filter, polltime)
 }
 
 // newTailer starts a Tailer like NewTailer but allows the setting of
 // the read buffer size and the time between pollings for testing.
-func newTailer(readSeeker io.ReadSeeker, writer io.Writer, lines uint, filter TailerFilterFunc,
-	bufferSize int, polltime time.Duration, lookBack bool) *Tailer {
+func newTailer(readSeeker io.ReadSeeker, writer io.Writer,
+	filter TailerFilterFunc, polltime time.Duration) *Tailer {
 	t := &Tailer{
 		readSeeker: readSeeker,
 		reader:     bufio.NewReaderSize(readSeeker, bufferSize),
 		writer:     bufio.NewWriter(writer),
-		lines:      lines,
 		filter:     filter,
-		bufferSize: bufferSize,
 		polltime:   polltime,
-		lookBack:   lookBack,
 	}
 	go func() {
 		defer t.tomb.Done()
@@ -107,12 +93,6 @@
 // writer and then polls for more data to write it to the
 // writer too.
 func (t *Tailer) loop() error {
-	// Position the readSeeker.
-	if t.lookBack {
-		if err := t.seekLastLines(); err != nil {
-			return err
-		}
-	}
 	// Start polling.
 	// TODO(mue) 2013-12-06
 	// Handling of read-seeker/files being truncated during
@@ -144,26 +124,26 @@
 	}
 }
 
-// seekLastLines sets the read position of the ReadSeeker to the
+// SeekLastLines sets the read position of the ReadSeeker to the
 // wanted number of filtered lines before the end.
-func (t *Tailer) seekLastLines() error {
-	offset, err := t.readSeeker.Seek(0, os.SEEK_END)
+func SeekLastLines(readSeeker io.ReadSeeker, lines uint, filter TailerFilterFunc) error {
+	offset, err := readSeeker.Seek(0, os.SEEK_END)
 	if err != nil {
 		return err
 	}
-	if t.lines == 0 {
+	if lines == 0 {
 		// We are done, just seeking to the end is sufficient.
 		return nil
 	}
 	seekPos := int64(0)
 	found := uint(0)
-	buffer := make([]byte, t.bufferSize)
+	buffer := make([]byte, bufferSize)
 SeekLoop:
 	for offset > 0 {
 		// buffer contains the data left over from the
 		// previous iteration.
 		space := cap(buffer) - len(buffer)
-		if space < t.bufferSize {
+		if space < bufferSize {
 			// Grow buffer.
 			newBuffer := make([]byte, len(buffer), cap(buffer)*2)
 			copy(newBuffer, buffer)
@@ -180,11 +160,11 @@
 		copy(buffer[space:cap(buffer)], buffer)
 		buffer = buffer[0 : len(buffer)+space]
 		offset -= int64(space)
-		_, err := t.readSeeker.Seek(offset, os.SEEK_SET)
+		_, err := readSeeker.Seek(offset, os.SEEK_SET)
 		if err != nil {
 			return err
 		}
-		_, err = io.ReadFull(t.readSeeker, buffer[0:space])
+		_, err = io.ReadFull(readSeeker, buffer[0:space])
 		if err != nil {
 			return err
 		}
@@ -207,9 +187,9 @@
 			break
 		}
 		start++
-		if t.isValid(buffer[start:end]) {
+		if filter == nil || filter(buffer[start:end]) {
 			found++
-			if found >= t.lines {
+			if found >= lines {
 				seekPos = offset + int64(start)
 				break SeekLoop
 			}
@@ -221,7 +201,7 @@
 		buffer = buffer[0:end]
 	}
 	// Final positioning.
-	t.readSeeker.Seek(seekPos, os.SEEK_SET)
+	readSeeker.Seek(seekPos, os.SEEK_SET)
 	return nil
 }
 
=== modified file 'utils/tailer/tailer_test.go'
--- utils/tailer/tailer_test.go	2014-04-04 03:46:13 +0000
+++ utils/tailer/tailer_test.go	2014-04-14 04:14:21 +0000
@@ -15,6 +15,7 @@
 	gc "launchpad.net/gocheck"
 
 	"launchpad.net/juju-core/testing"
+	"launchpad.net/juju-core/testing/testbase"
 	"launchpad.net/juju-core/utils/tailer"
 )
 
@@ -22,9 +23,11 @@
 	gc.TestingT(t)
 }
 
-type tailerSuite struct{}
+type tailerSuite struct {
+	testbase.LoggingSuite
+}
 
-var _ = gc.Suite(tailerSuite{})
+var _ = gc.Suite(&tailerSuite{})
 
 var alphabetData = []string{
 	"alpha alpha\n",
@@ -326,7 +329,7 @@
 	},
 }}
 
-func (tailerSuite) TestTailer(c *gc.C) {
+func (s *tailerSuite) TestTailer(c *gc.C) {
 	for i, test := range tests {
 		c.Logf("Test #%d) %s", i, test.description)
 		bufferSize := test.bufferSize
@@ -334,10 +337,15 @@
 			// Default value.
 			bufferSize = 4096
 		}
+		s.PatchValue(tailer.BufferSize, bufferSize)
 		reader, writer := io.Pipe()
 		sigc := make(chan struct{}, 1)
 		rs := startReadSeeker(c, test.data, test.initialLinesWritten, sigc)
-		tailer := tailer.NewTestTailer(rs, writer, test.initialLinesRequested, test.filter, bufferSize, 2*time.Millisecond, !test.fromStart)
+		if !test.fromStart {
+			err := tailer.SeekLastLines(rs, test.initialLinesRequested, test.filter)
+			c.Assert(err, gc.IsNil)
+		}
+		tailer := tailer.NewTestTailer(rs, writer, test.filter, 2*time.Millisecond)
 		linec := startReading(c, tailer, reader, writer)
 
 		// Collect initial data.
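The s.PatchValue(tailer.BufferSize, bufferSize) call in the test above works because export_test.go exposes a pointer to the package-level bufferSize variable. The underlying pattern is simply save, overwrite, restore; this standalone sketch shows the idea (`patchInt` is an invented helper, not the testbase API):

```go
package main

import "fmt"

// bufferSize stands in for a package-level tunable that tests want
// to override temporarily.
var bufferSize = 4096

// patchInt overwrites *target with value and returns a function
// that restores the original value. Helpers like PatchValue wrap
// this idea and register the restore with the test fixture so it
// runs automatically at teardown.
func patchInt(target *int, value int) (restore func()) {
	old := *target
	*target = value
	return func() { *target = old }
}

func main() {
	restore := patchInt(&bufferSize, 16)
	fmt.Println(bufferSize) // patched value
	restore()
	fmt.Println(bufferSize) // original value restored
}
```

Exporting the pointer (rather than the value) from export_test.go is what makes the variable patchable from an external test package without exposing it in the public API.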
Reviewers: mp+215333_code.launchpad.net

Message:

Please take a look.
https://code.launchpad.net/~thumper/juju-core/fix-backlog-limits/+merge/215333

Requires: https://code.launchpad.net/~thumper/juju-core/debug-log-api/+merge/215323

(do not edit description out of merge proposal)

Please review this at https://codereview.appspot.com/85570045/
Affected files (+42, -10 lines):
  A [revision details]
  M container/lxc/clonetemplate.go
  M state/apiserver/debuglog.go
  M state/apiserver/debuglog_internal_test.go
  M state/apiserver/debuglog_test.go
  M utils/tailer/tailer.go
  M utils/tailer/tailer_test.go
Index: [revision details]
=== added file '[revision details]'
--- [revision details] 2012-01-01 00:00:00 +0000
+++ [revision details] 2012-01-01 00:00:00 +0000
@@ -0,0 +1,2 @@
+Old revision: <email address hidden>
+New revision: <email address hidden>
Index: container/lxc/clonetemplate.go
=== modified file 'container/lxc/clonetemplate.go'
--- container/lxc/clonetemplate.go	2014-04-04 03:46:13 +0000
+++ container/lxc/clonetemplate.go	2014-04-11 01:57:45 +0000
@@ -179,7 +179,7 @@
 	}
 	tailWriter := &logTail{tick: time.Now()}
-	consoleTailer := tailer.NewTailer(console, tailWriter, nil)
+	consoleTailer := tailer.NewTailer(console, tailWriter, nil, nil)
 	defer consoleTailer.Stop()
 
 	// We should wait maybe 1 minute between output?
Index: state/apiserver/debuglog.go
=== modified file 'state/apiserver/debuglog.go'
--- state/apiserver/debuglog.go	2014-04-10 09:43:51 +0000
+++ state/apiserver/debuglog.go	2014-04-11 01:54:49 +0000
@@ -209,18 +209,23 @@
 	maxLines     uint
 	lineCount    uint
 	fromTheStart bool
+	started      bool
 }
 
 // start the tailer listening to the logFile, and sending the matching
 // lines to the writer.
 func (stream *logStream) start(logFile io.ReadSeeker, writer io.Writer) {
 	if stream.fromTheStart {
-		stream.logTailer = tailer.NewTailer(logFile, writer, stream.filterLine)
+		stream.logTailer = tailer.NewTailer(logFile, writer, stream.filterLine,
+			stream.tailStarted)
 	} else {
-		stream.logTailer = tailer.NewTailerBacktrack(logFile, writer,
-			stream.backlog, stream.filterLine)
+		stream.logTailer = tailer.NewTailerBacktrack(logFile, writer,
+			stream.backlog, stream.filterLine, stream.tailStarted)
 	}
 }
 
+func (stream *logStream) tailStarted() {
+	stream.started = true
+}
+
 // loop starts the tailer with the log file and the web socket.
 func (stream *logStream) loop() error {
 	select {
@@ -239,7 +244,7 @@
 		stream.checkIncludeModule(log) &&
 		!stream.exclude(log) &&
 		stream.checkLevel(log)
-	if result && stream.maxLines > 0 {
+	if stream.started && result && stream.maxLines > 0 {
 		stream...