]> sjero.net Git - iperf/blob - src/Client.cpp
d6324d96b30968862d9afb68e9507ba60a1b19c3
[iperf] / src / Client.cpp
1 /*--------------------------------------------------------------- 
2  * Copyright (c) 1999,2000,2001,2002,2003                              
3  * The Board of Trustees of the University of Illinois            
4  * All Rights Reserved.                                           
5  *--------------------------------------------------------------- 
6  * Permission is hereby granted, free of charge, to any person    
7  * obtaining a copy of this software (Iperf) and associated       
8  * documentation files (the "Software"), to deal in the Software  
9  * without restriction, including without limitation the          
10  * rights to use, copy, modify, merge, publish, distribute,        
11  * sublicense, and/or sell copies of the Software, and to permit     
12  * persons to whom the Software is furnished to do
13  * so, subject to the following conditions: 
14  *
15  *     
16  * Redistributions of source code must retain the above 
17  * copyright notice, this list of conditions and 
18  * the following disclaimers. 
19  *
20  *     
21  * Redistributions in binary form must reproduce the above 
22  * copyright notice, this list of conditions and the following 
23  * disclaimers in the documentation and/or other materials 
24  * provided with the distribution. 
25  * 
26  *     
27  * Neither the names of the University of Illinois, NCSA, 
28  * nor the names of its contributors may be used to endorse 
29  * or promote products derived from this Software without
30  * specific prior written permission. 
31  * 
32  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, 
33  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES 
34  * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND 
35  * NONINFRINGEMENT. IN NO EVENT SHALL THE CONTIBUTORS OR COPYRIGHT 
36  * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, 
37  * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, 
38  * ARISING FROM, OUT OF OR IN CONNECTION WITH THE
39  * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
40  * ________________________________________________________________
41  * National Laboratory for Applied Network Research 
42  * National Center for Supercomputing Applications 
43  * University of Illinois at Urbana-Champaign 
44  * http://www.ncsa.uiuc.edu
45  * ________________________________________________________________ 
46  *
47  * Client.cpp
48  * by Mark Gates <mgates@nlanr.net>
49  * -------------------------------------------------------------------
50  * A client thread initiates a connect to the server and handles
51  * sending and receiving data, then closes the socket.
52  * ------------------------------------------------------------------- */
53
54 #include "headers.h"
55 #include "Client.hpp"
56 #include "Thread.h"
57 #include "SocketAddr.h"
58 #include "PerfSocket.hpp"
59 #include "Extractor.h"
60 #include "delay.hpp"
61 #include "util.h"
62 #include "Locale.h"
63
64 /* -------------------------------------------------------------------
65  * Store server hostname, optionally local hostname, and socket info.
66  * ------------------------------------------------------------------- */
67
68 Client::Client( thread_Settings *inSettings ) {
69     mSettings = inSettings;
70     mBuf = NULL;
71
72     // initialize buffer
73     mBuf = new char[ mSettings->mBufLen ];
74     pattern( mBuf, mSettings->mBufLen );
75     if ( isFileInput( mSettings ) ) {
76         if ( !isSTDIN( mSettings ) )
77             Extractor_Initialize( mSettings->mFileName, mSettings->mBufLen, mSettings );
78         else
79             Extractor_InitializeFile( stdin, mSettings->mBufLen, mSettings );
80
81         if ( !Extractor_canRead( mSettings ) ) {
82             unsetFileInput( mSettings );
83         }
84     }
85
86     // connect
87     Connect( );
88
89     if ( isReport( inSettings ) ) {
90         ReportSettings( inSettings );
91         if ( mSettings->multihdr && isMultipleReport( inSettings ) ) {
92             mSettings->multihdr->report->connection.peer = mSettings->peer;
93             mSettings->multihdr->report->connection.size_peer = mSettings->size_peer;
94             mSettings->multihdr->report->connection.local = mSettings->local;
95             SockAddr_setPortAny( &mSettings->multihdr->report->connection.local );
96             mSettings->multihdr->report->connection.size_local = mSettings->size_local;
97         }
98     }
99
100 } // end Client
101
102 /* -------------------------------------------------------------------
103  * Delete memory (hostname strings).
104  * ------------------------------------------------------------------- */
105
106 Client::~Client() {
107     if ( mSettings->mSock != INVALID_SOCKET ) {
108         int rc = close( mSettings->mSock );
109         WARN_errno( rc == SOCKET_ERROR, "close" );
110         mSettings->mSock = INVALID_SOCKET;
111     }
112     DELETE_ARRAY( mBuf );
113 } // end ~Client
114
115 const double kSecs_to_usecs = 1e6; 
116 const int    kBytes_to_Bits = 8; 
117
118 /* ------------------------------------------------------------------- 
119  * Send data using the connected UDP/TCP socket, 
120  * until a termination flag is reached. 
121  * Does not close the socket. 
122  * ------------------------------------------------------------------- */ 
123
124 void Client::Run( void ) {
125     struct UDP_datagram* mBuf_UDP = (struct UDP_datagram*) mBuf; 
126     long currLen = 0; 
127
128     int delay_target = 0; 
129     int delay = 0; 
130     int adjust = 0; 
131
132     char* readAt = mBuf;
133     
134     // Indicates if the stream is readable 
135     bool canRead = true, mMode_Time = isModeTime( mSettings ); 
136
137     // setup termination variables
138     if ( mMode_Time ) {
139         mEndTime.setnow();
140         mEndTime.add( mSettings->mAmount / 100.0 );
141     }
142
143     if ( isUDP( mSettings ) ) {
144         // Due to the UDP timestamps etc, included 
145         // reduce the read size by an amount 
146         // equal to the header size
147     
148         // compute delay for bandwidth restriction, constrained to [0,1] seconds 
149         delay_target = (int) ( mSettings->mBufLen * ((kSecs_to_usecs * kBytes_to_Bits) 
150                                                      / mSettings->mUDPRate) ); 
151         if ( delay_target < 0  || 
152              delay_target > (int) 1 * kSecs_to_usecs ) {
153             fprintf( stderr, warn_delay_large, delay_target / kSecs_to_usecs ); 
154             delay_target = (int) kSecs_to_usecs * 1; 
155         }
156         if ( isFileInput( mSettings ) ) {
157             if ( isCompat( mSettings ) ) {
158                 Extractor_reduceReadSize( sizeof(struct UDP_datagram), mSettings );
159                 readAt += sizeof(struct UDP_datagram);
160             } else {
161                 Extractor_reduceReadSize( sizeof(struct UDP_datagram) +
162                                           sizeof(struct client_hdr), mSettings );
163                 readAt += sizeof(struct UDP_datagram) +
164                           sizeof(struct client_hdr);
165             }
166         }
167     }
168
169     ReportStruct *reportstruct = NULL;
170
171     // InitReport handles Barrier for multiple Streams
172     mSettings->reporthdr = InitReport( mSettings );
173     reportstruct = new ReportStruct;
174     reportstruct->packetID = 0;
175
176     lastPacketTime.setnow();
177     
178     do {
179
180         // Test case: drop 17 packets and send 2 out-of-order: 
181         // sequence 51, 52, 70, 53, 54, 71, 72 
182         //switch( datagramID ) { 
183         //  case 53: datagramID = 70; break; 
184         //  case 71: datagramID = 53; break; 
185         //  case 55: datagramID = 71; break; 
186         //  default: break; 
187         //} 
188         gettimeofday( &(reportstruct->packetTime), NULL );
189
190         if ( isUDP( mSettings ) ) {
191             // store datagram ID into buffer 
192             mBuf_UDP->id      = htonl( (reportstruct->packetID)++ ); 
193             mBuf_UDP->tv_sec  = htonl( reportstruct->packetTime.tv_sec ); 
194             mBuf_UDP->tv_usec = htonl( reportstruct->packetTime.tv_usec );
195
196             // delay between writes 
197             // make an adjustment for how long the last loop iteration took 
198             // TODO this doesn't work well in certain cases, like 2 parallel streams 
199             adjust = delay_target + lastPacketTime.subUsec( reportstruct->packetTime ); 
200             lastPacketTime.set( reportstruct->packetTime.tv_sec, 
201                                 reportstruct->packetTime.tv_usec ); 
202
203             if ( adjust > 0  ||  delay > 0 ) {
204                 delay += adjust; 
205             }
206         }
207
208         // Read the next data block from 
209         // the file if it's file input 
210         if ( isFileInput( mSettings ) ) {
211             Extractor_getNextDataBlock( readAt, mSettings ); 
212             canRead = Extractor_canRead( mSettings ) != 0; 
213         } else
214             canRead = true; 
215
216         // perform write 
217         currLen = write( mSettings->mSock, mBuf, mSettings->mBufLen ); 
218         if ( currLen < 0 ) {
219             WARN_errno( currLen < 0, "write2" ); 
220             break; 
221         }
222
223         // report packets 
224         reportstruct->packetLen = currLen;
225         ReportPacket( mSettings->reporthdr, reportstruct );
226         
227         if ( delay > 0 ) {
228             delay_loop( delay ); 
229         }
230         if ( !mMode_Time ) {
231             mSettings->mAmount -= currLen;
232         }
233
234     } while ( ! (sInterupted  || 
235                  (mMode_Time   &&  mEndTime.before( reportstruct->packetTime ))  || 
236                  (!mMode_Time  &&  0 >= mSettings->mAmount)) && canRead ); 
237
238     // stop timing
239     gettimeofday( &(reportstruct->packetTime), NULL );
240     CloseReport( mSettings->reporthdr, reportstruct );
241
242     if ( isUDP( mSettings ) ) {
243         // send a final terminating datagram 
244         // Don't count in the mTotalLen. The server counts this one, 
245         // but didn't count our first datagram, so we're even now. 
246         // The negative datagram ID signifies termination to the server. 
247     
248         // store datagram ID into buffer 
249         mBuf_UDP->id      = htonl( -(reportstruct->packetID)  ); 
250         mBuf_UDP->tv_sec  = htonl( reportstruct->packetTime.tv_sec ); 
251         mBuf_UDP->tv_usec = htonl( reportstruct->packetTime.tv_usec ); 
252
253         if ( isMulticast( mSettings ) ) {
254             write( mSettings->mSock, mBuf, mSettings->mBufLen ); 
255         } else {
256             write_UDP_FIN( ); 
257         }
258     }
259     DELETE_PTR( reportstruct );
260     EndReport( mSettings->reporthdr );
261
262 // end Run
263
264 void Client::InitiateServer() {
265     if ( !isCompat( mSettings ) ) {
266         int currLen;
267         client_hdr* temp_hdr;
268         if ( isUDP( mSettings ) ) {
269             UDP_datagram *UDPhdr = (UDP_datagram *)mBuf;
270             temp_hdr = (client_hdr*)(UDPhdr + 1);
271         } else {
272             temp_hdr = (client_hdr*)mBuf;
273         }
274         Settings_GenerateClientHdr( mSettings, temp_hdr );
275         if ( !isUDP( mSettings ) ) {
276             currLen = send( mSettings->mSock, mBuf, sizeof(client_hdr), 0 );
277             if ( currLen < 0 ) {
278                 WARN_errno( currLen < 0, "write1" );
279             }
280         }
281     }
282 }
283
284 /* -------------------------------------------------------------------
285  * Setup a socket connected to a server.
286  * If inLocalhost is not null, bind to that address, specifying
287  * which outgoing interface to use.
288  * ------------------------------------------------------------------- */
289
290 void Client::Connect( ) {
291     int rc;
292     SockAddr_remoteAddr( mSettings );
293
294     assert( mSettings->inHostname != NULL );
295
296     // create an internet socket
297     int type = ( isUDP( mSettings )  ?  SOCK_DGRAM : SOCK_STREAM);
298
299     int domain = (SockAddr_isIPv6( &mSettings->peer ) ? 
300 #ifdef HAVE_IPV6
301                   AF_INET6
302 #else
303                   AF_INET
304 #endif
305                   : AF_INET);
306
307     mSettings->mSock = socket( domain, type, 0 );
308     WARN_errno( mSettings->mSock == INVALID_SOCKET, "socket" );
309
310     SetSocketOptions( mSettings );
311
312
313     SockAddr_localAddr( mSettings );
314     if ( mSettings->mLocalhost != NULL ) {
315         // bind socket to local address
316         rc = bind( mSettings->mSock, (sockaddr*) &mSettings->local, 
317                    SockAddr_get_sizeof_sockaddr( &mSettings->local ) );
318         WARN_errno( rc == SOCKET_ERROR, "bind" );
319     }
320
321     // connect socket
322     rc = connect( mSettings->mSock, (sockaddr*) &mSettings->peer, 
323                   SockAddr_get_sizeof_sockaddr( &mSettings->peer ));
324     WARN_errno( rc == SOCKET_ERROR, "connect" );
325
326     getsockname( mSettings->mSock, (sockaddr*) &mSettings->local, 
327                  &mSettings->size_local );
328     getpeername( mSettings->mSock, (sockaddr*) &mSettings->peer,
329                  &mSettings->size_peer );
330 } // end Connect
331
332 /* ------------------------------------------------------------------- 
333  * Send a datagram on the socket. The datagram's contents should signify 
334  * a FIN to the application. Keep re-transmitting until an 
335  * acknowledgement datagram is received. 
336  * ------------------------------------------------------------------- */ 
337
338 void Client::write_UDP_FIN( ) {
339     int rc; 
340     fd_set readSet; 
341     struct timeval timeout; 
342
343     int count = 0; 
344     while ( count < 10 ) {
345         count++; 
346
347         // write data 
348         write( mSettings->mSock, mBuf, mSettings->mBufLen ); 
349
350         // wait until the socket is readable, or our timeout expires 
351         FD_ZERO( &readSet ); 
352         FD_SET( mSettings->mSock, &readSet ); 
353         timeout.tv_sec  = 0; 
354         timeout.tv_usec = 250000; // quarter second, 250 ms 
355
356         rc = select( mSettings->mSock+1, &readSet, NULL, NULL, &timeout ); 
357         FAIL_errno( rc == SOCKET_ERROR, "select", mSettings ); 
358
359         if ( rc == 0 ) {
360             // select timed out 
361             continue; 
362         } else {
363             // socket ready to read 
364             rc = read( mSettings->mSock, mBuf, mSettings->mBufLen ); 
365             WARN_errno( rc < 0, "read" );
366             if ( rc < 0 ) {
367                 break;
368             } else if ( rc >= (int) (sizeof(UDP_datagram) + sizeof(server_hdr)) ) {
369                 ReportServerUDP( mSettings, (server_hdr*) ((UDP_datagram*)mBuf + 1) );
370             }
371
372             return; 
373         } 
374     } 
375
376     fprintf( stderr, warn_no_ack, mSettings->mSock, count ); 
377
378 // end write_UDP_FIN