001    /*
002    
003      $Id: AdvertisementThread.java,v 1.13 2003/05/08 10:34:22 culdesac Exp $
004    
005    */
006    
007    package sharpster.daemon.externalcommunication;
008    
009    import java.lang.Thread;
010    import java.io.*;
011    import java.util.*;
012    
013    import sharpster.common.*;
014    import sharpster.daemon.usermanagement.UserManager;
015    import sharpster.daemon.SharpsterDaemon;
016    
017    import net.jxta.peergroup.PeerGroup;
018    import net.jxta.peergroup.PeerGroupFactory;
019    import net.jxta.exception.PeerGroupException;
020    import net.jxta.document.AdvertisementFactory;
021    import net.jxta.document.StructuredDocumentFactory;
022    import net.jxta.document.Advertisement;
023    import net.jxta.document.Element;
024    import net.jxta.document.MimeMediaType;
025    import net.jxta.document.StructuredDocument;
026    import net.jxta.document.StructuredTextDocument;
027    import net.jxta.document.StructuredDocumentUtils;
028    import net.jxta.discovery.DiscoveryService;
029    import net.jxta.pipe.PipeService;
030    import net.jxta.pipe.InputPipe;
031    import net.jxta.pipe.OutputPipe;
032    import net.jxta.pipe.PipeID;
033    import net.jxta.protocol.PipeAdvertisement;
034    import net.jxta.protocol.PeerGroupAdvertisement;
035    import net.jxta.protocol.ModuleImplAdvertisement;
036    import net.jxta.protocol.ModuleClassAdvertisement;
037    import net.jxta.protocol.ModuleSpecAdvertisement;
038    import net.jxta.endpoint.*;
039    import net.jxta.platform.ModuleClassID;
040    import net.jxta.platform.ModuleSpecID;
041    import net.jxta.id.IDFactory;
042    import net.jxta.socket.*;
043    
044    /**
045     *
046     */
047    public class AdvertisementThread extends Thread {
048        
049        private Mutex ecMutex;
050        // private Mutex commandManagerMutex
051        private boolean shutdownRequested;
052        private UserManager userManager;
053        private int searchPeriod = 30*1000;
054        
055        private PeerGroup sharpsterPeerGroup = null;
056        private DiscoveryService discoveryService;
057        private PipeService pipeService;
058        
059        public AdvertisementThread(Mutex mutex) {
060            shutdownRequested = false;
061            ecMutex = mutex;
062        }
063    
064        public boolean initialize(DiscoveryService disco,
065                                  PipeService pipes,
066                                  PeerGroup peerGroup,
067                                  UserManager um) {
068            discoveryService = disco;
069            pipeService = pipes;
070            sharpsterPeerGroup = peerGroup;
071            userManager = um;
072            
073            return true;
074        }
075        
076        private void getRemoteShares(LinkedList list) {
077            PipeAdvertisement pipeAdv = null;
078            DaemonDaemonMessage message;
079            InputStream inputStream;
080            OutputStream ouputStream;
081            byte[] byteArrayData;
082            ByteArrayInputStream byteInputStream;
083            ByteArrayOutputStream byteOutputStream;
084            ObjectInputStream objectInputStream;
085            ObjectOutputStream objectOutputStream;
086            ByteArrayMessageElement byteMessageElement;
087            InputStreamMessageElement streamMessageElement;
088       
089            try {
090                for(int i=0; i<list.size(); i++) {
091                    System.out.println("AT::Connecting");
092                    
093                    pipeAdv = (PipeAdvertisement)list.get(i);
094                    OutputPipe outputPipe = pipeService.createOutputPipe(pipeAdv,30*1000);
095                    if(outputPipe == null) {
096                        System.out.println("AT::Unable to connect");
097                        continue;
098                    }
099                    
100                    message = new DaemonDaemonMessage();
101                    message.commandId = ExternalMessageType.GET_SHARES;
102                    message.user = SharpsterDaemon.getPeerName();
103                    message.files = null;
104                    message.response = null;
105                    
106                    byteOutputStream = new ByteArrayOutputStream();
107                    objectOutputStream = new ObjectOutputStream(byteOutputStream);
108                    objectOutputStream.writeObject(message);
109                    objectOutputStream.flush();
110                    byteArrayData = byteOutputStream.toByteArray();
111                    objectOutputStream.close();
112                    byteOutputStream.close();
113    
114                    PipeAdvertisement pipeAdvertisement = (PipeAdvertisement) 
115                        AdvertisementFactory.newAdvertisement(PipeAdvertisement.getAdvertisementType());
116                    pipeAdvertisement.setPipeID(IDFactory.newPipeID(sharpsterPeerGroup.getPeerGroupID()));
117                    pipeAdvertisement.setName("Sharpster:Client:"+sharpster.daemon.SharpsterDaemon.getPeerName());
118                    pipeAdvertisement.setType(PipeService.UnicastType);
119                    InputPipe inputPipe = pipeService.createInputPipe(pipeAdvertisement);
120                
121                    Message msg = pipeService.createMessage();
122                    
123                    streamMessageElement = 
124                        new InputStreamMessageElement("Sharpster:PipeAdv",
125                                                      new MimeMediaType("text","xml"),
126                                                      pipeAdvertisement.getDocument(new MimeMediaType("text","xml")).getStream(),
127                                                      null);
128    
129                    byteMessageElement = 
130                        new ByteArrayMessageElement("Sharpster:Request",
131                                                    null, byteArrayData,
132                                                    0, byteArrayData.length, null);     
133                    
134                    msg.addMessageElement(streamMessageElement);                                                            
135                    msg.addMessageElement(byteMessageElement);
136    
137                    outputPipe.send(msg);
138                    System.out.println("AT::Sending command: "+message.commandId);
139    
140                    //maybe we should use poll here instead
141                    msg = inputPipe.poll(30*1000);
142                    if(msg == null) throw new Exception();
143                    
144                    byteMessageElement = 
145                        (ByteArrayMessageElement)msg.getMessageElement("Sharpster:Response");
146                    if(byteMessageElement == null) throw new Exception();
147    
148                    byteArrayData = byteMessageElement.getBytes();
149                    byteInputStream = new ByteArrayInputStream(byteArrayData);
150                    objectInputStream = new ObjectInputStream(byteInputStream);
151                    message = (DaemonDaemonMessage)objectInputStream.readObject();
152                    objectInputStream.close();
153                    byteInputStream.close();
154    
155                    if(message != null && message.response != null) {
156                        System.out.println("AT::Got response");
157                        for(int j=0;j<message.response.getResponseCount();j++) {
158                            Response resp = message.response.getResponse(j);
159                            if(resp.getType() == ResponseType.SHARED_FILES) {
160                                SharedFilesResponse sharedFilesResp = (SharedFilesResponse)resp;
161                                userManager.addFileAccess(sharedFilesResp.getFiles(), message.user);
162                            }
163                        }
164                    }               
165                }
166            }
167            catch(Exception e) {
168                //e.printStackTrace();
169                System.out.println("AT::Error fetching remote shares");
170            }
171        }
172            /*JxtaSocket socket = null;
173            
174            for(int i=0; i<list.size(); i++) {
175                pipeAdv = (PipeAdvertisement)list.get(i);
176    
177                try {
178                    System.out.print("EC::Updating shares");
179    
180                    socket = new JxtaSocket(sharpsterPeerGroup, pipeAdv);
181                    
182                    System.out.print(".");
183    
184                    //System.out.println("Client: creating input stream");
185                    ObjectInputStream in = new ObjectInputStream(socket.getInputStream());
186                    
187                    System.out.print(".");
188    
189                    //System.out.println("Client: reading data");
190                    String peerName = (String)in.readObject();
191                    //System.out.println("Client: received data from "+peerName);
192    
193                    System.out.print(".");
194    
195                    //System.out.println("Client: creating output stream");
196                    ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream());
197    
198                    System.out.print(".");
199    
200                    DaemonDaemonMessage msg = new DaemonDaemonMessage();
201                    msg.commandId = ExternalMessageType.GET_SHARES;
202                    msg.user = SharpsterDaemon.getPeerName();
203                    msg.files = null;
204                    msg.response = null;
205    
206                    //System.out.println("Client: sending data");
207                    out.writeObject(msg);
208                    out.flush();
209                    
210                    System.out.print(".");
211    
212                    //in = new ObjectInputStream(socket.getInputStream());
213                    //System.out.println("Client: reading data again");
214                    msg = (DaemonDaemonMessage)in.readObject();
215                    //System.out.println("Client: received");
216    
217                    System.out.print(".");
218    
219                    if(msg != null && msg.response != null) {
220                        //System.out.println("Client: Got my response ("+msg.response.getResponseCount()+")");
221                        for(int j=0;j<msg.response.getResponseCount();j++) {
222                            Response resp = msg.response.getResponse(j);
223                            if(resp.getType() == ResponseType.SHARED_FILES) {
224                                SharedFilesResponse sharedFilesResp = (SharedFilesResponse)resp;
225                                userManager.addFileAccess(sharedFilesResp.getFiles(), peerName);
226                            }
227                        }
228                    }
229    
230                    //System.out.println("Client: closing socket");
231                    System.out.print(".");
232                    in.close();
233                    System.out.print(".");
234                    out.close();
235                    System.out.print(".");
236                    socket.close();
237                } catch (Exception e) {
238                    e.printStackTrace();
239                    //System.out.print("failure, ");
240                    //return;
241                }
242    
243                System.out.println("done");
244            }*/
245        //}
246    
247        private LinkedList discoverServices() {
248            LinkedList list = new LinkedList();
249            Enumeration enum;
250    
251            System.out.println("EC::Discovering services");
252    
253            try {
254                enum = discoveryService.getLocalAdvertisements(DiscoveryService.ADV,
255                                                               "Name",
256                                                               "Sharpster:Share:*");
257    
258                while(enum != null && enum.hasMoreElements()) {
259                    try {
260                        PipeAdvertisement pipeAdv = (PipeAdvertisement)enum.nextElement();
261                        System.out.println("EC::Found advertisement: " + pipeAdv.getName());
262                        String localAdv = "Sharpster:Share:"+SharpsterDaemon.getPeerName();
263                        if(!localAdv.equals(pipeAdv.getName())) {
264                            String userName = pipeAdv.getName();
265                            userName = userName.substring(16);
266                            userManager.addUser(userName);
267                            list.add(pipeAdv);
268                        }
269                    } catch(Exception e) {
270                    }
271                }
272    
273                discoveryService.getRemoteAdvertisements(null, DiscoveryService.ADV,
274                                                         "Name", "Sharpster:Share:*", 
275                                                         5, null);
276            } catch (Exception e) {
277            }
278            return list;
279        }
280    
281        /**
282         * Runs the thread
283         * When this method returns, the thread dies.
284         */
285        public void run() {
286            boolean alive = true;
287            System.out.println("EC::Advertisement thread has started");
288    
289            while(alive) {
290                LinkedList list = discoverServices();
291                getRemoteShares(list);
292                if(shutdownRequested) {
293                    break;
294                }
295                try {
296                    sleep(searchPeriod);
297                } catch (Exception e) {
298                }
299            }
300            System.out.println("EC::Advertisement thread has shutdown.");
301        }
302        
303        /**
304         * Notifies the thread to stop
305         * Sets shtudownRequested and starts to wait for the thread with join()
306         */
307        public void shutdown() {
308            shutdownRequested = true;
309            try {
310                this.join();
311            } catch (java.lang.InterruptedException e) {
312            }
313        }    
314    }
315