Long-outstanding commit. (Hopefully) Final version before running experiments for...
Kevin Webb [Fri, 29 May 2009 16:56:05 +0000 (16:56 +0000)]
drl/config.c
drl/estimate.c
drl/peer_comm.c
drl/raterouter.h
drl/ratetypes.h
drl/samplehold.h
drl/ulogd_DRL.c

index 8b15cd2..42f4472 100644 (file)
@@ -79,10 +79,13 @@ int get_eligible_leaves(drl_instance_t *instance) {
 
         return ENOMEM;
     }
+    memset(leaves, 0, count * sizeof(leaf_t));
 
     for (i = 0; i < count; ++i) {
         leaves[i].xid = atoi(names[i]->d_name);
         leaves[i].parent = NULL;
+        leaves[i].drop_prob = 0.0;
+        leaves[i].delay = 0;
         
         free(names[i]);
 
index 21edcbf..3cbf95a 100644 (file)
@@ -38,7 +38,7 @@ static void estimate(identity_t *ident, const double estintms) {
 
     time_difference = timeval_subtract(now, ident->common.last_update);
 
-    if (time_difference > 1.05 * (estintms / 1000 * ident->mainloop_intervals)) {
+    if (time_difference > .01 + (estintms / 1000 * ident->mainloop_intervals)) {
         printlog(LOG_WARN, "Missed interval: Scheduled for %.2f ms, actual %.2fms\n",
                  estintms * ident->mainloop_intervals, time_difference * 1000);
     }
@@ -125,8 +125,8 @@ static double allocate_fps_over_limit(identity_t *ident) {
 static inline uint32_t close_enough(uint32_t limit) {
     uint32_t difference = limit - (limit * CLOSE_ENOUGH);
 
-    if (difference < 2500) {
-        return (limit - 2500);
+    if (difference < 10240) {
+        return (limit - 10240);
     } else {
         return (limit * CLOSE_ENOUGH);
     }
@@ -219,6 +219,7 @@ static uint32_t allocate_fps(identity_t *ident, double total_weight,
         }
         Old flowstart code.
 #endif
+        //printf("rate is %d, close enough is %d, difference is %d\n", table->rate, close_enough(ident->locallimit), close_enough(ident->locallimit) - table->rate);
 
         /* Boost low-limits so that they have room to grow. */
         if (table->rate < FLOW_START_THRESHOLD) {
@@ -564,19 +565,47 @@ static uint32_t allocate_fps_old(identity_t *ident, double total_weight) {
  */
 static double allocate_grd(identity_t *ident, double aggdemand) {
     double dropprob;
-    double global_limit = (double) (ident->limit);
+    double global_limit = ident->limit;
+    double min_dropprob = ident->drop_prob * GRD_BIG_DROP;
+
+    struct timeval tv;
+    double time_now;
+    common_accounting_t *table = &ident->common;
+
+    gettimeofday(&tv, NULL);
+    time_now = (double) tv.tv_sec + (double) ((double) tv.tv_usec / (double) 1000000);
 
     if (aggdemand > global_limit) {
         dropprob = (aggdemand-global_limit)/aggdemand;
     } else {
         dropprob = 0.0;
     }
-    
+
+    if (dropprob > 0.01 && dropprob < min_dropprob) {
+        dropprob = min_dropprob;
+    }
+
     if (system_loglevel == LOG_DEBUG) {
         printf("local rate: %d, aggregate demand: %.3f, drop prob: %.3f\n",
            ident->common.rate, aggdemand, dropprob);
     }
 
+    if (table->max_flow_rate > 0) {
+        printlog(LOG_WARN, "%.2f %d 0 0 %.2f %d %d %d %d %d %d %d %d %.2f ID:%d %.3f\n",
+             time_now, table->inst_rate, aggdemand,
+             table->num_flows, table->num_flows_5k, table->num_flows_10k,
+             table->num_flows_20k, table->num_flows_50k, table->avg_rate,
+             table->max_flow_rate, table->max_flow_rate_flow_hash, dropprob,
+             ident->id, (double) table->rate / (double) table->max_flow_rate);
+    } else {
+        printlog(LOG_WARN, "%.2f %d 0 0 %.2f %d %d %d %d %d %d %d %d %.2f ID:%d 0\n",
+             time_now, table->inst_rate, aggdemand,
+             table->num_flows, table->num_flows_5k, table->num_flows_10k,
+             table->num_flows_20k, table->num_flows_50k, table->avg_rate,
+             table->max_flow_rate, table->max_flow_rate_flow_hash, dropprob,
+             ident->id);
+    }
+
     return dropprob;
 }
 
@@ -617,7 +646,6 @@ static void allocate(limiter_t *limiter, identity_t *ident) {
         /* Update other limiters with our weight by writing to comm layer. */
         write_local_value(&ident->comm, ident->localweight);
     } else {
-        ident->locallimit = 0; /* Unused with GRD. */
         ident->last_drop_prob = ident->drop_prob;
         ident->drop_prob = allocate_grd(ident, comm_val);
         
@@ -739,17 +767,11 @@ static void enforce(limiter_t *limiter, identity_t *ident) {
                 }
 
                 /* Make the call to tc. */
-#ifdef DELAY40MS
                 snprintf(cmd, CMD_BUFFER_SIZE,
-                         "/sbin/tc qdisc change dev eth0 parent 1:1%x handle 1%x netem loss %.4f delay 40ms",
+                         "/sbin/tc qdisc change dev eth0 parent 1:1%x handle 1%x netem loss %.4f delay %dms",
                          ident->leaves[i]->xid, ident->leaves[i]->xid,
-                         (100 * ident->leaves[i]->drop_prob));
-#else
-                snprintf(cmd, CMD_BUFFER_SIZE,
-                         "/sbin/tc qdisc change dev eth0 parent 1:1%x handle 1%x netem loss %.4f delay 0ms",
-                         ident->leaves[i]->xid, ident->leaves[i]->xid,
-                         (100 * ident->leaves[i]->drop_prob));
-#endif
+                         (100 * ident->leaves[i]->drop_prob), ident->leaves[i]->delay);
+
                 if (do_enforcement) {
                     ret = system(cmd);
 
index 2da6737..eba9637 100644 (file)
@@ -391,7 +391,9 @@ static void ident_receive(comm_ident_t *ident, pthread_rwlock_t *limiter_rwlock)
 }
 #endif
 
-#define ALLOW_PARTITION
+/* Turn this on to simulate network partitions.
+ * Turn off for production settings. */
+//#define ALLOW_PARTITION
 
 int send_udp_mesh(comm_t *comm, uint32_t id, int sock) {
     int result = 0;
@@ -440,6 +442,8 @@ int send_udp_mesh(comm_t *comm, uint32_t id, int sock) {
             }
         }
 
+        partition_count += 1;
+
 #endif
 
         toaddr.sin_addr.s_addr = remote->addr; /* Already in network byte order. */
@@ -450,7 +454,6 @@ int send_udp_mesh(comm_t *comm, uint32_t id, int sock) {
             printlog(LOG_WARN, "  - The error was |%d|\n", strerror(result));
             break;
         }
-        partition_count += 1;
     }
 
     return result;
index 2594f12..37f0141 100644 (file)
@@ -60,7 +60,9 @@ enum dampenings { DAMPEN_NONE = 0, DAMPEN_TEST = 1, DAMPEN_FAILED = 2, DAMPEN_PA
  */
 #define FLOW_START_THRESHOLD (6000)
 
-#define CLOSE_ENOUGH (0.99)
+#define CLOSE_ENOUGH (0.90)
+
+#define GRD_BIG_DROP (0.90)
 
 /**
  * All fields come from the ip protocol header.
@@ -79,7 +81,7 @@ enum dampenings { DAMPEN_NONE = 0, DAMPEN_TEST = 1, DAMPEN_FAILED = 2, DAMPEN_PA
  * specified in the config file.  The second is a standard table with
  * "perfect" accounting so that we can compare the two.  Turn this off for 
  * any type of production setting. */
-#define SHADOW_ACCTING
+//#define SHADOW_ACCTING
 
 /* forward declare some structs */
 struct limiter;
index dc7f6c9..c4f6b90 100644 (file)
@@ -188,6 +188,10 @@ typedef struct leaf {
 
     /** GRD: The leaf's packet drop probability. */
     double drop_prob;
+
+    /** Only used for experimentation. */
+    int delay;
+
 } leaf_t;
 
 /**
index 7e51623..fbd8d3d 100644 (file)
 #define RANDOM_GRANULARITY (1000)
 
 /* For simple S&H testing. */
-//#define SAMPLEHOLD_PERCENTAGE (20)
-//#define SAMPLEHOLD_OVERFACTOR (5)
-
 #define SAMPLEHOLD_PERCENTAGE (5)
-#define SAMPLEHOLD_OVERFACTOR (10)
+#define SAMPLEHOLD_OVERFACTOR (1.25)
+
+//#define SAMPLEHOLD_PERCENTAGE (5)
+//#define SAMPLEHOLD_OVERFACTOR (10)
 #define SAMPLEHOLD_BONUS_FACTOR (1.05)
 
 /** In-table representation of a flow that has been sampled. */
index 79cb4ef..83e5f13 100644 (file)
@@ -1269,7 +1269,7 @@ static int create_htb_hierarchy(drl_instance_t *instance) {
 }
 
 static int setup_tc_grd(drl_instance_t *instance) {
-    int i;
+    int i, j;
     char cmd[300];
 
     for (i = 0; i < instance->leaf_count; ++i) {
@@ -1282,15 +1282,11 @@ static int setup_tc_grd(drl_instance_t *instance) {
         }
 
         /* Add the netem qdisc. */
-#ifdef DELAY40MS
-        sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1%x handle 1%x netem loss 0 delay 40ms",
-                instance->leaves[i].xid, instance->leaves[i].xid);
-#else
         sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1%x handle 1%x netem loss 0 delay 0ms",
                 instance->leaves[i].xid, instance->leaves[i].xid);
-#endif
 
         if (execute_cmd(cmd)) {
+            printlog(LOG_CRITICAL, "TC GRD call failed: %s\n", cmd);
             return 1;
         }
     }
@@ -1303,13 +1299,10 @@ static int setup_tc_grd(drl_instance_t *instance) {
     }
 
     /* Add the netem qdisc. */
-#ifdef DELAY40MS
-    sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1000 handle 1000 netem loss 0 delay 40ms");
-#else
     sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1000 handle 1000 netem loss 0 delay 0ms");
-#endif
 
     if (execute_cmd(cmd)) {
+        printlog(LOG_CRITICAL, "TC GRD call failed: %s\n", cmd);
         return 1;
     }
 
@@ -1320,16 +1313,65 @@ static int setup_tc_grd(drl_instance_t *instance) {
     }
 
     /* Add the netem qdisc. */
-#ifdef DELAY40MS
-    sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1fff handle 1fff netem loss 0 delay 40ms");
-#else
     sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1fff handle 1fff netem loss 0 delay 0ms");
-#endif
 
     if (execute_cmd(cmd)) {
+        printlog(LOG_CRITICAL, "TC GRD call failed: %s\n", cmd);
         return 1;
     }
 
+    /* Artifical delay or loss for experimentation. */
+    if (netem_delay.u.value || netem_loss.u.value) {
+        if (!strcmp(netem_slice.u.string, "ALL")) {
+            sprintf(cmd, "/sbin/tc qdisc change dev eth0 parent 1:1000 handle 1000 netem loss %d delay %dms", netem_loss.u.value, netem_delay.u.value);
+            if (execute_cmd(cmd)) {
+                printlog(LOG_CRITICAL, "TC GRD call failed: %s\n", cmd);
+                return 1;
+            }
+
+            sprintf(cmd, "/sbin/tc qdisc change dev eth0 parent 1:1fff handle 1fff netem loss %d delay %dms", netem_loss.u.value, netem_delay.u.value);
+            if (execute_cmd(cmd)) {
+                printlog(LOG_CRITICAL, "TC GRD call failed: %s\n", cmd);
+                return 1;
+            }
+
+            for (j = 0; j < instance->leaf_count; ++j) {
+                leaf_t *current = &instance->leaves[j];
+
+                current->delay = netem_delay.u.value;
+
+                sprintf(cmd, "/sbin/tc qdisc change dev eth0 parent 1:1%x handle 1%x netem loss %d delay %dms", current->xid, current->xid, netem_loss.u.value, netem_delay.u.value);
+
+                if (execute_cmd(cmd)) {
+                    printlog(LOG_CRITICAL, "TC GRD call failed: %s\n", cmd);
+                    return 1;
+                }
+            }
+        } else {
+            uint32_t slice_xid;
+            leaf_t *leaf = NULL;
+
+            sscanf(netem_slice.u.string, "%x", &slice_xid);
+
+            leaf = (leaf_t *) map_search(instance->leaf_map, &slice_xid, sizeof(slice_xid));
+
+            if (leaf == NULL) {
+                /* Leaf not found - invalid selection. */
+                printf("Your experimental setup is incorrect...\n");
+                return 1;
+            }
+
+            leaf->delay = netem_delay.u.value;
+
+            sprintf(cmd, "/sbin/tc qdisc change dev eth0 parent 1:1%x handle 1%x netem loss %d delay %dms", slice_xid, slice_xid, netem_loss.u.value, netem_delay.u.value);
+
+            if (execute_cmd(cmd)) {
+                printlog(LOG_CRITICAL, "TC GRD call failed: %s\n", cmd);
+                return 1;
+            }
+        }
+    }
+
     return 0;
 }