X-Git-Url: http://sjero.net/git/?p=iperf;a=blobdiff_plain;f=src%2FClient.cpp;fp=src%2FClient.cpp;h=2195c4fd3027c8b34f9da14273af7ffcca75b310;hp=d6324d96b30968862d9afb68e9507ba60a1b19c3;hb=90fc1e2c0c74319759b21d4a177c32691b88fdf3;hpb=fad82d47d76abc8f4ac6767e58b89859ad35a2ca diff --git a/src/Client.cpp b/src/Client.cpp index d6324d9..2195c4f 100644 --- a/src/Client.cpp +++ b/src/Client.cpp @@ -69,6 +69,9 @@ Client::Client( thread_Settings *inSettings ) { mSettings = inSettings; mBuf = NULL; + // connect + Connect( ); + // initialize buffer mBuf = new char[ mSettings->mBufLen ]; pattern( mBuf, mSettings->mBufLen ); @@ -83,9 +86,6 @@ Client::Client( thread_Settings *inSettings ) { } } - // connect - Connect( ); - if ( isReport( inSettings ) ) { ReportSettings( inSettings ); if ( mSettings->multihdr && isMultipleReport( inSettings ) ) { @@ -112,8 +112,8 @@ Client::~Client() { DELETE_ARRAY( mBuf ); } // end ~Client -const double kSecs_to_usecs = 1e6; -const int kBytes_to_Bits = 8; +const double kSecs_to_usecs = 1.0e6; +const double kBytes_to_Bits = 8.0; /* ------------------------------------------------------------------- * Send data using the connected UDP/TCP socket, @@ -122,17 +122,12 @@ const int kBytes_to_Bits = 8; * ------------------------------------------------------------------- */ void Client::Run( void ) { - struct UDP_datagram* mBuf_UDP = (struct UDP_datagram*) mBuf; - long currLen = 0; - - int delay_target = 0; - int delay = 0; - int adjust = 0; - + struct dgram_record* mBuf_Dgram = (struct dgram_record*) mBuf; + long currLen = 0, packet_gap = 0, delta, loop_time = 0, adjust = 0; char* readAt = mBuf; - - // Indicates if the stream is readable - bool canRead = true, mMode_Time = isModeTime( mSettings ); + bool canRead = true, // Indicates if the stream is readable + mMode_Time = isModeTime( mSettings ); + ReportStruct *reportstruct = NULL; // setup termination variables if ( mMode_Time ) { @@ -140,43 +135,46 @@ void Client::Run( void ) { mEndTime.add( mSettings->mAmount / 100.0 ); } - if ( isUDP( mSettings ) ) { - // Due to the UDP timestamps etc, included - // reduce the read size by an amount - // equal to the header size - + if ( isPacketOriented( mSettings ) ) { // compute delay for bandwidth restriction, constrained to [0,1] seconds - delay_target = (int) ( mSettings->mBufLen * ((kSecs_to_usecs * kBytes_to_Bits) - / mSettings->mUDPRate) ); - if ( delay_target < 0 || - delay_target > (int) 1 * kSecs_to_usecs ) { - fprintf( stderr, warn_delay_large, delay_target / kSecs_to_usecs ); - delay_target = (int) kSecs_to_usecs * 1; + packet_gap = (long)((double)(mSettings->mBufLen * kSecs_to_usecs * kBytes_to_Bits) / + (double)mSettings->mDgramRate); + + if (packet_gap < 0 || packet_gap > kSecs_to_usecs) { + fprintf( stderr, warn_delay_large, packet_gap / kSecs_to_usecs ); + packet_gap = (long)kSecs_to_usecs; } + // Initialise adjustment variable: in this way, the first adjustment will be + // the latency for sending the first packet, and all sending times are + // synchronised with regard to the first timestamp. + adjust = -packet_gap; + + // Due to the included timestamps etc, + // reduce the read size by an amount equal to the header size if ( isFileInput( mSettings ) ) { - if ( isCompat( mSettings ) ) { - Extractor_reduceReadSize( sizeof(struct UDP_datagram), mSettings ); - readAt += sizeof(struct UDP_datagram); - } else { - Extractor_reduceReadSize( sizeof(struct UDP_datagram) + - sizeof(struct client_hdr), mSettings ); - readAt += sizeof(struct UDP_datagram) + - sizeof(struct client_hdr); - } + size_t offset = sizeof(struct dgram_record); + + if (!isCompat(mSettings)) + offset += sizeof(struct client_hdr); + + Extractor_reduceReadSize(offset, mSettings); + readAt += offset; } } - ReportStruct *reportstruct = NULL; - // InitReport handles Barrier for multiple Streams mSettings->reporthdr = InitReport( mSettings ); reportstruct = new ReportStruct; - reportstruct->packetID = 0; + // Connectionless protocols use the first message as "connect" message (which is + // counted, but their FIN is not counted. For connection-oriented protocols we + // start at message #1 instead of at #0; and the FIN is not counted. + reportstruct->packetID = (!isConnectionLess(mSettings) && isPacketOriented(mSettings)); + + // Set a timestamp now corresponding to an imaginary zero-th send time. lastPacketTime.setnow(); - - do { + do { // Test case: drop 17 packets and send 2 out-of-order: // sequence 51, 52, 70, 53, 54, 71, 72 //switch( datagramID ) { @@ -185,100 +183,122 @@ void Client::Run( void ) { // case 55: datagramID = 71; break; // default: break; //} - gettimeofday( &(reportstruct->packetTime), NULL ); - if ( isUDP( mSettings ) ) { - // store datagram ID into buffer - mBuf_UDP->id = htonl( (reportstruct->packetID)++ ); - mBuf_UDP->tv_sec = htonl( reportstruct->packetTime.tv_sec ); - mBuf_UDP->tv_usec = htonl( reportstruct->packetTime.tv_usec ); - - // delay between writes - // make an adjustment for how long the last loop iteration took - // TODO this doesn't work well in certain cases, like 2 parallel streams - adjust = delay_target + lastPacketTime.subUsec( reportstruct->packetTime ); - lastPacketTime.set( reportstruct->packetTime.tv_sec, - reportstruct->packetTime.tv_usec ); - - if ( adjust > 0 || delay > 0 ) { - delay += adjust; - } - } + // Note that the timestamp does not account for the time-to-wire of the packet. + // This plays a role when looking at jitter/delay values. + gettimeofday( &(reportstruct->packetTime), NULL ); - // Read the next data block from - // the file if it's file input + if ( isPacketOriented( mSettings ) ) { + // Increment packet ID after sending and before reporting + // UDP: sends from 0..n-1, counts from 1..n + // DCCP: sends from 1..n, counts from 2..n+1 + // This difference is of interest for the client only (the server sees + // 1..n). The client uses cntDatagrams, so that the count is correct. + // (Consider the trick in CloseReport() which resetts packetID.) + mBuf_Dgram->id = htonl( reportstruct->packetID++ ); + mBuf_Dgram->tv_sec = htonl( reportstruct->packetTime.tv_sec ); + mBuf_Dgram->tv_usec = htonl( reportstruct->packetTime.tv_usec ); + } else if (!mMode_Time && mSettings->mAmount < mSettings->mBufLen) + mSettings->mBufLen = mSettings->mAmount; + + // Read the next data block from the file if it's file input if ( isFileInput( mSettings ) ) { Extractor_getNextDataBlock( readAt, mSettings ); canRead = Extractor_canRead( mSettings ) != 0; - } else - canRead = true; - - // perform write - currLen = write( mSettings->mSock, mBuf, mSettings->mBufLen ); - if ( currLen < 0 ) { - WARN_errno( currLen < 0, "write2" ); - break; } - // report packets + // Put the packet onto the wire. + // When EAGAIN is returned (DCCP), the internal TX buffer is full: due to + // congestion-control issues the packet can not be transported now. + do { + currLen = write( mSettings->mSock, mBuf, mSettings->mBufLen ); + } while (currLen < 0 && errno == EAGAIN); + + if (currLen < 0) { + WARN_errno(1, "client write"); + break; + } + // update statistics reportstruct->packetLen = currLen; ReportPacket( mSettings->reporthdr, reportstruct ); - - if ( delay > 0 ) { - delay_loop( delay ); - } - if ( !mMode_Time ) { + + if (mMode_Time) { + // time-oriented mode (-t): termination + canRead = mEndTime.after(reportstruct->packetTime); + } else { + // amount-oriented mode (-n): mAmount is unsigned + if (currLen >= mSettings->mAmount) + break; mSettings->mAmount -= currLen; } - } while ( ! (sInterupted || - (mMode_Time && mEndTime.before( reportstruct->packetTime )) || - (!mMode_Time && 0 >= mSettings->mAmount)) && canRead ); + if (isPacketOriented(mSettings)) { + // Adjust the inter-packet gap using the following variables: + // delta is the gap in between calls to send() + // adjust acts as a token bucket whenever delta != packet_gap + // loop_time equals packet_gap if adjust==0, it is corrected otherwise + // + // TODO this doesn't work well in certain cases, like 2 parallel streams + // + delta = lastPacketTime.delta_usec(); + adjust += packet_gap - delta; + loop_time = packet_gap + adjust; + + if (loop_time > 0) + delay_loop(loop_time); + } + + } while (canRead && !sInterupted); // stop timing gettimeofday( &(reportstruct->packetTime), NULL ); CloseReport( mSettings->reporthdr, reportstruct ); - if ( isUDP( mSettings ) ) { + if ( isPacketOriented( mSettings ) ) { // send a final terminating datagram - // Don't count in the mTotalLen. The server counts this one, - // but didn't count our first datagram, so we're even now. - // The negative datagram ID signifies termination to the server. - + // For connectionless protocols, don't count in the mTotalLen. + // The server counts this one, but didn't count our first datagram + // (connect message), so we're even now. + // store datagram ID into buffer - mBuf_UDP->id = htonl( -(reportstruct->packetID) ); - mBuf_UDP->tv_sec = htonl( reportstruct->packetTime.tv_sec ); - mBuf_UDP->tv_usec = htonl( reportstruct->packetTime.tv_usec ); + // The negative datagram ID signifies termination to the server. + mBuf_Dgram->id = htonl( -(reportstruct->packetID) ); + mBuf_Dgram->tv_sec = htonl( reportstruct->packetTime.tv_sec ); + mBuf_Dgram->tv_usec = htonl( reportstruct->packetTime.tv_usec ); - if ( isMulticast( mSettings ) ) { + if ( isMulticast( mSettings ) ) write( mSettings->mSock, mBuf, mSettings->mBufLen ); - } else { - write_UDP_FIN( ); - } + else + write_dgram_FIN( ); } DELETE_PTR( reportstruct ); EndReport( mSettings->reporthdr ); } -// end Run - -void Client::InitiateServer() { - if ( !isCompat( mSettings ) ) { - int currLen; - client_hdr* temp_hdr; - if ( isUDP( mSettings ) ) { - UDP_datagram *UDPhdr = (UDP_datagram *)mBuf; - temp_hdr = (client_hdr*)(UDPhdr + 1); - } else { - temp_hdr = (client_hdr*)mBuf; - } - Settings_GenerateClientHdr( mSettings, temp_hdr ); - if ( !isUDP( mSettings ) ) { - currLen = send( mSettings->mSock, mBuf, sizeof(client_hdr), 0 ); - if ( currLen < 0 ) { - WARN_errno( currLen < 0, "write1" ); - } - } - } + +void Client::InitiateServer() +{ + client_hdr *temp_hdr = (client_hdr*)mBuf; + + if (isCompat(mSettings)) + return; + // connection-less protocols communicate their settings in the first + // packet sent to the server; this packet is not counted by the server + if (isConnectionLess(mSettings)) { + dgram_record *record_hdr = (dgram_record *)mBuf; + temp_hdr = (client_hdr*)(record_hdr + 1); + } + + Settings_GenerateClientHdr( mSettings, temp_hdr ); + + // connection-oriented protocols use a short "init" message + if ( !isConnectionLess( mSettings ) ) { + int rc; + + do { + rc = send(mSettings->mSock, mBuf, sizeof(client_hdr), 0); + } while (rc < 0 && errno == EAGAIN); + WARN_errno(rc < 0, "write failed in InitiateServer()"); + } } /* ------------------------------------------------------------------- @@ -286,31 +306,20 @@ void Client::InitiateServer() { * If inLocalhost is not null, bind to that address, specifying * which outgoing interface to use. * ------------------------------------------------------------------- */ - -void Client::Connect( ) { +void Client::Connect() +{ int rc; - SockAddr_remoteAddr( mSettings ); assert( mSettings->inHostname != NULL ); - // create an internet socket - int type = ( isUDP( mSettings ) ? SOCK_DGRAM : SOCK_STREAM); - - int domain = (SockAddr_isIPv6( &mSettings->peer ) ? -#ifdef HAVE_IPV6 - AF_INET6 -#else - AF_INET -#endif - : AF_INET); - - mSettings->mSock = socket( domain, type, 0 ); - WARN_errno( mSettings->mSock == INVALID_SOCKET, "socket" ); - + // The local socket needs to be filled in first, since the + // IPv6 address testing for the peer depends on the type of + // the local socket + SockAddr_localAddr( mSettings ); + SockAddr_remoteAddr( mSettings ); + MakeSocket( mSettings); SetSocketOptions( mSettings ); - - SockAddr_localAddr( mSettings ); if ( mSettings->mLocalhost != NULL ) { // bind socket to local address rc = bind( mSettings->mSock, (sockaddr*) &mSettings->local, @@ -327,25 +336,38 @@ void Client::Connect( ) { &mSettings->size_local ); getpeername( mSettings->mSock, (sockaddr*) &mSettings->peer, &mSettings->size_peer ); -} // end Connect + + /* The DCCP packet size must not exceed the MPS (RFC 4340, 14.) */ + if (mSettings->mProtocol == kProto_DCCP) { + unsigned mps = getsock_dccp_mps(mSettings->mSock); + + if (mSettings->mBufLen > mps) + die("Buffer length %d exceeds DCCP MPS %d. Use a smaller buffer size " + "(-l) on server/client.", mSettings->mBufLen, mps); + } +} + /* ------------------------------------------------------------------- * Send a datagram on the socket. The datagram's contents should signify * a FIN to the application. Keep re-transmitting until an * acknowledgement datagram is received. * ------------------------------------------------------------------- */ - -void Client::write_UDP_FIN( ) { - int rc; +void Client::write_dgram_FIN( ) { + int rc, count = 0; fd_set readSet; struct timeval timeout; + size_t len = mSettings->mBufLen; + + // we don't need the full buffer size here - it is not counted + if (!isConnectionLess(mSettings)) + len = sizeof(dgram_record); - int count = 0; while ( count < 10 ) { - count++; + count++; // write data - write( mSettings->mSock, mBuf, mSettings->mBufLen ); + write( mSettings->mSock, mBuf, len); // wait until the socket is readable, or our timeout expires FD_ZERO( &readSet ); @@ -365,14 +387,12 @@ void Client::write_UDP_FIN( ) { WARN_errno( rc < 0, "read" ); if ( rc < 0 ) { break; - } else if ( rc >= (int) (sizeof(UDP_datagram) + sizeof(server_hdr)) ) { - ReportServerUDP( mSettings, (server_hdr*) ((UDP_datagram*)mBuf + 1) ); + } else if ( rc >= (int) (sizeof(dgram_record) + sizeof(server_hdr)) ) { + ReportServerUDP( mSettings, (server_hdr*) ((dgram_record*)mBuf + 1) ); } - return; } } - fprintf( stderr, warn_no_ack, mSettings->mSock, count ); } -// end write_UDP_FIN +// end write_dgram_FIN