2 p2pproxy Copyright (C) 2007 Jehan Monnier ()
6 This program is free software; you can redistribute it and/or
7 modify it under the terms of the GNU General Public License
8 as published by the Free Software Foundation; either version 2
9 of the License, or (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, write to the Free Software
18 Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
20 package org.linphone.p2pproxy.core.media.rtprelay;
22 import java.io.IOException;
23 import java.net.DatagramPacket;
24 import java.net.DatagramSocket;
25 import java.net.InetSocketAddress;
26 import java.net.SocketAddress;
27 import java.net.SocketException;
28 import java.net.UnknownHostException;
29 import java.util.HashMap;
31 import java.util.Timer;
32 import java.util.TimerTask;
34 import org.apache.log4j.Logger;
35 import org.linphone.p2pproxy.api.P2pProxyException;
36 import org.linphone.p2pproxy.core.GenericUdpSession;
40 public class RtpRelayServer implements GenericUdpSession.MessageHandler {
41 private final static Logger mLog = Logger.getLogger(RtpRelayServer.class);
42 private final DatagramSocket mSocket;
46 class GarbageCollectorTask extends TimerTask {
49 synchronized (RtpRelayServer.RoutingTable.this) {
50 for (RoutingElement lElement :new HashMap <String,RoutingElement>(mSessionIdTable).values()) {
51 if (System.currentTimeMillis() - lElement.getLastAccess() > mMaxSilenceDuration) {
52 mLog.info("removing element ["+lElement+"]");
53 mSessionIdTable.remove(lElement.mSessionId);
54 mSsrcElementTable.remove(lElement.mSsrcA);
55 mSsrcElementTable.remove(lElement.mSsrcB);
61 class RoutingElement {
62 //private final String mSessionId;
63 private SocketAddress mPeerARtp;
64 private SocketAddress mPeerARtcp;
66 private SocketAddress mPeerBRtp;
67 private SocketAddress mPeerBRtcp;
69 private long mLastDestAddressAccess = System.currentTimeMillis();
70 private final String mSessionId;
71 RoutingElement(long aSsrc, String aSessionId) {
73 mSessionId = aSessionId;
75 void setPeerAddress(SocketAddress aReceivedAddress,long aSsrc,boolean isRtcp) {
76 if (aSsrc == mSsrcA ) {
78 mPeerARtcp = aReceivedAddress;
80 mPeerARtp = aReceivedAddress;
82 } else if (aSsrc == mSsrcB) {
84 mPeerBRtcp = aReceivedAddress;
86 mPeerBRtp = aReceivedAddress;
89 mLog.warn("ssrc ["+aSsrc+" not found for ["+aReceivedAddress+"]");
92 void setPeerBSsrc(long aSsrc) {
95 public SocketAddress getDestAddr(long aSourceSsrc,boolean isRtcp) throws P2pProxyException{
96 mLastDestAddressAccess = System.currentTimeMillis();
97 if (aSourceSsrc == mSsrcA && isRtcp) {
98 if ( mPeerBRtcp != null) {
101 throw new P2pProxyException("PeerBRtcp not found for ssrc ["+aSourceSsrc+"]");
103 } else if (aSourceSsrc == mSsrcA && !isRtcp) {
105 if ( mPeerBRtp != null) {
108 throw new P2pProxyException("PeerBRtp not found for ssrc ["+aSourceSsrc+" ");
110 } else if (aSourceSsrc == mSsrcB && isRtcp) {
112 if ( mPeerARtcp != null) {
115 throw new P2pProxyException("PeerARtcp not found for ssrc ["+aSourceSsrc+" ");
117 } else if (aSourceSsrc == mSsrcB && !isRtcp) {
119 if ( mPeerARtp != null) {
122 throw new P2pProxyException("PeerARtp not found for ssrc ["+aSourceSsrc+" ");
125 throw new P2pProxyException("ssrc ["+aSourceSsrc+" not found");
129 public long getLastAccess() {
130 return mLastDestAddressAccess;
132 public String toString() {
133 return "Session id ["+mSessionId+"] ssrc a ["+mSsrcA+"] rtp source ["+mPeerARtp+"] rtcp source ["+mPeerARtcp+"]"
134 +"ssrc b ["+mSsrcB+"] rtp source ["+mPeerBRtp+"] rtcp source ["+mPeerBRtcp+"]";
137 private final Map <String,RoutingElement> mSessionIdTable = new HashMap <String,RoutingElement>();
138 private final Map <Long,RoutingElement> mSsrcElementTable = new HashMap <Long,RoutingElement>();
139 private final long mMaxSilenceDuration ;
140 private final long mGCPeriod;
141 Timer mTimer = new Timer("Routing Elements GC");
143 public RoutingTable (long aMaxSilenceDuration, long aGCPeriod) {
144 mMaxSilenceDuration = aMaxSilenceDuration;
145 mGCPeriod = aGCPeriod;
146 mTimer.scheduleAtFixedRate(new GarbageCollectorTask(), 0, mGCPeriod);
148 public synchronized boolean containsSessionId(String aSessionId) {
149 return mSessionIdTable.containsKey(aSessionId);
152 public synchronized void addRoutingElement(String aSessionId, long aSsrc) {
153 if (mLog.isInfoEnabled()) mLog.info("add routing element for session id ["+aSessionId+"] with ssrc ["+aSsrc+"]");
154 RoutingElement lRoutingElement = new RoutingElement(aSsrc,aSessionId);
155 mSessionIdTable.put(aSessionId, lRoutingElement);
156 mSsrcElementTable.put(aSsrc, lRoutingElement);
159 public synchronized void UpdateRoutingElement(String aSessionId, long aSsrc) {
160 if (mLog.isInfoEnabled()) mLog.info("update routing element session id ["+aSessionId+"] with ssrc ["+aSsrc+"]");
161 RoutingElement lRoutingElement = mSessionIdTable.get(aSessionId);
162 lRoutingElement.setPeerBSsrc(aSsrc);
163 mSsrcElementTable.put(aSsrc, lRoutingElement);
167 public synchronized void updateSourceAddress(long aSsrc, SocketAddress aSocketAddress,boolean isRtcp) throws P2pProxyException{
168 if (mSsrcElementTable.containsKey(aSsrc) == false) {
169 throw new P2pProxyException("No routing element present for ssrc["+aSsrc+"]");
171 if (mLog.isInfoEnabled()) mLog.info("update routing element for ssrc ["+aSsrc+"] with address ["+aSocketAddress+"]" );
172 RoutingElement lRoutineElement = mSsrcElementTable.get(aSsrc);
173 lRoutineElement.setPeerAddress(aSocketAddress, aSsrc, isRtcp);
175 public synchronized SocketAddress getDestAddress(long aSsrc,boolean isRtcp) throws P2pProxyException{
176 //1 check if element exist for this ssrc
177 if (mSsrcElementTable.containsKey(aSsrc) == false) {
178 throw new P2pProxyException("No routing element present for ssrc["+aSsrc+"]");
180 RoutingElement lRoutingElement = mSsrcElementTable.get(aSsrc);
181 return lRoutingElement.getDestAddr(aSsrc, isRtcp);
183 public synchronized int getSize() {
184 return mSessionIdTable.size();
private static final String SESSIONID_NAME="RSID"; //Relay session Id
// Values used by the single-argument constructor, in milliseconds.
private static final long DEFAULT_MAX_SILENCE_DURATION = 3600000;
private static final long DEFAULT_GC_PERIOD = 60000;
private final RoutingTable mRoutingTable;

/**
 * Creates a relay on the given socket with the default maximum silence
 * duration (3600000 ms) and garbage collection period (60000 ms).
 *
 * @param aListeningSocket socket used to send the forwarded packets
 */
public RtpRelayServer(DatagramSocket aListeningSocket) throws SocketException, UnknownHostException {
    this(aListeningSocket, DEFAULT_MAX_SILENCE_DURATION, DEFAULT_GC_PERIOD);
}

/**
 * Creates a relay on the given socket.
 *
 * @param aListeningSocket    socket used to send the forwarded packets
 * @param aMaxSilenceDuration handed to the routing table as its maximum silence duration
 * @param aGCPeriod           handed to the routing table as its garbage collection period
 */
public RtpRelayServer(DatagramSocket aListeningSocket,long aMaxSilenceDuration, long aGCPeriod) throws SocketException, UnknownHostException {
    mSocket = aListeningSocket;
    mRoutingTable = new RoutingTable(aMaxSilenceDuration, aGCPeriod);
}
200 public void onMessage(DatagramPacket aMessage) {
202 if (mLog.isInfoEnabled()) mLog.info("new incoming message from ["+aMessage.getSocketAddress()+"]");
203 if (isRtpRtcpPacket(aMessage) == false) {
204 if (mLog.isInfoEnabled()) mLog.info("not rtp/rtcp packet skip");
207 long lSsrc = getSsrc(aMessage);
208 if (isSessionIdPresent(aMessage)) {
209 String lSessionId = getSessionId(aMessage);
210 //1 check if already exist
211 if (mRoutingTable.containsSessionId(lSessionId)) {
212 //second call, update ssrc
213 mRoutingTable.UpdateRoutingElement(lSessionId,lSsrc);
217 mRoutingTable.addRoutingElement(lSessionId,lSsrc);
220 mRoutingTable.updateSourceAddress(lSsrc, aMessage.getSocketAddress(),isRtcp(aMessage));
222 SocketAddress lDestAddress = mRoutingTable.getDestAddress(lSsrc,isRtcp(aMessage));
223 //does not forward session id msg
224 if (!isSessionIdPresent(aMessage)) {
226 if (mLog.isInfoEnabled()) mLog.info("forwarding ["+aMessage.getLength()+"] bytes from ["+aMessage.getSocketAddress()+"] to ["+lDestAddress+"]");
227 aMessage.setSocketAddress(lDestAddress);
228 mSocket.send(aMessage);
230 } catch (IOException e) {
231 mLog.error("cannot forward ["+aMessage+"]", e);
232 } catch (Exception e) {
233 mLog.warn("unknown destination for message from ["+aMessage.getSocketAddress()+"] discarding",e);
237 public InetSocketAddress getInetSocketAddress() {
238 return (InetSocketAddress) mSocket.getLocalSocketAddress();
241 private long getSsrc(DatagramPacket aMessage) {
242 // The RTP header has the following format:
245 // 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
246 // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
247 // |V=2|P|X| CC |M| PT | sequence number |
248 // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
250 // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
251 // | synchronization source (SSRC) identifier |
252 // +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
253 // | contributing source (CSRC) identifiers |
255 // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
256 if (isRtcp(aMessage)) {
258 return b2UB(aMessage.getData()[7])
259 + (b2UB(aMessage.getData()[6]) << 8)
260 + (b2UB(aMessage.getData()[5]) << 16)
261 + (b2UB(aMessage.getData()[4]) << 24);
264 return b2UB(aMessage.getData()[11])
265 + (b2UB(aMessage.getData()[10]) << 8)
266 + (b2UB(aMessage.getData()[9]) << 16)
267 + (b2UB(aMessage.getData()[8]) << 24);
271 private boolean isSessionIdPresent(DatagramPacket aMessage) {
272 // APP: Application-Defined RTCP Packet
275 // 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
276 // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
277 // |V=2|P| subtype | PT=APP=204 | length |
278 // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
280 // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
282 // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
283 // | application-dependent data ...
284 // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
285 if (b2UB(aMessage.getData()[1]) != 204) return false;
286 String aName = String.valueOf((char)b2UB(aMessage.getData()[8]))
287 + String.valueOf((char)b2UB(aMessage.getData()[9]))
288 + String.valueOf((char)b2UB(aMessage.getData()[10]))
289 + String.valueOf((char)b2UB(aMessage.getData()[11]));
290 return SESSIONID_NAME.equalsIgnoreCase(aName);
293 private String getSessionId(DatagramPacket aMessage) throws P2pProxyException{
294 // APP: Application-Defined RTCP Packet
297 // 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
298 // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
299 // |V=2|P| subtype | PT=APP=204 | length |
300 // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
302 // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
304 // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
305 // | application-dependent data ...
306 // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
308 if (isSessionIdPresent(aMessage) == false) {
309 throw new P2pProxyException("Cannot find SessionId");
311 long lPacketLenght = 1 + b2UB(aMessage.getData()[3]) + (b2UB(aMessage.getData()[2]) << 8) ;
312 StringBuffer lSessionId = new StringBuffer();
313 for (int i = 12 ; i< lPacketLenght; i++) {
314 lSessionId.append(b2UB(aMessage.getData()[i]));
316 return lSessionId.toString();
319 private boolean isRtcp(DatagramPacket aMessage) {
320 return b2UB(aMessage.getData()[1]) >= 200 ;
322 public static int b2UB(byte b) {
323 return b >= 0 ? b : 256 + b;
327 public int getRoutingtableSize() {
328 return mRoutingTable.getSize();
330 boolean isRtpRtcpPacket(DatagramPacket aMessage) {
331 return (aMessage.getData()[0] >> 6) != 0;