001 /*
002
003 $Id: DaemonThread.java,v 1.13 2003/05/15 12:05:55 culdesac Exp $
004
005 */
006
007 package sharpster.daemon.externalcommunication;
008
009 import java.lang.Thread;
010 import java.io.*;
011 import java.util.*;
012
013 import sharpster.common.*;
014 import sharpster.daemon.SharpsterDaemon;
015 import sharpster.daemon.commandmanagement.ExternalCommandManager;
016
017 import net.jxta.peergroup.PeerGroup;
018 import net.jxta.peergroup.PeerGroupFactory;
019 import net.jxta.exception.PeerGroupException;
020 import net.jxta.document.AdvertisementFactory;
021 import net.jxta.document.StructuredDocumentFactory;
022 import net.jxta.document.Advertisement;
023 import net.jxta.document.Element;
024 import net.jxta.document.MimeMediaType;
025 import net.jxta.document.StructuredDocument;
026 import net.jxta.document.StructuredTextDocument;
027 import net.jxta.document.StructuredDocumentUtils;
028 import net.jxta.discovery.DiscoveryService;
029 import net.jxta.pipe.PipeService;
030 import net.jxta.pipe.InputPipe;
031 import net.jxta.pipe.OutputPipe;
032 import net.jxta.pipe.PipeID;
033 import net.jxta.protocol.PipeAdvertisement;
034 import net.jxta.protocol.PeerGroupAdvertisement;
035 import net.jxta.protocol.ModuleImplAdvertisement;
036 import net.jxta.protocol.ModuleClassAdvertisement;
037 import net.jxta.protocol.ModuleSpecAdvertisement;
038 import net.jxta.endpoint.*;
039 import net.jxta.platform.ModuleClassID;
040 import net.jxta.platform.ModuleSpecID;
041 import net.jxta.id.IDFactory;
042 import net.jxta.socket.*;
043
044 /**
045 *
046 */
047 public class DaemonThread extends Thread {
048
049 private Mutex ecMutex;
050 private Mutex globalMutex;
051 private boolean shutdownRequested;
052
053 private PeerGroup sharpsterPeerGroup = null;
054 private PipeAdvertisement pipeAdvertisement;
055 private JxtaServerSocket serverSocket;
056 private DiscoveryService discoveryService;
057 private PipeService pipeService;
058 private int timeToLive = 1*60*1000;
059 private ExternalCommandManager commandManager;
060 private InputPipe inputPipe = null;
061 private OutputPipe outputPipe = null;
062
063 public DaemonThread (Mutex ec_mutex,
064 Mutex globalMutex) {
065 shutdownRequested = false;
066 ecMutex = ec_mutex;
067 this.globalMutex = globalMutex;
068 }
069
070 public boolean initialize (DiscoveryService disco,
071 PipeService pipes,
072 PeerGroup peerGroup,
073 ExternalCommandManager cm) {
074 discoveryService = disco;
075 pipeService = pipes;
076 sharpsterPeerGroup = peerGroup;
077 commandManager = cm;
078
079 createPipeAdvertisement();
080 publishPipeAdvertisement();
081
082 return true;
083 }
084
085 public void createPipeAdvertisement() {
086 try {
087 Enumeration enum = discoveryService.getLocalAdvertisements(DiscoveryService.ADV,
088 "Name",
089 "Sharpster:Share:"+SharpsterDaemon.getPeerName());
090
091 if(enum != null && enum.hasMoreElements()) {
092 System.out.println("DT::Found an old advertisement, using it instead of creating a new one.");
093 pipeAdvertisement = (PipeAdvertisement)enum.nextElement();
094 }
095 else {
096 System.out.println("DT::Found no old advertisement, creating a new one.");
097 pipeAdvertisement = (PipeAdvertisement)
098 AdvertisementFactory.newAdvertisement(PipeAdvertisement.getAdvertisementType());
099 pipeAdvertisement.setPipeID(IDFactory.newPipeID(sharpsterPeerGroup.getPeerGroupID()));
100 pipeAdvertisement.setName("Sharpster:Share:"+sharpster.daemon.SharpsterDaemon.getPeerName());
101 pipeAdvertisement.setType(PipeService.UnicastType);
102 }
103
104 inputPipe = pipeService.createInputPipe(pipeAdvertisement);
105 } catch (Exception e) {
106 System.out.println("Fatal error: could not create a pipe advertisement");
107 e.printStackTrace();
108 System.exit(1);
109 }
110 }
111
112 public void publishPipeAdvertisement() {
113 try {
114 //discoveryService.publish(pipeAdvertisement,DiscoveryService.ADV,DiscoveryService.INFINITE_LIFETIME,timeToLive);
115 discoveryService.publish(pipeAdvertisement,DiscoveryService.ADV,timeToLive,timeToLive);
116 discoveryService.remotePublish(pipeAdvertisement,DiscoveryService.ADV,timeToLive);
117 } catch (Exception e) {
118 System.out.println("Warning: could not publish the pipe advertisement");
119 }
120 }
121
122 // private void receiveAndDelegate(JxtaSocket socket) {
123 private void receiveAndDelegate(Message msg) {
124 DaemonDaemonMessage message;
125 InputStream inputStream;
126 OutputStream ouputStream;
127 byte[] byteArrayData;
128 ByteArrayInputStream byteInputStream;
129 ByteArrayOutputStream byteOutputStream;
130 ObjectInputStream objectInputStream;
131 ObjectOutputStream objectOutputStream;
132 PipeAdvertisement clientPipeAdv;
133 ByteArrayMessageElement byteMessageElement;
134 InputStreamMessageElement streamMessageElement;
135
136 try {
137 inputStream =
138 msg.getMessageElement("Sharpster:PipeAdv").getStream();
139 clientPipeAdv = (PipeAdvertisement)
140 AdvertisementFactory.newAdvertisement(new MimeMediaType("text","xml"),
141 inputStream);
142
143 byteMessageElement =
144 (ByteArrayMessageElement)msg.getMessageElement("Sharpster:Request");
145 if(byteMessageElement == null) throw new Exception();
146
147 byteArrayData = byteMessageElement.getBytes();
148 byteInputStream = new ByteArrayInputStream(byteArrayData);
149 objectInputStream = new ObjectInputStream(byteInputStream);
150 message = (DaemonDaemonMessage)objectInputStream.readObject();
151 objectInputStream.close();
152 byteInputStream.close();
153
154 System.out.println("DT::Command recieved: "+message.commandId);
155 message.response = new ResponseCollection();
156
157 switch(message.commandId) {
158 case ExternalMessageType.GET_SHARES:
159 message.response = commandManager.getFileAccess(message.user);
160 break;
161
162 case ExternalMessageType.CHECKOUT_FILES:
163 message.response = commandManager.remoteCheckoutFiles(message.files,message.user,message.role);
164 break;
165
166 case ExternalMessageType.UPDATE_FILES:
167 message.response = commandManager.remoteUpdateFiles(message.files,message.user);
168 break;
169
170 case ExternalMessageType.COMMIT_FILES:
171 message.response = commandManager.remoteCommitFiles(message.files,message.user,message.role);
172 break;
173
174 case ExternalMessageType.ADD_FILES:
175 message.response = commandManager.remoteAddFiles(message.files,message.user);
176 break;
177
178 case ExternalMessageType.REMOVE_FILES:
179 message.response = commandManager.remoteRemoveFiles(message.files,message.user);
180 break;
181 }
182
183 message.files = null;
184 message.user = SharpsterDaemon.getPeerName();
185
186 outputPipe = pipeService.createOutputPipe(clientPipeAdv,
187 30*1000);
188
189 if(outputPipe == null) throw new Exception();
190
191 byteOutputStream = new ByteArrayOutputStream();
192 objectOutputStream = new ObjectOutputStream(byteOutputStream);
193 objectOutputStream.writeObject(message);
194 objectOutputStream.flush();
195 byteArrayData = byteOutputStream.toByteArray();
196 objectOutputStream.close();
197 byteOutputStream.close();
198
199 byteMessageElement = new ByteArrayMessageElement("Sharpster:Response",
200 null, byteArrayData,
201 0, byteArrayData.length, null);
202
203 msg.addMessageElement(byteMessageElement);
204 outputPipe.send(msg);
205 }
206 catch(Exception e) {
207 //e.printStackTrace();
208 }
209
210 /*try {
211 System.out.print("EC::Sending shares");
212 ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream());
213
214 System.out.print(".");
215 out.writeObject(SharpsterDaemon.getPeerName());
216 out.flush();
217
218 System.out.print(".");
219 ObjectInputStream in = new ObjectInputStream(socket.getInputStream());
220 DaemonDaemonMessage msg = (DaemonDaemonMessage)in.readObject();
221
222 msg.response = new ResponseCollection();
223
224 switch(msg.commandId) {
225 case ExternalMessageType.GET_SHARES:
226 msg.response = commandManager.getFileAccess(msg.user);
227 break;
228
229 case ExternalMessageType.CHECKOUT_FILES:
230 msg.response = commandManager.remoteCheckoutFiles(msg.files,msg.user);
231 break;
232
233 case ExternalMessageType.UPDATE_FILES:
234 msg.response = commandManager.remoteUpdateFiles(msg.files,msg.user);
235 break;
236
237 case ExternalMessageType.COMMIT_FILES:
238 msg.response = commandManager.remoteCommitFiles(msg.files,msg.user);
239 break;
240
241 case ExternalMessageType.ADD_FILES:
242 msg.response = commandManager.remoteAddFiles(msg.files,msg.user);
243 break;
244
245 case ExternalMessageType.REMOVE_FILES:
246 msg.response = commandManager.remoteRemoveFiles(msg.files,msg.user);
247 break;
248 }
249
250 msg.files = null;
251 msg.user = SharpsterDaemon.getPeerName();
252
253 System.out.print(".");
254 out.writeObject(msg);
255 out.flush();
256
257 System.out.print(".");
258 out.close();
259 System.out.print(".");
260 in.close();
261 System.out.print(".");
262 socket.close();
263 } catch (Exception ie) {
264 System.out.print("failure, ");
265 }
266 System.out.println("done");*/
267 }
268
269 /**
270 * Runs the thread, when this method
271 * returns, the thread dies.
272 */
273 public void run() {
274 boolean alive = true;
275 System.out.println("EC::Daemon thread has started");
276
277 /*try {
278 serverSocket = new JxtaServerSocket(sharpsterPeerGroup, pipeAdvertisement);
279 serverSocket.setSoTimeout(0);
280 } catch (Exception e) {
281 System.out.println("Fatal error: could not create the socket.");
282 System.exit(1);
283 }*/
284
285 Timer timer = new Timer();
286 timer.scheduleAtFixedRate(new TimerTask() {
287 public void run() {
288 System.out.println("DT::Publishing advertisement");
289 publishPipeAdvertisement();
290 }
291 }, 0, timeToLive/2);
292
293 while(alive) {
294 try {
295 Message msg = inputPipe.waitForMessage();
296 //JxtaSocket socket = serverSocket.accept();
297 //receiveAndDelegate(socket);
298 receiveAndDelegate(msg);
299 } catch (Exception e) {
300 //e.printStackTrace();
301 /*if(shutdownRequested) {
302 try {
303 serverSocket.close();
304 } catch (IOException closeException) {
305 System.out.println("Warning: could not close the socket.");
306 }
307 alive = false;
308 }*/
309 }
310 }
311
312 System.out.println("EC::Daemon thread has shutdown.");
313 }
314
315 /**
316 * Notifies the thread to stop
317 * Sets shtudownRequested and starts to wait for the thread with join()
318 */
319 public void shutdown() {
320 shutdownRequested = true;
321 try {
322 this.join();
323 } catch (java.lang.InterruptedException e) {
324 }
325 }
326 }
327