Updates to autotools for library detection
Kevin Webb [Fri, 22 Jan 2010 20:41:46 +0000 (20:41 +0000)]
Lots of changes to the gossip code to facilitate new group membership options
Cleaned up some old code

26 files changed:
Makefile.in
Rules.make.in
configure.in
drl.xml
drl/Makefile.in
drl/config.c
drl/config.h
drl/drl_state.c
drl/drl_state.h
drl/estimate.c
drl/peer_comm.c
drl/peer_comm.h
drl/raterouter.h
drl/ratetypes.h
drl/swim.c [new file with mode: 0644]
drl/swim.h [new file with mode: 0644]
drl/ulogd_DRL.c
drl/zk_drl.c [new file with mode: 0644]
drl/zk_drl.h [new file with mode: 0644]
include/zookeeper/recordio.h [new file with mode: 0644]
include/zookeeper/zookeeper.h [new file with mode: 0644]
include/zookeeper/zookeeper.jute.h [new file with mode: 0644]
include/zookeeper/zookeeper_log.h [new file with mode: 0644]
include/zookeeper/zookeeper_version.h [new file with mode: 0644]
ulogd.conf.in
ulogd.spec

index 618d44c..0358fd8 100644 (file)
@@ -62,9 +62,9 @@ recurse:
        @for d in $(SUBDIRS); do if ! make -C $$d; then exit 1; fi; done
 
 ulogd: ulogd.c include/ulogd/ulogd.h ulogd.conf recurse
-       $(CC) $(CFLAGS) -rdynamic $< conffile/conffile.o $(LIBIPULOG)/libipulog.a -o $@ $(LDFLAGS) $(LIBS) `xml2-config --libs`
+       $(CC) $(CFLAGS) -rdynamic $< conffile/conffile.o $(LIBIPULOG)/libipulog.a -o $@ $(LDFLAGS) $(LIBS) $(XML_LDFLAGS)
 
-edit = sed -e 's,@libdir\@,$(ULOGD_LIB_PATH),g'
+edit = sed -e 's,@libdir\@,$(ULOGD_LIB_PATH),g' -e 's,@etcdir\@,$(DESTDIR)$(ETCDIR),g'
 
 ulogd.conf: ulogd.conf.in
        $(edit) ulogd.conf.in > ulogd.conf
@@ -80,6 +80,7 @@ install: all
        @INSTALL@ -D -m 755 ulogd $(DESTDIR)$(BINDIR)/ulogd
        @[ -d $(DESTDIR)$(ETCDIR) ] || mkdir -p $(DESTDIR)$(ETCDIR)
        @[ -f $(DESTDIR)$(ETCDIR)/ulogd.conf ] || @INSTALL@ -D -m 600 ulogd.conf $(DESTDIR)$(ETCDIR)/ulogd.conf
+       @[ -f $(DESTDIR)$(ETCDIR)/drl.xml ] || @INSTALL@ -D -m 600 drl.xml $(DESTDIR)$(ETCDIR)/drl.xml
 
 doc:
        $(MAKE) -C $@
index 99a7f1a..ec9679f 100644 (file)
@@ -35,7 +35,6 @@ endif
 
 LIBS=@LIBS@
 
-
 # Names of the plugins to be compiled
 ULOGD_SL:=BASE OPRINT PWSNIFF LOGEMU LOCAL SYSLOG
 
@@ -54,3 +53,8 @@ PGSQL_LDFLAGS=@DATABASE_LIB_DIR@ @PGSQL_LIB@
 SQLITE3_CFLAGS=-I@SQLITE3INCLUDES@ @EXTRA_SQLITE3_DEF@
 SQLITE3_LDFLAGS=@DATABASE_LIB_DIR@ @SQLITE3_LIB@
 
+XML_CFLAGS=@XMLINCLUDES@
+XML_LDFLAGS=@XMLLIBS@
+
+ZK_CFLAGS=@ZKFLAGS@
+ZK_LDFLAGS=@ZKLIBS@
index 2f888fb..7bd3640 100644 (file)
@@ -65,10 +65,6 @@ dnl test for MySQL
 dnl
 AC_ARG_WITH(mysql,
  --with-mysql=<directory>              mysql installed in <directory>,[
-if test $withval == no
-then
-    AC_MSG_WARN("mysql disabled.")
-else
 if test $withval != yes
 then
         dir=$withval
@@ -126,8 +122,7 @@ else
                AC_MSG_RESULT(found new MySQL)
        fi
 
-fi
-fi
+fi      
 ])      
 
 
@@ -146,6 +141,86 @@ AC_ARG_WITH(mysql-log-ip-as-string,
    AC_MSG_WARN(the use of --with-mysql-log-ip-as-string is discouraged)
 ])
 
+dnl
+dnl test for libxml2
+dnl
+AC_ARG_WITH(libxml2, --with-libxml=<directory>         libxml2 installed in <directory>,[
+if test $withval != yes
+then
+        dir=$withval
+else
+        dir="/usr/local"
+fi])
+
+libxmldir=""
+AC_MSG_CHECKING(for LIBXML2 files)
+for d in $dir/bin /usr/bin /usr/local/bin /usr/local/libxml2/bin /opt/libxml2/bin /opt/packages/libxml2/bin
+do
+    if test -x $d/xml2-config
+    then
+        AC_MSG_RESULT(found xml2-config in $d)
+        libxmldir=$d
+        break
+    fi
+done
+
+if test x$libxmldir = x
+then
+    AC_MSG_ERROR(xml2-config not found)
+else
+    XMLINCLUDES=`$libxmldir/xml2-config --cflags`
+    AC_SUBST(XMLINCLUDES)
+
+    XMLLIBS=`$libxmldir/xml2-config --libs`
+    AC_SUBST(XMLLIBS)
+fi
+
+dnl
+dnl check for zookeeper library
+dnl
+zkdir="/usr/local/lib"
+
+AC_ARG_WITH(zookeeper, --with-zookeeperlib=<directory>         zookeeper shared object located in <directory>,[
+if test $withval = no
+then
+    zkdir=no
+    AC_MSG_WARN(Building without zookeeper support.)
+else
+    zkdir=$withval
+fi])
+
+zklib=""
+if test $zkdir != no
+then
+    AC_MSG_CHECKING(for zookeeper libraries)
+    for d in $zkdir /usr/local/lib /lib /usr/lib
+    do
+        if test -f $d/libzookeeper_mt.so
+        then
+            AC_MSG_RESULT(found libzookeeper_mt.so in $d)
+            zklib=$d/libzookeeper_mt.so
+            break
+        fi
+    done
+
+    if test x$zklib = x
+    then
+        dnl no zklib
+        AC_MSG_WARN(Zookeeper libraries not found.)
+        ZKLIBS=""
+        AC_SUBST(ZKLIBS)
+
+        ZKFLAGS=""
+        AC_SUBST(ZKFLAGS)
+    else
+        dnl found it
+        ZKLIBS=$zklib
+        AC_SUBST(ZKLIBS)
+
+        ZKFLAGS="-DTHREADED -DBUILD_ZOOKEEPER"
+        AC_SUBST(ZKFLAGS)
+    fi
+fi
 
 dnl
 dnl test for PostgreSQL
diff --git a/drl.xml b/drl.xml
index 3b3f4b6..619a6ec 100644 (file)
--- a/drl.xml
+++ b/drl.xml
@@ -1,6 +1,10 @@
 <?xml version="1.0" encoding="UTF-8"?>
+
+<!--
 <drl>
-    <machine id="11" limit="10" commfabric="MESH" branch="0" accounting="STANDARD" ewma="0.1" intervals="1">
-        <peer>127.0.0.1</peer>
+    <machine id="11" limit="10240" commfabric="MESH" failure_behavior="QUORUM" accounting="STANDARD" ewma="0.1">
+        <peer>x.x.x.x</peer>
+        <peer>y.y.y.y</peer>
     </machine>
 </drl>
+-->
index 0315752..5eca2e2 100644 (file)
@@ -1,21 +1,21 @@
 #
 include @top_srcdir@/Rules.make
 
-CFLAGS+=-I@top_srcdir@ -I@top_srcdir@/libipulog/include -I@top_srcdir@/include `xml2-config --cflags`
+CFLAGS+=-I@top_srcdir@ -I@top_srcdir@/libipulog/include -I@top_srcdir@/include -I@top_srcdir@/include/zookeeper $(XML_CFLAGS) $(ZK_CFLAGS)
 SH_CFLAGS:=$(CFLAGS) -fPIC
 
 #  Normally You should not need to change anything below
 #
 
 SHARED_LIBS=ulogd_DRL.so
-OBJECTS=config.o drl_state.o estimate.o logging.o multipleinterval.o peer_comm.o samplehold.o simple.o standard.o ulogd_DRL.o util.o
+OBJECTS=config.o drl_state.o estimate.o logging.o multipleinterval.o peer_comm.o samplehold.o simple.o standard.o swim.o ulogd_DRL.o util.o zk_drl.o
 
 all: $(SHARED_LIBS)
 
 distrib:
 
 $(SHARED_LIBS): $(OBJECTS)
-       $(LD) $(LDFLAGS) -shared -o $@ $(OBJECTS) -lc 
+       $(LD) $(LDFLAGS) -shared -o $@ $(OBJECTS) -lc $(ZK_LDFLAGS)
 
 %.o: %.c
        $(CC) $(CFLAGS) -c $<
