Merge lp:~elambert/gearman-java/Bug400466 into lp:gearman-java

Proposed by Eric Lambert
Status: Merged
Merged at revision: not available
Proposed branch: lp:~elambert/gearman-java/Bug400466
Merge into: lp:gearman-java
Diff against target: None lines
To merge this branch: bzr merge lp:~elambert/gearman-java/Bug400466
Reviewer Review Type Date Requested Status
Gearman-developers Pending
Review via email: mp+9721@code.launchpad.net
To post a comment you must log in.
Revision history for this message
Eric Lambert (elambert) wrote :

Fix and test for bug 400466. Removed the selectUpdatedJobEvents method, as it was the implementation of this method that was causing the leak. Users who want to receive all intermediate events from a job (which is what the selectUpdatedJobEvents method did) can instead register a listener to be called when events are received.

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'src/org/gearman/client/GearmanClient.java'
2--- src/org/gearman/client/GearmanClient.java 2009-07-15 18:47:07 +0000
3+++ src/org/gearman/client/GearmanClient.java 2009-08-05 18:34:26 +0000
4@@ -6,13 +6,11 @@
5 package org.gearman.client;
6
7 import java.io.IOException;
8-import java.util.Collection;
9 import java.util.List;
10 import java.util.concurrent.ExecutorService;
11 import java.util.concurrent.Future;
12
13 import org.gearman.common.GearmanJobServerConnection;
14-import org.gearman.common.GearmanPacket;
15
16 /**
17 * This interface through which users of the Gearman Java Libaray will use to
18@@ -106,22 +104,6 @@
19 IllegalStateException;
20
21 /**
22- * As {@link GearmanJob} are submitted and executed, they will receive
23- * notifications from the Gearman Job Server as certain packets are received
24- * (such as JOB_COMPLETE or WORK_DATA). Users may want to process these
25- * packets,for example if jobs are sending back intermediate data, a user
26- * may wish to receive that data and act upon it. This method returns to
27- * the user all events which have occured for any job submitted by the
28- * client.
29- *
30- * @return A collection of {@link GearmanPacket} that have been received.
31- *
32- * @throws IllegalStateException If the client has already been stopped.
33- */
34- Collection<GearmanPacket> selectUpdatedJobEvents()
35- throws IllegalStateException;
36-
37- /**
38 * Sends a WORK_STATUS request to the Gearman Job Server running the
39 * {@link GearmanJob} and then returns {@link GearmanJobStatus} representing
40 * the current status of the job.
41
42=== modified file 'src/org/gearman/client/GearmanClientImpl.java'
43--- src/org/gearman/client/GearmanClientImpl.java 2009-08-05 00:13:52 +0000
44+++ src/org/gearman/client/GearmanClientImpl.java 2009-08-05 18:34:26 +0000
45@@ -12,7 +12,6 @@
46 import java.util.ArrayList;
47 import java.util.Collection;
48 import java.util.HashMap;
49-import java.util.HashSet;
50 import java.util.Iterator;
51 import java.util.List;
52 import java.util.Set;
53@@ -70,7 +69,6 @@
54 private final String DESCRIPTION;
55 private HashMap<SelectionKey, GearmanJobServerSession> sessionsMap = null;
56 private Selector ioAvailable = null;
57- private ArrayList<GearmanPacket> updatedJobs = null;
58 private static final Logger LOG = Logger.getLogger(
59 Constants.GEARMAN_CLIENT_LOGGER_NAME);
60 private state runState = state.RUNNING;
61@@ -125,7 +123,6 @@
62 */
63 public GearmanClientImpl() {
64 sessionsMap = new HashMap<SelectionKey, GearmanJobServerSession>();
65- updatedJobs = new ArrayList<GearmanPacket>();
66 jobsMaps = new HashMap<JobHandle, GearmanJobImpl>();
67 submitJobMap = new HashMap<GearmanJobServerSession, GearmanJobImpl>();
68 DESCRIPTION = new String(DESCRIPION_PREFIX + ":" +
69@@ -372,27 +369,6 @@
70 return handler.getResults();
71 }
72
73- @SuppressWarnings(value = "unchecked")
74- public Collection<GearmanPacket> selectUpdatedJobEvents() throws
75- GearmanException, IllegalStateException {
76- if (!runState.equals(state.RUNNING)) {
77- throw new IllegalStateException("Client is not active");
78- }
79- Collection<GearmanPacket> retSet = new HashSet<GearmanPacket>();
80- try {
81- driveClientIO();
82- } catch (IOException ioe) {
83- LOG.log(Level.WARNING, "Encountered IOException while driving " +
84- "client IO" + ioe);
85- //TODO improve ioexception handling
86- }
87- if (!updatedJobs.isEmpty()) {
88- retSet = (Collection<GearmanPacket>) updatedJobs.clone();
89- updatedJobs.clear();
90- }
91- return retSet;
92- }
93-
94 public int getNumberofActiveJobs() throws IllegalStateException {
95 if (runState.equals(state.TERMINATED)) {
96 throw new IllegalStateException("Client is not active");
97@@ -410,7 +386,6 @@
98 GearmanJobImpl sjob = submitJobMap.get(s);
99 if (!sjob.isBackgroundJob()) {
100 jobsMaps.put(new JobHandle(sjob.getHandle()), sjob);
101- updatedJobs.add(p);
102 }
103 break;
104 case WORK_DATA:
105@@ -419,7 +394,6 @@
106 case WORK_COMPLETE:
107 case WORK_FAIL:
108 case WORK_EXCEPTION:
109- updatedJobs.add(p);
110 JobHandle handle = new JobHandle(p.getDataComponentValue(
111 GearmanPacket.DataComponentName.JOB_HANDLE));
112 GearmanJobImpl job = jobsMaps.get(handle);
113@@ -492,8 +466,6 @@
114 }
115 sessionsMap.clear();
116 sessionsMap = null;
117- updatedJobs.clear();
118- updatedJobs = null;
119 runState = state.TERMINATED;
120 LOG.log(Level.FINE, "Completed shutdown of client: " + this);
121 return new ArrayList<Runnable>();
122
123=== added file 'test/org/gearman/client/Bug400466Test.java'
124--- test/org/gearman/client/Bug400466Test.java 1970-01-01 00:00:00 +0000
125+++ test/org/gearman/client/Bug400466Test.java 2009-08-05 18:34:26 +0000
126@@ -0,0 +1,141 @@
127+/*
128+ * Copyright (C) 2009 by Eric Lambert <Eric.Lambert@sun.com>
129+ * Use and distribution licensed under the BSD license. See
130+ * the COPYING file in the parent directory for full text.
131+ */
132+package org.gearman.client;
133+
134+import java.io.IOException;
135+import java.util.concurrent.ExecutionException;
136+import junit.framework.Assert;
137+import org.gearman.common.GearmanNIOJobServerConnection;
138+import org.gearman.util.ByteUtils;
139+import org.gearman.worker.AbstractGearmanFunction;
140+import org.gearman.worker.GearmanFunction;
141+import org.gearman.worker.GearmanFunctionFactory;
142+import org.gearman.worker.GearmanWorker;
143+import org.gearman.worker.GearmanWorkerImpl;
144+import org.junit.After;
145+import org.junit.Before;
146+import org.junit.Test;
147+
148+public class Bug400466Test {
149+
150+ GearmanClientImpl gc = null;
151+ GearmanWorker worker = null;
152+ Thread workerThread = null;
153+ WorkerRunnable runner = null;
154+ Thread wt = null;
155+ byte[] data = new byte[8193];
156+ Runtime rt = null;
157+
158+ class newReverseFunction extends AbstractGearmanFunction {
159+
160+ @Override
161+ public GearmanJobResult executeFunction() throws Exception {
162+ StringBuffer sb = null;
163+ byte[] results = null;
164+ if (data instanceof byte[]) {
165+ sb = new StringBuffer(ByteUtils.fromUTF8Bytes((byte[]) data));
166+ } else {
167+ sb = new StringBuffer(data.toString());
168+ }
169+ results = sb.reverse().toString().getBytes();
170+ return new GearmanJobResultImpl(jobHandle, true, results,
171+ new byte[0], new byte[0], 0, 0);
172+ }
173+ }
174+
175+ class newReverseFunctionFactory implements GearmanFunctionFactory {
176+
177+ public String getFunctionName() {
178+ return newReverseFunction.class.getCanonicalName();
179+ }
180+
181+ public GearmanFunction getFunction() {
182+ return new newReverseFunction();
183+ }
184+ }
185+
186+ class WorkerRunnable implements Runnable {
187+
188+ GearmanWorker myWorker = null;
189+ boolean isRunning = true;
190+
191+ public WorkerRunnable(GearmanWorker w) {
192+ myWorker = w;
193+ }
194+
195+ public void run() {
196+ while (isRunning) {
197+ myWorker.work();
198+ }
199+ }
200+
201+ public void stop() {
202+ isRunning = false;
203+ }
204+ }
205+
206+ @Before
207+ public void initTest() throws IOException {
208+ for (int i = 0; i < data.length; i++) {
209+ data[i] = '0';
210+ }
211+ rt = Runtime.getRuntime();
212+ gc = new GearmanClientImpl();
213+ worker = new GearmanWorkerImpl();
214+ gc.addJobServer(new GearmanNIOJobServerConnection("localhost"));
215+
216+ //create a worker for each of the job servers
217+ worker.addServer(new GearmanNIOJobServerConnection("localhost"));
218+ worker.registerFunctionFactory(new newReverseFunctionFactory());
219+ runner = new WorkerRunnable(worker);
220+ wt = new Thread(runner, "workerThread");
221+ wt.start();
222+ try {
223+ Thread.sleep(100);
224+ } catch (InterruptedException ioe) {
225+ }
226+ }
227+
228+ @After
229+ public void shutdownTest() throws IOException {
230+ if (gc != null) {
231+ gc.shutdownNow();
232+ }
233+ if (worker != null) {
234+ worker.stop();
235+ }
236+ if (runner != null) {
237+ runner.stop();
238+ }
239+ }
240+
241+ @Test
242+ public void test400466()
243+ throws IOException, InterruptedException, ExecutionException {
244+ long heapSizeCeiling = 0;
245+ long memUsed = 0;
246+ for (int x = 1; x <= 1000; x++) {
247+ GearmanJob job = GearmanJobImpl.createJob(
248+ newReverseFunction.class.getCanonicalName(),
249+ data, null);
250+ gc.submit(job);
251+ if (x % 100 == 0) {
252+ memUsed = rt.totalMemory() - rt.freeMemory();
253+ if (heapSizeCeiling == 0) {
254+ heapSizeCeiling = memUsed * 4;
255+ } else {
256+ Assert.assertTrue("ceiling = " + heapSizeCeiling +
257+ " used = " + memUsed, memUsed < heapSizeCeiling);
258+ }
259+ }
260+ }
261+ //Submit once last job and wait on it so as to clear the server q
262+ GearmanJob job = GearmanJobImpl.createJob(
263+ newReverseFunction.class.getCanonicalName(),
264+ data, null);
265+ gc.submit(job).get();
266+ }
267+}
268
269=== modified file 'test/org/gearman/client/GearmanClientJobExecTest.java'
270--- test/org/gearman/client/GearmanClientJobExecTest.java 2009-08-05 00:13:52 +0000
271+++ test/org/gearman/client/GearmanClientJobExecTest.java 2009-08-05 18:34:26 +0000
272@@ -12,8 +12,6 @@
273 import org.gearman.common.GearmanPacketType;
274 import org.gearman.util.ByteUtils;
275 import java.util.ArrayList;
276-import java.util.Arrays;
277-import java.util.Collection;
278 import java.util.concurrent.ExecutionException;
279 import java.util.concurrent.Future;
280 import java.util.concurrent.TimeUnit;
281@@ -160,6 +158,23 @@
282 }
283 }
284
285+ class IncrementalListener implements GearmanIOEventListener {
286+
287+ private StringBuffer sb = new StringBuffer();
288+ public void handleGearmanIOEvent(GearmanPacket event)
289+ throws IllegalArgumentException {
290+ if (!event.getPacketType().equals(GearmanPacketType.WORK_DATA)) {
291+ return;
292+ }
293+ sb.append(ByteUtils.fromUTF8Bytes((event.getDataComponentValue(
294+ GearmanPacket.DataComponentName.DATA))));
295+ }
296+ public String getResults() {
297+ return sb.toString();
298+ }
299+
300+ }
301+
302 @Before
303 public void initTest() throws IOException {
304 gc = new GearmanClientImpl();
305@@ -205,31 +220,17 @@
306 public void incrementalAttachedJob()
307 throws IOException, InterruptedException, ExecutionException {
308 StringBuffer text = generateData(8193, "Hello World");
309- StringBuffer resultText = new StringBuffer();
310 GearmanJob job = GearmanJobImpl.createJob(
311 incrementalReverseFunction.class.getCanonicalName(),
312 ByteUtils.toAsciiBytes(text.toString()), null);
313+ IncrementalListener il = new IncrementalListener();
314+ job.registerEventListener(il);
315 gc.submit(job);
316- while (!job.isDone()) {
317- Collection<GearmanPacket> events = gc.selectUpdatedJobEvents();
318- for (GearmanPacket event : events) {
319- if (!event.getPacketType().equals(GearmanPacketType.WORK_DATA)) {
320- continue;
321- }
322- if (Arrays.equals(event.getDataComponentValue(
323- GearmanPacket.DataComponentName.JOB_HANDLE),
324- job.getHandle())) {
325- resultText.append(ByteUtils.fromUTF8Bytes((
326- event.getDataComponentValue(
327- GearmanPacket.DataComponentName.DATA))));
328- }
329- }
330- }
331 GearmanJobResult result = job.get();
332 Assert.assertTrue(result.jobSucceeded());
333 Assert.assertTrue("Client reports active jobs even though all " +
334 "jobs have completed", gc.getNumberofActiveJobs() == 0);
335- String resultString = resultText.toString();
336+ String resultString = il.getResults();
337 Assert.assertTrue(resultString.equals(text.reverse().toString()));
338 }
339
340@@ -275,7 +276,7 @@
341 boolean hasHitRunning = false;
342 for (int i = 0; i < 5; i++) {
343 try {
344- Thread.sleep(100);
345+ Thread.sleep(10);
346 } catch (InterruptedException ie) {
347 }
348 status = gc.getJobStatus(job);

Subscribers

People subscribed via source and target branches