]> sjero.net Git - iperf/blobdiff - src/Client.cpp
DCCP support for iperf
[iperf] / src / Client.cpp
index d6324d96b30968862d9afb68e9507ba60a1b19c3..2195c4fd3027c8b34f9da14273af7ffcca75b310 100644 (file)
@@ -69,6 +69,9 @@ Client::Client( thread_Settings *inSettings ) {
     mSettings = inSettings;
     mBuf = NULL;
 
+    // connect
+    Connect( );
+
     // initialize buffer
     mBuf = new char[ mSettings->mBufLen ];
     pattern( mBuf, mSettings->mBufLen );
@@ -83,9 +86,6 @@ Client::Client( thread_Settings *inSettings ) {
         }
     }
 
-    // connect
-    Connect( );
-
     if ( isReport( inSettings ) ) {
         ReportSettings( inSettings );
         if ( mSettings->multihdr && isMultipleReport( inSettings ) ) {
@@ -112,8 +112,8 @@ Client::~Client() {
     DELETE_ARRAY( mBuf );
 } // end ~Client
 
-const double kSecs_to_usecs = 1e6; 
-const int    kBytes_to_Bits = 8; 
+const double kSecs_to_usecs = 1.0e6;
+const double kBytes_to_Bits = 8.0;
 
 /* ------------------------------------------------------------------- 
  * Send data using the connected UDP/TCP socket, 
@@ -122,17 +122,12 @@ const int    kBytes_to_Bits = 8;
  * ------------------------------------------------------------------- */ 
 
 void Client::Run( void ) {
-    struct UDP_datagram* mBuf_UDP = (struct UDP_datagram*) mBuf; 
-    long currLen = 0; 
-
-    int delay_target = 0; 
-    int delay = 0; 
-    int adjust = 0; 
-
+    struct dgram_record* mBuf_Dgram = (struct dgram_record*) mBuf;
+    long  currLen = 0, packet_gap = 0, delta, loop_time = 0, adjust = 0;
     char* readAt = mBuf;
-    
-    // Indicates if the stream is readable 
-    bool canRead = true, mMode_Time = isModeTime( mSettings ); 
+    bool canRead = true,       // Indicates if the stream is readable
+         mMode_Time = isModeTime( mSettings );
+    ReportStruct *reportstruct = NULL;
 
     // setup termination variables
     if ( mMode_Time ) {
@@ -140,43 +135,46 @@ void Client::Run( void ) {
         mEndTime.add( mSettings->mAmount / 100.0 );
     }
 
-    if ( isUDP( mSettings ) ) {
-        // Due to the UDP timestamps etc, included 
-        // reduce the read size by an amount 
-        // equal to the header size
-    
+    if ( isPacketOriented( mSettings ) ) {
         // compute delay for bandwidth restriction, constrained to [0,1] seconds 
-        delay_target = (int) ( mSettings->mBufLen * ((kSecs_to_usecs * kBytes_to_Bits) 
-                                                     / mSettings->mUDPRate) ); 
-        if ( delay_target < 0  || 
-             delay_target > (int) 1 * kSecs_to_usecs ) {
-            fprintf( stderr, warn_delay_large, delay_target / kSecs_to_usecs ); 
-            delay_target = (int) kSecs_to_usecs * 1; 
+        packet_gap = (long)((double)(mSettings->mBufLen * kSecs_to_usecs * kBytes_to_Bits) /
+                                                               (double)mSettings->mDgramRate);
+
+        if (packet_gap < 0 || packet_gap > kSecs_to_usecs) {
+            fprintf( stderr, warn_delay_large, packet_gap / kSecs_to_usecs );
+            packet_gap = (long)kSecs_to_usecs;
         }
+        // Initialise adjustment variable: in this way, the first adjustment will be
+        // the latency for sending the first packet, and all sending times are
+        // synchronised with regard to the first timestamp.
+        adjust = -packet_gap;
+
+        // Due to the included timestamps etc,
+        // reduce the read size by an amount equal to the header size
         if ( isFileInput( mSettings ) ) {
-            if ( isCompat( mSettings ) ) {
-                Extractor_reduceReadSize( sizeof(struct UDP_datagram), mSettings );
-                readAt += sizeof(struct UDP_datagram);
-            } else {
-                Extractor_reduceReadSize( sizeof(struct UDP_datagram) +
-                                          sizeof(struct client_hdr), mSettings );
-                readAt += sizeof(struct UDP_datagram) +
-                          sizeof(struct client_hdr);
-            }
+            size_t offset = sizeof(struct dgram_record);
+
+            if (!isCompat(mSettings))
+                offset += sizeof(struct client_hdr);
+
+            Extractor_reduceReadSize(offset, mSettings);
+            readAt += offset;
         }
     }
 
-    ReportStruct *reportstruct = NULL;
-
     // InitReport handles Barrier for multiple Streams
     mSettings->reporthdr = InitReport( mSettings );
     reportstruct = new ReportStruct;
-    reportstruct->packetID = 0;
 
+    // Connectionless protocols use the first message as "connect" message (which is
+    // counted, but their FIN is not counted. For connection-oriented protocols we
+    // start at message #1 instead of at #0; and the FIN is not counted.
+    reportstruct->packetID = (!isConnectionLess(mSettings) && isPacketOriented(mSettings));
+
+    // Set a timestamp now corresponding to an imaginary zero-th send time.
     lastPacketTime.setnow();
-    
-    do {
 
+    do {
         // Test case: drop 17 packets and send 2 out-of-order: 
         // sequence 51, 52, 70, 53, 54, 71, 72 
         //switch( datagramID ) { 
@@ -185,100 +183,122 @@ void Client::Run( void ) {
         //  case 55: datagramID = 71; break; 
         //  default: break; 
         //} 
-        gettimeofday( &(reportstruct->packetTime), NULL );
 
-        if ( isUDP( mSettings ) ) {
-            // store datagram ID into buffer 
-            mBuf_UDP->id      = htonl( (reportstruct->packetID)++ ); 
-            mBuf_UDP->tv_sec  = htonl( reportstruct->packetTime.tv_sec ); 
-            mBuf_UDP->tv_usec = htonl( reportstruct->packetTime.tv_usec );
-
-            // delay between writes 
-            // make an adjustment for how long the last loop iteration took 
-            // TODO this doesn't work well in certain cases, like 2 parallel streams 
-            adjust = delay_target + lastPacketTime.subUsec( reportstruct->packetTime ); 
-            lastPacketTime.set( reportstruct->packetTime.tv_sec, 
-                                reportstruct->packetTime.tv_usec ); 
-
-            if ( adjust > 0  ||  delay > 0 ) {
-                delay += adjust; 
-            }
-        }
+        // Note that the timestamp does not account for the time-to-wire of the packet.
+        // This plays a role when looking at jitter/delay values.
+        gettimeofday( &(reportstruct->packetTime), NULL );
 
-        // Read the next data block from 
-        // the file if it's file input 
+        if ( isPacketOriented( mSettings ) ) {
+            // Increment packet ID after sending and before reporting
+            // UDP:  sends from 0..n-1, counts from 1..n
+            // DCCP: sends from 1..n,   counts from 2..n+1
+            // This difference is of interest for the client only (the server sees
+            // 1..n). The client uses cntDatagrams, so that the count is correct.
+            // (Consider the trick in CloseReport() which resetts packetID.)
+            mBuf_Dgram->id      = htonl( reportstruct->packetID++ );
+            mBuf_Dgram->tv_sec  = htonl( reportstruct->packetTime.tv_sec );
+            mBuf_Dgram->tv_usec = htonl( reportstruct->packetTime.tv_usec );
+        } else if (!mMode_Time && mSettings->mAmount < mSettings->mBufLen)
+            mSettings->mBufLen = mSettings->mAmount;
+
+        // Read the next data block from the file if it's file input
         if ( isFileInput( mSettings ) ) {
             Extractor_getNextDataBlock( readAt, mSettings ); 
             canRead = Extractor_canRead( mSettings ) != 0; 
-        } else
-            canRead = true; 
-
-        // perform write 
-        currLen = write( mSettings->mSock, mBuf, mSettings->mBufLen ); 
-        if ( currLen < 0 ) {
-            WARN_errno( currLen < 0, "write2" ); 
-            break; 
         }
 
-        // report packets 
+        // Put the packet onto the wire.
+        // When EAGAIN is returned (DCCP), the internal TX buffer is full: due to
+        // congestion-control issues the packet can not be transported now.
+        do {
+            currLen = write( mSettings->mSock, mBuf, mSettings->mBufLen );
+        } while (currLen < 0 && errno == EAGAIN);
+
+        if (currLen < 0) {
+            WARN_errno(1, "client write");
+            break;
+        }
+        // update statistics
         reportstruct->packetLen = currLen;
         ReportPacket( mSettings->reporthdr, reportstruct );
-        
-        if ( delay > 0 ) {
-            delay_loop( delay ); 
-        }
-        if ( !mMode_Time ) {
+
+        if (mMode_Time) {
+            // time-oriented mode (-t): termination
+            canRead = mEndTime.after(reportstruct->packetTime);
+        } else {
+            // amount-oriented mode (-n): mAmount is unsigned
+            if (currLen >= mSettings->mAmount)
+                break;
             mSettings->mAmount -= currLen;
         }
 
-    } while ( ! (sInterupted  || 
-                 (mMode_Time   &&  mEndTime.before( reportstruct->packetTime ))  || 
-                 (!mMode_Time  &&  0 >= mSettings->mAmount)) && canRead ); 
+        if (isPacketOriented(mSettings)) {
+            // Adjust the inter-packet gap using the following variables:
+            //   delta      is the gap in between calls to send()
+            //   adjust     acts as a token bucket whenever delta != packet_gap
+            //   loop_time  equals packet_gap if adjust==0, it is corrected otherwise
+            //
+            // TODO this doesn't work well in certain cases, like 2 parallel streams
+            //
+            delta = lastPacketTime.delta_usec();
+            adjust += packet_gap - delta;
+            loop_time = packet_gap + adjust;
+
+            if (loop_time > 0)
+                delay_loop(loop_time);
+       }
+
+    } while (canRead && !sInterupted);
 
     // stop timing
     gettimeofday( &(reportstruct->packetTime), NULL );
     CloseReport( mSettings->reporthdr, reportstruct );
 
-    if ( isUDP( mSettings ) ) {
+    if ( isPacketOriented( mSettings ) ) {
         // send a final terminating datagram 
-        // Don't count in the mTotalLen. The server counts this one, 
-        // but didn't count our first datagram, so we're even now. 
-        // The negative datagram ID signifies termination to the server. 
-    
+        // For connectionless protocols, don't count in the mTotalLen.
+        // The server counts this one, but didn't count our first datagram
+        // (connect message), so we're even now.
+
         // store datagram ID into buffer 
-        mBuf_UDP->id      = htonl( -(reportstruct->packetID)  ); 
-        mBuf_UDP->tv_sec  = htonl( reportstruct->packetTime.tv_sec ); 
-        mBuf_UDP->tv_usec = htonl( reportstruct->packetTime.tv_usec ); 
+        // The negative datagram ID signifies termination to the server.
+        mBuf_Dgram->id      = htonl( -(reportstruct->packetID)  );
+        mBuf_Dgram->tv_sec  = htonl( reportstruct->packetTime.tv_sec );
+        mBuf_Dgram->tv_usec = htonl( reportstruct->packetTime.tv_usec );
 
-        if ( isMulticast( mSettings ) ) {
+        if ( isMulticast( mSettings ) )
             write( mSettings->mSock, mBuf, mSettings->mBufLen ); 
-        } else {
-            write_UDP_FIN( ); 
-        }
+        else
+            write_dgram_FIN( );
     }
     DELETE_PTR( reportstruct );
     EndReport( mSettings->reporthdr );
 } 
-// end Run
-
-void Client::InitiateServer() {
-    if ( !isCompat( mSettings ) ) {
-        int currLen;
-        client_hdr* temp_hdr;
-        if ( isUDP( mSettings ) ) {
-            UDP_datagram *UDPhdr = (UDP_datagram *)mBuf;
-            temp_hdr = (client_hdr*)(UDPhdr + 1);
-        } else {
-            temp_hdr = (client_hdr*)mBuf;
-        }
-        Settings_GenerateClientHdr( mSettings, temp_hdr );
-        if ( !isUDP( mSettings ) ) {
-            currLen = send( mSettings->mSock, mBuf, sizeof(client_hdr), 0 );
-            if ( currLen < 0 ) {
-                WARN_errno( currLen < 0, "write1" );
-            }
-        }
-    }
+
+void Client::InitiateServer()
+{
+    client_hdr *temp_hdr = (client_hdr*)mBuf;
+
+    if (isCompat(mSettings))
+        return;
+     // connection-less protocols communicate their settings in the first
+     // packet sent to the server; this packet is not counted by the server
+     if (isConnectionLess(mSettings)) {
+        dgram_record *record_hdr = (dgram_record *)mBuf;
+        temp_hdr = (client_hdr*)(record_hdr + 1);
+      }
+
+      Settings_GenerateClientHdr( mSettings, temp_hdr );
+
+      // connection-oriented protocols use a short "init" message
+      if ( !isConnectionLess( mSettings ) ) {
+        int rc;
+
+        do {
+             rc = send(mSettings->mSock, mBuf, sizeof(client_hdr), 0);
+        } while (rc < 0 && errno == EAGAIN);
+        WARN_errno(rc < 0, "write failed in InitiateServer()");
+      }
 }
 
 /* -------------------------------------------------------------------
@@ -286,31 +306,20 @@ void Client::InitiateServer() {
  * If inLocalhost is not null, bind to that address, specifying
  * which outgoing interface to use.
  * ------------------------------------------------------------------- */
-
-void Client::Connect( ) {
+void Client::Connect()
+{
     int rc;
-    SockAddr_remoteAddr( mSettings );
 
     assert( mSettings->inHostname != NULL );
 
-    // create an internet socket
-    int type = ( isUDP( mSettings )  ?  SOCK_DGRAM : SOCK_STREAM);
-
-    int domain = (SockAddr_isIPv6( &mSettings->peer ) ? 
-#ifdef HAVE_IPV6
-                  AF_INET6
-#else
-                  AF_INET
-#endif
-                  : AF_INET);
-
-    mSettings->mSock = socket( domain, type, 0 );
-    WARN_errno( mSettings->mSock == INVALID_SOCKET, "socket" );
-
+    // The local socket needs to be filled in first, since the
+    // IPv6 address testing for the peer depends on the type of
+    // the local socket
+    SockAddr_localAddr( mSettings );
+    SockAddr_remoteAddr( mSettings );
+    MakeSocket( mSettings);
     SetSocketOptions( mSettings );
 
-
-    SockAddr_localAddr( mSettings );
     if ( mSettings->mLocalhost != NULL ) {
         // bind socket to local address
         rc = bind( mSettings->mSock, (sockaddr*) &mSettings->local, 
@@ -327,25 +336,38 @@ void Client::Connect( ) {
                  &mSettings->size_local );
     getpeername( mSettings->mSock, (sockaddr*) &mSettings->peer,
                  &mSettings->size_peer );
-} // end Connect
+
+    /* The DCCP packet size must not exceed the MPS (RFC 4340, 14.) */
+    if (mSettings->mProtocol == kProto_DCCP) {
+        unsigned mps = getsock_dccp_mps(mSettings->mSock);
+
+        if (mSettings->mBufLen > mps)
+           die("Buffer length %d exceeds DCCP MPS %d. Use a smaller buffer size "
+              "(-l) on server/client.", mSettings->mBufLen, mps);
+    }
+}
+
 
 /* ------------------------------------------------------------------- 
  * Send a datagram on the socket. The datagram's contents should signify 
  * a FIN to the application. Keep re-transmitting until an 
  * acknowledgement datagram is received. 
  * ------------------------------------------------------------------- */ 
-
-void Client::write_UDP_FIN( ) {
-    int rc; 
+void Client::write_dgram_FIN( ) {
+    int rc, count = 0;
     fd_set readSet; 
     struct timeval timeout; 
+    size_t len = mSettings->mBufLen;
+
+    // we don't need the full buffer size here - it is not counted
+    if (!isConnectionLess(mSettings))
+        len = sizeof(dgram_record);
 
-    int count = 0; 
     while ( count < 10 ) {
-        count++; 
+        count++;
 
         // write data 
-        write( mSettings->mSock, mBuf, mSettings->mBufLen ); 
+        write( mSettings->mSock, mBuf, len);
 
         // wait until the socket is readable, or our timeout expires 
         FD_ZERO( &readSet ); 
@@ -365,14 +387,12 @@ void Client::write_UDP_FIN( ) {
             WARN_errno( rc < 0, "read" );
            if ( rc < 0 ) {
                 break;
-            } else if ( rc >= (int) (sizeof(UDP_datagram) + sizeof(server_hdr)) ) {
-                ReportServerUDP( mSettings, (server_hdr*) ((UDP_datagram*)mBuf + 1) );
+            } else if ( rc >= (int) (sizeof(dgram_record) + sizeof(server_hdr)) ) {
+                ReportServerUDP( mSettings, (server_hdr*) ((dgram_record*)mBuf + 1) );
             }
-
             return; 
         } 
     } 
-
     fprintf( stderr, warn_no_ack, mSettings->mSock, count ); 
 } 
-// end write_UDP_FIN 
+// end write_dgram_FIN