Merge lp:~elambert/gearman-java/Bug400466 into lp:gearman-java
- Bug400466
- Merge into trunk
Proposed by
Eric Lambert
Status: | Merged |
---|---|
Merged at revision: | not available |
Proposed branch: | lp:~elambert/gearman-java/Bug400466 |
Merge into: | lp:gearman-java |
Diff against target: | None lines |
To merge this branch: | bzr merge lp:~elambert/gearman-java/Bug400466 |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Gearman-developers | Pending | ||
Review via email: mp+9721@code.launchpad.net |
Commit message
Description of the change
To post a comment you must log in.
Revision history for this message
Eric Lambert (elambert) wrote : | # |
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | === modified file 'src/org/gearman/client/GearmanClient.java' |
2 | --- src/org/gearman/client/GearmanClient.java 2009-07-15 18:47:07 +0000 |
3 | +++ src/org/gearman/client/GearmanClient.java 2009-08-05 18:34:26 +0000 |
4 | @@ -6,13 +6,11 @@ |
5 | package org.gearman.client; |
6 | |
7 | import java.io.IOException; |
8 | -import java.util.Collection; |
9 | import java.util.List; |
10 | import java.util.concurrent.ExecutorService; |
11 | import java.util.concurrent.Future; |
12 | |
13 | import org.gearman.common.GearmanJobServerConnection; |
14 | -import org.gearman.common.GearmanPacket; |
15 | |
16 | /** |
17 | * This interface through which users of the Gearman Java Libaray will use to |
18 | @@ -106,22 +104,6 @@ |
19 | IllegalStateException; |
20 | |
21 | /** |
22 | - * As {@link GearmanJob} are submitted and executed, they will receive |
23 | - * notifications from the Gearman Job Server as certain packets are received |
24 | - * (such as JOB_COMPLETE or WORK_DATA). Users may want to process these |
25 | - * packets,for example if jobs are sending back intermediate data, a user |
26 | - * may wish to receive that data and act upon it. This method returns to |
27 | - * the user all events which have occured for any job submitted by the |
28 | - * client. |
29 | - * |
30 | - * @return A collection of {@link GearmanPacket} that have been received. |
31 | - * |
32 | - * @throws IllegalStateException If the client has already been stopped. |
33 | - */ |
34 | - Collection<GearmanPacket> selectUpdatedJobEvents() |
35 | - throws IllegalStateException; |
36 | - |
37 | - /** |
38 | * Sends a WORK_STATUS request to the Gearman Job Server running the |
39 | * {@link GearmanJob} and then returns {@link GearmanJobStatus} representing |
40 | * the current status of the job. |
41 | |
42 | === modified file 'src/org/gearman/client/GearmanClientImpl.java' |
43 | --- src/org/gearman/client/GearmanClientImpl.java 2009-08-05 00:13:52 +0000 |
44 | +++ src/org/gearman/client/GearmanClientImpl.java 2009-08-05 18:34:26 +0000 |
45 | @@ -12,7 +12,6 @@ |
46 | import java.util.ArrayList; |
47 | import java.util.Collection; |
48 | import java.util.HashMap; |
49 | -import java.util.HashSet; |
50 | import java.util.Iterator; |
51 | import java.util.List; |
52 | import java.util.Set; |
53 | @@ -70,7 +69,6 @@ |
54 | private final String DESCRIPTION; |
55 | private HashMap<SelectionKey, GearmanJobServerSession> sessionsMap = null; |
56 | private Selector ioAvailable = null; |
57 | - private ArrayList<GearmanPacket> updatedJobs = null; |
58 | private static final Logger LOG = Logger.getLogger( |
59 | Constants.GEARMAN_CLIENT_LOGGER_NAME); |
60 | private state runState = state.RUNNING; |
61 | @@ -125,7 +123,6 @@ |
62 | */ |
63 | public GearmanClientImpl() { |
64 | sessionsMap = new HashMap<SelectionKey, GearmanJobServerSession>(); |
65 | - updatedJobs = new ArrayList<GearmanPacket>(); |
66 | jobsMaps = new HashMap<JobHandle, GearmanJobImpl>(); |
67 | submitJobMap = new HashMap<GearmanJobServerSession, GearmanJobImpl>(); |
68 | DESCRIPTION = new String(DESCRIPION_PREFIX + ":" + |
69 | @@ -372,27 +369,6 @@ |
70 | return handler.getResults(); |
71 | } |
72 | |
73 | - @SuppressWarnings(value = "unchecked") |
74 | - public Collection<GearmanPacket> selectUpdatedJobEvents() throws |
75 | - GearmanException, IllegalStateException { |
76 | - if (!runState.equals(state.RUNNING)) { |
77 | - throw new IllegalStateException("Client is not active"); |
78 | - } |
79 | - Collection<GearmanPacket> retSet = new HashSet<GearmanPacket>(); |
80 | - try { |
81 | - driveClientIO(); |
82 | - } catch (IOException ioe) { |
83 | - LOG.log(Level.WARNING, "Encountered IOException while driving " + |
84 | - "client IO" + ioe); |
85 | - //TODO improve ioexception handling |
86 | - } |
87 | - if (!updatedJobs.isEmpty()) { |
88 | - retSet = (Collection<GearmanPacket>) updatedJobs.clone(); |
89 | - updatedJobs.clear(); |
90 | - } |
91 | - return retSet; |
92 | - } |
93 | - |
94 | public int getNumberofActiveJobs() throws IllegalStateException { |
95 | if (runState.equals(state.TERMINATED)) { |
96 | throw new IllegalStateException("Client is not active"); |
97 | @@ -410,7 +386,6 @@ |
98 | GearmanJobImpl sjob = submitJobMap.get(s); |
99 | if (!sjob.isBackgroundJob()) { |
100 | jobsMaps.put(new JobHandle(sjob.getHandle()), sjob); |
101 | - updatedJobs.add(p); |
102 | } |
103 | break; |
104 | case WORK_DATA: |
105 | @@ -419,7 +394,6 @@ |
106 | case WORK_COMPLETE: |
107 | case WORK_FAIL: |
108 | case WORK_EXCEPTION: |
109 | - updatedJobs.add(p); |
110 | JobHandle handle = new JobHandle(p.getDataComponentValue( |
111 | GearmanPacket.DataComponentName.JOB_HANDLE)); |
112 | GearmanJobImpl job = jobsMaps.get(handle); |
113 | @@ -492,8 +466,6 @@ |
114 | } |
115 | sessionsMap.clear(); |
116 | sessionsMap = null; |
117 | - updatedJobs.clear(); |
118 | - updatedJobs = null; |
119 | runState = state.TERMINATED; |
120 | LOG.log(Level.FINE, "Completed shutdown of client: " + this); |
121 | return new ArrayList<Runnable>(); |
122 | |
123 | === added file 'test/org/gearman/client/Bug400466Test.java' |
124 | --- test/org/gearman/client/Bug400466Test.java 1970-01-01 00:00:00 +0000 |
125 | +++ test/org/gearman/client/Bug400466Test.java 2009-08-05 18:34:26 +0000 |
126 | @@ -0,0 +1,141 @@ |
127 | +/* |
128 | + * Copyright (C) 2009 by Eric Lambert <Eric.Lambert@sun.com> |
129 | + * Use and distribution licensed under the BSD license. See |
130 | + * the COPYING file in the parent directory for full text. |
131 | + */ |
132 | +package org.gearman.client; |
133 | + |
134 | +import java.io.IOException; |
135 | +import java.util.concurrent.ExecutionException; |
136 | +import junit.framework.Assert; |
137 | +import org.gearman.common.GearmanNIOJobServerConnection; |
138 | +import org.gearman.util.ByteUtils; |
139 | +import org.gearman.worker.AbstractGearmanFunction; |
140 | +import org.gearman.worker.GearmanFunction; |
141 | +import org.gearman.worker.GearmanFunctionFactory; |
142 | +import org.gearman.worker.GearmanWorker; |
143 | +import org.gearman.worker.GearmanWorkerImpl; |
144 | +import org.junit.After; |
145 | +import org.junit.Before; |
146 | +import org.junit.Test; |
147 | + |
148 | +public class Bug400466Test { |
149 | + |
150 | + GearmanClientImpl gc = null; |
151 | + GearmanWorker worker = null; |
152 | + Thread workerThread = null; |
153 | + WorkerRunnable runner = null; |
154 | + Thread wt = null; |
155 | + byte[] data = new byte[8193]; |
156 | + Runtime rt = null; |
157 | + |
158 | + class newReverseFunction extends AbstractGearmanFunction { |
159 | + |
160 | + @Override |
161 | + public GearmanJobResult executeFunction() throws Exception { |
162 | + StringBuffer sb = null; |
163 | + byte[] results = null; |
164 | + if (data instanceof byte[]) { |
165 | + sb = new StringBuffer(ByteUtils.fromUTF8Bytes((byte[]) data)); |
166 | + } else { |
167 | + sb = new StringBuffer(data.toString()); |
168 | + } |
169 | + results = sb.reverse().toString().getBytes(); |
170 | + return new GearmanJobResultImpl(jobHandle, true, results, |
171 | + new byte[0], new byte[0], 0, 0); |
172 | + } |
173 | + } |
174 | + |
175 | + class newReverseFunctionFactory implements GearmanFunctionFactory { |
176 | + |
177 | + public String getFunctionName() { |
178 | + return newReverseFunction.class.getCanonicalName(); |
179 | + } |
180 | + |
181 | + public GearmanFunction getFunction() { |
182 | + return new newReverseFunction(); |
183 | + } |
184 | + } |
185 | + |
186 | + class WorkerRunnable implements Runnable { |
187 | + |
188 | + GearmanWorker myWorker = null; |
189 | + boolean isRunning = true; |
190 | + |
191 | + public WorkerRunnable(GearmanWorker w) { |
192 | + myWorker = w; |
193 | + } |
194 | + |
195 | + public void run() { |
196 | + while (isRunning) { |
197 | + myWorker.work(); |
198 | + } |
199 | + } |
200 | + |
201 | + public void stop() { |
202 | + isRunning = false; |
203 | + } |
204 | + } |
205 | + |
206 | + @Before |
207 | + public void initTest() throws IOException { |
208 | + for (int i = 0; i < data.length; i++) { |
209 | + data[i] = '0'; |
210 | + } |
211 | + rt = Runtime.getRuntime(); |
212 | + gc = new GearmanClientImpl(); |
213 | + worker = new GearmanWorkerImpl(); |
214 | + gc.addJobServer(new GearmanNIOJobServerConnection("localhost")); |
215 | + |
216 | + //create a worker for each of the job servers |
217 | + worker.addServer(new GearmanNIOJobServerConnection("localhost")); |
218 | + worker.registerFunctionFactory(new newReverseFunctionFactory()); |
219 | + runner = new WorkerRunnable(worker); |
220 | + wt = new Thread(runner, "workerThread"); |
221 | + wt.start(); |
222 | + try { |
223 | + Thread.sleep(100); |
224 | + } catch (InterruptedException ioe) { |
225 | + } |
226 | + } |
227 | + |
228 | + @After |
229 | + public void shutdownTest() throws IOException { |
230 | + if (gc != null) { |
231 | + gc.shutdownNow(); |
232 | + } |
233 | + if (worker != null) { |
234 | + worker.stop(); |
235 | + } |
236 | + if (runner != null) { |
237 | + runner.stop(); |
238 | + } |
239 | + } |
240 | + |
241 | + @Test |
242 | + public void test400466() |
243 | + throws IOException, InterruptedException, ExecutionException { |
244 | + long heapSizeCeiling = 0; |
245 | + long memUsed = 0; |
246 | + for (int x = 1; x <= 1000; x++) { |
247 | + GearmanJob job = GearmanJobImpl.createJob( |
248 | + newReverseFunction.class.getCanonicalName(), |
249 | + data, null); |
250 | + gc.submit(job); |
251 | + if (x % 100 == 0) { |
252 | + memUsed = rt.totalMemory() - rt.freeMemory(); |
253 | + if (heapSizeCeiling == 0) { |
254 | + heapSizeCeiling = memUsed * 4; |
255 | + } else { |
256 | + Assert.assertTrue("ceiling = " + heapSizeCeiling + |
257 | + " used = " + memUsed, memUsed < heapSizeCeiling); |
258 | + } |
259 | + } |
260 | + } |
261 | + //Submit once last job and wait on it so as to clear the server q |
262 | + GearmanJob job = GearmanJobImpl.createJob( |
263 | + newReverseFunction.class.getCanonicalName(), |
264 | + data, null); |
265 | + gc.submit(job).get(); |
266 | + } |
267 | +} |
268 | |
269 | === modified file 'test/org/gearman/client/GearmanClientJobExecTest.java' |
270 | --- test/org/gearman/client/GearmanClientJobExecTest.java 2009-08-05 00:13:52 +0000 |
271 | +++ test/org/gearman/client/GearmanClientJobExecTest.java 2009-08-05 18:34:26 +0000 |
272 | @@ -12,8 +12,6 @@ |
273 | import org.gearman.common.GearmanPacketType; |
274 | import org.gearman.util.ByteUtils; |
275 | import java.util.ArrayList; |
276 | -import java.util.Arrays; |
277 | -import java.util.Collection; |
278 | import java.util.concurrent.ExecutionException; |
279 | import java.util.concurrent.Future; |
280 | import java.util.concurrent.TimeUnit; |
281 | @@ -160,6 +158,23 @@ |
282 | } |
283 | } |
284 | |
285 | + class IncrementalListener implements GearmanIOEventListener { |
286 | + |
287 | + private StringBuffer sb = new StringBuffer(); |
288 | + public void handleGearmanIOEvent(GearmanPacket event) |
289 | + throws IllegalArgumentException { |
290 | + if (!event.getPacketType().equals(GearmanPacketType.WORK_DATA)) { |
291 | + return; |
292 | + } |
293 | + sb.append(ByteUtils.fromUTF8Bytes((event.getDataComponentValue( |
294 | + GearmanPacket.DataComponentName.DATA)))); |
295 | + } |
296 | + public String getResults() { |
297 | + return sb.toString(); |
298 | + } |
299 | + |
300 | + } |
301 | + |
302 | @Before |
303 | public void initTest() throws IOException { |
304 | gc = new GearmanClientImpl(); |
305 | @@ -205,31 +220,17 @@ |
306 | public void incrementalAttachedJob() |
307 | throws IOException, InterruptedException, ExecutionException { |
308 | StringBuffer text = generateData(8193, "Hello World"); |
309 | - StringBuffer resultText = new StringBuffer(); |
310 | GearmanJob job = GearmanJobImpl.createJob( |
311 | incrementalReverseFunction.class.getCanonicalName(), |
312 | ByteUtils.toAsciiBytes(text.toString()), null); |
313 | + IncrementalListener il = new IncrementalListener(); |
314 | + job.registerEventListener(il); |
315 | gc.submit(job); |
316 | - while (!job.isDone()) { |
317 | - Collection<GearmanPacket> events = gc.selectUpdatedJobEvents(); |
318 | - for (GearmanPacket event : events) { |
319 | - if (!event.getPacketType().equals(GearmanPacketType.WORK_DATA)) { |
320 | - continue; |
321 | - } |
322 | - if (Arrays.equals(event.getDataComponentValue( |
323 | - GearmanPacket.DataComponentName.JOB_HANDLE), |
324 | - job.getHandle())) { |
325 | - resultText.append(ByteUtils.fromUTF8Bytes(( |
326 | - event.getDataComponentValue( |
327 | - GearmanPacket.DataComponentName.DATA)))); |
328 | - } |
329 | - } |
330 | - } |
331 | GearmanJobResult result = job.get(); |
332 | Assert.assertTrue(result.jobSucceeded()); |
333 | Assert.assertTrue("Client reports active jobs even though all " + |
334 | "jobs have completed", gc.getNumberofActiveJobs() == 0); |
335 | - String resultString = resultText.toString(); |
336 | + String resultString = il.getResults(); |
337 | Assert.assertTrue(resultString.equals(text.reverse().toString())); |
338 | } |
339 | |
340 | @@ -275,7 +276,7 @@ |
341 | boolean hasHitRunning = false; |
342 | for (int i = 0; i < 5; i++) { |
343 | try { |
344 | - Thread.sleep(100); |
345 | + Thread.sleep(10); |
346 | } catch (InterruptedException ie) { |
347 | } |
348 | status = gc.getJobStatus(job); |
fix and test for 400466. Removed selectUpdatedEvents method as it was the implementation of this method which was causing the leak. Users that what to receive all intermediate events from a job (which is what the selectUpdatedEvents method did) can instead register a listener to called when events are received.