/* -*- Mode:C++; c-basic-offset:8; tab-width:8; indent-tabs-mode:t -*- */ /* * Copyright (c) 2001 University of Mannheim, Praktische Informatik IV * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * 3. All advertising materials mentioning features or use of this software * must display the following acknowledgement: * This product includes software developed by Praktische Informatik IV, * University of Mannheim and ACIRI. * 4. Neither the name of the University of Mannheim nor of ACIRI * may be used to endorse or promote products derived from this * software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY U. MANNHEIM AND CONTRIBUTORS ``AS IS'' AND * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL U. MANNHEIM OR CONTRIBUTORS BE LIABLE * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. */ #include #include #include #include #include #include #include "tfmcc-sink.h" #include "formula.h" #define MAX(a, b) (((a) > (b)) ? (a) : (b)) #define MIN(a, b) (((a) < (b)) ? (a) : (b)) static class TfmccSinkClass : public TclClass { public: TfmccSinkClass() : TclClass("Agent/TFMCCSink") {} TclObject* create(int, const char*const*) { return (new TfmccSinkAgent()); } } class_tfmccSink; TfmccSinkAgent::TfmccSinkAgent() : Agent(PT_TFMCC_ACK) { bind("packetSize_", &size_); bind("fairSize_", &fairsize_); bind_bool("aggregatePackets_", &aggregate_packets_); bind_bool("aggregateLoss_", &aggregate_loss_); bind("InitHistorySize_", &hsz); bind("AdjustHistoryAfterSS_", &adjust_history_after_ss); bind("NumSamples_", &numsamples); bind("discount_", &discount); bind("printStatus_", &printStatus_); bind("smooth_", &smooth_); bind("recv_id_", &recv_id); bind("df_", &df_); // bind("ca_", &ca_); // not used bind("rate_", &rate_); bind("bval_", &bval_); bind("tight_loop_", &tight_loop_); bind("fbtime_mult_", &fbtime_mult_); bind("constant_rate_", &constant_rate_); // XXX some of this should be set in start proc (which doesn't exist yet) rtt_ = 0; rtt_est = 0; one_way_ = 0; last_timestamp_ = 0; last_arrival_ = 0; last_report_sent=0; avgsize_ = 0; sum_psize_ = 0; sum_lsize_ = 0; active_ = 1; maxseq = -1; rcvd_since_last_report = 0; loss_seen_yet = 0; lastloss = -1000; // also count loss event directly after startup false_sample = 0; round_id = -1; // so that first data packet starts a new round sample_count = 1 ; last_sample = 0; mult_factor_ = 1.0; representative = 0; comparison_rate = 0; suppression_rate = 0; fb_time = 0; receiver_leave = 0; rtvec_ = NULL; tsvec_ = NULL; lossvec_ = NULL; sizevec_ = NULL; sample = NULL ; weights = NULL ; mult = NULL ; } /* * Receive new data packet. If appropriate, generate a new report. */ void TfmccSinkAgent::recv(Packet *pkt, Handler *) { hdr_tfmcc *tfmcch = hdr_tfmcc::access(pkt); double now = Scheduler::instance().clock(); int prevmaxseq = maxseq; double p = -1; double rtt_cur; int new_round; double prev_last_arrival_; rcvd_since_last_report ++; if (numsamples < 0) { // This is the first packet received. numsamples = DEFAULT_NUMSAMPLES ; // forget about losses before this prevmaxseq = maxseq = tfmcch->seqno-1 ; // initialize last_sample last_sample = tfmcch->seqno; if (smooth_ == 1) { numsamples = numsamples + 1; } sample = (int *)malloc((numsamples+1)*sizeof(int)); weights = (double *)malloc((numsamples+1)*sizeof(double)); mult = (double *)malloc((numsamples+1)*sizeof(double)); for (int i = 0 ; i < numsamples+1 ; i ++) { sample[i] = 0 ; } if (smooth_ == 1) { weights[0] = 1.0; weights[1] = 1.0; weights[2] = 1.0; weights[3] = 1.0; weights[4] = 1.0; weights[5] = 0.8; weights[6] = 0.6; weights[7] = 0.4; weights[8] = 0.2; weights[9] = 0; } else { weights[0] = 1.0; weights[1] = 1.0; weights[2] = 1.0; weights[3] = 1.0; weights[4] = 0.8; weights[5] = 0.6; weights[6] = 0.4; weights[7] = 0.2; weights[8] = 0; } for (int i = 0; i < numsamples+1; i ++) { mult[i] = 1.0 ; } } psize_ = tfmcch->psize; avgsize_ = (int)((avgsize_ == 0) ? psize_ : (avgsize_ * 0.9 + psize_ * 0.1)); prev_last_arrival_ = (last_arrival_ == 0) ? now : last_arrival_; // no packet gap for first packet last_arrival_ = now; last_timestamp_ = tfmcch->timestamp; max_rtt = tfmcch->max_rtt; if (tfmcch->is_clr) { if (tfmcch->rtt_recv_id == recv_id) representative = 1; else representative = 0; } if (tfmcch->rtt_recv_id == recv_id) { rtt_cur = now - tfmcch->timestamp_echo - tfmcch->timestamp_offset; one_way_ = tfmcch->timestamp - tfmcch->timestamp_echo - tfmcch->timestamp_offset; if (rtt_ > 0) { double tmp_df_ = representative ? df_ : 0.5; rtt_est = rtt_ = tmp_df_ * rtt_ + (1-tmp_df_) * (rtt_cur); } else { rtt_est = rtt_ = rtt_cur; if (printStatus_) printf("r%i %f : first RTT %f (max %f)\n", recv_id, now, rtt_, max_rtt); // adjust previously faked lost history! if (false_sample == -1 && sample_count <= numsamples+1) { if (rtt_ < rtt_lossinit) { sample[sample_count-1] = int(sample[sample_count-1] * (rtt_/rtt_lossinit) * (rtt_/rtt_lossinit)); if (sample[sample_count-1] < 1) sample[sample_count-1] = 1; } } } } else { // do we have an initial measurement? if (one_way_ > 0) { rtt_cur = one_way_ + (now - tfmcch->timestamp); rtt_est = rtt_cur; } } add_packet_to_history (pkt); if ((loss_seen_yet == 0) && (tfmcch->seqno - prevmaxseq > 1)) { loss_seen_yet = 1; if (adjust_history_after_ss) { adjust_history(tfmcch->timestamp); } } if (rtt_est > 0) conservative_rtt = rtt_est; else conservative_rtt = max_rtt; p = est_loss(); if (p == 0) rate_ = 2 * est_thput(); else rate_ = p_to_b(p, conservative_rtt, 4 * conservative_rtt, fairsize_, bval_); // report a constant rate (for debugging etc) if (constant_rate_ > 0) rate_ = constant_rate_ / 8.0; if (round_id < tfmcch->round_id || round_id > tfmcch->round_id + MAX_ROUND_ID / 2) { round_id = tfmcch->round_id; new_round = 1; } else { new_round = 0; } suppression_rate = tfmcch->supp_rate; // use rate at beginning of round for suppression to avoid // possible feedback implosion in case of rate reduction during the round if (new_round) comparison_rate = rate_; if (representative && tight_loop_) { // feedback report once per RTT if (now - last_report_sent > conservative_rtt) { fb_time = 0; sendpkt(); } } else { if (new_round) fb_time = now + feedback_time(max_rtt, MAX_NUM_RECVS); if (fb_time > 0) { // reschedule packet in case last data packet was received too long ago // to prevent implosion (max. RTT should not be less than inter-packet gap) // (but don't reschedule if it's the first data packet) if (now - prev_last_arrival_ > max_rtt) fb_time += now - prev_last_arrival_ - max_rtt; if (comparison_rate >= suppression_rate || rate_ >= suppression_rate) { // suppress higher rate feedback fb_time = 0; } else { // send non-CLR feedback if it's not the first data packet and fb_time passed // (old feedback needs to be sent before checking for a new round!) if (rate_ > 0 && fb_time <= now) { sendpkt(); fb_time = 0; } } } } if (printStatus_) { if (recv_id == 0) { if (suppression_rate < DBL_MAX) printf("tmpr%i %f : l%f r%f -> %f (%f) round %i (new_round %i fb %f, add %f) | ", recv_id, now, p, conservative_rtt, rate_ * 8.0 / 1000.0, tfmcch->supp_rate * 8.0 / 1000.0, round_id, new_round, fb_time, MAX(0.0, now - prev_last_arrival_ - max_rtt)); else printf("tmpr%i %f : l%f r%f -> %f (oo) round %i (new_round %i fb %f, add %f) | ", recv_id, now, p, conservative_rtt, rate_ * 8.0 / 1000.0, round_id, new_round, fb_time, MAX(0.0, now - prev_last_arrival_ - max_rtt)); for (int i = 0 ; i <= numsamples; i++) printf("%i ",sample[i]); printf(" | %i %i\n", sample_count, numsamples); } } Packet::free(pkt); } void TfmccSinkAgent::add_packet_to_history (Packet *pkt) { hdr_tfmcc *tfmcch = hdr_tfmcc::access(pkt); double now = Scheduler::instance().clock(); register int i; register int seqno = tfmcch->seqno; if (lossvec_ == NULL) { // Initializing history. rtvec_=(double *)malloc(sizeof(double)*hsz); tsvec_=(double *)malloc(sizeof(double)*hsz); lossvec_=(char *)malloc(sizeof(char)*hsz); sizevec_=(int *)malloc(sizeof(int)*hsz); if (rtvec_ && lossvec_) { for (i = 0; i <= maxseq ; i++) { lossvec_[i] = NOLOSS; rtvec_[i] = now; tsvec_[i] = last_timestamp_; sizevec_[i] = fairsize_; } for (i = maxseq+1; i < hsz ; i ++) { lossvec_[i] = UNKNOWN; rtvec_[i] = -1; tsvec_[i] = -1; sizevec_[i] = 0; } } else { fprintf(stderr, "error allocating memory for packet buffers\n"); abort (); } } if (tfmcch->seqno - last_sample > hsz) { fprintf(stderr, "time=%f, pkt=%d, last=%d history to small\n", now, tfmcch->seqno, last_sample); abort(); } /* for the time being, we will ignore out of order and duplicate packets etc. */ if (seqno > maxseq) { rtvec_[seqno%hsz]=now; tsvec_[seqno%hsz]=last_timestamp_; lossvec_[seqno%hsz] = RCVD; sizevec_[seqno%hsz] = tfmcch->psize; i = maxseq+1; if (i < seqno) { double delta = (tsvec_[seqno%hsz]-tsvec_[maxseq%hsz])/(seqno-maxseq) ; double tstamp = tsvec_[maxseq%hsz]+delta ; while(i < seqno) { rtvec_[i%hsz]=now; tsvec_[i%hsz]=tstamp; sizevec_[i%hsz] = avgsize_; if (tsvec_[i%hsz]-lastloss > conservative_rtt) { // Lost packets are marked as "LOST" // at most once per RTT. lossvec_[i%hsz] = LOST; lastloss = tstamp; } else { // This lost packet is marked "NOLOSS" // because it does not begin a loss event. lossvec_[i%hsz] = NOLOSS; } i++; tstamp = tstamp+delta; } } maxseq = seqno; } } /* * Estimate the loss rate. This function calculates two loss rates, * and returns the smaller of the two. */ double TfmccSinkAgent::est_loss() { int i; double ave_interval1, ave_interval2; int ds ; // sample[i] counts the number of packets since the i-th loss event // sample[0] contains the most recent sample. //packets (and possibly loss events) are aggregated to form fairsize_'ed packets for (i = last_sample; i <= maxseq ; i ++) { if (aggregate_packets_) { if ((!aggregate_loss_ && lossvec_[i%hsz] == LOST)) { // adjust sample if each loss event counts as full packet sum_psize_ += fairsize_; } else { sum_psize_ += sizevec_[i%hsz]; } if (sum_psize_ >= 0) { // after received packet ignore the next packets until fairsize_ bytes were received sum_psize_ -= fairsize_; sample[0]++; } } else { sample[0]++; } if (lossvec_[i%hsz] == NOLOSS || lossvec_[i%hsz] == LOST) sum_lsize_ += sizevec_[i%hsz]; if ((aggregate_loss_ && sum_lsize_ >= 0) || (!aggregate_loss_ && lossvec_[i%hsz] == LOST)) { sum_lsize_ -= fairsize_; // new loss event sample_count++; shift_array (sample, numsamples+1, 0); multiply_array(mult, numsamples+1, mult_factor_); shift_array (mult, numsamples+1, 1.0); mult_factor_ = 1.0; } } last_sample = maxseq+1 ; if (sample_count>numsamples+1) // The array of loss intervals is full. ds=numsamples+1; else ds=sample_count; if (sample_count == 1 && false_sample == 0) // no losses yet return 0; /* do we need to discount weights? */ if (sample_count > 1 && discount && sample[0] > 0) { double ave = weighted_average(1, ds, 1.0, mult, weights, sample); int factor = 2; double ratio = (factor*ave)/sample[0]; double min_ratio = 0.5; if ( ratio < 1.0) { // the most recent loss interval is very large mult_factor_ = ratio; if (mult_factor_ < min_ratio) mult_factor_ = min_ratio; } } // Calculations including the most recent loss interval. ave_interval1 = weighted_average(0, ds, mult_factor_, mult, weights, sample); // The most recent loss interval does not end in a loss // event. Include the most recent interval in the // calculations only if this increases the estimated loss // interval. ave_interval2 = weighted_average(1, ds, mult_factor_, mult, weights, sample); if (ave_interval2 > ave_interval1) ave_interval1 = ave_interval2; if (ave_interval1 >= 0.99) { //print_loss(sample[0], ave_interval1); return 1/ave_interval1; } else { fprintf(stderr, "r%i est_loss(): sample_count %i, false_sample %i\n", recv_id, sample_count , false_sample); double avg = 0; for (int i = 0 ; i <= numsamples; i++) { fprintf(stderr, "%i ", sample[i]); avg += sample[i]; } fprintf(stderr, " --- avg %f %i\n", ave_interval1, numsamples); exit(1); //return 999; } } void TfmccSinkAgent::print_loss(int sample, double ave_interval) { double now = Scheduler::instance().clock(); double drops = 1/ave_interval; printf ("time: %7.5f current_loss_interval %5d\n", now, sample); printf ("time: %7.5f loss_rate: %7.5f\n", now, drops); } // Calculate the weighted average. double TfmccSinkAgent::weighted_average(int start, int end, double factor, double *m, double *w, int *sample) { int i; double wsum = 0; double answer = 0; if (smooth_ == 1 && start == 0) { if (end == numsamples+1) { // the array is full, but we don't want to uses // the last loss interval in the array end = end-1; } // effectively shift the weight arrays for (i = start ; i < end; i++) if (i==0) wsum += m[i]*w[i+1]; else wsum += factor*m[i]*w[i+1]; for (i = start ; i < end; i++) if (i==0) answer += m[i]*w[i+1]*sample[i]/wsum; else answer += factor*m[i]*w[i+1]*sample[i]/wsum; return answer; } else { for (i = start ; i < end; i++) if (i==0) wsum += m[i]*w[i]; else wsum += factor*m[i]*w[i]; for (i = start ; i < end; i++) if (i==0) answer += m[i]*w[i]*sample[i]/wsum; else answer += factor*m[i]*w[i]*sample[i]/wsum; return answer; } } // Shift array a[] up, starting with a[sz-2] -> a[sz-1]. void TfmccSinkAgent::shift_array(int *a, int sz, int defval) { int i ; for (i = sz-2 ; i >= 0 ; i--) { a[i+1] = a[i] ; } a[0] = defval; } void TfmccSinkAgent::shift_array(double *a, int sz, double defval) { int i ; for (i = sz-2 ; i >= 0 ; i--) { a[i+1] = a[i] ; } a[0] = defval; } // Multiply array by value, starting with array index 1. // Array index 0 of the unshifted array contains the most recent interval. void TfmccSinkAgent::multiply_array(double *a, int sz, double multiplier) { int i ; for (i = 1; i <= sz-1; i++) { double old = a[i]; a[i] = old * multiplier ; } } /* * compute estimated throughput for report. */ double TfmccSinkAgent::est_thput () { double last = rtvec_[maxseq%hsz]; int rcvd = 0; int i = maxseq; // count number of packets received in the last 2 RTTs (min. of 10 packets) while (i > 0 && (rcvd < 10 || (last - rtvec_[i%hsz] < 2*conservative_rtt))) { if (lossvec_[i%hsz] == RCVD) rcvd++; i--; } if ((last - rtvec_[i%hsz]) > 0) { return rcvd/(last - rtvec_[i%hsz]) * avgsize_; } else { return 0; } } double TfmccSinkAgent::adjust_history(double ts) { int i; double p; for (i = maxseq; i >= 0 ; i--) { if (lossvec_[i%hsz] == LOST) { lossvec_[i%hsz] = NOLOSS; } } lastloss = ts; // ignore previous loss history last_sample = maxseq + 1; // no need to divide throughput by 2 since its actual receive rate, not sending rate p=b_to_p(est_thput(), conservative_rtt, 4 * conservative_rtt, fairsize_, bval_); false_sample = (int)(1.0/p + 0.5); sample[1] = false_sample; sample[0] = 0; sample_count = 2; false_sample = -1 ; rtt_lossinit = conservative_rtt; return p; } /* * Unused */ void TfmccSinkAgent::nextpkt() { sendpkt(); } /* * Create report message, and send it. */ void TfmccSinkAgent::sendpkt() { double now = Scheduler::instance().clock(); if (!active_) return; Packet* pkt = allocpkt(); if (pkt == NULL) { fprintf(stderr, "error allocating packet\n"); abort(); } hdr_tfmcc_ack *tfmcc_ackh = hdr_tfmcc_ack::access(pkt); tfmcc_ackh->recv_id = recv_id; tfmcc_ackh->timestamp = now; tfmcc_ackh->timestamp_echo = last_timestamp_; tfmcc_ackh->timestamp_offset = now - last_arrival_; tfmcc_ackh->rate = rate_; tfmcc_ackh->round_id = round_id; tfmcc_ackh->have_rtt = (rtt_ != 0); tfmcc_ackh->have_loss = loss_seen_yet; tfmcc_ackh->receiver_leave = receiver_leave; last_report_sent = now; rcvd_since_last_report = 0; send(pkt, 0); if (printStatus_) { if (suppression_rate < DBL_MAX) printf("r%i %f : %f %f -> %2.1f (supp %1f(%1f) fb %f round %i max_rtt %f r%il%i)%c [", tfmcc_ackh->recv_id, now, est_loss(), rtt_, tfmcc_ackh->rate * 8.0 / 1000.0, suppression_rate * 8.0 / 1000.0, comparison_rate * 8.0 / 1000.0, fb_time, round_id, max_rtt, tfmcc_ackh->have_rtt, tfmcc_ackh->have_loss, representative ? '!' : ' '); else printf("r%i %f : %f %f -> %2.1f (supp oo(%1f) fb %f round %i max_rtt %f r%il%i)%c [", tfmcc_ackh->recv_id, now, est_loss(), rtt_, tfmcc_ackh->rate * 8.0 / 1000.0, comparison_rate * 8.0 / 1000.0, fb_time, round_id, max_rtt, tfmcc_ackh->have_rtt, tfmcc_ackh->have_loss, representative ? '!' : ' '); for (int i = 0 ; i <= numsamples; i++) printf("%i ", sample[i]); printf("]\n"); } /* printf("%f %i |\t",now,maxseq); double avg = 0; for (int i = 0 ; i <= numsamples; i++) { printf("%i ",sample[i]); avg += sample[i]; } printf(" avg %f loss %f XXX\n", avg / (numsamples + 1), tfmcc_ackh->loss); */ } double TfmccSinkAgent::feedback_time(double rtt, int num_recv) { double T = fbtime_mult_ * rtt; double x = Random::uniform(); double t; t = T * (1.0 + log(x) / log((double) num_recv)); if (t < 0) t = 0; return t; } int TfmccSinkAgent::command(int argc, const char*const* argv) { if (argc==2) { if (strcmp(argv[1],"stop")==0) { stop(); return TCL_OK; } } else if (argc == 3) { if (strcmp(argv[1], "weights") == 0) { /* * weights is a string of numbers, seperated by + signs * the first number is the total number of weights. * the rest of them are the actual weights * this overrides the defaults */ char *w ; w = (char *)calloc(strlen(argv[2])+1, sizeof(char)) ; if (w == NULL) { fprintf (stderr, "error allocating w\n"); abort(); } strcpy(w, (char *)argv[2]); numsamples = atoi(strtok(w,"+")); sample = (int *)malloc((numsamples+1)*sizeof(int)); weights = (double *)malloc((numsamples+1)*sizeof(double)); mult = (double *)malloc((numsamples+1)*sizeof(double)); fflush(stdout); if (sample && weights && mult) { int count = 0 ; while (count < numsamples) { sample[count] = 0; mult[count] = 1; char *w; w = strtok(NULL, "+"); if (w == NULL) break ; else { weights[count++] = atof(w); } } if (count < numsamples) { fprintf(stderr, "error in weights string %s\n", argv[2]); abort(); } sample[count] = 0; weights[count] = 0; mult[count] = 1; //printf("weights: "); //for (int i = 0; i < numsamples+1; ++i) // printf("%.2f ", weights[i]); //printf("\n"); free(w); return (TCL_OK); } else { fprintf(stderr, "error allocating memory for smaple and weights:2\n"); abort(); } } } return (Agent::command(argc, argv)); } void TfmccSinkAgent::stop() { rate_ = 0; // -> receiver leaves session active_ = 0; receiver_leave = 1; if (representative) { if (printStatus_) printf("r%i %f : CLR leave-group\n", recv_id, Scheduler::instance().clock()); } }