Lots of changes. In no particular order:
Kevin Webb [Mon, 30 Mar 2009 19:59:20 +0000 (19:59 +0000)]
Added an accounting type called "multipleinterval" that makes decisions based on a window of history.

Separated communication from the "system tick".  Now identities can specify a separate inverval for accounting a
nd communication.

Added the ability to do "shadow accounting" when using sample and hold to see what would have happened using reg
ular accounting.

Cleaned up the FPS allocate code a lot (finally)!

Added partition simulation via SIGUSR2 and a partition_set variable in ulogd.conf (mesh only, gossip to come).

Added the option of emulating loss/delay (via netem) on a slice for testing.

Some fixes to sample&hold and a few other small things.

16 files changed:
drl/Makefile.in
drl/calendar.h
drl/config.c
drl/config.h
drl/drl_state.c
drl/estimate.c
drl/multipleinterval.c [new file with mode: 0644]
drl/multipleinterval.h [new file with mode: 0644]
drl/peer_comm.c
drl/raterouter.h
drl/ratetypes.h
drl/samplehold.c
drl/samplehold.h
drl/standard.c
drl/standard.h
drl/ulogd_DRL.c

index 0d1b6ec..0315752 100644 (file)
@@ -8,7 +8,7 @@ SH_CFLAGS:=$(CFLAGS) -fPIC
 #
 
 SHARED_LIBS=ulogd_DRL.so
-OBJECTS=config.o drl_state.o estimate.o logging.o peer_comm.o samplehold.o simple.o standard.o ulogd_DRL.o util.o
+OBJECTS=config.o drl_state.o estimate.o logging.o multipleinterval.o peer_comm.o samplehold.o simple.o standard.o ulogd_DRL.o util.o
 
 all: $(SHARED_LIBS)
 
index 717c8a2..ecb3b41 100644 (file)
@@ -10,9 +10,8 @@
 #define SCHEDLEN (1 << SCHEDBITS)
 #define SCHEDMASK (SCHEDLEN - 1)
 
-/** Defines a struct ident_calendar whose elements are of type struct identity
- * (identity_t) */
-TAILQ_HEAD(ident_calendar, identity);
-
+/** Defines a struct ident_calendar whose elements are of type struct
+ * ident_action. */
+TAILQ_HEAD(ident_calendar, ident_action);
 
 #endif  /* _CALENDAR_H_ */
