001 /*
002
003 $Id: AdvertisementThread.java,v 1.13 2003/05/08 10:34:22 culdesac Exp $
004
005 */
006
007 package sharpster.daemon.externalcommunication;
008
009 import java.lang.Thread;
010 import java.io.*;
011 import java.util.*;
012
013 import sharpster.common.*;
014 import sharpster.daemon.usermanagement.UserManager;
015 import sharpster.daemon.SharpsterDaemon;
016
017 import net.jxta.peergroup.PeerGroup;
018 import net.jxta.peergroup.PeerGroupFactory;
019 import net.jxta.exception.PeerGroupException;
020 import net.jxta.document.AdvertisementFactory;
021 import net.jxta.document.StructuredDocumentFactory;
022 import net.jxta.document.Advertisement;
023 import net.jxta.document.Element;
024 import net.jxta.document.MimeMediaType;
025 import net.jxta.document.StructuredDocument;
026 import net.jxta.document.StructuredTextDocument;
027 import net.jxta.document.StructuredDocumentUtils;
028 import net.jxta.discovery.DiscoveryService;
029 import net.jxta.pipe.PipeService;
030 import net.jxta.pipe.InputPipe;
031 import net.jxta.pipe.OutputPipe;
032 import net.jxta.pipe.PipeID;
033 import net.jxta.protocol.PipeAdvertisement;
034 import net.jxta.protocol.PeerGroupAdvertisement;
035 import net.jxta.protocol.ModuleImplAdvertisement;
036 import net.jxta.protocol.ModuleClassAdvertisement;
037 import net.jxta.protocol.ModuleSpecAdvertisement;
038 import net.jxta.endpoint.*;
039 import net.jxta.platform.ModuleClassID;
040 import net.jxta.platform.ModuleSpecID;
041 import net.jxta.id.IDFactory;
042 import net.jxta.socket.*;
043
044 /**
045 *
046 */
047 public class AdvertisementThread extends Thread {
048
049 private Mutex ecMutex;
050 // private Mutex commandManagerMutex
051 private boolean shutdownRequested;
052 private UserManager userManager;
053 private int searchPeriod = 30*1000;
054
055 private PeerGroup sharpsterPeerGroup = null;
056 private DiscoveryService discoveryService;
057 private PipeService pipeService;
058
059 public AdvertisementThread(Mutex mutex) {
060 shutdownRequested = false;
061 ecMutex = mutex;
062 }
063
064 public boolean initialize(DiscoveryService disco,
065 PipeService pipes,
066 PeerGroup peerGroup,
067 UserManager um) {
068 discoveryService = disco;
069 pipeService = pipes;
070 sharpsterPeerGroup = peerGroup;
071 userManager = um;
072
073 return true;
074 }
075
076 private void getRemoteShares(LinkedList list) {
077 PipeAdvertisement pipeAdv = null;
078 DaemonDaemonMessage message;
079 InputStream inputStream;
080 OutputStream ouputStream;
081 byte[] byteArrayData;
082 ByteArrayInputStream byteInputStream;
083 ByteArrayOutputStream byteOutputStream;
084 ObjectInputStream objectInputStream;
085 ObjectOutputStream objectOutputStream;
086 ByteArrayMessageElement byteMessageElement;
087 InputStreamMessageElement streamMessageElement;
088
089 try {
090 for(int i=0; i<list.size(); i++) {
091 System.out.println("AT::Connecting");
092
093 pipeAdv = (PipeAdvertisement)list.get(i);
094 OutputPipe outputPipe = pipeService.createOutputPipe(pipeAdv,30*1000);
095 if(outputPipe == null) {
096 System.out.println("AT::Unable to connect");
097 continue;
098 }
099
100 message = new DaemonDaemonMessage();
101 message.commandId = ExternalMessageType.GET_SHARES;
102 message.user = SharpsterDaemon.getPeerName();
103 message.files = null;
104 message.response = null;
105
106 byteOutputStream = new ByteArrayOutputStream();
107 objectOutputStream = new ObjectOutputStream(byteOutputStream);
108 objectOutputStream.writeObject(message);
109 objectOutputStream.flush();
110 byteArrayData = byteOutputStream.toByteArray();
111 objectOutputStream.close();
112 byteOutputStream.close();
113
114 PipeAdvertisement pipeAdvertisement = (PipeAdvertisement)
115 AdvertisementFactory.newAdvertisement(PipeAdvertisement.getAdvertisementType());
116 pipeAdvertisement.setPipeID(IDFactory.newPipeID(sharpsterPeerGroup.getPeerGroupID()));
117 pipeAdvertisement.setName("Sharpster:Client:"+sharpster.daemon.SharpsterDaemon.getPeerName());
118 pipeAdvertisement.setType(PipeService.UnicastType);
119 InputPipe inputPipe = pipeService.createInputPipe(pipeAdvertisement);
120
121 Message msg = pipeService.createMessage();
122
123 streamMessageElement =
124 new InputStreamMessageElement("Sharpster:PipeAdv",
125 new MimeMediaType("text","xml"),
126 pipeAdvertisement.getDocument(new MimeMediaType("text","xml")).getStream(),
127 null);
128
129 byteMessageElement =
130 new ByteArrayMessageElement("Sharpster:Request",
131 null, byteArrayData,
132 0, byteArrayData.length, null);
133
134 msg.addMessageElement(streamMessageElement);
135 msg.addMessageElement(byteMessageElement);
136
137 outputPipe.send(msg);
138 System.out.println("AT::Sending command: "+message.commandId);
139
140 //maybe we should use poll here instead
141 msg = inputPipe.poll(30*1000);
142 if(msg == null) throw new Exception();
143
144 byteMessageElement =
145 (ByteArrayMessageElement)msg.getMessageElement("Sharpster:Response");
146 if(byteMessageElement == null) throw new Exception();
147
148 byteArrayData = byteMessageElement.getBytes();
149 byteInputStream = new ByteArrayInputStream(byteArrayData);
150 objectInputStream = new ObjectInputStream(byteInputStream);
151 message = (DaemonDaemonMessage)objectInputStream.readObject();
152 objectInputStream.close();
153 byteInputStream.close();
154
155 if(message != null && message.response != null) {
156 System.out.println("AT::Got response");
157 for(int j=0;j<message.response.getResponseCount();j++) {
158 Response resp = message.response.getResponse(j);
159 if(resp.getType() == ResponseType.SHARED_FILES) {
160 SharedFilesResponse sharedFilesResp = (SharedFilesResponse)resp;
161 userManager.addFileAccess(sharedFilesResp.getFiles(), message.user);
162 }
163 }
164 }
165 }
166 }
167 catch(Exception e) {
168 //e.printStackTrace();
169 System.out.println("AT::Error fetching remote shares");
170 }
171 }
172 /*JxtaSocket socket = null;
173
174 for(int i=0; i<list.size(); i++) {
175 pipeAdv = (PipeAdvertisement)list.get(i);
176
177 try {
178 System.out.print("EC::Updating shares");
179
180 socket = new JxtaSocket(sharpsterPeerGroup, pipeAdv);
181
182 System.out.print(".");
183
184 //System.out.println("Client: creating input stream");
185 ObjectInputStream in = new ObjectInputStream(socket.getInputStream());
186
187 System.out.print(".");
188
189 //System.out.println("Client: reading data");
190 String peerName = (String)in.readObject();
191 //System.out.println("Client: received data from "+peerName);
192
193 System.out.print(".");
194
195 //System.out.println("Client: creating output stream");
196 ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream());
197
198 System.out.print(".");
199
200 DaemonDaemonMessage msg = new DaemonDaemonMessage();
201 msg.commandId = ExternalMessageType.GET_SHARES;
202 msg.user = SharpsterDaemon.getPeerName();
203 msg.files = null;
204 msg.response = null;
205
206 //System.out.println("Client: sending data");
207 out.writeObject(msg);
208 out.flush();
209
210 System.out.print(".");
211
212 //in = new ObjectInputStream(socket.getInputStream());
213 //System.out.println("Client: reading data again");
214 msg = (DaemonDaemonMessage)in.readObject();
215 //System.out.println("Client: received");
216
217 System.out.print(".");
218
219 if(msg != null && msg.response != null) {
220 //System.out.println("Client: Got my response ("+msg.response.getResponseCount()+")");
221 for(int j=0;j<msg.response.getResponseCount();j++) {
222 Response resp = msg.response.getResponse(j);
223 if(resp.getType() == ResponseType.SHARED_FILES) {
224 SharedFilesResponse sharedFilesResp = (SharedFilesResponse)resp;
225 userManager.addFileAccess(sharedFilesResp.getFiles(), peerName);
226 }
227 }
228 }
229
230 //System.out.println("Client: closing socket");
231 System.out.print(".");
232 in.close();
233 System.out.print(".");
234 out.close();
235 System.out.print(".");
236 socket.close();
237 } catch (Exception e) {
238 e.printStackTrace();
239 //System.out.print("failure, ");
240 //return;
241 }
242
243 System.out.println("done");
244 }*/
245 //}
246
247 private LinkedList discoverServices() {
248 LinkedList list = new LinkedList();
249 Enumeration enum;
250
251 System.out.println("EC::Discovering services");
252
253 try {
254 enum = discoveryService.getLocalAdvertisements(DiscoveryService.ADV,
255 "Name",
256 "Sharpster:Share:*");
257
258 while(enum != null && enum.hasMoreElements()) {
259 try {
260 PipeAdvertisement pipeAdv = (PipeAdvertisement)enum.nextElement();
261 System.out.println("EC::Found advertisement: " + pipeAdv.getName());
262 String localAdv = "Sharpster:Share:"+SharpsterDaemon.getPeerName();
263 if(!localAdv.equals(pipeAdv.getName())) {
264 String userName = pipeAdv.getName();
265 userName = userName.substring(16);
266 userManager.addUser(userName);
267 list.add(pipeAdv);
268 }
269 } catch(Exception e) {
270 }
271 }
272
273 discoveryService.getRemoteAdvertisements(null, DiscoveryService.ADV,
274 "Name", "Sharpster:Share:*",
275 5, null);
276 } catch (Exception e) {
277 }
278 return list;
279 }
280
281 /**
282 * Runs the thread
283 * When this method returns, the thread dies.
284 */
285 public void run() {
286 boolean alive = true;
287 System.out.println("EC::Advertisement thread has started");
288
289 while(alive) {
290 LinkedList list = discoverServices();
291 getRemoteShares(list);
292 if(shutdownRequested) {
293 break;
294 }
295 try {
296 sleep(searchPeriod);
297 } catch (Exception e) {
298 }
299 }
300 System.out.println("EC::Advertisement thread has shutdown.");
301 }
302
303 /**
304 * Notifies the thread to stop
305 * Sets shtudownRequested and starts to wait for the thread with join()
306 */
307 public void shutdown() {
308 shutdownRequested = true;
309 try {
310 this.join();
311 } catch (java.lang.InterruptedException e) {
312 }
313 }
314 }
315