Merge lp:~kroq-gar78/ubuntu/precise/activemq/sid-merge into lp:ubuntu/precise/activemq

Proposed by Aditya V
Status: Merged
Merge reported by: Stéphane Graber
Merged at revision: not available
Proposed branch: lp:~kroq-gar78/ubuntu/precise/activemq/sid-merge
Merge into: lp:ubuntu/precise/activemq
Diff against target: 17152 lines (+12/-8449)
14 files modified
.pc/CVE-2011-4605.diff/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java (+0/-2389)
.pc/CVE-2011-4605.diff/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (+0/-1449)
.pc/activemq-admin.patch/assembly/src/release/bin/activemq-admin (+0/-155)
.pc/applied-patches (+0/-8)
.pc/disable_some_modules.diff/pom.xml (+0/-1320)
.pc/drop_derby_use.diff/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DataSourceSupport.java (+0/-96)
.pc/exclude_geronimo_jca.diff/activemq-pool/pom.xml (+0/-104)
.pc/exclude_geronimo_jca.diff/activemq-spring/src/main/java/org/apache/activemq/pool/PooledConnectionFactoryBean.java (+0/-187)
.pc/exclude_spring_osgi.diff/activemq-spring/src/main/java/org/apache/activemq/hooks/SpringContextHook.java (+0/-51)
.pc/init_debian_default_values.diff/assembly/src/release/bin/activemq (+0/-653)
.pc/javadoc_links.diff/activemq-core/pom.xml (+0/-705)
.pc/javadoc_links.diff/pom.xml (+0/-1328)
debian/changelog (+8/-0)
debian/libactivemq-java.poms (+4/-4)
To merge this branch: bzr merge lp:~kroq-gar78/ubuntu/precise/activemq/sid-merge
Reviewer Review Type Date Requested Status
James Page Approve
Ubuntu branches Pending
Review via email: mp+114781@code.launchpad.net

This proposal supersedes a proposal from 2012-05-20.

Description of the change

I fixed the error that caused activemq not to start by merging the Debian Sid branch into this one. I've now filled out the SRU for bug 993249 which is why I'm resubmitting this.

To post a comment you must log in.
Revision history for this message
James Page (james-page) wrote : Posted in a previous version of this proposal

Thanks for taking the time to prepare this merge proposal.

As Ubuntu 12.04 is now release this fix will need to go through the Stable Release Update process.

This consitutes a minimal change the the version that is already in Ubuntu 12.04 rather than a merge from Debian Testing.

See https://wiki.ubuntu.com/StableReleaseUpdates for more information.

review: Needs Fixing
Revision history for this message
James Page (james-page) wrote :

Hi Aditya

Thanks for re-proposing the merge for precise.

First of all, the reason this merge is so large is that you have proposed it with patches not applied; the official source packages for Ubuntu are stored in bzr with patches applied and the .pc folder under version control.

You can fix this on future merge proposals by "quilt push && bzr add .pc" - they reviewers will only see the diff, not all of the missing patches as well.

Secondly, for a merge the changlog entries for the Debian change and the Ubuntu merge should be separate; in your branch the merge and the Debian changelog entry for 5.5.0+dfsg-7 are combined which is not correct.

Lastly, I would not deal with this as a merge; its an SRU so should be minimal change only - I would cherry pick the fix that was applied in 5.5.0+dfsg-7 and create a new version (5.5.0+dfsg-6ubuntu1.1) targetted precise-proposed. The changelog entry needs to be suitably verbose explaining both the issue and how its been fixed inline with https://wiki.ubuntu.com/StableReleaseUpdates.

If you could re-proposed inline with my last comment I will be happy to upload for you.

Thanks

review: Needs Fixing
9. By Aditya V

debian/libactivemq-java.poms: added the options '--java-lib' to
certain lines so that the necessary files are included in the
package and activemq doesn't fail to start (LP: #993249)

Revision history for this message
James Page (james-page) wrote :

Aditya

Thanks for updating; I've uploaded with a minor tweak to the changelog entry to say where the fix originated from and explain exactly what --java-lib is doing; I also switched the target from 'precise' to 'precise-proposed'.

