001 package com.croftsoft.core.net.jms;
002
003 // imported J2SE packages
004
005 import java.io.Serializable;
006 import javax.naming.Context;
007 import javax.naming.InitialContext;
008 import javax.naming.NamingException;
009 import java.util.*;
010
011 // imported J2EE packages
012
013 import javax.jms.JMSException;
014 import javax.jms.Message;
015 import javax.jms.MessageListener;
016 import javax.jms.ObjectMessage;
017 import javax.jms.Session;
018 import javax.jms.Topic;
019 import javax.jms.TopicConnection;
020 import javax.jms.TopicConnectionFactory;
021 import javax.jms.TopicPublisher;
022 import javax.jms.TopicSession;
023 import javax.jms.TopicSubscriber;
024
025 // imported CroftSoft packages
026
027 import com.croftsoft.core.lang.lifecycle.Lifecycle;
028 import com.croftsoft.core.lang.NullArgumentException;
029 import com.croftsoft.core.util.queue.ListQueue;
030 import com.croftsoft.core.util.queue.Queue;
031
032 /*********************************************************************
033 * Exchanges serializable Objects with a Topic via Queues.
034 *
035 * @version
036 * $Id: Courier.java,v 1.3 2008/09/20 04:12:46 croft Exp $
037 * @since
038 * 2001-02-22
039 * @author
040 * <a href="http://www.CroftSoft.com/">David Wallace Croft</a>
041 *********************************************************************/
042
043 public final class Courier
044 implements Lifecycle, MessageListener, Runnable
045 //////////////////////////////////////////////////////////////////////
046 //////////////////////////////////////////////////////////////////////
047 {
048
049 private static final String
050 DEFAULT_JNDI_TOPIC_CONNECTION_FACTORY_NAME
051 = "jms/TopicConnectionFactory";
052
053 private static final String DEFAULT_JNDI_TOPIC_NAME = "jms/Topic";
054
055 private static final int STATE_UNINITIALIZED = 0;
056
057 private static final int STATE_INITIALIZED = 1;
058
059 private static final int STATE_STARTED = 2;
060
061 //////////////////////////////////////////////////////////////////////
062 //////////////////////////////////////////////////////////////////////
063
064 private final Queue incomingQueue;
065
066 private final Queue outgoingQueue;
067
068 private final String jndiTopicName;
069
070 private final String jndiTopicConnectionFactoryName;
071
072 private int state = STATE_UNINITIALIZED;
073
074 private TopicConnection topicConnection;
075
076 private TopicSession topicSession;
077
078 private TopicPublisher topicPublisher;
079
080 private TopicSubscriber topicSubscriber;
081
082 private Thread thread;
083
084 private Object lockObject = new Object ( );
085
086 private boolean isOkToRun = false;
087
088 //////////////////////////////////////////////////////////////////////
089 //////////////////////////////////////////////////////////////////////
090
091 public static void main ( String [ ] args )
092 //////////////////////////////////////////////////////////////////////
093 {
094 Queue incomingQueue = new ListQueue ( new ArrayList ( ) );
095
096 Queue outgoingQueue = new ListQueue ( new ArrayList ( ) );
097
098 Serializable outgoingSerializable = "Test";
099
100 if ( args.length > 0 )
101 {
102 outgoingSerializable = args [ 0 ];
103 }
104
105 String jndiTopicName = DEFAULT_JNDI_TOPIC_NAME;
106
107 if ( args.length > 1 )
108 {
109 jndiTopicName = args [ 1 ];
110 }
111
112 String jndiTopicConnectionFactoryName
113 = DEFAULT_JNDI_TOPIC_CONNECTION_FACTORY_NAME;
114
115 if ( args.length > 2 )
116 {
117 jndiTopicConnectionFactoryName = args [ 2 ];
118 }
119
120 Lifecycle lifecycle = new Courier ( incomingQueue, outgoingQueue,
121 jndiTopicName, jndiTopicConnectionFactoryName );
122
123 System.out.println ( "Initializing..." );
124
125 lifecycle.init ( );
126
127 System.out.println ( "Starting..." );
128
129 lifecycle.start ( );
130
131 System.out.println ( "Transmitting..." );
132
133 outgoingQueue.append ( outgoingSerializable );
134
135 System.out.println ( "Receiving..." );
136
137 try
138 {
139 System.out.println ( incomingQueue.pull ( ) );
140 }
141 catch ( InterruptedException ex )
142 {
143 ex.printStackTrace ( );
144 }
145
146 System.out.println ( "Stopping..." );
147
148 lifecycle.stop ( );
149
150 System.out.println ( "Destroying..." );
151
152 lifecycle.destroy ( );
153 }
154
155 //////////////////////////////////////////////////////////////////////
156 //////////////////////////////////////////////////////////////////////
157
158 public Courier (
159 Queue incomingQueue,
160 Queue outgoingQueue,
161 String jndiTopicName,
162 String jndiTopicConnectionFactoryName )
163 //////////////////////////////////////////////////////////////////////
164 {
165 NullArgumentException.check ( this.incomingQueue = incomingQueue );
166
167 NullArgumentException.check ( this.outgoingQueue = outgoingQueue );
168
169 NullArgumentException.check ( this.jndiTopicName = jndiTopicName );
170
171 NullArgumentException.check ( this.jndiTopicConnectionFactoryName
172 = jndiTopicConnectionFactoryName );
173 }
174
175 public Courier (
176 Queue incomingQueue,
177 Queue outgoingQueue,
178 String jndiTopicName )
179 //////////////////////////////////////////////////////////////////////
180 {
181 this ( incomingQueue, outgoingQueue, jndiTopicName,
182 DEFAULT_JNDI_TOPIC_CONNECTION_FACTORY_NAME );
183 }
184
185 public Courier (
186 Queue incomingQueue,
187 Queue outgoingQueue )
188 //////////////////////////////////////////////////////////////////////
189 {
190 this ( incomingQueue, outgoingQueue, DEFAULT_JNDI_TOPIC_NAME );
191 }
192
193 //////////////////////////////////////////////////////////////////////
194 //////////////////////////////////////////////////////////////////////
195
196 public synchronized void init ( )
197 //////////////////////////////////////////////////////////////////////
198 {
199 if ( state != STATE_UNINITIALIZED )
200 {
201 throw new IllegalStateException ( "already initialized" );
202 }
203
204 try
205 {
206 Context context = new InitialContext ( );
207
208 TopicConnectionFactory topicConnectionFactory
209 = ( TopicConnectionFactory ) context.lookup (
210 jndiTopicConnectionFactoryName );
211
212 Topic topic = ( Topic ) context.lookup ( jndiTopicName );
213
214 topicConnection = topicConnectionFactory.createTopicConnection ( );
215
216 topicSession = topicConnection.createTopicSession (
217 false, Session.AUTO_ACKNOWLEDGE );
218
219 topicPublisher = topicSession.createPublisher ( topic );
220
221 String messageSelector = null;
222
223 boolean noLocal = true;
224
225 topicSubscriber = topicSession.createSubscriber (
226 topic, messageSelector, noLocal );
227
228 topicSubscriber.setMessageListener ( this );
229
230 state = STATE_INITIALIZED;
231 }
232 catch ( NamingException ex )
233 {
234 ex.printStackTrace ( );
235 }
236 catch ( JMSException ex )
237 {
238 ex.printStackTrace ( );
239
240 // Do we need to close some stuff here?
241
242 // Do I need a finalize method?
243 }
244 }
245
246 public synchronized void start ( )
247 //////////////////////////////////////////////////////////////////////
248 {
249 if ( state != STATE_INITIALIZED )
250 {
251 throw new IllegalStateException (
252 "not initialized or already started" );
253 }
254
255 // Use of the lockObject ensures that a new thread cannot be started
256 // until the previously running thread has completed.
257
258 synchronized ( lockObject )
259 {
260 isOkToRun = true;
261
262 try
263 {
264 topicConnection.start ( );
265
266 thread = new Thread ( this );
267
268 thread.start ( );
269
270 state = STATE_STARTED;
271 }
272 catch ( JMSException ex )
273 {
274 // do some cleanup here?
275 ex.printStackTrace ( );
276 }
277 }
278 }
279
280 public void onMessage ( Message message )
281 //////////////////////////////////////////////////////////////////////
282 {
283 // No need to synchronize as the Session passes messages serially.
284
285 try
286 {
287 if ( message instanceof ObjectMessage )
288 {
289 Object messageObject
290 = ( ( ObjectMessage ) message ).getObject ( );
291
292 incomingQueue.append ( messageObject );
293 }
294 else
295 {
296 // ... else what?
297 }
298 }
299 catch ( Exception ex )
300 {
301 // must catch all Exceptions
302
303 ex.printStackTrace ( );
304 }
305 }
306
307 public void run ( )
308 //////////////////////////////////////////////////////////////////////
309 {
310 if ( thread != Thread.currentThread ( ) )
311 {
312 throw new IllegalStateException ( "call start() instead" );
313 }
314
315 // Use of the lockObject ensures that a new thread cannot be started
316 // until the previously running thread has completed.
317
318 synchronized ( lockObject )
319 {
320 while ( isOkToRun )
321 {
322 try
323 {
324 Serializable outgoingSerializable
325 = ( Serializable ) outgoingQueue.pull ( );
326
327 ObjectMessage objectMessage
328 = topicSession.createObjectMessage ( );
329
330 objectMessage.setObject ( outgoingSerializable );
331
332 topicPublisher.publish ( objectMessage );
333 }
334 catch ( InterruptedException ex )
335 {
336 // Will exit loop if isOkToRun is now false.
337 }
338 catch ( JMSException ex )
339 {
340 ex.printStackTrace ( );
341
342 // What kind of cleanup and state transition here?
343
344 isOkToRun = false;
345 }
346 }
347 }
348 }
349
350 public synchronized void stop ( )
351 //////////////////////////////////////////////////////////////////////
352 {
353 if ( state != STATE_STARTED )
354 {
355 throw new IllegalStateException ( "not started" );
356 }
357
358 isOkToRun = false;
359
360 thread.interrupt ( );
361
362 thread = null;
363
364 try
365 {
366 topicConnection.stop ( );
367 }
368 catch ( JMSException ex )
369 {
370 ex.printStackTrace ( );
371
372 // what kind of clean-up here?
373 }
374
375 state = STATE_INITIALIZED;
376 }
377
378 public synchronized void destroy ( )
379 //////////////////////////////////////////////////////////////////////
380 {
381 if ( state != STATE_INITIALIZED )
382 {
383 throw new IllegalStateException ( "not initialized" );
384 }
385
386 try
387 {
388 topicSubscriber.close ( );
389 }
390 catch ( Exception ex )
391 {
392 ex.printStackTrace ( );
393 }
394
395 // what about others?
396
397 try
398 {
399 topicConnection.close ( );
400 }
401 catch ( Exception ex )
402 {
403 ex.printStackTrace ( );
404 }
405
406 state = STATE_UNINITIALIZED;
407 }
408
409 //////////////////////////////////////////////////////////////////////
410 //////////////////////////////////////////////////////////////////////
411 }