001         package com.croftsoft.core.net.jms;
002         
003         // imported J2SE packages
004         
005         import java.io.Serializable;
006         import javax.naming.Context;
007         import javax.naming.InitialContext;
008         import javax.naming.NamingException;
009         import java.util.*;
010    
011         // imported J2EE packages
012         
013         import javax.jms.JMSException;
014         import javax.jms.Message;
015         import javax.jms.MessageListener;
016         import javax.jms.ObjectMessage;
017         import javax.jms.Session;
018         import javax.jms.Topic;
019         import javax.jms.TopicConnection;
020         import javax.jms.TopicConnectionFactory;
021         import javax.jms.TopicPublisher;
022         import javax.jms.TopicSession;
023         import javax.jms.TopicSubscriber;
024    
025         // imported CroftSoft packages
026         
027         import com.croftsoft.core.lang.lifecycle.Lifecycle;
028         import com.croftsoft.core.lang.NullArgumentException;
029         import com.croftsoft.core.util.queue.ListQueue;
030         import com.croftsoft.core.util.queue.Queue;
031         
032         /*********************************************************************
033         * Exchanges serializable Objects with a Topic via Queues.
034         *
035         * @version
036         *   $Id: Courier.java,v 1.3 2008/09/20 04:12:46 croft Exp $
037         * @since
038         *   2001-02-22
039         * @author
040         *   <a href="https://www.croftsoft.com/">David Wallace Croft</a>
041         *********************************************************************/
042    
043         public final class  Courier
044           implements Lifecycle, MessageListener, Runnable
045         //////////////////////////////////////////////////////////////////////
046         //////////////////////////////////////////////////////////////////////
047         {
048           
049         private static final String
050           DEFAULT_JNDI_TOPIC_CONNECTION_FACTORY_NAME
051           = "jms/TopicConnectionFactory";
052         
053         private static final String  DEFAULT_JNDI_TOPIC_NAME = "jms/Topic";
054         
055         private static final int  STATE_UNINITIALIZED = 0;
056         
057         private static final int  STATE_INITIALIZED   = 1;
058         
059         private static final int  STATE_STARTED       = 2;
060         
061         //////////////////////////////////////////////////////////////////////
062         //////////////////////////////////////////////////////////////////////
063         
064         private final Queue   incomingQueue;
065         
066         private final Queue   outgoingQueue;
067         
068         private final String  jndiTopicName;
069         
070         private final String  jndiTopicConnectionFactoryName;
071         
072         private int  state = STATE_UNINITIALIZED;
073         
074         private TopicConnection  topicConnection;
075         
076         private TopicSession     topicSession;
077         
078         private TopicPublisher   topicPublisher;
079         
080         private TopicSubscriber  topicSubscriber;
081         
082         private Thread   thread;
083         
084         private Object   lockObject = new Object ( );
085         
086         private boolean  isOkToRun = false;
087         
088         //////////////////////////////////////////////////////////////////////
089         //////////////////////////////////////////////////////////////////////
090         
091         public static void  main ( String [ ]  args )
092         //////////////////////////////////////////////////////////////////////
093         {
094           Queue  incomingQueue = new ListQueue ( new ArrayList ( ) );
095           
096           Queue  outgoingQueue = new ListQueue ( new ArrayList ( ) );
097           
098           Serializable  outgoingSerializable = "Test";
099           
100           if ( args.length > 0 )
101           {
102             outgoingSerializable = args [ 0 ];
103           }
104           
105           String  jndiTopicName = DEFAULT_JNDI_TOPIC_NAME;
106           
107           if ( args.length > 1 )
108           {
109             jndiTopicName = args [ 1 ];
110           }
111           
112           String  jndiTopicConnectionFactoryName
113             = DEFAULT_JNDI_TOPIC_CONNECTION_FACTORY_NAME;
114           
115           if ( args.length > 2 )
116           {
117             jndiTopicConnectionFactoryName = args [ 2 ];
118           }
119           
120           Lifecycle  lifecycle = new Courier ( incomingQueue, outgoingQueue,
121             jndiTopicName, jndiTopicConnectionFactoryName );
122           
123           System.out.println ( "Initializing..." );
124           
125           lifecycle.init ( );
126           
127           System.out.println ( "Starting..." );
128           
129           lifecycle.start ( );
130           
131           System.out.println ( "Transmitting..." );
132           
133           outgoingQueue.append ( outgoingSerializable );
134           
135           System.out.println ( "Receiving..." );
136           
137           try
138           {
139             System.out.println ( incomingQueue.pull ( ) );
140           }
141           catch ( InterruptedException  ex )
142           {
143             ex.printStackTrace ( );
144           }
145           
146           System.out.println ( "Stopping..." );
147           
148           lifecycle.stop ( );
149           
150           System.out.println ( "Destroying..." );
151           
152           lifecycle.destroy ( );
153         }
154    
155         //////////////////////////////////////////////////////////////////////
156         //////////////////////////////////////////////////////////////////////
157    
158         public  Courier (
159           Queue   incomingQueue,
160           Queue   outgoingQueue,
161           String  jndiTopicName,
162           String  jndiTopicConnectionFactoryName )
163         //////////////////////////////////////////////////////////////////////
164         {
165           NullArgumentException.check ( this.incomingQueue = incomingQueue );
166           
167           NullArgumentException.check ( this.outgoingQueue = outgoingQueue );
168           
169           NullArgumentException.check ( this.jndiTopicName = jndiTopicName );
170           
171           NullArgumentException.check ( this.jndiTopicConnectionFactoryName
172             = jndiTopicConnectionFactoryName );
173         }
174         
175         public  Courier (
176           Queue   incomingQueue,
177           Queue   outgoingQueue,
178           String  jndiTopicName )
179         //////////////////////////////////////////////////////////////////////
180         {
181           this ( incomingQueue, outgoingQueue, jndiTopicName,
182             DEFAULT_JNDI_TOPIC_CONNECTION_FACTORY_NAME );
183         }
184         
185         public  Courier (
186           Queue  incomingQueue,
187           Queue  outgoingQueue )
188         //////////////////////////////////////////////////////////////////////
189         {
190           this ( incomingQueue, outgoingQueue, DEFAULT_JNDI_TOPIC_NAME );
191         }
192         
193         //////////////////////////////////////////////////////////////////////
194         //////////////////////////////////////////////////////////////////////
195         
196         public synchronized void  init ( )
197         //////////////////////////////////////////////////////////////////////
198         {
199           if ( state != STATE_UNINITIALIZED )
200           {
201             throw new IllegalStateException ( "already initialized" );
202           }
203           
204           try
205           {
206             Context  context = new InitialContext ( );
207             
208             TopicConnectionFactory  topicConnectionFactory
209               = ( TopicConnectionFactory ) context.lookup (
210               jndiTopicConnectionFactoryName );
211           
212             Topic  topic = ( Topic ) context.lookup ( jndiTopicName );
213    
214             topicConnection = topicConnectionFactory.createTopicConnection ( );
215           
216             topicSession = topicConnection.createTopicSession (
217               false, Session.AUTO_ACKNOWLEDGE );
218           
219             topicPublisher = topicSession.createPublisher ( topic );
220             
221             String  messageSelector = null;
222             
223             boolean  noLocal = true;
224           
225             topicSubscriber = topicSession.createSubscriber (
226               topic, messageSelector, noLocal );
227           
228             topicSubscriber.setMessageListener ( this );
229           
230             state = STATE_INITIALIZED;
231           }
232           catch ( NamingException  ex )
233           {
234             ex.printStackTrace ( );
235           }
236           catch ( JMSException  ex )
237           {
238             ex.printStackTrace ( );
239             
240    // Do we need to close some stuff here?
241             
242    // Do I need a finalize method?         
243           }
244         }
245         
246         public synchronized void  start ( )
247         //////////////////////////////////////////////////////////////////////
248         {
249           if ( state != STATE_INITIALIZED )
250           {
251             throw new IllegalStateException (
252               "not initialized or already started" );
253           }
254           
255           // Use of the lockObject ensures that a new thread cannot be started
256           // until the previously running thread has completed.
257           
258           synchronized ( lockObject )
259           {
260             isOkToRun = true;
261             
262             try
263             {
264               topicConnection.start ( );
265           
266               thread = new Thread ( this );
267           
268               thread.start ( );
269           
270               state = STATE_STARTED;
271             }
272             catch ( JMSException  ex )
273             {
274    // do some cleanup here?           
275               ex.printStackTrace ( );
276             }
277           }
278         }
279         
280         public void  onMessage ( Message  message )
281         //////////////////////////////////////////////////////////////////////
282         {
283           // No need to synchronize as the Session passes messages serially.
284           
285           try
286           {
287             if ( message instanceof ObjectMessage )
288             {
289               Object  messageObject
290                 = ( ( ObjectMessage ) message ).getObject ( );
291               
292               incomingQueue.append ( messageObject );
293             }
294             else
295             {
296    // ... else what?           
297             }
298           }
299           catch ( Exception  ex )
300           {
301             // must catch all Exceptions
302             
303             ex.printStackTrace ( );
304           }
305         }
306         
307         public void  run ( )
308         //////////////////////////////////////////////////////////////////////
309         {
310           if ( thread != Thread.currentThread ( ) )
311           {
312             throw new IllegalStateException ( "call start() instead" );
313           }
314           
315           // Use of the lockObject ensures that a new thread cannot be started
316           // until the previously running thread has completed.
317           
318           synchronized ( lockObject )
319           {
320             while ( isOkToRun )
321             {
322               try
323               {
324                 Serializable  outgoingSerializable
325                   = ( Serializable ) outgoingQueue.pull ( );
326                 
327                 ObjectMessage  objectMessage
328                   = topicSession.createObjectMessage ( );
329           
330                 objectMessage.setObject ( outgoingSerializable );
331           
332                 topicPublisher.publish ( objectMessage );
333               }
334               catch ( InterruptedException  ex )
335               {
336                 // Will exit loop if isOkToRun is now false.
337               }
338               catch ( JMSException  ex )
339               {
340                 ex.printStackTrace ( );
341                 
342    // What kind of cleanup and state transition here?             
343                 
344                 isOkToRun = false;
345               }
346             }
347           }
348         }
349         
350         public synchronized void  stop ( )
351         //////////////////////////////////////////////////////////////////////
352         {
353           if ( state != STATE_STARTED )
354           {
355             throw new IllegalStateException ( "not started" );
356           }
357           
358           isOkToRun = false;
359           
360           thread.interrupt ( );
361             
362           thread = null;
363           
364           try
365           {
366             topicConnection.stop ( );
367           }
368           catch ( JMSException  ex )
369           {
370             ex.printStackTrace ( );
371             
372    // what kind of clean-up here?         
373           }
374           
375           state = STATE_INITIALIZED;
376         }
377         
378         public synchronized void  destroy ( )
379         //////////////////////////////////////////////////////////////////////
380         {
381           if ( state != STATE_INITIALIZED )
382           {
383             throw new IllegalStateException ( "not initialized" );
384           }
385           
386           try
387           {
388             topicSubscriber.close ( );
389           }
390           catch ( Exception  ex )
391           {
392             ex.printStackTrace ( );
393           }
394           
395    // what about others?
396           
397           try
398           {
399             topicConnection.close ( );
400           }
401           catch ( Exception  ex )
402           {
403             ex.printStackTrace ( );
404           }
405           
406           state = STATE_UNINITIALIZED;
407         }
408    
409         //////////////////////////////////////////////////////////////////////
410         //////////////////////////////////////////////////////////////////////
411         }