001 /* 002 003 $Id: AdvertisementThread.java,v 1.13 2003/05/08 10:34:22 culdesac Exp $ 004 005 */ 006 007 package sharpster.daemon.externalcommunication; 008 009 import java.lang.Thread; 010 import java.io.*; 011 import java.util.*; 012 013 import sharpster.common.*; 014 import sharpster.daemon.usermanagement.UserManager; 015 import sharpster.daemon.SharpsterDaemon; 016 017 import net.jxta.peergroup.PeerGroup; 018 import net.jxta.peergroup.PeerGroupFactory; 019 import net.jxta.exception.PeerGroupException; 020 import net.jxta.document.AdvertisementFactory; 021 import net.jxta.document.StructuredDocumentFactory; 022 import net.jxta.document.Advertisement; 023 import net.jxta.document.Element; 024 import net.jxta.document.MimeMediaType; 025 import net.jxta.document.StructuredDocument; 026 import net.jxta.document.StructuredTextDocument; 027 import net.jxta.document.StructuredDocumentUtils; 028 import net.jxta.discovery.DiscoveryService; 029 import net.jxta.pipe.PipeService; 030 import net.jxta.pipe.InputPipe; 031 import net.jxta.pipe.OutputPipe; 032 import net.jxta.pipe.PipeID; 033 import net.jxta.protocol.PipeAdvertisement; 034 import net.jxta.protocol.PeerGroupAdvertisement; 035 import net.jxta.protocol.ModuleImplAdvertisement; 036 import net.jxta.protocol.ModuleClassAdvertisement; 037 import net.jxta.protocol.ModuleSpecAdvertisement; 038 import net.jxta.endpoint.*; 039 import net.jxta.platform.ModuleClassID; 040 import net.jxta.platform.ModuleSpecID; 041 import net.jxta.id.IDFactory; 042 import net.jxta.socket.*; 043 044 /** 045 * 046 */ 047 public class AdvertisementThread extends Thread { 048 049 private Mutex ecMutex; 050 // private Mutex commandManagerMutex 051 private boolean shutdownRequested; 052 private UserManager userManager; 053 private int searchPeriod = 30*1000; 054 055 private PeerGroup sharpsterPeerGroup = null; 056 private DiscoveryService discoveryService; 057 private PipeService pipeService; 058 059 public AdvertisementThread(Mutex mutex) { 060 shutdownRequested = false; 061 ecMutex = mutex; 062 } 063 064 public boolean initialize(DiscoveryService disco, 065 PipeService pipes, 066 PeerGroup peerGroup, 067 UserManager um) { 068 discoveryService = disco; 069 pipeService = pipes; 070 sharpsterPeerGroup = peerGroup; 071 userManager = um; 072 073 return true; 074 } 075 076 private void getRemoteShares(LinkedList list) { 077 PipeAdvertisement pipeAdv = null; 078 DaemonDaemonMessage message; 079 InputStream inputStream; 080 OutputStream ouputStream; 081 byte[] byteArrayData; 082 ByteArrayInputStream byteInputStream; 083 ByteArrayOutputStream byteOutputStream; 084 ObjectInputStream objectInputStream; 085 ObjectOutputStream objectOutputStream; 086 ByteArrayMessageElement byteMessageElement; 087 InputStreamMessageElement streamMessageElement; 088 089 try { 090 for(int i=0; i<list.size(); i++) { 091 System.out.println("AT::Connecting"); 092 093 pipeAdv = (PipeAdvertisement)list.get(i); 094 OutputPipe outputPipe = pipeService.createOutputPipe(pipeAdv,30*1000); 095 if(outputPipe == null) { 096 System.out.println("AT::Unable to connect"); 097 continue; 098 } 099 100 message = new DaemonDaemonMessage(); 101 message.commandId = ExternalMessageType.GET_SHARES; 102 message.user = SharpsterDaemon.getPeerName(); 103 message.files = null; 104 message.response = null; 105 106 byteOutputStream = new ByteArrayOutputStream(); 107 objectOutputStream = new ObjectOutputStream(byteOutputStream); 108 objectOutputStream.writeObject(message); 109 objectOutputStream.flush(); 110 byteArrayData = byteOutputStream.toByteArray(); 111 objectOutputStream.close(); 112 byteOutputStream.close(); 113 114 PipeAdvertisement pipeAdvertisement = (PipeAdvertisement) 115 AdvertisementFactory.newAdvertisement(PipeAdvertisement.getAdvertisementType()); 116 pipeAdvertisement.setPipeID(IDFactory.newPipeID(sharpsterPeerGroup.getPeerGroupID())); 117 pipeAdvertisement.setName("Sharpster:Client:"+sharpster.daemon.SharpsterDaemon.getPeerName()); 118 pipeAdvertisement.setType(PipeService.UnicastType); 119 InputPipe inputPipe = pipeService.createInputPipe(pipeAdvertisement); 120 121 Message msg = pipeService.createMessage(); 122 123 streamMessageElement = 124 new InputStreamMessageElement("Sharpster:PipeAdv", 125 new MimeMediaType("text","xml"), 126 pipeAdvertisement.getDocument(new MimeMediaType("text","xml")).getStream(), 127 null); 128 129 byteMessageElement = 130 new ByteArrayMessageElement("Sharpster:Request", 131 null, byteArrayData, 132 0, byteArrayData.length, null); 133 134 msg.addMessageElement(streamMessageElement); 135 msg.addMessageElement(byteMessageElement); 136 137 outputPipe.send(msg); 138 System.out.println("AT::Sending command: "+message.commandId); 139 140 //maybe we should use poll here instead 141 msg = inputPipe.poll(30*1000); 142 if(msg == null) throw new Exception(); 143 144 byteMessageElement = 145 (ByteArrayMessageElement)msg.getMessageElement("Sharpster:Response"); 146 if(byteMessageElement == null) throw new Exception(); 147 148 byteArrayData = byteMessageElement.getBytes(); 149 byteInputStream = new ByteArrayInputStream(byteArrayData); 150 objectInputStream = new ObjectInputStream(byteInputStream); 151 message = (DaemonDaemonMessage)objectInputStream.readObject(); 152 objectInputStream.close(); 153 byteInputStream.close(); 154 155 if(message != null && message.response != null) { 156 System.out.println("AT::Got response"); 157 for(int j=0;j<message.response.getResponseCount();j++) { 158 Response resp = message.response.getResponse(j); 159 if(resp.getType() == ResponseType.SHARED_FILES) { 160 SharedFilesResponse sharedFilesResp = (SharedFilesResponse)resp; 161 userManager.addFileAccess(sharedFilesResp.getFiles(), message.user); 162 } 163 } 164 } 165 } 166 } 167 catch(Exception e) { 168 //e.printStackTrace(); 169 System.out.println("AT::Error fetching remote shares"); 170 } 171 } 172 /*JxtaSocket socket = null; 173 174 for(int i=0; i<list.size(); i++) { 175 pipeAdv = (PipeAdvertisement)list.get(i); 176 177 try { 178 System.out.print("EC::Updating shares"); 179 180 socket = new JxtaSocket(sharpsterPeerGroup, pipeAdv); 181 182 System.out.print("."); 183 184 //System.out.println("Client: creating input stream"); 185 ObjectInputStream in = new ObjectInputStream(socket.getInputStream()); 186 187 System.out.print("."); 188 189 //System.out.println("Client: reading data"); 190 String peerName = (String)in.readObject(); 191 //System.out.println("Client: received data from "+peerName); 192 193 System.out.print("."); 194 195 //System.out.println("Client: creating output stream"); 196 ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream()); 197 198 System.out.print("."); 199 200 DaemonDaemonMessage msg = new DaemonDaemonMessage(); 201 msg.commandId = ExternalMessageType.GET_SHARES; 202 msg.user = SharpsterDaemon.getPeerName(); 203 msg.files = null; 204 msg.response = null; 205 206 //System.out.println("Client: sending data"); 207 out.writeObject(msg); 208 out.flush(); 209 210 System.out.print("."); 211 212 //in = new ObjectInputStream(socket.getInputStream()); 213 //System.out.println("Client: reading data again"); 214 msg = (DaemonDaemonMessage)in.readObject(); 215 //System.out.println("Client: received"); 216 217 System.out.print("."); 218 219 if(msg != null && msg.response != null) { 220 //System.out.println("Client: Got my response ("+msg.response.getResponseCount()+")"); 221 for(int j=0;j<msg.response.getResponseCount();j++) { 222 Response resp = msg.response.getResponse(j); 223 if(resp.getType() == ResponseType.SHARED_FILES) { 224 SharedFilesResponse sharedFilesResp = (SharedFilesResponse)resp; 225 userManager.addFileAccess(sharedFilesResp.getFiles(), peerName); 226 } 227 } 228 } 229 230 //System.out.println("Client: closing socket"); 231 System.out.print("."); 232 in.close(); 233 System.out.print("."); 234 out.close(); 235 System.out.print("."); 236 socket.close(); 237 } catch (Exception e) { 238 e.printStackTrace(); 239 //System.out.print("failure, "); 240 //return; 241 } 242 243 System.out.println("done"); 244 }*/ 245 //} 246 247 private LinkedList discoverServices() { 248 LinkedList list = new LinkedList(); 249 Enumeration enum; 250 251 System.out.println("EC::Discovering services"); 252 253 try { 254 enum = discoveryService.getLocalAdvertisements(DiscoveryService.ADV, 255 "Name", 256 "Sharpster:Share:*"); 257 258 while(enum != null && enum.hasMoreElements()) { 259 try { 260 PipeAdvertisement pipeAdv = (PipeAdvertisement)enum.nextElement(); 261 System.out.println("EC::Found advertisement: " + pipeAdv.getName()); 262 String localAdv = "Sharpster:Share:"+SharpsterDaemon.getPeerName(); 263 if(!localAdv.equals(pipeAdv.getName())) { 264 String userName = pipeAdv.getName(); 265 userName = userName.substring(16); 266 userManager.addUser(userName); 267 list.add(pipeAdv); 268 } 269 } catch(Exception e) { 270 } 271 } 272 273 discoveryService.getRemoteAdvertisements(null, DiscoveryService.ADV, 274 "Name", "Sharpster:Share:*", 275 5, null); 276 } catch (Exception e) { 277 } 278 return list; 279 } 280 281 /** 282 * Runs the thread 283 * When this method returns, the thread dies. 284 */ 285 public void run() { 286 boolean alive = true; 287 System.out.println("EC::Advertisement thread has started"); 288 289 while(alive) { 290 LinkedList list = discoverServices(); 291 getRemoteShares(list); 292 if(shutdownRequested) { 293 break; 294 } 295 try { 296 sleep(searchPeriod); 297 } catch (Exception e) { 298 } 299 } 300 System.out.println("EC::Advertisement thread has shutdown."); 301 } 302 303 /** 304 * Notifies the thread to stop 305 * Sets shtudownRequested and starts to wait for the thread with join() 306 */ 307 public void shutdown() { 308 shutdownRequested = true; 309 try { 310 this.join(); 311 } catch (java.lang.InterruptedException e) { 312 } 313 } 314 } 315