index 1c56ce8..65d8eb9 100644 (file)
@@ -105,7 +105,8 @@ static int parse_common(xmlDocPtr doc, xmlNodePtr ident, ident_config *common) {
     xmlChar *branch;
     xmlChar *accounting;
     xmlChar *ewma;
-    xmlChar *intervals;
+    xmlChar *mainloop_intervals;
+    xmlChar *communication_intervals;
     xmlNodePtr fields = ident->children;
     ident_peer *current = NULL;
 
@@ -168,6 +169,8 @@ static int parse_common(xmlDocPtr doc, xmlNodePtr ident, ident_config *common) {
             common->accounting = ACT_SAMPLEHOLD;
         } else if (!xmlStrcmp(accounting, (const xmlChar *) "SIMPLE")) {
             common->accounting = ACT_SIMPLE;
+        } else if (!xmlStrcmp(accounting, (const xmlChar *) "MULTIPLEINTERVAL")) {
+            common->accounting = ACT_MULTIPLE;
         } else {
             printlog(LOG_CRITICAL, "Unknown/invalid accounting table.\n");
             xmlFree(accounting);
@@ -185,13 +188,22 @@ static int parse_common(xmlDocPtr doc, xmlNodePtr ident, ident_config *common) {
         xmlFree(ewma);
     }
 
-    intervals = xmlGetProp(ident, (const xmlChar *) "intervals");
-    if (intervals == NULL) {
-        printlog(LOG_CRITICAL, "Ident missing interval count.\n");
-        return EINVAL;
+    mainloop_intervals = xmlGetProp(ident, (const xmlChar *) "loop_intervals");
+    if (mainloop_intervals == NULL) {
+        printlog(LOG_WARN, "Ident id: %d missing loop_intervals, assuming 1.\n", common->id);
+        common->mainloop_intervals = 1;
+    } else {
+        common->mainloop_intervals = atoi((const char *) mainloop_intervals);
+        xmlFree(mainloop_intervals);
+    }
+
+    communication_intervals = xmlGetProp(ident, (const xmlChar *) "comm_intervals");
+    if (communication_intervals == NULL) {
+        printlog(LOG_WARN, "Ident id: %d missing comm_intervals, assuming 1.\n", common->id);
+        common->communication_intervals = 1;
     } else {
-        common->intervals = atoi((const char *) intervals);
-        xmlFree(intervals);
+        common->communication_intervals = atoi((const char *) communication_intervals);
+        xmlFree(communication_intervals);
     }
 
     while (fields != NULL) {
index e1a436f..82e6e1f 100644 (file)
@@ -75,9 +75,12 @@ typedef struct ident_config {
     /** The fixed (1-second) ewma weight value for this identity. */
     double fixed_ewma_weight;
 
-    /** The number of estimate intervals to wait between calls to estimate,
+    /** The number of limiter intervals to wait between calls to estimate,
      * allocate and enforce. */
-    int intervals;
+    int mainloop_intervals;
+
+    /** The number of limiter intervals to wait between communication. */
+    int communication_intervals;
 
     /** The type of this identity. */
     enum ident_types type;
index fed3326..ea68f51 100644 (file)
@@ -215,6 +215,7 @@ void *limiter_receive_thread(void *unused) {
     sigemptyset(&signal_mask);
     sigaddset(&signal_mask, SIGHUP);
     sigaddset(&signal_mask, SIGUSR1);
+    sigaddset(&signal_mask, SIGUSR2);
     pthread_sigmask(SIG_BLOCK, &signal_mask, NULL);
 
     pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
index e42d7a6..21edcbf 100644 (file)
 #include "raterouter.h" 
 #include "util.h"
 #include "ratetypes.h" /* needs util and pthread.h */
+#include "calendar.h"
 #include "logging.h"
 
-#define PRINT_COUNTER_RESET (0)
-
 extern uint8_t system_loglevel;
-static int printcounter = PRINT_COUNTER_RESET - 1;
 
 uint8_t do_enforcement = 0;
 
@@ -30,15 +28,30 @@ uint8_t do_enforcement = 0;
  * to estimate the current aggregate rate and the rate of the individual flows
  * in the table.
  */
-static void estimate(identity_t *ident) {
+static void estimate(identity_t *ident, const double estintms) {
     struct timeval now;
+    double time_difference;
 
+    pthread_mutex_lock(&ident->table_mutex); /* CLUNK ! */
+    
     gettimeofday(&now, NULL);
 
-    pthread_mutex_lock(&ident->table_mutex); /* CLUNK ! */
+    time_difference = timeval_subtract(now, ident->common.last_update);
+
+    if (time_difference > 1.05 * (estintms / 1000 * ident->mainloop_intervals)) {
+        printlog(LOG_WARN, "Missed interval: Scheduled for %.2f ms, actual %.2fms\n",
+                 estintms * ident->mainloop_intervals, time_difference * 1000);
+    }
 
     ident->table_update_function(ident->table, now, ident->ewma_weight);
 
+#ifdef SHADOW_ACCTING
+
+    standard_table_update_flows((standard_flow_table) ident->shadow_table, now,
+                                ident->ewma_weight);
+
+#endif
+
     pthread_mutex_unlock(&ident->table_mutex); /* CLINK ! */
 }
 
@@ -46,27 +59,10 @@ static void estimate(identity_t *ident) {
  * Determines the FPS weight allocation when the identity is under its current
  * local rate limit.
  */
-static double allocate_fps_under_limit(identity_t *ident, uint32_t local_rate, double peer_weights) {
-    uint32_t target = local_rate;
+static double allocate_fps_under_limit(identity_t *ident, uint32_t target, double peer_weights) {
     double ideal_weight;
     double total_weight = peer_weights + ident->last_localweight;
 
-    if (ident->flowstart) {
-        target = local_rate*4;
-        if (local_rate >= FLOW_START_THRESHOLD) {
-            ident->flowstart = false;
-        }
-    }
-    else {
-        /* June 16, 2008 (KCW)
-         * ident->flowstart gets set initially to one, but it is never set again.  However,
-         * if a limiter gets flows and then the number of flows drops to zero, it has trouble
-         * increasing the limit again. */
-        if (local_rate < FLOW_START_THRESHOLD) {
-            ident->flowstart = true;
-        }
-    }
-
     if (target >= ident->limit) {
         ideal_weight = total_weight;
     } else if (target <= 0) {
@@ -101,12 +97,14 @@ static double allocate_fps_under_limit(identity_t *ident, uint32_t local_rate, d
  */
 static double allocate_fps_over_limit(identity_t *ident) {
     double ideal_weight;
+    double total_over_max;
 
     if (ident->common.max_flow_rate > 0) {
         ideal_weight = (double) ident->locallimit / (double) ident->common.max_flow_rate;
+        total_over_max = (double) ident->common.rate / (double) ident->common.max_flow_rate;
 
-        printlog(LOG_DEBUG, "%.3f  %d  %d  %d  FlowCount, Limit, MaxRate, TotalRate\n",
-                ideal_weight, ident->locallimit, ident->common.max_flow_rate, ident->common.rate);
+        printlog(LOG_DEBUG, "ideal_over: %.3f, limit: %d, max_flow_rate: %d, total_rate: %d, total/max: %.3f\n",
+                 ideal_weight, ident->locallimit, ident->common.max_flow_rate, ident->common.rate, total_over_max);
     } else {
         ideal_weight = 1;
     }
@@ -115,10 +113,301 @@ static double allocate_fps_over_limit(identity_t *ident) {
 }
 
 /**
+ * When FPS checks to see which mode it should be operating in
+ * (over limit vs under limit), we don't want it to actually look to
+ * see if we're at the limit.  Instead, we want to see if we're getting
+ * close to the limit.  This defines how close is "close enough".
+ *
+ * For example, if the limit is 50000 and we're sending 49000, we probably
+ * want to be in the over limit mode, even if we aren't actually over the limit
+ * in order to switch to the more aggressive weight calculations.
+ */
+static inline uint32_t close_enough(uint32_t limit) {
+    uint32_t difference = limit - (limit * CLOSE_ENOUGH);
+
+    if (difference < 2500) {
+        return (limit - 2500);
+    } else {
+        return (limit * CLOSE_ENOUGH);
+    }
+}
+
+static void print_statistics(identity_t *ident, const double ideal_weight,
+                             const double total_weight, const double localweight,
+                             const char *identifier, common_accounting_t *table,
+                             const uint32_t resulting_limit) {
+    struct timeval tv;
+    double time_now;
+
+    gettimeofday(&tv, NULL);
+    time_now = (double) tv.tv_sec + (double) ((double) tv.tv_usec / (double) 1000000);
+
+    printlog(LOG_WARN, "%.2f %d %.2f %.2f %.2f %d %d %d %d %d %d %d %d %d %s:%d ",
+             time_now, table->inst_rate, ideal_weight, localweight, total_weight,
+             table->num_flows, table->num_flows_5k, table->num_flows_10k,
+             table->num_flows_20k, table->num_flows_50k, table->avg_rate,
+             table->max_flow_rate, table->max_flow_rate_flow_hash, resulting_limit,
+             identifier, ident->id);
+
+    if (table->max_flow_rate > 0) {
+        printlog(LOG_WARN, "%.3f\n", (double) table->rate / (double) table->max_flow_rate);
+    } else {
+        printlog(LOG_WARN, "0\n");
+    }
+
+    /* Print to the screen in debug mode. */
+    if (system_loglevel == LOG_DEBUG) {
+        printf("Local Rate: %d, Ideal Weight: %.3f, Local Weight: %.3f, Total Weight: %.3f\n",
+               table->rate, ideal_weight, ident->localweight, total_weight);
+    }
+}
+
+static uint32_t allocate_fps(identity_t *ident, double total_weight,
+                             common_accounting_t *table, const char *identifier) {
+
+    uint32_t resulting_limit = 0;
+    double ideal_weight = 0.0;
+    double peer_weights = total_weight - ident->last_localweight;
+
+    /* Keep track of these for measurements & comparisons only. */
+    double ideal_under = 0.0;
+    double ideal_over = 0.0;
+
+    /* Weight sanity. */
+    if (peer_weights < 0.0) {
+        peer_weights = 0.0;
+    }
+
+    if (ident->dampen_state == DAMPEN_TEST) {
+        int64_t rate_delta = (int64_t) table->inst_rate - (int64_t) table->last_inst_rate;
+        double threshold = (double) ident->limit * (double) LARGE_INCREASE_PERCENTAGE / 10;
+
+        if (rate_delta > threshold) {
+            ident->dampen_state = DAMPEN_PASSED;
+        } else {
+            ident->dampen_state = DAMPEN_FAILED;
+        }
+    }
+
+    /* Rate/weight sanity. */
+    if (table->rate <= 0) {
+        ideal_weight = 0.0;
+    }
+
+    /* Under the limit OR we failed our dampening test OR our current
+     * outgoing traffic rate is under the low "flowstart" watermark. */
+    else if (ident->dampen_state == DAMPEN_FAILED ||
+             table->rate < close_enough(ident->locallimit)) {
+#if 0
+             || ident->flowstart) {
+        uint32_t target_rate = table->rate;
+
+        if (ident->flowstart) {
+            target_rate *= 4;
+
+            if (table->rate >= FLOW_START_THRESHOLD) {
+                ident->flowstart = false;
+            }
+        } else {
+            /* June 16, 2008 (KCW)
+             * ident->flowstart gets set initially to one, but it is never set again.  However,
+             * if a limiter gets flows and then the number of flows drops to zero, it has trouble
+             * increasing the limit again. */
+            if (table->rate < FLOW_START_THRESHOLD) {
+                ident->flowstart = true;
+            }
+        }
+        Old flowstart code.
+#endif
+
+        /* Boost low-limits so that they have room to grow. */
+        if (table->rate < FLOW_START_THRESHOLD) {
+            ideal_weight = ideal_under = allocate_fps_under_limit(ident, table->rate * 4, peer_weights);
+        } else {
+            ideal_weight = ideal_under = allocate_fps_under_limit(ident, table->rate, peer_weights);
+        }
+
+        ideal_over = allocate_fps_over_limit(ident);
+
+        if (ideal_over < ideal_under) {
+            /* Degenerate case in which the agressive weight calculation was
+             * actually less than the under-the-limit case.  Use it instead
+             * and skip the dampening check in the next interval. */
+            ideal_weight = ideal_over;
+            ident->dampen_state = DAMPEN_SKIP;
+        } else {
+            ident->dampen_state = DAMPEN_NONE;
+        }
+
+        /* Apply EWMA. */
+        ident->localweight = (ident->localweight * ident->ewma_weight +
+                              ideal_weight * (1 - ident->ewma_weight));
+    }
+
+    /* At or over the limit.  Use the aggressive weight calculation. */
+    else {
+        double portion_last_interval = 0.0;
+        double portion_this_interval = 0.0;
+
+        ideal_weight = ideal_over = allocate_fps_over_limit(ident);
+        ideal_under = allocate_fps_under_limit(ident, table->rate, peer_weights);
+
+        /* Apply EWMA. */
+        ident->localweight = (ident->localweight * ident->ewma_weight +
+                              ideal_weight * (1 - ident->ewma_weight));
+
+        /* Now check whether the result of the aggressive weight calculation
+         * increases our portion of the weight "too much", in which case we
+         * dampen it. */
+
+        /* Our portion of weight in the whole system during the last interval.*/
+        portion_last_interval = ident->last_localweight / total_weight;
+
+        /* Our proposed portion of weight for the current interval. */
+        portion_this_interval = ident->localweight / (peer_weights + ident->localweight);
+
+        if (ident->dampen_state == DAMPEN_NONE &&
+            (portion_this_interval - portion_last_interval > LARGE_INCREASE_PERCENTAGE)) {
+            ident->localweight = ident->last_localweight + (LARGE_INCREASE_PERCENTAGE * total_weight);
+            ident->dampen_state = DAMPEN_TEST;
+        } else {
+            ident->dampen_state = DAMPEN_SKIP;
+        }
+    }
+
+    /* Add the weight calculated in this interval to the total. */
+    ident->total_weight = total_weight = ident->localweight + peer_weights;
+
+    /* Convert weight value into a rate limit.  If there is no measureable
+     * weight, do a L/n allocation. */
+    if (total_weight > 0) {
+        resulting_limit = (uint32_t) (ident->localweight * ident->limit / total_weight);
+    } else {
+        resulting_limit = (uint32_t) (ident->limit / (ident->comm.remote_node_count + 1));
+    }
+
+    print_statistics(ident, ideal_weight, total_weight, ident->localweight,
+                     identifier, table, resulting_limit);
+
+    return resulting_limit;
+}
+
+#ifdef SHADOW_ACCTING
+
+/* Runs through the allocate functionality without making any state changes to
+ * the identity.  Useful for comparisons, especially for comparing standard
+ * and sample&hold accounting schemes. */
+static void allocate_fps_pretend(identity_t *ident, double total_weight,
+                                 common_accounting_t *table, const char *identifier) {
+
+    uint32_t resulting_limit = 0;
+    double ideal_weight = 0.0;
+    double peer_weights = total_weight - ident->last_localweight_copy;
+    double ideal_under = 0.0;
+    double ideal_over = 0.0;
+
+    if (peer_weights < 0.0) {
+        peer_weights = 0.0;
+    }
+
+    if (ident->dampen_state_copy == DAMPEN_TEST) {
+        int64_t rate_delta = (int64_t) table->inst_rate - (int64_t) table->last_inst_rate;
+        double threshold = (double) ident->limit * (double) LARGE_INCREASE_PERCENTAGE / 10;
+
+        if (rate_delta > threshold) {
+            ident->dampen_state_copy = DAMPEN_PASSED;
+        } else {
+            ident->dampen_state_copy = DAMPEN_FAILED;
+        }
+    }
+
+    /* Rate/weight sanity. */
+    if (table->rate <= 0) {
+        ideal_weight = 0.0;
+    }
+
+    /* Under the limit OR we failed our dampening test OR our current
+     * outgoing traffic rate is under the low "flowstart" watermark. */
+    else if (ident->dampen_state_copy == DAMPEN_FAILED ||
+             table->rate < close_enough(ident->locallimit)) {
+
+        /* Boost low-limits so that they have room to grow. */
+        if (table->rate < FLOW_START_THRESHOLD) {
+            ideal_weight = ideal_under = allocate_fps_under_limit(ident, table->rate * 4, peer_weights);
+        } else {
+            ideal_weight = ideal_under = allocate_fps_under_limit(ident, table->rate, peer_weights);
+        }
+
+        ideal_over = allocate_fps_over_limit(ident);
+
+        if (ideal_over < ideal_under) {
+            /* Degenerate case in which the agressive weight calculation was
+             * actually less than the under-the-limit case.  Use it instead
+             * and skip the dampening check in the next interval. */
+            ideal_weight = ideal_over;
+            ident->dampen_state_copy = DAMPEN_SKIP;
+        } else {
+            ident->dampen_state_copy = DAMPEN_NONE;
+        }
+
+        /* Apply EWMA. */
+        ident->localweight_copy = (ident->localweight_copy * ident->ewma_weight +
+                              ideal_weight * (1 - ident->ewma_weight));
+    }
+
+    /* At or over the limit.  Use the aggressive weight calculation. */
+    else {
+        double portion_last_interval = 0.0;
+        double portion_this_interval = 0.0;
+
+        ideal_weight = ideal_over = allocate_fps_over_limit(ident);
+        ideal_under = allocate_fps_under_limit(ident, table->rate, peer_weights);
+
+        /* Apply EWMA. */
+        ident->localweight_copy = (ident->localweight_copy * ident->ewma_weight +
+                              ideal_weight * (1 - ident->ewma_weight));
+
+        /* Now check whether the result of the aggressive weight calculation
+         * increases our portion of the weight "too much", in which case we
+         * dampen it. */
+
+        /* Our portion of weight in the whole system during the last interval.*/
+        portion_last_interval = ident->last_localweight / total_weight;
+
+        /* Our proposed portion of weight for the current interval. */
+        portion_this_interval = ident->localweight_copy / (peer_weights + ident->localweight_copy);
+
+        if (ident->dampen_state_copy == DAMPEN_NONE &&
+            (portion_this_interval - portion_last_interval > LARGE_INCREASE_PERCENTAGE)) {
+            ident->localweight_copy = ident->last_localweight + (LARGE_INCREASE_PERCENTAGE * total_weight);
+            ident->dampen_state_copy = DAMPEN_TEST;
+        } else {
+            ident->dampen_state_copy = DAMPEN_SKIP;
+        }
+    }
+
+    /* Add the weight calculated in this interval to the total. */
+    total_weight = ident->localweight_copy + peer_weights;
+
+    /* Convert weight value into a rate limit.  If there is no measureable
+     * weight, do a L/n allocation. */
+    if (total_weight > 0) {
+        resulting_limit = (uint32_t) (ident->localweight_copy * ident->limit / total_weight);
+    } else {
+        resulting_limit = (uint32_t) (ident->limit / (ident->comm.remote_node_count + 1));
+    }
+
+    print_statistics(ident, ideal_weight, total_weight, ident->localweight_copy,
+                     identifier, table, resulting_limit);
+}
+
+#endif
+
+/**
  * Determines the amount of FPS weight to allocate to the identity during each
  * estimate interval.  Note that total_weight includes local weight.
  */
-static uint32_t allocate_fps(identity_t *ident, double total_weight) {
+static uint32_t allocate_fps_old(identity_t *ident, double total_weight) {
     common_accounting_t *ftable = &ident->common; /* Common flow table info */
     uint32_t local_rate = ftable->rate;
     uint32_t ideallocal = 0;
@@ -160,7 +449,7 @@ static uint32_t allocate_fps(identity_t *ident, double total_weight) {
     if (local_rate <= 0) {
         idealweight = 0;
     } else if (dampen_increase == 0 &&
-               (ident->locallimit <= 0 || local_rate < (ident->locallimit * CLOSE_ENOUGH) || ident->flowstart)) {
+               (ident->locallimit <= 0 || local_rate < close_enough(ident->locallimit) || ident->flowstart)) {
         /* We're under the limit - all flows are bottlenecked. */
         idealweight = allocate_fps_under_limit(ident, local_rate, peer_weights);
         ideal_over = allocate_fps_over_limit(ident);
@@ -236,6 +525,7 @@ static uint32_t allocate_fps(identity_t *ident, double total_weight) {
             local_rate, idealweight, ident->localweight, total_weight);
     }
 
+#if 0
     if (printcounter <= 0) {
         struct timeval tv;
         double time_now;
@@ -260,6 +550,8 @@ static uint32_t allocate_fps(identity_t *ident, double total_weight) {
         printlog(LOG_DEBUG, "MIN: min said to use flow counting, which was %.3f when other method said %.3f.\n",
                  ideal_over, ideal_under);
     }
+    See print_statistics()
+#endif
 
     printlog(LOG_DEBUG, "ideallocal is %d\n", ideallocal);
 
@@ -312,9 +604,16 @@ static void allocate(limiter_t *limiter, identity_t *ident) {
     ident->avg_bytes += ident->common.rate;
     
     if (limiter->policy == POLICY_FPS) {
-        ident->locallimit = allocate_fps(ident, comm_val);
+#ifdef SHADOW_ACCTING
+
+        allocate_fps_pretend(ident, comm_val, &ident->shadow_common, "SHADOW-ID");
+
+        ident->last_localweight_copy = ident->localweight_copy;
+#endif
+
+        ident->locallimit = allocate_fps(ident, comm_val, &ident->common, "ID");
         ident->last_localweight = ident->localweight;
-        
+
         /* Update other limiters with our weight by writing to comm layer. */
         write_local_value(&ident->comm, ident->localweight);
     } else {
@@ -385,9 +684,17 @@ static void enforce(limiter_t *limiter, identity_t *ident) {
             }
             printlog(LOG_DEBUG, "%d Limit ID:%d\n", ident->locallimit, ident->id);
 
+#if 0
             if (printcounter == PRINT_COUNTER_RESET) {
-                printlog(LOG_WARN, "%d\n", ident->locallimit);
+                if (ident->common.max_flow_rate > 0) {
+                    printlog(LOG_WARN, "%d ID:%d %.3f\n", ident->locallimit, ident->id,
+                             (double) ident->common.rate / (double) ident->common.max_flow_rate);
+                } else {
+                    printlog(LOG_WARN, "%d ID:%d 0\n", ident->locallimit, ident->id);
+                }
             }
+            This is now done in print_statistics()
+#endif
 
             snprintf(cmd, CMD_BUFFER_SIZE,
                      "/sbin/tc class change dev eth0 parent 1:%x classid 1:%x htb rate 8bit ceil %dbps quantum 1600",
@@ -398,7 +705,7 @@ static void enforce(limiter_t *limiter, identity_t *ident) {
 
                 if (ret) {
                     /* FIXME: call failed.  What to do? */
-                    printlog(LOG_CRITICAL, "***TC call failed?***\n");
+                    printlog(LOG_CRITICAL, "***TC call failed? Call was: %s***\n", cmd);
                 }
             }
             break;
@@ -476,6 +783,12 @@ static void clean(drl_instance_t *instance) {
 
         ident->table_cleanup_function(ident->table);
 
+#ifdef SHADOW_ACCTING
+
+        standard_table_cleanup((standard_flow_table) ident->shadow_table);
+
+#endif
+
         pthread_mutex_unlock(&ident->table_mutex);
     }
 
@@ -505,11 +818,11 @@ static void print_averages(drl_instance_t *instance, int print_interval) {
  * of identities.
  *
  * Each identity also has a private lock for its table.  This gets locked by
- * table-modifying functions such as estimate and clean.
+ * table-modifying functions such as estimate and clean. It's also locked in
+ * ulogd_DRL.c when the table is being updated with new packets.
  */
 void handle_estimation(void *arg) {
     limiter_t *limiter = (limiter_t *) arg;
-    identity_t *ident = NULL;
     int clean_timer, clean_wait_intervals;
     useconds_t sleep_time = limiter->estintms * 1000;
     uint32_t cal_slot = 0;
@@ -520,6 +833,7 @@ void handle_estimation(void *arg) {
     sigemptyset(&signal_mask);
     sigaddset(&signal_mask, SIGHUP);
     sigaddset(&signal_mask, SIGUSR1);
+    sigaddset(&signal_mask, SIGUSR2);
     pthread_sigmask(SIG_BLOCK, &signal_mask, NULL);
 
     /* Determine the number of intervals we should wait before hitting the
@@ -528,6 +842,8 @@ void handle_estimation(void *arg) {
     clean_timer = clean_wait_intervals;
 
     while (true) {
+        printlog(LOG_DEBUG, "--Beginning new tick.--\n");
+
         /* Sleep according to the delay of the estimate interval. */
         usleep(sleep_time);
 
@@ -540,26 +856,53 @@ void handle_estimation(void *arg) {
         /* Service all the identities that are scheduled to run during this
          * tick. */
         while (!TAILQ_EMPTY(limiter->stable_instance.cal + cal_slot)) {
-            ident = TAILQ_FIRST(limiter->stable_instance.cal + cal_slot);
-            TAILQ_REMOVE(limiter->stable_instance.cal + cal_slot, ident, calendar);
+            identity_action *iaction = TAILQ_FIRST(limiter->stable_instance.cal + cal_slot);
+            TAILQ_REMOVE(limiter->stable_instance.cal + cal_slot, iaction, calendar);
 
-            /* Update the ident's flow accouting table with the latest info. */
-            estimate(ident);
+            /* Only execute the action if it is valid. */
+            if (iaction->valid == 0) {
+                free(iaction);
+                continue;
+            }
+
+            switch (iaction->action) {
+                case ACTION_MAINLOOP:
+
+                    printlog(LOG_DEBUG, "Main loop: identity %d\n", iaction->ident->id);
+
+                    /* Update the ident's flow accouting table with the latest info. */
+                    estimate(iaction->ident, limiter->estintms);
+
+                    /* Determine its share of the rate allocation. */
+                    allocate(limiter, iaction->ident);
 
-            /* Determine its share of the rate allocation. */
-            allocate(limiter, ident);
+                    /* Make tc calls to enforce the rate we decided upon. */
+                    enforce(limiter, iaction->ident);
 
-            /* Make tc calls to enforce the rate we decided upon. */
-            enforce(limiter, ident);
+                    /* Add ident back to the queue at a future time slot. */
+                    TAILQ_INSERT_TAIL(limiter->stable_instance.cal +
+                            ((cal_slot + iaction->ident->mainloop_intervals) & SCHEDMASK),
+                            iaction, calendar);
+                    break;
 
-            /* Tell the comm library to propagate this identity's result for
-             * this interval.*/
-            send_update(&ident->comm, ident->id);
+                case ACTION_COMMUNICATE:
 
-            /* Add ident back to the queue at a future time slot. */
-            TAILQ_INSERT_TAIL(limiter->stable_instance.cal +
-                              ((cal_slot + ident->intervals) & SCHEDMASK),
-                              ident, calendar);
+                    printlog(LOG_DEBUG, "Communicating: identity %d\n", iaction->ident->id);
+
+                    /* Tell the comm library to propagate this identity's result for
+                     * this interval.*/
+                    send_update(&iaction->ident->comm, iaction->ident->id);
+
+                    /* Add ident back to the queue at a future time slot. */
+                    TAILQ_INSERT_TAIL(limiter->stable_instance.cal +
+                            ((cal_slot + iaction->ident->communication_intervals) & SCHEDMASK),
+                            iaction, calendar);
+                break;
+
+                default:
+                    printlog(LOG_CRITICAL, "Unknown identity action!?!\n");
+                    exit(EXIT_FAILURE);
+            }
         }
 
         print_interval--;
diff --git a/drl/multipleinterval.c b/drl/multipleinterval.c
new file mode 100644 (file)
index 0000000..131d7f9
--- /dev/null
@@ -0,0 +1,458 @@
+/* See the DRL-LICENSE file for this file's software license. */
+
+#include <arpa/inet.h>
+#include <assert.h>
+#include <inttypes.h>
+#include <netinet/in.h>
+#include <pthread.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <time.h>
+
+#include "common_accounting.h"
+#include "multipleinterval.h"
+#include "logging.h"
+
+multiple_flow_table multiple_table_create(uint32_t (*hash_function)(const key_flow *key), uint32_t interval_count, common_accounting_t *common) {
+    int i;
+    multiple_flow_table table = malloc(sizeof(struct mul_flow_table));
+
+    if (table == NULL) {
+        return NULL;
+    }
+
+    memset(table, 0, sizeof(struct mul_flow_table));
+    table->common = common;
+    table->hash_function = hash_function;
+    table->interval_count = interval_count;
+
+    gettimeofday(&table->common->last_update, NULL);
+
+    table->intervals = malloc(interval_count * sizeof(interval));
+
+    if (table->intervals == NULL) {
+        free(table);
+        return NULL;
+    }
+
+    memset(table->intervals, 0, interval_count * sizeof(interval));
+    table->intervals[0].valid = 1;
+    table->intervals[0].last_update = table->common->last_update;
+
+    for (i = 0; i < interval_count; ++i) {
+        table->intervals[i].next = &table->intervals[(i + 1) % interval_count];
+    }
+
+    table->current_interval = &table->intervals[0];
+
+    return table;
+}
+
+void multiple_table_destroy(multiple_flow_table table) {
+    multiple_flow *current, *next;
+
+    if ((current = table->flows_head)) {
+        while (current->next) {
+            next = current->next;
+            free(current->intervals);
+            free(current);
+            current = next;
+        }
+        free(current->intervals);
+        free(current);
+    }
+
+    free(table->intervals);
+    free(table);
+}
+
+/* Looks for the flow in the table.  If the flow isn't there, it allocates a
+ * place for it. */
+multiple_flow *multiple_table_lookup(multiple_flow_table table, const key_flow *key) {
+    uint32_t hash;
+    multiple_flow *flow;
+    struct in_addr src, dst;
+    char sip[22], dip[22];
+    int i;
+
+    if (table == NULL) {
+        return NULL;
+    }
+
+    hash = table->hash_function(key);
+
+    /* Find the flow, if it's there. */
+    for (flow = table->flows[hash]; flow; flow = flow->nexth) {
+        if (flow->source_ip == key->source_ip &&
+                flow->dest_ip == key->dest_ip &&
+                flow->source_port == key->source_port &&
+                flow->dest_port == key->dest_port &&
+                flow->protocol == key->protocol) {
+            break;
+        }
+    }
+
+    if (flow == NULL) {
+        flow = malloc(sizeof(multiple_flow));
+        if (flow == NULL) {
+            printlog(LOG_CRITICAL, "multipleinterval.c: Malloc returned NULL.\n");
+            return NULL;
+        }
+        memset(flow, 0, sizeof(multiple_flow));
+
+        flow->intervals = malloc(table->interval_count * sizeof(interval));
+        if (flow->intervals == NULL) {
+            free(flow);
+            printlog(LOG_CRITICAL, "multipleinterval.c: Malloc returned NULL.\n");
+            return NULL;
+        }
+        memset(flow->intervals, 0, table->interval_count * sizeof(interval));
+
+        flow->protocol = key->protocol;
+        flow->source_ip = key->source_ip;
+        flow->dest_ip = key->dest_ip;
+        flow->source_port = key->source_port;
+        flow->dest_port = key->dest_port;
+
+        flow->intervals[0].last_packet = key->packet_time;
+        flow->intervals[0].last_update = table->common->last_update;
+        flow->intervals[0].valid = 1;
+
+        for (i = 0; i < table->interval_count; ++i) {
+            flow->intervals[i].next = &flow->intervals[(i + 1) % table->interval_count];
+        }
+
+        flow->current_interval = &flow->intervals[0];
+
+        /* Add the flow to the hash list. */
+        flow->nexth = table->flows[hash];
+        table->flows[hash] = flow;
+
+        /* Add the flow to the linked list. */
+        if (table->flows_tail) {
+            flow->prev = table->flows_tail;
+            table->flows_tail->next = flow;
+            table->flows_tail = flow;
+        } else {
+            table->flows_head = table->flows_tail = flow;
+            /* next and prev are already null due to memset above. */
+        }
+
+        src.s_addr = ntohl(flow->source_ip);
+        dst.s_addr = ntohl(flow->dest_ip);
+        strcpy(sip, inet_ntoa(src));
+        strcpy(dip, inet_ntoa(dst));
+        printlog(LOG_DEBUG, "ALLOC:%s:%hu -> %s:%hu\n", sip,
+                flow->source_port, dip, flow->dest_port);
+    }
+
+    return flow;
+}
+
+int multiple_table_sample(multiple_flow_table table, const key_flow *key) {
+    multiple_flow *flow;
+
+    assert(table != NULL);
+    assert(table->common != NULL);
+
+    /* Update aggregate. */
+    //table->common->bytes_since += key->packet_size;
+    table->current_interval->bytes_since += key->packet_size;
+    table->current_interval->valid = 1;
+
+    /* Update flow. */
+    flow = multiple_table_lookup(table, key);
+    if (flow == NULL) {
+        return 0;
+    }
+
+    /* Update flow's last packet info so that we know when to delete. */
+    flow->last_packet = key->packet_time;
+
+    /* Update interval information. */
+    flow->current_interval->bytes_since += key->packet_size;
+    flow->current_interval->last_packet = key->packet_time;
+    flow->current_interval->valid = 1;
+
+    return 1;
+}
+
+void multiple_table_remove(multiple_flow_table table, multiple_flow *flow) {
+    key_flow key;
+    uint32_t hash;
+
+    assert(flow);
+
+    /* Remove the flow from the hash list. */
+    key.source_ip = flow->source_ip;
+    key.dest_ip = flow->dest_ip;
+    key.source_port = flow->source_port;
+    key.dest_port = flow->dest_port;
+    key.protocol = flow->protocol;
+
+    hash = table->hash_function(&key);
+
+    assert(table->flows[hash]);
+
+    if (table->flows[hash] == flow) {
+        /* It's the head of the hash list. */
+        table->flows[hash] = flow->nexth;
+    } else {
+        multiple_flow *current, *prev;
+        
+        prev = table->flows[hash];
+
+        for (current = table->flows[hash]->nexth; current; current = current->nexth) {
+            if (current == flow) {
+                prev->nexth = flow->nexth;
+                break;
+            } else {
+                prev = current;
+            }
+        }
+
+        if (current == NULL) {
+            printlog(LOG_CRITICAL, "Flow %p disappeared?\n", flow);
+        }
+        assert(current != NULL);
+    }
+
+    /* Remove the flow from the linked list. */
+    if (flow->prev == NULL && flow->next == NULL) {
+        /* It's the head, tail, and only element of the list. */
+        assert(table->flows_head == flow);
+        assert(table->flows_tail == flow);
+
+        table->flows_head = NULL;
+        table->flows_tail = NULL;
+    } else if (flow->prev == NULL) {
+        /* It's the head of the list. */
+        assert(table->flows_head == flow);
+
+        table->flows_head = flow->next;
+
+        if (table->flows_head != NULL) {
+            table->flows_head->prev = NULL;
+        }
+    } else if (flow->next == NULL) {
+        /* It's the tail of the list. */
+        assert(table->flows_tail == flow);
+
+        table->flows_tail = flow->prev;
+
+        table->flows_tail->next = NULL;
+    } else {
+        /* Not the head or tail of the list. */
+        assert(table->flows_head != flow);
+
+        flow->prev->next = flow->next;
+
+        if (flow->next != NULL) {
+            flow->next->prev = flow->prev;
+        }
+    }
+
+    /* Free the interval info. */
+    memset(flow->intervals, 0, table->interval_count * sizeof(interval));
+    free(flow->intervals);
+
+    /* Free the flow. */
+    memset(flow, 0, sizeof(multiple_flow));
+    free(flow);
+}
+
+int multiple_table_cleanup(multiple_flow_table table) {
+    multiple_flow *current = table->flows_head;
+    multiple_flow *remove;
+    time_t now = time(NULL);
+
+    while (current != NULL) {
+        if (current->last_packet + MUL_FLOW_IDLE_TIME <= now) {
+            /* Flow hasn't received a packet in the time limit - kill it. */
+            remove = current;
+            current = current->next;
+
+            multiple_table_remove(table, remove);
+        } else {
+            current = current->next;
+        }
+    }
+
+    return 0;
+}
+
+static interval *get_oldest_interval(interval *newest) {
+    interval *candidate = newest;
+    interval *oldest = NULL;
+
+    while (oldest == NULL) {
+        candidate = candidate->next;
+
+        if (candidate == newest) {
+            oldest = newest;
+        } else if (candidate->valid) {
+            oldest = candidate;
+        }
+    }
+
+    return oldest;
+}
+
+static uint32_t get_bytes_over_interval(interval *newest, interval *oldest) {
+    uint32_t result = newest->bytes_since;
+    interval *current = oldest;
+
+    while (current != newest) {
+        result += current->bytes_since;
+        current = current->next;
+    }
+
+    return result;
+}
+
+void multiple_table_update_flows(multiple_flow_table table, struct timeval now, double ewma_weight) {
+    uint32_t maxflowrate = 0;
+    double time_delta;
+    double unweighted_rate;
+    multiple_flow *current;
+    struct in_addr src, dst;
+    char sip[22], dip[22];
+    key_flow largest_flow_info;
+
+    /* Table interval variables. */
+    interval *table_newest = NULL;
+    interval *table_oldest = NULL;
+    uint32_t table_bytes_over_intervals = 0;
+
+    /* Reset statistics. */
+    table->common->num_flows = 0;
+    table->common->num_flows_5k = 0;
+    table->common->num_flows_10k = 0;
+    table->common->num_flows_20k = 0;
+    table->common->num_flows_50k = 0;
+    table->common->avg_rate = 0;
+    /* End statistics. */
+
+    table_newest = table->current_interval;
+    table_oldest = get_oldest_interval(table_newest);
+
+    table_bytes_over_intervals = get_bytes_over_interval(table_newest, table_oldest);
+
+    time_delta = timeval_subtract(now, table_oldest->last_update);
+
+    if (time_delta <= 0) {
+        unweighted_rate = 0;
+    } else {
+        unweighted_rate = table_bytes_over_intervals / time_delta;
+    }
+
+    table->common->last_inst_rate = table->common->inst_rate;
+    table->common->inst_rate = unweighted_rate;
+    printf("Unweighted rate is: %.3f, computed from %d bytes in %f seconds\n", unweighted_rate, table_bytes_over_intervals, time_delta);
+
+    table->common->last_rate = table->common->rate;
+
+    /* If the rate is zero, then we don't know anything yet.  Don't apply EWMA
+     * in that case. */
+    if (table->common->rate == 0) {
+        table->common->rate = unweighted_rate;
+    } else {
+        //FIXME: Continue to use ewma here?
+        table->common->rate = table->common->rate * ewma_weight + unweighted_rate * (1 - ewma_weight);
+    }
+
+    table->common->last_update = now;
+    table->current_interval = table->current_interval->next;
+    table->current_interval->last_update = now;
+    table->current_interval->bytes_since = 0;
+    table->current_interval->valid = 1;
+
+    /* Update per-flow information. */
+    for (current = table->flows_head; current; current = current->next) {
+        interval *newest = current->current_interval;
+        interval *oldest = get_oldest_interval(newest);
+        uint32_t bytes_over_intervals = 0;
+
+        /* This flow is invalid - don't consider it further. */
+        if (newest->valid == 0) {
+            printlog(LOG_WARN, "Found invalid flow in table.\n");
+            continue;
+        }
+
+        time_delta = timeval_subtract(now, oldest->last_update);
+        bytes_over_intervals = get_bytes_over_interval(newest, oldest);
+
+        if (time_delta <= 0) {
+            unweighted_rate = 0;
+        } else {
+            unweighted_rate = bytes_over_intervals / time_delta;
+        }
+
+        current->last_rate = current->rate;
+
+        if (current->rate == 0) {
+            current->rate = unweighted_rate;
+        } else {
+            //FIXME: Continue to use ewma here?
+            current->rate = current->rate * ewma_weight + unweighted_rate * (1 - ewma_weight);
+        }
+
+        /* Update the accounting info for intervals. */
+        current->current_interval = current->current_interval->next;
+        current->current_interval->last_update = now;
+        current->current_interval->bytes_since = 0;
+        current->current_interval->valid = 1;
+
+        if (current->rate > maxflowrate) {
+            maxflowrate = current->rate;
+            largest_flow_info.source_ip = current->source_ip;
+            largest_flow_info.dest_ip = current->dest_ip;
+            largest_flow_info.source_port = current->source_port;
+            largest_flow_info.dest_port = current->dest_port;
+            largest_flow_info.protocol = current->protocol;
+        }
+
+        if (current->rate > 51200) {
+            table->common->num_flows_50k += 1;
+            table->common->num_flows_20k += 1;
+            table->common->num_flows_10k += 1;
+            table->common->num_flows_5k += 1;
+            table->common->num_flows += 1;
+        } else if (current->rate > 20480) {
+            table->common->num_flows_20k += 1;
+            table->common->num_flows_10k += 1;
+            table->common->num_flows_5k += 1;
+            table->common->num_flows += 1;
+        } else if (current->rate > 10240) {
+            table->common->num_flows_10k += 1;
+            table->common->num_flows_5k += 1;
+            table->common->num_flows += 1;
+        } else if (current->rate > 5120) {
+            table->common->num_flows_5k += 1;
+            table->common->num_flows += 1;
+        } else {
+            table->common->num_flows += 1;
+        }
+
+        src.s_addr = ntohl(current->source_ip);
+        dst.s_addr = ntohl(current->dest_ip);
+        strcpy(sip, inet_ntoa(src));
+        strcpy(dip, inet_ntoa(dst));
+        printlog(LOG_DEBUG, "FLOW: (%p)  %s:%d -> %s:%d at %d\n", current,
+                sip, current->source_port,
+                dip, current->dest_port,
+                current->rate);
+    }
+
+    if (table->common->num_flows > 0) {
+        table->common->avg_rate = table->common->rate / table->common->num_flows;
+    }
+
+    printlog(LOG_DEBUG, "FLOW:--\n--\n");
+
+    table->common->max_flow_rate = maxflowrate;
+    table->common->max_flow_rate_flow_hash = table->hash_function(&largest_flow_info);
+}
diff --git a/drl/multipleinterval.h b/drl/multipleinterval.h
new file mode 100644 (file)
index 0000000..3fc007e
--- /dev/null
@@ -0,0 +1,174 @@
+/* See the DRL-LICENSE file for this file's software license. */
+
+/*
+ * Defines the multiple-interval "perfect" flow accounting table.
+ *
+ */
+
+#ifndef _MULTIPLE_ACCOUNTING_H_
+#define _MULTIPLE_ACCOUNTING_H_
+
+#include <inttypes.h>
+
+//FIXME: Update comments on structure variables
+
+/** The number of hash buckets in the table. */
+#define MUL_FLOW_HASH_SIZE 1024
+
+/** The number of seconds after which a flow is considered to be inactive.
+ * Inactive flows will be removed from the table during the next call to the
+ * cleanup function. */
+#define MUL_FLOW_IDLE_TIME 15
+
+#define MUL_INTERVAL_COUNT 10
+
+typedef struct mul_interval {
+    /** The number of bytes this flow has sent since the last update to this
+     * interval structure. */
+    uint32_t bytes_since;
+
+    /** The time at which this interval was last updated. */
+    struct timeval last_update;
+
+    /** The time at which the most recent packet in this flow was received
+     * in this interval. */
+    time_t last_packet;
+
+    uint32_t valid;
+
+    struct mul_interval *next;
+} interval;
+
+/** Representation of a flow in a multiple-interval table. */
+typedef struct mul_flow {
+    /* Flow information. */
+
+    /** The rate of the flow in the current estimate interval. */
+    uint32_t rate;
+
+    /** The rate of the flow in the previous estimate interval. */
+    uint32_t last_rate;
+
+    time_t last_packet;
+
+    interval *current_interval;
+
+    interval *intervals;
+
+    /* Identification information. */
+
+    /** The flow's source IP address. */
+    uint32_t source_ip;
+
+    /** The flow's destination IP address. */
+    uint32_t dest_ip;
+
+    /** The flow's source port. */
+    uint16_t source_port;
+
+    /** The flow's destination port. */
+    uint16_t dest_port;
+
+    /** The flow's protocol.  This corresponds to the protocol field of the IP
+     * header. */
+    uint8_t protocol;
+
+    /* Table state. */
+
+    /** Pointer to the next flow in the hash list. */
+    struct mul_flow *nexth;
+
+    /** Pointers to the next flow in the linked list. */
+    struct mul_flow *next;
+
+    /** Pointers to the previous flow in the linked list. */
+    struct mul_flow *prev;
+
+} multiple_flow;
+
+/**
+ * The "table" that stores the flows.  It's constructed of two main pieces.
+ *
+ * The first is an array of hash buckets.  Flows are classified into buckets
+ * by hashing the flow's values (key flow) using the table's hash function.
+ * Flows are chained together as a list in each bucket to deal with collisions.
+ *
+ * The second is a simple doubly linked list containing every flow in the table.
+ */
+struct mul_flow_table {
+
+    /** Pointer to the common flow information for the identity that owns this
+     * sampled flow table.  This is updated with aggregate information. */
+    common_accounting_t *common;
+
+    uint32_t interval_count;
+
+    interval *current_interval;
+
+    interval *intervals;
+
+    /* Table structures. */
+
+    /** Hash buckets - each is a list of flows. */
+    struct mul_flow *flows[MUL_FLOW_HASH_SIZE];
+
+    /** The head of the linked list of flows. */
+    struct mul_flow *flows_head;
+    
+    /** The tail of the linked list of flows. */
+    struct mul_flow *flows_tail;
+
+    /** Function pointer to the function that will yield the hash of a
+     * key_flow.
+     */
+    uint32_t (*hash_function)(const key_flow *key);
+
+};
+
+/** The type multiple_flow_table is really a pointer to a struct
+ * mul_flow_table. */
+typedef struct mul_flow_table *multiple_flow_table;
+
+/**
+ * Creates a new table that will use the specified hash function.
+ *
+ * Returns the new table or NULL on failure.
+ */
+multiple_flow_table multiple_table_create(uint32_t (*hash_function)(const key_flow *key), uint32_t interval_count, common_accounting_t *common);
+
+/**
+ * Destroys the specified table.
+ */
+void multiple_table_destroy(multiple_flow_table table);
+
+/**
+ * Looks for a flow that matches the given key in the specified table.  If a
+ * matching flow is not found, this function will allocate a new flow for the
+ * key.
+ *
+ * Returns a pointer to the flow that matches the key or NULL if there is no
+ * matching flow and a new flow couldn't be allocated.
+ */
+multiple_flow *multiple_table_lookup(multiple_flow_table table, const key_flow *key);
+
+/**
+ * Updates the state of the table given a newly acquired packet.
+ *
+ * Returns 1 if the flow is in the table.  0 otherwise (indicating that memory
+ * could not be allocated to add a new flow to the table for the given key.
+ */
+int multiple_table_sample(multiple_flow_table table, const key_flow *key);
+
+/**
+ * Cleans the table by removing flow entires for any flow that hasn't seen a
+ * new packet in the interval specified by MUL_FLOW_IDLE_TIME seconds.
+ */
+int multiple_table_cleanup(multiple_flow_table table);
+
+/**
+ * Updates the rate information for all flows in the table according to the
+ * specified current time and EWMA weight.
+ */
+void multiple_table_update_flows(multiple_flow_table table, struct timeval now, double ewma_weight);
+
+#endif  /* _MULTIPLE_ACCOUNTING_H_ */
index 869cde9..41ed778 100644 (file)
 #include "peer_comm.h"
 #include "logging.h"
 
+/* Artifically makes a network partition. */
+int do_partition = 0;
+int partition_set = 0xfffffff;
+
 extern limiter_t limiter;
 
 static const uint32_t MAGIC_MSG = 0x123123;
@@ -386,11 +390,22 @@ static void ident_receive(comm_ident_t *ident, pthread_rwlock_t *limiter_rwlock)
 }
 #endif
 
+#define ALLOW_PARTITION
+
 int send_udp_mesh(comm_t *comm, uint32_t id, int sock) {
     int result = 0;
     remote_limiter_t *remote;
     message_t msg;
     struct sockaddr_in toaddr;
+    int i;
+
+#ifdef ALLOW_PARTITION
+
+    int partition_count = 0;
+    struct in_addr dest;
+    char dest_ip[22];
+
+#endif
 
     memset(&toaddr, 0, sizeof(struct sockaddr_in));
     toaddr.sin_family = AF_INET;
@@ -404,8 +419,28 @@ int send_udp_mesh(comm_t *comm, uint32_t id, int sock) {
     message_to_nbo(&msg);
 
     /* Iterate though and send update to all remote limiters in our identity. */
-    map_reset_iterate(comm->remote_node_map);
-    while ((remote = map_next(comm->remote_node_map))) {
+    for (i = 0; i < comm->remote_node_count; ++i) {
+        remote = &comm->remote_limiters[i];
+
+#ifdef ALLOW_PARTITION
+
+        if (do_partition) {
+            printlog(LOG_DEBUG, "Testing partition, partition set is %x, count is %d, test is %d.\n",
+                     partition_set, partition_count, partition_set & (1 << partition_count));
+            /* If the partition count bit isn't high in the set, don't actually send anything. */
+            if ((partition_set & (1 << partition_count)) == 0) {
+                dest.s_addr = ntohl(remote->addr);
+                strcpy(dest_ip, inet_ntoa(dest));
+
+                printlog(LOG_DEBUG, "Partition: ignoring host %s\n", dest_ip);
+
+                partition_count += 1;
+                continue;
+            }
+        }
+
+#endif
+
         toaddr.sin_addr.s_addr = remote->addr; /* Already in network byte order. */
         toaddr.sin_port = remote->port;
         if (sendto(sock, &msg, sizeof(msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) {
@@ -414,6 +449,7 @@ int send_udp_mesh(comm_t *comm, uint32_t id, int sock) {
             printlog(LOG_WARN, "  - The error was |%d|\n", strerror(result));
             break;
         }
+        partition_count += 1;
     }
 
     return result;
@@ -435,6 +471,8 @@ int send_udp_gossip(comm_t *comm, uint32_t id, int sock) {
     for (i = 0; i < comm->gossip.gossip_branch; ++i) {
         message_t msg;
 
+        printlog(LOG_DEBUG, "Gossip loop iteration, i=%d, branch=%d\n", i, comm->gossip.gossip_branch);
+
         if (comm->retrys[i] >= 0) {
             remote = &comm->remote_limiters[comm->retrys[i]];
             targetid = comm->retrys[i];
index 789cd95..2594f12 100644 (file)
@@ -34,7 +34,8 @@
 
 enum policies { POLICY_GRD = 1, POLICY_FPS = 2 };
 enum commfabrics { COMM_MESH = 1, COMM_GOSSIP = 2 };
-enum accountings { ACT_STANDARD = 1, ACT_SAMPLEHOLD = 2, ACT_SIMPLE = 3 };
+enum accountings { ACT_STANDARD = 1, ACT_SAMPLEHOLD = 2, ACT_SIMPLE = 3, ACT_MULTIPLE = 4};
+enum dampenings { DAMPEN_NONE = 0, DAMPEN_TEST = 1, DAMPEN_FAILED = 2, DAMPEN_PASSED = 3, DAMPEN_SKIP = 4};
 
 /* The comm library also has definitions for comfabrics. This prevents us
  * from defining them twice. */
@@ -74,6 +75,12 @@ enum accountings { ACT_STANDARD = 1, ACT_SAMPLEHOLD = 2, ACT_SIMPLE = 3 };
  */
 #define FLOWKEYSIZE (13)
 
+/* Causes each identity to track every flow in two tables.  One table is as
+ * specified in the config file.  The second is a standard table with
+ * "perfect" accounting so that we can compare the two.  Turn this off for 
+ * any type of production setting. */
+#define SHADOW_ACCTING
+
 /* forward declare some structs */
 struct limiter;
 struct identity;
index 59a481a..319d071 100644 (file)
 #include "rate_accounting/simple.h"
 #endif
 
-#include "calendar.h"
+#include "bsd_queue.h"
 #include "config.h"
 #include "drl_state.h"
 #include "common_accounting.h"
 #include "standard.h"
 #include "samplehold.h"
 #include "simple.h"
+#include "multipleinterval.h"
 
 
 /** Represents a DRL entitiy/group. */
@@ -85,6 +86,9 @@ typedef struct identity {
      * flows to grow before incurring losses. */
     int flowstart;
 
+    /** Keeps track of the state of FPS dampening for this identity. */
+    enum dampenings dampen_state;
+
     /* GRD */
 
     /** GRD drop probability information. */
@@ -126,17 +130,47 @@ typedef struct identity {
     /** Function to call when the table should be destroyed. */
     void (*table_destroy_function)(void *);
 
+#ifdef SHADOW_ACCTING
+
+    common_accounting_t shadow_common;
+
+    void *shadow_table;
+
+    double localweight_copy;
+    double last_localweight_copy;
+
+    enum dampenings dampen_state_copy;
+
+#endif
+
     /* Scheduling bookkeeping. */
 
-    /* Pointers to other identities in the scheduling calendar. */
-    TAILQ_ENTRY(identity) calendar;
+    /** Scheduling unit that tells the limiter when to execute the main loop.*/
+    struct ident_action *loop_action;
 
-    /* The number of limiter ticks at which this identity should be scheduled.
-     */
-    uint32_t intervals;
+    /** Scheduling unit that tells the limiter when to communicate.*/
+    struct ident_action *comm_action;
+
+    /** The number of limiter ticks that should pass before this identity should
+     * be scheduled to execute its main estimate/allocate/enforce loop. */
+    uint32_t mainloop_intervals; 
+
+    /** The number of limiter ticks that should pass before this identity should
+     * be scheduled for communication. */
+    uint32_t communication_intervals;
 
 } identity_t;
 
+enum identity_actions { ACTION_MAINLOOP = 1, ACTION_COMMUNICATE = 2 };
+
+typedef struct ident_action {
+    struct identity *ident;
+    enum identity_actions action;
+    int valid;
+
+    TAILQ_ENTRY(ident_action) calendar;
+} identity_action;
+
 /**
  * Represents the bottom most entity in the HTB hierarchy.  For PlanetLab,
  * this corresponds to sliver (identified by Vserver context id, or xid).
index 8ebab9d..8c566d4 100644 (file)
@@ -1,10 +1,13 @@
 /* See the DRL-LICENSE file for this file's software license. */
 
+#include <arpa/inet.h>
 #include <inttypes.h>
+#include <netinet/in.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
 #include <sys/time.h>
+#include <sys/types.h>
 #include <time.h>
 
 #include "common_accounting.h"
@@ -74,14 +77,16 @@ sampled_flow_table sampled_table_create(uint32_t (*hash_function)(const key_flow
         return NULL;
     }
 
-    table->capacity = (uint32_t) ((base_size * oversampling_factor) * 1.03);
+    table->capacity = (uint32_t) (base_size * oversampling_factor);
     table->size = 0;
     table->hash_function = hash_function;
     table->sample_prob = (double) (((double) table->capacity / (double) max_bytes) * (double) RANDOM_GRANULARITY);
     table->threshold = (double) ((double) flow_percentage / 100) * max_bytes;
 
+
+    /* Allocate the backing and give it a little bit extra to deal with variance. */
     table->largest = NULL;
-    table->backing = malloc(sizeof(sampled_flow) * table->capacity);
+    table->backing = malloc(sizeof(sampled_flow) * table->capacity * 1.05);
 
     if (table->backing == NULL) {
         free(table);
@@ -271,6 +276,8 @@ void sampled_table_update_flows(sampled_flow_table table, struct timeval now, do
     uint32_t rate_delta = 0;
     double time_delta = 0;
     double unweighted_rate = 0;
+    struct in_addr src, dst;
+    char sip[22], dip[22];
 
     /* Update common aggregate information. */
     time_delta = timeval_subtract(now, table->common->last_update);
@@ -295,8 +302,11 @@ void sampled_table_update_flows(sampled_flow_table table, struct timeval now, do
                               unweighted_rate * (1 - ewma_weight);
     }
 
+    printlog(LOG_DEBUG, "table->common->rate is now %u\n", table->common->rate);
+
     table->common->bytes_since = 0;
     table->common->last_update = now;
+    table->common->num_flows = 0;
 
     /* Update per-flow information. */
     table->largest = &table->backing[i];
@@ -329,6 +339,18 @@ void sampled_table_update_flows(sampled_flow_table table, struct timeval now, do
                 largest_rate = table->backing[i].rate;
                 table->largest = &table->backing[i];
             }
+
+            table->common->num_flows += 1;
+
+            /* Print debugging info. */
+            src.s_addr = ntohl(table->backing[i].source_ip);
+            dst.s_addr = ntohl(table->backing[i].dest_ip);
+            strcpy(sip, inet_ntoa(src));
+            strcpy(dip, inet_ntoa(dst));
+            printlog(LOG_DEBUG, "FLOW: (%p)  %s:%d -> %s:%d at %d\n", &table->backing[i],
+                    sip, table->backing[i].source_port,
+                    dip, table->backing[i].dest_port,
+                    table->backing[i].rate);
         }
     }
 
index 5a554ef..f96fa26 100644 (file)
 #define FLOW_DELETED 1
 #define FLOW_USED 2
 
-#define RANDOM_GRANULARITY 1000
+#define RANDOM_GRANULARITY (1000)
+
+#define SAMPLEHOLD_PERCENTAGE (5)
+#define SAMPLEHOLD_OVERFACTOR (10)
 
 /** In-table representation of a flow that has been sampled. */
 typedef struct sampled_flow {
index f8f813c..9c996e3 100644 (file)
@@ -221,7 +221,7 @@ int standard_table_cleanup(standard_flow_table table) {
     time_t now = time(NULL);
 
     while (current != NULL) {
-        if (current->last_packet + FLOW_IDLE_TIME <= now) {
+        if (current->last_packet + STD_FLOW_IDLE_TIME <= now) {
             /* Flow hasn't received a packet in the time limit - kill it. */
             remove = current;
             current = current->next;
index d54aa3e..27a3d8c 100644 (file)
 #include <inttypes.h>
 
 /** The number of hash buckets in the table. */
-#define FLOW_HASH_SIZE 1024
+#define STD_FLOW_HASH_SIZE 1024
 
 /** The number of seconds after which a flow is considered to be inactive.
  * Inactive flows will be removed from the table during the next call to the
  * cleanup function. */
-#define FLOW_IDLE_TIME 15
+#define STD_FLOW_IDLE_TIME 15
 
 /** Representation of a flow in a standard table. */
 typedef struct std_flow {
@@ -86,7 +86,7 @@ struct std_flow_table {
     /* Table structures. */
 
     /** Hash buckets - each is a list of flows. */
-    struct std_flow *flows[FLOW_HASH_SIZE];
+    struct std_flow *flows[STD_FLOW_HASH_SIZE];
 
     /** The head of the linked list of flows. */
     struct std_flow *flows_head;
@@ -137,7 +137,7 @@ int standard_table_sample(standard_flow_table table, const key_flow *key);
 
 /**
  * Cleans the table by removing flow entires for any flow that hasn't seen a
- * new packet in the interval specified by FLOW_IDLE_TIME seconds.
+ * new packet in the interval specified by STD_FLOW_IDLE_TIME seconds.
  */
 int standard_table_cleanup(standard_flow_table table);
 
index 14de4fe..f785d30 100644 (file)
 /* DRL specifics */
 #include "raterouter.h"
 #include "util.h"
-#include "calendar.h"
 #include "ratetypes.h" /* needs util and pthread.h */
+#include "calendar.h"
 #include "logging.h"
 
 /*
  * Add the config options for DRL. 
  */
 
-static config_entry_t drl_configfile = {
+static config_entry_t partition = {
     .next = NULL,
+    .key = "partition_set",
+    .type = CONFIG_TYPE_INT,
+    .options = CONFIG_OPT_NONE,
+    .u = { .value = 0xfffffff },
+};
+
+static config_entry_t netem_slice = {
+    .next = &partition,
+    .key = "netem_slice",
+    .type = CONFIG_TYPE_STRING,
+    .options = CONFIG_OPT_NONE,
+    .u = { .string = "ALL" },
+};
+
+static config_entry_t netem_loss = {
+    .next = &netem_slice,
+    .key = "netem_loss",
+    .type = CONFIG_TYPE_INT,
+    .options = CONFIG_OPT_NONE,
+    .u = { .value = 0 },
+};
+
+static config_entry_t netem_delay = {
+    .next = &netem_loss,
+    .key = "netem_delay",
+    .type = CONFIG_TYPE_INT,
+    .options = CONFIG_OPT_NONE,
+    .u = { .value = 0 },
+};
+
+static config_entry_t drl_configfile = {
+    .next = &netem_delay,
     .key = "drl_configfile",
     .type = CONFIG_TYPE_STRING,
     .options = CONFIG_OPT_MANDATORY,
@@ -231,25 +263,34 @@ extern FILE *logfile;
 extern uint8_t system_loglevel;
 extern uint8_t do_enforcement;
 
+/* From peer_comm.c - used to simulate partition. */
+extern int do_partition;
+extern int partition_set;
+
 /* functions */
 
 static inline uint32_t
-hash_flow(uint8_t protocol, uint32_t src_ip, uint16_t src_port, uint32_t dst_ip, uint16_t dst_port)
+hash_flow(uint8_t protocol, uint32_t src_ip, uint16_t src_port, uint32_t dst_ip, uint16_t dst_port, uint32_t hash_max)
 {
     unsigned char mybytes[FLOWKEYSIZE];
     mybytes[0] = protocol;
     *(uint32_t*)(&(mybytes[1])) = src_ip;
     *(uint32_t*)(&(mybytes[5])) = dst_ip;
     *(uint32_t*)(&(mybytes[9])) = (src_port << 16) | dst_port;
-    return jhash(mybytes,FLOWKEYSIZE,salt) & (FLOW_HASH_SIZE - 1);
+    return jhash(mybytes,FLOWKEYSIZE,salt) & (hash_max - 1);
 }
 
 uint32_t sampled_hasher(const key_flow *key) {
-    return hash_flow(key->protocol, key->source_ip, key->source_port, key->dest_ip, key->dest_port);
+    /* Last arg is UINT_MAX because sampled flow keeps track of its own capacity. */
+    return hash_flow(key->protocol, key->source_ip, key->source_port, key->dest_ip, key->dest_port, UINT_MAX);
 }
 
 uint32_t standard_hasher(const key_flow *key) {
-    return hash_flow(key->protocol, key->source_ip, key->source_port, key->dest_ip, key->dest_port);
+    return hash_flow(key->protocol, key->source_ip, key->source_port, key->dest_ip, key->dest_port, STD_FLOW_HASH_SIZE);
+}
+
+uint32_t multiple_hasher(const key_flow *key) {
+    return hash_flow(key->protocol, key->source_ip, key->source_port, key->dest_ip, key->dest_port, MUL_FLOW_HASH_SIZE);
 }
 
 struct intr_id {
@@ -418,6 +459,13 @@ static int _output_drl(ulog_iret_t *res)
         /* Update the identity's table. */
         ident->table_sample_function(ident->table, &key);
 
+#ifdef SHADOW_ACCTING
+
+        /* Update the shadow perfect copy of the accounting table. */
+        standard_table_sample((standard_flow_table) ident->shadow_table, &key);
+
+#endif
+
         pthread_mutex_unlock(&ident->table_mutex);
 
         ident = ident->parent;
@@ -455,6 +503,14 @@ static void free_identity(identity_t *ident) {
             ident->table_destroy_function(ident->table);
         }
 
+        if (ident->loop_action) {
+            ident->loop_action->valid = 0;
+        }
+
+        if (ident->comm_action) {
+            ident->comm_action->valid = 0;
+        }
+
         pthread_mutex_destroy(&ident->table_mutex);
 
         free(ident);
@@ -483,6 +539,8 @@ static void free_instance(drl_instance_t *instance) {
         free(instance->machines);
     if (instance->sets)
         free(instance->sets);
+
+    /* FIXME: Drain the calendar first and free all the entries. */
     if (instance->cal) {
         free(instance->cal);
     }
@@ -523,9 +581,10 @@ static identity_t *new_identity(ident_config *config) {
     ident->id = config->id;
     ident->limit = (uint32_t) (((double) config->limit * 1000.0) / 8.0);
     ident->fixed_ewma_weight = config->fixed_ewma_weight;
-    ident->intervals = config->intervals;
+    ident->communication_intervals = config->communication_intervals;
+    ident->mainloop_intervals = config->mainloop_intervals;
     ident->ewma_weight = pow(ident->fixed_ewma_weight, 
-                             (limiter.estintms/1000.0) * config->intervals);
+                             (limiter.estintms/1000.0) * config->mainloop_intervals);
     ident->parent = NULL;
 
     pthread_mutex_init(&ident->table_mutex, NULL);
@@ -546,10 +605,24 @@ static identity_t *new_identity(ident_config *config) {
                 (void (*)(void *)) standard_table_destroy;
             break;
 
+        case ACT_MULTIPLE:
+            ident->table =
+                multiple_table_create(multiple_hasher, MUL_INTERVAL_COUNT, &ident->common);
+
+            ident->table_sample_function =
+                (int (*)(void *, const key_flow *)) multiple_table_sample;
+            ident->table_cleanup_function =
+                (int (*)(void *)) multiple_table_cleanup;
+            ident->table_update_function =
+                (void (*)(void *, struct timeval, double)) multiple_table_update_flows;
+            ident->table_destroy_function =
+                (void (*)(void *)) multiple_table_destroy;
+            break;
+
         case ACT_SAMPLEHOLD:
             ident->table = sampled_table_create(sampled_hasher,
                     ident->limit * IDENT_CLEAN_INTERVAL,
-                    1, 20, &ident->common);
+                    SAMPLEHOLD_PERCENTAGE, SAMPLEHOLD_OVERFACTOR, &ident->common);
 
             ident->table_sample_function =
                 (int (*)(void *, const key_flow *)) sampled_table_sample;
@@ -575,6 +648,18 @@ static identity_t *new_identity(ident_config *config) {
             break;
     }
 
+#ifdef SHADOW_ACCTING
+
+    ident->shadow_table = standard_table_create(standard_hasher, &ident->shadow_common);
+
+    if (ident->shadow_table == NULL) {
+        ident->table_destroy_function(ident->table);
+        free(ident);
+        return NULL;
+    }
+
+#endif
+
     /* Make sure the table was allocated. */
     if (ident->table == NULL) {
         free(ident);
@@ -621,10 +706,11 @@ static int validate_config(ident_config *config) {
     }
 
     /* Accounting must be a valid choice (ACT_STANDARD, ACT_SAMPLEHOLD,
-     * ACT_SIMPLE). */
+     * ACT_SIMPLE, ACT_MULTIPLE). */
     if (config->accounting != ACT_STANDARD &&
             config->accounting != ACT_SAMPLEHOLD &&
-            config->accounting != ACT_SIMPLE) {
+            config->accounting != ACT_SIMPLE &&
+            config->accounting != ACT_MULTIPLE) {
         return 1;
     }
 
@@ -823,12 +909,21 @@ static int init_identities(parsed_configs configs, drl_instance_t *instance) {
 
     /* Allocate and add the machine identities. */
     for (i = 0; i < configs.machine_count; ++i) {
+        identity_action *loop_action;
+        identity_action *comm_action;
         instance->machines[i] = new_identity(config);
 
         if (instance->machines[i] == NULL) {
             return ENOMEM;
         }
 
+        loop_action = malloc(sizeof(identity_action));
+        comm_action = malloc(sizeof(identity_action));
+
+        if (loop_action == NULL || comm_action == NULL) {
+            return ENOMEM;
+        }
+
         /* The first has no parent - it is the root.  All others have the
          * previous ident as their parent. */
         if (i == 0) {
@@ -845,8 +940,23 @@ static int init_identities(parsed_configs configs, drl_instance_t *instance) {
 
         config = config->next;
 
+        memset(loop_action, 0, sizeof(identity_action));
+        memset(comm_action, 0, sizeof(identity_action));
+        loop_action->ident = instance->machines[i];
+        loop_action->action = ACTION_MAINLOOP;
+        loop_action->valid = 1;
+        comm_action->ident = instance->machines[i];
+        comm_action->action = ACTION_COMMUNICATE;
+        comm_action->valid = 1;
+
+        instance->machines[i]->loop_action = loop_action;
+        instance->machines[i]->comm_action = comm_action;
+
         TAILQ_INSERT_TAIL(instance->cal + (instance->cal_slot & SCHEDMASK),
-                          instance->machines[i], calendar);
+                          loop_action, calendar);
+
+        TAILQ_INSERT_TAIL(instance->cal + (instance->cal_slot & SCHEDMASK),
+                          comm_action, calendar);
 
         /* Setup the array of pointers to leaves.  This is easy for machines
          * because a machine node applies to every leaf. */
@@ -874,12 +984,37 @@ static int init_identities(parsed_configs configs, drl_instance_t *instance) {
 
     /* Sets... */
     for (i = 0; i < instance->set_count; ++i) {
+        identity_action *loop_action;
+        identity_action *comm_action;
+
         if (instance->sets[i]->parent == NULL) {
             instance->sets[i]->parent = instance->last_machine;
         }
 
+        loop_action = malloc(sizeof(identity_action));
+        comm_action = malloc(sizeof(identity_action));
+
+        if (loop_action == NULL || comm_action == NULL) {
+            return ENOMEM;
+        }
+
+        memset(loop_action, 0, sizeof(identity_action));
+        memset(comm_action, 0, sizeof(identity_action));
+        loop_action->ident = instance->sets[i];
+        loop_action->action = ACTION_MAINLOOP;
+        loop_action->valid = 1;
+        comm_action->ident = instance->sets[i];
+        comm_action->action = ACTION_COMMUNICATE;
+        comm_action->valid = 1;
+
+        instance->sets[i]->loop_action = loop_action;
+        instance->sets[i]->comm_action = comm_action;
+
+        TAILQ_INSERT_TAIL(instance->cal + (instance->cal_slot & SCHEDMASK),
+                          loop_action, calendar);
+
         TAILQ_INSERT_TAIL(instance->cal + (instance->cal_slot & SCHEDMASK),
-                          instance->sets[i], calendar);
+                          comm_action, calendar);
 
         /* Setup the array of pointers to leaves.  This is harder for sets,
          * but this doesn't need to be super-efficient because it happens
@@ -915,7 +1050,7 @@ static void print_instance(drl_instance_t *instance) {
 
 static int assign_htb_hierarchy(drl_instance_t *instance) {
     int i, j;
-    int next_node = 0x11;
+    int next_node = 0x100;
 
     /* Chain machine nodes under 1:10. */
     for (i = 0; i < instance->machine_count; ++i) {
@@ -952,13 +1087,43 @@ static int assign_htb_hierarchy(drl_instance_t *instance) {
 
 /* Added this so that I could comment one line and kill off all of the
  * command execution. */
-static int execute_cmd(const char *cmd) {
+static inline int execute_cmd(const char *cmd) {
     return system(cmd);
 }
 
+static inline int add_htb_node(const char *iface, const uint32_t parent_major, const uint32_t parent_minor,
+                               const uint32_t classid_major, const uint32_t classid_minor,
+                               const uint64_t rate, const uint64_t ceil) {
+    char cmd[300];
+
+    sprintf(cmd, "tc class add dev %s parent %x:%x classid %x:%x htb rate %llubit ceil %llubit",
+            iface, parent_major, parent_minor, classid_major, classid_minor, rate, ceil);
+    printlog(LOG_WARN, "INIT: HTB_cmd: %s\n", cmd);
+
+    return execute_cmd(cmd);
+}
+
+static inline int add_htb_netem(const char *iface, const uint32_t parent_major,
+                                const uint32_t parent_minor, const uint32_t handle,
+                                const int loss, const int delay) {
+    char cmd[300];
+
+    sprintf(cmd, "/sbin/tc qdisc del dev %s parent %x:%x handle %x pfifo", iface, parent_major,
+            parent_minor, handle);
+    printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
+    if (execute_cmd(cmd))
+        printlog(LOG_DEBUG, "HTB_cmd: Previous deletion did not succeed.\n");
+
+    sprintf(cmd, "/sbin/tc qdisc replace dev %s parent %x:%x handle %x netem loss %d%% delay %dms",
+            iface, parent_major, parent_minor, handle, loss, delay);
+    printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
+    return execute_cmd(cmd);
+}
+
 static int create_htb_hierarchy(drl_instance_t *instance) {
     char cmd[300];
     int i, j, k;
+    uint64_t gigabit = 1024 * 1024 * 1024;
 
     /* Nuke the hierarchy. */
     sprintf(cmd, "tc qdisc del dev eth0 root handle 1: htb");
@@ -971,36 +1136,25 @@ static int create_htb_hierarchy(drl_instance_t *instance) {
         return 1;
     }
     printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
-    sprintf(cmd, "tc class add dev eth0 parent 1: classid 1:1 htb rate 1000mbit ceil 1000mbit");
-    if (execute_cmd(cmd)) {
+
+    if (add_htb_node("eth0", 1, 0, 1, 1, gigabit, gigabit))
         return 1;
-    }
-    printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
 
-    /* Add back 1:10. (Nodelimit : Megabits/sec -> bits/second)*/
+    /* Add back 1:10. (Nodelimit : kilobits/sec -> bits/second)*/
     if (limiter.nodelimit) {
-        sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:1 classid 1:10 htb rate 8bit ceil %lubit",
-                (unsigned long) limiter.nodelimit * 1024 * 1024);
+        if (add_htb_node("eth0", 1, 1, 1, 0x10, 8, (uint64_t) limiter.nodelimit * 1024))
+            return 1;
     } else {
-        sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:1 classid 1:10 htb rate 8bit ceil 1000mbit");
-    }
-
-    if (execute_cmd(cmd)) {
-        return 1;
+        if (add_htb_node("eth0", 1, 1, 1, 0x10, 8, gigabit))
+            return 1;
     }
-    printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
 
     /* Add machines. */
     for (i = 0; i < instance->machine_count; ++i) {
-        sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:%x classid 1:%x htb rate 8bit ceil %lubit",
-                instance->machines[i]->htb_parent,
-                instance->machines[i]->htb_node,
-                (unsigned long) instance->machines[i]->limit * 1024 * 1024);
-
-        if (execute_cmd(cmd)) {
+        if (add_htb_node("eth0", 1, instance->machines[i]->htb_parent, 1,
+                         instance->machines[i]->htb_node, 8, instance->machines[i]->limit * 1024)) {
             return 1;
         }
-        printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
     }
 
 #define LIMITEXEMPT
@@ -1024,82 +1178,81 @@ static int create_htb_hierarchy(drl_instance_t *instance) {
 
     /* Add sets. */
     for (j = (instance->set_count - 1); j >= 0; --j) {
-        sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:%x classid 1:%x htb rate 8bit ceil %lubit",
-                instance->sets[j]->htb_parent,
-                instance->sets[j]->htb_node,
-                (unsigned long) instance->sets[j]->limit * 1024 * 1024);
-
-        if (execute_cmd(cmd)) {
+        if (add_htb_node("eth0", 1, instance->sets[j]->htb_parent, 1,
+                         instance->sets[j]->htb_node, 8, instance->sets[j]->limit * 1024)) {
             return 1;
         }
-        printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
     }
 
     /* Add leaves. FIXME: Set static sliver limit as ceil here! */
     for (k = 0; k < instance->leaf_count; ++k) {
         if (instance->leaves[k].parent == NULL) {
-            sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:10 classid 1:1%x htb rate 8bit ceil %lubit",
-                instance->leaves[k].xid,
-                (unsigned long) 100 * 1024 * 1024);
+            if (add_htb_node("eth0", 1, 0x10, 1, (0x1000 | instance->leaves[k].xid), 8, gigabit))
+                return 1;
         } else {
-            sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:%x classid 1:1%x htb rate 8bit ceil %lubit",
-                    instance->leaves[k].parent->htb_node,
-                    instance->leaves[k].xid,
-                    (unsigned long) 100 * 1024 * 1024);
-        }
-
-        if (execute_cmd(cmd)) {
-            return 1;
+            if (add_htb_node("eth0", 1, instance->leaves[k].parent->htb_node, 1, (0x1000 | instance->leaves[k].xid), 8, gigabit))
+                return 1;
         }
-        printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
-        
-        sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:20 classid 1:2%x htb rate 8bit ceil 1000mbit",
-                instance->leaves[k].xid);
 
-        if (execute_cmd(cmd)) {
+        /* Add exempt node for the leaf under 1:20 as 1:2<xid> */
+        if (add_htb_node("eth0", 1, 0x20, 1, (0x2000 | instance->leaves[k].xid), 8, gigabit))
             return 1;
-        }
-        printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
     }
 
     /* Add 1:1000 and 1:2000 */
     if (instance->last_machine == NULL) {
-        sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:10 classid 1:1000 htb rate 8bit ceil 1000mbit");
+        if (add_htb_node("eth0", 1, 0x10, 1, 0x1000, 8, gigabit))
+            return 1;
     } else {
-        sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:%x classid 1:1000 htb rate 8bit ceil 1000mbit",
-                instance->last_machine->htb_node);
-    }
-
-    if (execute_cmd(cmd)) {
-        return 1;
+        if (add_htb_node("eth0", 1, instance->last_machine->htb_node, 1, 0x1000, 8, gigabit))
+            return 1;
     }
-    printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
-    sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:20 classid 1:2000 htb rate 8bit ceil 1000mbit");
 
-    if (execute_cmd(cmd)) {
+    if (add_htb_node("eth0", 1, 0x20, 1, 0x2000, 8, gigabit))
         return 1;
-    }
-    printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
 
     /* Add 1:1fff and 1:2fff */
     if (instance->last_machine == NULL) {
-        sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:10 classid 1:1fff htb rate 8bit ceil 1000mbit");
+        if (add_htb_node("eth0", 1, 0x10, 1, 0x1fff, 8, gigabit))
+            return 1;
     } else {
-        sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:%x classid 1:1fff htb rate 8bit ceil 1000mbit",
-                instance->last_machine->htb_node);
+        if (add_htb_node("eth0", 1, instance->last_machine->htb_node, 1, 0x1fff, 8, gigabit))
+            return 1;
     }
 
-    if (execute_cmd(cmd)) {
+    if (add_htb_node("eth0", 1, 0x20, 1, 0x2fff, 8, gigabit))
         return 1;
-    }
-    printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
-    sprintf(cmd, "/sbin/tc class add dev eth0 parent 1:20 classid 1:2fff htb rate 8bit ceil 1000mbit");
 
-    if (execute_cmd(cmd)) {
-        return 1;
+    /* Artifical delay or loss for experimentation. */
+    if (netem_delay.u.value || netem_loss.u.value) {
+        if (!strcmp(netem_slice.u.string, "ALL")) {
+            /* By default, netem applies to all leaves. */
+            if (add_htb_netem("eth0", 1, 0x1000, 0x1000, netem_loss.u.value, netem_delay.u.value))
+                return 1;
+            if (add_htb_netem("eth0", 1, 0x1fff, 0x1fff, netem_loss.u.value, netem_delay.u.value))
+                return 1;
+
+            for (k = 0; k < instance->leaf_count; ++k) {
+                if (add_htb_netem("eth0", 1, (0x1000 | instance->leaves[k].xid),
+                            (0x1000 | instance->leaves[k].xid), netem_loss.u.value, netem_delay.u.value)) {
+                    return 1;
+                }
+
+                //FIXME: add exempt delay/loss here on 0x2000 ... ?
+            }
+        } else {
+            /* netem_slice is not the default ALL value.  Only apply netem
+             * to the slice that is set in netem_slice.u.string. */
+            uint32_t slice_xid;
+
+            sscanf(netem_slice.u.string, "%x", &slice_xid);
+
+            if (add_htb_netem("eth0", 1, slice_xid, slice_xid, netem_loss.u.value, netem_delay.u.value))
+                return 1;
+        }
     }
-    printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
 
+#if 0
 #ifdef DELAY40MS
     /* Only for artificial delay testing. */
     sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:1000 handle 1000 pfifo");
@@ -1119,6 +1272,7 @@ static int create_htb_hierarchy(drl_instance_t *instance) {
     execute_cmd(cmd);
     /* End delay testing */
 #endif
+#endif
 
     return 0;
 }
@@ -1333,6 +1487,8 @@ static int init_drl(void) {
         return false;
     }
 
+    partition_set = partition.u.value;
+
     pthread_rwlock_unlock(&limiter.limiter_lock);
 
     if (pthread_create(&limiter.udp_recv_thread, NULL, limiter_receive_thread, NULL)) {
@@ -1512,12 +1668,14 @@ static void *signal_thread_func(void *args) {
     sigemptyset(&sigs);
     sigaddset(&sigs, SIGHUP);
     sigaddset(&sigs, SIGUSR1);
+    sigaddset(&sigs, SIGUSR2);
     pthread_sigmask(SIG_BLOCK, &sigs, NULL);
 
     while (1) {
         sigemptyset(&sigs);
         sigaddset(&sigs, SIGHUP);
         sigaddset(&sigs, SIGUSR1);
+        sigaddset(&sigs, SIGUSR2);
 
         err = sigwait(&sigs, &sig);
 
@@ -1545,6 +1703,9 @@ static void *signal_thread_func(void *args) {
                 }
                 pthread_rwlock_unlock(&limiter.limiter_lock);
                 break;
+            case SIGUSR2:
+                do_partition = !do_partition;
+                break;
             default:
                 /* Intentionally blank. */
                 break;
@@ -1562,6 +1723,7 @@ static void _drl_reg_op(void)
     sigemptyset(&signal_mask);
     sigaddset(&signal_mask, SIGHUP);
     sigaddset(&signal_mask, SIGUSR1);
+    sigaddset(&signal_mask, SIGUSR2);
     pthread_sigmask(SIG_BLOCK, &signal_mask, NULL);
 
     if (pthread_create(&signal_thread, NULL, &signal_thread_func, NULL) != 0) {