Created
May 30, 2014 09:09
-
-
Save arekinath/4d0b3e014fbd260fee1a to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/c_src/nif.c b/c_src/nif.c | |
index 7ec1fca..93f111e 100644 | |
--- a/c_src/nif.c | |
+++ b/c_src/nif.c | |
@@ -96,10 +96,12 @@ struct cache_queue { | |
ErlNifUInt64 size; /* sum of node->size for all nodes in the queue */ | |
}; | |
-#define FL_DYING 1 | |
+#define FL_DYING 1 | |
struct atom_node; | |
+#define N_INCR_BKT 8 | |
+ | |
/* lock ordering: cache_lock then lookup_lock then ctrl_lock */ | |
struct cache { | |
ErlNifUInt64 max_size; /* these are only set at construction */ | |
@@ -110,7 +112,9 @@ struct cache { | |
ErlNifUInt64 miss; | |
int flags; | |
- TAILQ_HEAD(cache_incr_q, cache_incr_node) incr_head; | |
+ TAILQ_HEAD(cache_incr_q, cache_incr_node) incr_head[N_INCR_BKT]; | |
+ ErlNifMutex *incr_lock[N_INCR_BKT]; | |
+ | |
int incr_count; | |
ErlNifMutex *ctrl_lock; | |
ErlNifCond *check_cond; | |
@@ -199,6 +203,7 @@ static void | |
destroy_cache_node(struct cache_node *n) | |
{ | |
struct cache_incr_node *in, *nextin; | |
+ int i; | |
TAILQ_REMOVE(&(n->q->head), n, entry); | |
n->q->size -= n->size; | |
@@ -207,15 +212,19 @@ destroy_cache_node(struct cache_node *n) | |
if (n->expiry.tv_sec != 0) | |
RB_REMOVE(expiry_tree, &(n->c->expiry_head), n); | |
- nextin = TAILQ_FIRST(&(n->c->incr_head)); | |
- while ((in = nextin)) { | |
- nextin = TAILQ_NEXT(in, entry); | |
- if (in->node == n) { | |
- TAILQ_REMOVE(&(n->c->incr_head), in, entry); | |
- --(n->c->incr_count); | |
- in->node = 0; | |
- enif_free(in); | |
+ for (i = 0; i < N_INCR_BKT; ++i) { | |
+ enif_mutex_lock(n->c->incr_lock[i]); | |
+ nextin = TAILQ_FIRST(&(n->c->incr_head[i])); | |
+ while ((in = nextin)) { | |
+ nextin = TAILQ_NEXT(in, entry); | |
+ if (in->node == n) { | |
+ TAILQ_REMOVE(&(n->c->incr_head[i]), in, entry); | |
+ __sync_sub_and_fetch(&(n->c->incr_count), 1); | |
+ in->node = 0; | |
+ enif_free(in); | |
+ } | |
} | |
+ enif_mutex_unlock(n->c->incr_lock[i]); | |
} | |
n->c = NULL; | |
@@ -230,6 +239,7 @@ static void * | |
cache_bg_thread(void *arg) | |
{ | |
struct cache *c = (struct cache *)arg; | |
+ int i; | |
while (1) { | |
enif_mutex_lock(c->ctrl_lock); | |
@@ -251,33 +261,39 @@ cache_bg_thread(void *arg) | |
enif_mutex_lock(c->ctrl_lock); | |
/* first process the promotion queue before we do any evicting */ | |
- while (!TAILQ_EMPTY(&(c->incr_head))) { | |
- struct cache_incr_node *n; | |
- n = TAILQ_FIRST(&(c->incr_head)); | |
- TAILQ_REMOVE(&(c->incr_head), n, entry); | |
- --(c->incr_count); | |
- | |
- /* let go of the ctrl_lock here, we don't need it when we aren't looking | |
- at the incr_queue, and this way other threads can use it while we shuffle | |
- queue nodes around */ | |
- enif_mutex_unlock(c->ctrl_lock); | |
+ for (i = 0; i < N_INCR_BKT; ++i) { | |
+ enif_mutex_lock(c->incr_lock[i]); | |
+ while (!TAILQ_EMPTY(&(c->incr_head[i]))) { | |
+ struct cache_incr_node *n; | |
+ n = TAILQ_FIRST(&(c->incr_head[i])); | |
+ TAILQ_REMOVE(&(c->incr_head[i]), n, entry); | |
+ __sync_sub_and_fetch(&(c->incr_count), 1); | |
+ | |
+ /* let go of incr_lock[i] and the ctrl_lock here: we don't need either while | |
+ we aren't looking at the incr queues, and this way other threads can use | |
+ them while we shuffle queue nodes around */ | |
+ enif_mutex_unlock(c->incr_lock[i]); | |
+ enif_mutex_unlock(c->ctrl_lock); | |
+ | |
+ if (n->node->q == &(c->q1)) { | |
+ TAILQ_REMOVE(&(c->q1.head), n->node, entry); | |
+ c->q1.size -= n->node->size; | |
+ TAILQ_INSERT_HEAD(&(c->q2.head), n->node, entry); | |
+ n->node->q = &(c->q2); | |
+ c->q2.size += n->node->size; | |
+ | |
+ } else if (n->node->q == &(c->q2)) { | |
+ TAILQ_REMOVE(&(c->q2.head), n->node, entry); | |
+ TAILQ_INSERT_HEAD(&(c->q2.head), n->node, entry); | |
+ } | |
- if (n->node->q == &(c->q1)) { | |
- TAILQ_REMOVE(&(c->q1.head), n->node, entry); | |
- c->q1.size -= n->node->size; | |
- TAILQ_INSERT_HEAD(&(c->q2.head), n->node, entry); | |
- n->node->q = &(c->q2); | |
- c->q2.size += n->node->size; | |
+ enif_free(n); | |
- } else if (n->node->q == &(c->q2)) { | |
- TAILQ_REMOVE(&(c->q2.head), n->node, entry); | |
- TAILQ_INSERT_HEAD(&(c->q2.head), n->node, entry); | |
+ /* take the ctrl_lock and then incr_lock[i] back again for the next loop around */ | |
+ enif_mutex_lock(c->ctrl_lock); | |
+ enif_mutex_lock(c->incr_lock[i]); | |
} | |
- | |
- enif_free(n); | |
- | |
- /* take the ctrl_lock back again for the next loop around */ | |
- enif_mutex_lock(c->ctrl_lock); | |
+ enif_mutex_unlock(c->incr_lock[i]); | |
} | |
/* let go of the ctrl_lock here for two reasons: | |
@@ -343,18 +359,6 @@ cache_bg_thread(void *arg) | |
c->atom_node = NULL; | |
- /* free the incr_queue */ | |
- { | |
- struct cache_incr_node *in, *nextin; | |
- nextin = TAILQ_FIRST(&(c->incr_head)); | |
- while ((in = nextin)) { | |
- nextin = TAILQ_NEXT(in, entry); | |
- TAILQ_REMOVE(&(c->incr_head), in, entry); | |
- in->node = 0; | |
- enif_free(in); | |
- } | |
- } | |
- | |
/* free the actual cache queues */ | |
{ | |
struct cache_node *n, *nextn; | |
@@ -370,6 +374,23 @@ cache_bg_thread(void *arg) | |
} | |
} | |
+ for (i = 0; i < N_INCR_BKT; ++i) | |
+ enif_mutex_lock(c->incr_lock[i]); | |
+ | |
+ /* free the incr_queue */ | |
+ for (i = 0; i < N_INCR_BKT; ++i) { | |
+ struct cache_incr_node *in, *nextin; | |
+ nextin = TAILQ_FIRST(&(c->incr_head[i])); | |
+ while ((in = nextin)) { | |
+ nextin = TAILQ_NEXT(in, entry); | |
+ TAILQ_REMOVE(&(c->incr_head[i]), in, entry); | |
+ in->node = 0; | |
+ enif_free(in); | |
+ } | |
+ enif_mutex_unlock(c->incr_lock[i]); | |
+ enif_mutex_destroy(c->incr_lock[i]); | |
+ } | |
+ | |
/* unlock and destroy! */ | |
enif_cond_destroy(c->check_cond); | |
@@ -410,6 +431,7 @@ new_cache(ERL_NIF_TERM atom, int max_size, int min_q1_size) | |
{ | |
struct cache *c; | |
struct atom_node *an; | |
+ int i; | |
c = enif_alloc(sizeof(*c)); | |
memset(c, 0, sizeof(*c)); | |
@@ -421,7 +443,10 @@ new_cache(ERL_NIF_TERM atom, int max_size, int min_q1_size) | |
c->check_cond = enif_cond_create("cache->check_cond"); | |
TAILQ_INIT(&(c->q1.head)); | |
TAILQ_INIT(&(c->q2.head)); | |
- TAILQ_INIT(&(c->incr_head)); | |
+ for (i = 0; i < N_INCR_BKT; ++i) { | |
+ TAILQ_INIT(&(c->incr_head[i])); | |
+ c->incr_lock[i] = enif_mutex_create("cache->incr_lock"); | |
+ } | |
RB_INIT(&(c->expiry_head)); | |
an = enif_alloc(sizeof(*an)); | |
@@ -561,7 +586,7 @@ stats(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) | |
enif_rwlock_rlock(c->cache_lock); | |
q1s = enif_make_uint64(env, c->q1.size); | |
q2s = enif_make_uint64(env, c->q2.size); | |
- incrs = enif_make_uint64(env, c->incr_count); | |
+ incrs = enif_make_uint64(env, __sync_fetch_and_add(&(c->incr_count), 0)); | |
enif_rwlock_runlock(c->cache_lock); | |
ret = enif_make_tuple5(env, | |
enif_make_uint64(env, c->hit), | |
@@ -666,8 +691,9 @@ get(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) | |
struct cache_node *n; | |
struct cache_incr_node *in; | |
struct timespec now; | |
- int incrqs; | |
+ int incrqs, hashv, bkt; | |
ERL_NIF_TERM ret; | |
+ ErlNifTid tid; | |
if (!enif_is_atom(env, argv[0])) | |
return enif_make_badarg(env); | |
@@ -701,10 +727,12 @@ get(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) | |
in->node = n; | |
__sync_add_and_fetch(&c->hit, 1); | |
- enif_mutex_lock(c->ctrl_lock); | |
- TAILQ_INSERT_TAIL(&(c->incr_head), in, entry); | |
- incrqs = ++(c->incr_count); | |
- enif_mutex_unlock(c->ctrl_lock); | |
+ tid = enif_thread_self(); | |
+ HASH_SFH(&tid, sizeof(ErlNifTid), N_INCR_BKT, hashv, bkt); | |
+ enif_mutex_lock(c->incr_lock[bkt]); | |
+ TAILQ_INSERT_TAIL(&(c->incr_head[bkt]), in, entry); | |
+ enif_mutex_unlock(c->incr_lock[bkt]); | |
+ incrqs = __sync_add_and_fetch(&(c->incr_count), 1); | |
ret = enif_make_resource_binary(env, n->val, n->val, n->vsize); | |
enif_rwlock_runlock(c->lookup_lock); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment