001 /* 002 003 $Id: DaemonThread.java,v 1.13 2003/05/15 12:05:55 culdesac Exp $ 004 005 */ 006 007 package sharpster.daemon.externalcommunication; 008 009 import java.lang.Thread; 010 import java.io.*; 011 import java.util.*; 012 013 import sharpster.common.*; 014 import sharpster.daemon.SharpsterDaemon; 015 import sharpster.daemon.commandmanagement.ExternalCommandManager; 016 017 import net.jxta.peergroup.PeerGroup; 018 import net.jxta.peergroup.PeerGroupFactory; 019 import net.jxta.exception.PeerGroupException; 020 import net.jxta.document.AdvertisementFactory; 021 import net.jxta.document.StructuredDocumentFactory; 022 import net.jxta.document.Advertisement; 023 import net.jxta.document.Element; 024 import net.jxta.document.MimeMediaType; 025 import net.jxta.document.StructuredDocument; 026 import net.jxta.document.StructuredTextDocument; 027 import net.jxta.document.StructuredDocumentUtils; 028 import net.jxta.discovery.DiscoveryService; 029 import net.jxta.pipe.PipeService; 030 import net.jxta.pipe.InputPipe; 031 import net.jxta.pipe.OutputPipe; 032 import net.jxta.pipe.PipeID; 033 import net.jxta.protocol.PipeAdvertisement; 034 import net.jxta.protocol.PeerGroupAdvertisement; 035 import net.jxta.protocol.ModuleImplAdvertisement; 036 import net.jxta.protocol.ModuleClassAdvertisement; 037 import net.jxta.protocol.ModuleSpecAdvertisement; 038 import net.jxta.endpoint.*; 039 import net.jxta.platform.ModuleClassID; 040 import net.jxta.platform.ModuleSpecID; 041 import net.jxta.id.IDFactory; 042 import net.jxta.socket.*; 043 044 /** 045 * 046 */ 047 public class DaemonThread extends Thread { 048 049 private Mutex ecMutex; 050 private Mutex globalMutex; 051 private boolean shutdownRequested; 052 053 private PeerGroup sharpsterPeerGroup = null; 054 private PipeAdvertisement pipeAdvertisement; 055 private JxtaServerSocket serverSocket; 056 private DiscoveryService discoveryService; 057 private PipeService pipeService; 058 private int timeToLive = 1*60*1000; 059 private ExternalCommandManager commandManager; 060 private InputPipe inputPipe = null; 061 private OutputPipe outputPipe = null; 062 063 public DaemonThread (Mutex ec_mutex, 064 Mutex globalMutex) { 065 shutdownRequested = false; 066 ecMutex = ec_mutex; 067 this.globalMutex = globalMutex; 068 } 069 070 public boolean initialize (DiscoveryService disco, 071 PipeService pipes, 072 PeerGroup peerGroup, 073 ExternalCommandManager cm) { 074 discoveryService = disco; 075 pipeService = pipes; 076 sharpsterPeerGroup = peerGroup; 077 commandManager = cm; 078 079 createPipeAdvertisement(); 080 publishPipeAdvertisement(); 081 082 return true; 083 } 084 085 public void createPipeAdvertisement() { 086 try { 087 Enumeration enum = discoveryService.getLocalAdvertisements(DiscoveryService.ADV, 088 "Name", 089 "Sharpster:Share:"+SharpsterDaemon.getPeerName()); 090 091 if(enum != null && enum.hasMoreElements()) { 092 System.out.println("DT::Found an old advertisement, using it instead of creating a new one."); 093 pipeAdvertisement = (PipeAdvertisement)enum.nextElement(); 094 } 095 else { 096 System.out.println("DT::Found no old advertisement, creating a new one."); 097 pipeAdvertisement = (PipeAdvertisement) 098 AdvertisementFactory.newAdvertisement(PipeAdvertisement.getAdvertisementType()); 099 pipeAdvertisement.setPipeID(IDFactory.newPipeID(sharpsterPeerGroup.getPeerGroupID())); 100 pipeAdvertisement.setName("Sharpster:Share:"+sharpster.daemon.SharpsterDaemon.getPeerName()); 101 pipeAdvertisement.setType(PipeService.UnicastType); 102 } 103 104 inputPipe = pipeService.createInputPipe(pipeAdvertisement); 105 } catch (Exception e) { 106 System.out.println("Fatal error: could not create a pipe advertisement"); 107 e.printStackTrace(); 108 System.exit(1); 109 } 110 } 111 112 public void publishPipeAdvertisement() { 113 try { 114 //discoveryService.publish(pipeAdvertisement,DiscoveryService.ADV,DiscoveryService.INFINITE_LIFETIME,timeToLive); 115 discoveryService.publish(pipeAdvertisement,DiscoveryService.ADV,timeToLive,timeToLive); 116 discoveryService.remotePublish(pipeAdvertisement,DiscoveryService.ADV,timeToLive); 117 } catch (Exception e) { 118 System.out.println("Warning: could not publish the pipe advertisement"); 119 } 120 } 121 122 // private void receiveAndDelegate(JxtaSocket socket) { 123 private void receiveAndDelegate(Message msg) { 124 DaemonDaemonMessage message; 125 InputStream inputStream; 126 OutputStream ouputStream; 127 byte[] byteArrayData; 128 ByteArrayInputStream byteInputStream; 129 ByteArrayOutputStream byteOutputStream; 130 ObjectInputStream objectInputStream; 131 ObjectOutputStream objectOutputStream; 132 PipeAdvertisement clientPipeAdv; 133 ByteArrayMessageElement byteMessageElement; 134 InputStreamMessageElement streamMessageElement; 135 136 try { 137 inputStream = 138 msg.getMessageElement("Sharpster:PipeAdv").getStream(); 139 clientPipeAdv = (PipeAdvertisement) 140 AdvertisementFactory.newAdvertisement(new MimeMediaType("text","xml"), 141 inputStream); 142 143 byteMessageElement = 144 (ByteArrayMessageElement)msg.getMessageElement("Sharpster:Request"); 145 if(byteMessageElement == null) throw new Exception(); 146 147 byteArrayData = byteMessageElement.getBytes(); 148 byteInputStream = new ByteArrayInputStream(byteArrayData); 149 objectInputStream = new ObjectInputStream(byteInputStream); 150 message = (DaemonDaemonMessage)objectInputStream.readObject(); 151 objectInputStream.close(); 152 byteInputStream.close(); 153 154 System.out.println("DT::Command recieved: "+message.commandId); 155 message.response = new ResponseCollection(); 156 157 switch(message.commandId) { 158 case ExternalMessageType.GET_SHARES: 159 message.response = commandManager.getFileAccess(message.user); 160 break; 161 162 case ExternalMessageType.CHECKOUT_FILES: 163 message.response = commandManager.remoteCheckoutFiles(message.files,message.user,message.role); 164 break; 165 166 case ExternalMessageType.UPDATE_FILES: 167 message.response = commandManager.remoteUpdateFiles(message.files,message.user); 168 break; 169 170 case ExternalMessageType.COMMIT_FILES: 171 message.response = commandManager.remoteCommitFiles(message.files,message.user,message.role); 172 break; 173 174 case ExternalMessageType.ADD_FILES: 175 message.response = commandManager.remoteAddFiles(message.files,message.user); 176 break; 177 178 case ExternalMessageType.REMOVE_FILES: 179 message.response = commandManager.remoteRemoveFiles(message.files,message.user); 180 break; 181 } 182 183 message.files = null; 184 message.user = SharpsterDaemon.getPeerName(); 185 186 outputPipe = pipeService.createOutputPipe(clientPipeAdv, 187 30*1000); 188 189 if(outputPipe == null) throw new Exception(); 190 191 byteOutputStream = new ByteArrayOutputStream(); 192 objectOutputStream = new ObjectOutputStream(byteOutputStream); 193 objectOutputStream.writeObject(message); 194 objectOutputStream.flush(); 195 byteArrayData = byteOutputStream.toByteArray(); 196 objectOutputStream.close(); 197 byteOutputStream.close(); 198 199 byteMessageElement = new ByteArrayMessageElement("Sharpster:Response", 200 null, byteArrayData, 201 0, byteArrayData.length, null); 202 203 msg.addMessageElement(byteMessageElement); 204 outputPipe.send(msg); 205 } 206 catch(Exception e) { 207 //e.printStackTrace(); 208 } 209 210 /*try { 211 System.out.print("EC::Sending shares"); 212 ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream()); 213 214 System.out.print("."); 215 out.writeObject(SharpsterDaemon.getPeerName()); 216 out.flush(); 217 218 System.out.print("."); 219 ObjectInputStream in = new ObjectInputStream(socket.getInputStream()); 220 DaemonDaemonMessage msg = (DaemonDaemonMessage)in.readObject(); 221 222 msg.response = new ResponseCollection(); 223 224 switch(msg.commandId) { 225 case ExternalMessageType.GET_SHARES: 226 msg.response = commandManager.getFileAccess(msg.user); 227 break; 228 229 case ExternalMessageType.CHECKOUT_FILES: 230 msg.response = commandManager.remoteCheckoutFiles(msg.files,msg.user); 231 break; 232 233 case ExternalMessageType.UPDATE_FILES: 234 msg.response = commandManager.remoteUpdateFiles(msg.files,msg.user); 235 break; 236 237 case ExternalMessageType.COMMIT_FILES: 238 msg.response = commandManager.remoteCommitFiles(msg.files,msg.user); 239 break; 240 241 case ExternalMessageType.ADD_FILES: 242 msg.response = commandManager.remoteAddFiles(msg.files,msg.user); 243 break; 244 245 case ExternalMessageType.REMOVE_FILES: 246 msg.response = commandManager.remoteRemoveFiles(msg.files,msg.user); 247 break; 248 } 249 250 msg.files = null; 251 msg.user = SharpsterDaemon.getPeerName(); 252 253 System.out.print("."); 254 out.writeObject(msg); 255 out.flush(); 256 257 System.out.print("."); 258 out.close(); 259 System.out.print("."); 260 in.close(); 261 System.out.print("."); 262 socket.close(); 263 } catch (Exception ie) { 264 System.out.print("failure, "); 265 } 266 System.out.println("done");*/ 267 } 268 269 /** 270 * Runs the thread, when this method 271 * returns, the thread dies. 272 */ 273 public void run() { 274 boolean alive = true; 275 System.out.println("EC::Daemon thread has started"); 276 277 /*try { 278 serverSocket = new JxtaServerSocket(sharpsterPeerGroup, pipeAdvertisement); 279 serverSocket.setSoTimeout(0); 280 } catch (Exception e) { 281 System.out.println("Fatal error: could not create the socket."); 282 System.exit(1); 283 }*/ 284 285 Timer timer = new Timer(); 286 timer.scheduleAtFixedRate(new TimerTask() { 287 public void run() { 288 System.out.println("DT::Publishing advertisement"); 289 publishPipeAdvertisement(); 290 } 291 }, 0, timeToLive/2); 292 293 while(alive) { 294 try { 295 Message msg = inputPipe.waitForMessage(); 296 //JxtaSocket socket = serverSocket.accept(); 297 //receiveAndDelegate(socket); 298 receiveAndDelegate(msg); 299 } catch (Exception e) { 300 //e.printStackTrace(); 301 /*if(shutdownRequested) { 302 try { 303 serverSocket.close(); 304 } catch (IOException closeException) { 305 System.out.println("Warning: could not close the socket."); 306 } 307 alive = false; 308 }*/ 309 } 310 } 311 312 System.out.println("EC::Daemon thread has shutdown."); 313 } 314 315 /** 316 * Notifies the thread to stop 317 * Sets shtudownRequested and starts to wait for the thread with join() 318 */ 319 public void shutdown() { 320 shutdownRequested = true; 321 try { 322 this.join(); 323 } catch (java.lang.InterruptedException e) { 324 } 325 } 326 } 327