001 /*
002
003 $Id: ExternalCommunication.java,v 1.9 2003/05/15 10:46:54 culdesac Exp $
004
005 */
006
007 package sharpster.daemon.externalcommunication;
008
009 import sharpster.common.FileCollection;
010 import sharpster.common.ResponseCollection;
011 import sharpster.daemon.usermanagement.UserManager;
012 import sharpster.daemon.commandmanagement.ExternalCommandManager;
013 import sharpster.daemon.SharpsterDaemon;
014 import sharpster.daemon.externalcommunication.DaemonThread;
015 import sharpster.daemon.externalcommunication.AdvertisementThread;
016 import sharpster.common.*;
017
018 import java.io.*;
019 import java.util.*;
020
021 import net.jxta.endpoint.*;
022 import net.jxta.peergroup.PeerGroup;
023 import net.jxta.peergroup.PeerGroupFactory;
024 import net.jxta.exception.PeerGroupException;
025 import net.jxta.document.AdvertisementFactory;
026 import net.jxta.document.StructuredDocumentFactory;
027 import net.jxta.document.Advertisement;
028 import net.jxta.document.Element;
029 import net.jxta.document.MimeMediaType;
030 import net.jxta.document.StructuredDocument;
031 import net.jxta.document.StructuredTextDocument;
032 import net.jxta.document.StructuredDocumentUtils;
033 import net.jxta.discovery.DiscoveryService;
034 import net.jxta.pipe.PipeService;
035 import net.jxta.pipe.InputPipe;
036 import net.jxta.pipe.OutputPipe;
037 import net.jxta.pipe.PipeID;
038 import net.jxta.protocol.PipeAdvertisement;
039 import net.jxta.protocol.PeerGroupAdvertisement;
040 import net.jxta.protocol.ModuleImplAdvertisement;
041 import net.jxta.protocol.ModuleClassAdvertisement;
042 import net.jxta.protocol.ModuleSpecAdvertisement;
043 import net.jxta.endpoint.Message;
044 import net.jxta.platform.ModuleClassID;
045 import net.jxta.platform.ModuleSpecID;
046 import net.jxta.id.IDFactory;
047 import net.jxta.socket.*;
048
049 /**
050 * Class responsible for communcating with other Sharpster daemons.
051 */
052 public class ExternalCommunication {
053
054 private UserManager userManager;
055 private ExternalCommandManager externalCommand;
056 private DaemonThread daemonThread;
057 private AdvertisementThread advertisementThread;
058 private Mutex ecMutex;
059 private Mutex globalMutex;
060
061 private PeerGroup netPeerGroup = null;
062 private PeerGroup sharpsterPeerGroup = null;
063 private DiscoveryService discoveryService = null;
064 private PipeService pipeService = null;
065
066 /**
067 *
068 */
069 public ExternalCommunication(Mutex globalMutex) {
070 ecMutex = new Mutex();
071 this.globalMutex = globalMutex;
072 daemonThread = new DaemonThread(ecMutex, this.globalMutex);
073 advertisementThread = new AdvertisementThread(ecMutex);
074 }
075
076 /**
077 *
078 */
079 public void start() {
080 try {
081 netPeerGroup = PeerGroupFactory.newNetPeerGroup();
082 }
083 catch(PeerGroupException e) {
084 System.out.println("Fatal error: unable to create the default peer group");
085 System.exit(1);
086 }
087 try {
088 createPeerGroup();
089 }
090 catch(Exception e) {
091 System.out.println("Fatal error: unable to create the SharpsterGroup peer group");
092 System.exit(1);
093 }
094
095 advertisementThread.initialize(discoveryService,
096 pipeService,
097 sharpsterPeerGroup,
098 userManager);
099
100 daemonThread.initialize(discoveryService,
101 pipeService,
102 sharpsterPeerGroup,
103 externalCommand);
104
105 daemonThread.start();
106 advertisementThread.start();
107 }
108
109 /**
110 *
111 */
112 public void shutdown() {
113 daemonThread.shutdown();
114 advertisementThread.shutdown();
115 }
116
117 /**
118 *
119 */
120 public void initialize(UserManager um,
121 ExternalCommandManager ecm) {
122 userManager = um;
123 externalCommand = ecm;
124 }
125
126 /**
127 *
128 */
129 public void clearLocalCache() {
130 try {
131 discoveryService = netPeerGroup.getDiscoveryService();
132 discoveryService.flushAdvertisements(null,0);
133 //discoveryService.publish(group.getPeerAdvertisement(),DiscoveryService.PEER);
134 } catch(Exception e) {
135 System.out.println("Warning: unable to clear the local cache");
136 }
137 }
138
139 /**
140 *
141 */
142 public void createPeerGroup() throws Exception {
143 Enumeration enum = null;
144
145 SharpsterDaemon.setPeerName(netPeerGroup.getPeerName());
146 discoveryService = netPeerGroup.getDiscoveryService();
147
148 //clearLocalCache();
149 /*
150 for(int i=0; i<3; i++) {
151 System.out.println("EC::Trying to find local advertisements [" + (i+1) + " of 3]");
152 enum = discoveryService.getLocalAdvertisements(DiscoveryService.GROUP,
153 "Name",
154 "SharpsterGroup");
155 if(enum != null && enum.hasMoreElements()) {
156 break;
157 }
158
159 discoveryService.getRemoteAdvertisements(null,
160 DiscoveryService.GROUP,
161 "Name",
162 "SharpsterGroup", 1, null);
163 try {
164 Thread.sleep(3000);
165 }
166 catch (Exception e) {
167 }
168 }
169
170 if(enum == null || !enum.hasMoreElements()) {
171 try {
172 ModuleImplAdvertisement implAdv = netPeerGroup.getAllPurposePeerGroupImplAdvertisement();
173 sharpsterPeerGroup = netPeerGroup.newGroup(null, implAdv, "SharpsterGroup",
174 "The Sharpster share group");
175 System.out.println("EC::Could not find a group advertisement, creating one.");
176 }
177 catch(Exception e) {
178 throw e;
179 }
180 } else {
181 try {
182 PeerGroupAdvertisement netAdv = null;
183 netAdv = (PeerGroupAdvertisement)enum.nextElement();
184 sharpsterPeerGroup = netPeerGroup.newGroup(netAdv);
185 System.out.println("EC::Found a group advertisement.");
186 }
187 catch(Exception e) {
188 throw e;
189 }
190 }*/
191
192 try {
193 //We are using a static group advertisement instead of creating one.
194 PeerGroupAdvertisement netAdv = null;
195 FileInputStream is = new FileInputStream("config/SharpsterGroup.adv");
196 netAdv = (PeerGroupAdvertisement)AdvertisementFactory.newAdvertisement(MimeMediaType.XMLUTF8, is);
197 is.close();
198 sharpsterPeerGroup = netPeerGroup.newGroup(netAdv);
199 }
200 catch(Exception e) {
201 throw e;
202 }
203
204 try {
205 discoveryService = sharpsterPeerGroup.getDiscoveryService();
206 pipeService = sharpsterPeerGroup.getPipeService();
207 }
208 catch(Exception e) {
209 throw e;
210 }
211 }
212
213 /*
214 *
215 */
216 public LinkedList findLocalPeerServices(String nameFilter) {
217 LinkedList list = new LinkedList();
218 Enumeration enum;
219
220 try {
221 enum = discoveryService.getLocalAdvertisements(DiscoveryService.ADV,
222 "Name",
223 "Sharpster:Share:"+nameFilter);
224
225 while(enum != null && enum.hasMoreElements()) {
226 try {
227 PipeAdvertisement pipeAdv = (PipeAdvertisement)enum.nextElement();
228 String localAdv = "Sharpster:Share:"+SharpsterDaemon.getPeerName();
229 if(!localAdv.equals(pipeAdv.getName())) {
230 String userName = pipeAdv.getName();
231 userName = userName.substring(16);
232 userManager.addUser(userName);
233 list.add(pipeAdv);
234 }
235 } catch(Exception e) {
236 }
237 }
238 } catch (Exception e) {
239 }
240
241 return list;
242 }
243
244 private ResponseCollection sendCommand(FileCollection files,
245 String user,
246 int command,
247 String role) {
248 PipeAdvertisement pipeAdv = null;
249 DaemonDaemonMessage message;
250 InputStream inputStream;
251 OutputStream ouputStream;
252 byte[] byteArrayData;
253 ByteArrayInputStream byteInputStream;
254 ByteArrayOutputStream byteOutputStream;
255 ObjectInputStream objectInputStream;
256 ObjectOutputStream objectOutputStream;
257 ByteArrayMessageElement byteMessageElement;
258 InputStreamMessageElement streamMessageElement;
259
260 ResponseCollection responses = new ResponseCollection();
261 LinkedList list = findLocalPeerServices(user);
262
263 if(list == null || list.size() == 0) {
264 NetworkErrorResponse error = new NetworkErrorResponse();
265 error.setOrigin("ExternalCommunication");
266 error.setUser(SharpsterDaemon.getPeerName());
267 error.setError(true);
268 error.setMessage("Unable to connect to "+user);
269 responses.addResponse(error);
270 return responses;
271 }
272
273 if(list.size() > 1) pipeAdv = (PipeAdvertisement)list.getLast();
274 else pipeAdv = (PipeAdvertisement)list.getFirst();
275
276 try {
277 OutputPipe outputPipe = pipeService.createOutputPipe(pipeAdv,30*1000);
278 if(outputPipe == null) {
279 throw new Exception();
280 }
281
282 message = new DaemonDaemonMessage();
283 message.commandId = command;
284 message.user = SharpsterDaemon.getPeerName();
285 message.files = files;
286 message.role = role;
287 message.response = null;
288
289 byteOutputStream = new ByteArrayOutputStream();
290 objectOutputStream = new ObjectOutputStream(byteOutputStream);
291 objectOutputStream.writeObject(message);
292 objectOutputStream.flush();
293 byteArrayData = byteOutputStream.toByteArray();
294 objectOutputStream.close();
295 byteOutputStream.close();
296
297 PipeAdvertisement pipeAdvertisement = (PipeAdvertisement)
298 AdvertisementFactory.newAdvertisement(PipeAdvertisement.getAdvertisementType());
299 pipeAdvertisement.setPipeID(IDFactory.newPipeID(sharpsterPeerGroup.getPeerGroupID()));
300 pipeAdvertisement.setName("Sharpster:Client:"+sharpster.daemon.SharpsterDaemon.getPeerName());
301 pipeAdvertisement.setType(PipeService.UnicastType);
302 InputPipe inputPipe = pipeService.createInputPipe(pipeAdvertisement);
303
304 Message msg = pipeService.createMessage();
305
306 streamMessageElement =
307 new InputStreamMessageElement("Sharpster:PipeAdv",
308 new MimeMediaType("text","xml"),
309 pipeAdvertisement.getDocument(new MimeMediaType("text","xml")).getStream(),
310 null);
311
312 byteMessageElement =
313 new ByteArrayMessageElement("Sharpster:Request",
314 null, byteArrayData,
315 0, byteArrayData.length, null);
316
317 msg.addMessageElement(streamMessageElement);
318 msg.addMessageElement(byteMessageElement);
319
320 outputPipe.send(msg);
321 System.out.println("EC::Sending command: "+message.commandId);
322
323 //maybe we should use poll here instead
324 msg = inputPipe.waitForMessage();
325
326 byteMessageElement =
327 (ByteArrayMessageElement)msg.getMessageElement("Sharpster:Response");
328 if(byteMessageElement == null) throw new Exception();
329
330 byteArrayData = byteMessageElement.getBytes();
331 byteInputStream = new ByteArrayInputStream(byteArrayData);
332 objectInputStream = new ObjectInputStream(byteInputStream);
333 message = (DaemonDaemonMessage)objectInputStream.readObject();
334 objectInputStream.close();
335 byteInputStream.close();
336
337 responses = message.response;
338
339 /*JxtaSocket socket = new JxtaSocket(sharpsterPeerGroup, pipeAdv);
340 ObjectInputStream in = new ObjectInputStream(socket.getInputStream());
341 String peerName = (String)in.readObject();
342
343 ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream());
344 DaemonDaemonMessage msg = new DaemonDaemonMessage();
345 msg.commandId = command;
346 msg.user = SharpsterDaemon.getPeerName();
347 msg.files = files;
348 msg.response = null;
349
350 out.writeObject(msg);
351 out.flush();
352
353 msg = (DaemonDaemonMessage)in.readObject();
354 if(msg != null) responses = msg.response;
355 else throw new Exception();
356
357 in.close();
358 out.close();
359 socket.close();*/
360 }
361 catch(Exception e) {
362 //e.printStackTrace();
363 NetworkErrorResponse error = new NetworkErrorResponse();
364 error.setOrigin("ExternalCommunication");
365 error.setUser(SharpsterDaemon.getPeerName());
366 error.setError(true);
367 error.setMessage("Error during connection with "+user);
368 responses.addResponse(error);
369 return responses;
370 }
371
372 return responses;
373 }
374
375 /**
376 * Executes a checkout from a CVS of an external user.
377 */
378 public ResponseCollection checkoutFiles(FileCollection files,
379 String user,
380 String role) {
381 return sendCommand(files,user,ExternalMessageType.CHECKOUT_FILES,role);
382 }
383
384 /**
385 * Executes a commit to a CVS of an external user.
386 */
387 public ResponseCollection commitFiles(FileCollection files,
388 String user,
389 String role) {
390 return sendCommand(files,user,ExternalMessageType.COMMIT_FILES,role);
391 }
392
393 /**
394 * Executes a update to a CVS of an external user.
395 */
396 public ResponseCollection updateFiles(FileCollection files, String user) {
397 return sendCommand(files,user,ExternalMessageType.UPDATE_FILES,null);
398 }
399
400 /**
401 * Executes a file add to a CVS of an external user.
402 */
403 public ResponseCollection addFiles(FileCollection files, String user) {
404 return sendCommand(files,user,ExternalMessageType.ADD_FILES,null);
405 }
406
407 /**
408 * Executes a file remove to a CVS of an external user.
409 */
410 public ResponseCollection removeFiles(FileCollection files, String user) {
411 return sendCommand(files,user,ExternalMessageType.REMOVE_FILES,null);
412 }
413 }