index 42f4472..d9d9ede 100644 (file)
@@ -106,6 +106,8 @@ static int parse_common(xmlDocPtr doc, xmlNodePtr ident, ident_config *common) {
     xmlChar *limit;
     xmlChar *commfabric;
     xmlChar *branch;
+    xmlChar *membership;
+    xmlChar *failure_behavior;
     xmlChar *accounting;
     xmlChar *ewma;
     xmlChar *mainloop_intervals;
@@ -114,6 +116,12 @@ static int parse_common(xmlDocPtr doc, xmlNodePtr ident, ident_config *common) {
     xmlNodePtr fields = ident->children;
     ident_peer *current = NULL;
 
+    /* The struct has been memsetted to 0, this is just to be safe. */
+    common->zk_host = NULL;
+    common->peers = NULL;
+    common->members = NULL;
+    common->next = NULL;
+
     /* Make sure no required fields are missing. */
     id = xmlGetProp(ident, (const xmlChar *) "id");
     if (id == NULL) {
@@ -150,7 +158,7 @@ static int parse_common(xmlDocPtr doc, xmlNodePtr ident, ident_config *common) {
         xmlFree(commfabric);
     }
 
-    /* Only care about branching factor if we're using gossip. */
+    /* Only care about branching factor and failure detector if we're using gossip. */
     if (common->commfabric == COMM_GOSSIP) {
         branch = xmlGetProp(ident, (const xmlChar *) "branch");
         if (branch == NULL) {
@@ -160,6 +168,46 @@ static int parse_common(xmlDocPtr doc, xmlNodePtr ident, ident_config *common) {
             common->branch = atoi((const char *) branch);
             xmlFree(branch);
         }
+
+        membership = xmlGetProp(ident, (const xmlChar *) "membership");
+        if (membership == NULL) {
+            printlog(LOG_CRITICAL, "Ident missing membership protocol selection.\n");
+            return EINVAL;
+        } else {
+            if (!xmlStrcmp(membership, (const xmlChar *) "SWIM")) {
+                common->membership = SWIM;
+            } else if (!xmlStrcmp(membership, (const xmlChar *) "ZOOKEEPER")) {
+#ifdef BUILD_ZOOKEEPER
+                common->membership = ZOOKEEPER;
+#else
+                printlog(LOG_CRITICAL, "Zookeeper requested, but support not compiled into DRL at configure time.\n");
+                xmlFree(membership);
+                return EINVAL;
+#endif
+            } else {
+                printlog(LOG_CRITICAL, "Unknown/invalid gossip group membership protocol.\n");
+                xmlFree(membership);
+                return EINVAL;
+            }
+            xmlFree(membership);
+        }
+
+        failure_behavior = xmlGetProp(ident, (const xmlChar *) "failure_behavior");
+        if (failure_behavior == NULL) {
+            printlog(LOG_CRITICAL, "Ident missing failure handling behavior.\n");
+            return EINVAL;
+        } else {
+            if (!xmlStrcmp(failure_behavior, (const xmlChar *) "PANIC")) {
+                common->failure_behavior = PANIC;
+            } else if (!xmlStrcmp(failure_behavior, (const xmlChar *) "QUORUM")) {
+                common->failure_behavior = QUORUM;
+            } else {
+                printlog(LOG_CRITICAL, "Unknown/invalid gossip failure behavior policy.\n");
+                xmlFree(failure_behavior);
+                return EINVAL;
+            }
+            xmlFree(failure_behavior);
+        }
     }
 
     accounting = xmlGetProp(ident, (const xmlChar *) "accounting");
@@ -243,6 +291,15 @@ static int parse_common(xmlDocPtr doc, xmlNodePtr ident, ident_config *common) {
                 current->next = NULL;
             }
             xmlFree(ip);
+        } else if ((!xmlStrcmp(fields->name, (const xmlChar *) "zkhost"))) {
+            xmlChar *host = xmlNodeListGetString(doc, fields->children, 1);
+
+            common->zk_host = strdup((const char *) host);
+            if (common->zk_host == NULL) {
+                return ENOMEM;
+            }
+
+            xmlFree(host);
         }
         fields = fields->next;
     }
@@ -252,6 +309,11 @@ static int parse_common(xmlDocPtr doc, xmlNodePtr ident, ident_config *common) {
         return EINVAL;
     }
 
+    if (common->membership == ZOOKEEPER && common->zk_host == NULL) {
+        printlog(LOG_CRITICAL, "Group membership protocol ZOOKEEPER requires a zkhost field.\n");
+        return EINVAL;
+    }
+
     /* No errors. */
     return 0;
 }
index 0b202c7..883b641 100644 (file)
@@ -69,6 +69,22 @@ typedef struct ident_config {
     /** The gossip branch factor (when commfabric is COMM_GOSSIP). */
     int branch;
 
+    /** The gossip group membership policy (SWIM, ZOOKEEPER). */
+    enum memberships membership;
+
+    /** The behavioral policy to use when one or more failures in group
+     * membership are detected. */
+    enum failure_behaviors failure_behavior;
+
+#ifdef BUILD_ZOOKEEPER
+
+    /** The host string that should be passed to zookeeper_init when using
+     * zookeeper.  This consists of comma-separated ipaddr:port pairs. Example:
+     * "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" */
+    char *zk_host;
+
+#endif
+
     /** The flow accounting mechanism to be used by this identity. */
     enum accountings accounting;
 
index ef57abe..d0baa4a 100644 (file)
 #include "ratetypes.h"
 #include "drl_state.h"
 #include "peer_comm.h"
+#include "swim.h"
 #include "logging.h"
 
+#ifdef BUILD_ZOOKEEPER
+    #include "zk_drl.h"
+#endif
+
 extern limiter_t limiter;
 
+static int group_membership_init(comm_t *comm, uint32_t id, ident_config *config) {
+    switch (comm->gossip.membership) {
+        case SWIM:
+            return swim_init(comm, id);
+        break;
+
+#ifdef BUILD_ZOOKEEPER
+
+        case ZOOKEEPER:
+            return zk_drl_init(comm, id, &limiter, config);
+        break;
+
+#endif
+
+        default:
+            printlog(LOG_CRITICAL, "drl_state.c: This shouldn't happen!\n");
+            return EINVAL;
+    }
+}
+
+static void group_membership_teardown(comm_t *comm) {
+    switch (comm->gossip.membership) {
+        case SWIM:
+            swim_teardown(comm);
+        break;
+
+#ifdef BUILD_ZOOKEEPER
+
+        case ZOOKEEPER:
+            zk_drl_close(comm);
+        break;
+
+#endif
+
+        default:
+            printlog(LOG_CRITICAL, "drl_state.c: This shouldn't happen!\n");
+    }
+}
+
+void null_restart_function(comm_t *comm, int32_t view_number) {
+    /* Intentionally empty. */
+}
+
 int new_comm(comm_t *comm, ident_config *config, remote_node_t *remote_nodes) {
     int i;
+    int result = 0;
 
     memset(comm, 0, sizeof(comm_t));
 
@@ -40,17 +89,32 @@ int new_comm(comm_t *comm, ident_config *config, remote_node_t *remote_nodes) {
     comm->transport_proto = UDP;
     comm->remote_node_count = config->peer_count;
     comm->gossip.gossip_branch = config->branch;
+    comm->gossip.membership = config->membership;
+    comm->gossip.failure_behavior = config->failure_behavior;
     comm->gossip.weight = 1.0;
 
     pthread_mutex_init(&comm->lock, NULL);
-
-    /* Set send function. */
+    
+    // allocate memory to the indices
+    comm->indices = (int*) malloc(sizeof(int)*comm->remote_node_count);
+    memset(comm->indices, 0, sizeof(int)*comm->remote_node_count);
+    for(i = 0; i < comm->remote_node_count; i++)
+        comm->indices[i] = i;
+    comm->shuffle_index = comm->remote_node_count;
+
+    /* Set default comm function pointers. These may get overwritten later
+     * by more specific initialization routines such as group membership
+     * init calls. */
     switch (config->commfabric) {
         case COMM_MESH:
             comm->send_function = send_udp_mesh;
+            comm->recv_function = recv_mesh;
+            comm->restart_function = null_restart_function;
             break;
         case COMM_GOSSIP:
             comm->send_function = send_udp_gossip;
+            comm->recv_function = recv_gossip;
+            comm->restart_function = null_restart_function;
             break;
     }
 
@@ -61,8 +125,7 @@ int new_comm(comm_t *comm, ident_config *config, remote_node_t *remote_nodes) {
     }
 
     /* Allocate remote_limiters array and fill it in. Add remotes to map. */
-    comm->remote_limiters =
-                        malloc(config->peer_count * sizeof(remote_limiter_t));
+    comm->remote_limiters = malloc(config->peer_count * sizeof(remote_limiter_t));
 
     if (comm->remote_limiters == NULL) {
         pthread_mutex_destroy(&comm->lock);
@@ -76,13 +139,18 @@ int new_comm(comm_t *comm, ident_config *config, remote_node_t *remote_nodes) {
         comm->remote_limiters[i].addr = remote_nodes[i].addr;
         comm->remote_limiters[i].port = remote_nodes[i].port;
         comm->remote_limiters[i].outgoing.next_seqno = 1;
+        comm->remote_limiters[i].reachability = REACHABLE;
+        comm->remote_limiters[i].awol = 0;
+        comm->remote_limiters[i].count_rounds = 0;
+        comm->remote_limiters[i].count_awol = 0;
+        comm->remote_limiters[i].count_alive = 0;
         map_insert(comm->remote_node_map, (void *) &(remote_nodes[i]),
                    sizeof(remote_node_t), &comm->remote_limiters[i]);
     }
-
-    /* Allocate and initialize retrys. */
-    comm->retrys = malloc(config->branch * sizeof(int));
-    if (comm->retrys == NULL) {
+   
+    /* Allocate and initialize selected. */
+    comm->selected = malloc(config->branch * sizeof(int));
+    if (comm->selected == NULL) {
         pthread_mutex_destroy(&comm->lock);
         free_map(comm->remote_node_map, 0);
         free(comm->remote_limiters);
@@ -90,14 +158,28 @@ int new_comm(comm_t *comm, ident_config *config, remote_node_t *remote_nodes) {
     }
 
     for (i = 0; i < config->branch; ++i) {
-        comm->retrys[i] = -1;
+        comm->selected[i] = -1;
     }
 
-    return 0;
+    if (config->commfabric == COMM_GOSSIP) {
+        result = group_membership_init(comm, config->id, config);
+        if (result) {
+            pthread_mutex_destroy(&comm->lock);
+            free_map(comm->remote_node_map, 0);
+            free(comm->remote_limiters);
+            free(comm->selected);
+        }
+    }
+
+    return result;
 }
 
 void free_comm(comm_t *comm) {
     if (comm) {
+        if (comm->comm_fabric == COMM_GOSSIP) {
+            group_membership_teardown(comm);
+        }
+
         if (comm->remote_limiters) {
             free(comm->remote_limiters);
         }
@@ -112,70 +194,76 @@ void free_comm(comm_t *comm) {
 
         pthread_mutex_destroy(&comm->lock);
 
-        if (comm->retrys) {
-            free(comm->retrys);
+        if (comm->selected) {
+            free(comm->selected);
         }
     }
 }
 
-int read_comm(comm_t *comm, double *aggregate, double decayto) {
+int read_comm(double *aggregate, uint32_t *effective_global, comm_t *comm, uint32_t global_limit) {
     remote_limiter_t *remote;
 
     pthread_mutex_lock(&comm->lock);
     if (comm->comm_fabric == COMM_MESH) {
         *aggregate = 0;
+        *effective_global = global_limit;
         map_reset_iterate(comm->remote_node_map);
         while ((remote = map_next(comm->remote_node_map))) {
-            /* remote->rate corresponds to the rate (GRD) or weight (FPS)
-             * in generated by the peer remote. */
-            *aggregate += remote->rate;
-
-            /* If we continue to read it without having heard an update,
-             * we start to make the peer's value approach decayto, getting
-             * half of the way there each time. */
-            if (remote->awol >= MESH_REMOTE_AWOL_THRESHOLD) {
+            if (remote->reachability != REACHABLE) {
                 printlog(LOG_WARN, "AWOL remote limiter detected.\n");
-                remote->rate += ((decayto - remote->rate) / 2);
+                *effective_global -= (global_limit / (comm->remote_node_count + 1));
             } else {
-                remote->awol++;
+                /* remote->rate corresponds to the rate (GRD) or weight (FPS)
+                 * in generated by the peer remote. */
+                *aggregate += remote->rate;
             }
         }
         *aggregate += comm->local_rate;
     } else if (comm->comm_fabric == COMM_GOSSIP) {
-        int i;
-        int threshold = GOSSIP_REMOTE_AWOL_THRESHOLD;
         double value = 0;
+        int i;
         value = (comm->gossip.value / comm->gossip.weight);
         value *= (comm->remote_node_count + 1);
 
-        /* Keep around the last value so that we don't stupidly pick 0 when
-         * we're negative.  If we pick 0, it looks to the limiter like it
-         * has free reign and it will take 100% of the rate allocation for
-         * itself. This is a lie.  Open question what to do here... FIXME: Use decayto?*/
-        if (value <= 0) {
-            //*aggregate = comm->gossip.last_nonzero;
-            *aggregate = 0;
-            printlog(LOG_DEBUG, "Gossip: Read aggregate of 0 from comm layer.\n");
-        } else {
-            *aggregate = value;
-            comm->gossip.last_nonzero = *aggregate;
-            printlog(LOG_DEBUG, "Gossip: Read aggregate of %.3f from comm layer.\n", value);
-        }
-
-        for (i = 0; i < comm->remote_node_count; ++i) {
-            if (comm->remote_limiters[i].awol == threshold) {
-                /* Re-claim any value/weight sent. */
-                comm->gossip.value += comm->remote_limiters[i].outgoing.saved_value;
-                comm->gossip.weight += comm->remote_limiters[i].outgoing.saved_weight;
+        /* Look up the failure handling policy and check to see if it is
+         * is currently relevant. */
+        if (comm->gossip.failure_behavior == PANIC) {
+            int panic = 0;
+            if (!comm->connected) {
+                panic = 1;
+            }
 
-                comm->remote_limiters[i].outgoing.saved_value = 0.0;
-                comm->remote_limiters[i].outgoing.saved_weight = 0.0;
+            for (i = 0; i < comm->remote_node_count; ++i) {
+                if (comm->remote_limiters[i].reachability != REACHABLE) {
+                    panic = 1;
+                }
+            }
 
-                comm->remote_limiters[i].awol += 1;
-            } else if (comm->remote_limiters[i].awol < threshold) {
-                comm->remote_limiters[i].awol += 1;
+            if (panic) {
+                printlog(LOG_DEBUG, "GOSSIP: Panicking!\n");
+                *aggregate = comm->local_rate;
+                *effective_global = (global_limit / (comm->remote_node_count + 1));
+            } else {
+                *aggregate = (value > 0) ? value : 0;
+                *effective_global = global_limit;
+            }
+        } else if (comm->gossip.failure_behavior == QUORUM) {
+            *effective_global = global_limit;
+            if (comm->connected) {
+                for (i = 0; i < comm->remote_node_count; ++i) {
+                    if (comm->remote_limiters[i].reachability != REACHABLE) {
+                        *effective_global -= (global_limit / (comm->remote_node_count + 1));
+                    }
+                }
+                *aggregate = (value > 0) ? value : 0;
+            } else {
+                /* Not part of the Quorum - do 1/n. */
+                printlog(LOG_DEBUG, "GOSSIP: Not in the quorum...Panicking!\n");
+                *aggregate = comm->local_rate;
+                *effective_global = (global_limit / (comm->remote_node_count + 1));
             }
         }
+        printlog(LOG_DEBUG, "GOSSIP: Read aggregate of %.3f from comm layer.\n", *aggregate);
     } else {
         printlog(LOG_CRITICAL, "read_comm: Invalid comm fabric: %d.\n",
                  comm->comm_fabric);
@@ -192,14 +280,11 @@ int write_local_value(comm_t *comm, const double value) {
     if (comm->comm_fabric == COMM_MESH) {
         comm->last_local_rate = comm->local_rate;
         comm->local_rate = value;
-        comm->rate_change = comm->local_rate - comm->last_local_rate;
     } else if (comm->comm_fabric == COMM_GOSSIP) {
         comm->last_local_rate = comm->local_rate;
         comm->local_rate = value;
-        comm->rate_change = comm->local_rate - comm->last_local_rate;
-        /*printf("new: %f, old: %f, weight: %f, diff: %f\n", comm->gossip.value + (comm->gossip.weight * comm->rate_change), comm->gossip.value, comm->gossip.weight, comm->rate_change);*/
-        /*comm->gossip.value = comm->gossip.value + (comm->gossip.weight * comm->rate_change);*/
-        comm->gossip.value += comm->rate_change;
+        comm->gossip.value += (comm->local_rate - comm->last_local_rate);
+        printlog(LOG_DEBUG, "GOSSIP: value: %.3f, new gossip.value: %.3f\n", value, comm->gossip.value);
     }
     else {
         printlog(LOG_CRITICAL, "write_local_value: Invalid comm fabric: %d.\n",
@@ -225,6 +310,7 @@ int send_update(comm_t *comm, uint32_t id) {
 }
 
 void *limiter_receive_thread(void *unused) {
+    printlog(LOG_DEBUG, "GOSSIP: Starting the limiter_receive thread.\n");
     sigset_t signal_mask;
 
     sigemptyset(&signal_mask);
index 950ea7e..4fb710e 100644 (file)
 
 #define MAX_IDENTS (1024)
 #define MAX_LIMITERS (128)
+#define TRUE (1)
+#define FALSE (0)
 
-#define MESH_REMOTE_AWOL_THRESHOLD (5)
-
-//FIXME: Make this more scientific?
-#define GOSSIP_REMOTE_AWOL_THRESHOLD (10 * comm->remote_node_count / comm->gossip.gossip_branch)
-
-enum transports { UDP, TCP };
+enum transports { UDP = 0, TCP = 1 };
+enum view_confidences { IN = 0, NOTIN = 1, UNSURE = 2 };
+enum reachabilities { REACHABLE = 0, SUSPECT = 1, UNREACHABLE = 2 };
 
 typedef struct gossipval {
+    /* Fields that don't change. */
     int gossip_branch;
-    double last_nonzero;
+    enum memberships membership;
+    enum failure_behaviors failure_behavior;
+
+    /* Fields that change only on restart. */
+    int32_t view;
+
+    /* Fields that change frequently. */
     double value;
     double weight;
 } gossip_t;
@@ -63,6 +69,7 @@ typedef struct remote_node {
     in_port_t port;
 } remote_node_t;
 
+//TODO: Clean this up
 typedef struct remote_limiter {
     /** The last known value at the remote limiter. */
     double rate;
@@ -71,19 +78,62 @@ typedef struct remote_limiter {
     out_neighbor_t outgoing;
 
     /* Socket to contact this remote limiter, if using TCP. */
-    int socket;
+    //int socket;
 
+    /** IP address of the remote limiter, in network byte order. */
     in_addr_t addr;
     in_port_t port;
 
-    /** Flag to keep track of situations in which we read from this node's
-     * value more than once before receiving an update from it.  We use this
-     * value to know when it's safe to begin decaying the remote node's value
-     * (because we assume that it has failed). */
+    /** Keeps track of the number of messages we have sent to this peer without
+     * having heard from them. */
     int awol;
 
+    /** Whether or not we think this peer is reachable. */
+    enum reachabilities reachability;
+
+    /**Count of the rounds since doubt has risen and count of friends which 
+     * suspect this node to be awol or alive*/
+    int count_rounds;
+    int count_awol;
+    int count_alive;
+
+    uint32_t incarnation;
+
+    int32_t view;
+    enum view_confidences view_confidence;
 } remote_limiter_t;
 
+//TODO: Reduce the size of this?
+typedef struct message {
+    uint32_t magic;
+    uint32_t ident_id;
+    double value;
+    double weight;
+    uint32_t seqno;
+    uint32_t min_seqno;
+    uint16_t type;
+    
+    /** tell ping target the address of node which requested ping */
+    in_addr_t ping_source;
+    in_port_t ping_port;
+    /** friend needs to be told the address of node suspected to be down */
+    in_addr_t check_target;
+    in_port_t check_port;
+    /** friend responds with ALIVE / AWOL */
+    uint32_t checkack_value;
+    /*Whether the message has an update piggy backed onto it*/
+    uint32_t update_present; // TRUE or FALSE
+    /*Node is reachable or not*/
+    uint32_t reachability;
+    /*Incarnation number of the node whose update 
+     * is being sent piggy backed on the message*/
+    uint32_t incarnation;
+    /*Address of the node whose update is being sent*/
+    remote_node_t node;
+
+    uint32_t view;
+} message_t;
+
 typedef struct comm {
     /** Communication policy. (COMM_MESH, COMM_GOSSIP) */
     enum commfabrics comm_fabric;
@@ -97,8 +147,6 @@ typedef struct comm {
     /** Previous local value. */
     double last_local_rate;
 
-    double rate_change;
-
     /** The number of remote nodes in the identity */
     uint32_t remote_node_count;
 
@@ -122,65 +170,41 @@ typedef struct comm {
     /** Function pointer to send function. */
     int (*send_function)(struct comm *comm, uint32_t id, int sock);
 
-#if 0
-    /** Thread for handling incoming TCP data. */
-    pthread_t tcp_recv_thread;
-
-    /** Descriptor set for reading TCP messages */
-    fd_set fds;
-#endif
+    /** Function pointer to recv function for group membership. When a message
+     * is received, it is proccessed normally and then handed to this function
+     * in case additional processing is necessary for group membership. */
+    int (*recv_function)(struct comm *comm, uint32_t id, int sock, remote_limiter_t *remote, message_t *msg);
 
-    /** Array of integers specifiying which nodes, if any, have outstanding
-     * unacked data.  When nodes fall in this category, and it's time to send,
-     * these nodes will be chosen first.  This only affects gossip.  A
-     * negative number means there is no retransmit necessary.  Otherwise, the
-     * value is the index into the remote_limiters array of the necessary
-     * retransmit. */
-    int *retrys;
+    /** Function to restart the communication protocol. */
+    void (*restart_function)(struct comm *comm, int32_t view_number);
 
-} comm_t;
+    /** Flag indicating whether or not we are "connected" to the group
+     * membership service.  This can only be false for membership schemes that
+     * require a persistent connection (Zookeeper). */
+    int connected;
 
-typedef struct message {
-    uint32_t magic;
-    uint32_t ident_id;
-    uint32_t seqno;
-    uint32_t min_seqno;
-    double value;
-    double weight;
-    uint16_t type;
-} message_t;
+    /** Array of integers specifiying which nodes have been selected for
+     * message transmissions during the current round. */
+    int *selected;
 
-typedef struct hello_message {
-    uint32_t magic;
-    uint32_t ident_id;
-    uint16_t port;
-} hello_t;
+    /** Array of indicies into remote_limiters.  Used to keep a shuffled
+     * ordering for future gossip targets. */ 
+    int *indices;
+    
+    /** The next index to use for target peer selection.  The indicies are
+     * re-shuffled when this reaches remote_node_count. */
+    uint32_t shuffle_index;
 
-#if 0
-struct recv_thread_args {
-    comm_ident_t *ident;
-    pthread_rwlock_t *lock;
-    uint16_t port;
-};
-#endif
+    void *membership_state;
 
 #if 0
-/**
- * Initializes the global limiter.
- *
- * @param ipaddr The IP address on which the limiter should listen.
- * INADDR_ANY will suffice.  Should be specified in network byte order.
- *
- * @param port The port on which the limiter should listen. Should be specified
- * in network byte order.
- */
-void init_limiter(const in_addr_t ipaddr, const in_port_t port);
+    /** Thread for handling incoming TCP data. */
+    pthread_t tcp_recv_thread;
 
-/**
- * Deallocates the entire global limiter.
- */
-void destroy_limiter();
+    /** Descriptor set for reading TCP messages */
+    fd_set fds;
 #endif
+} comm_t;
 
 /**
  * Fills in the communication structure of an identity.
@@ -206,18 +230,9 @@ void free_comm(comm_t *comm);
  * Calculates and reads the current aggregate value for an identity.
  * This value includes the locally observed value.
  *
- * @param comm The comm structure of the identity in question.
- *
- * @param aggregate The location at which the aggregate value will
- * be stored.
- *
- * @param decayto When using a mesh comm fabric, limiters whose value
- * has not been heard in several timesteps will decay to this value.
- * Generally globallimit/N.
- *
  * @returns 0 on success, EINVAL on error.
  */
-int read_comm(comm_t *comm, double *aggregate, double decayto);
+int read_comm(double *aggregate, uint32_t *effective_global, comm_t *comm, uint32_t global_limit);
 
 /**
  * Updates the locally observed value of an identity.
@@ -254,4 +269,34 @@ int send_update(comm_t *comm, uint32_t id);
  */
 void *limiter_receive_thread(void *unused);
 
+#if 0
+typedef struct hello_message {
+    uint32_t magic;
+    uint32_t ident_id;
+    uint16_t port;
+} hello_t;
+
+struct recv_thread_args {
+    comm_ident_t *ident;
+    pthread_rwlock_t *lock;
+    uint16_t port;
+};
+
+/**
+ * Initializes the global limiter.
+ *
+ * @param ipaddr The IP address on which the limiter should listen.
+ * INADDR_ANY will suffice.  Should be specified in network byte order.
+ *
+ * @param port The port on which the limiter should listen. Should be specified
+ * in network byte order.
+ */
+void init_limiter(const in_addr_t ipaddr, const in_port_t port);
+
+/**
+ * Deallocates the entire global limiter.
+ */
+void destroy_limiter();
+#endif
+
 #endif  /* _DRL_STATE_ */
index 3cbf95a..24adbbf 100644 (file)
@@ -63,31 +63,14 @@ static double allocate_fps_under_limit(identity_t *ident, uint32_t target, doubl
     double ideal_weight;
     double total_weight = peer_weights + ident->last_localweight;
 
-    if (target >= ident->limit) {
+    if (target >= ident->effective_limit) {
         ideal_weight = total_weight;
     } else if (target <= 0) {
         ideal_weight = 0; // no flows here
     } else {
-        ideal_weight = ((double)target / (double)ident->limit) * total_weight;
+        ideal_weight = ((double)target / (double)ident->effective_limit) * total_weight;
     }
 
-#if 0
-    else if (peer_weights <= 0) {
-#if 0
-        // doesn't matter what we pick as our weight, so pick 1 / N.
-        ideal_weight = MAX_FLOW_SCALING_FACTOR / (remote_count(ident->i_handle) + 1);
-#endif
-        ideal_weight = ((double)target / (double)ident->limit) * total_weight;
-    } else {
-#if 0
-        double divisor = (double) ident->limit - (double) target;
-        ideal_weight = ((double) target * peer_weights) / divisor;
-#else
-        ideal_weight = ((double)target / (double)ident->limit) * total_weight;
-#endif
-    }
-#endif
-
     return ideal_weight;
 }
 
@@ -180,7 +163,7 @@ static uint32_t allocate_fps(identity_t *ident, double total_weight,
 
     if (ident->dampen_state == DAMPEN_TEST) {
         int64_t rate_delta = (int64_t) table->inst_rate - (int64_t) table->last_inst_rate;
-        double threshold = (double) ident->limit * (double) LARGE_INCREASE_PERCENTAGE / 10;
+        double threshold = (double) ident->effective_limit * (double) LARGE_INCREASE_PERCENTAGE / 10;
 
         if (rate_delta > threshold) {
             ident->dampen_state = DAMPEN_PASSED;
@@ -282,7 +265,7 @@ static uint32_t allocate_fps(identity_t *ident, double total_weight,
     /* Convert weight value into a rate limit.  If there is no measureable
      * weight, do a L/n allocation. */
     if (total_weight > 0) {
-        resulting_limit = (uint32_t) (ident->localweight * ident->limit / total_weight);
+        resulting_limit = (uint32_t) (ident->localweight * ident->effective_limit / total_weight);
     } else {
         resulting_limit = (uint32_t) (ident->limit / (ident->comm.remote_node_count + 1));
     }
@@ -313,7 +296,7 @@ static void allocate_fps_pretend(identity_t *ident, double total_weight,
 
     if (ident->dampen_state_copy == DAMPEN_TEST) {
         int64_t rate_delta = (int64_t) table->inst_rate - (int64_t) table->last_inst_rate;
-        double threshold = (double) ident->limit * (double) LARGE_INCREASE_PERCENTAGE / 10;
+        double threshold = (double) ident->effective_limit * (double) LARGE_INCREASE_PERCENTAGE / 10;
 
         if (rate_delta > threshold) {
             ident->dampen_state_copy = DAMPEN_PASSED;
@@ -393,7 +376,7 @@ static void allocate_fps_pretend(identity_t *ident, double total_weight,
     /* Convert weight value into a rate limit.  If there is no measureable
      * weight, do a L/n allocation. */
     if (total_weight > 0) {
-        resulting_limit = (uint32_t) (ident->localweight_copy * ident->limit / total_weight);
+        resulting_limit = (uint32_t) (ident->localweight_copy * ident->effective_limit / total_weight);
     } else {
         resulting_limit = (uint32_t) (ident->limit / (ident->comm.remote_node_count + 1));
     }
@@ -405,167 +388,11 @@ static void allocate_fps_pretend(identity_t *ident, double total_weight,
 #endif
 
 /**
- * Determines the amount of FPS weight to allocate to the identity during each
- * estimate interval.  Note that total_weight includes local weight.
- */
-static uint32_t allocate_fps_old(identity_t *ident, double total_weight) {
-    common_accounting_t *ftable = &ident->common; /* Common flow table info */
-    uint32_t local_rate = ftable->rate;
-    uint32_t ideallocal = 0;
-    double peer_weights; /* sum of weights of all other limiters */
-    double idealweight = 0;
-    double last_portion = 0;
-    double this_portion = 0;
-
-    static int dampen = 0;
-    int dampen_increase = 0;
-
-    double ideal_under = 0;
-    double ideal_over = 0;
-
-    int regime = 0;
-
-    /* two cases:
-       1. the aggregate is < limit
-       2. the aggregate is >= limit
-       */
-    peer_weights = total_weight - ident->last_localweight;
-    if (peer_weights < 0) {
-        peer_weights = 0;
-    }
-
-    if (dampen == 1) {
-        int64_t rate_delta =
-            (int64_t) ftable->inst_rate - (int64_t) ftable->last_inst_rate;
-        double threshold =
-            (double) ident->limit * (double) LARGE_INCREASE_PERCENTAGE / 10;
-
-        if (rate_delta > threshold) {
-            dampen_increase = 1;
-            printlog(LOG_DEBUG, "DAMPEN: delta(%.3f) thresh(%.3f)\n",
-                     rate_delta, threshold);
-        }
-    }
-
-    if (local_rate <= 0) {
-        idealweight = 0;
-    } else if (dampen_increase == 0 &&
-               (ident->locallimit <= 0 || local_rate < close_enough(ident->locallimit) || ident->flowstart)) {
-        /* We're under the limit - all flows are bottlenecked. */
-        idealweight = allocate_fps_under_limit(ident, local_rate, peer_weights);
-        ideal_over = allocate_fps_over_limit(ident);
-        ideal_under = idealweight;
-
-        if (ideal_over < idealweight) {
-            idealweight = ideal_over;
-            regime = 3;
-            dampen = 2;
-        } else {
-            regime = 1;
-            dampen = 0;
-        }
-
-        /* Apply EWMA */
-        ident->localweight = (ident->localweight * ident->ewma_weight +
-                              idealweight * (1 - ident->ewma_weight));
-        
-    } else {
-        idealweight = allocate_fps_over_limit(ident);
-        
-        /* Apply EWMA */
-        ident->localweight = (ident->localweight * ident->ewma_weight +
-                              idealweight * (1 - ident->ewma_weight));
-
-        /* This is the portion of the total weight in the system that was caused
-         * by this limiter in the last interval. */
-        last_portion = ident->last_localweight / total_weight;
-
-        /* This is the fraction of the total weight in the system that our
-         * proposed value for idealweight would use. */
-        this_portion = ident->localweight / (peer_weights + ident->localweight);
-
-        /* Dampen the large increase the first time... */
-        if (dampen == 0 && (this_portion - last_portion > LARGE_INCREASE_PERCENTAGE)) {
-            ident->localweight = ident->last_localweight + (LARGE_INCREASE_PERCENTAGE * total_weight);
-            dampen = 1;
-        } else {
-            dampen = 2;
-        }
-
-        ideal_under = allocate_fps_under_limit(ident, local_rate, peer_weights);
-        ideal_over = idealweight;
-
-        regime = 2;
-    }
-
-    /* Convert weight into a rate - add in our new local weight */
-    ident->total_weight = total_weight = ident->localweight + peer_weights;
-
-    /* compute local allocation:
-       if there is traffic elsewhere, use the weights
-       otherwise do a L/n allocation */
-    if (total_weight > 0) {
-    //if (peer_weights > 0) {
-        ideallocal = (uint32_t) (ident->localweight * ident->limit / total_weight);
-    } else {
-        ideallocal = ident->limit / (ident->comm.remote_node_count + 1);
-    }
-
-    printlog(LOG_DEBUG, "%.3f ActualWeight\n", ident->localweight);
-
-    printlog(LOG_DEBUG, "%.3f %.3f %.3f %.3f  Under / Over / Actual / Rate\n",
-            ideal_under / (ideal_under + peer_weights),
-            ideal_over / (ideal_over + peer_weights),
-            ident->localweight / (ident->localweight + peer_weights),
-            (double) local_rate / (double) ident->limit);
-
-    printlog(LOG_DEBUG, "%.3f %.3f IdealUnd IdealOve\n",ideal_under,ideal_over);
-
-    if (system_loglevel == LOG_DEBUG) {
-        printf("local_rate: %d, idealweight: %.3f, localweight: %.3f, total_weight: %.3f\n",
-            local_rate, idealweight, ident->localweight, total_weight);
-    }
-
-#if 0
-    if (printcounter <= 0) {
-        struct timeval tv;
-        double time_now;
-
-        gettimeofday(&tv, NULL);
-        time_now = (double) tv.tv_sec + (double) ((double) tv.tv_usec / (double) 1000000);
-
-        printlog(LOG_WARN, "%.2f %d %.2f %.2f %.2f %d %d %d %d %d %d %d %d ", time_now, ftable->inst_rate,
-            idealweight, ident->localweight, total_weight, ftable->num_flows, ftable->num_flows_5k,
-            ftable->num_flows_10k, ftable->num_flows_20k, ftable->num_flows_50k, ftable->avg_rate,
-            ftable->max_flow_rate, ftable->max_flow_rate_flow_hash);
-
-        printcounter = PRINT_COUNTER_RESET;
-    } else {
-        printcounter -= 1;
-    }
-
-    //printf("Dampen: %d, dampen_increase: %d, peer_weights: %.3f, regime: %d\n",
-    //       dampen, dampen_increase, peer_weights, regime);
-
-    if (regime == 3) {
-        printlog(LOG_DEBUG, "MIN: min said to use flow counting, which was %.3f when other method said %.3f.\n",
-                 ideal_over, ideal_under);
-    }
-    See print_statistics()
-#endif
-
-    printlog(LOG_DEBUG, "ideallocal is %d\n", ideallocal);
-
-    return(ideallocal);
-}
-
-/**
  * Determines the local drop probability for a GRD identity every estimate
  * interval.
  */
 static double allocate_grd(identity_t *ident, double aggdemand) {
     double dropprob;
-    double global_limit = ident->limit;
     double min_dropprob = ident->drop_prob * GRD_BIG_DROP;
 
     struct timeval tv;
@@ -575,8 +402,8 @@ static double allocate_grd(identity_t *ident, double aggdemand) {
     gettimeofday(&tv, NULL);
     time_now = (double) tv.tv_sec + (double) ((double) tv.tv_usec / (double) 1000000);
 
-    if (aggdemand > global_limit) {
-        dropprob = (aggdemand-global_limit)/aggdemand;
+    if (aggdemand > ident->effective_limit) {
+        dropprob = (aggdemand - ident->effective_limit) / aggdemand;
     } else {
         dropprob = 0.0;
     }
@@ -615,18 +442,12 @@ static double allocate_grd(identity_t *ident, double aggdemand) {
  */
 static void allocate(limiter_t *limiter, identity_t *ident) {
     /* Represents aggregate rate for GRD and aggregate weight for FPS. */
-    double comm_val = 0;
-
-    /* Read comm_val from comm layer. */
-    if (limiter->policy == POLICY_FPS) {
-        read_comm(&ident->comm, &comm_val,
-                ident->total_weight / (double) (ident->comm.remote_node_count + 1));
-    } else {
-        read_comm(&ident->comm, &comm_val,
-                (double) (ident->limit / (double) (ident->comm.remote_node_count + 1)));
-    }
-    printlog(LOG_DEBUG, "%.3f Aggregate weight/rate (FPS/GRD)\n", comm_val);
+    double aggregate = 0;
 
+    /* Read aggregate from comm layer. */
+    read_comm(&aggregate, &ident->effective_limit, &ident->comm, ident->limit);
+    printlog(LOG_DEBUG, "%.3f Aggregate weight/rate (FPS/GRD)\n", aggregate);
+    
     /* Experimental printing. */
     printlog(LOG_DEBUG, "%.3f \t Kbps used rate. ID:%d\n",
              (double) ident->common.rate / (double) 128, ident->id);
@@ -635,19 +456,19 @@ static void allocate(limiter_t *limiter, identity_t *ident) {
     if (limiter->policy == POLICY_FPS) {
 #ifdef SHADOW_ACCTING
 
-        allocate_fps_pretend(ident, comm_val, &ident->shadow_common, "SHADOW-ID");
+        allocate_fps_pretend(ident, aggregate, &ident->shadow_common, "SHADOW-ID");
 
         ident->last_localweight_copy = ident->localweight_copy;
 #endif
 
-        ident->locallimit = allocate_fps(ident, comm_val, &ident->common, "ID");
+        ident->locallimit = allocate_fps(ident, aggregate, &ident->common, "ID");
         ident->last_localweight = ident->localweight;
 
         /* Update other limiters with our weight by writing to comm layer. */
         write_local_value(&ident->comm, ident->localweight);
     } else {
         ident->last_drop_prob = ident->drop_prob;
-        ident->drop_prob = allocate_grd(ident, comm_val);
+        ident->drop_prob = allocate_grd(ident, aggregate);
         
         /* Update other limiters with our rate by writing to comm layer. */
         write_local_value(&ident->comm, ident->common.rate);
index eba9637..fc2dc86 100644 (file)
 #include "peer_comm.h"
 #include "logging.h"
 
-/* Artifically makes a network partition. */
-int do_partition = 0;
-int partition_set = 0xfffffff;
+#define NULL_PEER (-2)
+#define MESH_REMOTE_AWOL_THRESHOLD (5)
+#define GOSSIP_REMOTE_AWOL_THRESHOLD (5)
 
-extern limiter_t limiter;
+/* From ulogd_DRL.c */
+extern int do_partition;
+extern int partition_set;
 
-static const uint32_t MAGIC_MSG = 0x123123;
-static const uint32_t MAGIC_HELLO = 0x456456;
-static const uint16_t MSG = 1;
-static const uint16_t ACK = 2;
+extern limiter_t limiter;
 
-static void message_to_hbo(message_t *msg) {
+void message_to_hbo(message_t *msg) {
     msg->magic = ntohl(msg->magic);
     msg->ident_id = ntohl(msg->ident_id);
+    /* value is a double */
+    /* weight is a double */
     msg->seqno = ntohl(msg->seqno);
     msg->min_seqno = ntohl(msg->min_seqno);
     msg->type = ntohs(msg->type);
-    /* value is a double */
-    /* weight is a double */
+    /* ping_source, ping_port, check_target, and check_port stay in nbo. */
+    msg->checkack_value = ntohl(msg->checkack_value);
+    msg->update_present = ntohl(msg->update_present);
+    msg->reachability = ntohl(msg->reachability);
+    msg->incarnation = ntohl(msg->incarnation);
+    /* node has two fields, both stay in nbo. */
+    msg->view = ntohl(msg->view);
 }
 
-static void message_to_nbo(message_t *msg) {
+void message_to_nbo(message_t *msg) {
     msg->magic = htonl(msg->magic);
     msg->ident_id = htonl(msg->ident_id);
+    /* value is a double */
+    /* weight is a double */
     msg->seqno = htonl(msg->seqno);
     msg->min_seqno = htonl(msg->min_seqno);
     msg->type = htons(msg->type);
-    /* value is a double */
-    /* weight is a double */
-}
-
-static void hello_to_hbo(hello_t *hello) {
-    hello->magic = ntohl(hello->magic);
-    hello->ident_id = ntohl(hello->ident_id);
-    hello->port = ntohs(hello->port);
-}
-
-static void hello_to_nbo(hello_t *hello) {
-    hello->magic = htonl(hello->magic);
-    hello->ident_id = htonl(hello->ident_id);
-    hello->port = htons(hello->port);
-}
-
-static int is_connected(remote_limiter_t *remote) {
-    struct sockaddr_in addr;
-    socklen_t addrlen = sizeof(addr);
-
-    if (getpeername(remote->socket, (struct sockaddr *) &addr, &addrlen) == 0)
-        return 1;
-    else
-        return 0;
+    /* ping_source, ping_port, check_target, and check_port already in nbo. */
+    msg->checkack_value = htonl(msg->checkack_value);
+    msg->update_present = htonl(msg->update_present);
+    msg->reachability = htonl(msg->reachability);
+    msg->incarnation = htonl(msg->incarnation);
+    /* node has two fields, both already in nbo. */
+    msg->view = htonl(msg->view);
 }
 
-static int send_ack(identity_t *ident, remote_limiter_t *remote, uint32_t seqno) {
+int send_ack(uint32_t id, remote_limiter_t *remote, uint32_t seqno, uint16_t type, int32_t view) {
     int result = 0;
     message_t msg;
     struct sockaddr_in toaddr;
@@ -108,9 +99,10 @@ static int send_ack(identity_t *ident, remote_limiter_t *remote, uint32_t seqno)
 
     memset(&msg, 0, sizeof(msg));
     msg.magic = MAGIC_MSG;
-    msg.ident_id = ident->id;
-    msg.type = ACK;
+    msg.ident_id = id;
+    msg.type = type;
     msg.seqno = seqno;
+    msg.view = view;
 
     message_to_nbo(&msg);
 
@@ -171,230 +163,26 @@ void limiter_receive() {
         return;
     }
 
-    switch (ident->comm.comm_fabric) {
-        case COMM_MESH: {
-            /* Use the message's value to be our new GRDrate/FPSweight for the
-             * message's sender. */
-            remote->rate = msg.value;
-
-            /* Reset the AWOL counter to zero since we received an update. */
-            remote->awol = 0;
-        }
-        break;
-
-        case COMM_GOSSIP: {
-            if (msg.type == ACK) {
-                if (msg.seqno == remote->outgoing.next_seqno - 1) {
-                    int i;
-
-                    /* Ack for most recent message.  Clear saved state. */
-                    remote->outgoing.first_seqno = remote->outgoing.next_seqno;
-                    remote->outgoing.saved_value = 0;
-                    remote->outgoing.saved_weight = 0;
-
-                    for (i = 0; i < ident->comm.gossip.gossip_branch; ++i) {
-                        if (ident->comm.retrys[i] >= 0 &&
-                            remote == &ident->comm.remote_limiters[ident->comm.retrys[i]]) {
-                                ident->comm.retrys[i] = -2;
-                        }
-                    }
-                }
-                /* Ignore ack if it isn't for most recent message. */
-            } else {
-                if (msg.min_seqno > remote->incoming.seen_seqno) {
-                    /* Entirely new information */
-                    remote->incoming.seen_seqno = msg.seqno;
-                    remote->incoming.saved_value = msg.value;
-                    remote->incoming.saved_weight = msg.weight;
-                    ident->comm.gossip.value += msg.value;
-                    ident->comm.gossip.weight += msg.weight;
-                    send_ack(ident, remote, msg.seqno);
-                    remote->awol = 0;
-                } else if (msg.seqno > remote->incoming.seen_seqno) {
-                    /* Only some of the message is old news. */
-                    double diff_value = msg.value - remote->incoming.saved_value;
-                    double diff_weight = msg.weight - remote->incoming.saved_weight;
-
-                    remote->incoming.seen_seqno = msg.seqno;
-                    remote->incoming.saved_value = msg.value;
-                    remote->incoming.saved_weight = msg.weight;
-
-                    ident->comm.gossip.value += diff_value;
-                    ident->comm.gossip.weight += diff_weight;
-                    send_ack(ident, remote, msg.seqno);
-                    remote->awol = 0;
-                } else {
-                    /* The entire message is old news. (Duplicate). */
-                    /* Do nothing. */
-                }
-            }
-        }
-        break;
-
-        default: {
-            printlog(LOG_CRITICAL, "ERR: Unknown identity comm fabric.\n");
-        }
-    }
-
+    /* Pass the message to the comm's recv function, which is responsible for
+     * processing its contents. */
+    ident->comm.recv_function(&ident->comm, ident->id, limiter.udp_socket, remote, &msg);
+    
     pthread_mutex_unlock(&ident->comm.lock);
     pthread_rwlock_unlock(&limiter.limiter_lock);
 }
 
-#if 0
-static void limiter_accept(comm_limiter_t *limiter) {
-    int sock, result;
-    struct sockaddr_in fromaddr;
-    socklen_t fromlen = sizeof(fromaddr);
-    remote_node_t sender;
-    remote_limiter_t *remote;
-    hello_t hello;
-    comm_ident_t *ident;
-    ident_handle *handle = NULL;
-
-    sock = accept(limiter->tcp_socket, (struct sockaddr *)&fromaddr, &fromlen);
-
-    assert(sock > 0);
-
-    memset(&hello, 0, sizeof(hello_t));
-    result = recv(sock, &hello, sizeof(hello_t), 0);
+int recv_mesh(comm_t *comm, uint32_t id, int sock, remote_limiter_t *remote, message_t *msg) {
+    /* Use the message's value to be our new GRDrate/FPSweight for the
+     * message's sender. */
+    remote->rate = msg->value;
 
-    if (result < 0) {
-        close(sock);
-        return; /* Failure - ignore it. */
-    }
-
-    assert(result == sizeof(hello_t));
+    /* Reset the AWOL counter to zero since we received an update. */
+    remote->awol = 0;
+    remote->reachability = REACHABLE;
 
-    hello_to_hbo(&hello);
-
-    assert(hello.magic == MAGIC_HELLO);
-
-    memset(&sender, 0, sizeof(remote_node_t));
-    sender.addr = fromaddr.sin_addr.s_addr;
-    sender.port = ntohs(hello.port);
-
-    pthread_testcancel();
-
-    pthread_rwlock_rdlock(&limiter->rwlock);
-
-    handle = map_search(limiter->ident_id_to_handle, (void *) &hello.ident_id, sizeof(hello.ident_id));
-
-    if (handle == NULL) {
-        printlog(LOG_WARN, "WARN:recvd hello for unknown identity.\n");
-        pthread_rwlock_unlock(&limiter->rwlock);
-        return;
-    }
-
-    ident = limiter->identities[*handle];
-    assert(ident != NULL);
-
-    pthread_mutex_lock(&ident->lock);
-
-    remote = map_search(ident->remote_node_map, &sender, sizeof(remote_node_t));
-
-    if (remote == NULL) {
-        printlog(LOG_WARN, "WARN: Accepted connection from unknown identity.\n");
-        pthread_mutex_unlock(&ident->lock);
-        pthread_rwlock_unlock(&limiter->rwlock);
-        close(sock);
-        return;
-    }
-
-    if (is_connected(remote)) {
-        /* We are still connected, don't need the new socket. */
-        close(sock);
-        pthread_mutex_unlock(&ident->lock);
-        pthread_rwlock_unlock(&limiter->rwlock);
-        return;
-    }
-
-    /* We weren't connected, but we are now... */
-    remote->socket = sock;
-    printf("Got connection on: %d\n", sock);
-    FD_SET(sock, &ident->fds);
-
-    pthread_mutex_unlock(&ident->lock);
-    pthread_rwlock_unlock(&limiter->rwlock);
-}
-
-static void read_tcp_message(comm_ident_t *ident, pthread_rwlock_t *limiter_rwlock, int sock) {
-    int result;
-    message_t msg;
-
-    memset(&msg, 0, sizeof(message_t));
-
-    result = recv(sock, &msg, sizeof(message_t), 0);
-
-    if (result < 0) {
-        pthread_rwlock_rdlock(limiter_rwlock);
-        pthread_mutex_lock(&ident->lock);
-        FD_CLR(sock, &ident->fds);
-        close(sock);
-        pthread_mutex_unlock(&ident->lock);
-        pthread_rwlock_unlock(limiter_rwlock);
-        return;
-    }
-
-    assert(result == sizeof(message_t));
-
-    message_to_hbo(&msg);
-    assert(msg.magic == MAGIC_MSG);
-
-    pthread_rwlock_rdlock(limiter_rwlock);
-    pthread_mutex_lock(&ident->lock);
-
-    switch (ident->comm_fabric) {
-        case COMM_GOSSIP: {
-            ident->gossip.value += msg.value;
-            ident->gossip.weight += msg.weight;
-        }
-        break;
-
-        default: {
-            assert(1 == 0); /* This case shouldn't happen. Punt for now... */
-        }
-    }
-    pthread_mutex_unlock(&ident->lock);
-    pthread_rwlock_unlock(limiter_rwlock);
+    return 0;
 }
 
-static void ident_receive(comm_ident_t *ident, pthread_rwlock_t *limiter_rwlock) {
-    int select_result, i;
-    fd_set fds_copy;
-    struct timeval timeout;
-
-    FD_ZERO(&fds_copy);
-    timeout.tv_sec = 15;
-    timeout.tv_usec = 0;
-
-    pthread_rwlock_rdlock(limiter_rwlock);
-    pthread_mutex_lock(&ident->lock);
-    memcpy(&fds_copy, &ident->fds, sizeof(fd_set));
-    pthread_mutex_unlock(&ident->lock);
-    pthread_rwlock_unlock(limiter_rwlock);
-    
-    /* mask interrupt signals for this thread? */
-
-    select_result = select(FD_SETSIZE, &fds_copy, NULL, NULL, &timeout);
-
-    assert(select_result >= 0);
-    
-    if (select_result == 0)
-        return; /* Timed out */
-
-    for (i = 0; (i < FD_SETSIZE) && select_result; ++i) {
-        if (FD_ISSET(i, &fds_copy)) {
-            read_tcp_message(ident, limiter_rwlock, i);
-            select_result--;
-        }
-    }
-}
-#endif
-
-/* Turn this on to simulate network partitions.
- * Turn off for production settings. */
-//#define ALLOW_PARTITION
-
 int send_udp_mesh(comm_t *comm, uint32_t id, int sock) {
     int result = 0;
     remote_limiter_t *remote;
@@ -417,6 +205,7 @@ int send_udp_mesh(comm_t *comm, uint32_t id, int sock) {
     msg.magic = MAGIC_MSG;
     msg.ident_id = id;
     msg.value = comm->local_rate;
+    msg.view = comm->gossip.view;
     /* Do we want seqnos for mesh?  We can get by without them. */
 
     message_to_nbo(&msg);
@@ -425,6 +214,14 @@ int send_udp_mesh(comm_t *comm, uint32_t id, int sock) {
     for (i = 0; i < comm->remote_node_count; ++i) {
         remote = &comm->remote_limiters[i];
 
+        /* Increase this counter.  For mesh, it represents the number of messages we have sent to
+         * this remote limiter without having heard from it.  This is reset to 0 when we receive
+         * an update from this peer. */
+        remote->awol += 1;
+        if (remote->awol > MESH_REMOTE_AWOL_THRESHOLD) {
+            remote->reachability = UNREACHABLE;
+        }
+
 #ifdef ALLOW_PARTITION
 
         if (do_partition) {
@@ -459,10 +256,82 @@ int send_udp_mesh(comm_t *comm, uint32_t id, int sock) {
     return result;
 }
 
+int recv_gossip(comm_t *comm, uint32_t id, int sock, remote_limiter_t *remote, message_t *msg) {
+    if (msg->type == ACK) {
+        /* If ACK was received then reset the awol count */
+        if (msg->seqno == remote->outgoing.next_seqno - 1) {
+            /* Ack for most recent message.  Clear saved state. */
+            remote->outgoing.first_seqno = remote->outgoing.next_seqno;
+            remote->outgoing.saved_value = 0;
+            remote->outgoing.saved_weight = 0;
+
+            remote->awol = 0;
+        }
+        /* Ignore ack if it isn't for most recent message. */
+    } else if (msg->type == MSG) {
+        if (msg->min_seqno > remote->incoming.seen_seqno) {
+            /* Entirely new information */
+            remote->incoming.seen_seqno = msg->seqno;
+            remote->incoming.saved_value = msg->value;
+            remote->incoming.saved_weight = msg->weight;
+            comm->gossip.value += msg->value;
+            comm->gossip.weight += msg->weight;
+            send_ack(id, remote, msg->seqno, ACK, comm->gossip.view);
+            remote->awol = 0;
+        } 
+        else if (msg->seqno > remote->incoming.seen_seqno) {
+            /* Only some of the message is old news. */
+            double diff_value = msg->value - remote->incoming.saved_value;
+            double diff_weight = msg->weight - remote->incoming.saved_weight;
+
+            remote->incoming.seen_seqno = msg->seqno;
+            remote->incoming.saved_value = msg->value;
+            remote->incoming.saved_weight = msg->weight;
+
+            comm->gossip.value += diff_value;
+            comm->gossip.weight += diff_weight;
+            send_ack(id, remote, msg->seqno, ACK, comm->gossip.view);
+            remote->awol = 0;
+        } 
+        else {
+            /* The entire message is old news. (Duplicate). */
+            /* Do nothing. */
+        }
+    }
+
+    return 0;
+}
+
+int find_gossip_target(comm_t *comm) {
+    int target = NULL_PEER;
+    int k;
+
+    if (comm->shuffle_index < comm->remote_node_count) {
+        target = comm->indices[comm->shuffle_index];
+        printlog(LOG_DEBUG,"GOSSIP: found index %d.\n", target);
+        comm->shuffle_index++;
+    }
+    else {
+        // shuffle the remote_limiters array
+        printlog(LOG_DEBUG, "GOSSIP: shuffling the array.\n");
+        for ( k = 0; k < comm->remote_node_count; k++) {
+            uint32_t l = myrand() % comm->remote_node_count;
+            int t;
+            t = comm->indices[l];
+            comm->indices[l] = comm->indices[k];
+            comm->indices[k] = t;
+        }
+        comm->shuffle_index = 0;
+        target = comm->indices[comm->shuffle_index];
+        printlog(LOG_DEBUG,"GOSSIP: found index after spilling over %d.\n", target);
+        comm->shuffle_index++;
+    }
+    return target;
+}
+
 int send_udp_gossip(comm_t *comm, uint32_t id, int sock) {
-    int i, j, targetid;
-    int awol_threshold = GOSSIP_REMOTE_AWOL_THRESHOLD;
-    int rand_count; //HACK...
+    int i, j;
+    int retry_index = 0;
     int result = 0;
     remote_limiter_t *remote;
     struct sockaddr_in toaddr;
@@ -473,7 +342,7 @@ int send_udp_gossip(comm_t *comm, uint32_t id, int sock) {
      * that was sent to the peers.  In the case of not being able to send to a
      * peer though, we increment this to reclaim the value/weight locally. */
     int message_portion = 1;
-    
+
     memset(&toaddr, 0, sizeof(struct sockaddr_in));
     toaddr.sin_family = AF_INET;
 
@@ -481,51 +350,63 @@ int send_udp_gossip(comm_t *comm, uint32_t id, int sock) {
     msg_weight = comm->gossip.weight / (comm->gossip.gossip_branch + 1);
 
     for (i = 0; i < comm->gossip.gossip_branch; ++i) {
+        int targetid = NULL_PEER;
+        int rand_count = 0;
         message_t msg;
 
         printlog(LOG_DEBUG, "Gossip loop iteration, i=%d, branch=%d\n", i, comm->gossip.gossip_branch);
 
-        if (comm->retrys[i] >= 0) {
-            remote = &comm->remote_limiters[comm->retrys[i]];
-            targetid = comm->retrys[i];
-
-            if (remote->awol > awol_threshold) {
-                message_portion += 1;
-                printlog(LOG_DEBUG, "Gossip: Ignoring AWOL peer id %d.\n", comm->retrys[i]);
-                comm->retrys[i] = -1;
-                continue;
+        /* If there are any peers with unacked messages, select them first. */
+        while (retry_index < comm->remote_node_count) {
+            if (comm->remote_limiters[retry_index].awol > 0 && comm->remote_limiters[retry_index].reachability == REACHABLE) {
+                targetid = retry_index;
+                printlog(LOG_DEBUG, "GOSSIP: Selected peerindex %d because it had unacked messages.\n", targetid);
             }
-        } else {
-            targetid = -2;
-            rand_count = 0;
-
-            while (targetid == -2 && rand_count < 50) {
-                targetid = myrand() % comm->remote_node_count;
-                rand_count += 1;
-
-                /* Don't select an already-used index. */
-                for (j = 0; j < comm->gossip.gossip_branch; ++j) {
-                    if (targetid == comm->retrys[j] || comm->remote_limiters[targetid].awol > awol_threshold) {
-                        printlog(LOG_DEBUG, "Gossip: disqualified targetid %d.  retrys[j] is %d, and remote awol count is %d\n", targetid, comm->retrys[j], comm->remote_limiters[targetid].awol);
-                        targetid = -2;
-                        break;
-                    }
+
+            retry_index += 1;
+        }
+
+        while (targetid == NULL_PEER && rand_count < 10) {
+            /* Select a recipient from a randomly-shuffled array. */
+            targetid = find_gossip_target(comm);
+
+            assert(targetid != NULL_PEER);
+
+            /* Don't select an already-used index. */
+            for (j = 0; j < i; ++j) {
+                if (targetid == comm->selected[j]) {
+                    printlog(LOG_DEBUG, "GOSSIP: disqualified targetid %d.  selected[j=%d] is %d\n", targetid, j, comm->selected[j]); 
+                    targetid = NULL_PEER;
+                    break;
                 }
             }
 
-            if (targetid < 0) {
-                /* Couldn't find a suitable peer to send to... */
-                message_portion += 1;
-                printlog(LOG_DEBUG, "Gossip: exhausted random peer search.\n");
-                continue;
-            } else {
-                printlog(LOG_DEBUG, "Gossip: settled on peer id %d.\n", targetid);
+            /* Don't select an unreachable peer or one that is not in our view. */
+            if (targetid != NULL_PEER) {
+                if (comm->remote_limiters[targetid].reachability != REACHABLE ||
+                        comm->remote_limiters[targetid].view != comm->gossip.view ||
+                        comm->remote_limiters[targetid].view_confidence != IN) {
+                    printlog(LOG_DEBUG, "GOSSIP: disqualified targetid %d, reachability is %d, remote view is %d (confidence:%d), my view is %d\n",
+                            targetid, comm->remote_limiters[targetid].reachability, comm->remote_limiters[targetid].view,
+                            comm->remote_limiters[targetid].view_confidence, comm->gossip.view);
+                    targetid = NULL_PEER;
+                }
             }
 
-            remote = &comm->remote_limiters[targetid];
+            rand_count++;
         }
-        
-        comm->retrys[i] = targetid;
+
+        if (targetid == NULL_PEER) {
+            /* Couldn't find a suitable peer to send to... */
+            message_portion += 1;
+            printlog(LOG_DEBUG, "GOSSIP: exhausted random peer search.\n");
+            continue;
+        } else {
+            printlog(LOG_DEBUG, "GOSSIP: settled on peer id %d.\n", targetid);
+        }
+
+        remote = &comm->remote_limiters[targetid];
+        comm->selected[i] = targetid;
 
         toaddr.sin_addr.s_addr = remote->addr; /* Already in network byte order. */
         toaddr.sin_port = remote->port;
@@ -538,15 +419,19 @@ int send_udp_gossip(comm_t *comm, uint32_t id, int sock) {
         msg.seqno = remote->outgoing.next_seqno;
         msg.min_seqno = remote->outgoing.first_seqno;
         msg.type = MSG;
-
+        msg.view = comm->gossip.view;
+        
         remote->outgoing.next_seqno++;
         remote->outgoing.saved_value += msg_value;
         remote->outgoing.saved_weight += msg_weight;
+        /* Represents the number of messages we have sent to this peer without hearing a message or ACK from it. */
+        remote->awol += 1;
+
 
 #ifdef ALLOW_PARTITION
 
         if (do_partition && ((partition_set & (1 << targetid)) == 0)) {
-            printlog(LOG_DEBUG, "Partition: Gossip: ignoring targetid %d\n", targetid);
+            printlog(LOG_DEBUG, "Partition: GOSSIP: ignoring targetid %d\n", targetid);
             continue;
         }
 
@@ -560,14 +445,37 @@ int send_udp_gossip(comm_t *comm, uint32_t id, int sock) {
             break;
         }
     }
-
     comm->gossip.value = msg_value * message_portion;
     comm->gossip.weight = msg_weight * message_portion;
 
     return result;
 }
 
+
+/* Old TCP code. */
 #if 0
+static void hello_to_hbo(hello_t *hello) {
+    hello->magic = ntohl(hello->magic);
+    hello->ident_id = ntohl(hello->ident_id);
+    hello->port = ntohs(hello->port);
+}
+
+static void hello_to_nbo(hello_t *hello) {
+    hello->magic = htonl(hello->magic);
+    hello->ident_id = htonl(hello->ident_id);
+    hello->port = htons(hello->port);
+}
+
+static int is_connected(remote_limiter_t *remote) {
+    struct sockaddr_in addr;
+    socklen_t addrlen = sizeof(addr);
+
+    if (getpeername(remote->socket, (struct sockaddr *) &addr, &addrlen) == 0)
+        return 1;
+    else
+        return 0;
+}
+
 int send_tcp_gossip(comm_ident_t *ident, FILE *logfile, int unused) {
     int i, targetid, sock;
     int result = 0;
@@ -601,9 +509,7 @@ int send_tcp_gossip(comm_ident_t *ident, FILE *logfile, int unused) {
 
     return result;
 }
-#endif
 
-#if 0
 void *limiter_accept_thread(void *limiter) {
     sigset_t signal_mask;
 
@@ -686,4 +592,153 @@ void *ident_receive_thread(void *recv_args) {
     }
     pthread_exit(NULL);
 }
+
+static void limiter_accept(comm_limiter_t *limiter) {
+    int sock, result;
+    struct sockaddr_in fromaddr;
+    socklen_t fromlen = sizeof(fromaddr);
+    remote_node_t sender;
+    remote_limiter_t *remote;
+    hello_t hello;
+    comm_ident_t *ident;
+    dent_handle *handle = NULL;
+
+    sock = accept(limiter->tcp_socket, (struct sockaddr *)&fromaddr, &fromlen);
+
+    assert(sock > 0);
+
+    memset(&hello, 0, sizeof(hello_t));
+    result = recv(sock, &hello, sizeof(hello_t), 0);
+
+    if (result < 0) {
+        close(sock);
+        return; /* Failure - ignore it. */
+    }
+
+    assert(result == sizeof(hello_t));
+
+    hello_to_hbo(&hello);
+
+    assert(hello.magic == MAGIC_HELLO);
+
+    memset(&sender, 0, sizeof(remote_node_t));
+    sender.addr = fromaddr.sin_addr.s_addr;
+    sender.port = ntohs(hello.port);
+
+    pthread_testcancel();
+
+    pthread_rwlock_rdlock(&limiter->rwlock);
+
+    handle = map_search(limiter->ident_id_to_handle, (void *) &hello.ident_id, sizeof(hello.ident_id));
+
+    if (handle == NULL) {
+        printlog(LOG_WARN, "WARN:recvd hello for unknown identity.\n");
+        pthread_rwlock_unlock(&limiter->rwlock);
+        return;
+    }
+
+    ident = limiter->identities[*handle];
+    assert(ident != NULL);
+
+    pthread_mutex_lock(&ident->lock);
+
+    remote = map_search(ident->remote_node_map, &sender, sizeof(remote_node_t));
+
+    if (remote == NULL) {
+        printlog(LOG_WARN, "WARN: Accepted connection from unknown identity.\n");
+        pthread_mutex_unlock(&ident->lock);
+        pthread_rwlock_unlock(&limiter->rwlock);
+        close(sock);
+        return;
+    }
+
+    if (is_connected(remote)) {
+        /* We are still connected, don't need the new socket. */
+        close(sock);
+        pthread_mutex_unlock(&ident->lock);
+        pthread_rwlock_unlock(&limiter->rwlock);
+        return;
+    }
+
+    /* We weren't connected, but we are now... */
+    remote->socket = sock;
+    printf("Got connection on: %d\n", sock);
+    FD_SET(sock, &ident->fds);
+
+    pthread_mutex_unlock(&ident->lock);
+    pthread_rwlock_unlock(&limiter->rwlock);
+}
+
+static void read_tcp_message(comm_ident_t *ident, pthread_rwlock_t *limiter_rwlock, int sock) {
+    int result;
+    message_t msg;
+
+    memset(&msg, 0, sizeof(message_t));
+
+    result = recv(sock, &msg, sizeof(message_t), 0);
+
+    if (result < 0) {
+        pthread_rwlock_rdlock(limiter_rwlock);
+        pthread_mutex_lock(&ident->lock);
+        FD_CLR(sock, &ident->fds);
+        close(sock);
+        pthread_mutex_unlock(&ident->lock);
+        pthread_rwlock_unlock(limiter_rwlock);
+        return;
+    }
+
+    assert(result == sizeof(message_t));
+
+    message_to_hbo(&msg);
+    assert(msg.magic == MAGIC_MSG);
+
+    pthread_rwlock_rdlock(limiter_rwlock);
+    pthread_mutex_lock(&ident->lock);
+
+    switch (ident->comm_fabric) {
+        case COMM_GOSSIP: {
+            ident->gossip.value += msg.value;
+            ident->gossip.weight += msg.weight;
+        }
+        break;
+
+        default: {
+            assert(1 == 0); /* This case shouldn't happen. Punt for now... */
+        }
+    }
+    pthread_mutex_unlock(&ident->lock);
+    pthread_rwlock_unlock(limiter_rwlock);
+}
+
+static void ident_receive(comm_ident_t *ident, pthread_rwlock_t *limiter_rwlock) {
+    int select_result, i;
+    fd_set fds_copy;
+    struct timeval timeout;
+
+    FD_ZERO(&fds_copy);
+    timeout.tv_sec = 15;
+    timeout.tv_usec = 0;
+
+    pthread_rwlock_rdlock(limiter_rwlock);
+    pthread_mutex_lock(&ident->lock);
+    memcpy(&fds_copy, &ident->fds, sizeof(fd_set));
+    pthread_mutex_unlock(&ident->lock);
+    pthread_rwlock_unlock(limiter_rwlock);
+    
+    /* mask interrupt signals for this thread? */
+
+    select_result = select(FD_SETSIZE, &fds_copy, NULL, NULL, &timeout);
+
+    assert(select_result >= 0);
+    
+    if (select_result == 0)
+        return; /* Timed out */
+
+    for (i = 0; (i < FD_SETSIZE) && select_result; ++i) {
+        if (FD_ISSET(i, &fds_copy)) {
+            read_tcp_message(ident, limiter_rwlock, i);
+            select_result--;
+        }
+    }
+}
 #endif
index c8fd2e1..0e28386 100644 (file)
@@ -3,18 +3,36 @@
 #ifndef _PEER_COMM_H_
 #define _PEER_COMM_H_
 
+#define NULL_PEER (-2)
+#define MESH_REMOTE_AWOL_THRESHOLD (5)
+#define GOSSIP_REMOTE_AWOL_THRESHOLD (5)
+
+static const uint32_t MAGIC_MSG = 0x123123;
+static const uint32_t MAGIC_HELLO = 0x456456;
+static const uint16_t MSG = 1;
+static const uint16_t ACK = 2;
+
 void limiter_receive();
 
-#if 0
-void *limiter_accept_thread(void *limiter);
+void message_to_hbo(message_t *msg);
 
-void *ident_receive_thread(void *limiter);
-#endif
+void message_to_nbo(message_t *msg);
 
 int send_udp_mesh(comm_t *comm, uint32_t id, int sock);
+int recv_mesh(comm_t *comm, uint32_t id, int sock, remote_limiter_t *remote, message_t *msg);
+
 int send_udp_gossip(comm_t *comm, uint32_t id, int sock);
+int recv_gossip(comm_t *comm, uint32_t id, int sock, remote_limiter_t *remote, message_t *msg);
+
+int find_gossip_target(comm_t *comm);
+
+int send_ack(uint32_t id, remote_limiter_t *remote, uint32_t seqno, uint16_t type, int32_t view);
 
 #if 0
+void *limiter_accept_thread(void *limiter);
+
+void *ident_receive_thread(void *limiter);
+
 int send_tcp_gossip(comm_ident_t *ident, FILE *logfile, int unused);
 #endif
 
index 37f0141..5e89e89 100644 (file)
 
 enum policies { POLICY_GRD = 1, POLICY_FPS = 2 };
 enum commfabrics { COMM_MESH = 1, COMM_GOSSIP = 2 };
-enum accountings { ACT_STANDARD = 1, ACT_SAMPLEHOLD = 2, ACT_SIMPLE = 3, ACT_MULTIPLE = 4};
-enum dampenings { DAMPEN_NONE = 0, DAMPEN_TEST = 1, DAMPEN_FAILED = 2, DAMPEN_PASSED = 3, DAMPEN_SKIP = 4};
+enum accountings { ACT_STANDARD = 1, ACT_SAMPLEHOLD = 2, ACT_SIMPLE = 3, ACT_MULTIPLE = 4 };
+enum dampenings { DAMPEN_NONE = 0, DAMPEN_TEST = 1, DAMPEN_FAILED = 2, DAMPEN_PASSED = 3, DAMPEN_SKIP = 4 };
+enum memberships { SWIM = 1, ZOOKEEPER = 2 };
+enum failure_behaviors { PANIC = 1, QUORUM = 2 };
 
 /* The comm library also has definitions for comfabrics. This prevents us
  * from defining them twice. */
@@ -83,6 +85,10 @@ enum dampenings { DAMPEN_NONE = 0, DAMPEN_TEST = 1, DAMPEN_FAILED = 2, DAMPEN_PA
  * any type of production setting. */
 //#define SHADOW_ACCTING
 
+/* Turn this on to simulate network partitions.
+ * Turn off for production settings. */
+#define ALLOW_PARTITION
+
 /* forward declare some structs */
 struct limiter;
 struct identity;
index c4f6b90..5430c2d 100644 (file)
@@ -36,6 +36,10 @@ typedef struct identity {
     /** The global rate limit. */
     uint32_t limit;
 
+    /** The effective global rate limit.  Can be lower than the nominal global
+     * rate limit due to the failure of one or more peers. */
+    uint32_t effective_limit;
+
     /** The local rate limit. */
     uint32_t locallimit;
     
diff --git a/drl/swim.c b/drl/swim.c
new file mode 100644 (file)
index 0000000..9993ec6
--- /dev/null
@@ -0,0 +1,853 @@
+/* See the DRL-LICENSE file for this file's software license. */
+
+#include <errno.h>
+
+/* Debug output. */
+#include <stdio.h>
+#include <stdlib.h>
+
+/* Socket functions. */
+#include <sys/types.h>
+#include <sys/socket.h>
+
+/* Byte ordering and address structures. */
+#include <arpa/inet.h>
+
+/* memset() */
+#include <string.h>
+
+#include "raterouter.h"
+#include "ratetypes.h"
+#include "drl_state.h"
+#include "peer_comm.h"
+#include "swim.h"
+#include "logging.h"
+
+/* From ulogd_DRL.c */
+extern int do_partition;
+extern int partition_set;
+
+extern limiter_t limiter;
+
+/**Finds the update, if found then frees the memory of the new_update
+ * and returns 1. If find fails then this returns 0*/
+static int find_and_update(update_t *updates, update_t *new_update) {
+    if( updates == NULL ) {
+        printlog(LOG_DEBUG, "SWIM: INFECT: no existing updates\n");
+        return 0;
+    }
+    update_t *pointer = updates;
+    while(pointer != NULL) {
+        if(pointer->remote == new_update->remote && pointer->remote->incarnation >= new_update->remote->incarnation) {
+            pointer->count = 0;
+            printlog(LOG_DEBUG, "SWIM: INFECT: update already exists\n");
+            free(new_update);
+            return 1;
+        }
+        pointer = pointer->next;
+    }
+    printlog(LOG_DEBUG, "SWIM: INFECT: update not found among existing updates\n");
+    return 0;
+}
+
+/*Just adds to the end of list and returns the head*/
+update_t *add_to_list(update_t *updates, update_t *new_update) {
+    printlog(LOG_DEBUG, "SWIM: INFECT: adding to list of updates: %s is %d\n", inet_ntoa(*(struct in_addr *)&new_update->remote->addr), new_update->remote->reachability);
+    update_t *head = updates;
+    update_t *pointer;
+    if (head == NULL) {
+        head = new_update;
+    } else {
+        pointer = head;
+        while(pointer->next != NULL) {
+            pointer = pointer->next;
+        }
+        pointer->next = new_update;
+    }
+    return head;
+}
+
+/** Given the address of the suspected node this function
+ * identifies friends who can probe the suspected node
+ * After recording these, the messages for help are sent 
+ * in the next gossip round*/
+static int help_from_friends(comm_t *comm, int suspect_index, uint32_t id, int sock) {
+    printlog(LOG_DEBUG,"SWIM: In function help_from_friends suspected node: %s, index: %d\n",inet_ntoa(*(struct in_addr *)&comm->remote_limiters[suspect_index].addr), suspect_index);
+    int i=0, j = 0;
+    int result;
+    int count_friends = (comm->remote_node_count > MAX_FRIENDS ) ? MAX_FRIENDS : comm->remote_node_count; // A more logical way?
+    // remote_node_t friend_nodes[count_friends];
+    // int friend_ids[count_friends];
+
+    while( (i - j) < count_friends && i < comm->remote_node_count ) {
+        // Do not pick a friend who is suspected to be down
+        if (comm->remote_limiters[i].reachability != REACHABLE) {
+            j++; i++;
+            continue;
+        }
+        /**construct the message and send it to friend
+         * pick up friend i*/
+        message_t check_msg;
+        memset(&check_msg, 0, sizeof(message_t));
+        check_msg.magic = MAGIC_MSG;
+        check_msg.ident_id = id;
+        check_msg.value = 0;
+        check_msg.weight = 0;
+        check_msg.seqno = 0;
+        check_msg.min_seqno = 0;
+        check_msg.type = CHECK;
+        check_msg.check_target = comm->remote_limiters[suspect_index].addr;
+        check_msg.check_port = comm->remote_limiters[suspect_index].port;
+
+        // send the message
+        struct sockaddr_in toaddr;
+        memset(&toaddr, 0, sizeof(struct sockaddr_in));
+        toaddr.sin_family = AF_INET;
+        toaddr.sin_addr.s_addr = comm->remote_limiters[i].addr;
+        toaddr.sin_port = comm->remote_limiters[i].port;
+        message_to_nbo(&check_msg);
+
+        printlog(LOG_DEBUG,"SWIM: Sending CHECK message to friend %s i: %d", inet_ntoa(*(struct in_addr *)&toaddr.sin_addr.s_addr), i);
+        printlog(LOG_DEBUG," Suspect: %s", inet_ntoa(*(struct in_addr *)&check_msg.check_target));
+        printlog(LOG_DEBUG," Initial: %s suspect_index: %d\n", inet_ntoa(*(struct in_addr *)&comm->remote_limiters[suspect_index].addr), suspect_index);
+        if (sendto(sock, &check_msg, sizeof(check_msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) {
+            printlog(LOG_WARN, "WARN: limiter_send_mesh: sento failed.\n");
+            result = errno;
+            printlog(LOG_WARN, "  - The error was |%d|\n", strerror(result));
+        }
+        i++;
+    }
+    printlog(LOG_DEBUG,"SWIM: Out function help_from_friends.\n");
+    return 0;
+}
+
+/** Receiving CHECK packet*/
+static void swim_receive_check(comm_t *comm, int sock, remote_limiter_t *remote, message_t *msg) {
+
+#ifdef ALLOW_PARTITION
+    int id;
+    for(id = 0; id < comm->remote_node_count; id++) {
+        if(comm->remote_limiters[id].addr == remote->addr && comm->remote_limiters[id].port == remote->port) {
+            if (do_partition && ((partition_set & (1 << id)) == 0)) {
+                printlog(LOG_DEBUG, "SWIM: Ignoring CHECK message from %d\n", id);
+                return;
+            }
+        }
+    }
+#endif
+
+//FIX
+    if(remote->reachability != REACHABLE)
+        return;
+
+    swim_comm_t *swim_comm = (swim_comm_t *) comm->membership_state;
+
+    // create the message that has to be sent to the suspected node
+    printlog(LOG_DEBUG,"SWIM: received CHECK message from %s", inet_ntoa(*(struct in_addr *)&remote->addr));
+    printlog(LOG_DEBUG,", sending PING to %s\n", inet_ntoa(*(struct in_addr *)&msg->check_target));
+    int result;
+    message_t ping_msg;
+    memset(&ping_msg, 0, sizeof(message_t));
+    ping_msg.magic = MAGIC_MSG;
+    ping_msg.ident_id = msg->ident_id;
+    ping_msg.value = 0;
+    ping_msg.weight = 0;
+    ping_msg.seqno = 0;
+    ping_msg.min_seqno = 0;
+    ping_msg.type = PING;
+    ping_msg.ping_source = remote->addr;
+    ping_msg.ping_port = remote->port;
+    // send the ping message
+    struct sockaddr_in toaddr;
+    memset(&toaddr, 0, sizeof(struct sockaddr_in));
+    toaddr.sin_family = AF_INET;
+    toaddr.sin_addr.s_addr = msg->check_target;
+    toaddr.sin_port = msg->check_port;
+    message_to_nbo(&ping_msg);
+
+    /** add to ping targets before sending message */
+    ping_target_t *suspect = (ping_target_t*) malloc(sizeof(ping_target_t));
+    memset(suspect, 0, sizeof(ping_target_t));
+    suspect->target.addr = msg->check_target;
+    suspect->target.port = msg->check_port;
+    suspect->source.addr = remote->addr;
+    suspect->source.port = remote->port;
+    printlog(LOG_DEBUG, "SWIM: adding %s to PING list\n", inet_ntoa(*(struct in_addr *)&suspect->target.addr));
+   
+    ping_target_t *pointer = swim_comm->ping_targets;
+    if( swim_comm->ping_targets != NULL ) {
+        while(pointer->next != NULL) {
+            pointer = pointer->next;
+        }
+        pointer->next = suspect;
+    }
+    else {
+        swim_comm->ping_targets = suspect;
+    }
+    /** added to the end of the list of ping_targets */
+   
+    if (sendto(limiter.udp_socket, &ping_msg, sizeof(ping_msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) {
+        printlog(LOG_WARN, "WARN: limiter_send_mesh: sento failed.\n");
+        result = errno;
+        printlog(LOG_WARN, "  - The error was |%d|\n", strerror(result));
+    } else {
+        printlog(LOG_DEBUG,"SWIM: Sent PING message\n");
+    }
+    printlog(LOG_DEBUG,"SWIM: Processed CHECK packet\n");
+    return;
+}
+
+/** Receiving PING packet*/
+static void swim_receive_ping(comm_t *comm, int sock, remote_limiter_t *remote, message_t *msg) {
+
+#ifdef ALLOW_PARTITION
+    int id;
+    for(id = 0; id < comm->remote_node_count; id++) {
+        if(comm->remote_limiters[id].addr == remote->addr && comm->remote_limiters[id].port == remote->port) {
+            if (do_partition && ((partition_set & (1 << id)) == 0)) {
+                printlog(LOG_DEBUG, "SWIM: Ignoring PING message from %d\n", id);
+                return;
+            }
+        }
+    }
+#endif
+    printlog(LOG_DEBUG,"SWIM: receiving the PING message from %s\n", inet_ntoa(*(struct in_addr *)&remote->addr));
+
+//FIX
+    if(remote->reachability != REACHABLE)
+        return;
+    
+    int result;
+    swim_comm_t *swim_comm = (swim_comm_t *) comm->membership_state;
+
+    message_t pingack_msg;
+    memset(&pingack_msg, 0, sizeof(message_t));
+    pingack_msg.magic = MAGIC_MSG;
+    pingack_msg.ident_id = msg->ident_id;
+    pingack_msg.value = 0;
+    pingack_msg.weight = 0;
+    pingack_msg.seqno = 0;
+    pingack_msg.min_seqno = 0;
+    pingack_msg.type = PING_ACK;
+    
+    swim_comm->incarnation++;
+    pingack_msg.update_present = TRUE;
+    pingack_msg.reachability = REACHABLE;
+    pingack_msg.incarnation = swim_comm->incarnation;
+    FILE *fp = fopen("/root/incarnation", "w+");
+    fprintf(fp, "%d", swim_comm->incarnation + 1);
+    fflush(fp);
+    fclose(fp);
+    // send PING_ACK
+    struct sockaddr_in toaddr;
+    memset(&toaddr, 0, sizeof(struct sockaddr_in));
+    toaddr.sin_family = AF_INET;
+    toaddr.sin_addr.s_addr = remote->addr;
+    toaddr.sin_port = remote->port;
+    message_to_nbo(&pingack_msg);
+
+    if (sendto(limiter.udp_socket, &pingack_msg, sizeof(pingack_msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) {
+        printlog(LOG_WARN, "WARN: limiter_send_mesh: sento failed.\n");
+        result = errno;
+        printlog(LOG_WARN, "  - The error was |%d|\n", strerror(result));
+    } else {
+        printlog(LOG_DEBUG, "SWIM: sent PING_ACK to %s\n", inet_ntoa(*(struct in_addr *)&remote->addr));
+    }
+    printlog(LOG_DEBUG,"SWIM: Processed PING packet\n");
+    return;
+}
+
+/** Receiving PING_ACK packet*/
+static void swim_receive_pingack(comm_t *comm, int sock, remote_limiter_t *remote, message_t *msg) {
+    // find the source which requested this ping and inform it with CHECK_ACK, ALIVE
+    // look up in the ping_targets list.
+    printlog(LOG_DEBUG, "SWIM: receiving the PING_ACK message from %s\n", inet_ntoa(*(struct in_addr *)&remote->addr));
+    flushlog();
+    int result, confirm;
+    int delete_head = 0;
+    ping_target_t* pointer;
+    ping_target_t* prev_pointer;
+    swim_comm_t *swim_comm = (swim_comm_t *) comm->membership_state;
+    pointer = swim_comm->ping_targets;
+    prev_pointer = swim_comm->ping_targets;
+
+/*  
+ *  Removed this because if a PING_ACK arrives then CHECK_ACK would be
+ *  sent to all the nodes which requested a check on this node. Hence pointer
+ *  could be NULL but we could receive a CHECK_ACK packet
+ *  if (pointer == NULL) {
+        printlog(LOG_DEBUG, "SWIM: Received PING_ACK for a PING not sent\n");
+        return;
+    }
+*/    
+    while(pointer!=NULL) {
+        if(pointer->target.addr == remote->addr && pointer->target.port == remote->port) {
+            // suspect has been found in the list
+            // now construct the CHECK_ACK message and send it to source
+            message_t checkack_msg;
+            memset(&checkack_msg, 0, sizeof(message_t));
+            checkack_msg.magic = MAGIC_MSG;
+            checkack_msg.ident_id = msg->ident_id;
+            checkack_msg.value = 0;
+            checkack_msg.weight = 0;
+            checkack_msg.seqno = 0;
+            checkack_msg.min_seqno = 0;
+            checkack_msg.type = CHECK_ACK;
+            checkack_msg.checkack_value = ALIVE;
+            // inform the source of the addr and port of suspected node
+            checkack_msg.check_target = remote->addr;
+            checkack_msg.check_port = remote->port;
+            struct sockaddr_in toaddr;
+            memset(&toaddr, 0, sizeof(struct sockaddr_in));
+            // found source
+            toaddr.sin_family = AF_INET;
+            toaddr.sin_addr.s_addr = pointer->source.addr;
+            toaddr.sin_port = pointer->source.port;
+            message_to_nbo(&checkack_msg);
+
+            if (sendto(limiter.udp_socket, &checkack_msg, sizeof(checkack_msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) {
+                printlog(LOG_WARN, "WARN: swim_receive_pingack : sento failed.\n");
+                result = errno;
+                printlog(LOG_WARN, "  - The error was |%d|\n", strerror(result));
+            }
+
+            /** Now delete this suspect from friends list of nodes*/
+            if(prev_pointer == pointer) {
+                swim_comm->ping_targets = pointer->next;
+                pointer->next = NULL;
+                free(pointer);
+                pointer = swim_comm->ping_targets;
+                delete_head = 1;
+            } else {
+                prev_pointer->next = pointer->next;
+                pointer->next = NULL;
+                free(pointer);
+                pointer = prev_pointer;
+            }
+            confirm = 1;
+        }
+        prev_pointer = pointer;
+        if(pointer != NULL && delete_head != 1) {
+            pointer = pointer->next;
+        }
+        delete_head = 0;
+        printf("SWIM: PING ACK\n");
+    }
+    // PING_ACK has been received then add to the list of updates
+    remote_node_t sender;
+    memset(&sender, 0, sizeof(remote_node_t));
+    sender.addr = remote->addr;
+    sender.port = remote->port;
+    update_t* new_update = (update_t *) malloc(sizeof(update_t));
+    memset(new_update, 0, sizeof(update_t));
+    new_update->remote = map_search(comm->remote_node_map, &sender, sizeof(remote_node_t));
+    if(new_update->remote == NULL) {
+        printlog(LOG_DEBUG, "SWIM: PANIC: PING_ACK received from an unknown node %s\n",inet_ntoa(*(struct in_addr *)&sender.addr));
+    }
+    new_update->count = 0;
+
+    if(msg->incarnation > new_update->remote->incarnation) {
+        new_update->remote->incarnation = msg->incarnation;
+        new_update->remote->reachability = REACHABLE;
+        new_update->remote->awol = 0;
+        new_update->remote->count_rounds = 0;
+        new_update->remote->count_awol = 0;
+        new_update->remote->count_alive = 0;
+        if( find_and_update(swim_comm->updates, new_update) == 0 ) {
+            swim_comm->updates = add_to_list(swim_comm->updates, new_update);
+            swim_comm->count_updates++;
+        }
+    } else if(msg->incarnation == new_update->remote->incarnation) {
+        // if the node previously thought that sender was down then it prevails
+        // else if the node thought it was up then there is no change
+    }
+
+    if(confirm != 1) printlog(LOG_DEBUG,"SWIM: PING_ACK did not match entries in list\n");
+    printlog(LOG_DEBUG,"SWIM: Processed PING_ACK packet\n");
+    return;
+}
+
+/** Receiving CHECK_ACK packet*/
+static void swim_receive_checkack(comm_t *comm, int sock, remote_limiter_t *remote, message_t *msg) {
+    printlog(LOG_DEBUG, "SWIM: receiving the CHECK_ACK message from %s\n", inet_ntoa(*(struct in_addr *)&remote->addr));
+    int i;
+    for( i = 0; i < comm->remote_node_count; i++) {
+        if(comm->remote_limiters[i].addr == msg->check_target && comm->remote_limiters[i].port == msg->check_port) {
+            if(msg->checkack_value == ALIVE)
+                comm->remote_limiters[i].count_alive++;
+            else if (msg->checkack_value == AWOL)
+                comm->remote_limiters[i].count_awol++;
+        }
+    }
+    printlog(LOG_DEBUG,"SWIM: Processed CHECK_ACK packet\n");
+    return;
+}
+
+static int swim_send(comm_t *comm, int id, int sock) {
+    int i, result;
+    swim_comm_t *swim_comm = (swim_comm_t *) comm->membership_state;
+
+    /**SOURCE: Send messages to friends to check on 
+     * nodes which are suspected to be down*/
+    for(i = 0; i < comm->remote_node_count; i++) {
+        printlog(LOG_DEBUG, "SWIM: AWOL count of %s is %d\n", inet_ntoa(*(struct in_addr *)&comm->remote_limiters[i].addr), comm->remote_limiters[i].awol);
+        if(comm->remote_limiters[i].awol == GOSSIP_REMOTE_AWOL_THRESHOLD) {
+            // HACK to make sure this part of code is entered only once
+            comm->remote_limiters[i].reachability = SUSPECT;
+            comm->remote_limiters[i].awol++;
+            help_from_friends(comm, i, id, sock);
+        }
+    }
+
+    /**SOURCE: Count number of rounds since the node has been suspected
+     * If in this process the count reaches threshold then take action
+     */
+    for (i = 0; i < comm->remote_node_count; i++) {
+        if(comm->remote_limiters[i].reachability == SUSPECT) {
+            comm->remote_limiters[i].count_rounds++;
+            printlog(LOG_DEBUG, "SWIM: ROUNDS count on %s index %d is %d\n", inet_ntoa(*(struct in_addr *)&comm->remote_limiters[i].addr), i, comm->remote_limiters[i].count_rounds);
+            if(comm->remote_limiters[i].count_rounds > SOURCE_THRESHOLD) {
+                if(comm->remote_limiters[i].count_alive > 0) {
+                    printlog(LOG_DEBUG,"SWIM: the node %s was up, wrongly suspected\n", inet_ntoa(*(struct in_addr *)&comm->remote_limiters[i].addr));
+                    comm->remote_limiters[i].reachability = REACHABLE;
+                    comm->remote_limiters[i].count_rounds = 0;
+                    comm->remote_limiters[i].count_awol = 0;
+                    comm->remote_limiters[i].count_alive = 0;
+                    // FIX
+                    comm->remote_limiters[i].awol = 0;
+                }
+                else if (comm->remote_limiters[i].count_awol > 0) {
+                    comm->remote_limiters[i].reachability = UNREACHABLE;
+                    update_t* new_update = (update_t *) malloc(sizeof(update_t));
+                    memset(new_update, 0, sizeof(update_t));
+                    new_update->remote = &comm->remote_limiters[i];
+                    new_update->count = 0;
+                    // comm->remote_limiters[i].incoming.seen_seqno = 0;
+                    printlog(LOG_DEBUG, "SWIM: INFECT: Update down information %s\n", inet_ntoa(*(struct in_addr *)&comm->remote_limiters[i].addr));
+                    if(find_and_update(swim_comm->updates, new_update) == 0) {
+                        swim_comm->updates = add_to_list(swim_comm->updates, new_update);
+                        swim_comm->count_updates++;
+                    }
+                    printlog(LOG_DEBUG,"SWIM: The node %s is down. reachability: %d\n", inet_ntoa(*(struct in_addr *)&comm->remote_limiters[i].addr), comm->remote_limiters[i].reachability);// The node is check_list.target
+                }
+                else {
+                    /**Even friends have not responded, request for help from more friends?*/
+                    printlog(LOG_DEBUG,"SWIM: Last ditch effort, even friends did not respond\n");
+                    comm->remote_limiters[i].reachability = SUSPECT;
+                    comm->remote_limiters[i].count_rounds = 0; // CHECK
+                    help_from_friends(comm, i, id, sock);
+                }
+            }
+        }
+    }
+
+    /**Actions performed by "FRIEND"*/
+    // DELETE THIS LOOP
+    ping_target_t* ping_list = swim_comm->ping_targets;
+    while(ping_list != NULL) {
+        printlog(LOG_DEBUG, "SWIM: in PING list %s\n", inet_ntoa(*(struct in_addr *)&ping_list->target.addr));
+        ping_list = ping_list->next;
+    }
+    ping_list = swim_comm->ping_targets;
+    ping_target_t* ping_list_prev = swim_comm->ping_targets;
+    int delete_head = 0;
+    while(ping_list != NULL) {
+        ping_list->count++;
+        printlog(LOG_DEBUG,"SWIM: friend keeping track of gossip rounds since PING.\n");
+        // if in this process some node hits threshold then
+        // send AWOL and delete it from this list
+        if(ping_list->count >= FRIEND_THRESHOLD) {
+            printlog(LOG_DEBUG,"SWIM: friend declaring AWOL.\n");
+            message_t checkack_msg;
+            memset(&checkack_msg, 0, sizeof(message_t));
+            checkack_msg.magic = MAGIC_MSG;
+            checkack_msg.ident_id = id;
+            checkack_msg.value = 0;
+            checkack_msg.weight = 0;
+            checkack_msg.seqno = 0;
+            checkack_msg.min_seqno = 0;
+            checkack_msg.type = CHECK_ACK;
+            checkack_msg.checkack_value = AWOL;
+            // inform the source of the addr and port of suspected node
+            checkack_msg.check_target = ping_list->target.addr;
+            checkack_msg.check_port = ping_list->target.port;
+            struct sockaddr_in toaddr;
+            memset(&toaddr, 0, sizeof(struct sockaddr_in));
+            toaddr.sin_family = AF_INET;
+            toaddr.sin_addr.s_addr = ping_list->source.addr;
+            toaddr.sin_port = ping_list->source.port;
+            message_to_nbo(&checkack_msg);
+
+            if (sendto(sock, &checkack_msg, sizeof(checkack_msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) {
+                printlog(LOG_WARN, "WARN: limiter_send_mesh: sento failed.\n");
+                result = errno;
+                printlog(LOG_WARN, "  - The error was |%d|\n", strerror(result));
+            }
+            // now the deletion: after deletion we want to continue from next node
+            printlog(LOG_DEBUG, "SWIM: deleting from PING list %s\n", inet_ntoa(*(struct in_addr *)&ping_list->target.addr));
+            if(ping_list_prev == ping_list) {
+                swim_comm->ping_targets = ping_list->next;
+                ping_list->next = NULL;
+                free(ping_list);
+                ping_list = swim_comm->ping_targets;
+                delete_head = 1;
+            } else {
+                ping_list_prev->next = ping_list->next;
+                ping_list->next = NULL;
+                free(ping_list);
+                ping_list = ping_list_prev;
+            }
+        }
+        ping_list_prev = ping_list;
+        if (ping_list != NULL && delete_head != 1) ping_list = ping_list->next;
+        delete_head = 0;
+    }
+
+    return 0;
+}
+
+
+/** Handle SWIM packets received*/
+int swim_receive(comm_t *comm, uint32_t id, int sock, remote_limiter_t *remote, message_t *msg) {
+    swim_comm_t *swim_comm = (swim_comm_t *) comm->membership_state;
+
+    if (msg->type == ACK) {
+        /* If ACK was received then reset the awol count */
+        if (msg->seqno == remote->outgoing.next_seqno - 1) {
+            /* Ack for most recent message.  Clear saved state. */
+            remote->outgoing.first_seqno = remote->outgoing.next_seqno;
+            remote->outgoing.saved_value = 0;
+            remote->outgoing.saved_weight = 0;
+
+            remote->awol = 0;
+            remote->count_awol = 0;
+        }
+        /* Ignore ack if it isn't for most recent message. */
+    } else if (msg->type == MSG) {
+        if (msg->min_seqno > remote->incoming.seen_seqno) {
+            /* Entirely new information */
+            remote->incoming.seen_seqno = msg->seqno;
+            remote->incoming.saved_value = msg->value;
+            remote->incoming.saved_weight = msg->weight;
+            comm->gossip.value += msg->value;
+            comm->gossip.weight += msg->weight;
+            send_ack(id, remote, msg->seqno, ACK, comm->gossip.view);
+            remote->awol = 0;
+            remote->count_rounds = 0;
+            remote->count_awol = 0;
+            remote->count_alive = 0;
+
+            // check if there is an update piggy backed on this message
+            // if yes then add it to the update list
+            if(msg->update_present > 0) {
+                update_t *new_update = (update_t *) malloc(sizeof(update_t));
+                memset(new_update, 0, sizeof(update_t));
+                // look for the remote limiter about whom update is being
+                // sent the update node is sent in the message, msg->node!
+                remote_limiter_t *temp_remote = map_search(comm->remote_node_map, &msg->node, sizeof(remote_node_t));
+                // an update about the node itself is possible in which case map_search would fail
+                if(temp_remote != NULL) {
+                    printlog(LOG_DEBUG, "SWIM: INFECT: Receiving update about %s. Update says %d\n", inet_ntoa(*(struct in_addr *)&temp_remote->addr), msg->reachability);
+                    new_update->remote = temp_remote;
+                    new_update->count = 0;
+                    if(msg->incarnation > new_update->remote->incarnation) {
+                        printlog(LOG_DEBUG, "SWIM: INFECT: Receiving update about %s. Update is about new incarnation\n", inet_ntoa(*(struct in_addr *)&temp_remote->addr));
+                        new_update->remote->reachability = msg->reachability;
+                        new_update->remote->incarnation = msg->incarnation;
+                        if(find_and_update(swim_comm->updates, new_update) == 0) {
+                            swim_comm->updates = add_to_list(swim_comm->updates, new_update);
+                            swim_comm->count_updates++;
+                        }
+                    } else if(msg->incarnation == new_update->remote->incarnation && new_update->remote->reachability == REACHABLE && msg->reachability == UNREACHABLE) {
+                        printlog(LOG_DEBUG, "SWIM: INFECT: Receiving update about %s. Update about same incarnation, says node unreachable\n", inet_ntoa(*(struct in_addr *)&temp_remote->addr));
+                        new_update->remote->reachability = msg->reachability;
+                        if(find_and_update(swim_comm->updates, new_update) == 0) {
+                            swim_comm->updates = add_to_list(swim_comm->updates, new_update);
+                            swim_comm->count_updates++;
+                        } else {
+                            // Ignore the update
+                            printlog(LOG_DEBUG, "SWIM: INFECT: update about %s ignored\n", inet_ntoa(*(struct in_addr *)&temp_remote->addr));
+                        }
+                    }
+                }
+            }
+        } 
+        else if (msg->seqno > remote->incoming.seen_seqno) {
+            /* Only some of the message is old news. */
+            double diff_value = msg->value - remote->incoming.saved_value;
+            double diff_weight = msg->weight - remote->incoming.saved_weight;
+
+            remote->incoming.seen_seqno = msg->seqno;
+            remote->incoming.saved_value = msg->value;
+            remote->incoming.saved_weight = msg->weight;
+
+            comm->gossip.value += diff_value;
+            comm->gossip.weight += diff_weight;
+            send_ack(id, remote, msg->seqno, ACK, comm->gossip.view);
+            remote->awol = 0;
+            remote->count_awol = 0;
+        } 
+        else {
+            /* The entire message is old news. (Duplicate). */
+            /* Do nothing. */
+        }
+        // Hearing from a node previously declared unreachable
+        if(remote->reachability == UNREACHABLE) {
+            printlog(LOG_DEBUG, "SWIM: INFECT: seems like %s is back up\n", inet_ntoa(*(struct in_addr *)&remote->addr));
+            remote->reachability = SUSPECT;
+            remote->awol = GOSSIP_REMOTE_AWOL_THRESHOLD;
+            remote->count_rounds = 0;
+            remote->count_awol = 0;
+            remote->count_alive = 0;
+        }
+    }
+    else if(msg->type == CHECK) {
+        swim_receive_check(comm, sock, remote, msg);
+    }
+    else if(msg->type == PING ) {
+        swim_receive_ping(comm, sock, remote, msg);
+    }
+    else if(msg->type == PING_ACK) {
+        swim_receive_pingack(comm, sock, remote, msg);
+    }
+    else if(msg->type == CHECK_ACK) {
+        swim_receive_checkack(comm, sock, remote, msg);
+    }
+    return 0;
+}
+
+int send_gossip_swim(comm_t *comm, uint32_t id, int sock) {
+    int i, j;
+    int retry_index = 0;
+    int result = 0;
+    remote_limiter_t *remote;
+    struct sockaddr_in toaddr;
+    double msg_value, msg_weight;
+
+    /* This is the factor for the portion of value/weight to keep locally.
+     * Normally this is 1, meaning that we retain the same amount of value/weight
+     * that was sent to the peers.  In the case of not being able to send to a
+     * peer though, we increment this to reclaim the value/weight locally. */
+    int message_portion = 1;
+
+    memset(&toaddr, 0, sizeof(struct sockaddr_in));
+    toaddr.sin_family = AF_INET;
+
+    msg_value = comm->gossip.value / (comm->gossip.gossip_branch + 1);
+    msg_weight = comm->gossip.weight / (comm->gossip.gossip_branch + 1);
+
+    /*Nodes to which message was sent will have a non-zero here*/
+    /*    for (i = 0; i < comm->remote_node_count; i++) {
+          if(comm->remote_limiters[i].awol > 0)
+          comm->remote_limiters[i].awol++;
+          }*/
+
+    for (i = 0; i < comm->remote_node_count; i++) {
+        printlog(LOG_DEBUG, "SWIM: STATUS: Node: %s reachability: %d\n", inet_ntoa(*(struct in_addr *)&comm->remote_limiters[i].addr), comm->remote_limiters[i].reachability);
+    }
+
+    for (i = 0; i < comm->gossip.gossip_branch; ++i) {
+        int targetid = NULL_PEER;
+        int rand_count = 0;
+        message_t msg;
+
+        printlog(LOG_DEBUG, "Gossip loop iteration, i=%d, branch=%d\n", i, comm->gossip.gossip_branch);
+
+        /* If there are any peers with unacked messages, select them first. */
+        while (retry_index < comm->remote_node_count) {
+            if (comm->remote_limiters[retry_index].awol > 0 && comm->remote_limiters[retry_index].reachability == REACHABLE) {
+                targetid = retry_index;
+                printlog(LOG_DEBUG, "GOSSIP: Selected peerindex %d because it had unacked messages.\n", targetid);
+                retry_index += 1;
+                break;
+            }
+            retry_index += 1;
+        }
+
+        while (targetid == NULL_PEER && rand_count < 50) {
+            /* *Gossip node would be chosen from
+            * the array which would be randomly shuffled
+            * once all the nodes have been sent messages*
+            */
+            targetid = find_gossip_target(comm);
+            /* If we didn't find any peers the needed retransmissions, select one randomly here. */
+/*                targetid = myrand() % comm->remote_node_count;
+                rand_count += 1;
+*/
+            /* Don't select an already-used index. */
+            for (j = 0; j < i; ++j) {
+                if (targetid == comm->selected[j]) {
+                    printlog(LOG_DEBUG, "GOSSIP: disqualified targetid %d.  selected[j=%d] is %d\n", targetid, j, comm->selected[j]); 
+                    targetid = NULL_PEER;
+                    break;
+                }
+            }
+            if (targetid != NULL_PEER) {
+                if(comm->remote_limiters[targetid].reachability != REACHABLE) {
+                    printlog(LOG_DEBUG, "GOSSIP: disqualified targetid %d.  reachability is %d, and remote awol count is %d\n",
+                            targetid, comm->remote_limiters[targetid].reachability, comm->remote_limiters[targetid].awol);
+                    targetid = NULL_PEER;
+                }
+            }
+            
+            rand_count++;
+        }
+
+        if (targetid < 0) {
+            /* Couldn't find a suitable peer to send to... */
+            message_portion += 1;
+            printlog(LOG_DEBUG, "GOSSIP: exhausted random peer search.\n");
+            continue;
+        } else {
+            printlog(LOG_DEBUG, "GOSSIP: settled on peer id %d.\n", targetid);
+        }
+
+        remote = &comm->remote_limiters[targetid];
+        comm->selected[i] = targetid;
+
+        toaddr.sin_addr.s_addr = remote->addr; /* Already in network byte order. */
+        toaddr.sin_port = remote->port;
+
+        memset(&msg, 0, sizeof(message_t));
+        msg.magic = MAGIC_MSG;
+        msg.ident_id = id;
+        msg.value = msg_value + remote->outgoing.saved_value;
+        msg.weight = msg_weight + remote->outgoing.saved_weight;
+        msg.seqno = remote->outgoing.next_seqno;
+        msg.min_seqno = remote->outgoing.first_seqno;
+        msg.type = MSG;
+        msg.view = comm->gossip.view;
+        if(comm->gossip.membership == SWIM) {
+            swim_comm_t *swim_comm = (swim_comm_t *)comm->membership_state;
+            // piggy back an update
+            if(swim_comm->count_updates > 0) {
+                int index = myrand() % swim_comm->count_updates;
+                update_t *pointer = swim_comm->updates;
+                update_t *prev_pointer = swim_comm->updates;
+                while(index != 0) {
+                    prev_pointer = pointer;
+                    pointer = pointer->next;
+                    index--;
+                }
+                msg.update_present = TRUE;
+                msg.reachability = pointer->remote->reachability;
+                msg.incarnation = pointer->remote->incarnation;
+                msg.node.addr = pointer->remote->addr;
+                msg.node.port = pointer->remote->port;
+                printlog(LOG_DEBUG, "SWIM: Sending update about %s\n", inet_ntoa(*(struct in_addr *)&pointer->remote->addr));
+                pointer->count++;
+                if(swim_comm->updates != pointer && pointer->count == UPDATE_THRESHOLD) {
+                    // pointer is not head
+                    prev_pointer->next = pointer->next;
+                    pointer->next = NULL;
+                    free(pointer);
+                    swim_comm->count_updates--;
+                } else if(pointer->count == UPDATE_THRESHOLD) {
+                    // pointer is head of the list
+                    swim_comm->updates = pointer->next;
+                    pointer->next = NULL;
+                    free(pointer);
+                    swim_comm->count_updates--;
+                    if(swim_comm->count_updates == 0) {
+                        swim_comm->updates = NULL;
+                    }
+                }
+            }
+        }
+        remote->outgoing.next_seqno++;
+        remote->outgoing.saved_value += msg_value;
+        remote->outgoing.saved_weight += msg_weight;
+        /* Represents the number of messages we have sent to this peer without hearing a message or ACK from it. */
+        remote->awol += 1;
+
+
+#ifdef ALLOW_PARTITION
+
+        if (do_partition && ((partition_set & (1 << targetid)) == 0)) {
+            printlog(LOG_DEBUG, "Partition: GOSSIP: ignoring targetid %d\n", targetid);
+            continue;
+        }
+
+#endif
+
+        message_to_nbo(&msg);
+
+        if (sendto(sock, &msg, sizeof(msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) {
+            printlog(LOG_WARN, "WARN: limiter_send_gossip: sento failed.\n");
+            result = errno;
+            break;
+        }
+        printlog(LOG_DEBUG,"SWIM: sent the gossip to %s\n", inet_ntoa(*(struct in_addr *)&toaddr.sin_addr.s_addr));
+    }
+    swim_send(comm, id, sock);
+    comm->gossip.value = msg_value * message_portion;
+    comm->gossip.weight = msg_weight * message_portion;
+
+    return result;
+}
+
+void swim_restart(comm_t *comm, int32_t view_number) {
+    /* Not sure about this yet... */
+}
+
+int swim_init(comm_t *comm, uint32_t id) {
+    comm->membership_state = malloc(sizeof(swim_comm_t));
+    if (comm->membership_state == NULL) {
+        return ENOMEM;
+    }
+    comm->connected = 1;
+    comm->recv_function = swim_receive;
+    comm->send_function = send_gossip_swim;
+    comm->restart_function = swim_restart;
+
+    swim_comm_t *swim_comm = (swim_comm_t *) comm->membership_state;
+
+    FILE *fp = fopen("/root/incarnation", "w+");
+    fscanf(fp, "%d", &swim_comm->incarnation);
+    fprintf(fp, "%d", swim_comm->incarnation + 1); // next time a greater incarnation would be read
+    fflush(fp);
+    fclose(fp);
+
+    return 0;
+}
+
+void swim_teardown(comm_t *comm) {
+    if (comm->membership_state)
+        free(comm->membership_state);
+}
+
+/**Bhanu: new functions being introduced*/
+/** Given a friend_id and the address of the suspected node
+ * this function sends a CHECK message to friend */
+/*
+   int help_from_friend(comm_t* comm, int friend_id, in_addr_t addr, in_port_t port, uint32_t id, int sock) {
+// send a CHECK message to friend
+int result;
+message_t msg;
+memset(&msg, 0, sizeof(message_t));
+msg.magic = MAGIC_MSG;
+msg.ident_id = id;
+msg.value = 0;
+msg.weight = 0;
+msg.seqno = 0;
+msg.min_seqno = 0;
+msg.type = CHECK;
+msg.check_target = addr;
+msg.check_port = port;
+
+// send the message
+struct sockaddr_in toaddr;
+memset(&toaddr, 0, sizeof(sockaddr_in));
+toaddr.sin_family = AF_INET;
+toaddr.sin_addr.s_addr = comm->remote_limiters[friend_id].addr;
+toaddr.sin_port = comm->remote_limiters[friend_id].port;
+message_to_nbo(&msg);
+
+if (sendto(sock, &msg, sizeof(msg), 0, (struct sockaddr *) &toaddr, sizeof(struct sockaddr_in)) < 0) {
+printlog(LOG_WARN, "WARN: limiter_send_mesh: sento failed.\n");
+result = errno;
+printlog(LOG_WARN, "  - The error was |%d|\n", strerror(result));
+}
+}
+*/
diff --git a/drl/swim.h b/drl/swim.h
new file mode 100644 (file)
index 0000000..79b5559
--- /dev/null
@@ -0,0 +1,67 @@
+/* See the DRL-LICENSE file for this file's software license. */
+
+#ifndef _SWIM_H_
+#define _SWIM_H_
+
+#define AWOL (0)
+#define ALIVE (1)
+#define MAX_FRIENDS (5)
+#define UPDATE_THRESHOLD (3)
+#define FRIEND_THRESHOLD (4)
+#define SOURCE_THRESHOLD (8)
+
+static const uint16_t CHECK = 3;
+static const uint16_t CHECK_ACK = 4;
+static const uint16_t PING = 5;
+static const uint16_t PING_ACK = 6;
+
+/** Structs needed to maintain the list of nodes to which
+* ping messages have been sent. This list is maintained
+* by one of the "friend" nodes
+*/
+typedef struct ping_target {
+    // node which has been targetted
+    remote_node_t target;
+    // node which requested the ping target
+    // so that CHECK_ACK could be sent with ALIVE / AWOL
+    remote_node_t source;
+    // count of gossip rounds to keep timeout
+    uint32_t count;
+    // this has to be a list as well because we have to
+    // remember all the suspects a particular node has
+    // pinged and is waiting for a response
+    struct ping_target *next;
+} ping_target_t;
+
+typedef struct update {
+    /*Remote limiter whose update this is*/
+    remote_limiter_t *remote;
+    /*Number of times, update has been piggy 
+     *backed on a gossip message*/
+    int count;
+    struct update *next;
+} update_t;
+
+typedef struct swim_comm {
+    /*Incarnation number of the node*/
+    uint32_t incarnation;
+
+    /*List of updates currently held by the node*/
+    update_t *updates;
+    int count_updates;
+
+    /** Keep track of the ping messages being sent and wait for timeout */
+    ping_target_t *ping_targets;
+} swim_comm_t;
+
+int swim_init(comm_t *comm, uint32_t id);
+
+void swim_teardown(comm_t *comm);
+
+int swim_receive(comm_t *comm, uint32_t id, int sock, remote_limiter_t *remote, message_t *msg);
+
+int send_gossip_swim(comm_t *comm, uint32_t id, int sock);
+
+void swim_restart(comm_t *comm, int32_t view_number);
+
+#endif  /* _SWIM_H_ */
index f9d6d56..e24b41c 100644 (file)
@@ -140,8 +140,16 @@ static config_entry_t partition = {
     .u = { .value = 0xfffffff },
 };
 
-static config_entry_t netem_slice = {
+static config_entry_t sfq_slice = {
     .next = &partition,
+    .key = "sfq_slice",
+    .type = CONFIG_TYPE_STRING,
+    .options = CONFIG_OPT_NONE,
+    .u = { .string = "NONE" },
+};
+
+static config_entry_t netem_slice = {
+    .next = &sfq_slice,
     .key = "netem_slice",
     .type = CONFIG_TYPE_STRING,
     .options = CONFIG_OPT_NONE,
@@ -271,9 +279,9 @@ extern FILE *logfile;
 extern uint8_t system_loglevel;
 extern uint8_t do_enforcement;
 
-/* From peer_comm.c - used to simulate partition. */
-extern int do_partition;
-extern int partition_set;
+/* Used to simulate partitions. */
+int do_partition = 0;
+int partition_set = 0xfffffff;
 
 /* functions */
 
@@ -1141,6 +1149,23 @@ static inline int add_htb_netem(const char *iface, const uint32_t parent_major,
     return execute_cmd(cmd);
 }
 
+static inline int add_htb_sfq(const char *iface, const uint32_t parent_major,
+                                const uint32_t parent_minor, const uint32_t handle,
+                                const int perturb) {
+    char cmd[300];
+
+    sprintf(cmd, "/sbin/tc qdisc del dev %s parent %x:%x handle %x pfifo", iface, parent_major,
+            parent_minor, handle);
+    printlog(LOG_WARN, "HTB_cmd: %s\n", cmd);
+    if (execute_cmd(cmd))
+        printlog(LOG_WARN, "HTB_cmd: Previous deletion did not succeed.\n");
+
+    sprintf(cmd, "/sbin/tc qdisc replace dev %s parent %x:%x handle %x sfq perturb %d",
+            iface, parent_major, parent_minor, handle, perturb);
+    printlog(LOG_WARN, "HTB_cmd: %s\n", cmd);
+    return execute_cmd(cmd);
+}
+
 static int create_htb_hierarchy(drl_instance_t *instance) {
     char cmd[300];
     int i, j, k;
@@ -1273,6 +1298,30 @@ static int create_htb_hierarchy(drl_instance_t *instance) {
         }
     }
 
+    /* Turn on SFQ for experimentation. */
+    if (strcmp(sfq_slice.u.string, "NONE")) {
+        if (!strcmp(sfq_slice.u.string, "ALL")) {
+            if (add_htb_sfq("eth0", 1, 0x1000, 0x1000, 30))
+                return 1;
+            if (add_htb_sfq("eth0", 1, 0x1fff, 0x1fff, 30))
+                return 1;
+
+            for (k = 0; k < instance->leaf_count; ++k) {
+                if (add_htb_sfq("eth0", 1, (0x1000 | instance->leaves[k].xid),
+                            (0x1000 | instance->leaves[k].xid), 30)) {
+                    return 1;
+                }
+            }
+        } else {
+            uint32_t slice_xid;
+
+            sscanf(sfq_slice.u.string, "%x", &slice_xid);
+
+            if (add_htb_sfq("eth0", 1, slice_xid, slice_xid, 30))
+                return 1;
+        }
+    }
+
     return 0;
 }
 
diff --git a/drl/zk_drl.c b/drl/zk_drl.c
new file mode 100644 (file)
index 0000000..a9fd92d
--- /dev/null
@@ -0,0 +1,346 @@
+/* See the DRL-LICENSE file for this file's software license. */
+
+#ifdef BUILD_ZOOKEEPER
+
+#include <arpa/inet.h>
+#include <assert.h>
+#include <inttypes.h>
+#include <errno.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+
+#include "raterouter.h"
+#include "ratetypes.h"
+#include "drl_state.h"
+#include "peer_comm.h"
+#include "zk_drl.h"
+#include "logging.h"
+
+#define NULL_LEN (-1)
+#define PATH_LEN (64)
+#define PATH_BUFFER_LEN (64)
+
+static int32_t read_path_cversion(zhandle_t *zkhandle, const char *path) {
+    struct Stat stat;
+    int zoo_result;
+
+    memset(&stat, 0, sizeof(struct Stat));
+
+    zoo_result = zoo_exists(zkhandle, path, 0, &stat);
+
+    if (zoo_result != ZOK) {
+        return -1;
+    }
+
+    return stat.cversion;
+}
+
+static int process_membership_change(zhandle_t *zkhandle, zkdrlcontext_t *context, const char *path) {
+    struct String_vector children;
+    int32_t view_before = 0;
+    int32_t view_after = view_before + 1; //Needs to be != to view_before
+    int zoo_result = 0;
+    int i;
+
+    while (view_before != view_after) {
+        view_before = read_path_cversion(zkhandle, path);
+
+        zoo_result = zoo_get_children(zkhandle, path, 1, &children);
+        if (zoo_result != ZOK) {
+            return zoo_result;
+        }
+
+        view_after = read_path_cversion(zkhandle, path);
+    }
+
+    if (view_after > context->comm->gossip.view) {
+        printlog(LOG_DEBUG, "ZK:zookeeper watch says we need to restart with a new view.\n");
+        context->comm->restart_function(context->comm, view_after);
+    }
+
+    /* Clear the remote limiter list.  This will be overwritten below for
+     * limiters that are found to be in the new view. */
+    for (i = 0; i < context->comm->remote_node_count; ++i) {
+        context->comm->remote_limiters[i].reachability = UNREACHABLE;
+        context->comm->remote_limiters[i].view = view_after;
+        context->comm->remote_limiters[i].view_confidence = NOTIN;
+    }
+
+    for (i = 0; i < children.count; ++i) {
+        remote_limiter_t *remote_limiter = NULL;
+        remote_node_t remote_node;
+
+        memset(&remote_node, 0, sizeof(remote_node_t));
+
+        printlog(LOG_DEBUG, "ZK:children.data[%d] is %s\n", i, children.data[i]);
+
+        sscanf(children.data[i], "%u", &remote_node.addr);
+        remote_node.port = htons(LIMITER_LISTEN_PORT);
+
+        if (remote_node.addr != context->local_addr) {
+            printlog(LOG_DEBUG, "ZK:searching map for %u:%u\n", remote_node.addr, remote_node.port);
+            remote_limiter = map_search(context->comm->remote_node_map, &remote_node, sizeof(remote_node_t));
+            assert(remote_limiter != NULL);
+            remote_limiter->reachability = REACHABLE;
+            remote_limiter->view_confidence = IN;
+        } else {
+            printlog(LOG_DEBUG, "ZK: %u is my own addr.\n", remote_node.addr);
+        }
+    }
+    
+    assert(view_after >= 0);
+
+    context->comm->connected = 1;
+
+    return ZOK;
+}
+
+static void zk_connected(zhandle_t *zkhandle, zkdrlcontext_t *context) {
+    char path[PATH_LEN];
+    char path_buffer[PATH_BUFFER_LEN];
+    int zoo_result = 0;
+
+    printlog(LOG_DEBUG, "ZK:(Re)Connected to zookeeper.\n");
+
+    sprintf(path, "/%u", context->id);
+
+    zoo_result = zoo_create(zkhandle, path, NULL, NULL_LEN, &ZOO_OPEN_ACL_UNSAFE, 0, path_buffer, PATH_BUFFER_LEN);
+
+    if (zoo_result == ZOK) {
+        printlog(LOG_DEBUG, "ZK: created path %s\n", path);
+    } else {
+        //An error occurred.  It was probably already there.
+        printlog(LOG_DEBUG, "ZK: error creating path %s: %d\n", path, zoo_result);
+    }
+
+    sprintf(path, "/%u/%u", context->id, context->local_addr);
+    
+    zoo_result = zoo_create(zkhandle, path, NULL, NULL_LEN, &ZOO_READ_ACL_UNSAFE, ZOO_EPHEMERAL, path_buffer, PATH_BUFFER_LEN);
+
+    if (zoo_result == ZOK) {
+        printlog(LOG_DEBUG, "ZK: created path %s\n", path);
+    } else {
+        printlog(LOG_DEBUG, "ZK: error creating path %s: %d\n", path, zoo_result);
+    }
+
+    sprintf(path, "/%u", context->id);
+
+    zoo_result = process_membership_change(zkhandle, context, path);
+
+    if (zoo_result != ZOK) {
+        printlog(LOG_WARN, "ZK: process_membership_change failed?\n");
+    }
+}
+
+static void zk_disconnected(zhandle_t *zkhandle, zkdrlcontext_t *context) {
+    printlog(LOG_DEBUG, "ZK:Disconnected from zookeeper.\n");
+
+    context->comm->connected = 0;
+}
+
+static void zk_membership_change(zhandle_t *zkhandle, const char *path, zkdrlcontext_t *context) {
+    int zoo_result = 0;
+
+    printlog(LOG_DEBUG, "ZK:zookeeper child list changed.\n");
+
+    zoo_result = process_membership_change(zkhandle, context, path);
+}
+
+void zk_drl_restart(comm_t *comm, int32_t view_number) {
+    int i;
+
+    comm->gossip.value = comm->local_rate;
+    comm->gossip.weight = 1.0;
+    comm->gossip.view = view_number;
+
+    for (i = 0; i < comm->remote_node_count; ++i) {
+        if (comm->remote_limiters[i].view < view_number) {
+            comm->remote_limiters[i].rate = 0;
+            memset(&comm->remote_limiters[i].incoming, 0, sizeof(in_neighbor_t));
+            memset(&comm->remote_limiters[i].outgoing, 0, sizeof(out_neighbor_t));
+            comm->remote_limiters[i].view = view_number;
+            comm->remote_limiters[i].view_confidence = UNSURE;
+        }
+    }
+
+    printlog(LOG_DEBUG, "ZK: Changing view to %d\n", view_number);
+}
+
+static void zk_drl_watcher(zhandle_t *zkhandle, int type, int state, const char *path, void *context_ptr) {
+    zkdrlcontext_t *context = (zkdrlcontext_t *) context_ptr;
+
+    pthread_rwlock_rdlock(context->limiter_lock);
+    pthread_mutex_lock(&context->comm->lock);
+
+    if (type == ZOO_SESSION_EVENT) {
+        if (state == ZOO_CONNECTED_STATE) {
+            /* We're newly connected - set that watch! */
+            zk_connected(zkhandle, context);
+        } else if (state == ZOO_CONNECTING_STATE) {
+            /* We're no longer connected.  Do something safe. */
+            zk_disconnected(zkhandle, context);
+        } else if (state == ZOO_EXPIRED_SESSION_STATE) {
+            printlog(LOG_DEBUG, "ZK:zookeeper session expired - reconnecting.\n");
+            context->zkhandle = zookeeper_init(context->zk_host, zk_drl_watcher, 10000, NULL, context, 0);
+        } else {
+            printlog(LOG_DEBUG, "ZK:Unhandled event zk_drl_watcher: type is %d, state is %d, path is %s\n", type, state, path);
+        }
+    } else if (type == ZOO_CHILD_EVENT) {
+        /* The list of child nodes in the group has changed.  Re-read the
+         * group membership list and re-set the watch. */
+        zk_membership_change(zkhandle, path, context);
+    } else {
+        printlog(LOG_DEBUG, "ZK:Unhandled event zk_drl_watcher: type is %d, state is %d, path is %s\n", type, state, path);
+    }
+
+    pthread_mutex_unlock(&context->comm->lock);
+    pthread_rwlock_unlock(context->limiter_lock);
+}
+
+int zk_drl_recv(comm_t *comm, uint32_t id, int sock, remote_limiter_t *remote, message_t *msg) {
+    if (msg->type == ACK) {
+        /* If ACK was received then reset the awol count */
+        if (msg->view == comm->gossip.view && msg->seqno == remote->outgoing.next_seqno - 1) {
+            /* Ack for most recent message.  Clear saved state. */
+            remote->outgoing.first_seqno = remote->outgoing.next_seqno;
+            remote->outgoing.saved_value = 0;
+            remote->outgoing.saved_weight = 0;
+            remote->awol = 0;
+
+        } else if (msg->view > comm->gossip.view) {
+            printlog(LOG_DEBUG, "ZK:Received ack for newer view, restarting.\n");
+            comm->restart_function(comm, msg->view);
+            remote->view_confidence = IN;
+            remote->awol = 0;
+        }
+        /* Ignore ack if it isn't for most recent message or its from an old view. */
+    } else if (msg->type == MSG) {
+        if (msg->view == comm->gossip.view) {
+            if (msg->min_seqno > remote->incoming.seen_seqno) {
+                /* Entirely new information */
+                remote->incoming.seen_seqno = msg->seqno;
+                remote->incoming.saved_value = msg->value;
+                remote->incoming.saved_weight = msg->weight;
+                comm->gossip.value += msg->value;
+                comm->gossip.weight += msg->weight;
+                send_ack(id, remote, msg->seqno, ACK, comm->gossip.view);
+                remote->awol = 0;
+            } 
+            else if (msg->seqno > remote->incoming.seen_seqno) {
+                /* Only some of the message is old news. */
+                double diff_value = msg->value - remote->incoming.saved_value;
+                double diff_weight = msg->weight - remote->incoming.saved_weight;
+
+                remote->incoming.seen_seqno = msg->seqno;
+                remote->incoming.saved_value = msg->value;
+                remote->incoming.saved_weight = msg->weight;
+
+                comm->gossip.value += diff_value;
+                comm->gossip.weight += diff_weight;
+                send_ack(id, remote, msg->seqno, ACK, comm->gossip.view);
+                remote->awol = 0;
+            } 
+            else {
+                /* The entire message is old news. (Duplicate). */
+                /* Do nothing. */
+            }
+        } else if (msg->view > comm->gossip.view) {
+            printlog(LOG_DEBUG, "ZK:received message with a newer viewstamp, restarting.\n");
+            comm->restart_function(comm, msg->view);
+            remote->view_confidence = IN;
+
+            remote->incoming.seen_seqno = msg->seqno;
+            remote->incoming.saved_value = msg->value;
+            remote->incoming.saved_weight = msg->weight;
+            comm->gossip.value += msg->value;
+            comm->gossip.weight += msg->weight;
+            send_ack(id, remote, msg->seqno, ACK, comm->gossip.view);
+            remote->awol = 0;
+        } else if (msg->view < comm->gossip.view) {
+            printlog(LOG_DEBUG, "ZK:received a message with an older viewstamp.\n");
+            if (remote->view_confidence == IN) {
+                /* The sender is in the new view and doesn't know it yet. */
+                send_ack(id, remote, msg->seqno, ACK, comm->gossip.view);
+            } else if (remote->view_confidence == UNSURE) {
+                /* We don't know if he's in or not. */
+                send_ack(id, remote, msg->seqno, UNSUREACK, comm->gossip.view);
+            } else if (remote->view_confidence == NOTIN) {
+                /* He's out of luck... */
+                send_ack(id, remote, msg->seqno, NACK, comm->gossip.view);
+            }
+            remote->awol = 0;
+        }
+    } else if (msg->type == UNSUREACK) {
+        /* We received an ack, but the ack sender was unsure whether or not
+         * we'll be a part of its new view.  Can't do much here... */
+        if (msg->view > comm->gossip.view) {
+            remote->view = msg->view;
+            remote->view_confidence = IN;
+            remote->awol = 0;
+            printlog(LOG_DEBUG, "ZK:received an UNSUREACK for view %d\n", msg->view);
+        }
+    } else if (msg->type == NACK) {
+        if (msg->view > comm->gossip.view) {
+            remote->view = msg->view;
+            remote->view_confidence = IN;
+            remote->awol = 0;
+
+            comm->connected = 0;
+            printlog(LOG_DEBUG, "ZK:received a NACK for view %d\n", msg->view);
+        }
+    }
+
+    return 0;
+}
+
+int zk_drl_init(comm_t *comm, uint32_t id, limiter_t *limiter, ident_config *config) {
+    zkdrlcontext_t *context = NULL;
+    comm->connected = 0;
+
+    if ((context = malloc(sizeof(zkdrlcontext_t))) == NULL) {
+        return ENOMEM;
+    }
+
+    context->zk_host = config->zk_host;
+    context->limiter_lock = &limiter->limiter_lock;
+    context->comm = comm;
+    context->id = id;
+    context->local_addr = limiter->localaddr;
+    comm->membership_state = context;
+
+    printlog(LOG_DEBUG, "ZK: Calling zk init\n");
+
+    context->zkhandle = zookeeper_init(context->zk_host, zk_drl_watcher, 10000, NULL, context, 0);
+
+    if (context->zkhandle == NULL) {
+        printlog(LOG_CRITICAL, "ZK: docs say that this can fail, but they don't say why. :(  Errno is %d\n", errno);
+        return EINVAL;
+    }
+
+    comm->recv_function = zk_drl_recv;
+    comm->send_function = send_udp_gossip;
+    comm->restart_function = zk_drl_restart;
+
+    return 0;
+}
+
+int zk_drl_close(comm_t *comm) {
+    zkdrlcontext_t *context = (zkdrlcontext_t *) comm->membership_state;
+
+    zookeeper_close(context->zkhandle);
+    
+    if (context && context->zk_host) {
+        free(context->zk_host);
+        context->zk_host = NULL;
+    }
+
+    if (context) {
+        free(context);
+        comm->membership_state = NULL;
+    }
+
+    return 0;
+}
+
+#endif  /* BUILD_ZOOKEEPER */
diff --git a/drl/zk_drl.h b/drl/zk_drl.h
new file mode 100644 (file)
index 0000000..12ac936
--- /dev/null
@@ -0,0 +1,39 @@
+/* See the DRL-LICENSE file for this file's software license. */
+
+#ifndef _ZK_DRL_
+#define _ZK_DRL_
+
+#define _XOPEN_SOURCE 600
+
+#include <inttypes.h>
+#include <pthread.h>
+#include <sys/types.h>
+
+#include <zookeeper.h>
+
+static const uint16_t UNSUREACK = 3;
+static const uint16_t NACK = 4;
+
+typedef struct zkdrlcontext {
+    /** The host string that should be passed to zookeeper_init when using
+     * zookeeper.  This consists of comma-separated ipaddr:port pairs. Example:
+     * "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" */
+    char *zk_host;
+
+    pthread_rwlock_t *limiter_lock;
+    comm_t *comm;
+    uint32_t id;
+    in_addr_t local_addr;
+    
+    zhandle_t *zkhandle;
+} zkdrlcontext_t;
+
+int zk_drl_init(comm_t *comm, uint32_t id, limiter_t *limiter, ident_config *config);
+
+int zk_drl_close(comm_t *comm);
+
+int zk_drl_recv(comm_t *comm, uint32_t id, int sock, remote_limiter_t *remote, message_t *msg);
+
+void zk_drl_restart(comm_t *comm, int32_t view_number);
+
+#endif  /* _ZK_DRL_ */
diff --git a/include/zookeeper/recordio.h b/include/zookeeper/recordio.h
new file mode 100644 (file)
index 0000000..33b8c70
--- /dev/null
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __RECORDIO_H__
+#define __RECORDIO_H__
+
+#include <sys/types.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+struct buffer {
+    int32_t len;
+    char *buff;
+};
+
+void deallocate_String(char **s);
+void deallocate_Buffer(struct buffer *b);
+void deallocate_vector(void *d);
+struct iarchive {
+    int (*start_record)(struct iarchive *ia, const char *tag);
+    int (*end_record)(struct iarchive *ia, const char *tag);
+    int (*start_vector)(struct iarchive *ia, const char *tag, int32_t *count);
+    int (*end_vector)(struct iarchive *ia, const char *tag);
+    int (*deserialize_Bool)(struct iarchive *ia, const char *name, int32_t *);
+    int (*deserialize_Int)(struct iarchive *ia, const char *name, int32_t *);
+    int (*deserialize_Long)(struct iarchive *ia, const char *name, int64_t *);
+    int (*deserialize_Buffer)(struct iarchive *ia, const char *name,
+            struct buffer *);
+    int (*deserialize_String)(struct iarchive *ia, const char *name, char **);
+    void *priv;
+};
+struct oarchive {
+    int (*start_record)(struct oarchive *oa, const char *tag);
+    int (*end_record)(struct oarchive *oa, const char *tag);
+    int (*start_vector)(struct oarchive *oa, const char *tag, const int32_t *count);
+    int (*end_vector)(struct oarchive *oa, const char *tag);
+    int (*serialize_Bool)(struct oarchive *oa, const char *name, const int32_t *);
+    int (*serialize_Int)(struct oarchive *oa, const char *name, const int32_t *);
+    int (*serialize_Long)(struct oarchive *oa, const char *name,
+            const int64_t *);
+    int (*serialize_Buffer)(struct oarchive *oa, const char *name,
+            const struct buffer *);
+    int (*serialize_String)(struct oarchive *oa, const char *name, char **);
+    void *priv;
+};
+
+struct oarchive *create_buffer_oarchive(void);
+void close_buffer_oarchive(struct oarchive **oa, int free_buffer);
+struct iarchive *create_buffer_iarchive(char *buffer, int len);
+void close_buffer_iarchive(struct iarchive **ia);
+char *get_buffer(struct oarchive *);
+int get_buffer_len(struct oarchive *);
+
+int64_t htonll(int64_t v);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
diff --git a/include/zookeeper/zookeeper.h b/include/zookeeper/zookeeper.h
new file mode 100644 (file)
index 0000000..e69f7ce
--- /dev/null
@@ -0,0 +1,1249 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ZOOKEEPER_H_
+#define ZOOKEEPER_H_
+
+#include <sys/time.h>
+#include <stdio.h>
+
+#include "zookeeper_version.h"
+#include "recordio.h"
+#include "zookeeper.jute.h"
+
+/**
+ * \file zookeeper.h 
+ * \brief ZooKeeper functions and definitions.
+ * 
+ * ZooKeeper is a network service that may be backed by a cluster of
+ * synchronized servers. The data in the service is represented as a tree
+ * of data nodes. Each node has data, children, an ACL, and status information.
+ * The data for a node is read and write in its entirety.
+ * 
+ * ZooKeeper clients can leave watches when they queries the data or children
+ * of a node. If a watch is left, that client will be notified of the change.
+ * The notification is a one time trigger. Subsequent chances to the node will
+ * not trigger a notification unless the client issues a querity with the watch
+ * flag set. If the client is ever disconnected from the service, even if the
+ * disconnection is temporary, the watches of the client will be removed from
+ * the service, so a client must treat a disconnect notification as an implicit
+ * trigger of all outstanding watches.
+ * 
+ * When a node is created, it may be flagged as an ephemeral node. Ephemeral
+ * nodes are automatically removed when a client session is closed or when
+ * a session times out due to inactivity (the ZooKeeper runtime fills in
+ * periods of inactivity with pings). Ephemeral nodes cannot have children.
+ * 
+ * ZooKeeper clients are identified by a server assigned session id. For
+ * security reasons The server
+ * also generates a corresponding password for a session. A client may save its
+ * id and corresponding password to persistent storage in order to use the
+ * session across program invocation boundaries.
+ */
+
+/* Support for building on various platforms */
+
+// on cygwin we should take care of exporting/importing symbols properly 
+#ifdef DLL_EXPORT
+#    define ZOOAPI __declspec(dllexport)
+#else
+#  if defined(__CYGWIN__) && !defined(USE_STATIC_LIB)
+#    define ZOOAPI __declspec(dllimport)
+#  else
+#    define ZOOAPI
+#  endif
+#endif
+
+/** zookeeper return constants **/
+
+enum ZOO_ERRORS {
+  ZOK = 0, /*!< Everything is OK */
+
+  /** System and server-side errors.
+   * This is never thrown by the server, it shouldn't be used other than
+   * to indicate a range. Specifically error codes greater than this
+   * value, but lesser than {@link #ZAPIERROR}, are system errors. */
+  ZSYSTEMERROR = -1,
+  ZRUNTIMEINCONSISTENCY = -2, /*!< A runtime inconsistency was found */
+  ZDATAINCONSISTENCY = -3, /*!< A data inconsistency was found */
+  ZCONNECTIONLOSS = -4, /*!< Connection to the server has been lost */
+  ZMARSHALLINGERROR = -5, /*!< Error while marshalling or unmarshalling data */
+  ZUNIMPLEMENTED = -6, /*!< Operation is unimplemented */
+  ZOPERATIONTIMEOUT = -7, /*!< Operation timeout */
+  ZBADARGUMENTS = -8, /*!< Invalid arguments */
+  ZINVALIDSTATE = -9, /*!< Invliad zhandle state */
+
+  /** API errors.
+   * This is never thrown by the server, it shouldn't be used other than
+   * to indicate a range. Specifically error codes greater than this
+   * value are API errors (while values less than this indicate a 
+   * {@link #ZSYSTEMERROR}).
+   */
+  ZAPIERROR = -100,
+  ZNONODE = -101, /*!< Node does not exist */
+  ZNOAUTH = -102, /*!< Not authenticated */
+  ZBADVERSION = -103, /*!< Version conflict */
+  ZNOCHILDRENFOREPHEMERALS = -108, /*!< Ephemeral nodes may not have children */
+  ZNODEEXISTS = -110, /*!< The node already exists */
+  ZNOTEMPTY = -111, /*!< The node has children */
+  ZSESSIONEXPIRED = -112, /*!< The session has been expired by the server */
+  ZINVALIDCALLBACK = -113, /*!< Invalid callback specified */
+  ZINVALIDACL = -114, /*!< Invalid ACL specified */
+  ZAUTHFAILED = -115, /*!< Client authentication failed */
+  ZCLOSING = -116, /*!< ZooKeeper is closing */
+  ZNOTHING = -117, /*!< (not error) no server responses to process */
+  ZSESSIONMOVED = -118 /*!<session moved to another server, so operation is ignored */ 
+};
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/**
+*  @name Debug levels
+*/
+typedef enum {ZOO_LOG_LEVEL_ERROR=1,ZOO_LOG_LEVEL_WARN=2,ZOO_LOG_LEVEL_INFO=3,ZOO_LOG_LEVEL_DEBUG=4} ZooLogLevel;
+
+/**
+ * @name ACL Consts
+ */
+extern ZOOAPI const int ZOO_PERM_READ;
+extern ZOOAPI const int ZOO_PERM_WRITE;
+extern ZOOAPI const int ZOO_PERM_CREATE;
+extern ZOOAPI const int ZOO_PERM_DELETE;
+extern ZOOAPI const int ZOO_PERM_ADMIN;
+extern ZOOAPI const int ZOO_PERM_ALL;
+
+/** This Id represents anyone. */
+extern ZOOAPI struct Id ZOO_ANYONE_ID_UNSAFE;
+/** This Id is only usable to set ACLs. It will get substituted with the
+ * Id's the client authenticated with.
+ */
+extern ZOOAPI struct Id ZOO_AUTH_IDS;
+
+/** This is a completely open ACL*/
+extern ZOOAPI struct ACL_vector ZOO_OPEN_ACL_UNSAFE;
+/** This ACL gives the world the ability to read. */
+extern ZOOAPI struct ACL_vector ZOO_READ_ACL_UNSAFE;
+/** This ACL gives the creators authentication id's all permissions. */
+extern ZOOAPI struct ACL_vector ZOO_CREATOR_ALL_ACL;
+
+/**
+ * @name Interest Consts
+ * These constants are used to express interest in an event and to
+ * indicate to zookeeper which events have occurred. They can
+ * be ORed together to express multiple interests. These flags are
+ * used in the interest and event parameters of 
+ * \ref zookeeper_interest and \ref zookeeper_process.
+ */
+// @{
+extern ZOOAPI const int ZOOKEEPER_WRITE;
+extern ZOOAPI const int ZOOKEEPER_READ;
+// @}
+
+/**
+ * @name Create Flags
+ * 
+ * These flags are used by zoo_create to affect node create. They may
+ * be ORed together to combine effects.
+ */
+// @{
+extern ZOOAPI const int ZOO_EPHEMERAL;
+extern ZOOAPI const int ZOO_SEQUENCE;
+// @}
+
+/**
+ * @name State Consts
+ * These constants represent the states of a zookeeper connection. They are
+ * possible parameters of the watcher callback.
+ */
+// @{
+extern ZOOAPI const int ZOO_EXPIRED_SESSION_STATE;
+extern ZOOAPI const int ZOO_AUTH_FAILED_STATE;
+extern ZOOAPI const int ZOO_CONNECTING_STATE;
+extern ZOOAPI const int ZOO_ASSOCIATING_STATE;
+extern ZOOAPI const int ZOO_CONNECTED_STATE;
+// @}
+
+/**
+ * @name Watch Types
+ * These constants indicate the event that caused the watch event. They are
+ * possible values of the first parameter of the watcher callback.
+ */
+// @{
+/**
+ * \brief a node has been created.
+ * 
+ * This is only generated by watches on non-existent nodes. These watches
+ * are set using \ref zoo_exists.
+ */
+extern ZOOAPI const int ZOO_CREATED_EVENT;
+/**
+ * \brief a node has been deleted.
+ * 
+ * This is only generated by watches on nodes. These watches
+ * are set using \ref zoo_exists and \ref zoo_get.
+ */
+extern ZOOAPI const int ZOO_DELETED_EVENT;
+/**
+ * \brief a node has changed.
+ * 
+ * This is only generated by watches on nodes. These watches
+ * are set using \ref zoo_exists and \ref zoo_get.
+ */
+extern ZOOAPI const int ZOO_CHANGED_EVENT;
+/**
+ * \brief a change as occurred in the list of children.
+ * 
+ * This is only generated by watches on the child list of a node. These watches
+ * are set using \ref zoo_get_children.
+ */
+extern ZOOAPI const int ZOO_CHILD_EVENT;
+/**
+ * \brief a session has been lost.
+ * 
+ * This is generated when a client loses contact or reconnects with a server.
+ */
+extern ZOOAPI const int ZOO_SESSION_EVENT;
+
+/**
+ * \brief a watch has been removed.
+ * 
+ * This is generated when the server for some reason, probably a resource
+ * constraint, will no longer watch a node for a client.
+ */
+extern ZOOAPI const int ZOO_NOTWATCHING_EVENT;
+// @}
+
+/**
+ * \brief ZooKeeper handle.
+ * 
+ * This is the handle that represents a connection to the ZooKeeper service.
+ * It is needed to invoke any ZooKeeper function. A handle is obtained using
+ * \ref zookeeper_init.
+ */
+typedef struct _zhandle zhandle_t;
+
+/**
+ * \brief client id structure.
+ * 
+ * This structure holds the id and password for the session. This structure
+ * should be treated as opaque. It is received from the server when a session
+ * is established and needs to be sent back as-is when reconnecting a session.
+ */
+typedef struct {
+    int64_t client_id;
+    char passwd[16];
+} clientid_t;
+
+/**
+ * \brief signature of a watch function.
+ * 
+ * There are two ways to receive watch notifications: legacy and watcher object.
+ * <p>
+ * The legacy style, an application wishing to receive events from ZooKeeper must 
+ * first implement a function with this signature and pass a pointer to the function 
+ * to \ref zookeeper_init. Next, the application sets a watch by calling one of 
+ * the getter API that accept the watch integer flag (for example, \ref zoo_aexists, 
+ * \ref zoo_get, etc).
+ * <p>
+ * The watcher object style uses an instance of a "watcher object" which in 
+ * the C world is represented by a pair: a pointer to a function implementing this
+ * signature and a pointer to watcher context -- handback user-specific data. 
+ * When a watch is triggered this function will be called along with 
+ * the watcher context. An application wishing to use this style must use
+ * the getter API functions with the "w" prefix in their names (for example, \ref
+ * zoo_awexists, \ref zoo_wget, etc).
+ * 
+ * \param zh zookeeper handle
+ * \param type event type. This is one of the *_EVENT constants. 
+ * \param state connection state. The state value will be one of the *_STATE constants.
+ * \param path znode path for which the watcher is triggered. NULL if the event 
+ * type is ZOO_SESSION_EVENT
+ * \param watcherCtx watcher context.
+ */
+typedef void (*watcher_fn)(zhandle_t *zh, int type, 
+        int state, const char *path,void *watcherCtx);
+
+/**
+ * \brief create a handle to used communicate with zookeeper.
+ * 
+ * This method creates a new handle and a zookeeper session that corresponds
+ * to that handle. Session establishment is asynchronous, meaning that the
+ * session should not be considered established until (and unless) an
+ * event of state ZOO_CONNECTED_STATE is received.
+ * \param host comma separated host:port pairs, each corresponding to a zk
+ *   server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+ * \param fn the global watcher callback function. When notifications are
+ *   triggered this function will be invoked.
+ * \param clientid the id of a previously established session that this
+ *   client will be reconnecting to. Pass 0 if not reconnecting to a previous
+ *   session. Clients can access the session id of an established, valid,
+ *   connection by calling \ref zoo_client_id. If the session corresponding to
+ *   the specified clientid has expired, or if the clientid is invalid for 
+ *   any reason, the returned zhandle_t will be invalid -- the zhandle_t 
+ *   state will indicate the reason for failure (typically
+ *   ZOO_EXPIRED_SESSION_STATE).
+ * \param context the handback object that will be associated with this instance 
+ *   of zhandle_t. Application can access it (for example, in the watcher 
+ *   callback) using \ref zoo_get_context. The object is not used by zookeeper 
+ *   internally and can be null.
+ * \param flags reserved for future use. Should be set to zero.
+ * \return a pointer to the opaque zhandle structure. If it fails to create 
+ * a new zhandle the function returns NULL and the errno variable 
+ * indicates the reason.
+ */
+ZOOAPI zhandle_t *zookeeper_init(const char *host, watcher_fn fn,
+  int recv_timeout, const clientid_t *clientid, void *context, int flags);
+
+/**
+ * \brief close the zookeeper handle and free up any resources.
+ * 
+ * After this call, the client session will no longer be valid. The function
+ * will flush any outstanding send requests before return. As a result it may 
+ * block.
+ *
+ * This method should only be called only once on a zookeeper handle. Calling
+ * twice will cause undefined (and probably undesirable behavior). Calling any other
+ * zookeeper method after calling close is undefined behaviour and should be avoided.
+ *
+ * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init
+ * \return a result code. Regardless of the error code returned, the zhandle 
+ * will be destroyed and all resources freed. 
+ *
+ * ZOK - success
+ * ZBADARGUMENTS - invalid input parameters
+ * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
+ * ZOPERATIONTIMEOUT - failed to flush the buffers within the specified timeout.
+ * ZCONNECTIONLOSS - a network error occured while attempting to send request to server
+ * ZSYSTEMERROR -- a system (OS) error occured; it's worth checking errno to get details
+ */
+ZOOAPI int zookeeper_close(zhandle_t *zh);
+
+/**
+ * \brief return the client session id, only valid if the connections
+ * is currently connected (ie. last watcher state is ZOO_CONNECTED_STATE)
+ */
+ZOOAPI const clientid_t *zoo_client_id(zhandle_t *zh);
+
+ZOOAPI int zoo_recv_timeout(zhandle_t *zh);
+
+ZOOAPI const void *zoo_get_context(zhandle_t *zh);
+
+ZOOAPI void zoo_set_context(zhandle_t *zh, void *context);
+
+/**
+ * \brief set a watcher function
+ * \return previous watcher function
+ */
+ZOOAPI watcher_fn zoo_set_watcher(zhandle_t *zh,watcher_fn newFn);
+
+#ifndef THREADED
+/**
+ * \brief Returns the events that zookeeper is interested in.
+ * 
+ * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init
+ * \param fd is the file descriptor of interest
+ * \param interest is an or of the ZOOKEEPER_WRITE and ZOOKEEPER_READ flags to
+ *    indicate the I/O of interest on fd.
+ * \param tv a timeout value to be used with select/poll system call
+ * \return a result code.
+ * ZOK - success
+ * ZBADARGUMENTS - invalid input parameters
+ * ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
+ * ZCONNECTIONLOSS - a network error occured while attempting to establish 
+ * a connection to the server
+ * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
+ * ZOPERATIONTIMEOUT - hasn't received anything from the server for 2/3 of the
+ * timeout value specified in zookeeper_init()
+ * ZSYSTEMERROR -- a system (OS) error occured; it's worth checking errno to get details
+ */
+ZOOAPI int zookeeper_interest(zhandle_t *zh, int *fd, int *interest, 
+       struct timeval *tv);
+
+/**
+ * \brief Notifies zookeeper that an event of interest has happened.
+ * 
+ * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init
+ * \param events will be an OR of the ZOOKEEPER_WRITE and ZOOKEEPER_READ flags.
+ * \return a result code. 
+ * ZOK - success
+ * ZBADARGUMENTS - invalid input parameters
+ * ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
+ * ZCONNECTIONLOSS - a network error occured while attempting to send request to server
+ * ZSESSIONEXPIRED - connection attempt failed -- the session's expired
+ * ZAUTHFAILED - authentication request failed, e.i. invalid credentials
+ * ZRUNTIMEINCONSISTENCY - a server response came out of order
+ * ZSYSTEMERROR -- a system (OS) error occured; it's worth checking errno to get details
+ * ZNOTHING -- not an error; simply indicates that there no more data from the server 
+ *              to be processed (when called with ZOOKEEPER_READ flag).
+ */
+ZOOAPI int zookeeper_process(zhandle_t *zh, int events);
+#endif
+
+/**
+ * \brief signature of a completion function for a call that returns void.
+ * 
+ * This method will be invoked at the end of a asynchronous call and also as 
+ * a result of connection loss or timeout.
+ * \param rc the error code of the call. Connection loss/timeout triggers 
+ * the completion with one of the following error codes:
+ * ZCONNECTIONLOSS -- lost connection to the server
+ * ZOPERATIONTIMEOUT -- connection timed out
+ * Data related events trigger the completion with error codes listed the 
+ * Exceptions section of the documentation of the function that initiated the
+ * call. (Zero indicates call was successful.)
+ * \param data the pointer that was passed by the caller when the function
+ *   that this completion corresponds to was invoked. The programmer
+ *   is responsible for any memory freeing associated with the data
+ *   pointer.
+ */
+typedef void (*void_completion_t)(int rc, const void *data);
+
+/**
+ * \brief signature of a completion function that returns a Stat structure.
+ * 
+ * This method will be invoked at the end of a asynchronous call and also as 
+ * a result of connection loss or timeout.
+ * \param rc the error code of the call. Connection loss/timeout triggers 
+ * the completion with one of the following error codes:
+ * ZCONNECTIONLOSS -- lost connection to the server
+ * ZOPERATIONTIMEOUT -- connection timed out
+ * Data related events trigger the completion with error codes listed the 
+ * Exceptions section of the documentation of the function that initiated the
+ * call. (Zero indicates call was successful.)
+ * \param stat a pointer to the stat information for the node involved in
+ *   this function. If a non zero error code is returned, the content of
+ *   stat is undefined. The programmer is NOT responsible for freeing stat.
+ * \param data the pointer that was passed by the caller when the function
+ *   that this completion corresponds to was invoked. The programmer
+ *   is responsible for any memory freeing associated with the data
+ *   pointer.
+ */
+typedef void (*stat_completion_t)(int rc, const struct Stat *stat,
+        const void *data);
+
+/**
+ * \brief signature of a completion function that returns data.
+ * 
+ * This method will be invoked at the end of a asynchronous call and also as 
+ * a result of connection loss or timeout.
+ * \param rc the error code of the call. Connection loss/timeout triggers 
+ * the completion with one of the following error codes:
+ * ZCONNECTIONLOSS -- lost connection to the server
+ * ZOPERATIONTIMEOUT -- connection timed out
+ * Data related events trigger the completion with error codes listed the 
+ * Exceptions section of the documentation of the function that initiated the
+ * call. (Zero indicates call was successful.)
+ * \param value the value of the information returned by the asynchronous call.
+ *   If a non zero error code is returned, the content of value is undefined.
+ *   The programmer is NOT responsible for freeing value.
+ * \param value_len the number of bytes in value.
+ * \param stat a pointer to the stat information for the node involved in
+ *   this function. If a non zero error code is returned, the content of
+ *   stat is undefined. The programmer is NOT responsible for freeing stat.
+ * \param data the pointer that was passed by the caller when the function
+ *   that this completion corresponds to was invoked. The programmer
+ *   is responsible for any memory freeing associated with the data
+ *   pointer.
+ */
+typedef void (*data_completion_t)(int rc, const char *value, int value_len,
+        const struct Stat *stat, const void *data);
+
+/**
+ * \brief signature of a completion function that returns a list of strings.
+ * 
+ * This method will be invoked at the end of a asynchronous call and also as 
+ * a result of connection loss or timeout.
+ * \param rc the error code of the call. Connection loss/timeout triggers 
+ * the completion with one of the following error codes:
+ * ZCONNECTIONLOSS -- lost connection to the server
+ * ZOPERATIONTIMEOUT -- connection timed out
+ * Data related events trigger the completion with error codes listed the 
+ * Exceptions section of the documentation of the function that initiated the
+ * call. (Zero indicates call was successful.)
+ * \param strings a pointer to the structure containng the list of strings of the
+ *   names of the children of a node. If a non zero error code is returned,
+ *   the content of strings is undefined. The programmer is NOT responsible
+ *   for freeing strings.
+ * \param data the pointer that was passed by the caller when the function
+ *   that this completion corresponds to was invoked. The programmer
+ *   is responsible for any memory freeing associated with the data
+ *   pointer.
+ */
+typedef void (*strings_completion_t)(int rc,
+        const struct String_vector *strings, const void *data);
+
+/**
+ * \brief signature of a completion function that returns a list of strings.
+ * 
+ * This method will be invoked at the end of a asynchronous call and also as 
+ * a result of connection loss or timeout.
+ * \param rc the error code of the call. Connection loss/timeout triggers 
+ * the completion with one of the following error codes:
+ * ZCONNECTIONLOSS -- lost connection to the server
+ * ZOPERATIONTIMEOUT -- connection timed out
+ * Data related events trigger the completion with error codes listed the 
+ * Exceptions section of the documentation of the function that initiated the
+ * call. (Zero indicates call was successful.)
+ * \param value the value of the string returned.
+ * \param data the pointer that was passed by the caller when the function
+ *   that this completion corresponds to was invoked. The programmer
+ *   is responsible for any memory freeing associated with the data
+ *   pointer.
+ */
+typedef void
+        (*string_completion_t)(int rc, const char *value, const void *data);
+
+/**
+ * \brief signature of a completion function that returns an ACL.
+ * 
+ * This method will be invoked at the end of a asynchronous call and also as 
+ * a result of connection loss or timeout.
+ * \param rc the error code of the call. Connection loss/timeout triggers 
+ * the completion with one of the following error codes:
+ * ZCONNECTIONLOSS -- lost connection to the server
+ * ZOPERATIONTIMEOUT -- connection timed out
+ * Data related events trigger the completion with error codes listed the 
+ * Exceptions section of the documentation of the function that initiated the
+ * call. (Zero indicates call was successful.)
+ * \param acl a pointer to the structure containng the ACL of a node. If a non 
+ *   zero error code is returned, the content of strings is undefined. The
+ *   programmer is NOT responsible for freeing acl.
+ * \param stat a pointer to the stat information for the node involved in
+ *   this function. If a non zero error code is returned, the content of
+ *   stat is undefined. The programmer is NOT responsible for freeing stat.
+ * \param data the pointer that was passed by the caller when the function
+ *   that this completion corresponds to was invoked. The programmer
+ *   is responsible for any memory freeing associated with the data
+ *   pointer.
+ */
+typedef void (*acl_completion_t)(int rc, struct ACL_vector *acl,
+        struct Stat *stat, const void *data);
+
+/**
+ * \brief get the state of the zookeeper connection.
+ * 
+ * The return value will be one of the \ref State Consts.
+ */
+ZOOAPI int zoo_state(zhandle_t *zh);
+
+/**
+ * \brief create a node.
+ * 
+ * This method will create a node in ZooKeeper. A node can only be created if
+ * it does not already exists. The Create Flags affect the creation of nodes.
+ * If ZOO_EPHEMERAL flag is set, the node will automatically get removed if the
+ * client session goes away. If the ZOO_SEQUENCE flag is set, a unique
+ * monotonically increasing sequence number is appended to the path name.
+ * 
+ * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init
+ * \param path The name of the node. Expressed as a file name with slashes 
+ * separating ancestors of the node.
+ * \param value The data to be stored in the node.
+ * \param valuelen The number of bytes in data.
+ * \param acl The initial ACL of the node. If null, the ACL of the parent will be
+ *    used.
+ * \param flags this parameter can be set to 0 for normal create or an OR
+ *    of the Create Flags
+ * \param completion the routine to invoke when the request completes. The completion
+ * will be triggered with one of the following codes passed in as the rc argument:
+ * ZOK operation completed succesfully
+ * ZNONODE the parent node does not exist.
+ * ZNODEEXISTS the node already exists
+ * ZNOAUTH the client does not have permission.
+ * ZNOCHILDRENFOREPHEMERALS cannot create children of ephemeral nodes.
+ * \param data The data that will be passed to the completion routine when the 
+ * function completes.
+ * \return ZOK on success or one of the following errcodes on failure:
+ * ZBADARGUMENTS - invalid input parameters
+ * ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
+ * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
+ */
+ZOOAPI int zoo_acreate(zhandle_t *zh, const char *path, const char *value, 
+        int valuelen, const struct ACL_vector *acl, int flags,
+        string_completion_t completion, const void *data);
+
+/**
+ * \brief delete a node in zookeeper.
+ * 
+ * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init
+ * \param path the name of the node. Expressed as a file name with slashes 
+ * separating ancestors of the node.
+ * \param version the expected version of the node. The function will fail if the
+ *    actual version of the node does not match the expected version.
+ *  If -1 is used the version check will not take place. 
+ * \param completion the routine to invoke when the request completes. The completion
+ * will be triggered with one of the following codes passed in as the rc argument:
+ * ZOK operation completed succesfully
+ * ZNONODE the node does not exist.
+ * ZNOAUTH the client does not have permission.
+ * ZBADVERSION expected version does not match actual version.
+ * ZNOTEMPTY children are present; node cannot be deleted.
+ * \param data the data that will be passed to the completion routine when 
+ * the function completes.
+ * \return ZOK on success or one of the following errcodes on failure:
+ * ZBADARGUMENTS - invalid input parameters
+ * ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
+ * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
+ */
+ZOOAPI int zoo_adelete(zhandle_t *zh, const char *path, int version, 
+        void_completion_t completion, const void *data);
+
+/**
+ * \brief checks the existence of a node in zookeeper.
+ * 
+ * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init
+ * \param path the name of the node. Expressed as a file name with slashes 
+ * separating ancestors of the node.
+ * \param watch if nonzero, a watch will be set at the server to notify the 
+ * client if the node changes. The watch will be set even if the node does not 
+ * exist. This allows clients to watch for nodes to appear.
+ * \param completion the routine to invoke when the request completes. The completion
+ * will be triggered with one of the following codes passed in as the rc argument:
+ * ZOK operation completed succesfully
+ * ZNONODE the node does not exist.
+ * ZNOAUTH the client does not have permission.
+ * \param data the data that will be passed to the completion routine when the 
+ * function completes.
+ * \return ZOK on success or one of the following errcodes on failure:
+ * ZBADARGUMENTS - invalid input parameters
+ * ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
+ * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
+ */
+ZOOAPI int zoo_aexists(zhandle_t *zh, const char *path, int watch, 
+        stat_completion_t completion, const void *data);
+
+/**
+ * \brief checks the existence of a node in zookeeper.
+ * 
+ * This function is similar to \ref zoo_axists except it allows one specify 
+ * a watcher object - a function pointer and associated context. The function
+ * will be called once the watch has fired. The associated context data will be 
+ * passed to the function as the watcher context parameter. 
+ * 
+ * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init
+ * \param path the name of the node. Expressed as a file name with slashes 
+ * separating ancestors of the node.
+ * \param watcher if non-null a watch will set on the specified znode on the server.
+ * The watch will be set even if the node does not exist. This allows clients 
+ * to watch for nodes to appear.
+ * \param watcherCtx user specific data, will be passed to the watcher callback.
+ * Unlike the global context set by \ref zookeeper_init, this watcher context
+ * is associated with the given instance of the watcher only.
+ * \param completion the routine to invoke when the request completes. The completion
+ * will be triggered with one of the following codes passed in as the rc argument:
+ * ZOK operation completed succesfully
+ * ZNONODE the node does not exist.
+ * ZNOAUTH the client does not have permission.
+ * \param data the data that will be passed to the completion routine when the 
+ * function completes.
+ * \return ZOK on success or one of the following errcodes on failure:
+ * ZBADARGUMENTS - invalid input parameters
+ * ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
+ * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
+ */
+ZOOAPI int zoo_awexists(zhandle_t *zh, const char *path, 
+        watcher_fn watcher, void* watcherCtx, 
+        stat_completion_t completion, const void *data);
+
+/**
+ * \brief gets the data associated with a node.
+ * 
+ * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init
+ * \param path the name of the node. Expressed as a file name with slashes 
+ * separating ancestors of the node.
+ * \param watch if nonzero, a watch will be set at the server to notify 
+ * the client if the node changes.
+ * \param completion the routine to invoke when the request completes. The completion
+ * will be triggered with one of the following codes passed in as the rc argument:
+ * ZOK operation completed succesfully
+ * ZNONODE the node does not exist.
+ * ZNOAUTH the client does not have permission.
+ * \param data the data that will be passed to the completion routine when 
+ * the function completes.
+ * \return ZOK on success or one of the following errcodes on failure:
+ * ZBADARGUMENTS - invalid input parameters
+ * ZINVALIDSTATE - zhandle state is either in ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
+ * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
+ */
+ZOOAPI int zoo_aget(zhandle_t *zh, const char *path, int watch, 
+        data_completion_t completion, const void *data);
+
+/**
+ * \brief gets the data associated with a node.
+ * 
+ * This function is similar to \ref zoo_aget except it allows one specify 
+ * a watcher object rather than a boolean watch flag. 
+ *
+ * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init
+ * \param path the name of the node. Expressed as a file name with slashes 
+ * separating ancestors of the node.
+ * \param watcher if non-null, a watch will be set at the server to notify 
+ * the client if the node changes.
+ * \param watcherCtx user specific data, will be passed to the watcher callback.
+ * Unlike the global context set by \ref zookeeper_init, this watcher context
+ * is associated with the given instance of the watcher only.
+ * \param completion the routine to invoke when the request completes. The completion
+ * will be triggered with one of the following codes passed in as the rc argument:
+ * ZOK operation completed succesfully
+ * ZNONODE the node does not exist.
+ * ZNOAUTH the client does not have permission.
+ * \param data the data that will be passed to the completion routine when 
+ * the function completes.
+ * \return ZOK on success or one of the following errcodes on failure:
+ * ZBADARGUMENTS - invalid input parameters
+ * ZINVALIDSTATE - zhandle state is either in ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
+ * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
+ */
+ZOOAPI int zoo_awget(zhandle_t *zh, const char *path, 
+        watcher_fn watcher, void* watcherCtx, 
+        data_completion_t completion, const void *data);
+
+/**
+ * \brief sets the data associated with a node.
+ * 
+ * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init
+ * \param path the name of the node. Expressed as a file name with slashes 
+ * separating ancestors of the node.
+ * \param buffer the buffer holding data to be written to the node.
+ * \param buflen the number of bytes from buffer to write.
+ * \param version the expected version of the node. The function will fail if 
+ * the actual version of the node does not match the expected version. If -1 is 
+ * used the version check will not take place. * completion: If null, 
+ * the function will execute synchronously. Otherwise, the function will return 
+ * immediately and invoke the completion routine when the request completes.
+ * \param completion the routine to invoke when the request completes. The completion
+ * will be triggered with one of the following codes passed in as the rc argument:
+ * ZOK operation completed succesfully
+ * ZNONODE the node does not exist.
+ * ZNOAUTH the client does not have permission.
+ * ZBADVERSION expected version does not match actual version.
+ * \param data the data that will be passed to the completion routine when 
+ * the function completes.
+ * \return ZOK on success or one of the following errcodes on failure:
+ * ZBADARGUMENTS - invalid input parameters
+ * ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
+ * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
+ */
+ZOOAPI int zoo_aset(zhandle_t *zh, const char *path, const char *buffer, int buflen, 
+        int version, stat_completion_t completion, const void *data);
+
+/**
+ * \brief lists the children of a node.
+ * 
+ * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init
+ * \param path the name of the node. Expressed as a file name with slashes 
+ * separating ancestors of the node.
+ * \param watch if nonzero, a watch will be set at the server to notify 
+ * the client if the node changes.
+ * \param completion the routine to invoke when the request completes. The completion
+ * will be triggered with one of the following codes passed in as the rc argument:
+ * ZOK operation completed succesfully
+ * ZNONODE the node does not exist.
+ * ZNOAUTH the client does not have permission.
+ * \param data the data that will be passed to the completion routine when 
+ * the function completes.
+ * \return ZOK on success or one of the following errcodes on failure:
+ * ZBADARGUMENTS - invalid input parameters
+ * ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
+ * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
+ */
+ZOOAPI int zoo_aget_children(zhandle_t *zh, const char *path, int watch, 
+        strings_completion_t completion, const void *data);
+
+/**
+ * \brief lists the children of a node.
+ * 
+ * This function is similar to \ref zoo_aget_children except it allows one specify 
+ * a watcher object rather than a boolean watch flag.
+ *  
+ * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init
+ * \param path the name of the node. Expressed as a file name with slashes 
+ * separating ancestors of the node.
+ * \param watcher if non-null, a watch will be set at the server to notify 
+ * the client if the node changes.
+ * \param watcherCtx user specific data, will be passed to the watcher callback.
+ * Unlike the global context set by \ref zookeeper_init, this watcher context
+ * is associated with the given instance of the watcher only.
+ * \param completion the routine to invoke when the request completes. The completion
+ * will be triggered with one of the following codes passed in as the rc argument:
+ * ZOK operation completed succesfully
+ * ZNONODE the node does not exist.
+ * ZNOAUTH the client does not have permission.
+ * \param data the data that will be passed to the completion routine when 
+ * the function completes.
+ * \return ZOK on success or one of the following errcodes on failure:
+ * ZBADARGUMENTS - invalid input parameters
+ * ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
+ * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
+ */
+ZOOAPI int zoo_awget_children(zhandle_t *zh, const char *path,
+        watcher_fn watcher, void* watcherCtx, 
+        strings_completion_t completion, const void *data);
+
+/**
+ * \brief Flush leader channel.
+ *
+ * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init
+ * \param path the name of the node. Expressed as a file name with slashes
+ * separating ancestors of the node.
+ * \param completion the routine to invoke when the request completes. The completion
+ * will be triggered with one of the following codes passed in as the rc argument:
+ * ZOK operation completed succesfully
+ * ZNONODE the node does not exist.
+ * ZNOAUTH the client does not have permission.
+ * \param data the data that will be passed to the completion routine when
+ * the function completes.
+ * \return ZOK on success or one of the following errcodes on failure:
+ * ZBADARGUMENTS - invalid input parameters
+ * ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
+ * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
+ */
+
+ZOOAPI int zoo_async(zhandle_t *zh, const char *path, 
+        string_completion_t completion, const void *data);
+
+
+/**
+ * \brief gets the acl associated with a node.
+ * 
+ * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init
+ * \param path the name of the node. Expressed as a file name with slashes 
+ * separating ancestors of the node.
+ * \param completion the routine to invoke when the request completes. The completion
+ * will be triggered with one of the following codes passed in as the rc argument:
+ * ZOK operation completed succesfully
+ * ZNONODE the node does not exist.
+ * ZNOAUTH the client does not have permission.
+ * \param data the data that will be passed to the completion routine when 
+ * the function completes.
+ * \return ZOK on success or one of the following errcodes on failure:
+ * ZBADARGUMENTS - invalid input parameters
+ * ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
+ * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
+ */
+ZOOAPI int zoo_aget_acl(zhandle_t *zh, const char *path, acl_completion_t completion, 
+        const void *data);
+
+/**
+ * \brief sets the acl associated with a node.
+ * 
+ * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init
+ * \param path the name of the node. Expressed as a file name with slashes 
+ * separating ancestors of the node.
+ * \param buffer the buffer holding the acls to be written to the node.
+ * \param buflen the number of bytes from buffer to write.
+ * \param completion the routine to invoke when the request completes. The completion
+ * will be triggered with one of the following codes passed in as the rc argument:
+ * ZOK operation completed succesfully
+ * ZNONODE the node does not exist.
+ * ZNOAUTH the client does not have permission.
+ * ZINVALIDACL invalid ACL specified
+ * ZBADVERSION expected version does not match actual version.
+ * \param data the data that will be passed to the completion routine when 
+ * the function completes.
+ * \return ZOK on success or one of the following errcodes on failure:
+ * ZBADARGUMENTS - invalid input parameters
+ * ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
+ * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
+ */
+ZOOAPI int zoo_aset_acl(zhandle_t *zh, const char *path, int version, 
+        struct ACL_vector *acl, void_completion_t, const void *data);
+
+/**
+ * \brief return an error string.
+ * 
+ * \param return code
+ * \return string corresponding to the return code
+ */
+ZOOAPI const char* zerror(int c);
+
+/**
+ * \brief specify application credentials.
+ * 
+ * The application calls this function to specify its credentials for purposes
+ * of authentication. The server will use the security provider specified by 
+ * the scheme parameter to authenticate the client connection. If the 
+ * authentication request has failed:
+ * - the server connection is dropped
+ * - the watcher is called with the ZOO_AUTH_FAILED_STATE value as the state 
+ * parameter.
+ * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init
+ * \param scheme the id of authentication scheme. Natively supported:
+ * "digest" password-based authentication
+ * \param cert application credentials. The actual value depends on the scheme.
+ * \param certLen the length of the data parameter
+ * \param completion the routine to invoke when the request completes. One of 
+ * the following result codes may be passed into the completion callback:
+ * ZOK operation completed successfully
+ * ZAUTHFAILED authentication failed 
+ * \param data the data that will be passed to the completion routine when the 
+ * function completes.
+ * \return ZOK on success or one of the following errcodes on failure:
+ * ZBADARGUMENTS - invalid input parameters
+ * ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
+ * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
+ * ZSYSTEMERROR - a system error occured
+ */
+ZOOAPI int zoo_add_auth(zhandle_t *zh,const char* scheme,const char* cert, 
+       int certLen, void_completion_t completion, const void *data);
+
+/**
+ * \brief checks if the current zookeeper connection state can't be recovered.
+ * 
+ *  The application must close the zhandle and try to reconnect.
+ * 
+ * \param zh the zookeeper handle (see \ref zookeeper_init)
+ * \return ZINVALIDSTATE if connection is unrecoverable
+ */
+ZOOAPI int is_unrecoverable(zhandle_t *zh);
+
+/**
+ * \brief sets the debugging level for the library 
+ */
+ZOOAPI void zoo_set_debug_level(ZooLogLevel logLevel);
+
+/**
+ * \brief sets the stream to be used by the library for logging 
+ * 
+ * The zookeeper library uses stderr as its default log stream. Application
+ * must make sure the stream is writable. Passing in NULL resets the stream 
+ * to its default value (stderr).
+ */
+ZOOAPI void zoo_set_log_stream(FILE* logStream);
+
+/**
+ * \brief enable/disable quorum endpoint order randomization
+ * 
+ * If passed a non-zero value, will make the client connect to quorum peers
+ * in the order as specified in the zookeeper_init() call.
+ * A zero value causes zookeeper_init() to permute the peer endpoints
+ * which is good for more even client connection distribution among the 
+ * quorum peers.
+ */
+ZOOAPI void zoo_deterministic_conn_order(int yesOrNo);
+
+/**
+ * \brief create a node synchronously.
+ * 
+ * This method will create a node in ZooKeeper. A node can only be created if
+ * it does not already exists. The Create Flags affect the creation of nodes.
+ * If ZOO_EPHEMERAL flag is set, the node will automatically get removed if the
+ * client session goes away. If the ZOO_SEQUENCE flag is set, a unique
+ * monotonically increasing sequence number is appended to the path name.
+ * 
+ * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init
+ * \param path The name of the node. Expressed as a file name with slashes 
+ * separating ancestors of the node.
+ * \param value The data to be stored in the node.
+ * \param valuelen The number of bytes in data. To set the data to be NULL use
+  * value as NULL and valuelen as -1.
+ * \param acl The initial ACL of the node. If null, the ACL of the parent will be
+ *    used.
+ * \param flags this parameter can be set to 0 for normal create or an OR
+ *    of the Create Flags
+ * \param path_buffer Buffer which will be filled with the path of the
+ *    new node (this might be different than the supplied path
+ *    because of the ZOO_SEQUENCE flag).  The path string will always be
+ *    null-terminated.
+ * \param path_buffer_len Size of path buffer; if the path of the new
+ *    node (including space for the null terminator) exceeds the buffer size,
+ *    the path string will be truncated to fit.  The actual path of the
+ *    new node in the server will not be affected by the truncation.
+ *    The path string will always be null-terminated.
+ * \return  one of the following codes are returned:
+ * ZOK operation completed succesfully
+ * ZNONODE the parent node does not exist.
+ * ZNODEEXISTS the node already exists
+ * ZNOAUTH the client does not have permission.
+ * ZNOCHILDRENFOREPHEMERALS cannot create children of ephemeral nodes.
+ * ZBADARGUMENTS - invalid input parameters
+ * ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
+ * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
+ */
+ZOOAPI int zoo_create(zhandle_t *zh, const char *path, const char *value,
+        int valuelen, const struct ACL_vector *acl, int flags,
+        char *path_buffer, int path_buffer_len);
+
+/**
+ * \brief delete a node in zookeeper synchronously.
+ * 
+ * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init
+ * \param path the name of the node. Expressed as a file name with slashes 
+ * separating ancestors of the node.
+ * \param version the expected version of the node. The function will fail if the
+ *    actual version of the node does not match the expected version.
+ *  If -1 is used the version check will not take place. 
+ * \return one of the following values is returned.
+ * ZOK operation completed succesfully
+ * ZNONODE the node does not exist.
+ * ZNOAUTH the client does not have permission.
+ * ZBADVERSION expected version does not match actual version.
+ * ZNOTEMPTY children are present; node cannot be deleted.
+ * ZBADARGUMENTS - invalid input parameters
+ * ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
+ * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
+ */
+ZOOAPI int zoo_delete(zhandle_t *zh, const char *path, int version);
+
+
+/**
+ * \brief checks the existence of a node in zookeeper synchronously.
+ * 
+ * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init
+ * \param path the name of the node. Expressed as a file name with slashes 
+ * separating ancestors of the node.
+ * \param watch if nonzero, a watch will be set at the server to notify the 
+ * client if the node changes. The watch will be set even if the node does not 
+ * exist. This allows clients to watch for nodes to appear.
+ * \param the return stat value of the node.
+ * \return  return code of the function call.
+ * ZOK operation completed succesfully
+ * ZNONODE the node does not exist.
+ * ZNOAUTH the client does not have permission.
+ * ZBADARGUMENTS - invalid input parameters
+ * ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
+ * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
+ */
+ZOOAPI int zoo_exists(zhandle_t *zh, const char *path, int watch, struct Stat *stat);
+
+/**
+ * \brief checks the existence of a node in zookeeper synchronously.
+ * 
+ * This function is similar to \ref zoo_exists except it allows one specify 
+ * a watcher object rather than a boolean watch flag.
+ * 
+ * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init
+ * \param path the name of the node. Expressed as a file name with slashes 
+ * separating ancestors of the node.
+ * \param watcher if non-null a watch will set on the specified znode on the server.
+ * The watch will be set even if the node does not exist. This allows clients 
+ * to watch for nodes to appear.
+ * \param watcherCtx user specific data, will be passed to the watcher callback.
+ * Unlike the global context set by \ref zookeeper_init, this watcher context
+ * is associated with the given instance of the watcher only.
+ * \param the return stat value of the node.
+ * \return  return code of the function call.
+ * ZOK operation completed succesfully
+ * ZNONODE the node does not exist.
+ * ZNOAUTH the client does not have permission.
+ * ZBADARGUMENTS - invalid input parameters
+ * ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
+ * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
+ */
+ZOOAPI int zoo_wexists(zhandle_t *zh, const char *path,
+        watcher_fn watcher, void* watcherCtx, struct Stat *stat);
+
+/**
+ * \brief gets the data associated with a node synchronously.
+ * 
+ * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init
+ * \param path the name of the node. Expressed as a file name with slashes 
+ * separating ancestors of the node.
+ * \param watch if nonzero, a watch will be set at the server to notify 
+ * the client if the node changes.
+ * \param buffer the buffer holding the node data returned by the server
+ * \param buffer_len is the size of the buffer pointed to by the buffer parameter.
+ * It'll be set to the actual data length upon return. If the data is NULL, length is -1.
+ * \param stat if not NULL, will hold the value of stat for the path on return.
+ * \return return value of the function call.
+ * ZOK operation completed succesfully
+ * ZNONODE the node does not exist.
+ * ZNOAUTH the client does not have permission.
+ * ZBADARGUMENTS - invalid input parameters
+ * ZINVALIDSTATE - zhandle state is either in ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
+ * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
+ */
+ZOOAPI int zoo_get(zhandle_t *zh, const char *path, int watch, char *buffer,   
+                   int* buffer_len, struct Stat *stat);
+
+/**
+ * \brief gets the data associated with a node synchronously.
+ * 
+ * This function is similar to \ref zoo_get except it allows one specify 
+ * a watcher object rather than a boolean watch flag.
+ * 
+ * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init
+ * \param path the name of the node. Expressed as a file name with slashes 
+ * separating ancestors of the node.
+ * \param watcher if non-null, a watch will be set at the server to notify 
+ * the client if the node changes.
+ * \param watcherCtx user specific data, will be passed to the watcher callback.
+ * Unlike the global context set by \ref zookeeper_init, this watcher context
+ * is associated with the given instance of the watcher only.
+ * \param buffer the buffer holding the node data returned by the server
+ * \param buffer_len is the size of the buffer pointed to by the buffer parameter.
+ * It'll be set to the actual data length upon return. If the data is NULL, length is -1.
+ * \param stat if not NULL, will hold the value of stat for the path on return.
+ * \return return value of the function call.
+ * ZOK operation completed succesfully
+ * ZNONODE the node does not exist.
+ * ZNOAUTH the client does not have permission.
+ * ZBADARGUMENTS - invalid input parameters
+ * ZINVALIDSTATE - zhandle state is either in ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
+ * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
+ */
+ZOOAPI int zoo_wget(zhandle_t *zh, const char *path, 
+        watcher_fn watcher, void* watcherCtx, 
+        char *buffer, int* buffer_len, struct Stat *stat);
+
+/**
+ * \brief sets the data associated with a node. See zoo_set2 function if
+ * you require access to the stat information associated with the znode.
+ * 
+ * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init
+ * \param path the name of the node. Expressed as a file name with slashes 
+ * separating ancestors of the node.
+ * \param buffer the buffer holding data to be written to the node.
+ * \param buflen the number of bytes from buffer to write. To set NULL as data 
+ * use buffer as NULL and buflen as -1.
+ * \param version the expected version of the node. The function will fail if 
+ * the actual version of the node does not match the expected version. If -1 is 
+ * used the version check will not take place. 
+ * \return the return code for the function call.
+ * ZOK operation completed succesfully
+ * ZNONODE the node does not exist.
+ * ZNOAUTH the client does not have permission.
+ * ZBADVERSION expected version does not match actual version.
+ * ZBADARGUMENTS - invalid input parameters
+ * ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
+ * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
+ */
+ZOOAPI int zoo_set(zhandle_t *zh, const char *path, const char *buffer,
+                   int buflen, int version);
+
+/**
+ * \brief sets the data associated with a node. This function is the same
+ * as zoo_set except that it also provides access to stat information
+ * associated with the znode.
+ * 
+ * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init
+ * \param path the name of the node. Expressed as a file name with slashes 
+ * separating ancestors of the node.
+ * \param buffer the buffer holding data to be written to the node.
+ * \param buflen the number of bytes from buffer to write. To set NULL as data
+ * use buffer as NULL and buflen as -1.
+ * \param version the expected version of the node. The function will fail if 
+ * the actual version of the node does not match the expected version. If -1 is 
+ * used the version check will not take place. 
+ * \param stat if not NULL, will hold the value of stat for the path on return.
+ * \return the return code for the function call.
+ * ZOK operation completed succesfully
+ * ZNONODE the node does not exist.
+ * ZNOAUTH the client does not have permission.
+ * ZBADVERSION expected version does not match actual version.
+ * ZBADARGUMENTS - invalid input parameters
+ * ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
+ * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
+ */
+ZOOAPI int zoo_set2(zhandle_t *zh, const char *path, const char *buffer,
+                   int buflen, int version, struct Stat *stat);
+
+/**
+ * \brief lists the children of a node synchronously.
+ * 
+ * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init
+ * \param path the name of the node. Expressed as a file name with slashes 
+ * separating ancestors of the node.
+ * \param watch if nonzero, a watch will be set at the server to notify 
+ * the client if the node changes.
+ * \param strings return value of children paths.
+ * \return the return code of the function.
+ * ZOK operation completed succesfully
+ * ZNONODE the node does not exist.
+ * ZNOAUTH the client does not have permission.
+ * ZBADARGUMENTS - invalid input parameters
+ * ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
+ * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
+ */
+ZOOAPI int zoo_get_children(zhandle_t *zh, const char *path, int watch,
+                            struct String_vector *strings);
+
+/**
+ * \brief lists the children of a node synchronously.
+ * 
+ * This function is similar to \ref zoo_get_children except it allows one specify 
+ * a watcher object rather than a boolean watch flag.
+ * 
+ * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init
+ * \param path the name of the node. Expressed as a file name with slashes 
+ * separating ancestors of the node.
+ * \param watcher if non-null, a watch will be set at the server to notify 
+ * the client if the node changes.
+ * \param watcherCtx user specific data, will be passed to the watcher callback.
+ * Unlike the global context set by \ref zookeeper_init, this watcher context
+ * is associated with the given instance of the watcher only.
+ * \param strings return value of children paths.
+ * \return the return code of the function.
+ * ZOK operation completed succesfully
+ * ZNONODE the node does not exist.
+ * ZNOAUTH the client does not have permission.
+ * ZBADARGUMENTS - invalid input parameters
+ * ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
+ * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
+ */
+ZOOAPI int zoo_wget_children(zhandle_t *zh, const char *path, 
+        watcher_fn watcher, void* watcherCtx,
+        struct String_vector *strings);
+
+/**
+ * \brief gets the acl associated with a node synchronously.
+ * 
+ * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init
+ * \param path the name of the node. Expressed as a file name with slashes 
+ * separating ancestors of the node.
+ * \param acl the return value of acls on the path.
+ * \param stat returns the stat of the path specified.
+ * \return the return code for the function call.
+ * ZOK operation completed succesfully
+ * ZNONODE the node does not exist.
+ * ZNOAUTH the client does not have permission.
+ * ZBADARGUMENTS - invalid input parameters
+ * ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
+ * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
+ */
+ZOOAPI int zoo_get_acl(zhandle_t *zh, const char *path, struct ACL_vector *acl,
+                       struct Stat *stat);
+
+/**
+ * \brief sets the acl associated with a node synchronously.
+ * 
+ * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init
+ * \param path the name of the node. Expressed as a file name with slashes 
+ * separating ancestors of the node.
+ * \param version the expected version of the path.
+ * \param acl the acl to be set on the path. 
+ * \return the return code for the function call.
+ * ZOK operation completed succesfully
+ * ZNONODE the node does not exist.
+ * ZNOAUTH the client does not have permission.
+ * ZINVALIDACL invalid ACL specified
+ * ZBADVERSION expected version does not match actual version.
+ * ZBADARGUMENTS - invalid input parameters
+ * ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
+ * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
+ */
+ZOOAPI int zoo_set_acl(zhandle_t *zh, const char *path, int version,
+                           const struct ACL_vector *acl);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /*ZOOKEEPER_H_*/
diff --git a/include/zookeeper/zookeeper.jute.h b/include/zookeeper/zookeeper.jute.h
new file mode 100644 (file)
index 0000000..2a32934
--- /dev/null
@@ -0,0 +1,376 @@
+#ifndef __ZOOKEEPER_JUTE__
+#define __ZOOKEEPER_JUTE__
+#include "recordio.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+struct Id {
+    char * scheme;
+    char * id;
+};
+int serialize_Id(struct oarchive *out, const char *tag, struct Id *v);
+int deserialize_Id(struct iarchive *in, const char *tag, struct Id*v);
+void deallocate_Id(struct Id*);
+struct ACL {
+    int32_t perms;
+    struct Id id;
+};
+int serialize_ACL(struct oarchive *out, const char *tag, struct ACL *v);
+int deserialize_ACL(struct iarchive *in, const char *tag, struct ACL*v);
+void deallocate_ACL(struct ACL*);
+struct Stat {
+    int64_t czxid;
+    int64_t mzxid;
+    int64_t ctime;
+    int64_t mtime;
+    int32_t version;
+    int32_t cversion;
+    int32_t aversion;
+    int64_t ephemeralOwner;
+    int32_t dataLength;
+    int32_t numChildren;
+    int64_t pzxid;
+};
+int serialize_Stat(struct oarchive *out, const char *tag, struct Stat *v);
+int deserialize_Stat(struct iarchive *in, const char *tag, struct Stat*v);
+void deallocate_Stat(struct Stat*);
+struct StatPersisted {
+    int64_t czxid;
+    int64_t mzxid;
+    int64_t ctime;
+    int64_t mtime;
+    int32_t version;
+    int32_t cversion;
+    int32_t aversion;
+    int64_t ephemeralOwner;
+    int64_t pzxid;
+};
+int serialize_StatPersisted(struct oarchive *out, const char *tag, struct StatPersisted *v);
+int deserialize_StatPersisted(struct iarchive *in, const char *tag, struct StatPersisted*v);
+void deallocate_StatPersisted(struct StatPersisted*);
+struct StatPersistedV1 {
+    int64_t czxid;
+    int64_t mzxid;
+    int64_t ctime;
+    int64_t mtime;
+    int32_t version;
+    int32_t cversion;
+    int32_t aversion;
+    int64_t ephemeralOwner;
+};
+int serialize_StatPersistedV1(struct oarchive *out, const char *tag, struct StatPersistedV1 *v);
+int deserialize_StatPersistedV1(struct iarchive *in, const char *tag, struct StatPersistedV1*v);
+void deallocate_StatPersistedV1(struct StatPersistedV1*);
+struct op_result_t {
+    int32_t rc;
+    int32_t op;
+    struct buffer response;
+};
+int serialize_op_result_t(struct oarchive *out, const char *tag, struct op_result_t *v);
+int deserialize_op_result_t(struct iarchive *in, const char *tag, struct op_result_t*v);
+void deallocate_op_result_t(struct op_result_t*);
+struct ConnectRequest {
+    int32_t protocolVersion;
+    int64_t lastZxidSeen;
+    int32_t timeOut;
+    int64_t sessionId;
+    struct buffer passwd;
+};
+int serialize_ConnectRequest(struct oarchive *out, const char *tag, struct ConnectRequest *v);
+int deserialize_ConnectRequest(struct iarchive *in, const char *tag, struct ConnectRequest*v);
+void deallocate_ConnectRequest(struct ConnectRequest*);
+struct ConnectResponse {
+    int32_t protocolVersion;
+    int32_t timeOut;
+    int64_t sessionId;
+    struct buffer passwd;
+};
+int serialize_ConnectResponse(struct oarchive *out, const char *tag, struct ConnectResponse *v);
+int deserialize_ConnectResponse(struct iarchive *in, const char *tag, struct ConnectResponse*v);
+void deallocate_ConnectResponse(struct ConnectResponse*);
+struct String_vector {
+    int32_t count;
+    char * *data;
+;
+};
+int serialize_String_vector(struct oarchive *out, const char *tag, struct String_vector *v);
+int deserialize_String_vector(struct iarchive *in, const char *tag, struct String_vector *v);
+int allocate_String_vector(struct String_vector *v, int32_t len);
+int deallocate_String_vector(struct String_vector *v);
+struct SetWatches {
+    int64_t relativeZxid;
+    struct String_vector dataWatches;
+    struct String_vector existWatches;
+    struct String_vector childWatches;
+};
+int serialize_SetWatches(struct oarchive *out, const char *tag, struct SetWatches *v);
+int deserialize_SetWatches(struct iarchive *in, const char *tag, struct SetWatches*v);
+void deallocate_SetWatches(struct SetWatches*);
+struct RequestHeader {
+    int32_t xid;
+    int32_t type;
+};
+int serialize_RequestHeader(struct oarchive *out, const char *tag, struct RequestHeader *v);
+int deserialize_RequestHeader(struct iarchive *in, const char *tag, struct RequestHeader*v);
+void deallocate_RequestHeader(struct RequestHeader*);
+struct AuthPacket {
+    int32_t type;
+    char * scheme;
+    struct buffer auth;
+};
+int serialize_AuthPacket(struct oarchive *out, const char *tag, struct AuthPacket *v);
+int deserialize_AuthPacket(struct iarchive *in, const char *tag, struct AuthPacket*v);
+void deallocate_AuthPacket(struct AuthPacket*);
+struct ReplyHeader {
+    int32_t xid;
+    int64_t zxid;
+    int32_t err;
+};
+int serialize_ReplyHeader(struct oarchive *out, const char *tag, struct ReplyHeader *v);
+int deserialize_ReplyHeader(struct iarchive *in, const char *tag, struct ReplyHeader*v);
+void deallocate_ReplyHeader(struct ReplyHeader*);
+struct GetDataRequest {
+    char * path;
+    int32_t watch;
+};
+int serialize_GetDataRequest(struct oarchive *out, const char *tag, struct GetDataRequest *v);
+int deserialize_GetDataRequest(struct iarchive *in, const char *tag, struct GetDataRequest*v);
+void deallocate_GetDataRequest(struct GetDataRequest*);
+struct SetDataRequest {
+    char * path;
+    struct buffer data;
+    int32_t version;
+};
+int serialize_SetDataRequest(struct oarchive *out, const char *tag, struct SetDataRequest *v);
+int deserialize_SetDataRequest(struct iarchive *in, const char *tag, struct SetDataRequest*v);
+void deallocate_SetDataRequest(struct SetDataRequest*);
+struct SetDataResponse {
+    struct Stat stat;
+};
+int serialize_SetDataResponse(struct oarchive *out, const char *tag, struct SetDataResponse *v);
+int deserialize_SetDataResponse(struct iarchive *in, const char *tag, struct SetDataResponse*v);
+void deallocate_SetDataResponse(struct SetDataResponse*);
+struct ACL_vector {
+    int32_t count;
+    struct ACL *data;
+;
+};
+int serialize_ACL_vector(struct oarchive *out, const char *tag, struct ACL_vector *v);
+int deserialize_ACL_vector(struct iarchive *in, const char *tag, struct ACL_vector *v);
+int allocate_ACL_vector(struct ACL_vector *v, int32_t len);
+int deallocate_ACL_vector(struct ACL_vector *v);
+struct CreateRequest {
+    char * path;
+    struct buffer data;
+    struct ACL_vector acl;
+    int32_t flags;
+};
+int serialize_CreateRequest(struct oarchive *out, const char *tag, struct CreateRequest *v);
+int deserialize_CreateRequest(struct iarchive *in, const char *tag, struct CreateRequest*v);
+void deallocate_CreateRequest(struct CreateRequest*);
+struct DeleteRequest {
+    char * path;
+    int32_t version;
+};
+int serialize_DeleteRequest(struct oarchive *out, const char *tag, struct DeleteRequest *v);
+int deserialize_DeleteRequest(struct iarchive *in, const char *tag, struct DeleteRequest*v);
+void deallocate_DeleteRequest(struct DeleteRequest*);
+struct GetChildrenRequest {
+    char * path;
+    int32_t watch;
+};
+int serialize_GetChildrenRequest(struct oarchive *out, const char *tag, struct GetChildrenRequest *v);
+int deserialize_GetChildrenRequest(struct iarchive *in, const char *tag, struct GetChildrenRequest*v);
+void deallocate_GetChildrenRequest(struct GetChildrenRequest*);
+struct GetMaxChildrenRequest {
+    char * path;
+};
+int serialize_GetMaxChildrenRequest(struct oarchive *out, const char *tag, struct GetMaxChildrenRequest *v);
+int deserialize_GetMaxChildrenRequest(struct iarchive *in, const char *tag, struct GetMaxChildrenRequest*v);
+void deallocate_GetMaxChildrenRequest(struct GetMaxChildrenRequest*);
+struct GetMaxChildrenResponse {
+    int32_t max;
+};
+int serialize_GetMaxChildrenResponse(struct oarchive *out, const char *tag, struct GetMaxChildrenResponse *v);
+int deserialize_GetMaxChildrenResponse(struct iarchive *in, const char *tag, struct GetMaxChildrenResponse*v);
+void deallocate_GetMaxChildrenResponse(struct GetMaxChildrenResponse*);
+struct SetMaxChildrenRequest {
+    char * path;
+    int32_t max;
+};
+int serialize_SetMaxChildrenRequest(struct oarchive *out, const char *tag, struct SetMaxChildrenRequest *v);
+int deserialize_SetMaxChildrenRequest(struct iarchive *in, const char *tag, struct SetMaxChildrenRequest*v);
+void deallocate_SetMaxChildrenRequest(struct SetMaxChildrenRequest*);
+struct SyncRequest {
+    char * path;
+};
+int serialize_SyncRequest(struct oarchive *out, const char *tag, struct SyncRequest *v);
+int deserialize_SyncRequest(struct iarchive *in, const char *tag, struct SyncRequest*v);
+void deallocate_SyncRequest(struct SyncRequest*);
+struct SyncResponse {
+    char * path;
+};
+int serialize_SyncResponse(struct oarchive *out, const char *tag, struct SyncResponse *v);
+int deserialize_SyncResponse(struct iarchive *in, const char *tag, struct SyncResponse*v);
+void deallocate_SyncResponse(struct SyncResponse*);
+struct GetACLRequest {
+    char * path;
+};
+int serialize_GetACLRequest(struct oarchive *out, const char *tag, struct GetACLRequest *v);
+int deserialize_GetACLRequest(struct iarchive *in, const char *tag, struct GetACLRequest*v);
+void deallocate_GetACLRequest(struct GetACLRequest*);
+struct SetACLRequest {
+    char * path;
+    struct ACL_vector acl;
+    int32_t version;
+};
+int serialize_SetACLRequest(struct oarchive *out, const char *tag, struct SetACLRequest *v);
+int deserialize_SetACLRequest(struct iarchive *in, const char *tag, struct SetACLRequest*v);
+void deallocate_SetACLRequest(struct SetACLRequest*);
+struct SetACLResponse {
+    struct Stat stat;
+};
+int serialize_SetACLResponse(struct oarchive *out, const char *tag, struct SetACLResponse *v);
+int deserialize_SetACLResponse(struct iarchive *in, const char *tag, struct SetACLResponse*v);
+void deallocate_SetACLResponse(struct SetACLResponse*);
+struct WatcherEvent {
+    int32_t type;
+    int32_t state;
+    char * path;
+};
+int serialize_WatcherEvent(struct oarchive *out, const char *tag, struct WatcherEvent *v);
+int deserialize_WatcherEvent(struct iarchive *in, const char *tag, struct WatcherEvent*v);
+void deallocate_WatcherEvent(struct WatcherEvent*);
+struct CreateResponse {
+    char * path;
+};
+int serialize_CreateResponse(struct oarchive *out, const char *tag, struct CreateResponse *v);
+int deserialize_CreateResponse(struct iarchive *in, const char *tag, struct CreateResponse*v);
+void deallocate_CreateResponse(struct CreateResponse*);
+struct ExistsRequest {
+    char * path;
+    int32_t watch;
+};
+int serialize_ExistsRequest(struct oarchive *out, const char *tag, struct ExistsRequest *v);
+int deserialize_ExistsRequest(struct iarchive *in, const char *tag, struct ExistsRequest*v);
+void deallocate_ExistsRequest(struct ExistsRequest*);
+struct ExistsResponse {
+    struct Stat stat;
+};
+int serialize_ExistsResponse(struct oarchive *out, const char *tag, struct ExistsResponse *v);
+int deserialize_ExistsResponse(struct iarchive *in, const char *tag, struct ExistsResponse*v);
+void deallocate_ExistsResponse(struct ExistsResponse*);
+struct GetDataResponse {
+    struct buffer data;
+    struct Stat stat;
+};
+int serialize_GetDataResponse(struct oarchive *out, const char *tag, struct GetDataResponse *v);
+int deserialize_GetDataResponse(struct iarchive *in, const char *tag, struct GetDataResponse*v);
+void deallocate_GetDataResponse(struct GetDataResponse*);
+struct GetChildrenResponse {
+    struct String_vector children;
+};
+int serialize_GetChildrenResponse(struct oarchive *out, const char *tag, struct GetChildrenResponse *v);
+int deserialize_GetChildrenResponse(struct iarchive *in, const char *tag, struct GetChildrenResponse*v);
+void deallocate_GetChildrenResponse(struct GetChildrenResponse*);
+struct GetACLResponse {
+    struct ACL_vector acl;
+    struct Stat stat;
+};
+int serialize_GetACLResponse(struct oarchive *out, const char *tag, struct GetACLResponse *v);
+int deserialize_GetACLResponse(struct iarchive *in, const char *tag, struct GetACLResponse*v);
+void deallocate_GetACLResponse(struct GetACLResponse*);
+struct Id_vector {
+    int32_t count;
+    struct Id *data;
+;
+};
+int serialize_Id_vector(struct oarchive *out, const char *tag, struct Id_vector *v);
+int deserialize_Id_vector(struct iarchive *in, const char *tag, struct Id_vector *v);
+int allocate_Id_vector(struct Id_vector *v, int32_t len);
+int deallocate_Id_vector(struct Id_vector *v);
+struct QuorumPacket {
+    int32_t type;
+    int64_t zxid;
+    struct buffer data;
+    struct Id_vector authinfo;
+};
+int serialize_QuorumPacket(struct oarchive *out, const char *tag, struct QuorumPacket *v);
+int deserialize_QuorumPacket(struct iarchive *in, const char *tag, struct QuorumPacket*v);
+void deallocate_QuorumPacket(struct QuorumPacket*);
+struct FileHeader {
+    int32_t magic;
+    int32_t version;
+    int64_t dbid;
+};
+int serialize_FileHeader(struct oarchive *out, const char *tag, struct FileHeader *v);
+int deserialize_FileHeader(struct iarchive *in, const char *tag, struct FileHeader*v);
+void deallocate_FileHeader(struct FileHeader*);
+struct TxnHeader {
+    int64_t clientId;
+    int32_t cxid;
+    int64_t zxid;
+    int64_t time;
+    int32_t type;
+};
+int serialize_TxnHeader(struct oarchive *out, const char *tag, struct TxnHeader *v);
+int deserialize_TxnHeader(struct iarchive *in, const char *tag, struct TxnHeader*v);
+void deallocate_TxnHeader(struct TxnHeader*);
+struct CreateTxn {
+    char * path;
+    struct buffer data;
+    struct ACL_vector acl;
+    int32_t ephemeral;
+};
+int serialize_CreateTxn(struct oarchive *out, const char *tag, struct CreateTxn *v);
+int deserialize_CreateTxn(struct iarchive *in, const char *tag, struct CreateTxn*v);
+void deallocate_CreateTxn(struct CreateTxn*);
+struct DeleteTxn {
+    char * path;
+};
+int serialize_DeleteTxn(struct oarchive *out, const char *tag, struct DeleteTxn *v);
+int deserialize_DeleteTxn(struct iarchive *in, const char *tag, struct DeleteTxn*v);
+void deallocate_DeleteTxn(struct DeleteTxn*);
+struct SetDataTxn {
+    char * path;
+    struct buffer data;
+    int32_t version;
+};
+int serialize_SetDataTxn(struct oarchive *out, const char *tag, struct SetDataTxn *v);
+int deserialize_SetDataTxn(struct iarchive *in, const char *tag, struct SetDataTxn*v);
+void deallocate_SetDataTxn(struct SetDataTxn*);
+struct SetACLTxn {
+    char * path;
+    struct ACL_vector acl;
+    int32_t version;
+};
+int serialize_SetACLTxn(struct oarchive *out, const char *tag, struct SetACLTxn *v);
+int deserialize_SetACLTxn(struct iarchive *in, const char *tag, struct SetACLTxn*v);
+void deallocate_SetACLTxn(struct SetACLTxn*);
+struct SetMaxChildrenTxn {
+    char * path;
+    int32_t max;
+};
+int serialize_SetMaxChildrenTxn(struct oarchive *out, const char *tag, struct SetMaxChildrenTxn *v);
+int deserialize_SetMaxChildrenTxn(struct iarchive *in, const char *tag, struct SetMaxChildrenTxn*v);
+void deallocate_SetMaxChildrenTxn(struct SetMaxChildrenTxn*);
+struct CreateSessionTxn {
+    int32_t timeOut;
+};
+int serialize_CreateSessionTxn(struct oarchive *out, const char *tag, struct CreateSessionTxn *v);
+int deserialize_CreateSessionTxn(struct iarchive *in, const char *tag, struct CreateSessionTxn*v);
+void deallocate_CreateSessionTxn(struct CreateSessionTxn*);
+struct ErrorTxn {
+    int32_t err;
+};
+int serialize_ErrorTxn(struct oarchive *out, const char *tag, struct ErrorTxn *v);
+int deserialize_ErrorTxn(struct iarchive *in, const char *tag, struct ErrorTxn*v);
+void deallocate_ErrorTxn(struct ErrorTxn*);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif //ZOOKEEPER_JUTE__
diff --git a/include/zookeeper/zookeeper_log.h b/include/zookeeper/zookeeper_log.h
new file mode 100644 (file)
index 0000000..e5917cb
--- /dev/null
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ZK_LOG_H_
+#define ZK_LOG_H_
+
+#include <zookeeper.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+extern ZOOAPI ZooLogLevel logLevel;
+#define LOGSTREAM getLogStream()
+
+#define LOG_ERROR(x) if(logLevel>=ZOO_LOG_LEVEL_ERROR) \
+    log_message(ZOO_LOG_LEVEL_ERROR,__LINE__,__func__,format_log_message x)
+#define LOG_WARN(x) if(logLevel>=ZOO_LOG_LEVEL_WARN) \
+    log_message(ZOO_LOG_LEVEL_WARN,__LINE__,__func__,format_log_message x)
+#define LOG_INFO(x) if(logLevel>=ZOO_LOG_LEVEL_INFO) \
+    log_message(ZOO_LOG_LEVEL_INFO,__LINE__,__func__,format_log_message x)
+#define LOG_DEBUG(x) if(logLevel==ZOO_LOG_LEVEL_DEBUG) \
+    log_message(ZOO_LOG_LEVEL_DEBUG,__LINE__,__func__,format_log_message x)
+
+ZOOAPI void log_message(ZooLogLevel curLevel, int line,const char* funcName,
+    const char* message);
+
+ZOOAPI const char* format_log_message(const char* format,...);
+
+FILE* getLogStream();
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /*ZK_LOG_H_*/
diff --git a/include/zookeeper/zookeeper_version.h b/include/zookeeper/zookeeper_version.h
new file mode 100644 (file)
index 0000000..b0f88bc
--- /dev/null
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ZOOKEEPER_VERSION_H_
+#define ZOOKEEPER_VERSION_H_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#define ZOO_MAJOR_VERSION 3
+#define ZOO_MINOR_VERSION 2
+#define ZOO_PATCH_VERSION 1
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* ZOOKEEPER_VERSION_H_ */
index f7bce51..4e44de0 100644 (file)
@@ -57,23 +57,20 @@ plugin="@libdir@/ulogd_DRL.so"
 # It will never go above this rate, even if capacity is
 # available.  A value of 0 means unlimited.
 nodelimit=0
-
-# Leave this at FPS right now...
+# DRL allocation policy: FPS for flow proportional share
+# or GRD for global random drop.
 policy=FPS
-
-# Smallest interval at which identities are processed.
+# estintms is the system tick time (estimate interval),
+# in milliseconds.
 estintms=500
-
-# Log file location.
-drl_logfile="/var/log/drl.log"
-
+# The location of the DRL logfile.
+drl_logfile="/var/log/ulogd-drl.log"
 # The verbosity of the DRL logfile. 1 = most verbose,
 # 3 = errors only.
-drl_loglevel 2
-
-# Location of the DRL xml configuration file.
-drl_configfile="/etc/drl.xml"
-
+drl_loglevel=3
+# The location of the DRL configuration file, which
+# specifies which identities should be installed.
+drl_configfile="@etcdir@/drl.xml"
 
 [NETFLOW]
 # PlanetLab NetFlow logging
index 7ab2204..c4df9fc 100644 (file)
@@ -99,6 +99,7 @@ rm -rf %{buildroot}
 %attr(0755,root,root) %{_sbindir}/ulogd
 #%attr(0755,root,root) %{_bindir}/netflow-import
 %{_sysconfdir}/ulogd.conf
+%{_sysconfdir}/drl.xml
 %{_sysconfdir}/logrotate.d/ulogd
 %attr(0755,root,root) %{_sysconfdir}/rc.d/init.d/ulogd
 %{_mandir}/man8/*