/* -*- Mode:C++; c-basic-offset:8; tab-width:8; indent-tabs-mode:t -*- */ /* * Copyright (c) 2001 University of Mannheim, Praktische Informatik IV * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * 3. All advertising materials mentioning features or use of this software * must display the following acknowledgement: * This product includes software developed by Praktische Informatik IV, * University of Mannheim and ACIRI. * 4. Neither the name of the University of Mannheim nor of ACIRI * may be used to endorse or promote products derived from this * software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY U. MANNHEIM AND CONTRIBUTORS ``AS IS'' AND * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL U. MANNHEIM OR CONTRIBUTORS BE LIABLE * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. */ #include #include #include #include #include #include "tfmcc.h" #define MAX(a, b) (((a) > (b)) ? (a) : (b)) #define MIN(a, b) (((a) < (b)) ? (a) : (b)) int hdr_tfmcc::offset_; int hdr_tfmcc_ack::offset_; static class TFMCCHeaderClass : public PacketHeaderClass { public: TFMCCHeaderClass() : PacketHeaderClass("PacketHeader/TFMCC", sizeof(hdr_tfmcc)) { bind_offset(&hdr_tfmcc::offset_); } } class_tfmcchdr; static class TFMCC_ACKHeaderClass : public PacketHeaderClass { public: TFMCC_ACKHeaderClass() : PacketHeaderClass("PacketHeader/TFMCC_ACK", sizeof(hdr_tfmcc_ack)) { bind_offset(&hdr_tfmcc_ack::offset_); } } class_tfmcc_ackhdr; static class TfmccClass : public TclClass { public: TfmccClass() : TclClass("Agent/TFMCC") {} TclObject* create(int, const char*const*) { return (new TfmccAgent()); } } class_tfmcc; TfmccAgent::TfmccAgent() : Agent(PT_TFMCC), send_timer_(this), NoFeedbacktimer_(this), oldrate_(0), maxrate_(0) { bind("packetSize_", &size_); bind("fairSize_", &fairsize_); bind("ndatapack_", &ndatapack_); bind("InitRate_", &InitRate_); bind("overhead_", &overhead_); bind("ssmult_", &ssmult_); bind("printStatus_", &printStatus_); bind("rate_", &rate_); bind("bval_", &bval_); bind("min_rate_", &min_rate_); bind("t_factor_", &t_factor_); bind("Initial_RTT_", &Initial_RTT_); bind("sndr_id_", &sndr_id); } int TfmccAgent::command(int argc, const char*const* argv) { if (argc==2) { if (strcmp(argv[1],"start")==0) { start(); return TCL_OK; } if (strcmp(argv[1],"stop")==0) { stop(); return TCL_OK; } } return (Agent::command(argc, argv)); } void TfmccAgent::start() { double now = Scheduler::instance().clock(); seqno_ = 0; rate_ = InitRate_; delta_ = 0; oldrate_ = rate_; rate_change_ = INIT_RATE; last_change_ = 0; round_begin_ = now; // not 0, otherwise "no-feedback reduction" in first round maxrate_ = 0; ndatapack_ = 0; active_ = 1; round_id = 0; expected_rate = 0 ; inter_packet = 0; max_rtt_ = Initial_RTT_; supp_rate_ = DBL_MAX; rtt_recv_id = -1; rtt_prio = 0; rtt_recv_timestamp = 0; rtt_recv_last_feedback = 0; rtt_rate = 0; // CLR clr_id = -1; clr_timestamp = 0; clr_last_feedback = 0; // send the first packet sendpkt(); // ... at initial rate send_timer_.resched(size_/rate_); // ... and start timer so we can cut rate // in half if we do not get feedback NoFeedbacktimer_.resched(DEC_NO_REPORT*size_/rate_); } void TfmccAgent::stop() { active_ = 0; send_timer_.force_cancel(); } /* * Receive a status report from the receiver. */ void TfmccAgent::recv(Packet *pkt, Handler *) { double now = Scheduler::instance().clock(); hdr_tfmcc_ack *nck = hdr_tfmcc_ack::access(pkt); int cur_prio; int new_clr = 0; double adjusted_rate; if (rate_change_ == INIT_RATE) rate_change_ = SLOW_START; // if we don't get ANY feedback for some time, cut rate in half NoFeedbacktimer_.resched(DEC_NO_REPORT * max_rtt_); // compute receiver's current RTT double tmp_rtt = now - nck->timestamp_echo - nck->timestamp_offset; if (max_rtt_ < tmp_rtt) max_rtt_ = tmp_rtt; adjusted_rate = nck->rate; if (nck->have_rtt == 0 && rate_change_ == CONG_AVOID) { // recalculate rate with correct RTT adjusted_rate = adjusted_rate / tmp_rtt * max_rtt_; } if (adjusted_rate < min_rate_) adjusted_rate = min_rate_; // adjust suppression rate by G_FACTOR if non-CLR (and correct feedback round) // before clr_id is updated so that new CLR also contributes to suppression if ((clr_id != nck->recv_id) && (supp_rate_ > (1-G_FACTOR) * nck->rate) && (nck->round_id == round_id)) supp_rate_ = (1-G_FACTOR) * nck->rate; // if we are in slow start and we just saw a loss then come out of slow start if (rate_change_ == SLOW_START && nck->have_loss) { rate_change_ = CONG_AVOID; oldrate_ = rate_ = adjusted_rate; clr_id = nck->recv_id; new_clr = 1; } if (rate_change_ == SLOW_START) { // adjust maxrate if (clr_id == nck->recv_id) { maxrate_ = adjusted_rate; } else if ((nck->receiver_leave == 0) && (adjusted_rate < maxrate_ || clr_id == -1)) { // change CLR maxrate_ = adjusted_rate; clr_id = nck->recv_id; new_clr = 1; } slowstart(); } else if (rate_change_ == CONG_AVOID) { if ((nck->receiver_leave == 0) && (adjusted_rate < rate_ || clr_id == -1)) { // change CLR clr_id = nck->recv_id; new_clr = 1; } if (clr_id == nck->recv_id) { if (adjusted_rate >= 0) { expected_rate = adjusted_rate; if (expected_rate > rate_) increase_rate(); else decrease_rate(); } } } // determine which timestamp to include in data packet if (clr_id == nck->recv_id) { // remember CLR timestamp to include in packets w/o any other RTT feedback clr_timestamp = nck->timestamp; clr_last_feedback = now; if (!nck->have_rtt || new_clr) cur_prio = PRIO_FORCE; else cur_prio = PRIO_CLR; } else { if (!nck->have_rtt) cur_prio = PRIO_NO_RTT; else cur_prio = PRIO_RECV; } if (cur_prio > rtt_prio || (cur_prio == rtt_prio && nck->rate < rtt_rate)) { rtt_prio = cur_prio; rtt_recv_id = nck->recv_id; rtt_recv_timestamp = nck->timestamp; rtt_recv_last_feedback = now; rtt_rate = nck->rate; // use reported rate for suppression } if (printStatus_) { if (nck->receiver_leave) printf("s%i %f leave-group", sndr_id, now); else printf("s%i %f update", sndr_id, now); if (supp_rate_ == DBL_MAX) { printf(" [%i,i%i,m%i] %f/%f sendrate %f (supp oo, max %f, max rtt %f) round %i(%i) ", nck->recv_id, rtt_recv_id, clr_id, adjusted_rate * 8.0 / 1000.0, nck->rate * 8.0 / 1000.0, rate_ * 8.0 / 1000.0, maxrate_ * 8.0 / 1000.0, max_rtt_, round_id, nck->round_id); } else { printf(" [%i,i%i,m%i] %f/%f sendrate %f (supp %f, max %f, max rtt %f) round %i(%i) ", nck->recv_id, rtt_recv_id, clr_id, adjusted_rate * 8.0 / 1000.0, nck->rate * 8.0 / 1000.0, rate_ * 8.0 / 1000.0, supp_rate_ * 8.0 / 1000.0, maxrate_ * 8.0 / 1000.0, max_rtt_, round_id, nck->round_id); } printf((nck->recv_id == clr_id) ? "!\n" : "\n"); // feedback was from rep } Packet::free(pkt); } void TfmccAgent::slowstart () { double now = Scheduler::instance().clock(); if (rate_+SMALLFLOAT< size_/max_rtt_ ) { /* if this is the first report, change rate to 1 per rtt */ /* compute delta so rate increases slowly to new value */ oldrate_ = rate_; rate_ = fairsize_/max_rtt_; delta_ = (rate_ - oldrate_)/(rate_*max_rtt_/size_); last_change_ = now; } else { /* else multiply the rate by ssmult_, and compute delta, */ /* so that the rate increases slowly to new value */ if (maxrate_ > 0) { if (ssmult_*rate_ < maxrate_ && now - last_change_ > max_rtt_) { rate_ = ssmult_*rate_; delta_ = (rate_ - oldrate_)/(rate_*max_rtt_/size_); last_change_ = now; } else { if (rate_ > maxrate_ || now - last_change_ > max_rtt_) { rate_ = maxrate_; delta_ = (maxrate_ - oldrate_)/(rate_*max_rtt_/size_); if (delta_ < 0) delta_ = 0; last_change_ = now; } } } else { if (now - last_change_ > max_rtt_) { rate_ = ssmult_*rate_; delta_ = (rate_ - oldrate_)/(rate_*max_rtt_/size_); last_change_ = now; } } } } void TfmccAgent::increase_rate() { double now = Scheduler::instance().clock(); double mult = (now - last_change_) / max_rtt_; if (mult > 2) mult = 2; //printf("stmp %f | %f -> ", now, rate_*8.0/1000.0); rate_ = MIN(expected_rate, rate_ + (fairsize_/max_rtt_)*mult); //printf("%f step %f (min %f)\n", expected_rate*8.0/1000.0, rate_*8.0/1000.0, fairsize_/max_rtt_*8.0/1000.0); if (rate_ < fairsize_/max_rtt_) rate_ = fairsize_/max_rtt_; last_change_ = now; // check if rate increase should impact send timer double next = size_/rate_; if (inter_packet >= 2 * next) { assert(next > SMALLFLOAT); send_timer_.resched(next); } } void TfmccAgent::decrease_rate() { double now = Scheduler::instance().clock(); oldrate_ = rate_ = expected_rate; last_change_ = now; } void TfmccAgent::nextpkt() { double now = Scheduler::instance().clock(); double xrate = -1; // new feedback round every T = t_factor * max_RTT, // if at least one non-CLR FBM was received (supp_rate_ < DBL_MAX) // or after T = 2 * t_factor * max_RTT (only one receiver in group) if ((now - round_begin_ >= t_factor_ * max_rtt_) && (supp_rate_ < DBL_MAX) || (now - round_begin_ >= 2 * t_factor_ * max_rtt_)) { round_id = (round_id + 1) % MAX_ROUND_ID; round_begin_ = now; supp_rate_ = DBL_MAX; // XXX draft says only inc when max_rtt unchanged from prev round max_rtt_ *= MAX_RTT_DECAY; if (max_rtt_ < size_/rate_) max_rtt_ = size_/rate_; } // time out CLR if (now - clr_last_feedback > max_rtt_ * DEC_NO_REPORT) { clr_id = -1; } sendpkt(); // during slow start, we increase rate // slowly - by amount delta per packet if ((rate_change_ == SLOW_START) && (oldrate_+SMALLFLOAT< rate_)) { oldrate_ = oldrate_ + delta_; xrate = oldrate_; } else { xrate = rate_; } //printf("%f xrate: %f\n",Scheduler::instance().clock(),xrate); if (xrate > SMALLFLOAT) { inter_packet = size_/xrate; // randomize between +-50% packet interval, but no more than max_rtt gap double max_variation = MAX(0, MIN(0.5 * inter_packet, max_rtt_ - inter_packet)); double next = Random::uniform(inter_packet - max_variation, inter_packet + max_variation); assert(next > SMALLFLOAT); send_timer_.resched(next); } } void TfmccAgent::sendpkt() { if (active_) { double now = Scheduler::instance().clock(); Packet* p = allocpkt(); hdr_tfmcc *tfmcch = hdr_tfmcc::access(p); tfmcch->seqno=seqno_++; tfmcch->timestamp = now; tfmcch->supp_rate = supp_rate_; tfmcch->psize = size_; tfmcch->round_id = round_id; tfmcch->max_rtt = max_rtt_; if (clr_id == rtt_recv_id || rtt_recv_id == -1) { tfmcch->rtt_recv_id = clr_id; tfmcch->timestamp_echo = clr_timestamp; tfmcch->timestamp_offset = now - clr_last_feedback; tfmcch->is_clr = 1; } else { tfmcch->rtt_recv_id = rtt_recv_id; tfmcch->timestamp_echo = rtt_recv_timestamp; tfmcch->timestamp_offset = now - rtt_recv_last_feedback; tfmcch->is_clr = 0; } send(p, 0); /* if (printStatus_) { if (supp_rate_ == DBL_MAX) { printf("s%i %f data-packet [i%i,m%i] sendrate %f (supp oo, max %f, max rtt %f) round %i\n", sndr_id, now, rtt_recv_id, clr_id, rate_ * 8.0 / 1000.0, maxrate_ * 8.0 / 1000.0, max_rtt_, round_id); } else { printf("s%i %f data-packet [i%i,m%i] sendrate %f (supp %f, max %f, max rtt %f) round %i\n", sndr_id, now, rtt_recv_id, clr_id, rate_ * 8.0 / 1000.0, supp_rate_ * 8.0 / 1000.0, maxrate_ * 8.0 / 1000.0, max_rtt_, round_id); } } */ ndatapack_++; rtt_recv_id = -1; rtt_prio = 0; } } void TfmccAgent::reduce_rate_on_no_feedback() { rate_ *= 0.5; double next = size_ / rate_; assert(next > SMALLFLOAT); send_timer_.resched(next); NoFeedbacktimer_.resched(DEC_NO_REPORT*max_rtt_); if (printStatus_) printf("s%i %f no-feedback reduction\n", sndr_id, Scheduler::instance().clock()); } void TfmccSendTimer::expire(Event *) { a_->nextpkt(); } void TfmccNoFeedbackTimer::expire(Event *) { a_->reduce_rate_on_no_feedback(); }