Reincarnated GRD. Changed mesh decay to go to 1/N rather than 0.
Kevin Webb [Sat, 8 Nov 2008 00:35:54 +0000 (00:35 +0000)]
drl/drl_state.c
drl/drl_state.h
drl/estimate.c
drl/ratetypes.h
drl/ulogd_DRL.c

index 8583d0d..a30337b 100644 (file)
@@ -118,7 +118,7 @@ void free_comm(comm_t *comm) {
     }
 }
 
-int read_comm(comm_t *comm, double *aggregate) {
+int read_comm(comm_t *comm, double *aggregate, double decayto) {
     remote_limiter_t *remote;
 
     pthread_mutex_lock(&comm->lock);
@@ -131,9 +131,10 @@ int read_comm(comm_t *comm, double *aggregate) {
             *aggregate += remote->rate;
 
             /* If we continue to read it without having heard an update,
-             * we start to decay its value. */
+             * we start to make the peer's value approach decayto, getting
+             * half of the way there each time. */
             if (remote->awol >= REMOTE_AWOL_THRESHOLD) {
-                remote->rate = remote->rate / 2;
+                remote->rate += ((decayto - remote->rate) / 2);
             } else {
                 remote->awol++;
             }
index 2b1ec5e..7715a36 100644 (file)
@@ -31,7 +31,7 @@
 #define MAX_IDENTS (1024)
 #define MAX_LIMITERS (128)
 
-#define REMOTE_AWOL_THRESHOLD (3)
+#define REMOTE_AWOL_THRESHOLD (5)
 
 enum transports { UDP, TCP };
 
@@ -208,9 +208,13 @@ void free_comm(comm_t *comm);
  * @param aggregate The location at which the aggregate value will
  * be stored.
  *
+ * @param decayto When using a mesh comm fabric, limiters whose value
+ * has not been heard in several timesteps will decay to this value.
+ * Generally globallimit/N.
+ *
  * @returns 0 on success, EINVAL on error.
  */
-int read_comm(comm_t *comm, double *aggregate);
+int read_comm(comm_t *comm, double *aggregate, double decayto);
 
 /**
  * Updates the locally observed value of an identity.
index f1dd142..ed08447 100644 (file)
@@ -7,6 +7,8 @@
  * Kevin Webb 2007/2008
  */
 
+#include <assert.h>
+
 /** The size of the buffer we use to hold tc commands. */
 #define CMD_BUFFER_SIZE 200
 
@@ -204,7 +206,7 @@ static uint32_t allocate_fps(identity_t *ident, double total_weight) {
     }
 
     /* Convert weight into a rate - add in our new local weight */
-    total_weight = ident->localweight + peer_weights;
+    ident->total_weight = total_weight = ident->localweight + peer_weights;
 
     /* compute local allocation:
        if there is traffic elsewhere, use the weights
@@ -258,8 +260,8 @@ static double allocate_grd(identity_t *ident, double aggdemand) {
         dropprob = 0.0;
     }
     
-    //printf("local rate: %d, aggregate demand: %.3f, drop prob: %.3f\n",
-    //        ident->common.rate, aggdemand, dropprob);
+    printf("local rate: %d, aggregate demand: %.3f, drop prob: %.3f\n",
+           ident->common.rate, aggdemand, dropprob);
 
     return dropprob;
 }
@@ -273,7 +275,13 @@ static void allocate(limiter_t *limiter, identity_t *ident) {
     double comm_val = 0;
 
     /* Read comm_val from comm layer. */
-    read_comm(&ident->comm, &comm_val);
+    if (limiter->policy == POLICY_FPS) {
+        read_comm(&ident->comm, &comm_val,
+                ident->total_weight / (double) (ident->comm.remote_node_count + 1));
+    } else {
+        read_comm(&ident->comm, &comm_val,
+                (double) (ident->limit / (double) (ident->comm.remote_node_count + 1)));
+    }
     printlog(LOG_DEBUG, "%.3f Aggregate weight/rate (FPS/GRD)\n", comm_val);
 
     /* Experimental printing. */
@@ -281,7 +289,7 @@ static void allocate(limiter_t *limiter, identity_t *ident) {
              (double) ident->common.rate / (double) 128, ident->id);
     ident->avg_bytes += ident->common.rate;
     
-    if (limiter->policynum == POLICY_FPS) {
+    if (limiter->policy == POLICY_FPS) {
         ident->locallimit = allocate_fps(ident, comm_val);
         ident->last_localweight = ident->localweight;
         
@@ -289,8 +297,8 @@ static void allocate(limiter_t *limiter, identity_t *ident) {
         write_local_value(&ident->comm, ident->localweight);
     } else {
         ident->locallimit = 0; /* Unused with GRD. */
-        ident->last_localdropprob = ident->localdropprob;
-        ident->localdropprob = allocate_grd(ident, comm_val);
+        ident->last_drop_prob = ident->drop_prob;
+        ident->drop_prob = allocate_grd(ident, comm_val);
         
         /* Update other limiters with our rate by writing to comm layer. */
         write_local_value(&ident->comm, ident->common.rate);
@@ -301,14 +309,35 @@ static void allocate(limiter_t *limiter, identity_t *ident) {
 }
 
 /**
+ * Traces all of the parent pointers of a leaf all the way to the root in
+ * order to find the maximum drop probability in the chain.
+ */
+static double find_leaf_drop_prob(leaf_t *leaf) {
+    identity_t *current = leaf->parent;
+    double result = 0;
+
+    assert(current);
+
+    while (current != NULL) {
+        if (current->drop_prob > result) {
+            result = current->drop_prob;
+        }
+        current = current->parent;
+    }
+
+    return result;
+}
+
+/**
  * This is called once per estimate interval to enforce the rate that allocate
  * has decided upon.  It makes calls to tc using system().
  */
 static void enforce(limiter_t *limiter, identity_t *ident) {
     char cmd[CMD_BUFFER_SIZE];
     int ret = 0;
+    int i = 0;
 
-    switch (limiter->policynum) {
+    switch (limiter->policy) {
         case POLICY_FPS:
 
             /* TC treats limits of 0 (8bit) as unlimited, which causes the
@@ -344,23 +373,56 @@ static void enforce(limiter_t *limiter, identity_t *ident) {
             break;
 
         case POLICY_GRD:
-/* FIXME: Figure out where to enforce GRD. */
-#if 0
-            for (i = 0; i < ident->num_slices; i++){
-
-                sprintf(cmd, "/sbin/tc qdisc change dev eth0 parent 1:1%x handle 1%x netem loss %.4f delay 40ms",
-                        ident->xids[i],ident->xids[i], (100*ident->localdropprob));
-
+            for (i = 0; i < ident->leaf_count; ++i) {
+                if (ident->drop_prob >= ident->leaves[i]->drop_prob) {
+                    /* The new drop probability for this identity is greater
+                     * than or equal to the leaf's current drop probability.
+                     * We can safely use the larger value at this leaf
+                     * immediately. */
+                    ident->leaves[i]->drop_prob = ident->drop_prob;
+                } else if (ident->last_drop_prob < ident->leaves[i]->drop_prob) {
+                    /* The old drop probability for this identity is less than
+                     * the leaf's current drop probability.  This means that
+                     * this identity couldn't have been the limiting ident,
+                     * so nothing needs to be done because the old limiting
+                     * ident is still the limiting factor. */
+
+                    /* Intentionally blank. */
+                } else {
+                    /* If neither of the above are true, then...
+                     * 1) The new drop probability for the identity is less
+                     * than what it previously was, and
+                     * 2) This ident may have had the maximum drop probability
+                     * of all idents limiting this leaf, and therefore we need
+                     * to follow the leaf's parents up to the root to find the
+                     * new leaf drop probability safely. */
+                    ident->leaves[i]->drop_prob =
+                            find_leaf_drop_prob(ident->leaves[i]);
+                }
+
+                /* Make the call to tc. */
+#ifdef DELAY40MS
+                snprintf(cmd, CMD_BUFFER_SIZE,
+                         "/sbin/tc qdisc change dev eth0 parent 1:1%x handle 1%x netem loss %.4f delay 40ms",
+                         ident->leaves[i]->xid, ident->leaves[i]->xid,
+                         (100 * ident->leaves[i]->drop_prob));
+#else
+                snprintf(cmd, CMD_BUFFER_SIZE,
+                         "/sbin/tc qdisc change dev eth0 parent 1:1%x handle 1%x netem loss %.4f delay 0ms",
+                         ident->leaves[i]->xid, ident->leaves[i]->xid,
+                         (100 * ident->leaves[i]->drop_prob));
+#endif
                 ret = system(cmd);
 
-                if (ret==-1)
-                    print_system_error(ret);
+                if (ret) {
+                    /* FIXME: call failed.  What to do? */
+                }
             }
-#endif
+
             break;
 
         default: 
-            printlog(LOG_CRITICAL, "DRL enforce: unknown policy %d\n",limiter->policynum);
+            printlog(LOG_CRITICAL, "DRL enforce: unknown policy %d\n",limiter->policy);
             break;
     }
 
index c88c931..59a481a 100644 (file)
@@ -41,6 +41,13 @@ typedef struct identity {
     /** Pointer to the identity's parent in the HTB hierarchy. */
     struct identity *parent;
 
+    /** Array of the leaves that are limited by this identity. Points into the
+     * leaves array for the identity's instance. */
+    struct leaf **leaves;
+
+    /** The number of leaves for which this identity is responsible. */
+    int leaf_count;
+
     /** The fixed (per second) EWMA weight. */
     double fixed_ewma_weight;
     
@@ -71,6 +78,8 @@ typedef struct identity {
     /** FPS previous weight value. */
     double last_localweight;
 
+    double total_weight;
+
     /** A flag to indicate whether or not the identity is in the flowstart
      * state.  During flowstart, the identity's limit is raised to allow for
      * flows to grow before incurring losses. */
@@ -79,10 +88,10 @@ typedef struct identity {
     /* GRD */
 
     /** GRD drop probability information. */
-    double localdropprob;
+    double drop_prob;
     
     /** GRD previous drop probability information. */
-    double last_localdropprob;
+    double last_drop_prob;
 
     /* Flow accounting machinery. */
 
@@ -139,6 +148,9 @@ typedef struct leaf {
     /** The leaf's parent in the hierarchy.  This is the identity to which this
      * leaf belongs. */
     identity_t *parent;
+
+    /** GRD: The leaf's packet drop probability. */
+    double drop_prob;
 } leaf_t;
 
 /**
@@ -194,7 +206,7 @@ typedef struct limiter {
     uint32_t nodelimit;
 
     /** The DRL policy (GRD, FPS) this node is using. */
-    enum policies policynum;
+    enum policies policy;
 
     /** The estimate interval (in milliseconds). */
     int estintms;
index 5d7ca7e..ef2a17b 100644 (file)
@@ -766,8 +766,37 @@ static int validate_configs(parsed_configs configs, drl_instance_t *instance) {
     return 0;
 }
 
+static int fill_set_leaf_pointer(drl_instance_t *instance, identity_t *ident) {
+    int count = 0;
+    identity_t *current_ident;
+    leaf_t *current_leaf;
+    leaf_t **leaves = malloc(instance->leaf_count * sizeof(leaf_t *));
+    if (leaves == NULL) {
+        return 1;
+    }
+
+    map_reset_iterate(instance->leaf_map);
+    while ((current_leaf = (leaf_t *) map_next(instance->leaf_map))) {
+        current_ident = current_leaf->parent;
+        while (current_ident != NULL && current_ident != instance->last_machine) {
+            if (current_ident == ident) {
+                /* Found the ident we were looking for - add the leaf. */
+                leaves[count] = current_leaf;
+                count += 1;
+                break;
+            }
+            current_ident = current_ident->parent;
+        }
+    }
+
+    ident->leaves = leaves;
+    ident->leaf_count = count;
+
+    return 0;
+}
+
 static int init_identities(parsed_configs configs, drl_instance_t *instance) {
-    int i;
+    int i, j;
     ident_config *config = configs.machines;
     leaf_t *leaf = NULL;
 
@@ -817,6 +846,18 @@ static int init_identities(parsed_configs configs, drl_instance_t *instance) {
 
         TAILQ_INSERT_TAIL(instance->cal + (instance->cal_slot & SCHEDMASK),
                           instance->machines[i], calendar);
+
+        /* Setup the array of pointers to leaves.  This is easy for machines
+         * because a machine node applies to every leaf. */
+        instance->machines[i]->leaves =
+            malloc(instance->leaf_count * sizeof(leaf_t *));
+        if (instance->machines[i]->leaves == NULL) {
+            return ENOMEM;
+        }
+        instance->machines[i]->leaf_count = instance->leaf_count;
+        for (j = 0; j < instance->leaf_count; ++j) {
+            instance->machines[i]->leaves[j] = &instance->leaves[j];
+        }
     }
 
     /* Connect the set subtree to the machines. Any set or leaf without a
@@ -838,6 +879,13 @@ static int init_identities(parsed_configs configs, drl_instance_t *instance) {
 
         TAILQ_INSERT_TAIL(instance->cal + (instance->cal_slot & SCHEDMASK),
                           instance->sets[i], calendar);
+
+        /* Setup the array of pointers to leaves.  This is harder for sets,
+         * but this doesn't need to be super-efficient because it happens
+         * rarely and it isn't on the critical path for reconfig(). */
+        if (fill_set_leaf_pointer(instance, instance->sets[i])) {
+            return ENOMEM;
+        }
     }
 
     /* Success. */
@@ -1039,6 +1087,94 @@ static int create_htb_hierarchy(drl_instance_t *instance) {
     }
     printlog(LOG_DEBUG, "HTB_cmd: %s\n", cmd);
 
+#ifdef DELAY40MS
+    /* Only for artificial delay testing. */
+    sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:1000 handle 1000 pfifo");
+    execute_cmd(cmd);
+
+    sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1000 handle 1000 netem loss 0 delay 40ms");
+    execute_cmd(cmd);
+    sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:11f9 handle 11f9 pfifo");
+    execute_cmd(cmd);
+
+    sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:11f9 handle 11f9 netem loss 0 delay 40ms");
+    execute_cmd(cmd);
+    sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:11fa handle 11fa pfifo");
+    execute_cmd(cmd);
+
+    sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:11fa handle 11fa netem loss 0 delay 40ms");
+    execute_cmd(cmd);
+    /* End delay testing */
+#endif
+
+    return 0;
+}
+
+static int setup_tc_grd(drl_instance_t *instance) {
+    int i;
+    char cmd[300];
+
+    for (i = 0; i < instance->leaf_count; ++i) {
+        /* Delete the old pfifo qdisc that might have been there before. */
+        sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:1%x handle 1%x pfifo",
+                instance->leaves[i].xid, instance->leaves[i].xid);
+
+        if (execute_cmd(cmd)) {
+            //FIXME: remove this print and do a log.
+            printf("GRD: pfifo qdisc wasn't there!\n");
+        }
+
+        /* Add the netem qdisc. */
+#ifdef DELAY40MS
+        sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1%x handle 1%x netem loss 0 delay 40ms",
+                instance->leaves[i].xid, instance->leaves[i].xid);
+#else
+        sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1%x handle 1%x netem loss 0 delay 0ms",
+                instance->leaves[i].xid, instance->leaves[i].xid);
+#endif
+
+        if (execute_cmd(cmd)) {
+            return 1;
+        }
+    }
+
+    /* Do the same for 1000 and 1fff. */
+    sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:1000 handle 1000 pfifo");
+
+    if (execute_cmd(cmd)) {
+        //FIXME: remove this print and do a log.
+        printf("GRD: pfifo qdisc wasn't there!\n");
+    }
+
+    /* Add the netem qdisc. */
+#ifdef DELAY40MS
+    sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1000 handle 1000 netem loss 0 delay 40ms");
+#else
+    sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1000 handle 1000 netem loss 0 delay 0ms");
+#endif
+
+    if (execute_cmd(cmd)) {
+        return 1;
+    }
+
+    sprintf(cmd, "/sbin/tc qdisc del dev eth0 parent 1:1fff handle 1fff pfifo");
+
+    if (execute_cmd(cmd)) {
+        //FIXME: remove this print and do a log.
+        printf("GRD: pfifo qdisc wasn't there!\n");
+    }
+
+    /* Add the netem qdisc. */
+#ifdef DELAY40MS
+    sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1fff handle 1fff netem loss 0 delay 40ms");
+#else
+    sprintf(cmd, "/sbin/tc qdisc replace dev eth0 parent 1:1fff handle 1fff netem loss 0 delay 0ms");
+#endif
+
+    if (execute_cmd(cmd)) {
+        return 1;
+    }
+
     return 0;
 }
 
@@ -1097,9 +1233,9 @@ static int init_drl(void) {
 
     printlog(LOG_WARN, "     POLICY: %s\n",policy.u.string);
     if (strcasecmp(policy.u.string,"GRD") == 0) {
-        limiter.policynum = POLICY_GRD;
+        limiter.policy = POLICY_GRD;
     } else if (strcasecmp(policy.u.string,"FPS") == 0) {
-        limiter.policynum = POLICY_FPS;
+        limiter.policy = POLICY_FPS;
     } else {
         printlog(LOG_CRITICAL,
                  "Unknown DRL policy %s, aborting.\n",policy.u.string);
@@ -1155,13 +1291,27 @@ static int init_drl(void) {
     /* Debugging - FIXME: remove this? */
     print_instance(&limiter.stable_instance);
 
-    if (assign_htb_hierarchy(&limiter.stable_instance)) {
-        free_instance(&limiter.stable_instance);
-        return false;
-    }
+    switch (limiter.policy) {
+        case POLICY_FPS:
+            if (assign_htb_hierarchy(&limiter.stable_instance)) {
+                free_instance(&limiter.stable_instance);
+                return false;
+            }
+
+            if (create_htb_hierarchy(&limiter.stable_instance)) {
+                free_instance(&limiter.stable_instance);
+                return false;
+            }
+        break;
 
-    if (create_htb_hierarchy(&limiter.stable_instance)) {
-        free_instance(&limiter.stable_instance);
+        case POLICY_GRD:
+            if (setup_tc_grd(&limiter.stable_instance)) {
+                free_instance(&limiter.stable_instance);
+                return false;
+            }
+        break;
+
+        default:
         return false;
     }
 
@@ -1204,9 +1354,6 @@ static void reconfig() {
         return;
     }
 
-    /* Lock */
-    pthread_rwlock_wrlock(&limiter.limiter_lock);
-
     if (validate_configs(configs, &limiter.new_instance)) {
         free_failed_config(configs, &limiter.new_instance);
         printlog(LOG_CRITICAL, "Validation failed during reconfig().\n");
@@ -1226,28 +1373,56 @@ static void reconfig() {
 
     /* Debugging - FIXME: remove this? */
     print_instance(&limiter.new_instance);
+    
+    /* Lock */
+    pthread_rwlock_wrlock(&limiter.limiter_lock);
 
-    if (assign_htb_hierarchy(&limiter.new_instance)) {
-        free_instance(&limiter.new_instance);
-        printlog(LOG_CRITICAL, "Failed to assign HTB hierarchy during reconfig().\n");
-        pthread_rwlock_unlock(&limiter.limiter_lock);
-        return;
-    }
+    switch (limiter.policy) {
+        case POLICY_FPS:
+            if (assign_htb_hierarchy(&limiter.new_instance)) {
+                free_instance(&limiter.new_instance);
+                printlog(LOG_CRITICAL, "Failed to assign HTB hierarchy during reconfig().\n");
+                pthread_rwlock_unlock(&limiter.limiter_lock);
+                return;
+            }
 
-    if (create_htb_hierarchy(&limiter.new_instance)) {
-        free_instance(&limiter.new_instance);
-        printlog(LOG_CRITICAL, "Failed to create HTB hierarchy during reconfig().\n");
+            if (create_htb_hierarchy(&limiter.new_instance)) {
+                free_instance(&limiter.new_instance);
+                printlog(LOG_CRITICAL, "Failed to create HTB hierarchy during reconfig().\n");
+
+                /* Re-create old instance. */
+                if (create_htb_hierarchy(&limiter.stable_instance)) {
+                    /* Error reinstating the old one - big problem. */
+                    printlog(LOG_CRITICAL, "Failed to reinstate HTB hierarchy during reconfig().\n");
+                    printlog(LOG_CRITICAL, "Giving up...\n");
+                    flushlog();
+                    exit(EXIT_FAILURE);
+                }
 
-        /* Re-create old instance. */
-        if (create_htb_hierarchy(&limiter.stable_instance)) {
-            /* Error reinstating the old one - big problem. */
-            printlog(LOG_CRITICAL, "Failed to reinstate HTB hierarchy during reconfig().\n");
-            flushlog();
-            exit(EXIT_FAILURE);
-        }
+                pthread_rwlock_unlock(&limiter.limiter_lock);
+                return;
+            }
+        break;
+
+        case POLICY_GRD:
+            if (setup_tc_grd(&limiter.new_instance)) {
+                free_instance(&limiter.new_instance);
+                printlog(LOG_CRITICAL, "GRD tc calls failed during reconfig().\n");
+
+                /* Try to re-create old instance. */
+                if (setup_tc_grd(&limiter.stable_instance)) {
+                    printlog(LOG_CRITICAL, "Failed to reinstate old GRD qdiscs during reconfig().\n");
+                    printlog(LOG_CRITICAL, "Giving up...\n");
+                    flushlog();
+                    exit(EXIT_FAILURE);
+                }
+            }
+        break;
 
-        pthread_rwlock_unlock(&limiter.limiter_lock);
-        return;
+        default:
+            /* Should be impossible. */
+            printf("Pigs are flying?\n");
+            exit(EXIT_FAILURE);
     }
 
     /* Switch over new to stable instance. */