001    /*
002    
003      $Id: ExternalCommunication.java,v 1.9 2003/05/15 10:46:54 culdesac Exp $
004    
005    */
006    
007    package sharpster.daemon.externalcommunication;
008    
009    import sharpster.common.FileCollection;
010    import sharpster.common.ResponseCollection;
011    import sharpster.daemon.usermanagement.UserManager;
012    import sharpster.daemon.commandmanagement.ExternalCommandManager;
013    import sharpster.daemon.SharpsterDaemon;
014    import sharpster.daemon.externalcommunication.DaemonThread;
015    import sharpster.daemon.externalcommunication.AdvertisementThread;
016    import sharpster.common.*;
017    
018    import java.io.*;
019    import java.util.*;
020    
021    import net.jxta.endpoint.*;
022    import net.jxta.peergroup.PeerGroup;
023    import net.jxta.peergroup.PeerGroupFactory;
024    import net.jxta.exception.PeerGroupException;
025    import net.jxta.document.AdvertisementFactory;
026    import net.jxta.document.StructuredDocumentFactory;
027    import net.jxta.document.Advertisement;
028    import net.jxta.document.Element;
029    import net.jxta.document.MimeMediaType;
030    import net.jxta.document.StructuredDocument;
031    import net.jxta.document.StructuredTextDocument;
032    import net.jxta.document.StructuredDocumentUtils;
033    import net.jxta.discovery.DiscoveryService;
034    import net.jxta.pipe.PipeService;
035    import net.jxta.pipe.InputPipe;
036    import net.jxta.pipe.OutputPipe;
037    import net.jxta.pipe.PipeID;
038    import net.jxta.protocol.PipeAdvertisement;
039    import net.jxta.protocol.PeerGroupAdvertisement;
040    import net.jxta.protocol.ModuleImplAdvertisement;
041    import net.jxta.protocol.ModuleClassAdvertisement;
042    import net.jxta.protocol.ModuleSpecAdvertisement;
043    import net.jxta.endpoint.Message;
044    import net.jxta.platform.ModuleClassID;
045    import net.jxta.platform.ModuleSpecID;
046    import net.jxta.id.IDFactory;
047    import net.jxta.socket.*;
048    
049    /**
050     * Class responsible for communcating with other Sharpster daemons.
051     */
052    public class ExternalCommunication {
053    
054        private UserManager userManager;
055        private ExternalCommandManager externalCommand;
056        private DaemonThread daemonThread;
057        private AdvertisementThread advertisementThread;
058        private Mutex ecMutex;
059        private Mutex globalMutex;
060    
061        private PeerGroup netPeerGroup = null;
062        private PeerGroup sharpsterPeerGroup = null;
063        private DiscoveryService discoveryService = null;
064        private PipeService pipeService = null;
065    
066        /**
067         *
068         */
069        public ExternalCommunication(Mutex globalMutex) {
070            ecMutex = new Mutex();
071            this.globalMutex = globalMutex;
072            daemonThread = new DaemonThread(ecMutex, this.globalMutex);
073            advertisementThread = new AdvertisementThread(ecMutex);
074        }
075    
076        /**
077         *
078         */
079        public void start() {
080            try {
081                netPeerGroup = PeerGroupFactory.newNetPeerGroup();
082            }
083            catch(PeerGroupException e) {
084                System.out.println("Fatal error: unable to create the default peer group");
085                System.exit(1);
086            }
087            try {
088                createPeerGroup();
089            }
090            catch(Exception e) {
091                System.out.println("Fatal error: unable to create the SharpsterGroup peer group");
092                System.exit(1);
093            }
094    
095            advertisementThread.initialize(discoveryService,
096                                           pipeService,
097                                           sharpsterPeerGroup,
098                                           userManager);
099    
100            daemonThread.initialize(discoveryService,
101                                    pipeService,
102                                    sharpsterPeerGroup,
103                                    externalCommand);
104    
105            daemonThread.start();
106            advertisementThread.start();
107        }
108    
109        /**
110         *
111         */
112        public void shutdown() {
113            daemonThread.shutdown();
114            advertisementThread.shutdown();
115        }
116    
117        /**
118         *
119         */
120        public void initialize(UserManager um,
121                               ExternalCommandManager ecm) {
122            userManager = um;
123            externalCommand = ecm;
124        }
125    
126        /**
127         *
128         */
129        public void clearLocalCache() {
130            try {
131                discoveryService = netPeerGroup.getDiscoveryService();
132                discoveryService.flushAdvertisements(null,0);
133                //discoveryService.publish(group.getPeerAdvertisement(),DiscoveryService.PEER);
134            } catch(Exception e) {
135                System.out.println("Warning: unable to clear the local cache");
136            }
137        }
138    
139        /**
140         *
141         */
142        public void createPeerGroup() throws Exception {
143            Enumeration enum = null;
144    
145            SharpsterDaemon.setPeerName(netPeerGroup.getPeerName());
146            discoveryService = netPeerGroup.getDiscoveryService();
147    
148            //clearLocalCache();
149            /*
150            for(int i=0; i<3; i++) {
151                System.out.println("EC::Trying to find local advertisements [" + (i+1) + " of 3]");
152                enum = discoveryService.getLocalAdvertisements(DiscoveryService.GROUP,
153                                                               "Name",
154                                                               "SharpsterGroup");
155                if(enum != null && enum.hasMoreElements()) {
156                    break;
157                }
158    
159                discoveryService.getRemoteAdvertisements(null,
160                                                         DiscoveryService.GROUP,
161                                                         "Name",
162                                                         "SharpsterGroup", 1, null);
163                try {
164                    Thread.sleep(3000);
165                }
166                catch (Exception e) {
167                }
168            }
169    
170            if(enum == null || !enum.hasMoreElements()) {
171                try {
172                    ModuleImplAdvertisement implAdv = netPeerGroup.getAllPurposePeerGroupImplAdvertisement();
173                    sharpsterPeerGroup = netPeerGroup.newGroup(null, implAdv, "SharpsterGroup",
174                                                               "The Sharpster share group");
175                    System.out.println("EC::Could not find a group advertisement, creating one.");
176                }
177                catch(Exception e) {
178                    throw e;
179                }
180            } else {
181                try {
182                    PeerGroupAdvertisement netAdv = null;
183                    netAdv = (PeerGroupAdvertisement)enum.nextElement();
184                    sharpsterPeerGroup = netPeerGroup.newGroup(netAdv);
185                    System.out.println("EC::Found a group advertisement.");
186                }
187                catch(Exception e) {
188                    throw e;
189                }
190            }*/
191    
192            try {
193                //We are using a static group advertisement instead of creating one.
194                PeerGroupAdvertisement netAdv = null;
195                FileInputStream is = new FileInputStream("config/SharpsterGroup.adv");
196                netAdv = (PeerGroupAdvertisement)AdvertisementFactory.newAdvertisement(MimeMediaType.XMLUTF8, is);
197                is.close();
198                sharpsterPeerGroup = netPeerGroup.newGroup(netAdv);
199            }
200            catch(Exception e) {
201                throw e;
202            }
203    
204            try {
205                discoveryService = sharpsterPeerGroup.getDiscoveryService();
206                pipeService = sharpsterPeerGroup.getPipeService();
207            }
208            catch(Exception e) {
209                throw e;
210            }
211        }
212    
213        /*
214         *
215         */
216        public LinkedList findLocalPeerServices(String nameFilter) {
217            LinkedList list = new LinkedList();
218            Enumeration enum;
219    
220            try {
221                enum = discoveryService.getLocalAdvertisements(DiscoveryService.ADV,
222                                                               "Name",
223                                                               "Sharpster:Share:"+nameFilter);
224    
225                while(enum != null && enum.hasMoreElements()) {
226                    try {
227                        PipeAdvertisement pipeAdv = (PipeAdvertisement)enum.nextElement();
228                        String localAdv = "Sharpster:Share:"+SharpsterDaemon.getPeerName();
229                        if(!localAdv.equals(pipeAdv.getName())) {
230                            String userName = pipeAdv.getName();
231                            userName = userName.substring(16);
232                            userManager.addUser(userName);
233                            list.add(pipeAdv);
234                        }
235                    } catch(Exception e) {
236                    }
237                }
238            } catch (Exception e) {
239            }
240    
241            return list;
242        }
243    
244        private ResponseCollection sendCommand(FileCollection files,
245                                               String user,
246                                               int command,
247                                               String role) {
248            PipeAdvertisement pipeAdv = null;
249            DaemonDaemonMessage message;
250            InputStream inputStream;
251            OutputStream ouputStream;
252            byte[] byteArrayData;
253            ByteArrayInputStream byteInputStream;
254            ByteArrayOutputStream byteOutputStream;
255            ObjectInputStream objectInputStream;
256            ObjectOutputStream objectOutputStream;
257            ByteArrayMessageElement byteMessageElement;
258            InputStreamMessageElement streamMessageElement;
259    
260            ResponseCollection responses = new ResponseCollection();
261            LinkedList list = findLocalPeerServices(user);
262    
263            if(list == null || list.size() == 0) {
264                NetworkErrorResponse error = new NetworkErrorResponse();
265                error.setOrigin("ExternalCommunication");
266                error.setUser(SharpsterDaemon.getPeerName());
267                error.setError(true);
268                error.setMessage("Unable to connect to "+user);
269                responses.addResponse(error);
270                return responses;
271            }
272    
273            if(list.size() > 1) pipeAdv = (PipeAdvertisement)list.getLast();
274            else pipeAdv = (PipeAdvertisement)list.getFirst();
275    
276            try {
277                OutputPipe outputPipe = pipeService.createOutputPipe(pipeAdv,30*1000);
278                if(outputPipe == null) {
279                    throw new Exception();
280                }
281    
282                message = new DaemonDaemonMessage();
283                message.commandId = command;
284                message.user = SharpsterDaemon.getPeerName();
285                message.files = files;
286                message.role = role;
287                message.response = null;
288    
289                byteOutputStream = new ByteArrayOutputStream();
290                objectOutputStream = new ObjectOutputStream(byteOutputStream);
291                objectOutputStream.writeObject(message);
292                objectOutputStream.flush();
293                byteArrayData = byteOutputStream.toByteArray();
294                objectOutputStream.close();
295                byteOutputStream.close();
296    
297                PipeAdvertisement pipeAdvertisement = (PipeAdvertisement)
298                    AdvertisementFactory.newAdvertisement(PipeAdvertisement.getAdvertisementType());
299                pipeAdvertisement.setPipeID(IDFactory.newPipeID(sharpsterPeerGroup.getPeerGroupID()));
300                pipeAdvertisement.setName("Sharpster:Client:"+sharpster.daemon.SharpsterDaemon.getPeerName());
301                pipeAdvertisement.setType(PipeService.UnicastType);
302                InputPipe inputPipe = pipeService.createInputPipe(pipeAdvertisement);
303    
304                Message msg = pipeService.createMessage();
305    
306                streamMessageElement =
307                    new InputStreamMessageElement("Sharpster:PipeAdv",
308                                                  new MimeMediaType("text","xml"),
309                                                  pipeAdvertisement.getDocument(new MimeMediaType("text","xml")).getStream(),
310                                                  null);
311    
312                byteMessageElement =
313                    new ByteArrayMessageElement("Sharpster:Request",
314                                                null, byteArrayData,
315                                                0, byteArrayData.length, null);
316    
317                msg.addMessageElement(streamMessageElement);
318                msg.addMessageElement(byteMessageElement);
319    
320                outputPipe.send(msg);
321                System.out.println("EC::Sending command: "+message.commandId);
322    
323                //maybe we should use poll here instead
324                msg = inputPipe.waitForMessage();
325    
326                byteMessageElement =
327                    (ByteArrayMessageElement)msg.getMessageElement("Sharpster:Response");
328                if(byteMessageElement == null) throw new Exception();
329    
330                byteArrayData = byteMessageElement.getBytes();
331                byteInputStream = new ByteArrayInputStream(byteArrayData);
332                objectInputStream = new ObjectInputStream(byteInputStream);
333                message = (DaemonDaemonMessage)objectInputStream.readObject();
334                objectInputStream.close();
335                byteInputStream.close();
336    
337                responses = message.response;
338    
339                /*JxtaSocket socket = new JxtaSocket(sharpsterPeerGroup, pipeAdv);
340                ObjectInputStream in = new ObjectInputStream(socket.getInputStream());
341                String peerName = (String)in.readObject();
342    
343                ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream());
344                DaemonDaemonMessage msg = new DaemonDaemonMessage();
345                msg.commandId = command;
346                msg.user = SharpsterDaemon.getPeerName();
347                msg.files = files;
348                msg.response = null;
349    
350                out.writeObject(msg);
351                out.flush();
352    
353                msg = (DaemonDaemonMessage)in.readObject();
354                if(msg != null) responses = msg.response;
355                else throw new Exception();
356    
357                in.close();
358                out.close();
359                socket.close();*/
360            }
361            catch(Exception e) {
362                //e.printStackTrace();
363                NetworkErrorResponse error = new NetworkErrorResponse();
364                error.setOrigin("ExternalCommunication");
365                error.setUser(SharpsterDaemon.getPeerName());
366                error.setError(true);
367                error.setMessage("Error during connection with "+user);
368                responses.addResponse(error);
369                return responses;
370            }
371    
372            return responses;
373        }
374    
375        /**
376         * Executes a checkout from a CVS of an external user.
377         */
378        public ResponseCollection checkoutFiles(FileCollection files,
379                                                String user,
380                                                String role) {
381            return sendCommand(files,user,ExternalMessageType.CHECKOUT_FILES,role);
382        }
383    
384        /**
385         * Executes a commit to a CVS of an external user.
386         */
387        public ResponseCollection commitFiles(FileCollection files,
388                                              String user,
389                                              String role) {
390            return sendCommand(files,user,ExternalMessageType.COMMIT_FILES,role);
391        }
392    
393        /**
394         * Executes a update to a CVS of an external user.
395         */
396        public ResponseCollection updateFiles(FileCollection files, String user) {
397            return sendCommand(files,user,ExternalMessageType.UPDATE_FILES,null);
398        }
399    
400        /**
401         * Executes a file add to a CVS of an external user.
402         */
403        public ResponseCollection addFiles(FileCollection files, String user) {
404            return sendCommand(files,user,ExternalMessageType.ADD_FILES,null);
405        }
406    
407        /**
408         * Executes a file remove to a CVS of an external user.
409         */
410        public ResponseCollection removeFiles(FileCollection files, String user) {
411            return sendCommand(files,user,ExternalMessageType.REMOVE_FILES,null);
412        }
413    }