001    /*
002    
003      $Id: DaemonThread.java,v 1.13 2003/05/15 12:05:55 culdesac Exp $
004    
005    */
006    
007    package sharpster.daemon.externalcommunication;
008    
009    import java.lang.Thread;
010    import java.io.*;
011    import java.util.*;
012    
013    import sharpster.common.*;
014    import sharpster.daemon.SharpsterDaemon;
015    import sharpster.daemon.commandmanagement.ExternalCommandManager;
016    
017    import net.jxta.peergroup.PeerGroup;
018    import net.jxta.peergroup.PeerGroupFactory;
019    import net.jxta.exception.PeerGroupException;
020    import net.jxta.document.AdvertisementFactory;
021    import net.jxta.document.StructuredDocumentFactory;
022    import net.jxta.document.Advertisement;
023    import net.jxta.document.Element;
024    import net.jxta.document.MimeMediaType;
025    import net.jxta.document.StructuredDocument;
026    import net.jxta.document.StructuredTextDocument;
027    import net.jxta.document.StructuredDocumentUtils;
028    import net.jxta.discovery.DiscoveryService;
029    import net.jxta.pipe.PipeService;
030    import net.jxta.pipe.InputPipe;
031    import net.jxta.pipe.OutputPipe;
032    import net.jxta.pipe.PipeID;
033    import net.jxta.protocol.PipeAdvertisement;
034    import net.jxta.protocol.PeerGroupAdvertisement;
035    import net.jxta.protocol.ModuleImplAdvertisement;
036    import net.jxta.protocol.ModuleClassAdvertisement;
037    import net.jxta.protocol.ModuleSpecAdvertisement;
038    import net.jxta.endpoint.*;
039    import net.jxta.platform.ModuleClassID;
040    import net.jxta.platform.ModuleSpecID;
041    import net.jxta.id.IDFactory;
042    import net.jxta.socket.*;
043    
044    /**
045     *
046     */
047    public class DaemonThread extends Thread {
048    
049        private Mutex ecMutex;
050        private Mutex globalMutex;
051        private boolean shutdownRequested;
052    
053        private PeerGroup sharpsterPeerGroup = null;
054        private PipeAdvertisement pipeAdvertisement;
055        private JxtaServerSocket serverSocket;
056        private DiscoveryService discoveryService;
057        private PipeService pipeService;
058        private int timeToLive = 1*60*1000;
059        private ExternalCommandManager commandManager;
060        private InputPipe inputPipe = null;
061        private OutputPipe outputPipe = null;
062    
063        public DaemonThread (Mutex ec_mutex,
064                             Mutex globalMutex) {
065            shutdownRequested = false;
066            ecMutex = ec_mutex;
067            this.globalMutex = globalMutex;
068        }
069    
070        public boolean initialize (DiscoveryService disco,
071                                   PipeService pipes,
072                                   PeerGroup peerGroup,
073                                   ExternalCommandManager cm) {
074            discoveryService = disco;
075            pipeService = pipes;
076            sharpsterPeerGroup = peerGroup;
077            commandManager = cm;
078    
079            createPipeAdvertisement();
080            publishPipeAdvertisement();
081    
082            return true;
083        }
084    
085        public void createPipeAdvertisement() {
086            try {
087                Enumeration enum = discoveryService.getLocalAdvertisements(DiscoveryService.ADV,
088                                                              "Name",
089                                                              "Sharpster:Share:"+SharpsterDaemon.getPeerName());
090    
091                if(enum != null && enum.hasMoreElements()) {
092                    System.out.println("DT::Found an old advertisement, using it instead of creating a new one.");
093                    pipeAdvertisement = (PipeAdvertisement)enum.nextElement();
094                }
095                else {
096                    System.out.println("DT::Found no old advertisement, creating a new one.");
097                    pipeAdvertisement = (PipeAdvertisement)
098                            AdvertisementFactory.newAdvertisement(PipeAdvertisement.getAdvertisementType());
099                    pipeAdvertisement.setPipeID(IDFactory.newPipeID(sharpsterPeerGroup.getPeerGroupID()));
100                    pipeAdvertisement.setName("Sharpster:Share:"+sharpster.daemon.SharpsterDaemon.getPeerName());
101                    pipeAdvertisement.setType(PipeService.UnicastType);
102                }
103    
104                inputPipe = pipeService.createInputPipe(pipeAdvertisement);
105            } catch (Exception e) {
106                System.out.println("Fatal error: could not create a pipe advertisement");
107                e.printStackTrace();
108                System.exit(1);
109            }
110        }
111    
112        public void publishPipeAdvertisement() {
113            try {
114                //discoveryService.publish(pipeAdvertisement,DiscoveryService.ADV,DiscoveryService.INFINITE_LIFETIME,timeToLive);
115                discoveryService.publish(pipeAdvertisement,DiscoveryService.ADV,timeToLive,timeToLive);
116                discoveryService.remotePublish(pipeAdvertisement,DiscoveryService.ADV,timeToLive);
117            } catch (Exception e) {
118                System.out.println("Warning: could not publish the pipe advertisement");
119            }
120        }
121    
122        //    private void receiveAndDelegate(JxtaSocket socket) {
123        private void receiveAndDelegate(Message msg) {
124            DaemonDaemonMessage message;
125            InputStream inputStream;
126            OutputStream ouputStream;
127            byte[] byteArrayData;
128            ByteArrayInputStream byteInputStream;
129            ByteArrayOutputStream byteOutputStream;
130            ObjectInputStream objectInputStream;
131            ObjectOutputStream objectOutputStream;
132            PipeAdvertisement clientPipeAdv;
133            ByteArrayMessageElement byteMessageElement;
134            InputStreamMessageElement streamMessageElement;
135    
136            try {
137                inputStream =
138                    msg.getMessageElement("Sharpster:PipeAdv").getStream();
139                clientPipeAdv = (PipeAdvertisement)
140                    AdvertisementFactory.newAdvertisement(new MimeMediaType("text","xml"),
141                                                          inputStream);
142    
143                byteMessageElement =
144                    (ByteArrayMessageElement)msg.getMessageElement("Sharpster:Request");
145                if(byteMessageElement == null) throw new Exception();
146    
147                byteArrayData = byteMessageElement.getBytes();
148                byteInputStream = new ByteArrayInputStream(byteArrayData);
149                objectInputStream = new ObjectInputStream(byteInputStream);
150                message = (DaemonDaemonMessage)objectInputStream.readObject();
151                objectInputStream.close();
152                byteInputStream.close();
153    
154                System.out.println("DT::Command recieved: "+message.commandId);
155                message.response = new ResponseCollection();
156    
157                switch(message.commandId) {
158                case ExternalMessageType.GET_SHARES:
159                    message.response = commandManager.getFileAccess(message.user);
160                    break;
161    
162                case ExternalMessageType.CHECKOUT_FILES:
163                    message.response = commandManager.remoteCheckoutFiles(message.files,message.user,message.role);
164                    break;
165    
166                case ExternalMessageType.UPDATE_FILES:
167                    message.response = commandManager.remoteUpdateFiles(message.files,message.user);
168                    break;
169    
170                case ExternalMessageType.COMMIT_FILES:
171                    message.response = commandManager.remoteCommitFiles(message.files,message.user,message.role);
172                    break;
173    
174                case ExternalMessageType.ADD_FILES:
175                    message.response = commandManager.remoteAddFiles(message.files,message.user);
176                    break;
177    
178                case ExternalMessageType.REMOVE_FILES:
179                    message.response = commandManager.remoteRemoveFiles(message.files,message.user);
180                    break;
181                }
182    
183                message.files = null;
184                message.user = SharpsterDaemon.getPeerName();
185    
186                outputPipe = pipeService.createOutputPipe(clientPipeAdv,
187                                                          30*1000);
188    
189                if(outputPipe == null) throw new Exception();
190    
191                byteOutputStream = new ByteArrayOutputStream();
192                objectOutputStream = new ObjectOutputStream(byteOutputStream);
193                objectOutputStream.writeObject(message);
194                objectOutputStream.flush();
195                byteArrayData = byteOutputStream.toByteArray();
196                objectOutputStream.close();
197                byteOutputStream.close();
198    
199                byteMessageElement = new ByteArrayMessageElement("Sharpster:Response",
200                                                                 null, byteArrayData,
201                                                                 0, byteArrayData.length, null);
202    
203                msg.addMessageElement(byteMessageElement);
204                outputPipe.send(msg);
205            }
206            catch(Exception e) {
207                //e.printStackTrace();
208            }
209    
210            /*try {
211                System.out.print("EC::Sending shares");
212                ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream());
213    
214                System.out.print(".");
215                out.writeObject(SharpsterDaemon.getPeerName());
216                out.flush();
217    
218                System.out.print(".");
219                ObjectInputStream in = new ObjectInputStream(socket.getInputStream());
220                DaemonDaemonMessage msg = (DaemonDaemonMessage)in.readObject();
221    
222                msg.response = new ResponseCollection();
223    
224                switch(msg.commandId) {
225                case ExternalMessageType.GET_SHARES:
226                    msg.response = commandManager.getFileAccess(msg.user);
227                    break;
228    
229                case ExternalMessageType.CHECKOUT_FILES:
230                    msg.response = commandManager.remoteCheckoutFiles(msg.files,msg.user);
231                    break;
232    
233                case ExternalMessageType.UPDATE_FILES:
234                    msg.response = commandManager.remoteUpdateFiles(msg.files,msg.user);
235                    break;
236    
237                case ExternalMessageType.COMMIT_FILES:
238                    msg.response = commandManager.remoteCommitFiles(msg.files,msg.user);
239                    break;
240    
241                case ExternalMessageType.ADD_FILES:
242                    msg.response = commandManager.remoteAddFiles(msg.files,msg.user);
243                    break;
244    
245                case ExternalMessageType.REMOVE_FILES:
246                    msg.response = commandManager.remoteRemoveFiles(msg.files,msg.user);
247                    break;
248                }
249    
250                msg.files = null;
251                msg.user = SharpsterDaemon.getPeerName();
252    
253                System.out.print(".");
254                out.writeObject(msg);
255                out.flush();
256    
257                System.out.print(".");
258                out.close();
259                System.out.print(".");
260                in.close();
261                System.out.print(".");
262                socket.close();
263            } catch (Exception ie) {
264                System.out.print("failure, ");
265            }
266            System.out.println("done");*/
267        }
268    
269        /**
270         * Runs the thread, when this method
271         * returns, the thread dies.
272         */
273        public void run() {
274            boolean alive = true;
275            System.out.println("EC::Daemon thread has started");
276    
277            /*try {
278                serverSocket = new JxtaServerSocket(sharpsterPeerGroup, pipeAdvertisement);
279                serverSocket.setSoTimeout(0);
280            } catch (Exception e) {
281                System.out.println("Fatal error: could not create the socket.");
282                System.exit(1);
283            }*/
284    
285            Timer timer = new Timer();
286            timer.scheduleAtFixedRate(new TimerTask() {
287                    public void run() {
288                        System.out.println("DT::Publishing advertisement");
289                        publishPipeAdvertisement();
290                    }
291                }, 0, timeToLive/2);
292    
293            while(alive) {
294                try {
295                    Message msg = inputPipe.waitForMessage();
296                    //JxtaSocket socket = serverSocket.accept();
297                    //receiveAndDelegate(socket);
298                    receiveAndDelegate(msg);
299                } catch (Exception e) {
300                    //e.printStackTrace();
301                    /*if(shutdownRequested) {
302                        try {
303                            serverSocket.close();
304                        } catch (IOException closeException) {
305                            System.out.println("Warning: could not close the socket.");
306                            }
307                        alive = false;
308                        }*/
309                }
310            }
311    
312            System.out.println("EC::Daemon thread has shutdown.");
313        }
314    
315        /**
316         * Notifies the thread to stop
317         * Sets shtudownRequested and starts to wait for the thread with join()
318         */
319        public void shutdown() {
320            shutdownRequested = true;
321            try {
322                this.join();
323            } catch (java.lang.InterruptedException e) {
324            }
325        }
326    }
327