]> sjero.net Git - iperf/blob - src/Reporter.c
DCCP support for iperf
[iperf] / src / Reporter.c
1 /*--------------------------------------------------------------- 
2  * Copyright (c) 1999,2000,2001,2002,2003                              
3  * The Board of Trustees of the University of Illinois            
4  * All Rights Reserved.                                           
5  *--------------------------------------------------------------- 
6  * Permission is hereby granted, free of charge, to any person    
7  * obtaining a copy of this software (Iperf) and associated       
8  * documentation files (the "Software"), to deal in the Software  
9  * without restriction, including without limitation the          
10  * rights to use, copy, modify, merge, publish, distribute,        
11  * sublicense, and/or sell copies of the Software, and to permit     
12  * persons to whom the Software is furnished to do
13  * so, subject to the following conditions: 
14  *
15  *     
16  * Redistributions of source code must retain the above 
17  * copyright notice, this list of conditions and 
18  * the following disclaimers. 
19  *
20  *     
21  * Redistributions in binary form must reproduce the above 
22  * copyright notice, this list of conditions and the following 
23  * disclaimers in the documentation and/or other materials 
24  * provided with the distribution. 
25  * 
26  *     
27  * Neither the names of the University of Illinois, NCSA, 
28  * nor the names of its contributors may be used to endorse 
29  * or promote products derived from this Software without
30  * specific prior written permission. 
31  * 
32  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, 
33  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES 
34  * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND 
35  * NONINFRINGEMENT. IN NO EVENT SHALL THE CONTIBUTORS OR COPYRIGHT 
36  * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, 
37  * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, 
38  * ARISING FROM, OUT OF OR IN CONNECTION WITH THE
39  * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
40  * ________________________________________________________________
41  * National Laboratory for Applied Network Research 
42  * National Center for Supercomputing Applications 
43  * University of Illinois at Urbana-Champaign 
44  * http://www.ncsa.uiuc.edu
45  * ________________________________________________________________ 
46  *
47  * Reporter.c
48  * by Kevin Gibbs <kgibbs@nlanr.net>
49  *
50  * ________________________________________________________________ */
51
52 #include "headers.h"
53 #include "Settings.hpp"
54 #include "util.h"
55 #include "Reporter.h"
56 #include "Thread.h"
57 #include "Locale.h"
58 #include "PerfSocket.hpp"
59 #include "SocketAddr.h"
60
61 #ifdef __cplusplus
62 extern "C" {
63 #endif
64
65 /*
66   The following 4 functions are provided for Reporting
67   styles that do not have all the reporting formats. For
68   instance the provided CSV format does not have a settings
69   report so it uses settings_notimpl.
70   */
71 void* connection_notimpl( Connection_Info * nused, int nuse ) { 
72     return NULL; 
73 }
74 void settings_notimpl( ReporterData * nused ) { }
75 void statistics_notimpl( Transfer_Info * nused ) { }
76 void serverstatistics_notimpl( Connection_Info *nused1, Transfer_Info *nused2 ) { }
77
78 // To add a reporting style include its header here.
79 #include "report_default.h"
80 #include "report_CSV.h"
81
82 // The following array of report structs contains the
83 // pointers required for reporting in different reporting
84 // styles. To add a reporting style add a report struct
85 // below.
86 report_connection connection_reports[kReport_MAXIMUM] = {
87     reporter_reportpeer,
88     CSV_peer
89 };
90
91 report_settings settings_reports[kReport_MAXIMUM] = {
92     reporter_reportsettings,
93     settings_notimpl
94 };
95
96 report_statistics statistics_reports[kReport_MAXIMUM] = {
97     reporter_printstats,
98     CSV_stats
99 };
100
101 report_serverstatistics serverstatistics_reports[kReport_MAXIMUM] = {
102     reporter_serverstats,
103     CSV_serverstats
104 };
105
106 report_statistics multiple_reports[kReport_MAXIMUM] = {
107     reporter_multistats,
108     CSV_stats
109 };
110
111 char buffer[64]; // Buffer for printing
112 ReportHeader *ReportRoot = NULL;
113 extern Condition ReportCond;
114 extern Condition ReportDoneCond;
115 int reporter_process_report ( ReportHeader *report );
116 void process_report ( ReportHeader *report );
117 int reporter_handle_packet( ReportHeader *report );
118 int reporter_condprintstats( ReporterData *stats, MultiHeader *multireport, int force );
119 int reporter_print( ReporterData *stats, int type, int end );
120 void PrintMSS( ReporterData *stats );
121
122 MultiHeader* InitMulti( thread_Settings *agent, int inID ) {
123     MultiHeader *multihdr = NULL;
124     if ( agent->mThreads > 1 || agent->mThreadMode == kMode_Server ) {
125         if ( isMultipleReport( agent ) ) {
126             multihdr = malloc(sizeof(MultiHeader) +  sizeof(ReporterData) +
127                               NUM_MULTI_SLOTS * sizeof(Transfer_Info));
128         } else {
129             multihdr = malloc(sizeof(MultiHeader));
130         }
131         if ( multihdr != NULL ) {
132             memset( multihdr, 0, sizeof(MultiHeader) );
133             Condition_Initialize( &multihdr->barrier );
134             multihdr->groupID = inID;
135             multihdr->threads = agent->mThreads;
136             if ( isMultipleReport( agent ) ) {
137                 int i;
138                 ReporterData *data = NULL;
139                 multihdr->report = (ReporterData*)(multihdr + 1);
140                 memset(multihdr->report, 0, sizeof(ReporterData));
141                 multihdr->data = (Transfer_Info*)(multihdr->report + 1);
142                 data = multihdr->report;
143                 for ( i = 0; i < NUM_MULTI_SLOTS; i++ ) {
144                     multihdr->data[i].startTime = -1;
145                     multihdr->data[i].transferID = inID;
146                     multihdr->data[i].groupID = -2;
147                 }
148                 data->type = TRANSFER_REPORT;
149                 if ( agent->mInterval != 0.0 ) {
150                     struct timeval *interval = &data->intervalTime;
151                     interval->tv_sec = (long) agent->mInterval;
152                     interval->tv_usec = (long) ((agent->mInterval - interval->tv_sec) 
153                                                 * rMillion);
154                 }
155                 data->mHost = agent->mHost;
156                 data->mLocalhost = agent->mLocalhost;
157                 data->mBufLen = agent->mBufLen;
158                 data->mMSS = agent->mMSS;
159                 data->mWinSize = agent->mWinSize;
160                 data->flags = agent->flags;
161                 data->mProtocol = agent->mProtocol;
162                 data->mThreadMode = agent->mThreadMode;
163                 data->mode = agent->mReportMode;
164                 data->info.mFormat = agent->mFormat;
165                 data->info.mTTL = agent->mTTL;
166                 if ( isPacketOriented( agent ) ) {
167                     multihdr->report->info.mUDP = (char)agent->mThreadMode;
168                 }
169                 if ( isConnectionReport( agent ) ) {
170                     data->type |= CONNECTION_REPORT;
171                     data->connection.peer = agent->peer;
172                     data->connection.size_peer = agent->size_peer;
173                     SockAddr_setPortAny( &data->connection.peer );
174                     data->connection.local = agent->local;
175                     data->connection.size_local = agent->size_local;
176                     SockAddr_setPortAny( &data->connection.local );
177                 }
178             }
179         } else {
180             FAIL(1, "Out of Memory!!\n", agent);
181         }
182     }
183     return multihdr;
184 }
185
186 /*
187  * BarrierClient allows for multiple stream clients to be syncronized
188  */
189 void BarrierClient( ReportHeader *agent ) {
190     Condition_Lock(agent->multireport->barrier);
191     agent->multireport->threads--;
192     if ( agent->multireport->threads == 0 ) {
193         // last one set time and wake up everyone
194         gettimeofday( &(agent->multireport->startTime), NULL );
195         Condition_Broadcast( &agent->multireport->barrier );
196     } else {
197         Condition_Wait( &agent->multireport->barrier );
198     }
199     agent->multireport->threads++;
200     Condition_Unlock( agent->multireport->barrier );
201     agent->report.startTime = agent->multireport->startTime;
202     agent->report.nextTime = agent->report.startTime;
203     TimeAdd( agent->report.nextTime, agent->report.intervalTime );
204 }
205
206 /*
207  * InitReport is called by a transfer agent (client or
208  * server) to setup the needed structures to communicate
209  * traffic.
210  */
211 ReportHeader* InitReport( thread_Settings *agent ) {
212     ReportHeader *reporthdr = NULL;
213     ReporterData *data = NULL;
214     if ( isDataReport( agent ) ) {
215         /*
216          * Create in one big chunk
217          */
218         reporthdr = malloc( sizeof(ReportHeader) +
219                             NUM_REPORT_STRUCTS * sizeof(ReportStruct) );
220         if ( reporthdr != NULL ) {
221             // Only need to make sure the headers are clean
222             memset( reporthdr, 0, sizeof(ReportHeader));
223             reporthdr->data = (ReportStruct*)(reporthdr+1);
224             reporthdr->multireport = agent->multihdr;
225             data = &reporthdr->report;
226             reporthdr->reporterindex = NUM_REPORT_STRUCTS - 1;
227             data->info.transferID = agent->mSock;
228             data->info.groupID = (agent->multihdr != NULL ? agent->multihdr->groupID 
229                                                           : -1);
230             data->type = TRANSFER_REPORT;
231             if ( agent->mInterval != 0.0 ) {
232                 struct timeval *interval = &data->intervalTime;
233                 interval->tv_sec = (long) agent->mInterval;
234                 interval->tv_usec = (long) ((agent->mInterval - interval->tv_sec) 
235                                             * rMillion);
236             }
237             data->mHost = agent->mHost;
238             data->mLocalhost = agent->mLocalhost;
239             data->mBufLen = agent->mBufLen;
240             data->mMSS = agent->mMSS;
241             data->mWinSize = agent->mWinSize;
242             data->flags = agent->flags;
243             data->mProtocol = agent->mProtocol;
244             data->mThreadMode = agent->mThreadMode;
245             data->mode = agent->mReportMode;
246             data->info.mFormat = agent->mFormat;
247             data->info.mTTL = agent->mTTL;
248             if ( isPacketOriented( agent ) ) {
249                 reporthdr->report.info.mUDP = (char)agent->mThreadMode;
250             }
251         } else {
252             FAIL(1, "Out of Memory!!\n", agent);
253         }
254     }
255     if ( isConnectionReport( agent ) ) {
256         if ( reporthdr == NULL ) {
257             /*
258              * Create in one big chunk
259              */
260             reporthdr = malloc( sizeof(ReportHeader) );
261             if ( reporthdr != NULL ) {
262                 // Only need to make sure the headers are clean
263                 memset( reporthdr, 0, sizeof(ReportHeader));
264                 data = &reporthdr->report;
265                 data->info.transferID = agent->mSock;
266                 data->info.groupID = -1;
267             } else {
268                 FAIL(1, "Out of Memory!!\n", agent);
269             }
270         }
271         if ( reporthdr != NULL ) {
272             data->type |= CONNECTION_REPORT;
273             data->connection.peer = agent->peer;
274             data->connection.size_peer = agent->size_peer;
275             data->connection.local = agent->local;
276             data->connection.size_local = agent->size_local;
277         } else {
278             FAIL(1, "Out of Memory!!\n", agent);
279         }
280     }
281     if ( isConnectionReport( agent ) || isDataReport( agent ) ) {
282
283 #ifdef HAVE_THREAD
284         /*
285          * Update the ReportRoot to include this report.
286          */
287         if ( reporthdr->report.mThreadMode == kMode_Client &&
288              reporthdr->multireport != NULL ) {
289             // syncronize watches on my mark......
290             BarrierClient( reporthdr );
291         } else {
292             if ( reporthdr->multireport != NULL && isMultipleReport( agent )) {
293                 reporthdr->multireport->threads++;
294                 if ( reporthdr->multireport->report->startTime.tv_sec == 0 ) {
295                     gettimeofday( &(reporthdr->multireport->report->startTime), NULL );
296                 }
297                 reporthdr->report.startTime = reporthdr->multireport->report->startTime;
298             } else {
299                 // set start time
300                 gettimeofday( &(reporthdr->report.startTime), NULL );
301             }
302             reporthdr->report.nextTime = reporthdr->report.startTime;
303             TimeAdd( reporthdr->report.nextTime, reporthdr->report.intervalTime );
304         }
305         Condition_Lock( ReportCond );
306         reporthdr->next = ReportRoot;
307         ReportRoot = reporthdr;
308         Condition_Signal( &ReportCond );
309         Condition_Unlock( ReportCond );
310 #else
311         // set start time
312         gettimeofday( &(reporthdr->report.startTime), NULL );
313         /*
314          * Process the report in this thread
315          */
316         reporthdr->next = NULL;
317         process_report ( reporthdr );
318 #endif 
319     }
320     if ( !isDataReport( agent ) ) {
321         reporthdr = NULL;
322     }
323     return reporthdr;
324 }
325
326 /*
327  * ReportPacket is called by a transfer agent to record
328  * the arrival or departure of a "packet" (for TCP it 
329  * will actually represent many packets). This needs to
330  * be as simple and fast as possible as it gets called for
331  * every "packet".
332  */
333 void ReportPacket( ReportHeader* agent, ReportStruct *packet ) {
334     if ( agent != NULL ) {
335         int index = agent->reporterindex;
336         /*
337          * First find the appropriate place to put the information
338          */
339         if ( agent->agentindex == NUM_REPORT_STRUCTS ) {
340             // Just need to make sure that reporter is not on the first
341             // item
342             while ( index == 0 ) {
343                 Condition_Signal( &ReportCond );
344                 Condition_Wait( &ReportDoneCond );
345                 index = agent->reporterindex;
346             }
347             agent->agentindex = 0;
348         }
349         // Need to make sure that reporter is not about to be "lapped"
350         while ( index - 1 == agent->agentindex ) {
351             Condition_Signal( &ReportCond );
352             Condition_Wait( &ReportDoneCond );
353             index = agent->reporterindex;
354         }
355         
356         // Put the information there
357         memcpy( agent->data + agent->agentindex, packet, sizeof(ReportStruct) );
358         
359         // Updating agentindex MUST be the last thing done
360         agent->agentindex++;
361 #ifndef HAVE_THREAD
362         /*
363          * Process the report in this thread
364          */
365         process_report ( agent );
366 #endif 
367     }
368 }
369
370 /*
371  * CloseReport is called by a transfer agent to finalize
372  * the report and signal transfer is over.
373  */
374 void CloseReport( ReportHeader *agent, ReportStruct *packet ) {
375     if ( agent != NULL) {
376         /*
377          * Using PacketID of -1 ends reporting
378          */
379         packet->packetID = -1;
380         packet->packetLen = 0;
381         ReportPacket( agent, packet );
382         packet->packetID = agent->report.cntDatagrams;
383     }
384 }
385
386 /*
387  * EndReport signifies the agent no longer is interested
388  * in the report. Calls to GetReport will no longer be
389  * filled
390  */
391 void EndReport( ReportHeader *agent ) {
392     if ( agent != NULL ) {
393         int index = agent->reporterindex;
394         while ( index != -1 ) {
395             thread_rest();
396             index = agent->reporterindex;
397         }
398         agent->agentindex = -1;
399 #ifndef HAVE_THREAD
400         /*
401          * Process the report in this thread
402          */
403         process_report ( agent );
404 #endif
405     }
406 }
407
408 /*
409  * GetReport is called by the agent after a CloseReport
410  * but before an EndReport to get the stats generated
411  * by the reporter thread.
412  */
413 Transfer_Info *GetReport( ReportHeader *agent ) {
414     int index = agent->reporterindex;
415     while ( index != -1 ) {
416         thread_rest();
417         index = agent->reporterindex;
418     }
419     return &agent->report.info;
420 }
421
422 /*
423  * ReportSettings will generate a summary report for
424  * settings being used with Listeners or Clients
425  */
426 void ReportSettings( thread_Settings *agent ) {
427     if ( isSettingsReport( agent ) ) {
428         /*
429          * Create in one big chunk
430          */
431         ReportHeader *reporthdr = malloc( sizeof(ReportHeader) );
432     
433         if ( reporthdr != NULL ) {
434             ReporterData *data = &reporthdr->report;
435             data->info.transferID = agent->mSock;
436             data->info.groupID = -1;
437             reporthdr->agentindex = -1;
438             reporthdr->reporterindex = -1;
439         
440             data->mHost = agent->mHost;
441             data->mLocalhost = agent->mLocalhost;
442             data->mode = agent->mReportMode;
443             data->type = SETTINGS_REPORT;
444             data->mBufLen = agent->mBufLen;
445             data->mMSS = agent->mMSS;
446             data->mWinSize = agent->mWinSize;
447             data->flags = agent->flags;
448             data->mProtocol = agent->mProtocol;
449             data->mThreadMode = agent->mThreadMode;
450             data->mPort = agent->mPort;
451             data->info.mFormat = agent->mFormat;
452             data->info.mTTL = agent->mTTL;
453             data->connection.peer = agent->peer;
454             data->connection.size_peer = agent->size_peer;
455             data->connection.local = agent->local;
456             data->connection.size_local = agent->size_local;
457     
458     #ifdef HAVE_THREAD
459             /*
460              * Update the ReportRoot to include this report.
461              */
462             Condition_Lock( ReportCond );
463             reporthdr->next = ReportRoot;
464             ReportRoot = reporthdr;
465             Condition_Signal( &ReportCond );
466             Condition_Unlock( ReportCond );
467     #else
468             /*
469              * Process the report in this thread
470              */
471             reporthdr->next = NULL;
472             process_report ( reporthdr );
473     #endif 
474         } else {
475             FAIL(1, "Out of Memory!!\n", agent);
476         }
477     }
478 }
479
480 /*
481  * ReportServerUDP will generate a report of the UDP
482  * statistics as reported by the server on the client
483  * side.
484  */
485 void ReportServerUDP( thread_Settings *agent, server_hdr *server ) {
486     if ( (ntohl(server->flags) & HEADER_VERSION1) != 0 &&
487          isServerReport( agent ) ) {
488         /*
489          * Create in one big chunk
490          */
491         ReportHeader *reporthdr = malloc( sizeof(ReportHeader) );
492         Transfer_Info *stats = &reporthdr->report.info;
493
494         if ( reporthdr != NULL ) {
495             stats->transferID = agent->mSock;
496             stats->groupID = (agent->multihdr != NULL ? agent->multihdr->groupID 
497                                                       : -1);
498             reporthdr->agentindex = -1;
499             reporthdr->reporterindex = -1;
500
501             reporthdr->report.type = SERVER_RELAY_REPORT;
502             reporthdr->report.mode = agent->mReportMode;
503             stats->mFormat = agent->mFormat;
504             stats->jitter = ntohl( server->jitter1 );
505             stats->jitter += ntohl( server->jitter2 ) / (double)rMillion;
506             stats->TotalLen = (((max_size_t) ntohl( server->total_len1 )) << 32) +
507                                   ntohl( server->total_len2 ); 
508             stats->startTime = 0;
509             stats->endTime = ntohl( server->stop_sec );
510             stats->endTime += ntohl( server->stop_usec ) / (double)rMillion;
511             stats->cntError = ntohl( server->error_cnt );
512             stats->cntOutofOrder = ntohl( server->outorder_cnt );
513             stats->cntDatagrams = ntohl( server->datagrams );
514             stats->mUDP = (char)kMode_Server;
515             reporthdr->report.connection.peer = agent->local;
516             reporthdr->report.connection.size_peer = agent->size_local;
517             reporthdr->report.connection.local = agent->peer;
518             reporthdr->report.connection.size_local = agent->size_peer;
519             
520 #ifdef HAVE_THREAD
521             /*
522              * Update the ReportRoot to include this report.
523              */
524             Condition_Lock( ReportCond );
525             reporthdr->next = ReportRoot;
526             ReportRoot = reporthdr;
527             Condition_Signal( &ReportCond );
528             Condition_Unlock( ReportCond );
529 #else
530             /*
531              * Process the report in this thread
532              */
533             reporthdr->next = NULL;
534             process_report ( reporthdr );
535 #endif 
536         } else {
537             FAIL(1, "Out of Memory!!\n", agent);
538         }
539     }
540 }
541
542 /*
543  * This function is called only when the reporter thread
544  * This function is the loop that the reporter thread processes
545  */
546 void reporter_spawn( thread_Settings *thread ) {
547     do {
548         // This section allows for safe exiting with Ctrl-C
549         Condition_Lock ( ReportCond );
550         if ( ReportRoot == NULL ) {
551             // Allow main thread to exit if Ctrl-C is received
552             thread_setignore();
553             Condition_Wait ( &ReportCond );
554             // Stop main thread from exiting until done with all reports
555             thread_unsetignore();
556         }
557         Condition_Unlock ( ReportCond );
558
559 again:
560         if ( ReportRoot != NULL ) {
561             ReportHeader *temp = ReportRoot;
562             //Condition_Unlock ( ReportCond );
563             if ( reporter_process_report ( temp ) ) {
564                 // This section allows for more reports to be added while
565                 // the reporter is processing reports without needing to
566                 // stop the reporter or immediately notify it
567                 Condition_Lock ( ReportCond );
568                 if ( temp == ReportRoot ) {
569                     // no new reports
570                     ReportRoot = temp->next;
571                 } else {
572                     // new reports added
573                     ReportHeader *itr = ReportRoot;
574                     while ( itr->next != temp ) {
575                         itr = itr->next;
576                     }
577                     itr->next = temp->next;
578                 }
579                 // finished with report so free it
580                 free( temp );
581                 Condition_Unlock ( ReportCond );
582                 Condition_Signal( &ReportDoneCond );
583                 if (ReportRoot)
584                     goto again;
585             }
586             Condition_Signal( &ReportDoneCond );
587             usleep(10000);
588         } else {
589             //Condition_Unlock ( ReportCond );
590         }
591     } while ( 1 );
592 }
593
594 /*
595  * Used for single threaded reporting
596  */
597 void process_report ( ReportHeader *report ) {
598     if ( report != NULL ) {
599         if ( reporter_process_report( report ) ) {
600             free( report );
601         }
602     }
603 }
604
605 /*
606  * Process reports starting with "reporthdr"
607  */
608 int reporter_process_report ( ReportHeader *reporthdr ) {
609     int need_free = 0;
610
611     // Recursively process reports
612     if ( reporthdr->next != NULL ) {
613         if ( reporter_process_report( reporthdr->next ) ) {
614             // If we are done with this report then free it
615             ReportHeader *temp = reporthdr->next;
616             reporthdr->next = reporthdr->next->next;
617             free( temp );
618         }
619     }
620
621     if ( (reporthdr->report.type & SETTINGS_REPORT) != 0 ) {
622         reporthdr->report.type &= ~SETTINGS_REPORT;
623         return reporter_print( &reporthdr->report, SETTINGS_REPORT, 1 );
624     } else if ( (reporthdr->report.type & CONNECTION_REPORT) != 0 ) {
625         reporthdr->report.type &= ~CONNECTION_REPORT;
626         reporter_print( &reporthdr->report, CONNECTION_REPORT,
627                                (reporthdr->report.type == 0 ? 1 : 0) );
628         if ( reporthdr->multireport != NULL && isMultipleReport( (&reporthdr->report) )) {
629             if ( (reporthdr->multireport->report->type & CONNECTION_REPORT) != 0 ) {
630                 reporthdr->multireport->report->type &= ~CONNECTION_REPORT;
631                 reporter_print( reporthdr->multireport->report, CONNECTION_REPORT,
632                                 (reporthdr->report.type == 0 ? 1 : 0) );
633             }
634         }
635     } else if ( (reporthdr->report.type & SERVER_RELAY_REPORT) != 0 ) {
636         reporthdr->report.type &= ~SERVER_RELAY_REPORT;
637         return reporter_print( &reporthdr->report, SERVER_RELAY_REPORT, 1 );
638     }
639     if ( (reporthdr->report.type & TRANSFER_REPORT) != 0 ) {
640         // If there are more packets to process then handle them
641         if ( reporthdr->reporterindex >= 0 ) {
642             // Need to make sure we do not pass the "agent"
643             while ( reporthdr->reporterindex != reporthdr->agentindex - 1 ) {
644                 if ( reporthdr->reporterindex == NUM_REPORT_STRUCTS - 1 ) {
645                     if ( reporthdr->agentindex == 0 ) {
646                         break;
647                     } else {
648                         reporthdr->reporterindex = 0;
649                     }
650                 } else {
651                     reporthdr->reporterindex++;
652                 }
653                 if ( reporter_handle_packet( reporthdr ) ) {
654                     // No more packets to process
655                     reporthdr->reporterindex = -1;
656                     break;
657                 }
658             }
659         }
660         // If the agent is done with the report then free it
661         if ( reporthdr->agentindex == -1 ) {
662             need_free = 1;
663         }
664     }
665     return need_free;
666 }
667
668 /*
669  * Updates connection stats
670  */
671 int reporter_handle_packet( ReportHeader *reporthdr ) {
672     ReportStruct *packet = &reporthdr->data[reporthdr->reporterindex];
673     ReporterData *data = &reporthdr->report;
674     Transfer_Info *stats = &reporthdr->report.info;
675     int finished = 0;
676
677     // update received amount and time
678     data->TotalLen  += packet->packetLen;
679     data->packetTime = packet->packetTime;
680     data->cntDatagrams++;
681
682     if (packet->packetID < 0) {
683         // This is the last packet (FIN)
684         finished = 1;
685         if (isConnectionLess(&reporthdr->report)) {
686             // connectionless protocols don't count the last payload
687             if (reporthdr->report.mThreadMode == kMode_Client)
688                 data->TotalLen -= packet->packetLen;
689         } else {
690             // connection-oriented protocols dont't count the FIN
691             data->cntDatagrams--;
692         }
693     } else if ( packet->packetID != 0 ) {
694             // UDP or DCCP packet
695             double transit, deltaTransit;
696
697             // from RFC 1889, Real Time Protocol (RTP) 
698             // J = J + ( | D(i-1,i) | - J ) / 16 
699             transit = TimeDifference( packet->packetTime, packet->sentTime );
700             if ( data->lastTransit != 0.0 ) {
701                 deltaTransit = transit - data->lastTransit;
702                 if ( deltaTransit < 0.0 ) {
703                     deltaTransit = -deltaTransit;
704                 }
705                 stats->jitter += (deltaTransit - stats->jitter) / (16.0);
706             }
707             data->lastTransit = transit;
708
709             // packet loss occured if the datagram numbers aren't sequential 
710             if ( packet->packetID != data->PacketID + 1 ) {
711                 if ( packet->packetID < data->PacketID + 1 ) {
712                     data->cntOutofOrder++;
713                 } else {
714                     data->cntError += packet->packetID - data->PacketID - 1;
715                 }
716             }
717             // never decrease datagramID (e.g. if we get an out-of-order packet) 
718             if ( packet->packetID > data->PacketID ) {
719                 data->PacketID = packet->packetID;
720             }
721     }
722     // Print a report if appropriate
723     return reporter_condprintstats( &reporthdr->report, reporthdr->multireport, finished );
724 }
725
726 /*
727  * Handles summing of threads
728  */
729 void reporter_handle_multiple_reports( MultiHeader *reporthdr, Transfer_Info *stats, int force ) {
730     if ( reporthdr != NULL ) {
731         if ( reporthdr->threads > 1 ) {
732             int i;
733             Transfer_Info *current = NULL;
734             // Search for start Time
735             for ( i = 0; i < NUM_MULTI_SLOTS; i++ ) {
736                 current = &reporthdr->data[i];
737                 if ( current->startTime == stats->startTime ) {
738                     break;
739                 }
740             }
741             if ( current->startTime != stats->startTime ) {
742                 // Find first available
743                 for ( i = 0; i < NUM_MULTI_SLOTS; i++ ) {
744                     current = &reporthdr->data[i];
745                     if ( current->startTime < 0 ) {
746                         break;
747                     }
748                 }
749                 current->cntDatagrams = stats->cntDatagrams;
750                 current->cntError = stats->cntError;
751                 current->cntOutofOrder = stats->cntOutofOrder;
752                 current->TotalLen = stats->TotalLen;
753                 current->mFormat = stats->mFormat;
754                 current->endTime = stats->endTime;
755                 current->jitter = stats->jitter;
756                 current->startTime = stats->startTime;
757                 current->free = 1;
758             } else {
759                 current->cntDatagrams += stats->cntDatagrams;
760                 current->cntError += stats->cntError;
761                 current->cntOutofOrder += stats->cntOutofOrder;
762                 current->TotalLen += stats->TotalLen;
763                 current->mFormat = stats->mFormat;
764                 if ( current->endTime < stats->endTime ) {
765                     current->endTime = stats->endTime;
766                 }
767                 if ( current->jitter < stats->jitter ) {
768                     current->jitter = stats->jitter;
769                 }
770                 current->free++;
771                 if ( current->free == reporthdr->threads ) {
772                     void *reserved = reporthdr->report->info.reserved_delay;
773                     current->free = force;
774                     memcpy( &reporthdr->report->info, current, sizeof(Transfer_Info) );
775                     current->startTime = -1;
776                     reporthdr->report->info.reserved_delay = reserved;
777                     reporter_print( reporthdr->report, MULTIPLE_REPORT, force );
778                 }
779             }
780         }
781     }
782 }
783
784 /*
785  * Prints reports conditionally
786  */
787 int reporter_condprintstats( ReporterData *stats, MultiHeader *multireport, int force ) {
788     if ( force != 0 ) {
789         stats->info.cntOutofOrder = stats->cntOutofOrder;
790         // assume most of the time out-of-order packets are not
791         // duplicate packets, so conditionally subtract them from the lost packets.
792         stats->info.cntError = stats->cntError;
793         if ( stats->info.cntError > stats->info.cntOutofOrder ) {
794             stats->info.cntError -= stats->info.cntOutofOrder;
795         }
796         if (isConnectionLess(stats))
797                 stats->info.cntDatagrams = stats->PacketID;
798         else
799                 stats->info.cntDatagrams = stats->cntDatagrams;
800         stats->info.TotalLen  = stats->TotalLen;
801         stats->info.startTime = 0;
802         stats->info.endTime   = TimeDifference( stats->packetTime, stats->startTime );
803         stats->info.free      = 1;
804         reporter_print( stats, TRANSFER_REPORT, force );
805         if ( isMultipleReport(stats) ) {
806             reporter_handle_multiple_reports( multireport, &stats->info, force );
807         }
808     } else while ((stats->intervalTime.tv_sec != 0 || 
809                    stats->intervalTime.tv_usec != 0) && 
810                   TimeDifference( stats->nextTime, 
811                                   stats->packetTime ) < 0 ) {
812         stats->info.cntOutofOrder = stats->cntOutofOrder - stats->lastOutofOrder;
813         stats->lastOutofOrder = stats->cntOutofOrder;
814         // assume most of the time out-of-order packets are not
815         // duplicate packets, so conditionally subtract them from the lost packets.
816         stats->info.cntError = stats->cntError - stats->lastError;
817         if ( stats->info.cntError > stats->info.cntOutofOrder ) {
818             stats->info.cntError -= stats->info.cntOutofOrder;
819         }
820         stats->lastError = stats->cntError;
821         if (isConnectionLess(stats)) {
822                 stats->info.cntDatagrams = stats->PacketID - stats->lastDatagrams;
823                 stats->lastDatagrams     = stats->PacketID;
824         } else {
825                 stats->info.cntDatagrams = stats->cntDatagrams - stats->lastDatagrams;
826                 stats->lastDatagrams     = stats->cntDatagrams;
827         }
828         stats->info.TotalLen = stats->TotalLen - stats->lastTotal;
829         stats->lastTotal = stats->TotalLen;
830         stats->info.startTime = stats->info.endTime;
831         stats->info.endTime = TimeDifference( stats->nextTime, stats->startTime );
832         TimeAdd( stats->nextTime, stats->intervalTime );
833         stats->info.free = 0;
834         reporter_print( stats, TRANSFER_REPORT, force );
835         if ( isMultipleReport(stats) ) {
836             reporter_handle_multiple_reports( multireport, &stats->info, force );
837         }
838     }
839     return force;
840 }
841
842 /*
843  * This function handles multiple format printing by sending to the
844  * appropriate dispatch function
845  */
846 int reporter_print( ReporterData *stats, int type, int end ) {
847     switch ( type ) {
848         case TRANSFER_REPORT:
849             statistics_reports[stats->mode]( &stats->info );
850             if ( end != 0 && isPrintMSS( stats ) && !isConnectionLess( stats ) ) {
851                 PrintMSS( stats );
852             }
853             break;
854         case SERVER_RELAY_REPORT:
855             serverstatistics_reports[stats->mode]( &stats->connection, &stats->info );
856             break;
857         case SETTINGS_REPORT:
858             settings_reports[stats->mode]( stats );
859             break;
860         case CONNECTION_REPORT:
861             stats->info.reserved_delay = connection_reports[stats->mode]( 
862                                                &stats->connection,
863                                                stats->info.transferID );
864             break;
865         case MULTIPLE_REPORT:
866             multiple_reports[stats->mode]( &stats->info );
867             break;
868         default:
869             fprintf( stderr, "Printing type not implemented! No Output\n" );
870     }
871     fflush( stdout );
872     return end;
873 }
874
875 /* -------------------------------------------------------------------
876  * Report the MSS and MTU, given the MSS (or a guess thereof)
877  * This works for connection-oriented protocols only: it expects
878  * the protocol to be either TCP or DCCP and will give error otherwise
879  * ------------------------------------------------------------------- */
880
881 // compare the MSS against the (MTU - 40) to (MTU - 80) bytes.
882 // 40 byte IP header and somewhat arbitrarily, 40 more bytes of IP options.
883
884 #define checkMSS_MTU( inMSS, inMTU ) (inMTU-40) >= inMSS  &&  inMSS >= (inMTU-80)
885
886 void PrintMSS( ReporterData *stats )
887 {
888     int inMSS = stats->mProtocol == kProto_TCP
889               ?   getsock_tcp_mss( stats->info.transferID )
890               :   getsock_dccp_mps( stats->info.transferID );
891
892     if ( inMSS <= 0 ) {
893         printf( report_mss_unsupported, stats->info.transferID );
894     } else {
895         char* net;
896         int mtu = 0;
897
898         if ( checkMSS_MTU( inMSS, 1500 ) ) {
899             net = "ethernet";
900             mtu = 1500;
901         } else if ( checkMSS_MTU( inMSS, 4352 ) ) {
902             net = "FDDI";
903             mtu = 4352;
904         } else if ( checkMSS_MTU( inMSS, 9180 ) ) {
905             net = "ATM";
906             mtu = 9180;
907         } else if ( checkMSS_MTU( inMSS, 65280 ) ) {
908             net = "HIPPI";
909             mtu = 65280;
910         } else if ( checkMSS_MTU( inMSS, 576 ) ) {
911             net = "minimum";
912             mtu = 576;
913             printf( warn_no_pathmtu );
914         } else {
915             mtu = inMSS + 40;
916             net = "unknown interface";
917         }
918
919         printf( report_mss,
920                 stats->info.transferID, inMSS, mtu, net );
921     }
922 }
923 // end ReportMSS
924
925 #ifdef __cplusplus
926 } /* end extern "C" */
927 #endif