mSettings = inSettings;
mBuf = NULL;
+ // connect
+ Connect( );
+
// initialize buffer
mBuf = new char[ mSettings->mBufLen ];
pattern( mBuf, mSettings->mBufLen );
}
}
- // connect
- Connect( );
-
if ( isReport( inSettings ) ) {
ReportSettings( inSettings );
if ( mSettings->multihdr && isMultipleReport( inSettings ) ) {
mSettings->multihdr->report->connection.peer = mSettings->peer;
- mSettings->multihdr->report->connection.size_peer = mSettings->size_peer;
mSettings->multihdr->report->connection.local = mSettings->local;
- SockAddr_setPortAny( &mSettings->multihdr->report->connection.local );
- mSettings->multihdr->report->connection.size_local = mSettings->size_local;
}
}
DELETE_ARRAY( mBuf );
} // end ~Client
-const double kSecs_to_usecs = 1e6;
-const int kBytes_to_Bits = 8;
+const double kSecs_to_usecs = 1.0e6;
+const double kBytes_to_Bits = 8.0;
/* -------------------------------------------------------------------
* Send data using the connected UDP/TCP socket,
* ------------------------------------------------------------------- */
void Client::Run( void ) {
- struct UDP_datagram* mBuf_UDP = (struct UDP_datagram*) mBuf;
- long currLen = 0;
-
- int delay_target = 0;
- int delay = 0;
- int adjust = 0;
-
+ struct dgram_record* mBuf_Dgram = (struct dgram_record*) mBuf;
+ long currLen = 0, packet_gap = 0, delta, loop_time = 0, adjust = 0;
char* readAt = mBuf;
-
- // Indicates if the stream is readable
- bool canRead = true, mMode_Time = isModeTime( mSettings );
+ bool canRead = true, // Indicates if the stream is readable
+ mMode_Time = isModeTime( mSettings );
+ ReportStruct *reportstruct = NULL;
// setup termination variables
if ( mMode_Time ) {
mEndTime.setnow();
- mEndTime.add( mSettings->mAmount / 100.0 );
+ if (mSettings->mAmount == 0)
+ mEndTime.add(2209032.0); // 7 years - i.e. continuous
+ else
+ mEndTime.add(mSettings->mAmount / 100.0);
}
- if ( isUDP( mSettings ) ) {
- // Due to the UDP timestamps etc, included
- // reduce the read size by an amount
- // equal to the header size
-
+ if ( isPacketOriented( mSettings ) ) {
// compute delay for bandwidth restriction, constrained to [0,1] seconds
- delay_target = (int) ( mSettings->mBufLen * ((kSecs_to_usecs * kBytes_to_Bits)
- / mSettings->mUDPRate) );
- if ( delay_target < 0 ||
- delay_target > (int) 1 * kSecs_to_usecs ) {
- fprintf( stderr, warn_delay_large, delay_target / kSecs_to_usecs );
- delay_target = (int) kSecs_to_usecs * 1;
+ packet_gap = (long)((double)(mSettings->mBufLen * kSecs_to_usecs * kBytes_to_Bits) /
+ (double)mSettings->mDgramRate);
+
+ if (packet_gap < 0 || packet_gap > kSecs_to_usecs) {
+ fprintf( stderr, warn_delay_large, packet_gap / kSecs_to_usecs );
+ packet_gap = (long)kSecs_to_usecs;
}
+ // Initialise adjustment variable: in this way, the first adjustment will be
+ // the latency for sending the first packet, and all sending times are
+ // synchronised with regard to the first timestamp.
+ adjust = -packet_gap;
+
+ // Due to the included timestamps etc,
+ // reduce the read size by an amount equal to the header size
if ( isFileInput( mSettings ) ) {
- if ( isCompat( mSettings ) ) {
- Extractor_reduceReadSize( sizeof(struct UDP_datagram), mSettings );
- readAt += sizeof(struct UDP_datagram);
- } else {
- Extractor_reduceReadSize( sizeof(struct UDP_datagram) +
- sizeof(struct client_hdr), mSettings );
- readAt += sizeof(struct UDP_datagram) +
- sizeof(struct client_hdr);
- }
+ size_t offset = sizeof(struct dgram_record);
+
+ if (!isCompat(mSettings))
+ offset += sizeof(struct client_hdr);
+
+ Extractor_reduceReadSize(offset, mSettings);
+ readAt += offset;
}
}
- ReportStruct *reportstruct = NULL;
-
// InitReport handles Barrier for multiple Streams
mSettings->reporthdr = InitReport( mSettings );
reportstruct = new ReportStruct;
- reportstruct->packetID = 0;
+ // Connectionless protocols use the first message as "connect" message (which is
+ // counted, but their FIN is not counted. For connection-oriented protocols we
+ // start at message #1 instead of at #0; and the FIN is not counted.
+ reportstruct->packetID = (!isConnectionLess(mSettings) && isPacketOriented(mSettings));
+
+ // Set a timestamp now corresponding to an imaginary zero-th send time.
lastPacketTime.setnow();
-
- do {
+ do {
// Test case: drop 17 packets and send 2 out-of-order:
// sequence 51, 52, 70, 53, 54, 71, 72
//switch( datagramID ) {
// case 55: datagramID = 71; break;
// default: break;
//}
- gettimeofday( &(reportstruct->packetTime), NULL );
- if ( isUDP( mSettings ) ) {
- // store datagram ID into buffer
- mBuf_UDP->id = htonl( (reportstruct->packetID)++ );
- mBuf_UDP->tv_sec = htonl( reportstruct->packetTime.tv_sec );
- mBuf_UDP->tv_usec = htonl( reportstruct->packetTime.tv_usec );
-
- // delay between writes
- // make an adjustment for how long the last loop iteration took
- // TODO this doesn't work well in certain cases, like 2 parallel streams
- adjust = delay_target + lastPacketTime.subUsec( reportstruct->packetTime );
- lastPacketTime.set( reportstruct->packetTime.tv_sec,
- reportstruct->packetTime.tv_usec );
-
- if ( adjust > 0 || delay > 0 ) {
- delay += adjust;
- }
- }
+ // Note that the timestamp does not account for the time-to-wire of the packet.
+ // This plays a role when looking at jitter/delay values.
+ gettimeofday( &(reportstruct->packetTime), NULL );
- // Read the next data block from
- // the file if it's file input
+ if ( isPacketOriented( mSettings ) ) {
+ // Increment packet ID after sending and before reporting
+ // UDP: sends from 0..n-1, counts from 1..n
+ // DCCP: sends from 1..n, counts from 2..n+1
+ // This difference is of interest for the client only (the server sees
+ // 1..n). The client uses cntDatagrams, so that the count is correct.
+ // (Consider the trick in CloseReport() which resetts packetID.)
+ mBuf_Dgram->id = htonl( reportstruct->packetID++ );
+ mBuf_Dgram->tv_sec = htonl( reportstruct->packetTime.tv_sec );
+ mBuf_Dgram->tv_usec = htonl( reportstruct->packetTime.tv_usec );
+ } else if (!mMode_Time && mSettings->mAmount < mSettings->mBufLen)
+ mSettings->mBufLen = mSettings->mAmount;
+
+ // Read the next data block from the file if it's file input
if ( isFileInput( mSettings ) ) {
Extractor_getNextDataBlock( readAt, mSettings );
canRead = Extractor_canRead( mSettings ) != 0;
- } else
- canRead = true;
-
- // perform write
- currLen = write( mSettings->mSock, mBuf, mSettings->mBufLen );
- if ( currLen < 0 ) {
- WARN_errno( currLen < 0, "write2" );
- break;
}
- // report packets
+ // Put the packet onto the wire.
+ // When EAGAIN is returned (DCCP), the internal TX buffer is full: due to
+ // congestion-control issues the packet can not be transported now.
+ do {
+ currLen = write( mSettings->mSock, mBuf, mSettings->mBufLen );
+ } while (currLen < 0 && errno == EAGAIN);
+
+ if (currLen < 0) {
+ WARN_errno(1, "client write");
+ break;
+ }
+ // update statistics
reportstruct->packetLen = currLen;
ReportPacket( mSettings->reporthdr, reportstruct );
-
- if ( delay > 0 ) {
- delay_loop( delay );
- }
- if ( !mMode_Time ) {
+
+ if (mMode_Time) {
+ // time-oriented mode (-t): termination
+ canRead = mEndTime.after(reportstruct->packetTime);
+ } else {
+ // amount-oriented mode (-n): mAmount is unsigned
+ if (currLen >= mSettings->mAmount)
+ break;
mSettings->mAmount -= currLen;
}
- } while ( ! (sInterupted ||
- (mMode_Time && mEndTime.before( reportstruct->packetTime )) ||
- (!mMode_Time && 0 >= mSettings->mAmount)) && canRead );
+ if (isPacketOriented(mSettings)) {
+ // Adjust the inter-packet gap using the following variables:
+ // delta is the gap in between calls to send()
+ // adjust acts as a token bucket whenever delta != packet_gap
+ // loop_time equals packet_gap if adjust==0, it is corrected otherwise
+ //
+ // TODO this doesn't work well in certain cases, like 2 parallel streams
+ //
+ delta = lastPacketTime.delta_usec();
+ adjust += packet_gap - delta;
+ loop_time = packet_gap + adjust;
+
+ if (loop_time > 0)
+ delay_loop(loop_time);
+ }
+
+ } while (canRead && !sInterupted);
// stop timing
gettimeofday( &(reportstruct->packetTime), NULL );
CloseReport( mSettings->reporthdr, reportstruct );
- if ( isUDP( mSettings ) ) {
+ if ( isPacketOriented( mSettings ) ) {
// send a final terminating datagram
- // Don't count in the mTotalLen. The server counts this one,
- // but didn't count our first datagram, so we're even now.
- // The negative datagram ID signifies termination to the server.
-
+ // For connectionless protocols, don't count in the mTotalLen.
+ // The server counts this one, but didn't count our first datagram
+ // (connect message), so we're even now.
+
// store datagram ID into buffer
- mBuf_UDP->id = htonl( -(reportstruct->packetID) );
- mBuf_UDP->tv_sec = htonl( reportstruct->packetTime.tv_sec );
- mBuf_UDP->tv_usec = htonl( reportstruct->packetTime.tv_usec );
+ // The negative datagram ID signifies termination to the server.
+ mBuf_Dgram->id = htonl( -(reportstruct->packetID) );
+ mBuf_Dgram->tv_sec = htonl( reportstruct->packetTime.tv_sec );
+ mBuf_Dgram->tv_usec = htonl( reportstruct->packetTime.tv_usec );
- if ( isMulticast( mSettings ) ) {
+ if ( isMulticast( mSettings ) )
write( mSettings->mSock, mBuf, mSettings->mBufLen );
- } else {
- write_UDP_FIN( );
- }
+ else
+ write_dgram_FIN( );
}
DELETE_PTR( reportstruct );
EndReport( mSettings->reporthdr );
}
-// end Run
-
-void Client::InitiateServer() {
- if ( !isCompat( mSettings ) ) {
- int currLen;
- client_hdr* temp_hdr;
- if ( isUDP( mSettings ) ) {
- UDP_datagram *UDPhdr = (UDP_datagram *)mBuf;
- temp_hdr = (client_hdr*)(UDPhdr + 1);
- } else {
- temp_hdr = (client_hdr*)mBuf;
- }
- Settings_GenerateClientHdr( mSettings, temp_hdr );
- if ( !isUDP( mSettings ) ) {
- currLen = send( mSettings->mSock, mBuf, sizeof(client_hdr), 0 );
- if ( currLen < 0 ) {
- WARN_errno( currLen < 0, "write1" );
- }
- }
- }
+
+void Client::InitiateServer()
+{
+ client_hdr *temp_hdr = (client_hdr*)mBuf;
+
+ if (isCompat(mSettings))
+ return;
+ // connection-less protocols communicate their settings in the first
+ // packet sent to the server; this packet is not counted by the server
+ if (isConnectionLess(mSettings)) {
+ dgram_record *record_hdr = (dgram_record *)mBuf;
+ temp_hdr = (client_hdr*)(record_hdr + 1);
+ }
+
+ Settings_GenerateClientHdr( mSettings, temp_hdr );
+
+ // connection-oriented protocols use a short "init" message
+ if ( !isConnectionLess( mSettings ) ) {
+ int rc;
+
+ do {
+ rc = send(mSettings->mSock, mBuf, sizeof(client_hdr), 0);
+ } while (rc < 0 && errno == EAGAIN);
+ WARN_errno(rc < 0, "write failed in InitiateServer()");
+ }
}
/* -------------------------------------------------------------------
* If inLocalhost is not null, bind to that address, specifying
* which outgoing interface to use.
* ------------------------------------------------------------------- */
+void Client::Connect()
+{
+ socklen_t size_local = sizeof(mSettings->local);
-void Client::Connect( ) {
- int rc;
- SockAddr_remoteAddr( mSettings );
-
- assert( mSettings->inHostname != NULL );
-
- // create an internet socket
- int type = ( isUDP( mSettings ) ? SOCK_DGRAM : SOCK_STREAM);
-
- int domain = (SockAddr_isIPv6( &mSettings->peer ) ?
-#ifdef HAVE_IPV6
- AF_INET6
-#else
- AF_INET
-#endif
- : AF_INET);
-
- mSettings->mSock = socket( domain, type, 0 );
- WARN_errno( mSettings->mSock == INVALID_SOCKET, "socket" );
+ MakeSocket(mSettings);
- SetSocketOptions( mSettings );
+ /* determine local interface after establishing the connection */
+ getsockname(mSettings->mSock, (struct sockaddr *)&mSettings->local, &size_local);
+ /* The DCCP packet size must not exceed the MPS (RFC 4340, 14.) */
+ if (mSettings->mProtocol == kProto_DCCP) {
+ unsigned mps = getsock_dccp_mps(mSettings->mSock);
- SockAddr_localAddr( mSettings );
- if ( mSettings->mLocalhost != NULL ) {
- // bind socket to local address
- rc = bind( mSettings->mSock, (sockaddr*) &mSettings->local,
- SockAddr_get_sizeof_sockaddr( &mSettings->local ) );
- WARN_errno( rc == SOCKET_ERROR, "bind" );
+ if (mSettings->mBufLen > mps)
+ die("Buffer length %d exceeds DCCP MPS %d. Use a smaller buffer size "
+ "(-l) on server/client.", mSettings->mBufLen, mps);
}
+}
- // connect socket
- rc = connect( mSettings->mSock, (sockaddr*) &mSettings->peer,
- SockAddr_get_sizeof_sockaddr( &mSettings->peer ));
- WARN_errno( rc == SOCKET_ERROR, "connect" );
-
- getsockname( mSettings->mSock, (sockaddr*) &mSettings->local,
- &mSettings->size_local );
- getpeername( mSettings->mSock, (sockaddr*) &mSettings->peer,
- &mSettings->size_peer );
-} // end Connect
/* -------------------------------------------------------------------
* Send a datagram on the socket. The datagram's contents should signify
* a FIN to the application. Keep re-transmitting until an
* acknowledgement datagram is received.
* ------------------------------------------------------------------- */
-
-void Client::write_UDP_FIN( ) {
- int rc;
+void Client::write_dgram_FIN( ) {
+ int rc, count = 0;
fd_set readSet;
struct timeval timeout;
+ size_t len = mSettings->mBufLen;
+
+ // we don't need the full buffer size here - it is not counted
+ if (!isConnectionLess(mSettings))
+ len = sizeof(dgram_record);
- int count = 0;
while ( count < 10 ) {
- count++;
+ count++;
// write data
- write( mSettings->mSock, mBuf, mSettings->mBufLen );
+ write( mSettings->mSock, mBuf, len);
// wait until the socket is readable, or our timeout expires
FD_ZERO( &readSet );
WARN_errno( rc < 0, "read" );
if ( rc < 0 ) {
break;
- } else if ( rc >= (int) (sizeof(UDP_datagram) + sizeof(server_hdr)) ) {
- ReportServerUDP( mSettings, (server_hdr*) ((UDP_datagram*)mBuf + 1) );
+ } else if ( rc >= (int) (sizeof(dgram_record) + sizeof(server_hdr)) ) {
+ ReportServerUDP( mSettings, (server_hdr*) ((dgram_record*)mBuf + 1) );
}
-
return;
}
}
-
fprintf( stderr, warn_no_ack, mSettings->mSock, count );
}
-// end write_UDP_FIN
+// end write_dgram_FIN