Thanks for your work on this fix.

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== added directory '.pc/CVE-2011-4605.diff'
2=== removed directory '.pc/CVE-2011-4605.diff'
3=== added directory '.pc/CVE-2011-4605.diff/activemq-core'
4=== removed directory '.pc/CVE-2011-4605.diff/activemq-core'
5=== added directory '.pc/CVE-2011-4605.diff/activemq-core/src'
6=== removed directory '.pc/CVE-2011-4605.diff/activemq-core/src'
7=== added directory '.pc/CVE-2011-4605.diff/activemq-core/src/main'
8=== removed directory '.pc/CVE-2011-4605.diff/activemq-core/src/main'
9=== added directory '.pc/CVE-2011-4605.diff/activemq-core/src/main/java'
10=== removed directory '.pc/CVE-2011-4605.diff/activemq-core/src/main/java'
11=== added directory '.pc/CVE-2011-4605.diff/activemq-core/src/main/java/org'
12=== removed directory '.pc/CVE-2011-4605.diff/activemq-core/src/main/java/org'
13=== added directory '.pc/CVE-2011-4605.diff/activemq-core/src/main/java/org/apache'
14=== removed directory '.pc/CVE-2011-4605.diff/activemq-core/src/main/java/org/apache'
15=== added directory '.pc/CVE-2011-4605.diff/activemq-core/src/main/java/org/apache/activemq'
16=== removed directory '.pc/CVE-2011-4605.diff/activemq-core/src/main/java/org/apache/activemq'
17=== added file '.pc/CVE-2011-4605.diff/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java'
18--- .pc/CVE-2011-4605.diff/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java 1970-01-01 00:00:00 +0000
19+++ .pc/CVE-2011-4605.diff/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java 2012-07-17 09:01:20 +0000
20@@ -0,0 +1,2389 @@
21+/**
22+ * Licensed to the Apache Software Foundation (ASF) under one or more
23+ * contributor license agreements. See the NOTICE file distributed with
24+ * this work for additional information regarding copyright ownership.
25+ * The ASF licenses this file to You under the Apache License, Version 2.0
26+ * (the "License"); you may not use this file except in compliance with
27+ * the License. You may obtain a copy of the License at
28+ *
29+ * http://www.apache.org/licenses/LICENSE-2.0
30+ *
31+ * Unless required by applicable law or agreed to in writing, software
32+ * distributed under the License is distributed on an "AS IS" BASIS,
33+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
34+ * See the License for the specific language governing permissions and
35+ * limitations under the License.
36+ */
37+package org.apache.activemq;
38+
39+import java.io.IOException;
40+import java.io.InputStream;
41+import java.io.OutputStream;
42+import java.net.URI;
43+import java.net.URISyntaxException;
44+import java.util.HashMap;
45+import java.util.Iterator;
46+import java.util.Map;
47+import java.util.concurrent.ConcurrentHashMap;
48+import java.util.concurrent.CopyOnWriteArrayList;
49+import java.util.concurrent.CountDownLatch;
50+import java.util.concurrent.LinkedBlockingQueue;
51+import java.util.concurrent.ThreadFactory;
52+import java.util.concurrent.ThreadPoolExecutor;
53+import java.util.concurrent.TimeUnit;
54+import java.util.concurrent.atomic.AtomicBoolean;
55+import java.util.concurrent.atomic.AtomicInteger;
56+import javax.jms.Connection;
57+import javax.jms.ConnectionConsumer;
58+import javax.jms.ConnectionMetaData;
59+import javax.jms.DeliveryMode;
60+import javax.jms.Destination;
61+import javax.jms.ExceptionListener;
62+import javax.jms.IllegalStateException;
63+import javax.jms.InvalidDestinationException;
64+import javax.jms.JMSException;
65+import javax.jms.Queue;
66+import javax.jms.QueueConnection;
67+import javax.jms.QueueSession;
68+import javax.jms.ServerSessionPool;
69+import javax.jms.Session;
70+import javax.jms.Topic;
71+import javax.jms.TopicConnection;
72+import javax.jms.TopicSession;
73+import javax.jms.XAConnection;
74+import org.apache.activemq.advisory.DestinationSource;
75+import org.apache.activemq.blob.BlobTransferPolicy;
76+import org.apache.activemq.command.ActiveMQDestination;
77+import org.apache.activemq.command.ActiveMQMessage;
78+import org.apache.activemq.command.ActiveMQTempDestination;
79+import org.apache.activemq.command.ActiveMQTempQueue;
80+import org.apache.activemq.command.ActiveMQTempTopic;
81+import org.apache.activemq.command.BrokerInfo;
82+import org.apache.activemq.command.Command;
83+import org.apache.activemq.command.CommandTypes;
84+import org.apache.activemq.command.ConnectionControl;
85+import org.apache.activemq.command.ConnectionError;
86+import org.apache.activemq.command.ConnectionId;
87+import org.apache.activemq.command.ConnectionInfo;
88+import org.apache.activemq.command.ConsumerControl;
89+import org.apache.activemq.command.ConsumerId;
90+import org.apache.activemq.command.ConsumerInfo;
91+import org.apache.activemq.command.ControlCommand;
92+import org.apache.activemq.command.DestinationInfo;
93+import org.apache.activemq.command.ExceptionResponse;
94+import org.apache.activemq.command.Message;
95+import org.apache.activemq.command.MessageDispatch;
96+import org.apache.activemq.command.MessageId;
97+import org.apache.activemq.command.ProducerAck;
98+import org.apache.activemq.command.ProducerId;
99+import org.apache.activemq.command.RemoveInfo;
100+import org.apache.activemq.command.RemoveSubscriptionInfo;
101+import org.apache.activemq.command.Response;
102+import org.apache.activemq.command.SessionId;
103+import org.apache.activemq.command.ShutdownInfo;
104+import org.apache.activemq.command.WireFormatInfo;
105+import org.apache.activemq.management.JMSConnectionStatsImpl;
106+import org.apache.activemq.management.JMSStatsImpl;
107+import org.apache.activemq.management.StatsCapable;
108+import org.apache.activemq.management.StatsImpl;
109+import org.apache.activemq.state.CommandVisitorAdapter;
110+import org.apache.activemq.thread.Scheduler;
111+import org.apache.activemq.thread.TaskRunnerFactory;
112+import org.apache.activemq.transport.Transport;
113+import org.apache.activemq.transport.TransportListener;
114+import org.apache.activemq.transport.failover.FailoverTransport;
115+import org.apache.activemq.util.IdGenerator;
116+import org.apache.activemq.util.IntrospectionSupport;
117+import org.apache.activemq.util.JMSExceptionSupport;
118+import org.apache.activemq.util.LongSequenceGenerator;
119+import org.apache.activemq.util.ServiceSupport;
120+import org.slf4j.Logger;
121+import org.slf4j.LoggerFactory;
122+
123+public class ActiveMQConnection implements Connection, TopicConnection, QueueConnection, StatsCapable, Closeable, StreamConnection, TransportListener, EnhancedConnection {
124+
125+ public static final String DEFAULT_USER = ActiveMQConnectionFactory.DEFAULT_USER;
126+ public static final String DEFAULT_PASSWORD = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
127+ public static final String DEFAULT_BROKER_URL = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
128+
129+ private static final Logger LOG = LoggerFactory.getLogger(ActiveMQConnection.class);
130+ private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
131+
132+ public final ConcurrentHashMap<ActiveMQTempDestination, ActiveMQTempDestination> activeTempDestinations = new ConcurrentHashMap<ActiveMQTempDestination, ActiveMQTempDestination>();
133+
134+ protected boolean dispatchAsync=true;
135+ protected boolean alwaysSessionAsync = true;
136+
137+ private TaskRunnerFactory sessionTaskRunner;
138+ private final ThreadPoolExecutor executor;
139+
140+ // Connection state variables
141+ private final ConnectionInfo info;
142+ private ExceptionListener exceptionListener;
143+ private ClientInternalExceptionListener clientInternalExceptionListener;
144+ private boolean clientIDSet;
145+ private boolean isConnectionInfoSentToBroker;
146+ private boolean userSpecifiedClientID;
147+
148+ // Configuration options variables
149+ private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
150+ private BlobTransferPolicy blobTransferPolicy;
151+ private RedeliveryPolicy redeliveryPolicy;
152+ private MessageTransformer transformer;
153+
154+ private boolean disableTimeStampsByDefault;
155+ private boolean optimizedMessageDispatch = true;
156+ private boolean copyMessageOnSend = true;
157+ private boolean useCompression;
158+ private boolean objectMessageSerializationDefered;
159+ private boolean useAsyncSend;
160+ private boolean optimizeAcknowledge;
161+ private boolean nestedMapAndListEnabled = true;
162+ private boolean useRetroactiveConsumer;
163+ private boolean exclusiveConsumer;
164+ private boolean alwaysSyncSend;
165+ private int closeTimeout = 15000;
166+ private boolean watchTopicAdvisories = true;
167+ private long warnAboutUnstartedConnectionTimeout = 500L;
168+ private int sendTimeout =0;
169+ private boolean sendAcksAsync=true;
170+ private boolean checkForDuplicates = true;
171+
172+ private final Transport transport;
173+ private final IdGenerator clientIdGenerator;
174+ private final JMSStatsImpl factoryStats;
175+ private final JMSConnectionStatsImpl stats;
176+
177+ private final AtomicBoolean started = new AtomicBoolean(false);
178+ private final AtomicBoolean closing = new AtomicBoolean(false);
179+ private final AtomicBoolean closed = new AtomicBoolean(false);
180+ private final AtomicBoolean transportFailed = new AtomicBoolean(false);
181+ private final CopyOnWriteArrayList<ActiveMQSession> sessions = new CopyOnWriteArrayList<ActiveMQSession>();
182+ private final CopyOnWriteArrayList<ActiveMQConnectionConsumer> connectionConsumers = new CopyOnWriteArrayList<ActiveMQConnectionConsumer>();
183+ private final CopyOnWriteArrayList<ActiveMQInputStream> inputStreams = new CopyOnWriteArrayList<ActiveMQInputStream>();
184+ private final CopyOnWriteArrayList<ActiveMQOutputStream> outputStreams = new CopyOnWriteArrayList<ActiveMQOutputStream>();
185+ private final CopyOnWriteArrayList<TransportListener> transportListeners = new CopyOnWriteArrayList<TransportListener>();
186+
187+ // Maps ConsumerIds to ActiveMQConsumer objects
188+ private final ConcurrentHashMap<ConsumerId, ActiveMQDispatcher> dispatchers = new ConcurrentHashMap<ConsumerId, ActiveMQDispatcher>();
189+ private final ConcurrentHashMap<ProducerId, ActiveMQMessageProducer> producers = new ConcurrentHashMap<ProducerId, ActiveMQMessageProducer>();
190+ private final LongSequenceGenerator sessionIdGenerator = new LongSequenceGenerator();
191+ private final SessionId connectionSessionId;
192+ private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
193+ private final LongSequenceGenerator producerIdGenerator = new LongSequenceGenerator();
194+ private final LongSequenceGenerator tempDestinationIdGenerator = new LongSequenceGenerator();
195+ private final LongSequenceGenerator localTransactionIdGenerator = new LongSequenceGenerator();
196+
197+ private AdvisoryConsumer advisoryConsumer;
198+ private final CountDownLatch brokerInfoReceived = new CountDownLatch(1);
199+ private BrokerInfo brokerInfo;
200+ private IOException firstFailureError;
201+ private int producerWindowSize = ActiveMQConnectionFactory.DEFAULT_PRODUCER_WINDOW_SIZE;
202+
203+ // Assume that protocol is the latest. Change to the actual protocol
204+ // version when a WireFormatInfo is received.
205+ private final AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
206+ private final long timeCreated;
207+ private final ConnectionAudit connectionAudit = new ConnectionAudit();
208+ private DestinationSource destinationSource;
209+ private final Object ensureConnectionInfoSentMutex = new Object();
210+ private boolean useDedicatedTaskRunner;
211+ protected volatile CountDownLatch transportInterruptionProcessingComplete;
212+ private long consumerFailoverRedeliveryWaitPeriod;
213+ private final Scheduler scheduler;
214+ private boolean messagePrioritySupported=true;
215+
216+ /**
217+ * Construct an <code>ActiveMQConnection</code>
218+ *
219+ * @param transport
220+ * @param factoryStats
221+ * @throws Exception
222+ */
223+ protected ActiveMQConnection(final Transport transport, IdGenerator clientIdGenerator, JMSStatsImpl factoryStats) throws Exception {
224+
225+ this.transport = transport;
226+ this.clientIdGenerator = clientIdGenerator;
227+ this.factoryStats = factoryStats;
228+
229+ // Configure a single threaded executor who's core thread can timeout if
230+ // idle
231+ executor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
232+ public Thread newThread(Runnable r) {
233+ Thread thread = new Thread(r, "ActiveMQ Connection Executor: " + transport);
234+ thread.setDaemon(true);
235+ return thread;
236+ }
237+ });
238+ // asyncConnectionThread.allowCoreThreadTimeOut(true);
239+ String uniqueId = CONNECTION_ID_GENERATOR.generateId();
240+ this.info = new ConnectionInfo(new ConnectionId(uniqueId));
241+ this.info.setManageable(true);
242+ this.info.setFaultTolerant(transport.isFaultTolerant());
243+ this.connectionSessionId = new SessionId(info.getConnectionId(), -1);
244+
245+ this.transport.setTransportListener(this);
246+
247+ this.stats = new JMSConnectionStatsImpl(sessions, this instanceof XAConnection);
248+ this.factoryStats.addConnection(this);
249+ this.timeCreated = System.currentTimeMillis();
250+ this.connectionAudit.setCheckForDuplicates(transport.isFaultTolerant());
251+ this.scheduler = new Scheduler("ActiveMQConnection["+uniqueId+"] Scheduler");
252+ this.scheduler.start();
253+ }
254+
255+ protected void setUserName(String userName) {
256+ this.info.setUserName(userName);
257+ }
258+
259+ protected void setPassword(String password) {
260+ this.info.setPassword(password);
261+ }
262+
263+ /**
264+ * A static helper method to create a new connection
265+ *
266+ * @return an ActiveMQConnection
267+ * @throws JMSException
268+ */
269+ public static ActiveMQConnection makeConnection() throws JMSException {
270+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
271+ return (ActiveMQConnection)factory.createConnection();
272+ }
273+
274+ /**
275+ * A static helper method to create a new connection
276+ *
277+ * @param uri
278+ * @return and ActiveMQConnection
279+ * @throws JMSException
280+ */
281+ public static ActiveMQConnection makeConnection(String uri) throws JMSException, URISyntaxException {
282+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri);
283+ return (ActiveMQConnection)factory.createConnection();
284+ }
285+
286+ /**
287+ * A static helper method to create a new connection
288+ *
289+ * @param user
290+ * @param password
291+ * @param uri
292+ * @return an ActiveMQConnection
293+ * @throws JMSException
294+ */
295+ public static ActiveMQConnection makeConnection(String user, String password, String uri) throws JMSException, URISyntaxException {
296+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(user, password, new URI(uri));
297+ return (ActiveMQConnection)factory.createConnection();
298+ }
299+
300+ /**
301+ * @return a number unique for this connection
302+ */
303+ public JMSConnectionStatsImpl getConnectionStats() {
304+ return stats;
305+ }
306+
307+ /**
308+ * Creates a <CODE>Session</CODE> object.
309+ *
310+ * @param transacted indicates whether the session is transacted
311+ * @param acknowledgeMode indicates whether the consumer or the client will
312+ * acknowledge any messages it receives; ignored if the
313+ * session is transacted. Legal values are
314+ * <code>Session.AUTO_ACKNOWLEDGE</code>,
315+ * <code>Session.CLIENT_ACKNOWLEDGE</code>, and
316+ * <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
317+ * @return a newly created session
318+ * @throws JMSException if the <CODE>Connection</CODE> object fails to
319+ * create a session due to some internal error or lack of
320+ * support for the specific transaction and acknowledgement
321+ * mode.
322+ * @see Session#AUTO_ACKNOWLEDGE
323+ * @see Session#CLIENT_ACKNOWLEDGE
324+ * @see Session#DUPS_OK_ACKNOWLEDGE
325+ * @since 1.1
326+ */
327+ public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
328+ checkClosedOrFailed();
329+ ensureConnectionInfoSent();
330+ if(!transacted) {
331+ if (acknowledgeMode==Session.SESSION_TRANSACTED) {
332+ throw new JMSException("acknowledgeMode SESSION_TRANSACTED cannot be used for an non-transacted Session");
333+ } else if (acknowledgeMode < Session.SESSION_TRANSACTED || acknowledgeMode > ActiveMQSession.MAX_ACK_CONSTANT) {
334+ throw new JMSException("invalid acknowledgeMode: " + acknowledgeMode + ". Valid values are Session.AUTO_ACKNOWLEDGE (1), " +
335+ "Session.CLIENT_ACKNOWLEDGE (2), Session.DUPS_OK_ACKNOWLEDGE (3), ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE (4) or for transacted sessions Session.SESSION_TRANSACTED (0)");
336+ }
337+ }
338+ return new ActiveMQSession(this, getNextSessionId(), transacted ? Session.SESSION_TRANSACTED : (acknowledgeMode == Session.SESSION_TRANSACTED
339+ ? Session.AUTO_ACKNOWLEDGE : acknowledgeMode), isDispatchAsync(), isAlwaysSessionAsync());
340+ }
341+
342+ /**
343+ * @return sessionId
344+ */
345+ protected SessionId getNextSessionId() {
346+ return new SessionId(info.getConnectionId(), sessionIdGenerator.getNextSequenceId());
347+ }
348+
349+ /**
350+ * Gets the client identifier for this connection.
351+ * <P>
352+ * This value is specific to the JMS provider. It is either preconfigured by
353+ * an administrator in a <CODE> ConnectionFactory</CODE> object or assigned
354+ * dynamically by the application by calling the <code>setClientID</code>
355+ * method.
356+ *
357+ * @return the unique client identifier
358+ * @throws JMSException if the JMS provider fails to return the client ID
359+ * for this connection due to some internal error.
360+ */
361+ public String getClientID() throws JMSException {
362+ checkClosedOrFailed();
363+ return this.info.getClientId();
364+ }
365+
366+ /**
367+ * Sets the client identifier for this connection.
368+ * <P>
369+ * The preferred way to assign a JMS client's client identifier is for it to
370+ * be configured in a client-specific <CODE>ConnectionFactory</CODE>
371+ * object and transparently assigned to the <CODE>Connection</CODE> object
372+ * it creates.
373+ * <P>
374+ * Alternatively, a client can set a connection's client identifier using a
375+ * provider-specific value. The facility to set a connection's client
376+ * identifier explicitly is not a mechanism for overriding the identifier
377+ * that has been administratively configured. It is provided for the case
378+ * where no administratively specified identifier exists. If one does exist,
379+ * an attempt to change it by setting it must throw an
380+ * <CODE>IllegalStateException</CODE>. If a client sets the client
381+ * identifier explicitly, it must do so immediately after it creates the
382+ * connection and before any other action on the connection is taken. After
383+ * this point, setting the client identifier is a programming error that
384+ * should throw an <CODE>IllegalStateException</CODE>.
385+ * <P>
386+ * The purpose of the client identifier is to associate a connection and its
387+ * objects with a state maintained on behalf of the client by a provider.
388+ * The only such state identified by the JMS API is that required to support
389+ * durable subscriptions.
390+ * <P>
391+ * If another connection with the same <code>clientID</code> is already
392+ * running when this method is called, the JMS provider should detect the
393+ * duplicate ID and throw an <CODE>InvalidClientIDException</CODE>.
394+ *
395+ * @param newClientID the unique client identifier
396+ * @throws JMSException if the JMS provider fails to set the client ID for
397+ * this connection due to some internal error.
398+ * @throws javax.jms.InvalidClientIDException if the JMS client specifies an
399+ * invalid or duplicate client ID.
400+ * @throws javax.jms.IllegalStateException if the JMS client attempts to set
401+ * a connection's client ID at the wrong time or when it has
402+ * been administratively configured.
403+ */
404+ public void setClientID(String newClientID) throws JMSException {
405+ checkClosedOrFailed();
406+
407+ if (this.clientIDSet) {
408+ throw new IllegalStateException("The clientID has already been set");
409+ }
410+
411+ if (this.isConnectionInfoSentToBroker) {
412+ throw new IllegalStateException("Setting clientID on a used Connection is not allowed");
413+ }
414+
415+ this.info.setClientId(newClientID);
416+ this.userSpecifiedClientID = true;
417+ ensureConnectionInfoSent();
418+ }
419+
420+ /**
421+ * Sets the default client id that the connection will use if explicitly not
422+ * set with the setClientId() call.
423+ */
424+ public void setDefaultClientID(String clientID) throws JMSException {
425+ this.info.setClientId(clientID);
426+ this.userSpecifiedClientID = true;
427+ }
428+
429+ /**
430+ * Gets the metadata for this connection.
431+ *
432+ * @return the connection metadata
433+ * @throws JMSException if the JMS provider fails to get the connection
434+ * metadata for this connection.
435+ * @see javax.jms.ConnectionMetaData
436+ */
437+ public ConnectionMetaData getMetaData() throws JMSException {
438+ checkClosedOrFailed();
439+ return ActiveMQConnectionMetaData.INSTANCE;
440+ }
441+
442+ /**
443+ * Gets the <CODE>ExceptionListener</CODE> object for this connection. Not
444+ * every <CODE>Connection</CODE> has an <CODE>ExceptionListener</CODE>
445+ * associated with it.
446+ *
447+ * @return the <CODE>ExceptionListener</CODE> for this connection, or
448+ * null, if no <CODE>ExceptionListener</CODE> is associated with
449+ * this connection.
450+ * @throws JMSException if the JMS provider fails to get the
451+ * <CODE>ExceptionListener</CODE> for this connection.
452+ * @see javax.jms.Connection#setExceptionListener(ExceptionListener)
453+ */
454+ public ExceptionListener getExceptionListener() throws JMSException {
455+ checkClosedOrFailed();
456+ return this.exceptionListener;
457+ }
458+
459+ /**
460+ * Sets an exception listener for this connection.
461+ * <P>
462+ * If a JMS provider detects a serious problem with a connection, it informs
463+ * the connection's <CODE> ExceptionListener</CODE>, if one has been
464+ * registered. It does this by calling the listener's <CODE>onException
465+ * </CODE>
466+ * method, passing it a <CODE>JMSException</CODE> object describing the
467+ * problem.
468+ * <P>
469+ * An exception listener allows a client to be notified of a problem
470+ * asynchronously. Some connections only consume messages, so they would
471+ * have no other way to learn their connection has failed.
472+ * <P>
473+ * A connection serializes execution of its <CODE>ExceptionListener</CODE>.
474+ * <P>
475+ * A JMS provider should attempt to resolve connection problems itself
476+ * before it notifies the client of them.
477+ *
478+ * @param listener the exception listener
479+ * @throws JMSException if the JMS provider fails to set the exception
480+ * listener for this connection.
481+ */
482+ public void setExceptionListener(ExceptionListener listener) throws JMSException {
483+ checkClosedOrFailed();
484+ this.exceptionListener = listener;
485+ }
486+
487+ /**
488+ * Gets the <code>ClientInternalExceptionListener</code> object for this connection.
489+ * Not every <CODE>ActiveMQConnectionn</CODE> has a <CODE>ClientInternalExceptionListener</CODE>
490+ * associated with it.
491+ *
492+ * @return the listener or <code>null</code> if no listener is registered with the connection.
493+ */
494+ public ClientInternalExceptionListener getClientInternalExceptionListener()
495+ {
496+ return clientInternalExceptionListener;
497+ }
498+
499+ /**
500+ * Sets a client internal exception listener for this connection.
501+ * The connection will notify the listener, if one has been registered, of exceptions thrown by container components
502+ * (e.g. an EJB container in case of Message Driven Beans) during asynchronous processing of a message.
503+ * It does this by calling the listener's <code>onException()</code> method passing it a <code>Throwable</code>
504+ * describing the problem.
505+ *
506+ * @param listener the exception listener
507+ */
508+ public void setClientInternalExceptionListener(ClientInternalExceptionListener listener)
509+ {
510+ this.clientInternalExceptionListener = listener;
511+ }
512+
513+ /**
514+ * Starts (or restarts) a connection's delivery of incoming messages. A call
515+ * to <CODE>start</CODE> on a connection that has already been started is
516+ * ignored.
517+ *
518+ * @throws JMSException if the JMS provider fails to start message delivery
519+ * due to some internal error.
520+ * @see javax.jms.Connection#stop()
521+ */
522+ public void start() throws JMSException {
523+ checkClosedOrFailed();
524+ ensureConnectionInfoSent();
525+ if (started.compareAndSet(false, true)) {
526+ for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) {
527+ ActiveMQSession session = i.next();
528+ session.start();
529+ }
530+ }
531+ }
532+
533+ /**
534+ * Temporarily stops a connection's delivery of incoming messages. Delivery
535+ * can be restarted using the connection's <CODE>start</CODE> method. When
536+ * the connection is stopped, delivery to all the connection's message
537+ * consumers is inhibited: synchronous receives block, and messages are not
538+ * delivered to message listeners.
539+ * <P>
540+ * This call blocks until receives and/or message listeners in progress have
541+ * completed.
542+ * <P>
543+ * Stopping a connection has no effect on its ability to send messages. A
544+ * call to <CODE>stop</CODE> on a connection that has already been stopped
545+ * is ignored.
546+ * <P>
547+ * A call to <CODE>stop</CODE> must not return until delivery of messages
548+ * has paused. This means that a client can rely on the fact that none of
549+ * its message listeners will be called and that all threads of control
550+ * waiting for <CODE>receive</CODE> calls to return will not return with a
551+ * message until the connection is restarted. The receive timers for a
552+ * stopped connection continue to advance, so receives may time out while
553+ * the connection is stopped.
554+ * <P>
555+ * If message listeners are running when <CODE>stop</CODE> is invoked, the
556+ * <CODE>stop</CODE> call must wait until all of them have returned before
557+ * it may return. While these message listeners are completing, they must
558+ * have the full services of the connection available to them.
559+ *
560+ * @throws JMSException if the JMS provider fails to stop message delivery
561+ * due to some internal error.
562+ * @see javax.jms.Connection#start()
563+ */
564+ public void stop() throws JMSException {
565+ checkClosedOrFailed();
566+ if (started.compareAndSet(true, false)) {
567+ synchronized(sessions) {
568+ for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) {
569+ ActiveMQSession s = i.next();
570+ s.stop();
571+ }
572+ }
573+ }
574+ }
575+
576+ /**
577+ * Closes the connection.
578+ * <P>
579+ * Since a provider typically allocates significant resources outside the
580+ * JVM on behalf of a connection, clients should close these resources when
581+ * they are not needed. Relying on garbage collection to eventually reclaim
582+ * these resources may not be timely enough.
583+ * <P>
584+ * There is no need to close the sessions, producers, and consumers of a
585+ * closed connection.
586+ * <P>
587+ * Closing a connection causes all temporary destinations to be deleted.
588+ * <P>
589+ * When this method is invoked, it should not return until message
590+ * processing has been shut down in an orderly fashion. This means that all
591+ * message listeners that may have been running have returned, and that all
592+ * pending receives have returned. A close terminates all pending message
593+ * receives on the connection's sessions' consumers. The receives may return
594+ * with a message or with null, depending on whether there was a message
595+ * available at the time of the close. If one or more of the connection's
596+ * sessions' message listeners is processing a message at the time when
597+ * connection <CODE>close</CODE> is invoked, all the facilities of the
598+ * connection and its sessions must remain available to those listeners
599+ * until they return control to the JMS provider.
600+ * <P>
601+ * Closing a connection causes any of its sessions' transactions in progress
602+ * to be rolled back. In the case where a session's work is coordinated by
603+ * an external transaction manager, a session's <CODE>commit</CODE> and
604+ * <CODE> rollback</CODE> methods are not used and the result of a closed
605+ * session's work is determined later by the transaction manager. Closing a
606+ * connection does NOT force an acknowledgment of client-acknowledged
607+ * sessions.
608+ * <P>
609+ * Invoking the <CODE>acknowledge</CODE> method of a received message from
610+ * a closed connection's session must throw an
611+ * <CODE>IllegalStateException</CODE>. Closing a closed connection must
612+ * NOT throw an exception.
613+ *
614+ * @throws JMSException if the JMS provider fails to close the connection
615+ * due to some internal error. For example, a failure to
616+ * release resources or to close a socket connection can
617+ * cause this exception to be thrown.
618+ */
619+ public void close() throws JMSException {
620+ try {
621+ // If we were running, lets stop first.
622+ if (!closed.get() && !transportFailed.get()) {
623+ stop();
624+ }
625+
626+ synchronized (this) {
627+ if (!closed.get()) {
628+ closing.set(true);
629+
630+ if (destinationSource != null) {
631+ destinationSource.stop();
632+ destinationSource = null;
633+ }
634+ if (advisoryConsumer != null) {
635+ advisoryConsumer.dispose();
636+ advisoryConsumer = null;
637+ }
638+ if (this.scheduler != null) {
639+ try {
640+ this.scheduler.stop();
641+ } catch (Exception e) {
642+ JMSException ex = JMSExceptionSupport.create(e);
643+ throw ex;
644+ }
645+ }
646+
647+ long lastDeliveredSequenceId = 0;
648+ for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
649+ ActiveMQSession s = i.next();
650+ s.dispose();
651+ lastDeliveredSequenceId = Math.max(lastDeliveredSequenceId, s.getLastDeliveredSequenceId());
652+ }
653+ for (Iterator<ActiveMQConnectionConsumer> i = this.connectionConsumers.iterator(); i.hasNext();) {
654+ ActiveMQConnectionConsumer c = i.next();
655+ c.dispose();
656+ }
657+ for (Iterator<ActiveMQInputStream> i = this.inputStreams.iterator(); i.hasNext();) {
658+ ActiveMQInputStream c = i.next();
659+ c.dispose();
660+ }
661+ for (Iterator<ActiveMQOutputStream> i = this.outputStreams.iterator(); i.hasNext();) {
662+ ActiveMQOutputStream c = i.next();
663+ c.dispose();
664+ }
665+
666+ // As TemporaryQueue and TemporaryTopic instances are bound
667+ // to a connection we should just delete them after the connection
668+ // is closed to free up memory
669+ for (Iterator<ActiveMQTempDestination> i = this.activeTempDestinations.values().iterator(); i.hasNext();) {
670+ ActiveMQTempDestination c = i.next();
671+ c.delete();
672+ }
673+
674+ if (isConnectionInfoSentToBroker) {
675+ // If we announced ourselfs to the broker.. Try to let
676+ // the broker
677+ // know that the connection is being shutdown.
678+ RemoveInfo removeCommand = info.createRemoveCommand();
679+ removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
680+ doSyncSendPacket(info.createRemoveCommand(), closeTimeout);
681+ doAsyncSendPacket(new ShutdownInfo());
682+ }
683+
684+ ServiceSupport.dispose(this.transport);
685+
686+ started.set(false);
687+
688+ // TODO if we move the TaskRunnerFactory to the connection
689+ // factory
690+ // then we may need to call
691+ // factory.onConnectionClose(this);
692+ if (sessionTaskRunner != null) {
693+ sessionTaskRunner.shutdown();
694+ }
695+ closed.set(true);
696+ closing.set(false);
697+ }
698+ }
699+ } finally {
700+ try {
701+ if (executor != null){
702+ executor.shutdown();
703+ }
704+ }catch(Throwable e) {
705+ LOG.error("Error shutting down thread pool " + e,e);
706+ }
707+ factoryStats.removeConnection(this);
708+ }
709+ }
710+
711+ /**
712+ * Tells the broker to terminate its VM. This can be used to cleanly
713+ * terminate a broker running in a standalone java process. Server must have
714+ * property enable.vm.shutdown=true defined to allow this to work.
715+ */
716+ // TODO : org.apache.activemq.message.BrokerAdminCommand not yet
717+ // implemented.
718+ /*
719+ * public void terminateBrokerVM() throws JMSException { BrokerAdminCommand
720+ * command = new BrokerAdminCommand();
721+ * command.setCommand(BrokerAdminCommand.SHUTDOWN_SERVER_VM);
722+ * asyncSendPacket(command); }
723+ */
724+
725+ /**
726+ * Create a durable connection consumer for this connection (optional
727+ * operation). This is an expert facility not used by regular JMS clients.
728+ *
729+ * @param topic topic to access
730+ * @param subscriptionName durable subscription name
731+ * @param messageSelector only messages with properties matching the message
732+ * selector expression are delivered. A value of null or an
733+ * empty string indicates that there is no message selector
734+ * for the message consumer.
735+ * @param sessionPool the server session pool to associate with this durable
736+ * connection consumer
737+ * @param maxMessages the maximum number of messages that can be assigned to
738+ * a server session at one time
739+ * @return the durable connection consumer
740+ * @throws JMSException if the <CODE>Connection</CODE> object fails to
741+ * create a connection consumer due to some internal error
742+ * or invalid arguments for <CODE>sessionPool</CODE> and
743+ * <CODE>messageSelector</CODE>.
744+ * @throws javax.jms.InvalidDestinationException if an invalid destination
745+ * is specified.
746+ * @throws javax.jms.InvalidSelectorException if the message selector is
747+ * invalid.
748+ * @see javax.jms.ConnectionConsumer
749+ * @since 1.1
750+ */
751+ public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages)
752+ throws JMSException {
753+ return this.createDurableConnectionConsumer(topic, subscriptionName, messageSelector, sessionPool, maxMessages, false);
754+ }
755+
756+ /**
757+ * Create a durable connection consumer for this connection (optional
758+ * operation). This is an expert facility not used by regular JMS clients.
759+ *
760+ * @param topic topic to access
761+ * @param subscriptionName durable subscription name
762+ * @param messageSelector only messages with properties matching the message
763+ * selector expression are delivered. A value of null or an
764+ * empty string indicates that there is no message selector
765+ * for the message consumer.
766+ * @param sessionPool the server session pool to associate with this durable
767+ * connection consumer
768+ * @param maxMessages the maximum number of messages that can be assigned to
769+ * a server session at one time
770+ * @param noLocal set true if you want to filter out messages published
771+ * locally
772+ * @return the durable connection consumer
773+ * @throws JMSException if the <CODE>Connection</CODE> object fails to
774+ * create a connection consumer due to some internal error
775+ * or invalid arguments for <CODE>sessionPool</CODE> and
776+ * <CODE>messageSelector</CODE>.
777+ * @throws javax.jms.InvalidDestinationException if an invalid destination
778+ * is specified.
779+ * @throws javax.jms.InvalidSelectorException if the message selector is
780+ * invalid.
781+ * @see javax.jms.ConnectionConsumer
782+ * @since 1.1
783+ */
784+ public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages,
785+ boolean noLocal) throws JMSException {
786+ checkClosedOrFailed();
787+ ensureConnectionInfoSent();
788+ SessionId sessionId = new SessionId(info.getConnectionId(), -1);
789+ ConsumerInfo info = new ConsumerInfo(new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId()));
790+ info.setDestination(ActiveMQMessageTransformation.transformDestination(topic));
791+ info.setSubscriptionName(subscriptionName);
792+ info.setSelector(messageSelector);
793+ info.setPrefetchSize(maxMessages);
794+ info.setDispatchAsync(isDispatchAsync());
795+
796+ // Allows the options on the destination to configure the consumerInfo
797+ if (info.getDestination().getOptions() != null) {
798+ Map<String, String> options = new HashMap<String, String>(info.getDestination().getOptions());
799+ IntrospectionSupport.setProperties(this.info, options, "consumer.");
800+ }
801+
802+ return new ActiveMQConnectionConsumer(this, sessionPool, info);
803+ }
804+
805+ // Properties
806+ // -------------------------------------------------------------------------
807+
808+ /**
809+ * Returns true if this connection has been started
810+ *
811+ * @return true if this Connection is started
812+ */
813+ public boolean isStarted() {
814+ return started.get();
815+ }
816+
817+ /**
818+ * Returns true if the connection is closed
819+ */
820+ public boolean isClosed() {
821+ return closed.get();
822+ }
823+
824+ /**
825+ * Returns true if the connection is in the process of being closed
826+ */
827+ public boolean isClosing() {
828+ return closing.get();
829+ }
830+
831+ /**
832+ * Returns true if the underlying transport has failed
833+ */
834+ public boolean isTransportFailed() {
835+ return transportFailed.get();
836+ }
837+
838+ /**
839+ * @return Returns the prefetchPolicy.
840+ */
841+ public ActiveMQPrefetchPolicy getPrefetchPolicy() {
842+ return prefetchPolicy;
843+ }
844+
845+ /**
846+ * Sets the <a
847+ * href="http://activemq.apache.org/what-is-the-prefetch-limit-for.html">prefetch
848+ * policy</a> for consumers created by this connection.
849+ */
850+ public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) {
851+ this.prefetchPolicy = prefetchPolicy;
852+ }
853+
854+ /**
855+ */
856+ public Transport getTransportChannel() {
857+ return transport;
858+ }
859+
860+ /**
861+ * @return Returns the clientID of the connection, forcing one to be
862+ * generated if one has not yet been configured.
863+ */
864+ public String getInitializedClientID() throws JMSException {
865+ ensureConnectionInfoSent();
866+ return info.getClientId();
867+ }
868+
869+ /**
870+ * @return Returns the timeStampsDisableByDefault.
871+ */
872+ public boolean isDisableTimeStampsByDefault() {
873+ return disableTimeStampsByDefault;
874+ }
875+
876+ /**
877+ * Sets whether or not timestamps on messages should be disabled or not. If
878+ * you disable them it adds a small performance boost.
879+ */
880+ public void setDisableTimeStampsByDefault(boolean timeStampsDisableByDefault) {
881+ this.disableTimeStampsByDefault = timeStampsDisableByDefault;
882+ }
883+
884+ /**
885+ * @return Returns the dispatchOptimizedMessage.
886+ */
887+ public boolean isOptimizedMessageDispatch() {
888+ return optimizedMessageDispatch;
889+ }
890+
891+ /**
892+ * If this flag is set then an larger prefetch limit is used - only
893+ * applicable for durable topic subscribers.
894+ */
895+ public void setOptimizedMessageDispatch(boolean dispatchOptimizedMessage) {
896+ this.optimizedMessageDispatch = dispatchOptimizedMessage;
897+ }
898+
899+ /**
900+ * @return Returns the closeTimeout.
901+ */
902+ public int getCloseTimeout() {
903+ return closeTimeout;
904+ }
905+
906+ /**
907+ * Sets the timeout before a close is considered complete. Normally a
908+ * close() on a connection waits for confirmation from the broker; this
909+ * allows that operation to timeout to save the client hanging if there is
910+ * no broker
911+ */
912+ public void setCloseTimeout(int closeTimeout) {
913+ this.closeTimeout = closeTimeout;
914+ }
915+
916+ /**
917+ * @return ConnectionInfo
918+ */
919+ public ConnectionInfo getConnectionInfo() {
920+ return this.info;
921+ }
922+
923+ public boolean isUseRetroactiveConsumer() {
924+ return useRetroactiveConsumer;
925+ }
926+
927+ /**
928+ * Sets whether or not retroactive consumers are enabled. Retroactive
929+ * consumers allow non-durable topic subscribers to receive old messages
930+ * that were published before the non-durable subscriber started.
931+ */
932+ public void setUseRetroactiveConsumer(boolean useRetroactiveConsumer) {
933+ this.useRetroactiveConsumer = useRetroactiveConsumer;
934+ }
935+
936+ public boolean isNestedMapAndListEnabled() {
937+ return nestedMapAndListEnabled;
938+ }
939+
940+ /**
941+ * Enables/disables whether or not Message properties and MapMessage entries
942+ * support <a
943+ * href="http://activemq.apache.org/structured-message-properties-and-mapmessages.html">Nested
944+ * Structures</a> of Map and List objects
945+ */
946+ public void setNestedMapAndListEnabled(boolean structuredMapsEnabled) {
947+ this.nestedMapAndListEnabled = structuredMapsEnabled;
948+ }
949+
950+ public boolean isExclusiveConsumer() {
951+ return exclusiveConsumer;
952+ }
953+
954+ /**
955+ * Enables or disables whether or not queue consumers should be exclusive or
956+ * not for example to preserve ordering when not using <a
957+ * href="http://activemq.apache.org/message-groups.html">Message Groups</a>
958+ *
959+ * @param exclusiveConsumer
960+ */
961+ public void setExclusiveConsumer(boolean exclusiveConsumer) {
962+ this.exclusiveConsumer = exclusiveConsumer;
963+ }
964+
965+ /**
966+ * Adds a transport listener so that a client can be notified of events in
967+ * the underlying transport
968+ */
969+ public void addTransportListener(TransportListener transportListener) {
970+ transportListeners.add(transportListener);
971+ }
972+
973+ public void removeTransportListener(TransportListener transportListener) {
974+ transportListeners.remove(transportListener);
975+ }
976+
977+ public boolean isUseDedicatedTaskRunner() {
978+ return useDedicatedTaskRunner;
979+ }
980+
981+ public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
982+ this.useDedicatedTaskRunner = useDedicatedTaskRunner;
983+ }
984+
985+ public TaskRunnerFactory getSessionTaskRunner() {
986+ synchronized (this) {
987+ if (sessionTaskRunner == null) {
988+ sessionTaskRunner = new TaskRunnerFactory("ActiveMQ Session Task", ThreadPriorities.INBOUND_CLIENT_SESSION, false, 1000, isUseDedicatedTaskRunner());
989+ }
990+ }
991+ return sessionTaskRunner;
992+ }
993+
994+ public void setSessionTaskRunner(TaskRunnerFactory sessionTaskRunner) {
995+ this.sessionTaskRunner = sessionTaskRunner;
996+ }
997+
998+ public MessageTransformer getTransformer() {
999+ return transformer;
1000+ }
1001+
1002+ /**
1003+ * Sets the transformer used to transform messages before they are sent on
1004+ * to the JMS bus or when they are received from the bus but before they are
1005+ * delivered to the JMS client
1006+ */
1007+ public void setTransformer(MessageTransformer transformer) {
1008+ this.transformer = transformer;
1009+ }
1010+
1011+ /**
1012+ * @return the statsEnabled
1013+ */
1014+ public boolean isStatsEnabled() {
1015+ return this.stats.isEnabled();
1016+ }
1017+
1018+ /**
1019+ * @param statsEnabled the statsEnabled to set
1020+ */
1021+ public void setStatsEnabled(boolean statsEnabled) {
1022+ this.stats.setEnabled(statsEnabled);
1023+ }
1024+
1025+ /**
1026+ * Returns the {@link DestinationSource} object which can be used to listen to destinations
1027+ * being created or destroyed or to enquire about the current destinations available on the broker
1028+ *
1029+ * @return a lazily created destination source
1030+ * @throws JMSException
1031+ */
1032+ public DestinationSource getDestinationSource() throws JMSException {
1033+ if (destinationSource == null) {
1034+ destinationSource = new DestinationSource(this);
1035+ destinationSource.start();
1036+ }
1037+ return destinationSource;
1038+ }
1039+
1040+ // Implementation methods
1041+ // -------------------------------------------------------------------------
1042+
1043+ /**
1044+ * Used internally for adding Sessions to the Connection
1045+ *
1046+ * @param session
1047+ * @throws JMSException
1048+ * @throws JMSException
1049+ */
1050+ protected void addSession(ActiveMQSession session) throws JMSException {
1051+ this.sessions.add(session);
1052+ if (sessions.size() > 1 || session.isTransacted()) {
1053+ optimizedMessageDispatch = false;
1054+ }
1055+ }
1056+
1057+ /**
1058+ * Used interanlly for removing Sessions from a Connection
1059+ *
1060+ * @param session
1061+ */
1062+ protected void removeSession(ActiveMQSession session) {
1063+ this.sessions.remove(session);
1064+ this.removeDispatcher(session);
1065+ }
1066+
1067+ /**
1068+ * Add a ConnectionConsumer
1069+ *
1070+ * @param connectionConsumer
1071+ * @throws JMSException
1072+ */
1073+ protected void addConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) throws JMSException {
1074+ this.connectionConsumers.add(connectionConsumer);
1075+ }
1076+
1077+ /**
1078+ * Remove a ConnectionConsumer
1079+ *
1080+ * @param connectionConsumer
1081+ */
1082+ protected void removeConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) {
1083+ this.connectionConsumers.remove(connectionConsumer);
1084+ this.removeDispatcher(connectionConsumer);
1085+ }
1086+
1087+ /**
1088+ * Creates a <CODE>TopicSession</CODE> object.
1089+ *
1090+ * @param transacted indicates whether the session is transacted
1091+ * @param acknowledgeMode indicates whether the consumer or the client will
1092+ * acknowledge any messages it receives; ignored if the
1093+ * session is transacted. Legal values are
1094+ * <code>Session.AUTO_ACKNOWLEDGE</code>,
1095+ * <code>Session.CLIENT_ACKNOWLEDGE</code>, and
1096+ * <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
1097+ * @return a newly created topic session
1098+ * @throws JMSException if the <CODE>TopicConnection</CODE> object fails
1099+ * to create a session due to some internal error or lack of
1100+ * support for the specific transaction and acknowledgement
1101+ * mode.
1102+ * @see Session#AUTO_ACKNOWLEDGE
1103+ * @see Session#CLIENT_ACKNOWLEDGE
1104+ * @see Session#DUPS_OK_ACKNOWLEDGE
1105+ */
1106+ public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException {
1107+ return new ActiveMQTopicSession((ActiveMQSession)createSession(transacted, acknowledgeMode));
1108+ }
1109+
1110+ /**
1111+ * Creates a connection consumer for this connection (optional operation).
1112+ * This is an expert facility not used by regular JMS clients.
1113+ *
1114+ * @param topic the topic to access
1115+ * @param messageSelector only messages with properties matching the message
1116+ * selector expression are delivered. A value of null or an
1117+ * empty string indicates that there is no message selector
1118+ * for the message consumer.
1119+ * @param sessionPool the server session pool to associate with this
1120+ * connection consumer
1121+ * @param maxMessages the maximum number of messages that can be assigned to
1122+ * a server session at one time
1123+ * @return the connection consumer
1124+ * @throws JMSException if the <CODE>TopicConnection</CODE> object fails
1125+ * to create a connection consumer due to some internal
1126+ * error or invalid arguments for <CODE>sessionPool</CODE>
1127+ * and <CODE>messageSelector</CODE>.
1128+ * @throws javax.jms.InvalidDestinationException if an invalid topic is
1129+ * specified.
1130+ * @throws javax.jms.InvalidSelectorException if the message selector is
1131+ * invalid.
1132+ * @see javax.jms.ConnectionConsumer
1133+ */
1134+ public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
1135+ return createConnectionConsumer(topic, messageSelector, sessionPool, maxMessages, false);
1136+ }
1137+
1138+ /**
1139+ * Creates a connection consumer for this connection (optional operation).
1140+ * This is an expert facility not used by regular JMS clients.
1141+ *
1142+ * @param queue the queue to access
1143+ * @param messageSelector only messages with properties matching the message
1144+ * selector expression are delivered. A value of null or an
1145+ * empty string indicates that there is no message selector
1146+ * for the message consumer.
1147+ * @param sessionPool the server session pool to associate with this
1148+ * connection consumer
1149+ * @param maxMessages the maximum number of messages that can be assigned to
1150+ * a server session at one time
1151+ * @return the connection consumer
1152+ * @throws JMSException if the <CODE>QueueConnection</CODE> object fails
1153+ * to create a connection consumer due to some internal
1154+ * error or invalid arguments for <CODE>sessionPool</CODE>
1155+ * and <CODE>messageSelector</CODE>.
1156+ * @throws javax.jms.InvalidDestinationException if an invalid queue is
1157+ * specified.
1158+ * @throws javax.jms.InvalidSelectorException if the message selector is
1159+ * invalid.
1160+ * @see javax.jms.ConnectionConsumer
1161+ */
1162+ public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
1163+ return createConnectionConsumer(queue, messageSelector, sessionPool, maxMessages, false);
1164+ }
1165+
1166+ /**
1167+ * Creates a connection consumer for this connection (optional operation).
1168+ * This is an expert facility not used by regular JMS clients.
1169+ *
1170+ * @param destination the destination to access
1171+ * @param messageSelector only messages with properties matching the message
1172+ * selector expression are delivered. A value of null or an
1173+ * empty string indicates that there is no message selector
1174+ * for the message consumer.
1175+ * @param sessionPool the server session pool to associate with this
1176+ * connection consumer
1177+ * @param maxMessages the maximum number of messages that can be assigned to
1178+ * a server session at one time
1179+ * @return the connection consumer
1180+ * @throws JMSException if the <CODE>Connection</CODE> object fails to
1181+ * create a connection consumer due to some internal error
1182+ * or invalid arguments for <CODE>sessionPool</CODE> and
1183+ * <CODE>messageSelector</CODE>.
1184+ * @throws javax.jms.InvalidDestinationException if an invalid destination
1185+ * is specified.
1186+ * @throws javax.jms.InvalidSelectorException if the message selector is
1187+ * invalid.
1188+ * @see javax.jms.ConnectionConsumer
1189+ * @since 1.1
1190+ */
1191+ public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
1192+ return createConnectionConsumer(destination, messageSelector, sessionPool, maxMessages, false);
1193+ }
1194+
1195+ public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages, boolean noLocal)
1196+ throws JMSException {
1197+
1198+ checkClosedOrFailed();
1199+ ensureConnectionInfoSent();
1200+
1201+ ConsumerId consumerId = createConsumerId();
1202+ ConsumerInfo consumerInfo = new ConsumerInfo(consumerId);
1203+ consumerInfo.setDestination(ActiveMQMessageTransformation.transformDestination(destination));
1204+ consumerInfo.setSelector(messageSelector);
1205+ consumerInfo.setPrefetchSize(maxMessages);
1206+ consumerInfo.setNoLocal(noLocal);
1207+ consumerInfo.setDispatchAsync(isDispatchAsync());
1208+
1209+ // Allows the options on the destination to configure the consumerInfo
1210+ if (consumerInfo.getDestination().getOptions() != null) {
1211+ Map<String, String> options = new HashMap<String, String>(consumerInfo.getDestination().getOptions());
1212+ IntrospectionSupport.setProperties(consumerInfo, options, "consumer.");
1213+ }
1214+
1215+ return new ActiveMQConnectionConsumer(this, sessionPool, consumerInfo);
1216+ }
1217+
1218+ /**
1219+ * @return
1220+ */
1221+ private ConsumerId createConsumerId() {
1222+ return new ConsumerId(connectionSessionId, consumerIdGenerator.getNextSequenceId());
1223+ }
1224+
1225+ /**
1226+ * @return
1227+ */
1228+ private ProducerId createProducerId() {
1229+ return new ProducerId(connectionSessionId, producerIdGenerator.getNextSequenceId());
1230+ }
1231+
1232+ /**
1233+ * Creates a <CODE>QueueSession</CODE> object.
1234+ *
1235+ * @param transacted indicates whether the session is transacted
1236+ * @param acknowledgeMode indicates whether the consumer or the client will
1237+ * acknowledge any messages it receives; ignored if the
1238+ * session is transacted. Legal values are
1239+ * <code>Session.AUTO_ACKNOWLEDGE</code>,
1240+ * <code>Session.CLIENT_ACKNOWLEDGE</code>, and
1241+ * <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
1242+ * @return a newly created queue session
1243+ * @throws JMSException if the <CODE>QueueConnection</CODE> object fails
1244+ * to create a session due to some internal error or lack of
1245+ * support for the specific transaction and acknowledgement
1246+ * mode.
1247+ * @see Session#AUTO_ACKNOWLEDGE
1248+ * @see Session#CLIENT_ACKNOWLEDGE
1249+ * @see Session#DUPS_OK_ACKNOWLEDGE
1250+ */
1251+ public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException {
1252+ return new ActiveMQQueueSession((ActiveMQSession)createSession(transacted, acknowledgeMode));
1253+ }
1254+
1255+ /**
1256+ * Ensures that the clientID was manually specified and not auto-generated.
1257+ * If the clientID was not specified this method will throw an exception.
1258+ * This method is used to ensure that the clientID + durableSubscriber name
1259+ * are used correctly.
1260+ *
1261+ * @throws JMSException
1262+ */
1263+ public void checkClientIDWasManuallySpecified() throws JMSException {
1264+ if (!userSpecifiedClientID) {
1265+ throw new JMSException("You cannot create a durable subscriber without specifying a unique clientID on a Connection");
1266+ }
1267+ }
1268+
1269+ /**
1270+ * send a Packet through the Connection - for internal use only
1271+ *
1272+ * @param command
1273+ * @throws JMSException
1274+ */
1275+ public void asyncSendPacket(Command command) throws JMSException {
1276+ if (isClosed()) {
1277+ throw new ConnectionClosedException();
1278+ } else {
1279+ doAsyncSendPacket(command);
1280+ }
1281+ }
1282+
1283+ private void doAsyncSendPacket(Command command) throws JMSException {
1284+ try {
1285+ this.transport.oneway(command);
1286+ } catch (IOException e) {
1287+ throw JMSExceptionSupport.create(e);
1288+ }
1289+ }
1290+
1291+ /**
1292+ * Send a packet through a Connection - for internal use only
1293+ *
1294+ * @param command
1295+ * @return
1296+ * @throws JMSException
1297+ */
1298+ public Response syncSendPacket(Command command) throws JMSException {
1299+ if (isClosed()) {
1300+ throw new ConnectionClosedException();
1301+ } else {
1302+
1303+ try {
1304+ Response response = (Response)this.transport.request(command);
1305+ if (response.isException()) {
1306+ ExceptionResponse er = (ExceptionResponse)response;
1307+ if (er.getException() instanceof JMSException) {
1308+ throw (JMSException)er.getException();
1309+ } else {
1310+ if (isClosed()||closing.get()) {
1311+ LOG.debug("Received an exception but connection is closing");
1312+ }
1313+ JMSException jmsEx = null;
1314+ try {
1315+ jmsEx = JMSExceptionSupport.create(er.getException());
1316+ }catch(Throwable e) {
1317+ LOG.error("Caught an exception trying to create a JMSException for " +er.getException(),e);
1318+ }
1319+ if(jmsEx !=null) {
1320+ throw jmsEx;
1321+ }
1322+ }
1323+ }
1324+ return response;
1325+ } catch (IOException e) {
1326+ throw JMSExceptionSupport.create(e);
1327+ }
1328+ }
1329+ }
1330+
1331+ /**
1332+ * Send a packet through a Connection - for internal use only
1333+ *
1334+ * @param command
1335+ * @return
1336+ * @throws JMSException
1337+ */
1338+ public Response syncSendPacket(Command command, int timeout) throws JMSException {
1339+ if (isClosed() || closing.get()) {
1340+ throw new ConnectionClosedException();
1341+ } else {
1342+ return doSyncSendPacket(command, timeout);
1343+ }
1344+ }
1345+
1346+ private Response doSyncSendPacket(Command command, int timeout)
1347+ throws JMSException {
1348+ try {
1349+ Response response = (Response) (timeout > 0
1350+ ? this.transport.request(command, timeout)
1351+ : this.transport.request(command));
1352+ if (response != null && response.isException()) {
1353+ ExceptionResponse er = (ExceptionResponse)response;
1354+ if (er.getException() instanceof JMSException) {
1355+ throw (JMSException)er.getException();
1356+ } else {
1357+ throw JMSExceptionSupport.create(er.getException());
1358+ }
1359+ }
1360+ return response;
1361+ } catch (IOException e) {
1362+ throw JMSExceptionSupport.create(e);
1363+ }
1364+ }
1365+
1366+ /**
1367+ * @return statistics for this Connection
1368+ */
1369+ public StatsImpl getStats() {
1370+ return stats;
1371+ }
1372+
1373+ /**
1374+ * simply throws an exception if the Connection is already closed or the
1375+ * Transport has failed
1376+ *
1377+ * @throws JMSException
1378+ */
1379+ protected synchronized void checkClosedOrFailed() throws JMSException {
1380+ checkClosed();
1381+ if (transportFailed.get()) {
1382+ throw new ConnectionFailedException(firstFailureError);
1383+ }
1384+ }
1385+
1386+ /**
1387+ * simply throws an exception if the Connection is already closed
1388+ *
1389+ * @throws JMSException
1390+ */
1391+ protected synchronized void checkClosed() throws JMSException {
1392+ if (closed.get()) {
1393+ throw new ConnectionClosedException();
1394+ }
1395+ }
1396+
1397+ /**
1398+ * Send the ConnectionInfo to the Broker
1399+ *
1400+ * @throws JMSException
1401+ */
1402+ protected void ensureConnectionInfoSent() throws JMSException {
1403+ synchronized(this.ensureConnectionInfoSentMutex) {
1404+ // Can we skip sending the ConnectionInfo packet??
1405+ if (isConnectionInfoSentToBroker || closed.get()) {
1406+ return;
1407+ }
1408+ //TODO shouldn't this check be on userSpecifiedClientID rather than the value of clientID?
1409+ if (info.getClientId() == null || info.getClientId().trim().length() == 0) {
1410+ info.setClientId(clientIdGenerator.generateId());
1411+ }
1412+ syncSendPacket(info.copy());
1413+
1414+ this.isConnectionInfoSentToBroker = true;
1415+ // Add a temp destination advisory consumer so that
1416+ // We know what the valid temporary destinations are on the
1417+ // broker without having to do an RPC to the broker.
1418+
1419+ ConsumerId consumerId = new ConsumerId(new SessionId(info.getConnectionId(), -1), consumerIdGenerator.getNextSequenceId());
1420+ if (watchTopicAdvisories) {
1421+ advisoryConsumer = new AdvisoryConsumer(this, consumerId);
1422+ }
1423+ }
1424+ }
1425+
1426+ public synchronized boolean isWatchTopicAdvisories() {
1427+ return watchTopicAdvisories;
1428+ }
1429+
1430+ public synchronized void setWatchTopicAdvisories(boolean watchTopicAdvisories) {
1431+ this.watchTopicAdvisories = watchTopicAdvisories;
1432+ }
1433+
1434+ /**
1435+ * @return Returns the useAsyncSend.
1436+ */
1437+ public boolean isUseAsyncSend() {
1438+ return useAsyncSend;
1439+ }
1440+
1441+ /**
1442+ * Forces the use of <a
1443+ * href="http://activemq.apache.org/async-sends.html">Async Sends</a> which
1444+ * adds a massive performance boost; but means that the send() method will
1445+ * return immediately whether the message has been sent or not which could
1446+ * lead to message loss.
1447+ */
1448+ public void setUseAsyncSend(boolean useAsyncSend) {
1449+ this.useAsyncSend = useAsyncSend;
1450+ }
1451+
1452+ /**
1453+ * @return true if always sync send messages
1454+ */
1455+ public boolean isAlwaysSyncSend() {
1456+ return this.alwaysSyncSend;
1457+ }
1458+
1459+ /**
1460+ * Set true if always require messages to be sync sent
1461+ *
1462+ * @param alwaysSyncSend
1463+ */
1464+ public void setAlwaysSyncSend(boolean alwaysSyncSend) {
1465+ this.alwaysSyncSend = alwaysSyncSend;
1466+ }
1467+
1468+ /**
1469+ * @return the messagePrioritySupported
1470+ */
1471+ public boolean isMessagePrioritySupported() {
1472+ return this.messagePrioritySupported;
1473+ }
1474+
1475+ /**
1476+ * @param messagePrioritySupported the messagePrioritySupported to set
1477+ */
1478+ public void setMessagePrioritySupported(boolean messagePrioritySupported) {
1479+ this.messagePrioritySupported = messagePrioritySupported;
1480+ }
1481+
1482+ /**
1483+ * Cleans up this connection so that it's state is as if the connection was
1484+ * just created. This allows the Resource Adapter to clean up a connection
1485+ * so that it can be reused without having to close and recreate the
1486+ * connection.
1487+ */
1488+ public void cleanup() throws JMSException {
1489+
1490+ if (advisoryConsumer != null && !isTransportFailed()) {
1491+ advisoryConsumer.dispose();
1492+ advisoryConsumer = null;
1493+ }
1494+
1495+ for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
1496+ ActiveMQSession s = i.next();
1497+ s.dispose();
1498+ }
1499+ for (Iterator<ActiveMQConnectionConsumer> i = this.connectionConsumers.iterator(); i.hasNext();) {
1500+ ActiveMQConnectionConsumer c = i.next();
1501+ c.dispose();
1502+ }
1503+ for (Iterator<ActiveMQInputStream> i = this.inputStreams.iterator(); i.hasNext();) {
1504+ ActiveMQInputStream c = i.next();
1505+ c.dispose();
1506+ }
1507+ for (Iterator<ActiveMQOutputStream> i = this.outputStreams.iterator(); i.hasNext();) {
1508+ ActiveMQOutputStream c = i.next();
1509+ c.dispose();
1510+ }
1511+
1512+ if (isConnectionInfoSentToBroker) {
1513+ if (!transportFailed.get() && !closing.get()) {
1514+ syncSendPacket(info.createRemoveCommand());
1515+ }
1516+ isConnectionInfoSentToBroker = false;
1517+ }
1518+ if (userSpecifiedClientID) {
1519+ info.setClientId(null);
1520+ userSpecifiedClientID = false;
1521+ }
1522+ clientIDSet = false;
1523+
1524+ started.set(false);
1525+ }
1526+
1527+ /**
1528+ * Changes the associated username/password that is associated with this
1529+ * connection. If the connection has been used, you must called cleanup()
1530+ * before calling this method.
1531+ *
1532+ * @throws IllegalStateException if the connection is in used.
1533+ */
1534+ public void changeUserInfo(String userName, String password) throws JMSException {
1535+ if (isConnectionInfoSentToBroker) {
1536+ throw new IllegalStateException("changeUserInfo used Connection is not allowed");
1537+ }
1538+ this.info.setUserName(userName);
1539+ this.info.setPassword(password);
1540+ }
1541+
1542+ /**
1543+ * @return Returns the resourceManagerId.
1544+ * @throws JMSException
1545+ */
1546+ public String getResourceManagerId() throws JMSException {
1547+ waitForBrokerInfo();
1548+ if (brokerInfo == null) {
1549+ throw new JMSException("Connection failed before Broker info was received.");
1550+ }
1551+ return brokerInfo.getBrokerId().getValue();
1552+ }
1553+
1554+ /**
1555+ * Returns the broker name if one is available or null if one is not
1556+ * available yet.
1557+ */
1558+ public String getBrokerName() {
1559+ try {
1560+ brokerInfoReceived.await(5, TimeUnit.SECONDS);
1561+ if (brokerInfo == null) {
1562+ return null;
1563+ }
1564+ return brokerInfo.getBrokerName();
1565+ } catch (InterruptedException e) {
1566+ Thread.currentThread().interrupt();
1567+ return null;
1568+ }
1569+ }
1570+
1571+ /**
1572+ * Returns the broker information if it is available or null if it is not
1573+ * available yet.
1574+ */
1575+ public BrokerInfo getBrokerInfo() {
1576+ return brokerInfo;
1577+ }
1578+
1579+ /**
1580+ * @return Returns the RedeliveryPolicy.
1581+ * @throws JMSException
1582+ */
1583+ public RedeliveryPolicy getRedeliveryPolicy() throws JMSException {
1584+ return redeliveryPolicy;
1585+ }
1586+
1587+ /**
1588+ * Sets the redelivery policy to be used when messages are rolled back
1589+ */
1590+ public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
1591+ this.redeliveryPolicy = redeliveryPolicy;
1592+ }
1593+
1594+ public BlobTransferPolicy getBlobTransferPolicy() {
1595+ if (blobTransferPolicy == null) {
1596+ blobTransferPolicy = createBlobTransferPolicy();
1597+ }
1598+ return blobTransferPolicy;
1599+ }
1600+
1601+ /**
1602+ * Sets the policy used to describe how out-of-band BLOBs (Binary Large
1603+ * OBjects) are transferred from producers to brokers to consumers
1604+ */
1605+ public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) {
1606+ this.blobTransferPolicy = blobTransferPolicy;
1607+ }
1608+
1609+ /**
1610+ * @return Returns the alwaysSessionAsync.
1611+ */
1612+ public boolean isAlwaysSessionAsync() {
1613+ return alwaysSessionAsync;
1614+ }
1615+
1616+ /**
1617+ * If this flag is set then a separate thread is not used for dispatching
1618+ * messages for each Session in the Connection. However, a separate thread
1619+ * is always used if there is more than one session, or the session isn't in
1620+ * auto acknowledge or duplicates ok mode
1621+ */
1622+ public void setAlwaysSessionAsync(boolean alwaysSessionAsync) {
1623+ this.alwaysSessionAsync = alwaysSessionAsync;
1624+ }
1625+
1626+ /**
1627+ * @return Returns the optimizeAcknowledge.
1628+ */
1629+ public boolean isOptimizeAcknowledge() {
1630+ return optimizeAcknowledge;
1631+ }
1632+
1633+ /**
1634+ * Enables an optimised acknowledgement mode where messages are acknowledged
1635+ * in batches rather than individually
1636+ *
1637+ * @param optimizeAcknowledge The optimizeAcknowledge to set.
1638+ */
1639+ public void setOptimizeAcknowledge(boolean optimizeAcknowledge) {
1640+ this.optimizeAcknowledge = optimizeAcknowledge;
1641+ }
1642+
1643+ public long getWarnAboutUnstartedConnectionTimeout() {
1644+ return warnAboutUnstartedConnectionTimeout;
1645+ }
1646+
1647+ /**
1648+ * Enables the timeout from a connection creation to when a warning is
1649+ * generated if the connection is not properly started via {@link #start()}
1650+ * and a message is received by a consumer. It is a very common gotcha to
1651+ * forget to <a
1652+ * href="http://activemq.apache.org/i-am-not-receiving-any-messages-what-is-wrong.html">start
1653+ * the connection</a> so this option makes the default case to create a
1654+ * warning if the user forgets. To disable the warning just set the value to <
1655+ * 0 (say -1).
1656+ */
1657+ public void setWarnAboutUnstartedConnectionTimeout(long warnAboutUnstartedConnectionTimeout) {
1658+ this.warnAboutUnstartedConnectionTimeout = warnAboutUnstartedConnectionTimeout;
1659+ }
1660+
1661+ /**
1662+ * @return the sendTimeout
1663+ */
1664+ public int getSendTimeout() {
1665+ return sendTimeout;
1666+ }
1667+
1668+ /**
1669+ * @param sendTimeout the sendTimeout to set
1670+ */
1671+ public void setSendTimeout(int sendTimeout) {
1672+ this.sendTimeout = sendTimeout;
1673+ }
1674+
1675+ /**
1676+ * @return the sendAcksAsync
1677+ */
1678+ public boolean isSendAcksAsync() {
1679+ return sendAcksAsync;
1680+ }
1681+
1682+ /**
1683+ * @param sendAcksAsync the sendAcksAsync to set
1684+ */
1685+ public void setSendAcksAsync(boolean sendAcksAsync) {
1686+ this.sendAcksAsync = sendAcksAsync;
1687+ }
1688+
1689+
1690+ /**
1691+ * Returns the time this connection was created
1692+ */
1693+ public long getTimeCreated() {
1694+ return timeCreated;
1695+ }
1696+
1697+ private void waitForBrokerInfo() throws JMSException {
1698+ try {
1699+ brokerInfoReceived.await();
1700+ } catch (InterruptedException e) {
1701+ Thread.currentThread().interrupt();
1702+ throw JMSExceptionSupport.create(e);
1703+ }
1704+ }
1705+
1706+ // Package protected so that it can be used in unit tests
1707+ public Transport getTransport() {
1708+ return transport;
1709+ }
1710+
1711+ public void addProducer(ProducerId producerId, ActiveMQMessageProducer producer) {
1712+ producers.put(producerId, producer);
1713+ }
1714+
1715+ public void removeProducer(ProducerId producerId) {
1716+ producers.remove(producerId);
1717+ }
1718+
1719+ public void addDispatcher(ConsumerId consumerId, ActiveMQDispatcher dispatcher) {
1720+ dispatchers.put(consumerId, dispatcher);
1721+ }
1722+
1723+ public void removeDispatcher(ConsumerId consumerId) {
1724+ dispatchers.remove(consumerId);
1725+ }
1726+
1727+ /**
1728+ * @param o - the command to consume
1729+ */
1730+ public void onCommand(final Object o) {
1731+ final Command command = (Command)o;
1732+ if (!closed.get() && command != null) {
1733+ try {
1734+ command.visit(new CommandVisitorAdapter() {
1735+ @Override
1736+ public Response processMessageDispatch(MessageDispatch md) throws Exception {
1737+ waitForTransportInterruptionProcessingToComplete();
1738+ ActiveMQDispatcher dispatcher = dispatchers.get(md.getConsumerId());
1739+ if (dispatcher != null) {
1740+ // Copy in case a embedded broker is dispatching via
1741+ // vm://
1742+ // md.getMessage() == null to signal end of queue
1743+ // browse.
1744+ Message msg = md.getMessage();
1745+ if (msg != null) {
1746+ msg = msg.copy();
1747+ msg.setReadOnlyBody(true);
1748+ msg.setReadOnlyProperties(true);
1749+ msg.setRedeliveryCounter(md.getRedeliveryCounter());
1750+ msg.setConnection(ActiveMQConnection.this);
1751+ md.setMessage(msg);
1752+ }
1753+ dispatcher.dispatch(md);
1754+ }
1755+ return null;
1756+ }
1757+
1758+ @Override
1759+ public Response processProducerAck(ProducerAck pa) throws Exception {
1760+ if (pa != null && pa.getProducerId() != null) {
1761+ ActiveMQMessageProducer producer = producers.get(pa.getProducerId());
1762+ if (producer != null) {
1763+ producer.onProducerAck(pa);
1764+ }
1765+ }
1766+ return null;
1767+ }
1768+
1769+ @Override
1770+ public Response processBrokerInfo(BrokerInfo info) throws Exception {
1771+ brokerInfo = info;
1772+ brokerInfoReceived.countDown();
1773+ optimizeAcknowledge &= !brokerInfo.isFaultTolerantConfiguration();
1774+ getBlobTransferPolicy().setBrokerUploadUrl(info.getBrokerUploadUrl());
1775+ return null;
1776+ }
1777+
1778+ @Override
1779+ public Response processConnectionError(final ConnectionError error) throws Exception {
1780+ executor.execute(new Runnable() {
1781+ public void run() {
1782+ onAsyncException(error.getException());
1783+ }
1784+ });
1785+ return null;
1786+ }
1787+
1788+ @Override
1789+ public Response processControlCommand(ControlCommand command) throws Exception {
1790+ onControlCommand(command);
1791+ return null;
1792+ }
1793+
1794+ @Override
1795+ public Response processConnectionControl(ConnectionControl control) throws Exception {
1796+ onConnectionControl((ConnectionControl)command);
1797+ return null;
1798+ }
1799+
1800+ @Override
1801+ public Response processConsumerControl(ConsumerControl control) throws Exception {
1802+ onConsumerControl((ConsumerControl)command);
1803+ return null;
1804+ }
1805+
1806+ @Override
1807+ public Response processWireFormat(WireFormatInfo info) throws Exception {
1808+ onWireFormatInfo((WireFormatInfo)command);
1809+ return null;
1810+ }
1811+ });
1812+ } catch (Exception e) {
1813+ onClientInternalException(e);
1814+ }
1815+
1816+ }
1817+ for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
1818+ TransportListener listener = iter.next();
1819+ listener.onCommand(command);
1820+ }
1821+ }
1822+
1823+ protected void onWireFormatInfo(WireFormatInfo info) {
1824+ protocolVersion.set(info.getVersion());
1825+ }
1826+
1827+ /**
1828+ * Handles async client internal exceptions.
1829+ * A client internal exception is usually one that has been thrown
1830+ * by a container runtime component during asynchronous processing of a
1831+ * message that does not affect the connection itself.
1832+ * This method notifies the <code>ClientInternalExceptionListener</code> by invoking
1833+ * its <code>onException</code> method, if one has been registered with this connection.
1834+ *
1835+ * @param error the exception that the problem
1836+ */
1837+ public void onClientInternalException(final Throwable error) {
1838+ if ( !closed.get() && !closing.get() ) {
1839+ if ( this.clientInternalExceptionListener != null ) {
1840+ executor.execute(new Runnable() {
1841+ public void run() {
1842+ ActiveMQConnection.this.clientInternalExceptionListener.onException(error);
1843+ }
1844+ });
1845+ } else {
1846+ LOG.debug("Async client internal exception occurred with no exception listener registered: "
1847+ + error, error);
1848+ }
1849+ }
1850+ }
1851+ /**
1852+ * Used for handling async exceptions
1853+ *
1854+ * @param error
1855+ */
1856+ public void onAsyncException(Throwable error) {
1857+ if (!closed.get() && !closing.get()) {
1858+ if (this.exceptionListener != null) {
1859+
1860+ if (!(error instanceof JMSException)) {
1861+ error = JMSExceptionSupport.create(error);
1862+ }
1863+ final JMSException e = (JMSException)error;
1864+
1865+ executor.execute(new Runnable() {
1866+ public void run() {
1867+ ActiveMQConnection.this.exceptionListener.onException(e);
1868+ }
1869+ });
1870+
1871+ } else {
1872+ LOG.debug("Async exception with no exception listener: " + error, error);
1873+ }
1874+ }
1875+ }
1876+
1877+ public void onException(final IOException error) {
1878+ onAsyncException(error);
1879+ if (!closing.get() && !closed.get()) {
1880+ executor.execute(new Runnable() {
1881+ public void run() {
1882+ transportFailed(error);
1883+ ServiceSupport.dispose(ActiveMQConnection.this.transport);
1884+ brokerInfoReceived.countDown();
1885+ try {
1886+ cleanup();
1887+ } catch (JMSException e) {
1888+ LOG.warn("Exception during connection cleanup, " + e, e);
1889+ }
1890+ for (Iterator<TransportListener> iter = transportListeners
1891+ .iterator(); iter.hasNext();) {
1892+ TransportListener listener = iter.next();
1893+ listener.onException(error);
1894+ }
1895+ }
1896+ });
1897+ }
1898+ }
1899+
1900+ public void transportInterupted() {
1901+ this.transportInterruptionProcessingComplete = new CountDownLatch(dispatchers.size() - (advisoryConsumer != null ? 1:0));
1902+ if (LOG.isDebugEnabled()) {
1903+ LOG.debug("transport interrupted, dispatchers: " + transportInterruptionProcessingComplete.getCount());
1904+ }
1905+ signalInterruptionProcessingNeeded();
1906+
1907+ for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
1908+ ActiveMQSession s = i.next();
1909+ s.clearMessagesInProgress();
1910+ }
1911+
1912+ for (ActiveMQConnectionConsumer connectionConsumer : this.connectionConsumers) {
1913+ connectionConsumer.clearMessagesInProgress();
1914+ }
1915+
1916+ for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
1917+ TransportListener listener = iter.next();
1918+ listener.transportInterupted();
1919+ }
1920+ }
1921+
1922+ public void transportResumed() {
1923+ for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
1924+ TransportListener listener = iter.next();
1925+ listener.transportResumed();
1926+ }
1927+ }
1928+
1929+ /**
1930+ * Create the DestinationInfo object for the temporary destination.
1931+ *
1932+ * @param topic - if its true topic, else queue.
1933+ * @return DestinationInfo
1934+ * @throws JMSException
1935+ */
1936+ protected ActiveMQTempDestination createTempDestination(boolean topic) throws JMSException {
1937+
1938+ // Check if Destination info is of temporary type.
1939+ ActiveMQTempDestination dest;
1940+ if (topic) {
1941+ dest = new ActiveMQTempTopic(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId());
1942+ } else {
1943+ dest = new ActiveMQTempQueue(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId());
1944+ }
1945+
1946+ DestinationInfo info = new DestinationInfo();
1947+ info.setConnectionId(this.info.getConnectionId());
1948+ info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE);
1949+ info.setDestination(dest);
1950+ syncSendPacket(info);
1951+
1952+ dest.setConnection(this);
1953+ activeTempDestinations.put(dest, dest);
1954+ return dest;
1955+ }
1956+
1957+ /**
1958+ * @param destination
1959+ * @throws JMSException
1960+ */
1961+ public void deleteTempDestination(ActiveMQTempDestination destination) throws JMSException {
1962+
1963+ checkClosedOrFailed();
1964+
1965+ for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
1966+ ActiveMQSession s = i.next();
1967+ if (s.isInUse(destination)) {
1968+ throw new JMSException("A consumer is consuming from the temporary destination");
1969+ }
1970+ }
1971+
1972+ activeTempDestinations.remove(destination);
1973+
1974+ DestinationInfo destInfo = new DestinationInfo();
1975+ destInfo.setConnectionId(this.info.getConnectionId());
1976+ destInfo.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
1977+ destInfo.setDestination(destination);
1978+ destInfo.setTimeout(0);
1979+ syncSendPacket(destInfo);
1980+ }
1981+
1982+ public boolean isDeleted(ActiveMQDestination dest) {
1983+
1984+ // If we are not watching the advisories.. then
1985+ // we will assume that the temp destination does exist.
1986+ if (advisoryConsumer == null) {
1987+ return false;
1988+ }
1989+
1990+ return !activeTempDestinations.contains(dest);
1991+ }
1992+
1993+ public boolean isCopyMessageOnSend() {
1994+ return copyMessageOnSend;
1995+ }
1996+
1997+ public LongSequenceGenerator getLocalTransactionIdGenerator() {
1998+ return localTransactionIdGenerator;
1999+ }
2000+
2001+ public boolean isUseCompression() {
2002+ return useCompression;
2003+ }
2004+
2005+ /**
2006+ * Enables the use of compression of the message bodies
2007+ */
2008+ public void setUseCompression(boolean useCompression) {
2009+ this.useCompression = useCompression;
2010+ }
2011+
2012+ public void destroyDestination(ActiveMQDestination destination) throws JMSException {
2013+
2014+ checkClosedOrFailed();
2015+ ensureConnectionInfoSent();
2016+
2017+ DestinationInfo info = new DestinationInfo();
2018+ info.setConnectionId(this.info.getConnectionId());
2019+ info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
2020+ info.setDestination(destination);
2021+ info.setTimeout(0);
2022+ syncSendPacket(info);
2023+
2024+ }
2025+
2026+ public boolean isDispatchAsync() {
2027+ return dispatchAsync;
2028+ }
2029+
2030+ /**
2031+ * Enables or disables the default setting of whether or not consumers have
2032+ * their messages <a
2033+ * href="http://activemq.apache.org/consumer-dispatch-async.html">dispatched
2034+ * synchronously or asynchronously by the broker</a>. For non-durable
2035+ * topics for example we typically dispatch synchronously by default to
2036+ * minimize context switches which boost performance. However sometimes its
2037+ * better to go slower to ensure that a single blocked consumer socket does
2038+ * not block delivery to other consumers.
2039+ *
2040+ * @param asyncDispatch If true then consumers created on this connection
2041+ * will default to having their messages dispatched
2042+ * asynchronously. The default value is false.
2043+ */
2044+ public void setDispatchAsync(boolean asyncDispatch) {
2045+ this.dispatchAsync = asyncDispatch;
2046+ }
2047+
2048+ public boolean isObjectMessageSerializationDefered() {
2049+ return objectMessageSerializationDefered;
2050+ }
2051+
2052+ /**
2053+ * When an object is set on an ObjectMessage, the JMS spec requires the
2054+ * object to be serialized by that set method. Enabling this flag causes the
2055+ * object to not get serialized. The object may subsequently get serialized
2056+ * if the message needs to be sent over a socket or stored to disk.
2057+ */
2058+ public void setObjectMessageSerializationDefered(boolean objectMessageSerializationDefered) {
2059+ this.objectMessageSerializationDefered = objectMessageSerializationDefered;
2060+ }
2061+
2062+ public InputStream createInputStream(Destination dest) throws JMSException {
2063+ return createInputStream(dest, null);
2064+ }
2065+
2066+ public InputStream createInputStream(Destination dest, String messageSelector) throws JMSException {
2067+ return createInputStream(dest, messageSelector, false);
2068+ }
2069+
2070+ public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal) throws JMSException {
2071+ return createInputStream(dest, messageSelector, noLocal, -1);
2072+ }
2073+
2074+
2075+
2076+ public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal, long timeout) throws JMSException {
2077+ return doCreateInputStream(dest, messageSelector, noLocal, null, timeout);
2078+ }
2079+
2080+ public InputStream createDurableInputStream(Topic dest, String name) throws JMSException {
2081+ return createInputStream(dest, null, false);
2082+ }
2083+
2084+ public InputStream createDurableInputStream(Topic dest, String name, String messageSelector) throws JMSException {
2085+ return createDurableInputStream(dest, name, messageSelector, false);
2086+ }
2087+
2088+ public InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal) throws JMSException {
2089+ return createDurableInputStream(dest, name, messageSelector, noLocal, -1);
2090+ }
2091+
2092+ public InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal, long timeout) throws JMSException {
2093+ return doCreateInputStream(dest, messageSelector, noLocal, name, timeout);
2094+ }
2095+
2096+ private InputStream doCreateInputStream(Destination dest, String messageSelector, boolean noLocal, String subName, long timeout) throws JMSException {
2097+ checkClosedOrFailed();
2098+ ensureConnectionInfoSent();
2099+ return new ActiveMQInputStream(this, createConsumerId(), ActiveMQDestination.transform(dest), messageSelector, noLocal, subName, prefetchPolicy.getInputStreamPrefetch(), timeout);
2100+ }
2101+
2102+ /**
2103+ * Creates a persistent output stream; individual messages will be written
2104+ * to disk/database by the broker
2105+ */
2106+ public OutputStream createOutputStream(Destination dest) throws JMSException {
2107+ return createOutputStream(dest, null, ActiveMQMessage.DEFAULT_DELIVERY_MODE, ActiveMQMessage.DEFAULT_PRIORITY, ActiveMQMessage.DEFAULT_TIME_TO_LIVE);
2108+ }
2109+
2110+ /**
2111+ * Creates a non persistent output stream; messages will not be written to
2112+ * disk
2113+ */
2114+ public OutputStream createNonPersistentOutputStream(Destination dest) throws JMSException {
2115+ return createOutputStream(dest, null, DeliveryMode.NON_PERSISTENT, ActiveMQMessage.DEFAULT_PRIORITY, ActiveMQMessage.DEFAULT_TIME_TO_LIVE);
2116+ }
2117+
2118+ /**
2119+ * Creates an output stream allowing full control over the delivery mode,
2120+ * the priority and time to live of the messages and the properties added to
2121+ * messages on the stream.
2122+ *
2123+ * @param streamProperties defines a map of key-value pairs where the keys
2124+ * are strings and the values are primitive values (numbers
2125+ * and strings) which are appended to the messages similarly
2126+ * to using the
2127+ * {@link javax.jms.Message#setObjectProperty(String, Object)}
2128+ * method
2129+ */
2130+ public OutputStream createOutputStream(Destination dest, Map<String, Object> streamProperties, int deliveryMode, int priority, long timeToLive) throws JMSException {
2131+ checkClosedOrFailed();
2132+ ensureConnectionInfoSent();
2133+ return new ActiveMQOutputStream(this, createProducerId(), ActiveMQDestination.transform(dest), streamProperties, deliveryMode, priority, timeToLive);
2134+ }
2135+
2136+ /**
2137+ * Unsubscribes a durable subscription that has been created by a client.
2138+ * <P>
2139+ * This method deletes the state being maintained on behalf of the
2140+ * subscriber by its provider.
2141+ * <P>
2142+ * It is erroneous for a client to delete a durable subscription while there
2143+ * is an active <CODE>MessageConsumer </CODE> or
2144+ * <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed
2145+ * message is part of a pending transaction or has not been acknowledged in
2146+ * the session.
2147+ *
2148+ * @param name the name used to identify this subscription
2149+ * @throws JMSException if the session fails to unsubscribe to the durable
2150+ * subscription due to some internal error.
2151+ * @throws InvalidDestinationException if an invalid subscription name is
2152+ * specified.
2153+ * @since 1.1
2154+ */
2155+ public void unsubscribe(String name) throws InvalidDestinationException, JMSException {
2156+ checkClosedOrFailed();
2157+ RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
2158+ rsi.setConnectionId(getConnectionInfo().getConnectionId());
2159+ rsi.setSubscriptionName(name);
2160+ rsi.setClientId(getConnectionInfo().getClientId());
2161+ syncSendPacket(rsi);
2162+ }
2163+
2164+ /**
2165+ * Internal send method optimized: - It does not copy the message - It can
2166+ * only handle ActiveMQ messages. - You can specify if the send is async or
2167+ * sync - Does not allow you to send /w a transaction.
2168+ */
2169+ void send(ActiveMQDestination destination, ActiveMQMessage msg, MessageId messageId, int deliveryMode, int priority, long timeToLive, boolean async) throws JMSException {
2170+ checkClosedOrFailed();
2171+
2172+ if (destination.isTemporary() && isDeleted(destination)) {
2173+ throw new JMSException("Cannot publish to a deleted Destination: " + destination);
2174+ }
2175+
2176+ msg.setJMSDestination(destination);
2177+ msg.setJMSDeliveryMode(deliveryMode);
2178+ long expiration = 0L;
2179+
2180+ if (!isDisableTimeStampsByDefault()) {
2181+ long timeStamp = System.currentTimeMillis();
2182+ msg.setJMSTimestamp(timeStamp);
2183+ if (timeToLive > 0) {
2184+ expiration = timeToLive + timeStamp;
2185+ }
2186+ }
2187+
2188+ msg.setJMSExpiration(expiration);
2189+ msg.setJMSPriority(priority);
2190+
2191+ msg.setJMSRedelivered(false);
2192+ msg.setMessageId(messageId);
2193+
2194+ msg.onSend();
2195+
2196+ msg.setProducerId(msg.getMessageId().getProducerId());
2197+
2198+ if (LOG.isDebugEnabled()) {
2199+ LOG.debug("Sending message: " + msg);
2200+ }
2201+
2202+ if (async) {
2203+ asyncSendPacket(msg);
2204+ } else {
2205+ syncSendPacket(msg);
2206+ }
2207+
2208+ }
2209+
2210+ public void addOutputStream(ActiveMQOutputStream stream) {
2211+ outputStreams.add(stream);
2212+ }
2213+
2214+ public void removeOutputStream(ActiveMQOutputStream stream) {
2215+ outputStreams.remove(stream);
2216+ }
2217+
2218+ public void addInputStream(ActiveMQInputStream stream) {
2219+ inputStreams.add(stream);
2220+ }
2221+
2222+ public void removeInputStream(ActiveMQInputStream stream) {
2223+ inputStreams.remove(stream);
2224+ }
2225+
2226+ protected void onControlCommand(ControlCommand command) {
2227+ String text = command.getCommand();
2228+ if (text != null) {
2229+ if (text.equals("shutdown")) {
2230+ LOG.info("JVM told to shutdown");
2231+ System.exit(0);
2232+ }
2233+ }
2234+ }
2235+
2236+ protected void onConnectionControl(ConnectionControl command) {
2237+ if (command.isFaultTolerant()) {
2238+ this.optimizeAcknowledge = false;
2239+ for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
2240+ ActiveMQSession s = i.next();
2241+ s.setOptimizeAcknowledge(false);
2242+ }
2243+ }
2244+ }
2245+
2246+ protected void onConsumerControl(ConsumerControl command) {
2247+ if (command.isClose()) {
2248+ for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
2249+ ActiveMQSession s = i.next();
2250+ s.close(command.getConsumerId());
2251+ }
2252+ } else {
2253+ for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
2254+ ActiveMQSession s = i.next();
2255+ s.setPrefetchSize(command.getConsumerId(), command.getPrefetch());
2256+ }
2257+ }
2258+ }
2259+
2260+ protected void transportFailed(IOException error) {
2261+ transportFailed.set(true);
2262+ if (firstFailureError == null) {
2263+ firstFailureError = error;
2264+ }
2265+ }
2266+
2267+ /**
2268+ * Should a JMS message be copied to a new JMS Message object as part of the
2269+ * send() method in JMS. This is enabled by default to be compliant with the
2270+ * JMS specification. You can disable it if you do not mutate JMS messages
2271+ * after they are sent for a performance boost
2272+ */
2273+ public void setCopyMessageOnSend(boolean copyMessageOnSend) {
2274+ this.copyMessageOnSend = copyMessageOnSend;
2275+ }
2276+
2277+ @Override
2278+ public String toString() {
2279+ return "ActiveMQConnection {id=" + info.getConnectionId() + ",clientId=" + info.getClientId() + ",started=" + started.get() + "}";
2280+ }
2281+
2282+ protected BlobTransferPolicy createBlobTransferPolicy() {
2283+ return new BlobTransferPolicy();
2284+ }
2285+
2286+ public int getProtocolVersion() {
2287+ return protocolVersion.get();
2288+ }
2289+
2290+ public int getProducerWindowSize() {
2291+ return producerWindowSize;
2292+ }
2293+
2294+ public void setProducerWindowSize(int producerWindowSize) {
2295+ this.producerWindowSize = producerWindowSize;
2296+ }
2297+
2298+ public void setAuditDepth(int auditDepth) {
2299+ connectionAudit.setAuditDepth(auditDepth);
2300+ }
2301+
2302+ public void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) {
2303+ connectionAudit.setAuditMaximumProducerNumber(auditMaximumProducerNumber);
2304+ }
2305+
2306+ protected void removeDispatcher(ActiveMQDispatcher dispatcher) {
2307+ connectionAudit.removeDispatcher(dispatcher);
2308+ }
2309+
2310+ protected boolean isDuplicate(ActiveMQDispatcher dispatcher, Message message) {
2311+ return checkForDuplicates && connectionAudit.isDuplicate(dispatcher, message);
2312+ }
2313+
2314+ protected void rollbackDuplicate(ActiveMQDispatcher dispatcher, Message message) {
2315+ connectionAudit.rollbackDuplicate(dispatcher, message);
2316+ }
2317+
2318+ public IOException getFirstFailureError() {
2319+ return firstFailureError;
2320+ }
2321+
2322+ protected void waitForTransportInterruptionProcessingToComplete() throws InterruptedException {
2323+ CountDownLatch cdl = this.transportInterruptionProcessingComplete;
2324+ if (cdl != null) {
2325+ if (!closed.get() && !transportFailed.get() && cdl.getCount()>0) {
2326+ LOG.warn("dispatch paused, waiting for outstanding dispatch interruption processing (" + cdl.getCount() + ") to complete..");
2327+ cdl.await(10, TimeUnit.SECONDS);
2328+ }
2329+ signalInterruptionProcessingComplete();
2330+ }
2331+ }
2332+
2333+ protected void transportInterruptionProcessingComplete() {
2334+ CountDownLatch cdl = this.transportInterruptionProcessingComplete;
2335+ if (cdl != null) {
2336+ cdl.countDown();
2337+ try {
2338+ signalInterruptionProcessingComplete();
2339+ } catch (InterruptedException ignored) {}
2340+ }
2341+ }
2342+
2343+ private void signalInterruptionProcessingComplete() throws InterruptedException {
2344+ CountDownLatch cdl = this.transportInterruptionProcessingComplete;
2345+ if (cdl.getCount()==0) {
2346+ if (LOG.isDebugEnabled()) {
2347+ LOG.debug("transportInterruptionProcessingComplete for: " + this.getConnectionInfo().getConnectionId());
2348+ }
2349+ this.transportInterruptionProcessingComplete = null;
2350+
2351+ FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class);
2352+ if (failoverTransport != null) {
2353+ failoverTransport.connectionInterruptProcessingComplete(this.getConnectionInfo().getConnectionId());
2354+ if (LOG.isDebugEnabled()) {
2355+ LOG.debug("notified failover transport (" + failoverTransport
2356+ + ") of interruption completion for: " + this.getConnectionInfo().getConnectionId());
2357+ }
2358+ }
2359+
2360+ }
2361+ }
2362+
2363+ private void signalInterruptionProcessingNeeded() {
2364+ FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class);
2365+ if (failoverTransport != null) {
2366+ failoverTransport.getStateTracker().transportInterrupted(this.getConnectionInfo().getConnectionId());
2367+ if (LOG.isDebugEnabled()) {
2368+ LOG.debug("notified failover transport (" + failoverTransport
2369+ + ") of pending interruption processing for: " + this.getConnectionInfo().getConnectionId());
2370+ }
2371+ }
2372+ }
2373+
2374+ /*
2375+ * specify the amount of time in milliseconds that a consumer with a transaction pending recovery
2376+ * will wait to receive re dispatched messages.
2377+ * default value is 0 so there is no wait by default.
2378+ */
2379+ public void setConsumerFailoverRedeliveryWaitPeriod(long consumerFailoverRedeliveryWaitPeriod) {
2380+ this.consumerFailoverRedeliveryWaitPeriod = consumerFailoverRedeliveryWaitPeriod;
2381+ }
2382+
2383+ public long getConsumerFailoverRedeliveryWaitPeriod() {
2384+ return consumerFailoverRedeliveryWaitPeriod;
2385+ }
2386+
2387+ protected Scheduler getScheduler() {
2388+ return this.scheduler;
2389+ }
2390+
2391+ protected ThreadPoolExecutor getExecutor() {
2392+ return this.executor;
2393+ }
2394+
2395+ /**
2396+ * @return the checkForDuplicates
2397+ */
2398+ public boolean isCheckForDuplicates() {
2399+ return this.checkForDuplicates;
2400+ }
2401+
2402+ /**
2403+ * @param checkForDuplicates the checkForDuplicates to set
2404+ */
2405+ public void setCheckForDuplicates(boolean checkForDuplicates) {
2406+ this.checkForDuplicates = checkForDuplicates;
2407+ }
2408+
2409+}
2410
2411=== removed file '.pc/CVE-2011-4605.diff/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java'
2412--- .pc/CVE-2011-4605.diff/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java 2012-01-15 19:38:21 +0000
2413+++ .pc/CVE-2011-4605.diff/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java 1970-01-01 00:00:00 +0000
2414@@ -1,2389 +0,0 @@
2415-/**
2416- * Licensed to the Apache Software Foundation (ASF) under one or more
2417- * contributor license agreements. See the NOTICE file distributed with
2418- * this work for additional information regarding copyright ownership.
2419- * The ASF licenses this file to You under the Apache License, Version 2.0
2420- * (the "License"); you may not use this file except in compliance with
2421- * the License. You may obtain a copy of the License at
2422- *
2423- * http://www.apache.org/licenses/LICENSE-2.0
2424- *
2425- * Unless required by applicable law or agreed to in writing, software
2426- * distributed under the License is distributed on an "AS IS" BASIS,
2427- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
2428- * See the License for the specific language governing permissions and
2429- * limitations under the License.
2430- */
2431-package org.apache.activemq;
2432-
2433-import java.io.IOException;
2434-import java.io.InputStream;
2435-import java.io.OutputStream;
2436-import java.net.URI;
2437-import java.net.URISyntaxException;
2438-import java.util.HashMap;
2439-import java.util.Iterator;
2440-import java.util.Map;
2441-import java.util.concurrent.ConcurrentHashMap;
2442-import java.util.concurrent.CopyOnWriteArrayList;
2443-import java.util.concurrent.CountDownLatch;
2444-import java.util.concurrent.LinkedBlockingQueue;
2445-import java.util.concurrent.ThreadFactory;
2446-import java.util.concurrent.ThreadPoolExecutor;
2447-import java.util.concurrent.TimeUnit;
2448-import java.util.concurrent.atomic.AtomicBoolean;
2449-import java.util.concurrent.atomic.AtomicInteger;
2450-import javax.jms.Connection;
2451-import javax.jms.ConnectionConsumer;
2452-import javax.jms.ConnectionMetaData;
2453-import javax.jms.DeliveryMode;
2454-import javax.jms.Destination;
2455-import javax.jms.ExceptionListener;
2456-import javax.jms.IllegalStateException;
2457-import javax.jms.InvalidDestinationException;
2458-import javax.jms.JMSException;
2459-import javax.jms.Queue;
2460-import javax.jms.QueueConnection;
2461-import javax.jms.QueueSession;
2462-import javax.jms.ServerSessionPool;
2463-import javax.jms.Session;
2464-import javax.jms.Topic;
2465-import javax.jms.TopicConnection;
2466-import javax.jms.TopicSession;
2467-import javax.jms.XAConnection;
2468-import org.apache.activemq.advisory.DestinationSource;
2469-import org.apache.activemq.blob.BlobTransferPolicy;
2470-import org.apache.activemq.command.ActiveMQDestination;
2471-import org.apache.activemq.command.ActiveMQMessage;
2472-import org.apache.activemq.command.ActiveMQTempDestination;
2473-import org.apache.activemq.command.ActiveMQTempQueue;
2474-import org.apache.activemq.command.ActiveMQTempTopic;
2475-import org.apache.activemq.command.BrokerInfo;
2476-import org.apache.activemq.command.Command;
2477-import org.apache.activemq.command.CommandTypes;
2478-import org.apache.activemq.command.ConnectionControl;
2479-import org.apache.activemq.command.ConnectionError;
2480-import org.apache.activemq.command.ConnectionId;
2481-import org.apache.activemq.command.ConnectionInfo;
2482-import org.apache.activemq.command.ConsumerControl;
2483-import org.apache.activemq.command.ConsumerId;
2484-import org.apache.activemq.command.ConsumerInfo;
2485-import org.apache.activemq.command.ControlCommand;
2486-import org.apache.activemq.command.DestinationInfo;
2487-import org.apache.activemq.command.ExceptionResponse;
2488-import org.apache.activemq.command.Message;
2489-import org.apache.activemq.command.MessageDispatch;
2490-import org.apache.activemq.command.MessageId;
2491-import org.apache.activemq.command.ProducerAck;
2492-import org.apache.activemq.command.ProducerId;
2493-import org.apache.activemq.command.RemoveInfo;
2494-import org.apache.activemq.command.RemoveSubscriptionInfo;
2495-import org.apache.activemq.command.Response;
2496-import org.apache.activemq.command.SessionId;
2497-import org.apache.activemq.command.ShutdownInfo;
2498-import org.apache.activemq.command.WireFormatInfo;
2499-import org.apache.activemq.management.JMSConnectionStatsImpl;
2500-import org.apache.activemq.management.JMSStatsImpl;
2501-import org.apache.activemq.management.StatsCapable;
2502-import org.apache.activemq.management.StatsImpl;
2503-import org.apache.activemq.state.CommandVisitorAdapter;
2504-import org.apache.activemq.thread.Scheduler;
2505-import org.apache.activemq.thread.TaskRunnerFactory;
2506-import org.apache.activemq.transport.Transport;
2507-import org.apache.activemq.transport.TransportListener;
2508-import org.apache.activemq.transport.failover.FailoverTransport;
2509-import org.apache.activemq.util.IdGenerator;
2510-import org.apache.activemq.util.IntrospectionSupport;
2511-import org.apache.activemq.util.JMSExceptionSupport;
2512-import org.apache.activemq.util.LongSequenceGenerator;
2513-import org.apache.activemq.util.ServiceSupport;
2514-import org.slf4j.Logger;
2515-import org.slf4j.LoggerFactory;
2516-
2517-public class ActiveMQConnection implements Connection, TopicConnection, QueueConnection, StatsCapable, Closeable, StreamConnection, TransportListener, EnhancedConnection {
2518-
2519- public static final String DEFAULT_USER = ActiveMQConnectionFactory.DEFAULT_USER;
2520- public static final String DEFAULT_PASSWORD = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
2521- public static final String DEFAULT_BROKER_URL = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
2522-
2523- private static final Logger LOG = LoggerFactory.getLogger(ActiveMQConnection.class);
2524- private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
2525-
2526- public final ConcurrentHashMap<ActiveMQTempDestination, ActiveMQTempDestination> activeTempDestinations = new ConcurrentHashMap<ActiveMQTempDestination, ActiveMQTempDestination>();
2527-
2528- protected boolean dispatchAsync=true;
2529- protected boolean alwaysSessionAsync = true;
2530-
2531- private TaskRunnerFactory sessionTaskRunner;
2532- private final ThreadPoolExecutor executor;
2533-
2534- // Connection state variables
2535- private final ConnectionInfo info;
2536- private ExceptionListener exceptionListener;
2537- private ClientInternalExceptionListener clientInternalExceptionListener;
2538- private boolean clientIDSet;
2539- private boolean isConnectionInfoSentToBroker;
2540- private boolean userSpecifiedClientID;
2541-
2542- // Configuration options variables
2543- private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
2544- private BlobTransferPolicy blobTransferPolicy;
2545- private RedeliveryPolicy redeliveryPolicy;
2546- private MessageTransformer transformer;
2547-
2548- private boolean disableTimeStampsByDefault;
2549- private boolean optimizedMessageDispatch = true;
2550- private boolean copyMessageOnSend = true;
2551- private boolean useCompression;
2552- private boolean objectMessageSerializationDefered;
2553- private boolean useAsyncSend;
2554- private boolean optimizeAcknowledge;
2555- private boolean nestedMapAndListEnabled = true;
2556- private boolean useRetroactiveConsumer;
2557- private boolean exclusiveConsumer;
2558- private boolean alwaysSyncSend;
2559- private int closeTimeout = 15000;
2560- private boolean watchTopicAdvisories = true;
2561- private long warnAboutUnstartedConnectionTimeout = 500L;
2562- private int sendTimeout =0;
2563- private boolean sendAcksAsync=true;
2564- private boolean checkForDuplicates = true;
2565-
2566- private final Transport transport;
2567- private final IdGenerator clientIdGenerator;
2568- private final JMSStatsImpl factoryStats;
2569- private final JMSConnectionStatsImpl stats;
2570-
2571- private final AtomicBoolean started = new AtomicBoolean(false);
2572- private final AtomicBoolean closing = new AtomicBoolean(false);
2573- private final AtomicBoolean closed = new AtomicBoolean(false);
2574- private final AtomicBoolean transportFailed = new AtomicBoolean(false);
2575- private final CopyOnWriteArrayList<ActiveMQSession> sessions = new CopyOnWriteArrayList<ActiveMQSession>();
2576- private final CopyOnWriteArrayList<ActiveMQConnectionConsumer> connectionConsumers = new CopyOnWriteArrayList<ActiveMQConnectionConsumer>();
2577- private final CopyOnWriteArrayList<ActiveMQInputStream> inputStreams = new CopyOnWriteArrayList<ActiveMQInputStream>();
2578- private final CopyOnWriteArrayList<ActiveMQOutputStream> outputStreams = new CopyOnWriteArrayList<ActiveMQOutputStream>();
2579- private final CopyOnWriteArrayList<TransportListener> transportListeners = new CopyOnWriteArrayList<TransportListener>();
2580-
2581- // Maps ConsumerIds to ActiveMQConsumer objects
2582- private final ConcurrentHashMap<ConsumerId, ActiveMQDispatcher> dispatchers = new ConcurrentHashMap<ConsumerId, ActiveMQDispatcher>();
2583- private final ConcurrentHashMap<ProducerId, ActiveMQMessageProducer> producers = new ConcurrentHashMap<ProducerId, ActiveMQMessageProducer>();
2584- private final LongSequenceGenerator sessionIdGenerator = new LongSequenceGenerator();
2585- private final SessionId connectionSessionId;
2586- private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
2587- private final LongSequenceGenerator producerIdGenerator = new LongSequenceGenerator();
2588- private final LongSequenceGenerator tempDestinationIdGenerator = new LongSequenceGenerator();
2589- private final LongSequenceGenerator localTransactionIdGenerator = new LongSequenceGenerator();
2590-
2591- private AdvisoryConsumer advisoryConsumer;
2592- private final CountDownLatch brokerInfoReceived = new CountDownLatch(1);
2593- private BrokerInfo brokerInfo;
2594- private IOException firstFailureError;
2595- private int producerWindowSize = ActiveMQConnectionFactory.DEFAULT_PRODUCER_WINDOW_SIZE;
2596-
2597- // Assume that protocol is the latest. Change to the actual protocol
2598- // version when a WireFormatInfo is received.
2599- private final AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
2600- private final long timeCreated;
2601- private final ConnectionAudit connectionAudit = new ConnectionAudit();
2602- private DestinationSource destinationSource;
2603- private final Object ensureConnectionInfoSentMutex = new Object();
2604- private boolean useDedicatedTaskRunner;
2605- protected volatile CountDownLatch transportInterruptionProcessingComplete;
2606- private long consumerFailoverRedeliveryWaitPeriod;
2607- private final Scheduler scheduler;
2608- private boolean messagePrioritySupported=true;
2609-
2610- /**
2611- * Construct an <code>ActiveMQConnection</code>
2612- *
2613- * @param transport
2614- * @param factoryStats
2615- * @throws Exception
2616- */
2617- protected ActiveMQConnection(final Transport transport, IdGenerator clientIdGenerator, JMSStatsImpl factoryStats) throws Exception {
2618-
2619- this.transport = transport;
2620- this.clientIdGenerator = clientIdGenerator;
2621- this.factoryStats = factoryStats;
2622-
2623- // Configure a single threaded executor who's core thread can timeout if
2624- // idle
2625- executor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
2626- public Thread newThread(Runnable r) {
2627- Thread thread = new Thread(r, "ActiveMQ Connection Executor: " + transport);
2628- thread.setDaemon(true);
2629- return thread;
2630- }
2631- });
2632- // asyncConnectionThread.allowCoreThreadTimeOut(true);
2633- String uniqueId = CONNECTION_ID_GENERATOR.generateId();
2634- this.info = new ConnectionInfo(new ConnectionId(uniqueId));
2635- this.info.setManageable(true);
2636- this.info.setFaultTolerant(transport.isFaultTolerant());
2637- this.connectionSessionId = new SessionId(info.getConnectionId(), -1);
2638-
2639- this.transport.setTransportListener(this);
2640-
2641- this.stats = new JMSConnectionStatsImpl(sessions, this instanceof XAConnection);
2642- this.factoryStats.addConnection(this);
2643- this.timeCreated = System.currentTimeMillis();
2644- this.connectionAudit.setCheckForDuplicates(transport.isFaultTolerant());
2645- this.scheduler = new Scheduler("ActiveMQConnection["+uniqueId+"] Scheduler");
2646- this.scheduler.start();
2647- }
2648-
2649- protected void setUserName(String userName) {
2650- this.info.setUserName(userName);
2651- }
2652-
2653- protected void setPassword(String password) {
2654- this.info.setPassword(password);
2655- }
2656-
2657- /**
2658- * A static helper method to create a new connection
2659- *
2660- * @return an ActiveMQConnection
2661- * @throws JMSException
2662- */
2663- public static ActiveMQConnection makeConnection() throws JMSException {
2664- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
2665- return (ActiveMQConnection)factory.createConnection();
2666- }
2667-
2668- /**
2669- * A static helper method to create a new connection
2670- *
2671- * @param uri
2672- * @return and ActiveMQConnection
2673- * @throws JMSException
2674- */
2675- public static ActiveMQConnection makeConnection(String uri) throws JMSException, URISyntaxException {
2676- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri);
2677- return (ActiveMQConnection)factory.createConnection();
2678- }
2679-
2680- /**
2681- * A static helper method to create a new connection
2682- *
2683- * @param user
2684- * @param password
2685- * @param uri
2686- * @return an ActiveMQConnection
2687- * @throws JMSException
2688- */
2689- public static ActiveMQConnection makeConnection(String user, String password, String uri) throws JMSException, URISyntaxException {
2690- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(user, password, new URI(uri));
2691- return (ActiveMQConnection)factory.createConnection();
2692- }
2693-
2694- /**
2695- * @return a number unique for this connection
2696- */
2697- public JMSConnectionStatsImpl getConnectionStats() {
2698- return stats;
2699- }
2700-
2701- /**
2702- * Creates a <CODE>Session</CODE> object.
2703- *
2704- * @param transacted indicates whether the session is transacted
2705- * @param acknowledgeMode indicates whether the consumer or the client will
2706- * acknowledge any messages it receives; ignored if the
2707- * session is transacted. Legal values are
2708- * <code>Session.AUTO_ACKNOWLEDGE</code>,
2709- * <code>Session.CLIENT_ACKNOWLEDGE</code>, and
2710- * <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
2711- * @return a newly created session
2712- * @throws JMSException if the <CODE>Connection</CODE> object fails to
2713- * create a session due to some internal error or lack of
2714- * support for the specific transaction and acknowledgement
2715- * mode.
2716- * @see Session#AUTO_ACKNOWLEDGE
2717- * @see Session#CLIENT_ACKNOWLEDGE
2718- * @see Session#DUPS_OK_ACKNOWLEDGE
2719- * @since 1.1
2720- */
2721- public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
2722- checkClosedOrFailed();
2723- ensureConnectionInfoSent();
2724- if(!transacted) {
2725- if (acknowledgeMode==Session.SESSION_TRANSACTED) {
2726- throw new JMSException("acknowledgeMode SESSION_TRANSACTED cannot be used for an non-transacted Session");
2727- } else if (acknowledgeMode < Session.SESSION_TRANSACTED || acknowledgeMode > ActiveMQSession.MAX_ACK_CONSTANT) {
2728- throw new JMSException("invalid acknowledgeMode: " + acknowledgeMode + ". Valid values are Session.AUTO_ACKNOWLEDGE (1), " +
2729- "Session.CLIENT_ACKNOWLEDGE (2), Session.DUPS_OK_ACKNOWLEDGE (3), ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE (4) or for transacted sessions Session.SESSION_TRANSACTED (0)");
2730- }
2731- }
2732- return new ActiveMQSession(this, getNextSessionId(), transacted ? Session.SESSION_TRANSACTED : (acknowledgeMode == Session.SESSION_TRANSACTED
2733- ? Session.AUTO_ACKNOWLEDGE : acknowledgeMode), isDispatchAsync(), isAlwaysSessionAsync());
2734- }
2735-
2736- /**
2737- * @return sessionId
2738- */
2739- protected SessionId getNextSessionId() {
2740- return new SessionId(info.getConnectionId(), sessionIdGenerator.getNextSequenceId());
2741- }
2742-
2743- /**
2744- * Gets the client identifier for this connection.
2745- * <P>
2746- * This value is specific to the JMS provider. It is either preconfigured by
2747- * an administrator in a <CODE> ConnectionFactory</CODE> object or assigned
2748- * dynamically by the application by calling the <code>setClientID</code>
2749- * method.
2750- *
2751- * @return the unique client identifier
2752- * @throws JMSException if the JMS provider fails to return the client ID
2753- * for this connection due to some internal error.
2754- */
2755- public String getClientID() throws JMSException {
2756- checkClosedOrFailed();
2757- return this.info.getClientId();
2758- }
2759-
2760- /**
2761- * Sets the client identifier for this connection.
2762- * <P>
2763- * The preferred way to assign a JMS client's client identifier is for it to
2764- * be configured in a client-specific <CODE>ConnectionFactory</CODE>
2765- * object and transparently assigned to the <CODE>Connection</CODE> object
2766- * it creates.
2767- * <P>
2768- * Alternatively, a client can set a connection's client identifier using a
2769- * provider-specific value. The facility to set a connection's client
2770- * identifier explicitly is not a mechanism for overriding the identifier
2771- * that has been administratively configured. It is provided for the case
2772- * where no administratively specified identifier exists. If one does exist,
2773- * an attempt to change it by setting it must throw an
2774- * <CODE>IllegalStateException</CODE>. If a client sets the client
2775- * identifier explicitly, it must do so immediately after it creates the
2776- * connection and before any other action on the connection is taken. After
2777- * this point, setting the client identifier is a programming error that
2778- * should throw an <CODE>IllegalStateException</CODE>.
2779- * <P>
2780- * The purpose of the client identifier is to associate a connection and its
2781- * objects with a state maintained on behalf of the client by a provider.
2782- * The only such state identified by the JMS API is that required to support
2783- * durable subscriptions.
2784- * <P>
2785- * If another connection with the same <code>clientID</code> is already
2786- * running when this method is called, the JMS provider should detect the
2787- * duplicate ID and throw an <CODE>InvalidClientIDException</CODE>.
2788- *
2789- * @param newClientID the unique client identifier
2790- * @throws JMSException if the JMS provider fails to set the client ID for
2791- * this connection due to some internal error.
2792- * @throws javax.jms.InvalidClientIDException if the JMS client specifies an
2793- * invalid or duplicate client ID.
2794- * @throws javax.jms.IllegalStateException if the JMS client attempts to set
2795- * a connection's client ID at the wrong time or when it has
2796- * been administratively configured.
2797- */
2798- public void setClientID(String newClientID) throws JMSException {
2799- checkClosedOrFailed();
2800-
2801- if (this.clientIDSet) {
2802- throw new IllegalStateException("The clientID has already been set");
2803- }
2804-
2805- if (this.isConnectionInfoSentToBroker) {
2806- throw new IllegalStateException("Setting clientID on a used Connection is not allowed");
2807- }
2808-
2809- this.info.setClientId(newClientID);
2810- this.userSpecifiedClientID = true;
2811- ensureConnectionInfoSent();
2812- }
2813-
2814- /**
2815- * Sets the default client id that the connection will use if explicitly not
2816- * set with the setClientId() call.
2817- */
2818- public void setDefaultClientID(String clientID) throws JMSException {
2819- this.info.setClientId(clientID);
2820- this.userSpecifiedClientID = true;
2821- }
2822-
2823- /**
2824- * Gets the metadata for this connection.
2825- *
2826- * @return the connection metadata
2827- * @throws JMSException if the JMS provider fails to get the connection
2828- * metadata for this connection.
2829- * @see javax.jms.ConnectionMetaData
2830- */
2831- public ConnectionMetaData getMetaData() throws JMSException {
2832- checkClosedOrFailed();
2833- return ActiveMQConnectionMetaData.INSTANCE;
2834- }
2835-
2836- /**
2837- * Gets the <CODE>ExceptionListener</CODE> object for this connection. Not
2838- * every <CODE>Connection</CODE> has an <CODE>ExceptionListener</CODE>
2839- * associated with it.
2840- *
2841- * @return the <CODE>ExceptionListener</CODE> for this connection, or
2842- * null, if no <CODE>ExceptionListener</CODE> is associated with
2843- * this connection.
2844- * @throws JMSException if the JMS provider fails to get the
2845- * <CODE>ExceptionListener</CODE> for this connection.
2846- * @see javax.jms.Connection#setExceptionListener(ExceptionListener)
2847- */
2848- public ExceptionListener getExceptionListener() throws JMSException {
2849- checkClosedOrFailed();
2850- return this.exceptionListener;
2851- }
2852-
2853- /**
2854- * Sets an exception listener for this connection.
2855- * <P>
2856- * If a JMS provider detects a serious problem with a connection, it informs
2857- * the connection's <CODE> ExceptionListener</CODE>, if one has been
2858- * registered. It does this by calling the listener's <CODE>onException
2859- * </CODE>
2860- * method, passing it a <CODE>JMSException</CODE> object describing the
2861- * problem.
2862- * <P>
2863- * An exception listener allows a client to be notified of a problem
2864- * asynchronously. Some connections only consume messages, so they would
2865- * have no other way to learn their connection has failed.
2866- * <P>
2867- * A connection serializes execution of its <CODE>ExceptionListener</CODE>.
2868- * <P>
2869- * A JMS provider should attempt to resolve connection problems itself
2870- * before it notifies the client of them.
2871- *
2872- * @param listener the exception listener
2873- * @throws JMSException if the JMS provider fails to set the exception
2874- * listener for this connection.
2875- */
2876- public void setExceptionListener(ExceptionListener listener) throws JMSException {
2877- checkClosedOrFailed();
2878- this.exceptionListener = listener;
2879- }
2880-
2881- /**
2882- * Gets the <code>ClientInternalExceptionListener</code> object for this connection.
2883- * Not every <CODE>ActiveMQConnectionn</CODE> has a <CODE>ClientInternalExceptionListener</CODE>
2884- * associated with it.
2885- *
2886- * @return the listener or <code>null</code> if no listener is registered with the connection.
2887- */
2888- public ClientInternalExceptionListener getClientInternalExceptionListener()
2889- {
2890- return clientInternalExceptionListener;
2891- }
2892-
2893- /**
2894- * Sets a client internal exception listener for this connection.
2895- * The connection will notify the listener, if one has been registered, of exceptions thrown by container components
2896- * (e.g. an EJB container in case of Message Driven Beans) during asynchronous processing of a message.
2897- * It does this by calling the listener's <code>onException()</code> method passing it a <code>Throwable</code>
2898- * describing the problem.
2899- *
2900- * @param listener the exception listener
2901- */
2902- public void setClientInternalExceptionListener(ClientInternalExceptionListener listener)
2903- {
2904- this.clientInternalExceptionListener = listener;
2905- }
2906-
2907- /**
2908- * Starts (or restarts) a connection's delivery of incoming messages. A call
2909- * to <CODE>start</CODE> on a connection that has already been started is
2910- * ignored.
2911- *
2912- * @throws JMSException if the JMS provider fails to start message delivery
2913- * due to some internal error.
2914- * @see javax.jms.Connection#stop()
2915- */
2916- public void start() throws JMSException {
2917- checkClosedOrFailed();
2918- ensureConnectionInfoSent();
2919- if (started.compareAndSet(false, true)) {
2920- for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) {
2921- ActiveMQSession session = i.next();
2922- session.start();
2923- }
2924- }
2925- }
2926-
2927- /**
2928- * Temporarily stops a connection's delivery of incoming messages. Delivery
2929- * can be restarted using the connection's <CODE>start</CODE> method. When
2930- * the connection is stopped, delivery to all the connection's message
2931- * consumers is inhibited: synchronous receives block, and messages are not
2932- * delivered to message listeners.
2933- * <P>
2934- * This call blocks until receives and/or message listeners in progress have
2935- * completed.
2936- * <P>
2937- * Stopping a connection has no effect on its ability to send messages. A
2938- * call to <CODE>stop</CODE> on a connection that has already been stopped
2939- * is ignored.
2940- * <P>
2941- * A call to <CODE>stop</CODE> must not return until delivery of messages
2942- * has paused. This means that a client can rely on the fact that none of
2943- * its message listeners will be called and that all threads of control
2944- * waiting for <CODE>receive</CODE> calls to return will not return with a
2945- * message until the connection is restarted. The receive timers for a
2946- * stopped connection continue to advance, so receives may time out while
2947- * the connection is stopped.
2948- * <P>
2949- * If message listeners are running when <CODE>stop</CODE> is invoked, the
2950- * <CODE>stop</CODE> call must wait until all of them have returned before
2951- * it may return. While these message listeners are completing, they must
2952- * have the full services of the connection available to them.
2953- *
2954- * @throws JMSException if the JMS provider fails to stop message delivery
2955- * due to some internal error.
2956- * @see javax.jms.Connection#start()
2957- */
2958- public void stop() throws JMSException {
2959- checkClosedOrFailed();
2960- if (started.compareAndSet(true, false)) {
2961- synchronized(sessions) {
2962- for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) {
2963- ActiveMQSession s = i.next();
2964- s.stop();
2965- }
2966- }
2967- }
2968- }
2969-
2970- /**
2971- * Closes the connection.
2972- * <P>
2973- * Since a provider typically allocates significant resources outside the
2974- * JVM on behalf of a connection, clients should close these resources when
2975- * they are not needed. Relying on garbage collection to eventually reclaim
2976- * these resources may not be timely enough.
2977- * <P>
2978- * There is no need to close the sessions, producers, and consumers of a
2979- * closed connection.
2980- * <P>
2981- * Closing a connection causes all temporary destinations to be deleted.
2982- * <P>
2983- * When this method is invoked, it should not return until message
2984- * processing has been shut down in an orderly fashion. This means that all
2985- * message listeners that may have been running have returned, and that all
2986- * pending receives have returned. A close terminates all pending message
2987- * receives on the connection's sessions' consumers. The receives may return
2988- * with a message or with null, depending on whether there was a message
2989- * available at the time of the close. If one or more of the connection's
2990- * sessions' message listeners is processing a message at the time when
2991- * connection <CODE>close</CODE> is invoked, all the facilities of the
2992- * connection and its sessions must remain available to those listeners
2993- * until they return control to the JMS provider.
2994- * <P>
2995- * Closing a connection causes any of its sessions' transactions in progress
2996- * to be rolled back. In the case where a session's work is coordinated by
2997- * an external transaction manager, a session's <CODE>commit</CODE> and
2998- * <CODE> rollback</CODE> methods are not used and the result of a closed
2999- * session's work is determined later by the transaction manager. Closing a
3000- * connection does NOT force an acknowledgment of client-acknowledged
3001- * sessions.
3002- * <P>
3003- * Invoking the <CODE>acknowledge</CODE> method of a received message from
3004- * a closed connection's session must throw an
3005- * <CODE>IllegalStateException</CODE>. Closing a closed connection must
3006- * NOT throw an exception.
3007- *
3008- * @throws JMSException if the JMS provider fails to close the connection
3009- * due to some internal error. For example, a failure to
3010- * release resources or to close a socket connection can
3011- * cause this exception to be thrown.
3012- */
3013- public void close() throws JMSException {
3014- try {
3015- // If we were running, lets stop first.
3016- if (!closed.get() && !transportFailed.get()) {
3017- stop();
3018- }
3019-
3020- synchronized (this) {
3021- if (!closed.get()) {
3022- closing.set(true);
3023-
3024- if (destinationSource != null) {
3025- destinationSource.stop();
3026- destinationSource = null;
3027- }
3028- if (advisoryConsumer != null) {
3029- advisoryConsumer.dispose();
3030- advisoryConsumer = null;
3031- }
3032- if (this.scheduler != null) {
3033- try {
3034- this.scheduler.stop();
3035- } catch (Exception e) {
3036- JMSException ex = JMSExceptionSupport.create(e);
3037- throw ex;
3038- }
3039- }
3040-
3041- long lastDeliveredSequenceId = 0;
3042- for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
3043- ActiveMQSession s = i.next();
3044- s.dispose();
3045- lastDeliveredSequenceId = Math.max(lastDeliveredSequenceId, s.getLastDeliveredSequenceId());
3046- }
3047- for (Iterator<ActiveMQConnectionConsumer> i = this.connectionConsumers.iterator(); i.hasNext();) {
3048- ActiveMQConnectionConsumer c = i.next();
3049- c.dispose();
3050- }
3051- for (Iterator<ActiveMQInputStream> i = this.inputStreams.iterator(); i.hasNext();) {
3052- ActiveMQInputStream c = i.next();
3053- c.dispose();
3054- }
3055- for (Iterator<ActiveMQOutputStream> i = this.outputStreams.iterator(); i.hasNext();) {
3056- ActiveMQOutputStream c = i.next();
3057- c.dispose();
3058- }
3059-
3060- // As TemporaryQueue and TemporaryTopic instances are bound
3061- // to a connection we should just delete them after the connection
3062- // is closed to free up memory
3063- for (Iterator<ActiveMQTempDestination> i = this.activeTempDestinations.values().iterator(); i.hasNext();) {
3064- ActiveMQTempDestination c = i.next();
3065- c.delete();
3066- }
3067-
3068- if (isConnectionInfoSentToBroker) {
3069- // If we announced ourselfs to the broker.. Try to let
3070- // the broker
3071- // know that the connection is being shutdown.
3072- RemoveInfo removeCommand = info.createRemoveCommand();
3073- removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
3074- doSyncSendPacket(info.createRemoveCommand(), closeTimeout);
3075- doAsyncSendPacket(new ShutdownInfo());
3076- }
3077-
3078- ServiceSupport.dispose(this.transport);
3079-
3080- started.set(false);
3081-
3082- // TODO if we move the TaskRunnerFactory to the connection
3083- // factory
3084- // then we may need to call
3085- // factory.onConnectionClose(this);
3086- if (sessionTaskRunner != null) {
3087- sessionTaskRunner.shutdown();
3088- }
3089- closed.set(true);
3090- closing.set(false);
3091- }
3092- }
3093- } finally {
3094- try {
3095- if (executor != null){
3096- executor.shutdown();
3097- }
3098- }catch(Throwable e) {
3099- LOG.error("Error shutting down thread pool " + e,e);
3100- }
3101- factoryStats.removeConnection(this);
3102- }
3103- }
3104-
3105- /**
3106- * Tells the broker to terminate its VM. This can be used to cleanly
3107- * terminate a broker running in a standalone java process. Server must have
3108- * property enable.vm.shutdown=true defined to allow this to work.
3109- */
3110- // TODO : org.apache.activemq.message.BrokerAdminCommand not yet
3111- // implemented.
3112- /*
3113- * public void terminateBrokerVM() throws JMSException { BrokerAdminCommand
3114- * command = new BrokerAdminCommand();
3115- * command.setCommand(BrokerAdminCommand.SHUTDOWN_SERVER_VM);
3116- * asyncSendPacket(command); }
3117- */
3118-
3119- /**
3120- * Create a durable connection consumer for this connection (optional
3121- * operation). This is an expert facility not used by regular JMS clients.
3122- *
3123- * @param topic topic to access
3124- * @param subscriptionName durable subscription name
3125- * @param messageSelector only messages with properties matching the message
3126- * selector expression are delivered. A value of null or an
3127- * empty string indicates that there is no message selector
3128- * for the message consumer.
3129- * @param sessionPool the server session pool to associate with this durable
3130- * connection consumer
3131- * @param maxMessages the maximum number of messages that can be assigned to
3132- * a server session at one time
3133- * @return the durable connection consumer
3134- * @throws JMSException if the <CODE>Connection</CODE> object fails to
3135- * create a connection consumer due to some internal error
3136- * or invalid arguments for <CODE>sessionPool</CODE> and
3137- * <CODE>messageSelector</CODE>.
3138- * @throws javax.jms.InvalidDestinationException if an invalid destination
3139- * is specified.
3140- * @throws javax.jms.InvalidSelectorException if the message selector is
3141- * invalid.
3142- * @see javax.jms.ConnectionConsumer
3143- * @since 1.1
3144- */
3145- public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages)
3146- throws JMSException {
3147- return this.createDurableConnectionConsumer(topic, subscriptionName, messageSelector, sessionPool, maxMessages, false);
3148- }
3149-
3150- /**
3151- * Create a durable connection consumer for this connection (optional
3152- * operation). This is an expert facility not used by regular JMS clients.
3153- *
3154- * @param topic topic to access
3155- * @param subscriptionName durable subscription name
3156- * @param messageSelector only messages with properties matching the message
3157- * selector expression are delivered. A value of null or an
3158- * empty string indicates that there is no message selector
3159- * for the message consumer.
3160- * @param sessionPool the server session pool to associate with this durable
3161- * connection consumer
3162- * @param maxMessages the maximum number of messages that can be assigned to
3163- * a server session at one time
3164- * @param noLocal set true if you want to filter out messages published
3165- * locally
3166- * @return the durable connection consumer
3167- * @throws JMSException if the <CODE>Connection</CODE> object fails to
3168- * create a connection consumer due to some internal error
3169- * or invalid arguments for <CODE>sessionPool</CODE> and
3170- * <CODE>messageSelector</CODE>.
3171- * @throws javax.jms.InvalidDestinationException if an invalid destination
3172- * is specified.
3173- * @throws javax.jms.InvalidSelectorException if the message selector is
3174- * invalid.
3175- * @see javax.jms.ConnectionConsumer
3176- * @since 1.1
3177- */
3178- public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages,
3179- boolean noLocal) throws JMSException {
3180- checkClosedOrFailed();
3181- ensureConnectionInfoSent();
3182- SessionId sessionId = new SessionId(info.getConnectionId(), -1);
3183- ConsumerInfo info = new ConsumerInfo(new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId()));
3184- info.setDestination(ActiveMQMessageTransformation.transformDestination(topic));
3185- info.setSubscriptionName(subscriptionName);
3186- info.setSelector(messageSelector);
3187- info.setPrefetchSize(maxMessages);
3188- info.setDispatchAsync(isDispatchAsync());
3189-
3190- // Allows the options on the destination to configure the consumerInfo
3191- if (info.getDestination().getOptions() != null) {
3192- Map<String, String> options = new HashMap<String, String>(info.getDestination().getOptions());
3193- IntrospectionSupport.setProperties(this.info, options, "consumer.");
3194- }
3195-
3196- return new ActiveMQConnectionConsumer(this, sessionPool, info);
3197- }
3198-
3199- // Properties
3200- // -------------------------------------------------------------------------
3201-
3202- /**
3203- * Returns true if this connection has been started
3204- *
3205- * @return true if this Connection is started
3206- */
3207- public boolean isStarted() {
3208- return started.get();
3209- }
3210-
3211- /**
3212- * Returns true if the connection is closed
3213- */
3214- public boolean isClosed() {
3215- return closed.get();
3216- }
3217-
3218- /**
3219- * Returns true if the connection is in the process of being closed
3220- */
3221- public boolean isClosing() {
3222- return closing.get();
3223- }
3224-
3225- /**
3226- * Returns true if the underlying transport has failed
3227- */
3228- public boolean isTransportFailed() {
3229- return transportFailed.get();
3230- }
3231-
3232- /**
3233- * @return Returns the prefetchPolicy.
3234- */
3235- public ActiveMQPrefetchPolicy getPrefetchPolicy() {
3236- return prefetchPolicy;
3237- }
3238-
3239- /**
3240- * Sets the <a
3241- * href="http://activemq.apache.org/what-is-the-prefetch-limit-for.html">prefetch
3242- * policy</a> for consumers created by this connection.
3243- */
3244- public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) {
3245- this.prefetchPolicy = prefetchPolicy;
3246- }
3247-
3248- /**
3249- */
3250- public Transport getTransportChannel() {
3251- return transport;
3252- }
3253-
3254- /**
3255- * @return Returns the clientID of the connection, forcing one to be
3256- * generated if one has not yet been configured.
3257- */
3258- public String getInitializedClientID() throws JMSException {
3259- ensureConnectionInfoSent();
3260- return info.getClientId();
3261- }
3262-
3263- /**
3264- * @return Returns the timeStampsDisableByDefault.
3265- */
3266- public boolean isDisableTimeStampsByDefault() {
3267- return disableTimeStampsByDefault;
3268- }
3269-
3270- /**
3271- * Sets whether or not timestamps on messages should be disabled or not. If
3272- * you disable them it adds a small performance boost.
3273- */
3274- public void setDisableTimeStampsByDefault(boolean timeStampsDisableByDefault) {
3275- this.disableTimeStampsByDefault = timeStampsDisableByDefault;
3276- }
3277-
3278- /**
3279- * @return Returns the dispatchOptimizedMessage.
3280- */
3281- public boolean isOptimizedMessageDispatch() {
3282- return optimizedMessageDispatch;
3283- }
3284-
3285- /**
3286- * If this flag is set then an larger prefetch limit is used - only
3287- * applicable for durable topic subscribers.
3288- */
3289- public void setOptimizedMessageDispatch(boolean dispatchOptimizedMessage) {
3290- this.optimizedMessageDispatch = dispatchOptimizedMessage;
3291- }
3292-
3293- /**
3294- * @return Returns the closeTimeout.
3295- */
3296- public int getCloseTimeout() {
3297- return closeTimeout;
3298- }
3299-
3300- /**
3301- * Sets the timeout before a close is considered complete. Normally a
3302- * close() on a connection waits for confirmation from the broker; this
3303- * allows that operation to timeout to save the client hanging if there is
3304- * no broker
3305- */
3306- public void setCloseTimeout(int closeTimeout) {
3307- this.closeTimeout = closeTimeout;
3308- }
3309-
3310- /**
3311- * @return ConnectionInfo
3312- */
3313- public ConnectionInfo getConnectionInfo() {
3314- return this.info;
3315- }
3316-
3317- public boolean isUseRetroactiveConsumer() {
3318- return useRetroactiveConsumer;
3319- }
3320-
3321- /**
3322- * Sets whether or not retroactive consumers are enabled. Retroactive
3323- * consumers allow non-durable topic subscribers to receive old messages
3324- * that were published before the non-durable subscriber started.
3325- */
3326- public void setUseRetroactiveConsumer(boolean useRetroactiveConsumer) {
3327- this.useRetroactiveConsumer = useRetroactiveConsumer;
3328- }
3329-
3330- public boolean isNestedMapAndListEnabled() {
3331- return nestedMapAndListEnabled;
3332- }
3333-
3334- /**
3335- * Enables/disables whether or not Message properties and MapMessage entries
3336- * support <a
3337- * href="http://activemq.apache.org/structured-message-properties-and-mapmessages.html">Nested
3338- * Structures</a> of Map and List objects
3339- */
3340- public void setNestedMapAndListEnabled(boolean structuredMapsEnabled) {
3341- this.nestedMapAndListEnabled = structuredMapsEnabled;
3342- }
3343-
3344- public boolean isExclusiveConsumer() {
3345- return exclusiveConsumer;
3346- }
3347-
3348- /**
3349- * Enables or disables whether or not queue consumers should be exclusive or
3350- * not for example to preserve ordering when not using <a
3351- * href="http://activemq.apache.org/message-groups.html">Message Groups</a>
3352- *
3353- * @param exclusiveConsumer
3354- */
3355- public void setExclusiveConsumer(boolean exclusiveConsumer) {
3356- this.exclusiveConsumer = exclusiveConsumer;
3357- }
3358-
3359- /**
3360- * Adds a transport listener so that a client can be notified of events in
3361- * the underlying transport
3362- */
3363- public void addTransportListener(TransportListener transportListener) {
3364- transportListeners.add(transportListener);
3365- }
3366-
3367- public void removeTransportListener(TransportListener transportListener) {
3368- transportListeners.remove(transportListener);
3369- }
3370-
3371- public boolean isUseDedicatedTaskRunner() {
3372- return useDedicatedTaskRunner;
3373- }
3374-
3375- public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
3376- this.useDedicatedTaskRunner = useDedicatedTaskRunner;
3377- }
3378-
3379- public TaskRunnerFactory getSessionTaskRunner() {
3380- synchronized (this) {
3381- if (sessionTaskRunner == null) {
3382- sessionTaskRunner = new TaskRunnerFactory("ActiveMQ Session Task", ThreadPriorities.INBOUND_CLIENT_SESSION, false, 1000, isUseDedicatedTaskRunner());
3383- }
3384- }
3385- return sessionTaskRunner;
3386- }
3387-
3388- public void setSessionTaskRunner(TaskRunnerFactory sessionTaskRunner) {
3389- this.sessionTaskRunner = sessionTaskRunner;
3390- }
3391-
3392- public MessageTransformer getTransformer() {
3393- return transformer;
3394- }
3395-
3396- /**
3397- * Sets the transformer used to transform messages before they are sent on
3398- * to the JMS bus or when they are received from the bus but before they are
3399- * delivered to the JMS client
3400- */
3401- public void setTransformer(MessageTransformer transformer) {
3402- this.transformer = transformer;
3403- }
3404-
3405- /**
3406- * @return the statsEnabled
3407- */
3408- public boolean isStatsEnabled() {
3409- return this.stats.isEnabled();
3410- }
3411-
3412- /**
3413- * @param statsEnabled the statsEnabled to set
3414- */
3415- public void setStatsEnabled(boolean statsEnabled) {
3416- this.stats.setEnabled(statsEnabled);
3417- }
3418-
3419- /**
3420- * Returns the {@link DestinationSource} object which can be used to listen to destinations
3421- * being created or destroyed or to enquire about the current destinations available on the broker
3422- *
3423- * @return a lazily created destination source
3424- * @throws JMSException
3425- */
3426- public DestinationSource getDestinationSource() throws JMSException {
3427- if (destinationSource == null) {
3428- destinationSource = new DestinationSource(this);
3429- destinationSource.start();
3430- }
3431- return destinationSource;
3432- }
3433-
3434- // Implementation methods
3435- // -------------------------------------------------------------------------
3436-
3437- /**
3438- * Used internally for adding Sessions to the Connection
3439- *
3440- * @param session
3441- * @throws JMSException
3442- * @throws JMSException
3443- */
3444- protected void addSession(ActiveMQSession session) throws JMSException {
3445- this.sessions.add(session);
3446- if (sessions.size() > 1 || session.isTransacted()) {
3447- optimizedMessageDispatch = false;
3448- }
3449- }
3450-
3451- /**
3452- * Used interanlly for removing Sessions from a Connection
3453- *
3454- * @param session
3455- */
3456- protected void removeSession(ActiveMQSession session) {
3457- this.sessions.remove(session);
3458- this.removeDispatcher(session);
3459- }
3460-
3461- /**
3462- * Add a ConnectionConsumer
3463- *
3464- * @param connectionConsumer
3465- * @throws JMSException
3466- */
3467- protected void addConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) throws JMSException {
3468- this.connectionConsumers.add(connectionConsumer);
3469- }
3470-
3471- /**
3472- * Remove a ConnectionConsumer
3473- *
3474- * @param connectionConsumer
3475- */
3476- protected void removeConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) {
3477- this.connectionConsumers.remove(connectionConsumer);
3478- this.removeDispatcher(connectionConsumer);
3479- }
3480-
3481- /**
3482- * Creates a <CODE>TopicSession</CODE> object.
3483- *
3484- * @param transacted indicates whether the session is transacted
3485- * @param acknowledgeMode indicates whether the consumer or the client will
3486- * acknowledge any messages it receives; ignored if the
3487- * session is transacted. Legal values are
3488- * <code>Session.AUTO_ACKNOWLEDGE</code>,
3489- * <code>Session.CLIENT_ACKNOWLEDGE</code>, and
3490- * <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
3491- * @return a newly created topic session
3492- * @throws JMSException if the <CODE>TopicConnection</CODE> object fails
3493- * to create a session due to some internal error or lack of
3494- * support for the specific transaction and acknowledgement
3495- * mode.
3496- * @see Session#AUTO_ACKNOWLEDGE
3497- * @see Session#CLIENT_ACKNOWLEDGE
3498- * @see Session#DUPS_OK_ACKNOWLEDGE
3499- */
3500- public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException {
3501- return new ActiveMQTopicSession((ActiveMQSession)createSession(transacted, acknowledgeMode));
3502- }
3503-
3504- /**
3505- * Creates a connection consumer for this connection (optional operation).
3506- * This is an expert facility not used by regular JMS clients.
3507- *
3508- * @param topic the topic to access
3509- * @param messageSelector only messages with properties matching the message
3510- * selector expression are delivered. A value of null or an
3511- * empty string indicates that there is no message selector
3512- * for the message consumer.
3513- * @param sessionPool the server session pool to associate with this
3514- * connection consumer
3515- * @param maxMessages the maximum number of messages that can be assigned to
3516- * a server session at one time
3517- * @return the connection consumer
3518- * @throws JMSException if the <CODE>TopicConnection</CODE> object fails
3519- * to create a connection consumer due to some internal
3520- * error or invalid arguments for <CODE>sessionPool</CODE>
3521- * and <CODE>messageSelector</CODE>.
3522- * @throws javax.jms.InvalidDestinationException if an invalid topic is
3523- * specified.
3524- * @throws javax.jms.InvalidSelectorException if the message selector is
3525- * invalid.
3526- * @see javax.jms.ConnectionConsumer
3527- */
3528- public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
3529- return createConnectionConsumer(topic, messageSelector, sessionPool, maxMessages, false);
3530- }
3531-
3532- /**
3533- * Creates a connection consumer for this connection (optional operation).
3534- * This is an expert facility not used by regular JMS clients.
3535- *
3536- * @param queue the queue to access
3537- * @param messageSelector only messages with properties matching the message
3538- * selector expression are delivered. A value of null or an
3539- * empty string indicates that there is no message selector
3540- * for the message consumer.
3541- * @param sessionPool the server session pool to associate with this
3542- * connection consumer
3543- * @param maxMessages the maximum number of messages that can be assigned to
3544- * a server session at one time
3545- * @return the connection consumer
3546- * @throws JMSException if the <CODE>QueueConnection</CODE> object fails
3547- * to create a connection consumer due to some internal
3548- * error or invalid arguments for <CODE>sessionPool</CODE>
3549- * and <CODE>messageSelector</CODE>.
3550- * @throws javax.jms.InvalidDestinationException if an invalid queue is
3551- * specified.
3552- * @throws javax.jms.InvalidSelectorException if the message selector is
3553- * invalid.
3554- * @see javax.jms.ConnectionConsumer
3555- */
3556- public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
3557- return createConnectionConsumer(queue, messageSelector, sessionPool, maxMessages, false);
3558- }
3559-
3560- /**
3561- * Creates a connection consumer for this connection (optional operation).
3562- * This is an expert facility not used by regular JMS clients.
3563- *
3564- * @param destination the destination to access
3565- * @param messageSelector only messages with properties matching the message
3566- * selector expression are delivered. A value of null or an
3567- * empty string indicates that there is no message selector
3568- * for the message consumer.
3569- * @param sessionPool the server session pool to associate with this
3570- * connection consumer
3571- * @param maxMessages the maximum number of messages that can be assigned to
3572- * a server session at one time
3573- * @return the connection consumer
3574- * @throws JMSException if the <CODE>Connection</CODE> object fails to
3575- * create a connection consumer due to some internal error
3576- * or invalid arguments for <CODE>sessionPool</CODE> and
3577- * <CODE>messageSelector</CODE>.
3578- * @throws javax.jms.InvalidDestinationException if an invalid destination
3579- * is specified.
3580- * @throws javax.jms.InvalidSelectorException if the message selector is
3581- * invalid.
3582- * @see javax.jms.ConnectionConsumer
3583- * @since 1.1
3584- */
3585- public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
3586- return createConnectionConsumer(destination, messageSelector, sessionPool, maxMessages, false);
3587- }
3588-
3589- public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages, boolean noLocal)
3590- throws JMSException {
3591-
3592- checkClosedOrFailed();
3593- ensureConnectionInfoSent();
3594-
3595- ConsumerId consumerId = createConsumerId();
3596- ConsumerInfo consumerInfo = new ConsumerInfo(consumerId);
3597- consumerInfo.setDestination(ActiveMQMessageTransformation.transformDestination(destination));
3598- consumerInfo.setSelector(messageSelector);
3599- consumerInfo.setPrefetchSize(maxMessages);
3600- consumerInfo.setNoLocal(noLocal);
3601- consumerInfo.setDispatchAsync(isDispatchAsync());
3602-
3603- // Allows the options on the destination to configure the consumerInfo
3604- if (consumerInfo.getDestination().getOptions() != null) {
3605- Map<String, String> options = new HashMap<String, String>(consumerInfo.getDestination().getOptions());
3606- IntrospectionSupport.setProperties(consumerInfo, options, "consumer.");
3607- }
3608-
3609- return new ActiveMQConnectionConsumer(this, sessionPool, consumerInfo);
3610- }
3611-
3612- /**
3613- * @return
3614- */
3615- private ConsumerId createConsumerId() {
3616- return new ConsumerId(connectionSessionId, consumerIdGenerator.getNextSequenceId());
3617- }
3618-
3619- /**
3620- * @return
3621- */
3622- private ProducerId createProducerId() {
3623- return new ProducerId(connectionSessionId, producerIdGenerator.getNextSequenceId());
3624- }
3625-
3626- /**
3627- * Creates a <CODE>QueueSession</CODE> object.
3628- *
3629- * @param transacted indicates whether the session is transacted
3630- * @param acknowledgeMode indicates whether the consumer or the client will
3631- * acknowledge any messages it receives; ignored if the
3632- * session is transacted. Legal values are
3633- * <code>Session.AUTO_ACKNOWLEDGE</code>,
3634- * <code>Session.CLIENT_ACKNOWLEDGE</code>, and
3635- * <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
3636- * @return a newly created queue session
3637- * @throws JMSException if the <CODE>QueueConnection</CODE> object fails
3638- * to create a session due to some internal error or lack of
3639- * support for the specific transaction and acknowledgement
3640- * mode.
3641- * @see Session#AUTO_ACKNOWLEDGE
3642- * @see Session#CLIENT_ACKNOWLEDGE
3643- * @see Session#DUPS_OK_ACKNOWLEDGE
3644- */
3645- public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException {
3646- return new ActiveMQQueueSession((ActiveMQSession)createSession(transacted, acknowledgeMode));
3647- }
3648-
3649- /**
3650- * Ensures that the clientID was manually specified and not auto-generated.
3651- * If the clientID was not specified this method will throw an exception.
3652- * This method is used to ensure that the clientID + durableSubscriber name
3653- * are used correctly.
3654- *
3655- * @throws JMSException
3656- */
3657- public void checkClientIDWasManuallySpecified() throws JMSException {
3658- if (!userSpecifiedClientID) {
3659- throw new JMSException("You cannot create a durable subscriber without specifying a unique clientID on a Connection");
3660- }
3661- }
3662-
3663- /**
3664- * send a Packet through the Connection - for internal use only
3665- *
3666- * @param command
3667- * @throws JMSException
3668- */
3669- public void asyncSendPacket(Command command) throws JMSException {
3670- if (isClosed()) {
3671- throw new ConnectionClosedException();
3672- } else {
3673- doAsyncSendPacket(command);
3674- }
3675- }
3676-
3677- private void doAsyncSendPacket(Command command) throws JMSException {
3678- try {
3679- this.transport.oneway(command);
3680- } catch (IOException e) {
3681- throw JMSExceptionSupport.create(e);
3682- }
3683- }
3684-
3685- /**
3686- * Send a packet through a Connection - for internal use only
3687- *
3688- * @param command
3689- * @return
3690- * @throws JMSException
3691- */
3692- public Response syncSendPacket(Command command) throws JMSException {
3693- if (isClosed()) {
3694- throw new ConnectionClosedException();
3695- } else {
3696-
3697- try {
3698- Response response = (Response)this.transport.request(command);
3699- if (response.isException()) {
3700- ExceptionResponse er = (ExceptionResponse)response;
3701- if (er.getException() instanceof JMSException) {
3702- throw (JMSException)er.getException();
3703- } else {
3704- if (isClosed()||closing.get()) {
3705- LOG.debug("Received an exception but connection is closing");
3706- }
3707- JMSException jmsEx = null;
3708- try {
3709- jmsEx = JMSExceptionSupport.create(er.getException());
3710- }catch(Throwable e) {
3711- LOG.error("Caught an exception trying to create a JMSException for " +er.getException(),e);
3712- }
3713- if(jmsEx !=null) {
3714- throw jmsEx;
3715- }
3716- }
3717- }
3718- return response;
3719- } catch (IOException e) {
3720- throw JMSExceptionSupport.create(e);
3721- }
3722- }
3723- }
3724-
3725- /**
3726- * Send a packet through a Connection - for internal use only
3727- *
3728- * @param command
3729- * @return
3730- * @throws JMSException
3731- */
3732- public Response syncSendPacket(Command command, int timeout) throws JMSException {
3733- if (isClosed() || closing.get()) {
3734- throw new ConnectionClosedException();
3735- } else {
3736- return doSyncSendPacket(command, timeout);
3737- }
3738- }
3739-
3740- private Response doSyncSendPacket(Command command, int timeout)
3741- throws JMSException {
3742- try {
3743- Response response = (Response) (timeout > 0
3744- ? this.transport.request(command, timeout)
3745- : this.transport.request(command));
3746- if (response != null && response.isException()) {
3747- ExceptionResponse er = (ExceptionResponse)response;
3748- if (er.getException() instanceof JMSException) {
3749- throw (JMSException)er.getException();
3750- } else {
3751- throw JMSExceptionSupport.create(er.getException());
3752- }
3753- }
3754- return response;
3755- } catch (IOException e) {
3756- throw JMSExceptionSupport.create(e);
3757- }
3758- }
3759-
3760- /**
3761- * @return statistics for this Connection
3762- */
3763- public StatsImpl getStats() {
3764- return stats;
3765- }
3766-
3767- /**
3768- * simply throws an exception if the Connection is already closed or the
3769- * Transport has failed
3770- *
3771- * @throws JMSException
3772- */
3773- protected synchronized void checkClosedOrFailed() throws JMSException {
3774- checkClosed();
3775- if (transportFailed.get()) {
3776- throw new ConnectionFailedException(firstFailureError);
3777- }
3778- }
3779-
3780- /**
3781- * simply throws an exception if the Connection is already closed
3782- *
3783- * @throws JMSException
3784- */
3785- protected synchronized void checkClosed() throws JMSException {
3786- if (closed.get()) {
3787- throw new ConnectionClosedException();
3788- }
3789- }
3790-
3791- /**
3792- * Send the ConnectionInfo to the Broker
3793- *
3794- * @throws JMSException
3795- */
3796- protected void ensureConnectionInfoSent() throws JMSException {
3797- synchronized(this.ensureConnectionInfoSentMutex) {
3798- // Can we skip sending the ConnectionInfo packet??
3799- if (isConnectionInfoSentToBroker || closed.get()) {
3800- return;
3801- }
3802- //TODO shouldn't this check be on userSpecifiedClientID rather than the value of clientID?
3803- if (info.getClientId() == null || info.getClientId().trim().length() == 0) {
3804- info.setClientId(clientIdGenerator.generateId());
3805- }
3806- syncSendPacket(info.copy());
3807-
3808- this.isConnectionInfoSentToBroker = true;
3809- // Add a temp destination advisory consumer so that
3810- // We know what the valid temporary destinations are on the
3811- // broker without having to do an RPC to the broker.
3812-
3813- ConsumerId consumerId = new ConsumerId(new SessionId(info.getConnectionId(), -1), consumerIdGenerator.getNextSequenceId());
3814- if (watchTopicAdvisories) {
3815- advisoryConsumer = new AdvisoryConsumer(this, consumerId);
3816- }
3817- }
3818- }
3819-
3820- public synchronized boolean isWatchTopicAdvisories() {
3821- return watchTopicAdvisories;
3822- }
3823-
3824- public synchronized void setWatchTopicAdvisories(boolean watchTopicAdvisories) {
3825- this.watchTopicAdvisories = watchTopicAdvisories;
3826- }
3827-
3828- /**
3829- * @return Returns the useAsyncSend.
3830- */
3831- public boolean isUseAsyncSend() {
3832- return useAsyncSend;
3833- }
3834-
3835- /**
3836- * Forces the use of <a
3837- * href="http://activemq.apache.org/async-sends.html">Async Sends</a> which
3838- * adds a massive performance boost; but means that the send() method will
3839- * return immediately whether the message has been sent or not which could
3840- * lead to message loss.
3841- */
3842- public void setUseAsyncSend(boolean useAsyncSend) {
3843- this.useAsyncSend = useAsyncSend;
3844- }
3845-
3846- /**
3847- * @return true if always sync send messages
3848- */
3849- public boolean isAlwaysSyncSend() {
3850- return this.alwaysSyncSend;
3851- }
3852-
3853- /**
3854- * Set true if always require messages to be sync sent
3855- *
3856- * @param alwaysSyncSend
3857- */
3858- public void setAlwaysSyncSend(boolean alwaysSyncSend) {
3859- this.alwaysSyncSend = alwaysSyncSend;
3860- }
3861-
3862- /**
3863- * @return the messagePrioritySupported
3864- */
3865- public boolean isMessagePrioritySupported() {
3866- return this.messagePrioritySupported;
3867- }
3868-
3869- /**
3870- * @param messagePrioritySupported the messagePrioritySupported to set
3871- */
3872- public void setMessagePrioritySupported(boolean messagePrioritySupported) {
3873- this.messagePrioritySupported = messagePrioritySupported;
3874- }
3875-
3876- /**
3877- * Cleans up this connection so that it's state is as if the connection was
3878- * just created. This allows the Resource Adapter to clean up a connection
3879- * so that it can be reused without having to close and recreate the
3880- * connection.
3881- */
3882- public void cleanup() throws JMSException {
3883-
3884- if (advisoryConsumer != null && !isTransportFailed()) {
3885- advisoryConsumer.dispose();
3886- advisoryConsumer = null;
3887- }
3888-
3889- for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
3890- ActiveMQSession s = i.next();
3891- s.dispose();
3892- }
3893- for (Iterator<ActiveMQConnectionConsumer> i = this.connectionConsumers.iterator(); i.hasNext();) {
3894- ActiveMQConnectionConsumer c = i.next();
3895- c.dispose();
3896- }
3897- for (Iterator<ActiveMQInputStream> i = this.inputStreams.iterator(); i.hasNext();) {
3898- ActiveMQInputStream c = i.next();
3899- c.dispose();
3900- }
3901- for (Iterator<ActiveMQOutputStream> i = this.outputStreams.iterator(); i.hasNext();) {
3902- ActiveMQOutputStream c = i.next();
3903- c.dispose();
3904- }
3905-
3906- if (isConnectionInfoSentToBroker) {
3907- if (!transportFailed.get() && !closing.get()) {
3908- syncSendPacket(info.createRemoveCommand());
3909- }
3910- isConnectionInfoSentToBroker = false;
3911- }
3912- if (userSpecifiedClientID) {
3913- info.setClientId(null);
3914- userSpecifiedClientID = false;
3915- }
3916- clientIDSet = false;
3917-
3918- started.set(false);
3919- }
3920-
3921- /**
3922- * Changes the associated username/password that is associated with this
3923- * connection. If the connection has been used, you must called cleanup()
3924- * before calling this method.
3925- *
3926- * @throws IllegalStateException if the connection is in used.
3927- */
3928- public void changeUserInfo(String userName, String password) throws JMSException {
3929- if (isConnectionInfoSentToBroker) {
3930- throw new IllegalStateException("changeUserInfo used Connection is not allowed");
3931- }
3932- this.info.setUserName(userName);
3933- this.info.setPassword(password);
3934- }
3935-
3936- /**
3937- * @return Returns the resourceManagerId.
3938- * @throws JMSException
3939- */
3940- public String getResourceManagerId() throws JMSException {
3941- waitForBrokerInfo();
3942- if (brokerInfo == null) {
3943- throw new JMSException("Connection failed before Broker info was received.");
3944- }
3945- return brokerInfo.getBrokerId().getValue();
3946- }
3947-
3948- /**
3949- * Returns the broker name if one is available or null if one is not
3950- * available yet.
3951- */
3952- public String getBrokerName() {
3953- try {
3954- brokerInfoReceived.await(5, TimeUnit.SECONDS);
3955- if (brokerInfo == null) {
3956- return null;
3957- }
3958- return brokerInfo.getBrokerName();
3959- } catch (InterruptedException e) {
3960- Thread.currentThread().interrupt();
3961- return null;
3962- }
3963- }
3964-
3965- /**
3966- * Returns the broker information if it is available or null if it is not
3967- * available yet.
3968- */
3969- public BrokerInfo getBrokerInfo() {
3970- return brokerInfo;
3971- }
3972-
3973- /**
3974- * @return Returns the RedeliveryPolicy.
3975- * @throws JMSException
3976- */
3977- public RedeliveryPolicy getRedeliveryPolicy() throws JMSException {
3978- return redeliveryPolicy;
3979- }
3980-
3981- /**
3982- * Sets the redelivery policy to be used when messages are rolled back
3983- */
3984- public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
3985- this.redeliveryPolicy = redeliveryPolicy;
3986- }
3987-
3988- public BlobTransferPolicy getBlobTransferPolicy() {
3989- if (blobTransferPolicy == null) {
3990- blobTransferPolicy = createBlobTransferPolicy();
3991- }
3992- return blobTransferPolicy;
3993- }
3994-
3995- /**
3996- * Sets the policy used to describe how out-of-band BLOBs (Binary Large
3997- * OBjects) are transferred from producers to brokers to consumers
3998- */
3999- public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) {
4000- this.blobTransferPolicy = blobTransferPolicy;
4001- }
4002-
4003- /**
4004- * @return Returns the alwaysSessionAsync.
4005- */
4006- public boolean isAlwaysSessionAsync() {
4007- return alwaysSessionAsync;
4008- }
4009-
4010- /**
4011- * If this flag is set then a separate thread is not used for dispatching
4012- * messages for each Session in the Connection. However, a separate thread
4013- * is always used if there is more than one session, or the session isn't in
4014- * auto acknowledge or duplicates ok mode
4015- */
4016- public void setAlwaysSessionAsync(boolean alwaysSessionAsync) {
4017- this.alwaysSessionAsync = alwaysSessionAsync;
4018- }
4019-
4020- /**
4021- * @return Returns the optimizeAcknowledge.
4022- */
4023- public boolean isOptimizeAcknowledge() {
4024- return optimizeAcknowledge;
4025- }
4026-
4027- /**
4028- * Enables an optimised acknowledgement mode where messages are acknowledged
4029- * in batches rather than individually
4030- *
4031- * @param optimizeAcknowledge The optimizeAcknowledge to set.
4032- */
4033- public void setOptimizeAcknowledge(boolean optimizeAcknowledge) {
4034- this.optimizeAcknowledge = optimizeAcknowledge;
4035- }
4036-
4037- public long getWarnAboutUnstartedConnectionTimeout() {
4038- return warnAboutUnstartedConnectionTimeout;
4039- }
4040-
4041- /**
4042- * Enables the timeout from a connection creation to when a warning is
4043- * generated if the connection is not properly started via {@link #start()}
4044- * and a message is received by a consumer. It is a very common gotcha to
4045- * forget to <a
4046- * href="http://activemq.apache.org/i-am-not-receiving-any-messages-what-is-wrong.html">start
4047- * the connection</a> so this option makes the default case to create a
4048- * warning if the user forgets. To disable the warning just set the value to <
4049- * 0 (say -1).
4050- */
4051- public void setWarnAboutUnstartedConnectionTimeout(long warnAboutUnstartedConnectionTimeout) {
4052- this.warnAboutUnstartedConnectionTimeout = warnAboutUnstartedConnectionTimeout;
4053- }
4054-
4055- /**
4056- * @return the sendTimeout
4057- */
4058- public int getSendTimeout() {
4059- return sendTimeout;
4060- }
4061-
4062- /**
4063- * @param sendTimeout the sendTimeout to set
4064- */
4065- public void setSendTimeout(int sendTimeout) {
4066- this.sendTimeout = sendTimeout;
4067- }
4068-
4069- /**
4070- * @return the sendAcksAsync
4071- */
4072- public boolean isSendAcksAsync() {
4073- return sendAcksAsync;
4074- }
4075-
4076- /**
4077- * @param sendAcksAsync the sendAcksAsync to set
4078- */
4079- public void setSendAcksAsync(boolean sendAcksAsync) {
4080- this.sendAcksAsync = sendAcksAsync;
4081- }
4082-
4083-
4084- /**
4085- * Returns the time this connection was created
4086- */
4087- public long getTimeCreated() {
4088- return timeCreated;
4089- }
4090-
4091- private void waitForBrokerInfo() throws JMSException {
4092- try {
4093- brokerInfoReceived.await();
4094- } catch (InterruptedException e) {
4095- Thread.currentThread().interrupt();
4096- throw JMSExceptionSupport.create(e);
4097- }
4098- }
4099-
4100- // Package protected so that it can be used in unit tests
4101- public Transport getTransport() {
4102- return transport;
4103- }
4104-
4105- public void addProducer(ProducerId producerId, ActiveMQMessageProducer producer) {
4106- producers.put(producerId, producer);
4107- }
4108-
4109- public void removeProducer(ProducerId producerId) {
4110- producers.remove(producerId);
4111- }
4112-
4113- public void addDispatcher(ConsumerId consumerId, ActiveMQDispatcher dispatcher) {
4114- dispatchers.put(consumerId, dispatcher);
4115- }
4116-
4117- public void removeDispatcher(ConsumerId consumerId) {
4118- dispatchers.remove(consumerId);
4119- }
4120-
4121- /**
4122- * @param o - the command to consume
4123- */
4124- public void onCommand(final Object o) {
4125- final Command command = (Command)o;
4126- if (!closed.get() && command != null) {
4127- try {
4128- command.visit(new CommandVisitorAdapter() {
4129- @Override
4130- public Response processMessageDispatch(MessageDispatch md) throws Exception {
4131- waitForTransportInterruptionProcessingToComplete();
4132- ActiveMQDispatcher dispatcher = dispatchers.get(md.getConsumerId());
4133- if (dispatcher != null) {
4134- // Copy in case a embedded broker is dispatching via
4135- // vm://
4136- // md.getMessage() == null to signal end of queue
4137- // browse.
4138- Message msg = md.getMessage();
4139- if (msg != null) {
4140- msg = msg.copy();
4141- msg.setReadOnlyBody(true);
4142- msg.setReadOnlyProperties(true);
4143- msg.setRedeliveryCounter(md.getRedeliveryCounter());
4144- msg.setConnection(ActiveMQConnection.this);
4145- md.setMessage(msg);
4146- }
4147- dispatcher.dispatch(md);
4148- }
4149- return null;
4150- }
4151-
4152- @Override
4153- public Response processProducerAck(ProducerAck pa) throws Exception {
4154- if (pa != null && pa.getProducerId() != null) {
4155- ActiveMQMessageProducer producer = producers.get(pa.getProducerId());
4156- if (producer != null) {
4157- producer.onProducerAck(pa);
4158- }
4159- }
4160- return null;
4161- }
4162-
4163- @Override
4164- public Response processBrokerInfo(BrokerInfo info) throws Exception {
4165- brokerInfo = info;
4166- brokerInfoReceived.countDown();
4167- optimizeAcknowledge &= !brokerInfo.isFaultTolerantConfiguration();
4168- getBlobTransferPolicy().setBrokerUploadUrl(info.getBrokerUploadUrl());
4169- return null;
4170- }
4171-
4172- @Override
4173- public Response processConnectionError(final ConnectionError error) throws Exception {
4174- executor.execute(new Runnable() {
4175- public void run() {
4176- onAsyncException(error.getException());
4177- }
4178- });
4179- return null;
4180- }
4181-
4182- @Override
4183- public Response processControlCommand(ControlCommand command) throws Exception {
4184- onControlCommand(command);
4185- return null;
4186- }
4187-
4188- @Override
4189- public Response processConnectionControl(ConnectionControl control) throws Exception {
4190- onConnectionControl((ConnectionControl)command);
4191- return null;
4192- }
4193-
4194- @Override
4195- public Response processConsumerControl(ConsumerControl control) throws Exception {
4196- onConsumerControl((ConsumerControl)command);
4197- return null;
4198- }
4199-
4200- @Override
4201- public Response processWireFormat(WireFormatInfo info) throws Exception {
4202- onWireFormatInfo((WireFormatInfo)command);
4203- return null;
4204- }
4205- });
4206- } catch (Exception e) {
4207- onClientInternalException(e);
4208- }
4209-
4210- }
4211- for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
4212- TransportListener listener = iter.next();
4213- listener.onCommand(command);
4214- }
4215- }
4216-
4217- protected void onWireFormatInfo(WireFormatInfo info) {
4218- protocolVersion.set(info.getVersion());
4219- }
4220-
4221- /**
4222- * Handles async client internal exceptions.
4223- * A client internal exception is usually one that has been thrown
4224- * by a container runtime component during asynchronous processing of a
4225- * message that does not affect the connection itself.
4226- * This method notifies the <code>ClientInternalExceptionListener</code> by invoking
4227- * its <code>onException</code> method, if one has been registered with this connection.
4228- *
4229- * @param error the exception that the problem
4230- */
4231- public void onClientInternalException(final Throwable error) {
4232- if ( !closed.get() && !closing.get() ) {
4233- if ( this.clientInternalExceptionListener != null ) {
4234- executor.execute(new Runnable() {
4235- public void run() {
4236- ActiveMQConnection.this.clientInternalExceptionListener.onException(error);
4237- }
4238- });
4239- } else {
4240- LOG.debug("Async client internal exception occurred with no exception listener registered: "
4241- + error, error);
4242- }
4243- }
4244- }
4245- /**
4246- * Used for handling async exceptions
4247- *
4248- * @param error
4249- */
4250- public void onAsyncException(Throwable error) {
4251- if (!closed.get() && !closing.get()) {
4252- if (this.exceptionListener != null) {
4253-
4254- if (!(error instanceof JMSException)) {
4255- error = JMSExceptionSupport.create(error);
4256- }
4257- final JMSException e = (JMSException)error;
4258-
4259- executor.execute(new Runnable() {
4260- public void run() {
4261- ActiveMQConnection.this.exceptionListener.onException(e);
4262- }
4263- });
4264-
4265- } else {
4266- LOG.debug("Async exception with no exception listener: " + error, error);
4267- }
4268- }
4269- }
4270-
4271- public void onException(final IOException error) {
4272- onAsyncException(error);
4273- if (!closing.get() && !closed.get()) {
4274- executor.execute(new Runnable() {
4275- public void run() {
4276- transportFailed(error);
4277- ServiceSupport.dispose(ActiveMQConnection.this.transport);
4278- brokerInfoReceived.countDown();
4279- try {
4280- cleanup();
4281- } catch (JMSException e) {
4282- LOG.warn("Exception during connection cleanup, " + e, e);
4283- }
4284- for (Iterator<TransportListener> iter = transportListeners
4285- .iterator(); iter.hasNext();) {
4286- TransportListener listener = iter.next();
4287- listener.onException(error);
4288- }
4289- }
4290- });
4291- }
4292- }
4293-
4294- public void transportInterupted() {
4295- this.transportInterruptionProcessingComplete = new CountDownLatch(dispatchers.size() - (advisoryConsumer != null ? 1:0));
4296- if (LOG.isDebugEnabled()) {
4297- LOG.debug("transport interrupted, dispatchers: " + transportInterruptionProcessingComplete.getCount());
4298- }
4299- signalInterruptionProcessingNeeded();
4300-
4301- for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
4302- ActiveMQSession s = i.next();
4303- s.clearMessagesInProgress();
4304- }
4305-
4306- for (ActiveMQConnectionConsumer connectionConsumer : this.connectionConsumers) {
4307- connectionConsumer.clearMessagesInProgress();
4308- }
4309-
4310- for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
4311- TransportListener listener = iter.next();
4312- listener.transportInterupted();
4313- }
4314- }
4315-
4316- public void transportResumed() {
4317- for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
4318- TransportListener listener = iter.next();
4319- listener.transportResumed();
4320- }
4321- }
4322-
4323- /**
4324- * Create the DestinationInfo object for the temporary destination.
4325- *
4326- * @param topic - if its true topic, else queue.
4327- * @return DestinationInfo
4328- * @throws JMSException
4329- */
4330- protected ActiveMQTempDestination createTempDestination(boolean topic) throws JMSException {
4331-
4332- // Check if Destination info is of temporary type.
4333- ActiveMQTempDestination dest;
4334- if (topic) {
4335- dest = new ActiveMQTempTopic(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId());
4336- } else {
4337- dest = new ActiveMQTempQueue(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId());
4338- }
4339-
4340- DestinationInfo info = new DestinationInfo();
4341- info.setConnectionId(this.info.getConnectionId());
4342- info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE);
4343- info.setDestination(dest);
4344- syncSendPacket(info);
4345-
4346- dest.setConnection(this);
4347- activeTempDestinations.put(dest, dest);
4348- return dest;
4349- }
4350-
4351- /**
4352- * @param destination
4353- * @throws JMSException
4354- */
4355- public void deleteTempDestination(ActiveMQTempDestination destination) throws JMSException {
4356-
4357- checkClosedOrFailed();
4358-
4359- for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
4360- ActiveMQSession s = i.next();
4361- if (s.isInUse(destination)) {
4362- throw new JMSException("A consumer is consuming from the temporary destination");
4363- }
4364- }
4365-
4366- activeTempDestinations.remove(destination);
4367-
4368- DestinationInfo destInfo = new DestinationInfo();
4369- destInfo.setConnectionId(this.info.getConnectionId());
4370- destInfo.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
4371- destInfo.setDestination(destination);
4372- destInfo.setTimeout(0);
4373- syncSendPacket(destInfo);
4374- }
4375-
4376- public boolean isDeleted(ActiveMQDestination dest) {
4377-
4378- // If we are not watching the advisories.. then
4379- // we will assume that the temp destination does exist.
4380- if (advisoryConsumer == null) {
4381- return false;
4382- }
4383-
4384- return !activeTempDestinations.contains(dest);
4385- }
4386-
4387- public boolean isCopyMessageOnSend() {
4388- return copyMessageOnSend;
4389- }
4390-
4391- public LongSequenceGenerator getLocalTransactionIdGenerator() {
4392- return localTransactionIdGenerator;
4393- }
4394-
4395- public boolean isUseCompression() {
4396- return useCompression;
4397- }
4398-
4399- /**
4400- * Enables the use of compression of the message bodies
4401- */
4402- public void setUseCompression(boolean useCompression) {
4403- this.useCompression = useCompression;
4404- }
4405-
4406- public void destroyDestination(ActiveMQDestination destination) throws JMSException {
4407-
4408- checkClosedOrFailed();
4409- ensureConnectionInfoSent();
4410-
4411- DestinationInfo info = new DestinationInfo();
4412- info.setConnectionId(this.info.getConnectionId());
4413- info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
4414- info.setDestination(destination);
4415- info.setTimeout(0);
4416- syncSendPacket(info);
4417-
4418- }
4419-
4420- public boolean isDispatchAsync() {
4421- return dispatchAsync;
4422- }
4423-
4424- /**
4425- * Enables or disables the default setting of whether or not consumers have
4426- * their messages <a
4427- * href="http://activemq.apache.org/consumer-dispatch-async.html">dispatched
4428- * synchronously or asynchronously by the broker</a>. For non-durable
4429- * topics for example we typically dispatch synchronously by default to
4430- * minimize context switches which boost performance. However sometimes its
4431- * better to go slower to ensure that a single blocked consumer socket does
4432- * not block delivery to other consumers.
4433- *
4434- * @param asyncDispatch If true then consumers created on this connection
4435- * will default to having their messages dispatched
4436- * asynchronously. The default value is false.
4437- */
4438- public void setDispatchAsync(boolean asyncDispatch) {
4439- this.dispatchAsync = asyncDispatch;
4440- }
4441-
4442- public boolean isObjectMessageSerializationDefered() {
4443- return objectMessageSerializationDefered;
4444- }
4445-
4446- /**
4447- * When an object is set on an ObjectMessage, the JMS spec requires the
4448- * object to be serialized by that set method. Enabling this flag causes the
4449- * object to not get serialized. The object may subsequently get serialized
4450- * if the message needs to be sent over a socket or stored to disk.
4451- */
4452- public void setObjectMessageSerializationDefered(boolean objectMessageSerializationDefered) {
4453- this.objectMessageSerializationDefered = objectMessageSerializationDefered;
4454- }
4455-
4456- public InputStream createInputStream(Destination dest) throws JMSException {
4457- return createInputStream(dest, null);
4458- }
4459-
4460- public InputStream createInputStream(Destination dest, String messageSelector) throws JMSException {
4461- return createInputStream(dest, messageSelector, false);
4462- }
4463-
4464- public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal) throws JMSException {
4465- return createInputStream(dest, messageSelector, noLocal, -1);
4466- }
4467-
4468-
4469-
4470- public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal, long timeout) throws JMSException {
4471- return doCreateInputStream(dest, messageSelector, noLocal, null, timeout);
4472- }
4473-
4474- public InputStream createDurableInputStream(Topic dest, String name) throws JMSException {
4475- return createInputStream(dest, null, false);
4476- }
4477-
4478- public InputStream createDurableInputStream(Topic dest, String name, String messageSelector) throws JMSException {
4479- return createDurableInputStream(dest, name, messageSelector, false);
4480- }
4481-
4482- public InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal) throws JMSException {
4483- return createDurableInputStream(dest, name, messageSelector, noLocal, -1);
4484- }
4485-
4486- public InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal, long timeout) throws JMSException {
4487- return doCreateInputStream(dest, messageSelector, noLocal, name, timeout);
4488- }
4489-
4490- private InputStream doCreateInputStream(Destination dest, String messageSelector, boolean noLocal, String subName, long timeout) throws JMSException {
4491- checkClosedOrFailed();
4492- ensureConnectionInfoSent();
4493- return new ActiveMQInputStream(this, createConsumerId(), ActiveMQDestination.transform(dest), messageSelector, noLocal, subName, prefetchPolicy.getInputStreamPrefetch(), timeout);
4494- }
4495-
4496- /**
4497- * Creates a persistent output stream; individual messages will be written
4498- * to disk/database by the broker
4499- */
4500- public OutputStream createOutputStream(Destination dest) throws JMSException {
4501- return createOutputStream(dest, null, ActiveMQMessage.DEFAULT_DELIVERY_MODE, ActiveMQMessage.DEFAULT_PRIORITY, ActiveMQMessage.DEFAULT_TIME_TO_LIVE);
4502- }
4503-
4504- /**
4505- * Creates a non persistent output stream; messages will not be written to
4506- * disk
4507- */
4508- public OutputStream createNonPersistentOutputStream(Destination dest) throws JMSException {
4509- return createOutputStream(dest, null, DeliveryMode.NON_PERSISTENT, ActiveMQMessage.DEFAULT_PRIORITY, ActiveMQMessage.DEFAULT_TIME_TO_LIVE);
4510- }
4511-
4512- /**
4513- * Creates an output stream allowing full control over the delivery mode,
4514- * the priority and time to live of the messages and the properties added to
4515- * messages on the stream.
4516- *
4517- * @param streamProperties defines a map of key-value pairs where the keys
4518- * are strings and the values are primitive values (numbers
4519- * and strings) which are appended to the messages similarly
4520- * to using the
4521- * {@link javax.jms.Message#setObjectProperty(String, Object)}
4522- * method
4523- */
4524- public OutputStream createOutputStream(Destination dest, Map<String, Object> streamProperties, int deliveryMode, int priority, long timeToLive) throws JMSException {
4525- checkClosedOrFailed();
4526- ensureConnectionInfoSent();
4527- return new ActiveMQOutputStream(this, createProducerId(), ActiveMQDestination.transform(dest), streamProperties, deliveryMode, priority, timeToLive);
4528- }
4529-
4530- /**
4531- * Unsubscribes a durable subscription that has been created by a client.
4532- * <P>
4533- * This method deletes the state being maintained on behalf of the
4534- * subscriber by its provider.
4535- * <P>
4536- * It is erroneous for a client to delete a durable subscription while there
4537- * is an active <CODE>MessageConsumer </CODE> or
4538- * <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed
4539- * message is part of a pending transaction or has not been acknowledged in
4540- * the session.
4541- *
4542- * @param name the name used to identify this subscription
4543- * @throws JMSException if the session fails to unsubscribe to the durable
4544- * subscription due to some internal error.
4545- * @throws InvalidDestinationException if an invalid subscription name is
4546- * specified.
4547- * @since 1.1
4548- */
4549- public void unsubscribe(String name) throws InvalidDestinationException, JMSException {
4550- checkClosedOrFailed();
4551- RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
4552- rsi.setConnectionId(getConnectionInfo().getConnectionId());
4553- rsi.setSubscriptionName(name);
4554- rsi.setClientId(getConnectionInfo().getClientId());
4555- syncSendPacket(rsi);
4556- }
4557-
4558- /**
4559- * Internal send method optimized: - It does not copy the message - It can
4560- * only handle ActiveMQ messages. - You can specify if the send is async or
4561- * sync - Does not allow you to send /w a transaction.
4562- */
4563- void send(ActiveMQDestination destination, ActiveMQMessage msg, MessageId messageId, int deliveryMode, int priority, long timeToLive, boolean async) throws JMSException {
4564- checkClosedOrFailed();
4565-
4566- if (destination.isTemporary() && isDeleted(destination)) {
4567- throw new JMSException("Cannot publish to a deleted Destination: " + destination);
4568- }
4569-
4570- msg.setJMSDestination(destination);
4571- msg.setJMSDeliveryMode(deliveryMode);
4572- long expiration = 0L;
4573-
4574- if (!isDisableTimeStampsByDefault()) {
4575- long timeStamp = System.currentTimeMillis();
4576- msg.setJMSTimestamp(timeStamp);
4577- if (timeToLive > 0) {
4578- expiration = timeToLive + timeStamp;
4579- }
4580- }
4581-
4582- msg.setJMSExpiration(expiration);
4583- msg.setJMSPriority(priority);
4584-
4585- msg.setJMSRedelivered(false);
4586- msg.setMessageId(messageId);
4587-
4588- msg.onSend();
4589-
4590- msg.setProducerId(msg.getMessageId().getProducerId());
4591-
4592- if (LOG.isDebugEnabled()) {
4593- LOG.debug("Sending message: " + msg);
4594- }
4595-
4596- if (async) {
4597- asyncSendPacket(msg);
4598- } else {
4599- syncSendPacket(msg);
4600- }
4601-
4602- }
4603-
4604- public void addOutputStream(ActiveMQOutputStream stream) {
4605- outputStreams.add(stream);
4606- }
4607-
4608- public void removeOutputStream(ActiveMQOutputStream stream) {
4609- outputStreams.remove(stream);
4610- }
4611-
4612- public void addInputStream(ActiveMQInputStream stream) {
4613- inputStreams.add(stream);
4614- }
4615-
4616- public void removeInputStream(ActiveMQInputStream stream) {
4617- inputStreams.remove(stream);
4618- }
4619-
4620- protected void onControlCommand(ControlCommand command) {
4621- String text = command.getCommand();
4622- if (text != null) {
4623- if (text.equals("shutdown")) {
4624- LOG.info("JVM told to shutdown");
4625- System.exit(0);
4626- }
4627- }
4628- }
4629-
4630- protected void onConnectionControl(ConnectionControl command) {
4631- if (command.isFaultTolerant()) {
4632- this.optimizeAcknowledge = false;
4633- for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
4634- ActiveMQSession s = i.next();
4635- s.setOptimizeAcknowledge(false);
4636- }
4637- }
4638- }
4639-
4640- protected void onConsumerControl(ConsumerControl command) {
4641- if (command.isClose()) {
4642- for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
4643- ActiveMQSession s = i.next();
4644- s.close(command.getConsumerId());
4645- }
4646- } else {
4647- for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
4648- ActiveMQSession s = i.next();
4649- s.setPrefetchSize(command.getConsumerId(), command.getPrefetch());
4650- }
4651- }
4652- }
4653-
4654- protected void transportFailed(IOException error) {
4655- transportFailed.set(true);
4656- if (firstFailureError == null) {
4657- firstFailureError = error;
4658- }
4659- }
4660-
4661- /**
4662- * Should a JMS message be copied to a new JMS Message object as part of the
4663- * send() method in JMS. This is enabled by default to be compliant with the
4664- * JMS specification. You can disable it if you do not mutate JMS messages
4665- * after they are sent for a performance boost
4666- */
4667- public void setCopyMessageOnSend(boolean copyMessageOnSend) {
4668- this.copyMessageOnSend = copyMessageOnSend;
4669- }
4670-
4671- @Override
4672- public String toString() {
4673- return "ActiveMQConnection {id=" + info.getConnectionId() + ",clientId=" + info.getClientId() + ",started=" + started.get() + "}";
4674- }
4675-
4676- protected BlobTransferPolicy createBlobTransferPolicy() {
4677- return new BlobTransferPolicy();
4678- }
4679-
4680- public int getProtocolVersion() {
4681- return protocolVersion.get();
4682- }
4683-
4684- public int getProducerWindowSize() {
4685- return producerWindowSize;
4686- }
4687-
4688- public void setProducerWindowSize(int producerWindowSize) {
4689- this.producerWindowSize = producerWindowSize;
4690- }
4691-
4692- public void setAuditDepth(int auditDepth) {
4693- connectionAudit.setAuditDepth(auditDepth);
4694- }
4695-
4696- public void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) {
4697- connectionAudit.setAuditMaximumProducerNumber(auditMaximumProducerNumber);
4698- }
4699-
4700- protected void removeDispatcher(ActiveMQDispatcher dispatcher) {
4701- connectionAudit.removeDispatcher(dispatcher);
4702- }
4703-
4704- protected boolean isDuplicate(ActiveMQDispatcher dispatcher, Message message) {
4705- return checkForDuplicates && connectionAudit.isDuplicate(dispatcher, message);
4706- }
4707-
4708- protected void rollbackDuplicate(ActiveMQDispatcher dispatcher, Message message) {
4709- connectionAudit.rollbackDuplicate(dispatcher, message);
4710- }
4711-
4712- public IOException getFirstFailureError() {
4713- return firstFailureError;
4714- }
4715-
4716- protected void waitForTransportInterruptionProcessingToComplete() throws InterruptedException {
4717- CountDownLatch cdl = this.transportInterruptionProcessingComplete;
4718- if (cdl != null) {
4719- if (!closed.get() && !transportFailed.get() && cdl.getCount()>0) {
4720- LOG.warn("dispatch paused, waiting for outstanding dispatch interruption processing (" + cdl.getCount() + ") to complete..");
4721- cdl.await(10, TimeUnit.SECONDS);
4722- }
4723- signalInterruptionProcessingComplete();
4724- }
4725- }
4726-
4727- protected void transportInterruptionProcessingComplete() {
4728- CountDownLatch cdl = this.transportInterruptionProcessingComplete;
4729- if (cdl != null) {
4730- cdl.countDown();
4731- try {
4732- signalInterruptionProcessingComplete();
4733- } catch (InterruptedException ignored) {}
4734- }
4735- }
4736-
4737- private void signalInterruptionProcessingComplete() throws InterruptedException {
4738- CountDownLatch cdl = this.transportInterruptionProcessingComplete;
4739- if (cdl.getCount()==0) {
4740- if (LOG.isDebugEnabled()) {
4741- LOG.debug("transportInterruptionProcessingComplete for: " + this.getConnectionInfo().getConnectionId());
4742- }
4743- this.transportInterruptionProcessingComplete = null;
4744-
4745- FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class);
4746- if (failoverTransport != null) {
4747- failoverTransport.connectionInterruptProcessingComplete(this.getConnectionInfo().getConnectionId());
4748- if (LOG.isDebugEnabled()) {
4749- LOG.debug("notified failover transport (" + failoverTransport
4750- + ") of interruption completion for: " + this.getConnectionInfo().getConnectionId());
4751- }
4752- }
4753-
4754- }
4755- }
4756-
4757- private void signalInterruptionProcessingNeeded() {
4758- FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class);
4759- if (failoverTransport != null) {
4760- failoverTransport.getStateTracker().transportInterrupted(this.getConnectionInfo().getConnectionId());
4761- if (LOG.isDebugEnabled()) {
4762- LOG.debug("notified failover transport (" + failoverTransport
4763- + ") of pending interruption processing for: " + this.getConnectionInfo().getConnectionId());
4764- }
4765- }
4766- }
4767-
4768- /*
4769- * specify the amount of time in milliseconds that a consumer with a transaction pending recovery
4770- * will wait to receive re dispatched messages.
4771- * default value is 0 so there is no wait by default.
4772- */
4773- public void setConsumerFailoverRedeliveryWaitPeriod(long consumerFailoverRedeliveryWaitPeriod) {
4774- this.consumerFailoverRedeliveryWaitPeriod = consumerFailoverRedeliveryWaitPeriod;
4775- }
4776-
4777- public long getConsumerFailoverRedeliveryWaitPeriod() {
4778- return consumerFailoverRedeliveryWaitPeriod;
4779- }
4780-
4781- protected Scheduler getScheduler() {
4782- return this.scheduler;
4783- }
4784-
4785- protected ThreadPoolExecutor getExecutor() {
4786- return this.executor;
4787- }
4788-
4789- /**
4790- * @return the checkForDuplicates
4791- */
4792- public boolean isCheckForDuplicates() {
4793- return this.checkForDuplicates;
4794- }
4795-
4796- /**
4797- * @param checkForDuplicates the checkForDuplicates to set
4798- */
4799- public void setCheckForDuplicates(boolean checkForDuplicates) {
4800- this.checkForDuplicates = checkForDuplicates;
4801- }
4802-
4803-}
4804
4805=== added directory '.pc/CVE-2011-4605.diff/activemq-core/src/main/java/org/apache/activemq/broker'
4806=== removed directory '.pc/CVE-2011-4605.diff/activemq-core/src/main/java/org/apache/activemq/broker'
4807=== added file '.pc/CVE-2011-4605.diff/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java'
4808--- .pc/CVE-2011-4605.diff/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java 1970-01-01 00:00:00 +0000
4809+++ .pc/CVE-2011-4605.diff/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java 2012-07-17 09:01:20 +0000
4810@@ -0,0 +1,1449 @@
4811+/**
4812+ * Licensed to the Apache Software Foundation (ASF) under one or more
4813+ * contributor license agreements. See the NOTICE file distributed with
4814+ * this work for additional information regarding copyright ownership.
4815+ * The ASF licenses this file to You under the Apache License, Version 2.0
4816+ * (the "License"); you may not use this file except in compliance with
4817+ * the License. You may obtain a copy of the License at
4818+ *
4819+ * http://www.apache.org/licenses/LICENSE-2.0
4820+ *
4821+ * Unless required by applicable law or agreed to in writing, software
4822+ * distributed under the License is distributed on an "AS IS" BASIS,
4823+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
4824+ * See the License for the specific language governing permissions and
4825+ * limitations under the License.
4826+ */
4827+package org.apache.activemq.broker;
4828+
4829+import java.io.IOException;
4830+import java.net.URI;
4831+import java.util.HashMap;
4832+import java.util.Iterator;
4833+import java.util.LinkedList;
4834+import java.util.List;
4835+import java.util.Map;
4836+import java.util.Properties;
4837+import java.util.concurrent.ConcurrentHashMap;
4838+import java.util.concurrent.CopyOnWriteArrayList;
4839+import java.util.concurrent.CountDownLatch;
4840+import java.util.concurrent.TimeUnit;
4841+import java.util.concurrent.atomic.AtomicBoolean;
4842+import java.util.concurrent.atomic.AtomicInteger;
4843+import java.util.concurrent.atomic.AtomicReference;
4844+import java.util.concurrent.locks.ReentrantReadWriteLock;
4845+
4846+import javax.management.ObjectName;
4847+import javax.transaction.xa.XAResource;
4848+
4849+import org.apache.activemq.broker.ft.MasterBroker;
4850+import org.apache.activemq.broker.region.ConnectionStatistics;
4851+import org.apache.activemq.broker.region.RegionBroker;
4852+import org.apache.activemq.command.BrokerId;
4853+import org.apache.activemq.command.BrokerInfo;
4854+import org.apache.activemq.command.Command;
4855+import org.apache.activemq.command.CommandTypes;
4856+import org.apache.activemq.command.ConnectionControl;
4857+import org.apache.activemq.command.ConnectionError;
4858+import org.apache.activemq.command.ConnectionId;
4859+import org.apache.activemq.command.ConnectionInfo;
4860+import org.apache.activemq.command.ConsumerControl;
4861+import org.apache.activemq.command.ConsumerId;
4862+import org.apache.activemq.command.ConsumerInfo;
4863+import org.apache.activemq.command.ControlCommand;
4864+import org.apache.activemq.command.DataArrayResponse;
4865+import org.apache.activemq.command.DestinationInfo;
4866+import org.apache.activemq.command.ExceptionResponse;
4867+import org.apache.activemq.command.FlushCommand;
4868+import org.apache.activemq.command.IntegerResponse;
4869+import org.apache.activemq.command.KeepAliveInfo;
4870+import org.apache.activemq.command.Message;
4871+import org.apache.activemq.command.MessageAck;
4872+import org.apache.activemq.command.MessageDispatch;
4873+import org.apache.activemq.command.MessageDispatchNotification;
4874+import org.apache.activemq.command.MessagePull;
4875+import org.apache.activemq.command.ProducerAck;
4876+import org.apache.activemq.command.ProducerId;
4877+import org.apache.activemq.command.ProducerInfo;
4878+import org.apache.activemq.command.RemoveSubscriptionInfo;
4879+import org.apache.activemq.command.Response;
4880+import org.apache.activemq.command.SessionId;
4881+import org.apache.activemq.command.SessionInfo;
4882+import org.apache.activemq.command.ShutdownInfo;
4883+import org.apache.activemq.command.TransactionId;
4884+import org.apache.activemq.command.TransactionInfo;
4885+import org.apache.activemq.command.WireFormatInfo;
4886+import org.apache.activemq.network.*;
4887+import org.apache.activemq.security.MessageAuthorizationPolicy;
4888+import org.apache.activemq.state.CommandVisitor;
4889+import org.apache.activemq.state.ConnectionState;
4890+import org.apache.activemq.state.ConsumerState;
4891+import org.apache.activemq.state.ProducerState;
4892+import org.apache.activemq.state.SessionState;
4893+import org.apache.activemq.state.TransactionState;
4894+import org.apache.activemq.thread.DefaultThreadPools;
4895+import org.apache.activemq.thread.Task;
4896+import org.apache.activemq.thread.TaskRunner;
4897+import org.apache.activemq.thread.TaskRunnerFactory;
4898+import org.apache.activemq.transaction.Transaction;
4899+import org.apache.activemq.transport.DefaultTransportListener;
4900+import org.apache.activemq.transport.ResponseCorrelator;
4901+import org.apache.activemq.transport.Transport;
4902+import org.apache.activemq.transport.TransportDisposedIOException;
4903+import org.apache.activemq.transport.TransportFactory;
4904+import org.apache.activemq.util.*;
4905+import org.slf4j.Logger;
4906+import org.slf4j.LoggerFactory;
4907+import org.slf4j.MDC;
4908+
4909+public class TransportConnection implements Connection, Task, CommandVisitor {
4910+ private static final Logger LOG = LoggerFactory.getLogger(TransportConnection.class);
4911+ private static final Logger TRANSPORTLOG = LoggerFactory.getLogger(TransportConnection.class.getName() + ".Transport");
4912+ private static final Logger SERVICELOG = LoggerFactory.getLogger(TransportConnection.class.getName() + ".Service");
4913+ // Keeps track of the broker and connector that created this connection.
4914+ protected final Broker broker;
4915+ protected final TransportConnector connector;
4916+ // Keeps track of the state of the connections.
4917+ // protected final ConcurrentHashMap localConnectionStates=new
4918+ // ConcurrentHashMap();
4919+ protected final Map<ConnectionId, ConnectionState> brokerConnectionStates;
4920+ // The broker and wireformat info that was exchanged.
4921+ protected BrokerInfo brokerInfo;
4922+ protected final List<Command> dispatchQueue = new LinkedList<Command>();
4923+ protected TaskRunner taskRunner;
4924+ protected final AtomicReference<IOException> transportException = new AtomicReference<IOException>();
4925+ protected AtomicBoolean dispatchStopped = new AtomicBoolean(false);
4926+ private MasterBroker masterBroker;
4927+ private final Transport transport;
4928+ private MessageAuthorizationPolicy messageAuthorizationPolicy;
4929+ private WireFormatInfo wireFormatInfo;
4930+ // Used to do async dispatch.. this should perhaps be pushed down into the
4931+ // transport layer..
4932+ private boolean inServiceException;
4933+ private final ConnectionStatistics statistics = new ConnectionStatistics();
4934+ private boolean manageable;
4935+ private boolean slow;
4936+ private boolean markedCandidate;
4937+ private boolean blockedCandidate;
4938+ private boolean blocked;
4939+ private boolean connected;
4940+ private boolean active;
4941+ private boolean starting;
4942+ private boolean pendingStop;
4943+ private long timeStamp;
4944+ private final AtomicBoolean stopping = new AtomicBoolean(false);
4945+ private final CountDownLatch stopped = new CountDownLatch(1);
4946+ private final AtomicBoolean asyncException = new AtomicBoolean(false);
4947+ private final Map<ProducerId, ProducerBrokerExchange> producerExchanges = new HashMap<ProducerId, ProducerBrokerExchange>();
4948+ private final Map<ConsumerId, ConsumerBrokerExchange> consumerExchanges = new HashMap<ConsumerId, ConsumerBrokerExchange>();
4949+ private final CountDownLatch dispatchStoppedLatch = new CountDownLatch(1);
4950+ private ConnectionContext context;
4951+ private boolean networkConnection;
4952+ private boolean faultTolerantConnection;
4953+ private final AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
4954+ private DemandForwardingBridge duplexBridge;
4955+ private final TaskRunnerFactory taskRunnerFactory;
4956+ private TransportConnectionStateRegister connectionStateRegister = new SingleTransportConnectionStateRegister();
4957+ private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock();
4958+ private String duplexNetworkConnectorId;
4959+
4960+ /**
4961+ * @param connector
4962+ * @param transport
4963+ * @param broker
4964+ * @param taskRunnerFactory
4965+ * - can be null if you want direct dispatch to the transport
4966+ * else commands are sent async.
4967+ */
4968+ public TransportConnection(TransportConnector connector, final Transport transport, Broker broker,
4969+ TaskRunnerFactory taskRunnerFactory) {
4970+ this.connector = connector;
4971+ this.broker = broker;
4972+ this.messageAuthorizationPolicy = connector.getMessageAuthorizationPolicy();
4973+ RegionBroker rb = (RegionBroker) broker.getAdaptor(RegionBroker.class);
4974+ brokerConnectionStates = rb.getConnectionStates();
4975+ if (connector != null) {
4976+ this.statistics.setParent(connector.getStatistics());
4977+ }
4978+ this.taskRunnerFactory = taskRunnerFactory;
4979+ this.transport = transport;
4980+ this.transport.setTransportListener(new DefaultTransportListener() {
4981+ @Override
4982+ public void onCommand(Object o) {
4983+ serviceLock.readLock().lock();
4984+ try {
4985+ if (!(o instanceof Command)) {
4986+ throw new RuntimeException("Protocol violation - Command corrupted: " + o.toString());
4987+ }
4988+ Command command = (Command) o;
4989+ Response response = service(command);
4990+ if (response != null) {
4991+ dispatchSync(response);
4992+ }
4993+ } finally {
4994+ serviceLock.readLock().unlock();
4995+ }
4996+ }
4997+
4998+ @Override
4999+ public void onException(IOException exception) {
5000+ serviceLock.readLock().lock();
The diff has been truncated for viewing.

Subscribers

People subscribed via source and target branches

to all changes: