2 * Copyright (c) 2001-2007 Sun Microsystems, Inc. All rights reserved.
4 * The Sun Project JXTA(TM) Software License
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are met:
9 * 1. Redistributions of source code must retain the above copyright notice,
10 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
16 * 3. The end-user documentation included with the redistribution, if any, must
17 * include the following acknowledgment: "This product includes software
18 * developed by Sun Microsystems, Inc. for JXTA(TM) technology."
19 * Alternately, this acknowledgment may appear in the software itself, if
20 * and wherever such third-party acknowledgments normally appear.
22 * 4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must
23 * not be used to endorse or promote products derived from this software
24 * without prior written permission. For written permission, please contact
25 * Project JXTA at http://www.jxta.org.
27 * 5. Products derived from this software may not be called "JXTA", nor may
28 * "JXTA" appear in their name, without prior written permission of Sun.
30 * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES,
31 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
32 * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SUN
33 * MICROSYSTEMS OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
34 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
35 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
36 * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
37 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
38 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
39 * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
41 * JXTA is a registered trademark of Sun Microsystems, Inc. in the United
42 * States and other countries.
44 * Please see the license information page at :
45 * <http://www.jxta.org/project/www/license.html> for instructions on use of
46 * the license in source files.
48 * ====================================================================
50 * This software consists of voluntary contributions made by many individuals
51 * on behalf of Project JXTA. For more information on Project JXTA, please see
52 * http://www.jxta.org.
54 * This license is based on the BSD license adopted by the Apache Foundation.
57 package net.jxta.impl.endpoint.cbjx;
60 import net.jxta.document.Advertisement;
61 import net.jxta.document.MimeMediaType;
62 import net.jxta.document.TextDocument;
63 import net.jxta.endpoint.ByteArrayMessageElement;
64 import net.jxta.endpoint.EndpointAddress;
65 import net.jxta.endpoint.EndpointListener;
66 import net.jxta.endpoint.EndpointService;
67 import net.jxta.endpoint.Message;
68 import net.jxta.endpoint.MessageElement;
69 import net.jxta.endpoint.MessageFilterListener;
70 import net.jxta.endpoint.MessageReceiver;
71 import net.jxta.endpoint.MessageSender;
72 import net.jxta.endpoint.Messenger;
73 import net.jxta.endpoint.TextDocumentMessageElement;
74 import net.jxta.endpoint.WireFormatMessage;
75 import net.jxta.endpoint.WireFormatMessageFactory;
76 import net.jxta.exception.PeerGroupException;
77 import net.jxta.id.ID;
78 import net.jxta.id.IDFactory;
79 import net.jxta.impl.endpoint.JxtaMessageMessageElement;
80 import net.jxta.impl.membership.pse.PSECredential;
81 import net.jxta.impl.membership.pse.PSEMembershipService;
82 import net.jxta.impl.membership.pse.PSEUtils;
83 import net.jxta.logging.Logging;
84 import net.jxta.membership.MembershipService;
85 import net.jxta.peer.PeerID;
86 import net.jxta.peergroup.PeerGroup;
87 import net.jxta.platform.Module;
88 import net.jxta.protocol.ModuleImplAdvertisement;
90 import java.io.IOException;
91 import java.security.cert.Certificate;
92 import java.security.interfaces.RSAPublicKey;
93 import java.util.Collections;
94 import java.util.Iterator;
95 import java.util.logging.Level;
96 import java.util.logging.Logger;
100 * A JXTA {@link net.jxta.endpoint.MessageTransport} implementation which
101 * which provides message verification by examining message signatures. A
102 * virtual transport, the messages are transfered between peers using some
103 * other message transport.
105 public class CbJxTransport implements Module, MessageSender, MessageReceiver, EndpointListener {
110 private final static Logger LOG = Logger.getLogger(CbJxTransport.class.getName());
113 * the name of the cbjx valid element
115 public static final String CBJX_MSG_NS = "cbjx";
118 * the name of the cbjx crypto element
120 static final String CBJX_MSG_INFO = "CryptoInfo";
123 * the name of the cbjx body element
125 static final String CBJX_MSG_BODY = "Body";
128 * the name of the cbjx body element
130 static final String CBJX_MSG_SIG = "Signature";
133 * the cbjx protocol name
135 static final String cbjxProtocolName = "cbjx";
138 * the cbjx service name
140 static final String cbjxServiceName = "CbJxTransport";
145 static PeerID localPeerID = null;
148 * the endpoint address of this peer
150 static EndpointAddress localPeerAddr = null;
153 * the peer group to which this module belongs
155 PeerGroup group = null;
158 * the endpoint service in my group
160 EndpointService endpoint = null;
163 * the membership service in my group
165 PSEMembershipService membership = null;
168 * Default constructor
170 public CbJxTransport() {}
175 public void init(PeerGroup group, ID assignedID, Advertisement impl) throws PeerGroupException {
178 ModuleImplAdvertisement implAdvertisement = (ModuleImplAdvertisement) impl;
180 localPeerID = group.getPeerID();
182 CbJxTransport.localPeerAddr = new EndpointAddress(cbjxProtocolName, group.getPeerID().getUniqueValue().toString(), null
186 if (Logging.SHOW_CONFIG && LOG.isLoggable(Level.CONFIG)) {
187 StringBuilder configInfo = new StringBuilder("Configuring CBJX Message Transport : " + assignedID);
189 if (implAdvertisement != null) {
190 configInfo.append("\n\tImplementation :");
191 configInfo.append("\n\t\tModule Spec ID: ").append(implAdvertisement.getModuleSpecID());
192 configInfo.append("\n\t\tImpl Description : ").append(implAdvertisement.getDescription());
193 configInfo.append("\n\t\tImpl URI : ").append(implAdvertisement.getUri());
194 configInfo.append("\n\t\tImpl Code : ").append(implAdvertisement.getCode());
197 configInfo.append("\n\tGroup Params :");
198 configInfo.append("\n\t\tGroup : ").append(group);
199 configInfo.append("\n\t\tPeer ID : ").append(group.getPeerID());
201 configInfo.append("\n\tConfiguration :");
202 configInfo.append("\n\t\tPublic Address : ").append(CbJxTransport.localPeerAddr);
204 LOG.config(configInfo.toString());
211 public int startApp(String[] arg) {
213 endpoint = group.getEndpointService();
215 if (null == endpoint) {
216 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
217 LOG.warning("Stalled until there is an endpoint service");
219 return START_AGAIN_STALLED;
222 MembershipService groupMembership = group.getMembershipService();
224 if (null == groupMembership) {
225 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
226 LOG.warning("Stalled until there is a membership service");
228 return START_AGAIN_STALLED;
231 if (!(groupMembership instanceof PSEMembershipService)) {
232 if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
233 LOG.severe("CBJX Transport requires PSE Membership Service");
238 membership = (PSEMembershipService) groupMembership;
240 if (endpoint.addMessageTransport(this) == null) {
241 if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
242 LOG.severe("Transport registration refused");
247 // XXX bondolo@jxta.org 20030526 check for errors
249 endpoint.addIncomingMessageListener(this, cbjxServiceName, null);
251 endpoint.addIncomingMessageFilterListener(new CbJxInputFilter(), null, null);
252 // endpoint.addOutgoingMessageFilterListener( new CbJxOutputFilter(), null, null );
254 if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
255 LOG.info("CbJxTransport started");
264 public void stopApp() {
266 if (endpoint != null) {
267 // FIXME 20030516 bondolo@jxta.org remove filters and listener
269 endpoint.removeMessageTransport(this);
273 if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
274 LOG.info("CbJxTransport stopped");
281 public EndpointAddress getPublicAddress() {
282 return CbJxTransport.localPeerAddr;
288 public boolean isConnectionOriented() {
289 // since we rely on other endpoint protocol we are not connection oriented
296 public boolean allowsRouting() {
297 // since we are using the endpoint router
298 // the endpoint router cannot use our endpoint to send messages
305 public EndpointService getEndpointService() {
306 return (EndpointService) endpoint.getInterface();
312 public Object transportControl(Object operation, Object value) {
319 public Iterator getPublicAddresses() {
320 return Collections.singletonList(getPublicAddress()).iterator();
326 public String getProtocolName() {
327 return cbjxProtocolName;
333 public Messenger getMessenger(EndpointAddress dest, Object hintIgnored) {
335 return new CbJxMessenger(this, dest, hintIgnored);
336 } catch (IOException failed) {
337 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
338 LOG.log(Level.WARNING, "Failed to create cbjx messenger", failed);
348 public boolean ping(EndpointAddress addr) {
349 Messenger messenger = getMessenger(addr, null);
351 boolean reachable = (null != messenger);
353 if (messenger != null) {
363 public void processIncomingMessage(Message message, EndpointAddress srcAddr, EndpointAddress dstAddr) {
364 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
365 LOG.fine("processIncomingMessage : Received message from: " + srcAddr);
368 // extract the Crypto info from the message
369 MessageElement cryptoElement = message.getMessageElement(CBJX_MSG_NS, CBJX_MSG_INFO);
371 if (cryptoElement == null) {
372 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
373 LOG.fine("processIncomingMessage : No \'" + CBJX_MSG_INFO + "\' in the message");
377 message.removeMessageElement(cryptoElement);
379 // the cbjx message info
380 CbJxMessageInfo cryptoInfo = null;
383 cryptoInfo = new CbJxMessageInfo(cryptoElement.getStream(), cryptoElement.getMimeType());
384 } catch (Throwable e) {
385 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
386 LOG.log(Level.WARNING
388 "processIncomingMessage : Couldn\'t retrieve CbJxMessageInfo from \'" + CBJX_MSG_INFO + "\' element", e);
393 Message submessage = checkCryptoInfo(message, cryptoElement, cryptoInfo);
395 if (null == submessage) {
396 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
397 LOG.warning("processIncomingMessage : discarding message from " + srcAddr);
402 // give back the message to the endpoint
404 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
405 LOG.fine("processIncomingMessage: delivering " + submessage + " to: " + cryptoInfo.getDestinationAddress());
408 endpoint.processIncomingMessage(submessage, cryptoInfo.getSourceAddress(), cryptoInfo.getDestinationAddress());
409 } catch (Throwable all) {
410 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
411 LOG.log(Level.WARNING, "processIncomingMessage: endpoint failed to demux message", all);
417 * add the CryptoInfo into the message
419 * @param submessage the message
420 * @param destAddress the destination
421 * @return Message the message with the CbJxMessageInfo added
423 public Message addCryptoInfo(Message submessage, EndpointAddress destAddress) throws IOException {
424 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
425 LOG.fine("Building CBJX wrapper for " + submessage);
428 // Remove all existing CbJx Elements from source
429 Iterator eachCbJxElement = submessage.getMessageElementsOfNamespace(CbJxTransport.CBJX_MSG_NS);
431 while (eachCbJxElement.hasNext()) {
432 MessageElement aMessageElement = (MessageElement) eachCbJxElement.next();
434 eachCbJxElement.remove();
437 Message message = new Message();
439 CbJxMessageInfo cryptoInfo = new CbJxMessageInfo();
441 // set the source Id of the message
442 cryptoInfo.setSourceID(localPeerID);
443 cryptoInfo.setSourceAddress(localPeerAddr);
444 cryptoInfo.setDestinationAddress(destAddress);
446 // add the root cert into the message info
447 PSECredential cred = (PSECredential) membership.getDefaultCredential();
450 throw new IOException("No authentication available for message signing.");
453 Certificate cert = cred.getCertificate();
455 cryptoInfo.setPeerCert(cert);
457 // compute the signature of the message body
458 TextDocument infoDoc = (TextDocument) cryptoInfo.getDocument(MimeMediaType.XMLUTF8);
459 byte[] infoSignature = null;
462 infoSignature = PSEUtils.computeSignature(CbJxDefs.signAlgoName, cred.getPrivateKey(), infoDoc.getStream());
463 } catch (Throwable e) {
464 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
465 LOG.log(Level.FINE, "failed to sign " + submessage, e);
470 // add the cbjx:CryptoInfo into the message
471 MessageElement infoSigElement = new ByteArrayMessageElement(CBJX_MSG_SIG, MimeMediaType.AOS, infoSignature, null);
473 // add the cbjx:CryptoInfo into the message
474 MessageElement cryptoInfoElement = new TextDocumentMessageElement(CBJX_MSG_INFO, infoDoc, infoSigElement);
476 message.addMessageElement(CBJX_MSG_NS, cryptoInfoElement);
478 // Compute the signature of the encapsulated message and append it to
481 // serialize the container
482 WireFormatMessage subserial = WireFormatMessageFactory.toWire(submessage, WireFormatMessageFactory.DEFAULT_WIRE_MIME, null);
484 // calculate the signature
485 byte[] bodySignature = null;
488 bodySignature = PSEUtils.computeSignature(CbJxDefs.signAlgoName, cred.getPrivateKey(), subserial.getStream());
489 } catch (Throwable e) {
490 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
491 LOG.log(Level.FINE, "failed to sign" + submessage, e);
498 // Make the signature into an element
499 MessageElement bodySigElement = new ByteArrayMessageElement(CBJX_MSG_SIG, MimeMediaType.AOS, bodySignature, null);
501 // Add the encapsulated body into the container message.
502 message.addMessageElement(CBJX_MSG_NS
504 new JxtaMessageMessageElement(CBJX_MSG_BODY, new MimeMediaType("application/x-jxta-msg"), submessage
511 public Message checkCryptoInfo(Message message, MessageElement cryptoElement, CbJxMessageInfo cryptoInfo) {
513 // extract the body element from the message
514 JxtaMessageMessageElement bodyElement = (JxtaMessageMessageElement) message.getMessageElement(CBJX_MSG_NS, CBJX_MSG_BODY);
516 if (null == bodyElement) {
517 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
518 LOG.warning("No \'" + CBJX_MSG_BODY + "\' in " + message);
522 message.removeMessageElement(bodyElement);
524 // extract the peer certificate
525 Certificate peerCert = cryptoInfo.getPeerCert();
527 // and from it the public key
528 // the public key from the message
529 RSAPublicKey publicKey = (RSAPublicKey) peerCert.getPublicKey();
531 // check the cert validity
533 peerCert.verify(publicKey);
534 } catch (Exception e) {
535 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
536 LOG.log(Level.WARNING, "Invalid peer cert", e);
543 net.jxta.impl.id.CBID.PeerID srcPeerID = (net.jxta.impl.id.CBID.PeerID) cryptoInfo.getSourceID();
545 byte[] pub_der = peerCert.getPublicKey().getEncoded();
546 net.jxta.impl.id.CBID.PeerID genID = (net.jxta.impl.id.CBID.PeerID) IDFactory.newPeerID(group.getPeerGroupID()
550 if (!srcPeerID.getUUID().equals(genID.getUUID())) {
551 // the cbid is not valid. Discard the message
552 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
553 LOG.warning("CBID of " + message + " is not valid : " + srcPeerID + " != " + genID);
558 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
559 LOG.fine("CBID of the message is valid");
561 } catch (Throwable e) {
562 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
563 LOG.log(Level.WARNING, "failed to verify cbid", e);
568 // verify the signature of the cryptinfo message
570 boolean valid = PSEUtils.verifySignature(CbJxDefs.signAlgoName, peerCert, cryptoElement.getSignature().getBytes(false)
572 cryptoElement.getStream());
575 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
576 LOG.warning("Failed to verify the signature of cryptinfo for " + message);
580 } catch (Throwable e) {
581 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
582 LOG.log(Level.WARNING, "Failed to verify the signature of cryptinfo for " + message, e);
587 // then verify the signature
588 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
589 LOG.warning("verifying signature");
592 // verify the signature of the message
594 boolean valid = PSEUtils.verifySignature(CbJxDefs.signAlgoName, peerCert, bodyElement.getSignature().getBytes(false)
596 bodyElement.getStream());
599 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
600 LOG.warning("failed to verify the signature of " + message);
604 } catch (Throwable e) {
605 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
606 LOG.log(Level.WARNING, "failed to verify the signature of " + message, e);
611 // the message is valid
612 return bodyElement.getMessage();
616 * this class filters incoming messages.
617 * it checks if messages are valid and if not discard them
619 public class CbJxInputFilter implements MessageFilterListener {
620 public CbJxInputFilter() {
627 public Message filterMessage(Message message, EndpointAddress srcAddr, EndpointAddress dstAddr) {
629 if (dstAddr.getProtocolAddress().equals(getProtocolName())) {
630 // extract the Crypto info from the message
631 MessageElement cryptoElement = message.getMessageElement(CBJX_MSG_NS, CBJX_MSG_INFO);
633 if (cryptoElement == null) {
634 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
635 LOG.fine("No \'" + CBJX_MSG_INFO + "\' in the message");
639 message.removeMessageElement(cryptoElement);
641 // the cbjx message info
642 CbJxMessageInfo cryptoInfo = null;
645 cryptoInfo = new CbJxMessageInfo(cryptoElement.getStream(), cryptoElement.getMimeType());
646 } catch (Throwable e) {
647 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
648 LOG.log(Level.WARNING, "Couldn\'t retrieve CbJxMessageInfo from \'" + CBJX_MSG_INFO + "\' element", e);
653 return checkCryptoInfo(message, cryptoElement, cryptoInfo);
662 * this class filters all outgoing messages that are not sent with
663 * messengers. (that is propagate messages). It adds CbJxInformation
666 public class CbJxOutputFilter implements MessageFilterListener {
669 * Default constructor
671 public CbJxOutputFilter() {
678 public Message filterMessage(Message message, EndpointAddress srcAddr, EndpointAddress dstAddr) {
679 Message msg = message.clone();
681 if (null == msg.getMessageElement(CBJX_MSG_NS, CBJX_MSG_INFO)) {
683 msg = addCryptoInfo(msg, dstAddr);
684 } catch (IOException failed) {