00001 00033 #include <itpp/protocol/tcp.h> 00034 #include <itpp/base/itfile.h> 00035 #include <limits> 00036 #include <cstdlib> 00037 #include <ctime> 00038 00040 00041 #ifdef _MSC_VER 00042 #pragma warning(disable:4355) 00043 #endif 00044 00045 namespace itpp 00046 { 00047 00048 // -------------------- Default parameters ---------------------------------- 00049 00050 // TCP sender and receiver 00051 00052 #define TCP_HEADERLENGTH 40 00053 00054 // TCP sender 00055 00056 #define TCP_VERSION kReno 00057 #define TCP_SMSS 1460 00058 #define TCP_INITIALCWNDREL 2 // related to MSS 00059 #define TCP_INITIALSSTHRESHREL 1 // related to MaxCWnd 00060 #define TCP_MAXCWNDREL 32 // related to MSS 00061 #define TCP_DUPACKS 3 00062 #define TCP_INITIALRTT 1 00063 const double TCP_STIMERGRAN = 0.2; 00064 const double TCP_SWSATIMERVALUE = 0.2; 00065 #define TCP_MAXBACKOFF 64 00066 const double TCP_MAXRTO = std::numeric_limits<double>::max(); 00067 #define TCP_IMMEDIATEBACKOFFRESET false 00068 #define TCP_TIMESTAMPS false 00069 #define TCP_KARN true 00070 #define TCP_NAGLE false 00071 #define TCP_GOBACKN true 00072 #define TCP_FLIGHTSIZERECOVERY false 00073 #define TCP_RENOCONSERVATION true 00074 #define TCP_CAREFULSSTHRESHREDUCTION true 00075 #define TCP_IGNOREDUPACKONTORECOVERY true 00076 #define TCP_CAREFULMULFASTRTXAVOIDANCE true 00077 #define TCP_RESTARTAFTERIDLE true 00078 00079 // TCP receiver 00080 00081 #define TCP_RMSS 1460 00082 const int TCP_BUFFERSIZE = std::numeric_limits<int>::max() / 4; 00083 #define TCP_DELAYEDACK true 00084 const double TCP_ACKDELAYTIME = 0.2; 00085 #define TCP_SENDPERIODICACKS false 00086 #define TCP_STRICTPERIODICACKS false 00087 #define TCP_PERIODICACKINTERVAL 1 00088 #define TCP_ACKSCHEDULINGDELAY 0 00089 #define TCP_ACKBUFFERWRITE false 00090 #define TCP_ACKBUFFERREAD true 00091 const int TCP_MAXUSERBLOCKSIZE = std::numeric_limits<int>::max() / 4; 00092 #define TCP_MINUSERBLOCKSIZE 1 00093 #define TCP_USERBLOCKPROCDELAY 0 00094 00095 // TCP generator 00096 00097 #define TCPGEN_BLOCKSIZE 1460 00098 00099 // TCP applications 00100 00101 #define TCPAPP_MAXNOOFACTIVEAPPS 500 00102 #define TCPAPP_DISTSTATARRAYSIZE 100 00103 #define TCPAPP_DISTSTATMAXGOODPUT 1000 00104 #define TCPAPP_DISTSTATMAXTRANSFERTIME 10000 00105 #define TCPAPP_CONDMEANSTATARRAYSIZE 100 00106 #define TCPAPP_CONDMEANSTATMAXREQSIZE 100000 00107 00108 00109 00110 inline int min(int opd1, int opd2) 00111 { 00112 return (opd1 < opd2) ? opd1 : opd2; 00113 } 00114 00115 00116 inline int max(int opd1, int opd2) 00117 { 00118 return (opd1 > opd2) ? opd1 : opd2; 00119 } 00120 00121 00122 // round is used to map a double value (e.g. RTO in TTCPSender) to the 00123 // next higher value of a certain granularity (e.g. timer granularity). 00124 inline double round(const double value, const double granularity) 00125 { 00126 return (std::ceil(value / granularity) * granularity); 00127 } 00128 00129 // -------------------- TCP_Segment ---------------------------------------- 00130 00131 TCP_Segment::TCP_Segment() : 00132 seq_begin(), 00133 seq_end() 00134 { 00135 } 00136 00137 TCP_Segment::TCP_Segment(const Sequence_Number &sn_begin, const Sequence_Number &sn_end) : 00138 seq_begin(sn_begin), 00139 seq_end(sn_end) 00140 { 00141 it_assert(seq_begin <= seq_end, "TCP_Segment::TCP_Segment, end byte " + to_str(seq_end.value()) + 00142 " < begin byte " + to_str(seq_begin.value())); 00143 } 00144 00145 00146 TCP_Segment::TCP_Segment(const TCP_Segment &segment) : 00147 seq_begin(segment.seq_begin), 00148 seq_end(segment.seq_end) 00149 { 00150 } 00151 00152 00153 TCP_Segment &TCP_Segment::operator=(const TCP_Segment &segment) 00154 { 00155 this->seq_begin = segment.seq_begin; 00156 this->seq_end = segment.seq_end; 00157 00158 return *this; 00159 } 00160 00161 00162 void TCP_Segment::combine(const TCP_Segment &segment) 00163 { 00164 it_assert(can_be_combined(segment), "TCP_Segment::CombineWith, segments cannot be combined"); 00165 00166 seq_begin = min(seq_begin, segment.seq_begin); 00167 seq_end = max(seq_end, segment.seq_end); 00168 } 00169 00170 00171 std::ostream & operator<<(std::ostream &os, const TCP_Segment &segment) 00172 { 00173 os << "(" << segment.seq_begin << "," << segment.seq_end << ")"; 00174 return os; 00175 } 00176 00177 00178 // -------------------- TCP_Packet ---------------------------------------- 00179 TCP_Packet::TCP_Packet() : 00180 fSegment(), 00181 fACK(), 00182 fWnd(0), 00183 fSessionId(0), 00184 fInfo(0) 00185 { 00186 } 00187 00188 00189 TCP_Packet::TCP_Packet(const TCP_Packet &packet) : 00190 fSegment(packet.fSegment), 00191 fACK(packet.fACK), 00192 fWnd(packet.fWnd), 00193 fSessionId(packet.fSessionId), 00194 fInfo(0) 00195 { 00196 std::cout << "TCP_Packet::TCP_Packet ############" << " "; 00197 00198 if (packet.fInfo != 0) { 00199 std::cout << "TCP_Packet::TCP_Packet rhs.fInfo ###########" << " "; 00200 fInfo = new TDebugInfo(*packet.fInfo); 00201 } 00202 } 00203 00204 00205 TCP_Packet::~TCP_Packet() 00206 { 00207 delete fInfo; 00208 } 00209 00210 00211 TCP_Packet & TCP_Packet::clone() const 00212 { 00213 return *new TCP_Packet(*this); 00214 } 00215 00216 00217 void TCP_Packet::set_info(unsigned ssThresh, unsigned recWnd, unsigned cWnd, 00218 double estRTT, Sequence_Number sndUna, 00219 Sequence_Number sndNxt, bool isRtx) 00220 { 00221 if (fInfo == 0) { 00222 fInfo = new TDebugInfo; 00223 } 00224 00225 fInfo->fSSThresh = ssThresh; 00226 fInfo->fRecWnd = recWnd; 00227 fInfo->fCWnd = cWnd; 00228 fInfo->fRTTEstimate = estRTT; 00229 fInfo->fSndUna = sndUna; 00230 fInfo->fSndNxt = sndNxt; 00231 fInfo->fRtxFlag = isRtx; 00232 } 00233 00234 00235 void TCP_Packet::print_header(std::ostream &) const 00236 { 00237 std::cout << "Hello!\n"; 00238 00239 std::cout << "Ses = " << get_session_id() << " "; 00240 00241 std::cout << "Segment = " << get_segment() << " " 00242 << "ACK = " << get_ACK() << " " 00243 << "Wnd = " << get_wnd() << " "; 00244 00245 std::cout << "DestPort = " << fDestinationPort << " " 00246 << "SourcePort = " << fSourcePort << " "; 00247 00248 00249 if (fInfo != 0) { 00250 std::cout << "SndSSThresh = " << fInfo->fSSThresh << " "; 00251 std::cout << "RecWnd = " << fInfo->fRecWnd << " "; 00252 std::cout << "SndCWnd = " << fInfo->fCWnd << " "; 00253 std::cout << "RTTEstimate = " << fInfo->fRTTEstimate << " "; 00254 std::cout << "RtxFlag = " << fInfo->fRtxFlag; 00255 } 00256 else 00257 std::cout << "fInfo = " << fInfo << " "; 00258 00259 std::cout << std::endl; 00260 00261 } 00262 00263 00264 00265 std::ostream & operator<<(std::ostream & out, TCP_Packet & msg) 00266 { 00267 msg.print_header(out); 00268 return out; 00269 } 00270 00271 00272 // -------------------- TCP_Sender ---------------------------------------- 00273 TCP_Sender::TCP_Sender(int label) : 00274 fLabel(label), 00275 fTCPVersion(TCP_VERSION), 00276 fMSS(TCP_SMSS), 00277 fTCPIPHeaderLength(TCP_HEADERLENGTH), 00278 fInitialRTT(TCP_INITIALRTT), 00279 fInitialCWnd(0), // default initialization see below 00280 fInitialSSThresh(0), // default initialization see below 00281 fMaxCWnd(0), // default initialization see below 00282 fDupACKThreshold(TCP_DUPACKS), 00283 fTimerGranularity(TCP_STIMERGRAN), 00284 fMaxRTO(TCP_MAXRTO), 00285 fMaxBackoff(TCP_MAXBACKOFF), 00286 fImmediateBackoffReset(TCP_IMMEDIATEBACKOFFRESET), 00287 fKarn(TCP_KARN), 00288 fGoBackN(TCP_GOBACKN), 00289 fFlightSizeRecovery(TCP_FLIGHTSIZERECOVERY), 00290 fRenoConservation(TCP_RENOCONSERVATION), 00291 fCarefulSSThreshReduction(TCP_CAREFULSSTHRESHREDUCTION), 00292 fIgnoreDupACKOnTORecovery(TCP_IGNOREDUPACKONTORECOVERY), 00293 fCarefulMulFastRtxAvoidance(TCP_CAREFULMULFASTRTXAVOIDANCE), 00294 fNagle(TCP_NAGLE), 00295 fSWSATimerValue(TCP_SWSATIMERVALUE), 00296 fRestartAfterIdle(TCP_RESTARTAFTERIDLE), 00297 fDebug(false), 00298 fTrace(false), 00299 fSessionId(0), 00300 fRtxTimer(*this, &TCP_Sender::HandleRtxTimeout), 00301 fSWSATimer(*this, &TCP_Sender::HandleSWSATimeout)/*,*/ 00302 { 00303 00304 // default values and parameter check for MaxCWND, InitCWND, InitSSThresh 00305 if (fMaxCWnd == 0) { 00306 fMaxCWnd = (unsigned)(TCP_MAXCWNDREL * fMSS); 00307 } 00308 else if (fMaxCWnd < fMSS) { 00309 // throw (UL_CException("TCP_Sender::TCP_Sender", 00310 // "MaxCWnd must be >= MSS")); 00311 } 00312 00313 if (fInitialCWnd == 0) { 00314 fInitialCWnd = (unsigned)(TCP_INITIALCWNDREL * fMSS); 00315 } 00316 else if ((fInitialCWnd < fMSS) || (fInitialCWnd > fMaxCWnd)) { 00317 // throw (UL_CException("TCP_Sender::TCP_Sender", 00318 // "initial CWnd must be >= MSS and <= MaxCWnd")); 00319 } 00320 00321 if ((fInitialSSThresh == 0) && (fMaxCWnd >= 2 * fMSS)) { 00322 fInitialSSThresh = (unsigned)(TCP_INITIALSSTHRESHREL * fMaxCWnd); 00323 } 00324 else if ((fInitialSSThresh < 2*fMSS) || (fInitialCWnd > fMaxCWnd)) { 00325 // throw (UL_CException("TCP_Sender::TCP_Sender", 00326 // "initial CWnd must be >= 2*MSS and <= MaxCWnd")); 00327 } 00328 00329 setup(); 00330 00331 InitStatistics(); 00332 00333 00334 tcp_send.set_name("TCP Send"); 00335 tcp_receive_ack.forward(this, &TCP_Sender::ReceiveMessageFromNet); 00336 tcp_receive_ack.set_name("TCP ACK"); 00337 tcp_socket_write.forward(this, &TCP_Sender::HandleUserMessageIndication); 00338 tcp_socket_write.set_name("SocketWrite"); 00339 tcp_release.forward(this, &TCP_Sender::release); 00340 tcp_release.set_name("Release"); 00341 00342 } 00343 00344 00345 TCP_Sender::~TCP_Sender() 00346 { 00347 } 00348 00349 void TCP_Sender::set_debug(const bool enable_debug) 00350 { 00351 fDebug = enable_debug; 00352 tcp_send.set_debug(enable_debug); 00353 } 00354 00355 void TCP_Sender::set_debug(bool enable_debug, bool enable_signal_debug) 00356 { 00357 fDebug = enable_debug; 00358 tcp_send.set_debug(enable_signal_debug); 00359 } 00360 00361 void TCP_Sender::set_trace(const bool enable_trace) 00362 { 00363 fTrace = enable_trace; 00364 } 00365 00366 void TCP_Sender::set_label(int label) 00367 { 00368 fLabel = label; 00369 } 00370 00371 void TCP_Sender::setup() 00372 { 00373 fSndUna = 0; 00374 fSndNxt = 0; 00375 fSndMax = 0; 00376 fMaxRecWnd = 0; 00377 fRecWnd = fMaxCWnd; 00378 fUserNxt = 0; 00379 fCWnd = fInitialCWnd; 00380 fSSThresh = fInitialSSThresh; 00381 fRecoveryDupACK = 0; 00382 fRecoveryTO = 0; 00383 fDupACKCnt = 0; 00384 00385 // timers 00386 fBackoff = 1; 00387 fPendingBackoffReset = false; 00388 fLastSendTime = Event_Queue::now(); 00389 00390 // RTT measurement 00391 fTimUna = 0; 00392 fSRTT = 0; 00393 fRTTVar = 0; 00394 fRTTEstimate = fInitialRTT; 00395 fRTTMPending = false; 00396 fRTTMByte = 0; 00397 00398 CWnd_val.set_size(1000); 00399 CWnd_val.zeros(); 00400 CWnd_time.set_size(1000); 00401 CWnd_time.zeros(); 00402 CWnd_val(0) = fInitialCWnd; 00403 CWnd_time(0) = 0; 00404 CWnd_index = 1; 00405 00406 SSThresh_val.set_size(1000); 00407 SSThresh_val.zeros(); 00408 SSThresh_time.set_size(1000); 00409 SSThresh_time.zeros(); 00410 SSThresh_val(0) = fInitialSSThresh; 00411 SSThresh_time(0) = 0; 00412 SSThresh_index = 1; 00413 00414 sent_seq_num_val.set_size(1000); 00415 sent_seq_num_val.zeros(); 00416 sent_seq_num_time.set_size(1000); 00417 sent_seq_num_time.zeros(); 00418 sent_seq_num_val(0) = 0; 00419 sent_seq_num_time(0) = 0; 00420 sent_seq_num_index = 1; 00421 00422 sender_recv_ack_seq_num_val.set_size(1000); 00423 sender_recv_ack_seq_num_val.zeros(); 00424 sender_recv_ack_seq_num_time.set_size(1000); 00425 sender_recv_ack_seq_num_time.zeros(); 00426 sender_recv_ack_seq_num_val(0) = 0; 00427 sender_recv_ack_seq_num_time(0) = 0; 00428 sender_recv_ack_seq_num_index = 1; 00429 00430 RTTEstimate_val.set_size(1000); 00431 RTTEstimate_val.zeros(); 00432 RTTEstimate_time.set_size(1000); 00433 RTTEstimate_time.zeros(); 00434 RTTEstimate_val(0) = fInitialRTT; 00435 RTTEstimate_time(0) = 0; 00436 RTTEstimate_index = 1; 00437 00438 RTTsample_val.set_size(1000); 00439 RTTsample_val.zeros(); 00440 RTTsample_time.set_size(1000); 00441 RTTsample_time.zeros(); 00442 RTTsample_val(0) = 0; 00443 RTTsample_time(0) = 0; 00444 RTTsample_index = 1; 00445 00446 } 00447 00448 std::string TCP_Sender::GenerateFilename() 00449 { 00450 time_t rawtime; 00451 struct tm *timeinfo; 00452 timeinfo = localtime(&rawtime); 00453 std::ostringstream filename_stream; 00454 filename_stream << "trace_tcp_sender_u" << fLabel 00455 << "_" << 1900 + timeinfo->tm_year 00456 << "_" << timeinfo->tm_mon 00457 << "_" << timeinfo->tm_mday 00458 << "__" << timeinfo->tm_hour 00459 << "_" << timeinfo->tm_min 00460 << "_" << timeinfo->tm_sec 00461 << "_.it"; 00462 return filename_stream.str(); 00463 } 00464 00465 00466 void TCP_Sender::release(std::string file) 00467 { 00468 std::string filename; 00469 fSessionId++; 00470 00471 fRtxTimer.Reset(); 00472 fSWSATimer.Reset(); 00473 00474 if (fTrace) { 00475 if (file == "") 00476 filename = GenerateFilename(); 00477 else 00478 filename = file; 00479 00480 save_trace(filename); 00481 } 00482 } 00483 00484 00485 void TCP_Sender::InitStatistics() 00486 { 00487 fNumberOfTimeouts = 0; 00488 fNumberOfIdleTimeouts = 0; 00489 fNumberOfFastRetransmits = 0; 00490 fNumberOfRTTMeasurements = 0; 00491 fNumberOfReceivedACKs = 0; 00492 } 00493 00494 00495 void TCP_Sender::StopTransientPhase() 00496 { 00497 InitStatistics(); 00498 } 00499 00500 00501 void TCP_Sender::HandleUserMessageIndication(itpp::Packet *user_data_p) 00502 { 00503 if (fDebug) { 00504 std::cout << "TCP_Sender::HandleUserMessageIndication" 00505 << " byte_size=" << user_data_p->bit_size() / 8 00506 << " ptr=" << user_data_p 00507 << " time=" << Event_Queue::now() << std::endl; 00508 } 00509 00510 SocketWriteQueue.push(user_data_p); 00511 00512 SendNewData(); // will call GetMessage (via GetNextSegmentSize) 00513 // if new data can be sent 00514 } 00515 00516 00517 void TCP_Sender::ReceiveMessageFromNet(itpp::Packet *msg) 00518 { 00519 TCP_Packet & packet = (TCP_Packet &) * msg; 00520 00521 if (fDebug) { 00522 std::cout << "TCP_Sender::ReceiveMessageFromNet" 00523 << " byte_size=" << msg->bit_size() / 8 00524 << " ptr=" << msg 00525 << " time=" << Event_Queue::now() << std::endl; 00526 } 00527 00528 if ((packet.get_session_id() == fSessionId) && // ACK of current session 00529 (packet.get_ACK() >= fSndUna)) { // ACK is OK 00530 HandleACK(packet); 00531 } 00532 00533 delete &packet; 00534 } 00535 00536 00537 void TCP_Sender::HandleACK(TCP_Packet &msg) 00538 { 00539 it_assert(msg.get_ACK() <= fSndMax, "TCP_Sender::HandleACK, received ACK > SndMax at "); 00540 00541 fNumberOfReceivedACKs++; 00542 00543 if (fTrace) { 00544 TraceACKedSeqNo(msg.get_ACK()); 00545 } 00546 00547 if (fDebug) { 00548 std::cout << "sender " << fLabel << ": " 00549 << "receive ACK: " 00550 << " t = " << Event_Queue::now() << ", " 00551 << msg << std::endl; 00552 } 00553 00554 // update receiver advertised window size 00555 fRecWnd = msg.get_wnd(); 00556 fMaxRecWnd = max(fRecWnd, fMaxRecWnd); 00557 00558 if (msg.get_ACK() == fSndUna) { // duplicate ACK 00559 00560 bool ignoreDupACK = (fSndMax == fSndUna); // no outstanding data 00561 00562 if (fIgnoreDupACKOnTORecovery) { 00563 // don't count dupacks during TO recovery! 00564 if (fCarefulMulFastRtxAvoidance) { // see RFC 2582, Section 5 00565 // like in Solaris 00566 ignoreDupACK = ignoreDupACK || (fSndUna <= fRecoveryTO); 00567 } 00568 else { 00569 // like in ns 00570 ignoreDupACK = ignoreDupACK || (fSndUna < fRecoveryTO); 00571 } 00572 } 00573 00574 if (!ignoreDupACK) { 00575 fDupACKCnt++; // count the number of duplicate ACKs 00576 00577 if (fDupACKCnt == fDupACKThreshold) { 00578 // dupack threshold is reached 00579 fNumberOfFastRetransmits++; 00580 00581 fRecoveryDupACK = fSndMax; 00582 00583 ReduceSSThresh(); // halve ssthresh (in most cases) 00584 00585 if ((fTCPVersion == kReno) || (fTCPVersion == kNewReno)) { 00586 fCWnd = fSSThresh; 00587 } 00588 else if (fTCPVersion == kTahoe) { 00589 fCWnd = fMSS; 00590 } 00591 00592 if (fTCPVersion == kReno || fTCPVersion == kNewReno) { 00593 // conservation of packets: 00594 if (fRenoConservation) { 00595 fCWnd += fDupACKThreshold * fMSS; 00596 } 00597 } 00598 else if (fTCPVersion == kTahoe) { 00599 if (fGoBackN) { 00600 fSndNxt = fSndUna; // Go-Back-N (like in ns) 00601 } 00602 } 00603 00604 UnaRetransmit(); // initiate retransmission 00605 } 00606 else if (fDupACKCnt > fDupACKThreshold) { 00607 if (fTCPVersion == kReno || fTCPVersion == kNewReno) { 00608 // conservation of packets 00609 // CWnd may exceed MaxCWnd during fast recovery, 00610 // however, the result of SendWindow() is always <= MaxCwnd 00611 if (fRenoConservation) { 00612 fCWnd += fMSS; 00613 } 00614 } 00615 } 00616 } 00617 } 00618 else { // new ACK 00619 Sequence_Number oldSndUna = fSndUna; // required for NewReno partial ACK 00620 fSndUna = msg.get_ACK(); 00621 fSndNxt = max(fSndNxt, fSndUna); // required in case of "Go-Back-N" 00622 00623 // reset retransmission timer 00624 00625 if ((fSndUna > fTimUna) && fRtxTimer.IsPending()) { 00626 // seq. no. for which rtx timer is running has been received 00627 fRtxTimer.Reset(); 00628 } 00629 00630 // backoff reset 00631 00632 if (fImmediateBackoffReset) { 00633 fBackoff = 1; 00634 } 00635 else { 00636 if (fPendingBackoffReset) { 00637 fBackoff = 1; 00638 fPendingBackoffReset = false; 00639 } 00640 else if (fBackoff > 1) { 00641 // reset backoff counter only on next new ACK (this is probably 00642 // the way to operate intended by Karn) 00643 fPendingBackoffReset = true; 00644 } 00645 } 00646 00647 // RTT measurement 00648 00649 if ((fSndUna > fRTTMByte) && fRTTMPending) { 00650 UpdateRTTVariables(Event_Queue::now() - fRTTMStartTime); 00651 fRTTMPending = false; 00652 } 00653 00654 // update CWnd and reset dupack counter 00655 00656 if (fDupACKCnt >= fDupACKThreshold) { 00657 // we are in fast recovery 00658 if (fTCPVersion == kNewReno && fSndUna < fRecoveryDupACK) { 00659 // New Reno partial ACK handling 00660 if (fRenoConservation) { 00661 fCWnd = max(fMSS, fCWnd - (fSndUna - oldSndUna) + fMSS); 00662 } 00663 UnaRetransmit(); // start retransmit immediately 00664 } 00665 else { 00666 FinishFastRecovery(); 00667 } 00668 } 00669 else { 00670 // no fast recovery 00671 fDupACKCnt = 0; 00672 if (fCWnd < fSSThresh) { 00673 // slow start phase 00674 fCWnd = min(fCWnd + fMSS, fMaxCWnd); 00675 } 00676 else { 00677 // congestion avoidance phase 00678 fCWnd += max(fMSS * fMSS / fCWnd, 1); // RFC 2581 00679 fCWnd = min(fCWnd, fMaxCWnd); 00680 } 00681 } 00682 } // new ACK 00683 00684 SendNewData(); // try to send new data (even in the case that a retransmit 00685 // had to be performed) 00686 00687 if (fTrace) { 00688 TraceCWnd(); 00689 } 00690 } 00691 00692 00693 void TCP_Sender::SendNewData(bool skipSWSA) 00694 { 00695 unsigned nextSegmentSize; 00696 00697 it_assert(fSndUna <= fSndNxt, "TCP_Sender::SendNewData, SndUna > SndNxt in sender " + to_str(fLabel) + "!"); 00698 00699 if (fRestartAfterIdle) { 00700 IdleCheck(); 00701 } 00702 00703 bool sillyWindowAvoidanceFailed = false; 00704 00705 while (!sillyWindowAvoidanceFailed && 00706 ((nextSegmentSize = GetNextSegmentSize(fSndNxt)) > 0)) { 00707 // there is new data to send and window is large enough 00708 00709 // SWSA and Nagle (RFC 1122): assume PUSH to be set 00710 unsigned queuedUnsent = fUserNxt - fSndNxt; 00711 unsigned usableWindow = max(0, (fSndUna + SendWindow()) - fSndNxt); 00712 00713 if (((unsigned)min(queuedUnsent, usableWindow) >= fMSS) || 00714 ((!fNagle || (fSndUna == fSndNxt)) && 00715 ((queuedUnsent <= usableWindow) || // Silly W. A. 00716 ((unsigned)min(queuedUnsent, usableWindow) >= fMaxRecWnd / 2) 00717 ) 00718 ) || 00719 skipSWSA 00720 ) { 00721 // Silly Window Syndrome Avoidance (SWSA) and Nagle passed 00722 00723 TCP_Segment nextSegment(fSndNxt, fSndNxt + nextSegmentSize); 00724 TCP_Packet & msg = * new TCP_Packet(); 00725 00726 msg.set_segment(nextSegment); 00727 msg.set_session_id(fSessionId); 00728 msg.set_destination_port(fLabel); // The dest and src port are set to the same 00729 msg.set_source_port(fLabel); // number for simplicity. 00730 msg.set_bit_size(8 * (nextSegmentSize + fTCPIPHeaderLength)); 00731 00732 if (fDebug) { 00733 std::cout << "TCP_Sender::SendNewData," 00734 << " nextSegmentSize=" << nextSegmentSize 00735 << " fTCPIPHeaderLength=" << fTCPIPHeaderLength 00736 << " byte_size=" << msg.bit_size() / 8 00737 << " ptr=" << &msg 00738 << " time=" << Event_Queue::now() << std::endl; 00739 } 00740 00741 // no RTT measurement for retransmitted segments 00742 // changes on Dec. 13. 2002 (Ga, Bo, Scharf) 00743 00744 if (!fRTTMPending && fSndNxt >= fSndMax) { // ##Bo## 00745 fRTTMStartTime = Event_Queue::now(); 00746 fRTTMByte = nextSegment.begin(); 00747 fRTTMPending = true; 00748 } 00749 00750 fSndNxt += nextSegmentSize; 00751 fSndMax = max(fSndNxt, fSndMax); 00752 00753 // reset SWSA timer if necessary 00754 if (skipSWSA) { 00755 skipSWSA = false; 00756 } 00757 else if (fSWSATimer.IsPending()) { 00758 fSWSATimer.Reset(); 00759 } 00760 00761 // set rtx timer if necessary 00762 if (!fRtxTimer.IsPending()) { 00763 SetRtxTimer(); 00764 } 00765 00766 00767 if (fDebug) { 00768 msg.set_info(fSSThresh, fRecWnd, fCWnd, fRTTEstimate, 00769 fSndUna, fSndNxt, false); 00770 std::cout << "sender " << fLabel 00771 << ": send new data: " 00772 << " t = " << Event_Queue::now() << ", " 00773 << msg << std::endl; 00774 } 00775 00776 SendMsg(msg); 00777 } 00778 else { 00779 sillyWindowAvoidanceFailed = true; 00780 // set SWSA timer 00781 if (!fSWSATimer.IsPending()) { 00782 fSWSATimer.Set(fSWSATimerValue); 00783 } 00784 } 00785 } 00786 00787 // set timers in case that no new data could have been sent 00788 if (!fRtxTimer.IsPending()) { 00789 if (fSndMax > fSndUna) { // there is outstanding data 00790 if (!fImmediateBackoffReset && fPendingBackoffReset) { 00791 // backoff is reset if no new data could have been sent since last 00792 // (successfull) retransmission; this is useful in case of 00793 // Reno recovery and multiple losses to avoid that in 00794 // the (unavoidable) series of timeouts the timer value 00795 // increases exponentially as this is not the intention 00796 // of the delayed backoff reset in Karn's algorithm 00797 fBackoff = 1; 00798 fPendingBackoffReset = false; 00799 } 00800 SetRtxTimer(); 00801 } 00802 } 00803 } 00804 00805 00806 void TCP_Sender::UnaRetransmit() 00807 { 00808 // resend after timeout or fast retransmit 00809 unsigned nextSegmentSize = GetNextSegmentSize(fSndUna); 00810 00811 if (nextSegmentSize > 0) { 00812 TCP_Segment nextSegment(fSndUna, fSndUna + nextSegmentSize); 00813 TCP_Packet & msg = *new TCP_Packet(); 00814 msg.set_segment(nextSegment); 00815 msg.set_session_id(fSessionId); 00816 msg.set_destination_port(fLabel); // The dest and src port are set to the same 00817 msg.set_source_port(fLabel); // number for simplicity. 00818 msg.set_bit_size(8 * (nextSegmentSize + fTCPIPHeaderLength)); 00819 00820 fSndNxt = max(fSndNxt, fSndUna + nextSegmentSize); 00821 fSndMax = max(fSndNxt, fSndMax); 00822 00823 // The RTT measurement is cancelled if the RTTM byte has a sequence 00824 // number higher or equal than the first retransmitted byte as 00825 // the ACK for the RTTM byte will be delayed by the rtx for at least 00826 // one round 00827 if (fKarn && (nextSegment.begin() <= fRTTMByte) && fRTTMPending) { 00828 fRTTMPending = false; 00829 } 00830 00831 SetRtxTimer(); 00832 00833 if (fDebug) { 00834 msg.set_info(fSSThresh, fRecWnd, fCWnd, fRTTEstimate, 00835 fSndUna, fSndNxt, true); 00836 std::cout << "sender " << fLabel; 00837 if (fDupACKCnt >= fDupACKThreshold) { 00838 std::cout << ": fast rtx: "; 00839 } 00840 else { 00841 std::cout << ": TO rtx: "; 00842 } 00843 std::cout << " t = " << Event_Queue::now() << ", " 00844 << msg << std::endl; 00845 } 00846 00847 SendMsg(msg); 00848 } 00849 else { 00850 // throw(UL_CException("TCP_Sender::UnaRetransmit", "no bytes to send")); 00851 } 00852 } 00853 00854 00855 void TCP_Sender::FinishFastRecovery() 00856 { 00857 if (fTCPVersion == kTahoe) { 00858 fDupACKCnt = 0; 00859 } 00860 else if (fTCPVersion == kReno) { 00861 // Reno fast recovery 00862 fDupACKCnt = 0; 00863 if (fFlightSizeRecovery) { 00864 fCWnd = min(fSndMax - fSndUna + fMSS, fSSThresh); 00865 } 00866 else { 00867 fCWnd = fSSThresh; 00868 } 00869 } 00870 else if (fTCPVersion == kNewReno) { 00871 // New Reno fast recovery 00872 // "Set CWnd to ... min (ssthresh, FlightSize + MSS) 00873 // ... or ssthresh" (RFC 2582) 00874 if (fFlightSizeRecovery) { 00875 fCWnd = min(fSndMax - fSndUna + fMSS, fSSThresh); 00876 } 00877 else { 00878 fCWnd = fSSThresh; 00879 } 00880 fDupACKCnt = 0; 00881 } 00882 } 00883 00884 00885 void TCP_Sender::ReduceSSThresh() 00886 { 00887 if (fCarefulSSThreshReduction) { 00888 // If Reno conservation is enabled the amount of 00889 // outstanding data ("flight size") might be rather large 00890 // and even larger than twice the old ssthresh value; 00891 // so this corresponds more to the ns behaviour where always cwnd is 00892 // taken instead of flight size. 00893 fSSThresh = max(2 * fMSS, 00894 min(min(fCWnd, fSndMax - fSndUna), fRecWnd) / 2); 00895 } 00896 else { 00897 // use filght size / 2 as recommended in RFC 2581 00898 fSSThresh = max(2 * fMSS, min(fSndMax - fSndUna, fRecWnd) / 2); 00899 } 00900 00901 it_assert(fSSThresh <= fMaxCWnd, "TCP_Sender::HandleACK, internal error: SndSSThresh is > MaxCWnd"); 00902 00903 if (fTrace) { 00904 TraceSSThresh(); 00905 } 00906 } 00907 00908 00909 void TCP_Sender::SendMsg(TCP_Packet &msg) 00910 { 00911 if (fTrace) { 00912 TraceSentSeqNo(msg.get_segment().end()); 00913 } 00914 00915 if (fRestartAfterIdle) { 00916 fLastSendTime = Event_Queue::now(); // needed for idle detection 00917 } 00918 00919 tcp_send(&msg); 00920 } 00921 00922 00923 void TCP_Sender::IdleCheck() 00924 { 00925 // idle detection according to Jacobson, SIGCOMM, 1988: 00926 // sender is currently idle and nothing has been send since RTO 00927 00928 if (fSndMax == fSndUna && Event_Queue::now() - fLastSendTime > CalcRTOValue()) { 00929 fCWnd = fInitialCWnd; // see RFC2581 00930 00931 fNumberOfIdleTimeouts++; 00932 00933 if (fTrace) { 00934 TraceCWnd(); 00935 } 00936 00937 if (fDebug) { 00938 std::cout << "sender " << fLabel 00939 << ": idle timeout: " 00940 << "t = " << Event_Queue::now() 00941 << ", SndNxt = " << fSndNxt 00942 << ", SndUna = " << fSndUna 00943 << ", Backoff = " << fBackoff 00944 << std::endl; 00945 } 00946 } 00947 } 00948 00949 00950 void TCP_Sender::HandleRtxTimeout(Ttype) 00951 { 00952 fNumberOfTimeouts++; 00953 00954 // update backoff 00955 fBackoff = min(fMaxBackoff, fBackoff * 2); 00956 if (!fImmediateBackoffReset) { 00957 fPendingBackoffReset = false; 00958 } 00959 00960 if (fDupACKCnt >= fDupACKThreshold) { 00961 FinishFastRecovery(); // reset dup ACK cnt and CWnd 00962 } 00963 else if (fDupACKCnt > 0) { 00964 fDupACKCnt = 0; // don't allow dupack action during TO recovery 00965 } 00966 00967 // update CWnd and SSThresh 00968 ReduceSSThresh(); // halve ssthresh (in most cases) 00969 fCWnd = fMSS; // not initial CWnd, see RFC 2581 00970 00971 it_assert(fSSThresh <= fMaxCWnd, "TCP_Sender::HandleRtxTimeout, internal error: SndSSThresh is > MaxCWnd"); 00972 00973 fRecoveryTO = fSndMax; 00974 00975 if (fGoBackN) { 00976 // go back N is mainly relevant in the case of multiple losses 00977 // which would lead to a series of timeouts without resetting sndnxt 00978 fSndNxt = fSndUna; 00979 } 00980 00981 if (fDebug) { 00982 std::cout << "sender " << fLabel 00983 << ": rtx timeout: " 00984 << "t = " << Event_Queue::now() 00985 << ", SndNxt = " << fSndNxt 00986 << ", SndUna = " << fSndUna 00987 << std::endl; 00988 } 00989 00990 if (fTrace) { 00991 TraceCWnd(); 00992 } 00993 00994 UnaRetransmit(); // initiate retransmission 00995 } 00996 00997 00998 void TCP_Sender::HandleSWSATimeout(Ttype) 00999 { 01000 SendNewData(true); 01001 } 01002 01003 01004 unsigned TCP_Sender::GetNextSegmentSize(const Sequence_Number & begin) 01005 { 01006 // try to get new user messages if available and necessary 01007 while ((fUserNxt < begin + fMSS) && (!SocketWriteQueue.empty())) { 01008 itpp::Packet *packet_p = SocketWriteQueue.front(); 01009 SocketWriteQueue.pop(); 01010 fUserNxt += (unsigned) packet_p->bit_size() / 8; 01011 delete packet_p; 01012 } 01013 01014 Sequence_Number end = min(min(fUserNxt, begin + fMSS), 01015 fSndUna + SendWindow()); 01016 01017 if (fDebug) { 01018 std::cout << "TCP_Sender::GetNextSegmentSize," 01019 << " fUserNxt=" << fUserNxt 01020 << " begin_seq_num=" << begin 01021 << " fMSS=" << fMSS 01022 << " fSndUna=" << fSndUna 01023 << " SendWindow()=" << SendWindow() 01024 << " end_seq_num=" << end 01025 << " time=" << Event_Queue::now() << std::endl; 01026 } 01027 01028 return max(0, end - begin); 01029 } 01030 01031 01032 unsigned TCP_Sender::SendWindow() const 01033 { 01034 return min(fRecWnd, min(fMaxCWnd, fCWnd)); 01035 } 01036 01037 01038 double TCP_Sender::CalcRTOValue() const 01039 { 01040 static const double factor = 1 + 1e-8; 01041 // to avoid "simultaneous" TO/receive ACK events in case of const. RTT 01042 01043 double rto = fBackoff * fRTTEstimate * factor; 01044 01045 if (rto > fMaxRTO) { 01046 rto = fMaxRTO; 01047 } 01048 01049 return rto; 01050 } 01051 01052 01053 void TCP_Sender::SetRtxTimer() 01054 { 01055 double rto = CalcRTOValue(); 01056 fRtxTimer.Set(rto); 01057 fTimUna = fSndUna; 01058 if (fDebug) { 01059 std::cout << "sender " << fLabel 01060 << ": set rtx timer: " 01061 << "t = " << Event_Queue::now() 01062 << ", RTO = " << rto 01063 << ", Backoff = " << fBackoff 01064 << ", TimUna = " << fTimUna 01065 << std::endl; 01066 } 01067 } 01068 01069 01070 void TCP_Sender::UpdateRTTVariables(double sampleRTT) 01071 { 01072 if (fSRTT == 0) { 01073 fSRTT = sampleRTT; 01074 fRTTVar = sampleRTT / 2; 01075 } 01076 else { 01077 // see, e.g., Comer for the values used as weights 01078 fSRTT = 0.875 * fSRTT + 0.125 * sampleRTT; 01079 fRTTVar = 0.75 * fRTTVar + 0.25 * fabs(sampleRTT - fSRTT); 01080 } 01081 01082 fRTTEstimate = round(fSRTT + 4 * fRTTVar, fTimerGranularity); 01083 01084 if (fTrace) { 01085 TraceRTTVariables(sampleRTT); 01086 } 01087 01088 fNumberOfRTTMeasurements++; 01089 } 01090 01091 01092 void TCP_Sender::TraceRTTVariables(double sampleRTT) 01093 { 01094 if (fDebug) { 01095 std::cout << "sender " << fLabel 01096 << ": RTT update: " 01097 << "t = " << Event_Queue::now() 01098 << ", sample = " << sampleRTT 01099 << ", SRTT = " << fSRTT 01100 << ", RTTVar = " << fRTTVar 01101 << ", RTTEstimate = " << fRTTEstimate 01102 << std::endl; 01103 } 01104 01105 if (RTTsample_index >= RTTsample_time.size()) { 01106 RTTsample_time.set_size(2*RTTsample_time.size(), true); 01107 RTTsample_val.set_size(2*RTTsample_val.size(), true); 01108 } 01109 RTTsample_val(RTTsample_index) = sampleRTT; 01110 RTTsample_time(RTTsample_index) = Event_Queue::now(); 01111 RTTsample_index++; 01112 01113 if (RTTEstimate_index >= RTTEstimate_time.size()) { 01114 RTTEstimate_time.set_size(2*RTTEstimate_time.size(), true); 01115 RTTEstimate_val.set_size(2*RTTEstimate_val.size(), true); 01116 } 01117 RTTEstimate_val(RTTEstimate_index) = fRTTEstimate; 01118 RTTEstimate_time(RTTEstimate_index) = Event_Queue::now(); 01119 RTTEstimate_index++; 01120 } 01121 01122 01123 void TCP_Sender::TraceCWnd() 01124 { 01125 if (fDebug) { 01126 std::cout << "sender " << fLabel 01127 << " t = " << Event_Queue::now() 01128 << " cwnd = " << fCWnd << std::endl; 01129 } 01130 if (CWnd_index >= CWnd_time.size()) { 01131 CWnd_time.set_size(2*CWnd_time.size(), true); 01132 CWnd_val.set_size(2*CWnd_val.size(), true); 01133 } 01134 CWnd_val(CWnd_index) = fCWnd; 01135 CWnd_time(CWnd_index) = Event_Queue::now(); 01136 CWnd_index++; 01137 01138 } 01139 01140 void TCP_Sender::TraceSSThresh() 01141 { 01142 if (fDebug) { 01143 std::cout << "sender " << fLabel 01144 << " t = " << Event_Queue::now() 01145 << " cwnd = " << fSSThresh << std::endl; 01146 } 01147 if (SSThresh_index >= SSThresh_time.size()) { 01148 SSThresh_time.set_size(2*SSThresh_time.size(), true); 01149 SSThresh_val.set_size(2*SSThresh_val.size(), true); 01150 } 01151 SSThresh_val(SSThresh_index) = fSSThresh; 01152 SSThresh_time(SSThresh_index) = Event_Queue::now(); 01153 SSThresh_index++; 01154 01155 } 01156 01157 void TCP_Sender::TraceSentSeqNo(const Sequence_Number sn) 01158 { 01160 if (fDebug) { 01161 std::cout << "sender " << fLabel 01162 << " t = " << Event_Queue::now() 01163 << " sent = " << sn 01164 << std::endl; 01165 } 01166 if (sent_seq_num_index >= sent_seq_num_time.size()) { 01167 sent_seq_num_time.set_size(2*sent_seq_num_time.size(), true); 01168 sent_seq_num_val.set_size(2*sent_seq_num_val.size(), true); 01169 } 01170 sent_seq_num_val(sent_seq_num_index) = sn.value(); 01171 sent_seq_num_time(sent_seq_num_index) = Event_Queue::now(); 01172 sent_seq_num_index++; 01173 } 01174 01175 01176 void TCP_Sender::TraceACKedSeqNo(const Sequence_Number sn) 01177 { 01178 if (fDebug) { 01179 std::cout << "sender " << fLabel 01180 << " t = " << Event_Queue::now() 01181 << " ACK = " << sn 01182 << std::endl; 01183 } 01184 01185 if (sender_recv_ack_seq_num_index >= sender_recv_ack_seq_num_time.size()) { 01186 sender_recv_ack_seq_num_time.set_size(2*sender_recv_ack_seq_num_time.size(), true); 01187 sender_recv_ack_seq_num_val.set_size(2*sender_recv_ack_seq_num_val.size(), true); 01188 } 01189 sender_recv_ack_seq_num_val(sender_recv_ack_seq_num_index) = sn.value(); 01190 sender_recv_ack_seq_num_time(sender_recv_ack_seq_num_index) = Event_Queue::now(); 01191 sender_recv_ack_seq_num_index++; 01192 } 01193 01194 01195 void TCP_Sender::save_trace(std::string filename) 01196 { 01197 01198 CWnd_val.set_size(CWnd_index, true); 01199 CWnd_time.set_size(CWnd_index, true); 01200 01201 SSThresh_val.set_size(SSThresh_index, true); 01202 SSThresh_time.set_size(SSThresh_index, true); 01203 01204 sent_seq_num_val.set_size(sent_seq_num_index, true); 01205 sent_seq_num_time.set_size(sent_seq_num_index, true); 01206 01207 sender_recv_ack_seq_num_val.set_size(sender_recv_ack_seq_num_index, true); 01208 sender_recv_ack_seq_num_time.set_size(sender_recv_ack_seq_num_index, true); 01209 01210 RTTEstimate_val.set_size(RTTEstimate_index, true); 01211 RTTEstimate_time.set_size(RTTEstimate_index, true); 01212 01213 RTTsample_val.set_size(RTTsample_index, true); 01214 RTTsample_time.set_size(RTTsample_index, true); 01215 01216 if (fDebug) { 01217 std::cout << "CWnd_val" << CWnd_val << std::endl; 01218 std::cout << "CWnd_time" << CWnd_time << std::endl; 01219 std::cout << "CWnd_index" << CWnd_index << std::endl; 01220 01221 std::cout << "SSThresh_val" << SSThresh_val << std::endl; 01222 std::cout << "SSThresh_time" << SSThresh_time << std::endl; 01223 std::cout << "SSThresh_index" << SSThresh_index << std::endl; 01224 01225 std::cout << "sent_seq_num_val" << sent_seq_num_val << std::endl; 01226 std::cout << "sent_seq_num_time" << sent_seq_num_time << std::endl; 01227 std::cout << "sent_seq_num_index" << sent_seq_num_index << std::endl; 01228 01229 std::cout << "sender_recv_ack_seq_num_val" << sender_recv_ack_seq_num_val << std::endl; 01230 std::cout << "sender_recv_ack_seq_num_time" << sender_recv_ack_seq_num_time << std::endl; 01231 std::cout << "sender_recv_ack_seq_num_index" << sender_recv_ack_seq_num_index << std::endl; 01232 01233 std::cout << "RTTEstimate_val" << RTTEstimate_val << std::endl; 01234 std::cout << "RTTEstimate_time" << RTTEstimate_time << std::endl; 01235 std::cout << "RTTEstimate_index" << RTTEstimate_index << std::endl; 01236 01237 std::cout << "RTTsample_val" << RTTsample_val << std::endl; 01238 std::cout << "RTTsample_time" << RTTsample_time << std::endl; 01239 std::cout << "RTTsample_index" << RTTsample_index << std::endl; 01240 01241 std::cout << "TCP_Sender::saving to file: " << filename << std::endl; 01242 } 01243 01244 it_file ff2; 01245 ff2.open(filename); 01246 01247 ff2 << Name("CWnd_val") << CWnd_val; 01248 ff2 << Name("CWnd_time") << CWnd_time; 01249 ff2 << Name("CWnd_index") << CWnd_index; 01250 01251 ff2 << Name("SSThresh_val") << SSThresh_val; 01252 ff2 << Name("SSThresh_time") << SSThresh_time; 01253 ff2 << Name("SSThresh_index") << SSThresh_index; 01254 01255 ff2 << Name("sent_seq_num_val") << sent_seq_num_val; 01256 ff2 << Name("sent_seq_num_time") << sent_seq_num_time; 01257 ff2 << Name("sent_seq_num_index") << sent_seq_num_index; 01258 01259 ff2 << Name("sender_recv_ack_seq_num_val") << sender_recv_ack_seq_num_val; 01260 ff2 << Name("sender_recv_ack_seq_num_time") << sender_recv_ack_seq_num_time; 01261 ff2 << Name("sender_recv_ack_seq_num_index") << sender_recv_ack_seq_num_index; 01262 01263 ff2 << Name("RTTEstimate_val") << RTTEstimate_val; 01264 ff2 << Name("RTTEstimate_time") << RTTEstimate_time; 01265 ff2 << Name("RTTEstimate_index") << RTTEstimate_index; 01266 01267 ff2 << Name("RTTsample_val") << RTTsample_val; 01268 ff2 << Name("RTTsample_time") << RTTsample_time; 01269 ff2 << Name("RTTsample_index") << RTTsample_index; 01270 01271 ff2.flush(); 01272 ff2.close(); 01273 } 01274 01275 01276 void TCP_Sender::print_item(std::ostream &, const std::string & keyword) 01277 { 01278 if (keyword == "Label") { 01279 std::cout << fLabel; 01280 } 01281 else if (keyword == "CWnd") { 01282 std::cout << fCWnd; 01283 } 01284 else if (keyword == "SSThresh") { 01285 std::cout << fSSThresh; 01286 } 01287 else if (keyword == "SRTT") { 01288 std::cout << fSRTT; 01289 } 01290 else if (keyword == "RTTvar") { 01291 std::cout << fRTTVar; 01292 } 01293 else if (keyword == "Backoff") { 01294 std::cout << fBackoff; 01295 } 01296 else if (keyword == "RTO") { 01297 std::cout << CalcRTOValue(); 01298 } 01299 else if (keyword == "NoOfFastRets") { 01300 std::cout << fNumberOfFastRetransmits; 01301 } 01302 else if (keyword == "NoOfRetTOs") { 01303 std::cout << fNumberOfTimeouts; 01304 } 01305 else if (keyword == "NoOfIdleTOs") { 01306 std::cout << fNumberOfIdleTimeouts; 01307 } 01308 else if (keyword == "NoOfRTTMs") { 01309 std::cout << fNumberOfRTTMeasurements; 01310 } 01311 else if (keyword == "NoOfRecACKs") { 01312 std::cout << fNumberOfReceivedACKs; 01313 } 01314 else { 01315 } 01316 } 01317 01318 01319 // -------------------- TCP_Receiver_Buffer ---------------------------------------- 01320 TCP_Receiver_Buffer::TCP_Receiver_Buffer() : 01321 fFirstByte() 01322 { 01323 } 01324 01325 01326 TCP_Receiver_Buffer::TCP_Receiver_Buffer(const TCP_Receiver_Buffer & rhs) : 01327 fFirstByte(rhs.fFirstByte), 01328 fBufList(rhs.fBufList) 01329 { 01330 } 01331 01332 01333 void TCP_Receiver_Buffer::reset() 01334 { 01335 fBufList.clear(); 01336 fFirstByte = 0; 01337 } 01338 01339 01340 TCP_Receiver_Buffer::~TCP_Receiver_Buffer() 01341 { 01342 } 01343 01344 01345 void TCP_Receiver_Buffer::write(TCP_Segment newBlock) 01346 { 01347 // error cases 01348 it_assert(newBlock.begin() <= newBlock.end(), "TCP_Receiver_Buffer::Write, no valid segment"); 01349 01350 // cut blocks beginning before fFirstByte 01351 if (newBlock.begin() < fFirstByte) { 01352 if (newBlock.end() > fFirstByte) { 01353 newBlock.set_begin(fFirstByte); 01354 } 01355 else { 01356 return; //// TODO: Is this strange? 01357 } 01358 } 01359 01360 if (newBlock.length() == 0) { // empty block, nothing to do 01361 return; 01362 } 01363 01364 if (fBufList.empty() || (newBlock.begin() > fBufList.back().end())) { 01365 // new block is behind last block in buffer 01366 fBufList.push_back(newBlock); 01367 } 01368 else { 01369 // skip list entries if beginning of newBlock > end of current one 01370 // (search for correct list position) 01371 std::list<TCP_Segment>::iterator iter; 01372 iter = fBufList.begin(); 01373 while (newBlock.begin() > iter->end()) { 01374 iter++; 01375 it_assert(iter != fBufList.end(), "TCP_Receiver_Buffer::Write, internal error"); 01376 } 01377 01378 TCP_Segment & exBlock = *iter; 01379 01380 if (exBlock.can_be_combined(newBlock)) { 01381 // overlapping or contiguous blocks -> combine 01382 exBlock.combine(newBlock); 01383 01384 // check following blocks 01385 iter++; 01386 while ((iter != fBufList.end()) && 01387 exBlock.can_be_combined(*iter)) { 01388 exBlock.combine(*iter); 01389 iter = fBufList.erase(iter); 01390 } 01391 } 01392 else { 01393 // no overlap, newBlock lies between two existing list entries 01394 // new list entry has to be created 01395 01396 fBufList.insert(iter, newBlock); 01397 } 01398 } 01399 01400 it_assert(!fBufList.empty() && fBufList.front().begin() >= fFirstByte, "TCP_Receiver_Buffer::Write, internal error"); 01401 01402 } 01403 01404 01405 // The amount of data read from the buffer is given as parameter. It has 01406 // to be less than or equal to the size of the first block stored. This 01407 // mean the caller of Read should first check how much data is available 01408 // by calling FirstBlockSize. 01409 void TCP_Receiver_Buffer::read(unsigned noOfBytes) 01410 { 01411 it_assert(first_block_size() > 0, "TCP_Receiver_Buffer::Read, No block to read"); 01412 it_assert(noOfBytes <= first_block_size(), "TCP_Receiver_Buffer::Read, submitted block size not valid"); 01413 01414 01415 if (noOfBytes < first_block_size()) { 01416 fBufList.front().set_begin(fBufList.front().begin() + noOfBytes); 01417 } 01418 else { // first block will be read completely 01419 fBufList.pop_front(); 01420 } 01421 fFirstByte += noOfBytes; 01422 01423 it_assert(fBufList.empty() || fBufList.front().begin() >= fFirstByte, "TCP_Receiver_Buffer::Read, internal error"); 01424 } 01425 01426 01427 // FirstBlockSize returns the size of the first block stored in the 01428 // buffer or 0 if the buffer is empty 01429 unsigned TCP_Receiver_Buffer::first_block_size() const 01430 { 01431 if (!fBufList.empty() && (fBufList.front().begin() == fFirstByte)) { 01432 return fBufList.front().length(); 01433 } 01434 else { 01435 return 0; 01436 } 01437 } 01438 01439 01440 std::ostream & TCP_Receiver_Buffer::info(std::ostream &os, int detail) const 01441 { 01442 os << "receiver buffer information" << std::endl 01443 << "number of blocks: " << fBufList.size() << std::endl 01444 << "first byte stored: " << fFirstByte << std::endl 01445 << "last byte stored +1: " << last_byte() << std::endl 01446 << "next byte expected: " << next_expected() << std::endl; 01447 01448 if (detail > 0) { 01449 os << "segments in receiver buffer:" << std::endl; 01450 01451 typedef std::list<TCP_Segment>::const_iterator LI; 01452 for (LI i = fBufList.begin(); i != fBufList.end(); ++i) { 01453 const TCP_Segment & block = *i; 01454 os << ". segment: " << block << std::endl; 01455 } 01456 01457 } 01458 01459 return os; 01460 } 01461 01462 01463 // -------------------- TCP_Receiver ---------------------------------------- 01464 TCP_Receiver::TCP_Receiver(int label) : 01465 fReceiverBuffer(), 01466 fLabel(label), 01467 fTCPIPHeaderLength(TCP_HEADERLENGTH), 01468 fMSS(TCP_RMSS), 01469 fBufferSize(TCP_BUFFERSIZE), 01470 fDelayedACK(TCP_DELAYEDACK), 01471 fACKDelayTime(TCP_ACKDELAYTIME), 01472 fSendPeriodicACKs(TCP_SENDPERIODICACKS), 01473 fStrictPeriodicACKs(TCP_STRICTPERIODICACKS), 01474 fPeriodicACKInterval(TCP_PERIODICACKINTERVAL), 01475 fACKSchedulingDelay(TCP_ACKSCHEDULINGDELAY), 01476 fACKOnBufferWrite(TCP_ACKBUFFERWRITE), 01477 fACKOnBufferRead(TCP_ACKBUFFERREAD), 01478 fMaxUserBlockSize(TCP_MAXUSERBLOCKSIZE), 01479 fMinUserBlockSize(TCP_MINUSERBLOCKSIZE), 01480 fUserBlockProcDelay(TCP_USERBLOCKPROCDELAY), 01481 fTrace(false), 01482 fDebug(false), 01483 fSessionId(0), 01484 fDelayedACKTimer(*this, &TCP_Receiver::DelayedACKHandler), 01485 fPeriodicACKTimer(*this, &TCP_Receiver::PeriodicACKHandler), 01486 fACKSchedulingTimer(*this, &TCP_Receiver::SendACKMessage), 01487 fWaitingACKMsg(0), 01488 fUserBlockProcTimer(*this, &TCP_Receiver::HandleEndOfProcessing) 01489 { 01490 fUserMessage = NULL; 01491 01492 01493 if (!fACKOnBufferRead && !fACKOnBufferWrite) { 01494 // throw(UL_CException("TCP_Receiver::TCP_Receiver", 01495 // "ACKs must be sent on buffer read or write or both")); 01496 } 01497 01498 setup(); 01499 01500 tcp_receive.forward(this, &TCP_Receiver::ReceiveMessageFromNet); 01501 tcp_receive.set_name("TCP Receive"); 01502 tcp_send_ack.set_name("TCP send ACK"); 01503 tcp_new_data.set_name("TCP New Data"); 01504 tcp_release.forward(this, &TCP_Receiver::release); 01505 tcp_release.set_name("TCP Release"); 01506 01507 } 01508 01509 01510 TCP_Receiver::~TCP_Receiver() 01511 { 01512 delete fWaitingACKMsg; 01513 delete fUserMessage; 01514 } 01515 01516 01517 void TCP_Receiver::set_debug(const bool enable_debug) 01518 { 01519 fDebug = enable_debug; 01520 tcp_send_ack.set_debug(enable_debug); 01521 tcp_new_data.set_debug(); 01522 } 01523 01524 void TCP_Receiver::set_debug(bool enable_debug, bool enable_signal_debug) 01525 { 01526 fDebug = enable_debug; 01527 tcp_send_ack.set_debug(enable_signal_debug); 01528 tcp_new_data.set_debug(); 01529 } 01530 01531 void TCP_Receiver::set_trace(const bool enable_trace) 01532 { 01533 fTrace = enable_trace; 01534 } 01535 01536 01537 01538 void TCP_Receiver::setup() 01539 { 01540 fAdvRcvWnd = 0; 01541 fAdvRcvNxt = 0; 01542 01543 if (fSendPeriodicACKs) { 01544 fPeriodicACKTimer.Set(fPeriodicACKInterval); 01545 } 01546 01547 fReceiverBuffer.reset(); 01548 01549 received_seq_num_val.set_size(1000); 01550 received_seq_num_val.zeros(); 01551 received_seq_num_time.set_size(1000); 01552 received_seq_num_time.zeros(); 01553 received_seq_num_val(0) = 0; 01554 received_seq_num_time(0) = 0; 01555 received_seq_num_index = 1; 01556 } 01557 01558 std::string TCP_Receiver::GenerateFilename() 01559 { 01560 time_t rawtime; 01561 struct tm *timeinfo; 01562 timeinfo = localtime(&rawtime); 01563 std::ostringstream filename_stream; 01564 filename_stream << "trace_tcp_receiver_u" << fLabel 01565 << "_" << 1900 + timeinfo->tm_year 01566 << "_" << timeinfo->tm_mon 01567 << "_" << timeinfo->tm_mday 01568 << "__" << timeinfo->tm_hour 01569 << "_" << timeinfo->tm_min 01570 << "_" << timeinfo->tm_sec 01571 << "_.it"; 01572 return filename_stream.str(); 01573 } 01574 01575 void TCP_Receiver::release(std::string file) 01576 { 01577 std::string filename; 01578 fSessionId++; 01579 01580 if (fWaitingACKMsg != 0) { 01581 delete fWaitingACKMsg; 01582 fWaitingACKMsg = 0; 01583 } 01584 if (fUserMessage != 0) { 01585 delete fUserMessage; 01586 fUserMessage = 0; 01587 } 01588 01589 fUserBlockProcTimer.Reset(); 01590 fDelayedACKTimer.Reset(); 01591 fPeriodicACKTimer.Reset(); 01592 fACKSchedulingTimer.Reset(); 01593 01594 if (fTrace) { 01595 if (file == "") 01596 filename = GenerateFilename(); 01597 else 01598 filename = file; 01599 01600 save_trace(filename); 01601 } 01602 } 01603 01604 01605 void TCP_Receiver::ReceiveMessageFromNet(itpp::Packet *msg) 01606 { 01607 TCP_Packet & packet = (TCP_Packet &) * msg; 01608 if (packet.get_destination_port() == fLabel) { 01609 if (packet.get_session_id() == fSessionId) { 01610 ReceiveDataPacket(packet); 01611 } 01612 else { 01613 it_warning("Received a TCP packet with wrong SessionId"); 01614 std::cout << "TCP_Receiver::ReceiveMessageFromNet, " 01615 << "fLabel= " << fLabel 01616 << "fSessionId= " << fSessionId << std::endl; 01617 std::cout << "packet=" << packet 01618 << ", next exp. = " << fReceiverBuffer.next_expected() 01619 << std::endl; 01620 exit(0); 01621 } 01622 } 01623 else { 01624 it_warning("Received a TCP packet with label"); 01625 exit(0); 01626 } 01627 } 01628 01629 01630 void TCP_Receiver::ReceiveDataPacket(TCP_Packet &msg) 01631 { 01632 TCP_Segment segment = msg.get_segment(); 01633 01634 bool isOutOfOrder = (segment.begin() > fReceiverBuffer.next_expected()) || 01635 (segment.end() <= fReceiverBuffer.next_expected()); 01636 01637 if (fDebug) { 01638 std::cout << "TCP_Receiver::ReceiveDataPacket receiver: " << fLabel << ": " 01639 << "receive msg: " 01640 << "t = " << Event_Queue::now() 01641 << ", next exp. = " << fReceiverBuffer.next_expected() 01642 << ", " << msg << std::endl; 01643 } 01644 01645 if (fTrace) { 01646 TraceReceivedSeqNo(segment.end()); 01647 } 01648 01649 it_assert(segment.end() <= fReceiverBuffer.first_byte() + fBufferSize, "TCP_Receiver::ReceiveTCPPacket, packet exceeds window at "); 01650 it_assert(segment.begin() < segment.end(), "TCP_Receiver::ReceiveTCPPacket, silly packet received at "); 01651 01652 fReceiverBuffer.write(segment); 01653 01654 if (isOutOfOrder) { 01655 SendACK(true); // create dupack conditionless 01656 } 01657 else { 01658 if (fACKOnBufferWrite) { 01659 SendACK(false); 01660 } 01661 IndicateUserMessage(); 01662 } 01663 01664 delete &msg; 01665 } 01666 01667 01668 void TCP_Receiver::IndicateUserMessage() 01669 { 01670 if (fUserMessage == 0) { 01671 // receive a block 01672 unsigned noOfBytes = min(fReceiverBuffer.first_block_size(), 01673 fMaxUserBlockSize); 01674 01675 if (fDebug) { 01676 std::cout << "TCP_Receiver::IndicateUserMessage " 01677 << "t = " << Event_Queue::now() 01678 << " noOfBytes = " << noOfBytes 01679 << " firstBlock = " << fReceiverBuffer.first_block_size() 01680 << std::endl; 01681 } 01682 01683 if (noOfBytes >= fMinUserBlockSize) { 01684 fUserMessage = new Packet(); 01685 fUserMessage->set_bit_size(8*noOfBytes); 01686 fUserBlockProcTimer.Set(fUserBlockProcDelay); 01687 } 01688 } 01689 } 01690 01691 01692 bool TCP_Receiver::is_user_message_available() 01693 { 01694 if (fUserMessage != 0) { 01695 return true; 01696 } 01697 01698 unsigned noOfBytes = min(fReceiverBuffer.first_block_size(), 01699 fMaxUserBlockSize); 01700 01701 if (noOfBytes >= fMinUserBlockSize) { 01702 fUserMessage = new Packet(); 01703 fUserMessage->set_bit_size(8*noOfBytes); 01704 return true; 01705 } 01706 else { 01707 return false; 01708 } 01709 } 01710 01711 01712 itpp::Packet & TCP_Receiver::get_user_message() 01713 { 01714 it_assert(fUserMessage != 0, "TCP_Receiver::GetUserMessage, no message available"); 01715 if (fDebug) { 01716 std::cout << "TCP_Receiver::GetUserMessage " 01717 << "receiver: " << fLabel << ": " 01718 << "read from buffer: " 01719 << "t = " << Event_Queue::now() 01720 << ", user msg length = " << (fUserMessage->bit_size() / 8) 01721 << ", first byte = " << fReceiverBuffer.first_byte() 01722 << ", first block size = " << fReceiverBuffer.first_block_size() 01723 << std::endl; 01724 } 01725 01726 fReceiverBuffer.read(fUserMessage->bit_size() / 8); 01727 if (fACKOnBufferRead) { 01728 SendACK(false); // send acknowledgement 01729 } 01730 01731 itpp::Packet & msg = *fUserMessage; 01732 fUserMessage = 0; 01733 01734 if (fReceiverBuffer.first_block_size() > 0) { 01735 IndicateUserMessage(); 01736 } 01737 01738 return msg; 01739 } 01740 01741 01742 01743 void TCP_Receiver::HandleEndOfProcessing(Ttype) 01744 { 01745 it_assert(fUserMessage != 0, "TCP_Receiver::HandleEndOfProcessing, no message available"); 01746 01747 01748 tcp_new_data(fLabel); 01749 } 01750 01751 01752 void TCP_Receiver::DelayedACKHandler(Ttype) 01753 { 01754 if (fDebug) { 01755 std::cout << "TCP_Receiver::DelayedACKHandler " 01756 << "receiver " << fLabel 01757 << ": delACK TO: " 01758 << "t = " << Event_Queue::now() << std::endl; 01759 } 01760 01761 SendACK(true); 01762 } 01763 01764 01765 void TCP_Receiver::PeriodicACKHandler(Ttype) 01766 { 01767 if (fDebug) { 01768 std::cout << "TCP_Receiver::PeriodicACKHandler" 01769 << "receiver " << fLabel 01770 << ": periodicACK TO: " 01771 << "t = " << Event_Queue::now() << std::endl; 01772 } 01773 01774 SendACK(true); 01775 } 01776 01777 01778 void TCP_Receiver::SendACK(bool sendConditionless) 01779 { 01780 // sendConditionless is set 01781 // ... if packet was received out of order or 01782 // ... if delayed ACK timer has expired 01783 01784 // Bei eingeschaltetem "delayed ACK" wird ein ACK nur 01785 // gesendet, wenn das Fenster um 2MSS oder 35% der 01786 // maximalen Fenstergroesse verschoben worden ist 01787 // ... oder nach delayed ACK Timeout 01788 // ... oder wenn es das ACK fur ein Out of Order Segment ist 01789 // ... oder (in der Realitat), wenn ich auch was zu senden habe. 01790 01791 if (sendConditionless || !fDelayedACK || 01792 (fReceiverBuffer.next_expected() - fAdvRcvNxt >= (int)(2 * fMSS)) || 01793 (fReceiverBuffer.next_expected() - fAdvRcvNxt >= 01794 (int)(0.35 * fBufferSize))) { 01795 // Remark: RFC2581 recommends to acknowledge every second 01796 // packet conditionless (without setting this as a requirement) 01797 // in order to avoid excessive ack delays when the receiver MSS 01798 // is larger than the sender MSS. In this uni-directional 01799 // implementation, the receiver's MSS is not actively 01800 // used for sending but only for deciding when acknowledgments 01801 // have to be returned. Thus, the best solution to account for 01802 // RFC2581 is to set the receiver's MSS always equal to the 01803 // sender's MSS. 01804 01805 // Receiver Silly Window Syndrome Avoidance: 01806 01807 if (fAdvRcvNxt + fAdvRcvWnd + min(fBufferSize / 2, fMSS) 01808 <= fReceiverBuffer.first_byte() + fBufferSize) { 01809 // Die rechte Grenze des Empfangerfensters wird nur anders angezeigt 01810 // als beim letzten ACK, wenn sie sich seither um mindestens 01811 // min (BufferSize/ 2, MSS) geandert hat. 01812 fAdvRcvWnd = fBufferSize - fReceiverBuffer.first_block_size(); 01813 } 01814 else { 01815 fAdvRcvWnd = fAdvRcvNxt + fAdvRcvWnd - fReceiverBuffer.next_expected(); 01816 } 01817 01818 fAdvRcvNxt = fReceiverBuffer.next_expected(); 01819 01820 if (fSendPeriodicACKs && 01821 (!fStrictPeriodicACKs || !fPeriodicACKTimer.IsPending())) { 01822 fPeriodicACKTimer.Set(fPeriodicACKInterval); 01823 } 01824 01825 if (fDelayedACK && fDelayedACKTimer.IsPending()) { 01826 fDelayedACKTimer.Reset(); 01827 } 01828 01829 ScheduleACKMessage(); 01830 } 01831 else { 01832 if (!fDelayedACKTimer.IsPending()) { 01833 fDelayedACKTimer.Set(fACKDelayTime); 01834 if (fDebug) { 01835 std::cout << "TCP_Receiver::SendACK" 01836 << "receiver " << fLabel 01837 << ": set delACK timer: " 01838 << "t = " << Event_Queue::now() << std::endl; 01839 } 01840 } 01841 } 01842 } 01843 01844 01845 void TCP_Receiver::ScheduleACKMessage() 01846 { 01847 if (fWaitingACKMsg == 0) { 01848 fWaitingACKMsg = new TCP_Packet; 01849 } 01850 01851 fWaitingACKMsg->set_ACK(fAdvRcvNxt); 01852 fWaitingACKMsg->set_wnd(fAdvRcvWnd); 01853 fWaitingACKMsg->set_session_id(fSessionId); 01854 fWaitingACKMsg->set_destination_port(fLabel); 01855 fWaitingACKMsg->set_source_port(fLabel); 01856 fWaitingACKMsg->set_bit_size(8*fTCPIPHeaderLength); 01857 01858 if (fACKSchedulingDelay > 0) { 01859 if (!fACKSchedulingTimer.IsPending()) { 01860 fACKSchedulingTimer.Set(fACKSchedulingDelay); 01861 } 01862 } 01863 else { 01864 SendACKMessage(Event_Queue::now()); 01865 } 01866 } 01867 01868 01869 void TCP_Receiver::SendACKMessage(Ttype) 01870 { 01871 it_assert(fWaitingACKMsg != 0, "TCP_Receiver::SendACKMessage, no ACK message waiting"); 01872 01873 if (fDebug) { 01874 std::cout << "TCP_Receiver::SendACKMessage Ack sent" 01875 << "receiver " << fLabel 01876 << ": send ACK: " 01877 << "t = " << Event_Queue::now() 01878 << ", " << (*fWaitingACKMsg) 01879 << " byte_size=" << fWaitingACKMsg->bit_size() / 8 01880 << " ptr=" << fWaitingACKMsg << std::endl; 01881 } 01882 01883 tcp_send_ack(fWaitingACKMsg); 01884 01885 fWaitingACKMsg = 0; 01886 } 01887 01888 01889 void TCP_Receiver::TraceReceivedSeqNo(const Sequence_Number &sn) 01890 { 01891 if (fDebug) { 01892 std::cout << "TCP_Receiver::TraceReceivedSeqNo " 01893 << "receiver " << fLabel 01894 << " t = " << Event_Queue::now() 01895 << " sn = " << sn << std::endl; 01896 } 01897 if (received_seq_num_index >= received_seq_num_time.size()) { 01898 received_seq_num_time.set_size(2*received_seq_num_time.size(), true); 01899 received_seq_num_val.set_size(2*received_seq_num_val.size(), true); 01900 } 01901 received_seq_num_val(received_seq_num_index) = sn.value(); 01902 received_seq_num_time(received_seq_num_index) = Event_Queue::now(); 01903 received_seq_num_index++; 01904 } 01905 01906 01907 void TCP_Receiver::save_trace(std::string filename) 01908 { 01909 01910 received_seq_num_val.set_size(received_seq_num_index, true); 01911 received_seq_num_time.set_size(received_seq_num_index, true); 01912 01913 if (fDebug) { 01914 std::cout << "received_seq_num_val" << received_seq_num_val << std::endl; 01915 std::cout << "received_seq_num_time" << received_seq_num_time << std::endl; 01916 std::cout << "received_seq_num_index" << received_seq_num_index << std::endl; 01917 std::cout << "TCP_Receiver::saving to file: " << filename << std::endl; 01918 } 01919 01920 it_file ff2; 01921 ff2.open(filename); 01922 01923 ff2 << Name("received_seq_num_val") << received_seq_num_val; 01924 ff2 << Name("received_seq_num_time") << received_seq_num_time; 01925 ff2 << Name("received_seq_num_index") << received_seq_num_index; 01926 01927 ff2.flush(); 01928 ff2.close(); 01929 } 01930 01931 01932 } //namespace itpp 01933 01934 #ifdef _MSC_VER 01935 #pragma warning(default:4355) 01936 #endif 01937
Generated on Sun Jul 26 08:36:49 2009 for IT++ by Doxygen 1.5.9