multicore-locks/litl

malthusian 的spinlock版本实现错误

Opened this issue · 0 comments

malthusian的spinlock版本后背队列不会进入到睡眠状态,与文章不符。
这里提供一个简单的malthusian 的实现
/*

  • The MIT License (MIT)
  • Copyright (c) 2016 Hugo Guiroux <hugo.guiroux at gmail dot com>
  • Permission is hereby granted, free of charge, to any person obtaining a copy
  • of his software and associated documentation files (the "Software"), to deal
  • in the Software without restriction, including without limitation the rights
  • to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  • copies of the Software, and to permit persons to whom the Software is
  • furnished to do so, subject to the following conditions:
  • The above copyright notice and this permission notice shall be included in
  • all copies or substantial portions of the Software.
  • THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  • IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  • FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  • AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  • LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  • OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
  • SOFTWARE.
  • Dave Dice. 2015.
  • Malthusian Locks.
  • In CoRR (arXiv).
  • Idea: this is a classical MCS lock, but to avoid contention, we allow
  • ourselves to modify the waiting queue to
  • put asides some threads for some times.
  • The fairness is ensured by randomly putting back the "asides" threads into
  • the active waiting queue of the lock.
  • Note: this version has been validated by the author.
    */
    #include <stdlib.h>
    #include <stdint.h>
    #include <string.h>
    #include <stdio.h>
    #include <errno.h>
    #include <sys/mman.h>
    #include <pthread.h>
    #include <assert.h>
    #include <unistd.h>
    #include <papi.h>
    #include <malthusian.h>
    // #include <linux/futex.h>
    #include "waiting_policy.h"
    #include "interpose.h"
    #include "utils.h"
    #define COND_VAR 1
    extern __thread unsigned int cur_thread_id;
    int len = 0;
    // From D.Dice https://blogs.oracle.com/dave/entry/a_simple_prng_idiom

static inline uint32_t xor_random() {
static __thread uint32_t rv = 0;

if (rv == 0)
    rv = cur_thread_id + 1;

uint32_t v = rv;
v ^= v << 6;
v ^= (uint32_t)(v) >> 21;
v ^= v << 7;
rv = v;
// printf("random %d\n", v & (UNLOCK_COUNT_THRESHOLD - 1));
return v & (UNLOCK_COUNT_THRESHOLD - 1);

}

malthusian_mutex_t *malthusian_mutex_create(const pthread_mutexattr_t *attr) {
malthusian_mutex_t *impl =
(malthusian_mutex_t *)alloc_cache_align(sizeof(malthusian_mutex_t));
impl->tail = 0;
impl->passive_set_head = 0;
impl->passive_set_tail = 0;
#ifdef COND_VAR
REAL(pthread_mutex_init)(&impl->posix_lock, attr);
#endif

return impl;

}

static int __malthusian_mutex_lock(malthusian_mutex_t *impl,
malthusian_node_t *me) {
malthusian_node_t *tail;
assert(me != NULL);

me->next = 0;
me->spin = LOCKED;
me->tid = cur_thread_id;
tail = __atomic_exchange_n(&impl->tail, me, __ATOMIC_RELEASE);
/* No one there? */
if (!tail) {
    goto succ;
}

/* Someone there, need to link in */
// tail->next = me;
// COMPILER_BARRIER();
// printf("tid %d in queue %d\n", me->tid, me->spin);
 __atomic_store_n(&tail->next, me, __ATOMIC_RELEASE);
/* Spin on my spin variable */
// waiting_policy_sleep(&me->spin);
me->status = S_SPINING;
while (me->spin == 0) {
	CPU_PAUSE();
	if (me->status == S_PARKING) {
        // printf("tid %d sleep\n", me->tid);
        waiting_policy_sleep(&me->status);
        // printf("tid %d waked\n", me->tid);
    }
			
}

succ:
// printf("tid %d lock succ %d\n", me->tid, me->spin);
/* Should preseve acquire semantic here */
MEMORY_BARRIER();
return 0;
}

int malthusian_mutex_lock(malthusian_mutex_t *impl, malthusian_node_t *me) {
int ret = __malthusian_mutex_lock(impl, me);
assert(ret == 0);
#ifdef COND_VAR
ret = REAL(pthread_mutex_lock)(&impl->posix_lock);
assert(ret == 0);
#endif

return 0;

}

int malthusian_mutex_trylock(malthusian_mutex_t *impl, malthusian_node_t *me) {
malthusian_node_t *tail;

me->next = 0;
me->spin = LOCKED;

/* Try to lock */
tail = __sync_val_compare_and_swap(&impl->tail, 0, me);

/* No one was there - can quickly return */
if (!tail) {

#ifdef COND_VAR
int ret = 0;
while ((ret = REAL(pthread_mutex_trylock)(&impl->posix_lock)) == EBUSY)
CPU_PAUSE();

    assert(ret == 0);

#endif
return 0;
}

return EBUSY;

}

// Helper functions to manage the passive set
static inline malthusian_node_t *
passive_set_pop_back(malthusian_mutex_t *impl) {
malthusian_node_t *elem = impl->passive_set_tail;
if (elem == 0)
return NULL;

impl->passive_set_tail = elem->prev;
if (impl->passive_set_tail == 0)
    impl->passive_set_head = 0;
else
    impl->passive_set_tail->next = 0;

elem->prev = 0;
elem->next = 0;
// printf("pop %d back\n", elem->tid);
return elem;

}

static inline malthusian_node_t *
passive_set_pop_front(malthusian_mutex_t *impl) {
malthusian_node_t *elem = impl->passive_set_head;
if (elem == 0)
return NULL;

impl->passive_set_head = elem->next;
if (impl->passive_set_head == 0)
    impl->passive_set_tail = 0;
else
    impl->passive_set_head->prev = 0;

elem->prev = 0;
elem->next = 0;
// printf("pop %d\n", elem->tid);
return elem;

}

static inline void passive_set_push_front(malthusian_mutex_t *impl,
malthusian_node_t *elem) {
malthusian_node_t *prev_head = impl->passive_set_head;
elem->next = prev_head;
elem->prev = 0;
// printf("push %d\n", elem->tid);
impl->passive_set_head = elem;

if (prev_head != 0) {
    prev_head->prev = elem;
}

if (impl->passive_set_tail == 0)
    impl->passive_set_tail = elem;

}

static void __malthusian_insert_at_head(malthusian_mutex_t *impl,
malthusian_node_t cur_head,
malthusian_node_t new_elem) {
/

* Cur tail is either the current lock holder or a new thread enqueued in
* the meantime (note that several new threads may be enqueued).
* We insert new_elem just behind cur_head and (if any) in front of the
* queue of new threads.
*/
malthusian_node_t cur_tail =
__sync_val_compare_and_swap(&impl->tail, cur_head, new_elem);
if (cur_tail == cur_head) {
cur_head->next = new_elem;
} else {
/

* One or several other threads managed to get inserted in the queue
* before new_elem.
* In this case, we must wait for the first thread to finish
* its insertion and then insert new_elem in front of it.
**/
while (!cur_head->next)
CPU_PAUSE();
new_elem->next = cur_head->next;
cur_head->next = new_elem;
COMPILER_BARRIER();
}
}

static void __malthusian_mutex_unlock(malthusian_mutex_t impl,
malthusian_node_t me) {
/

* "To ensure long-term fairness, the unlock operator periodically
* selects the tail of the excess list T as the successor and then
* grafts T into the main MCS chain immediately after the
* lock-holder's element, passing ownership of the lock to T"
**/
if (xor_random() == 0) {
DEBUG("[%d] Insert T as successor of me\n", cur_thread_id);
malthusian_node_t *elem = passive_set_pop_back(impl);
if (elem != 0) {
// len--;
__malthusian_insert_at_head(impl, me, elem);
// printf("me->tid %d insert head wake %d\n", me->tid, me->next->tid);
waiting_policy_wake(&elem->status);
// sys_futex(&elem->status, FUTEX_WAKE_PRIVATE, 1, NULL, NULL, 0);
me->next->spin = 1;
return;
}
}

/* No successor yet? */
if (!me->next) {
    /**
     * "Conversely, at unlock-time if the main queue is empty
     * except for the owner's node, we then extract a node
     * from the head of the passive list, insert it into the
     * queue at the tail and pass ownership to that thread."
     **/
    //  printf("here no next %d ?\n", me->tid);
    DEBUG("[%d - %p] Trying to extract from PS because no waiter\n",
          cur_thread_id, me);
    malthusian_node_t *extract_next = passive_set_pop_front(impl);
    if (extract_next == 0) {
        DEBUG("[%d - %p] No passive thread, old code\n", cur_thread_id, me);
        /* Try to atomically unlock */
        if (__sync_val_compare_and_swap(&impl->tail, me, 0) == me)
            return;

        /* Wait for successor to appear */
        DEBUG("[%d - %p] Wait for successor to appear\n", cur_thread_id,
              me);
        while (!me->next)
            CPU_PAUSE();
        // printf("1 wake tid %d  \n", me->next->tid);
        waiting_policy_wake(&me->next->status);
        me->next->spin = 1;
        return;
    } else {
        DEBUG("[%d - %p] Fetching thread from PS %p\n", cur_thread_id, me,
              extract_next);
        // len--;
        __malthusian_insert_at_head(impl, me, extract_next);
        // printf("2 wake tid %d insert %d \n", me->next->tid, me->tid);
        waiting_policy_wake(&me->next->status);
        me->next->spin = 1;
        return;
    }
}

/**
 * "At unlock-time, if there exists anypa intermediate nodes in the
 * queue between the owner's node and the current tail, then we
 * have a surplus threads in the ACS and we can unlink and excise
 * one of those nodes and transfer it to the head of the passive
 * list where excess "cold" threads reside."
 * Note that we systematically choose the to unlink the thread
 * that is enqueued just behind the current lock holder. *
 **/
if (me->next != impl->tail) {
    DEBUG("[%d - %p] Moving %p to PS\n", cur_thread_id, me, me->next);

    /**
     * It is possible that the successor of the node that we want to unlink
     *it not fully linked yet
     * (i.e., me->next->next is not set).
     * So we wait until the successor has finished its insertion.
     **/
    while (!me->next->next)
        CPU_PAUSE();

    malthusian_node_t *new_next = me->next->next;
    me->next->status = S_PARKING;
    // printf("push %d in passive\n", me->next->tid);
    passive_set_push_front(impl, me->next);
    // len++;
    me->next = new_next;
    COMPILER_BARRIER();
}
// printf("tid %d unlock\n", me->tid);
/* Unlock next one */
// waiting_policy_wake(&me->next->spin);
me->next->spin = 1;

}

void malthusian_mutex_unlock(malthusian_mutex_t *impl, malthusian_node_t *me) {
#ifdef COND_VAR
int ret = REAL(pthread_mutex_unlock)(&impl->posix_lock);
assert(ret == 0);
#endif
__malthusian_mutex_unlock(impl, me);
}

int malthusian_mutex_destroy(malthusian_mutex_t *lock) {
#ifdef COND_VAR
REAL(pthread_mutex_destroy)(&lock->posix_lock);
#endif
free(lock);
lock = NULL;

return 0;

}

int malthusian_cond_init(malthusian_cond_t *cond,
const pthread_condattr_t *attr) {
#ifdef COND_VAR
return REAL(pthread_cond_init)(cond, attr);
#else
fprintf(stderr, "Error cond_var not supported.");
assert(0);
#endif
}

int malthusian_cond_timedwait(malthusian_cond_t *cond, malthusian_mutex_t *lock,
malthusian_node_t *me,
const struct timespec *ts) {
#ifdef COND_VAR
int res;

__malthusian_mutex_unlock(lock, me);
DEBUG("[%d] Sleep cond=%p lock=%p posix_lock=%p\n", cur_thread_id, cond,
      lock, &(lock->posix_lock));
DEBUG_PTHREAD("[%d] Cond posix = %p lock = %p\n", cur_thread_id, cond,
              &lock->posix_lock);

if (ts)
    res = REAL(pthread_cond_timedwait)(cond, &lock->posix_lock, ts);
else
    res = REAL(pthread_cond_wait)(cond, &lock->posix_lock);

if (res != 0 && res != ETIMEDOUT) {
    fprintf(stderr, "Error on cond_{timed,}wait %d\n", res);
    assert(0);
}

int ret = 0;
if ((ret = REAL(pthread_mutex_unlock)(&lock->posix_lock)) != 0) {
    fprintf(stderr, "Error on mutex_unlock %d\n", ret == EPERM);
    assert(0);
}

malthusian_mutex_lock(lock, me);

return res;

#else
fprintf(stderr, "Error cond_var not supported.");
assert(0);
#endif
}

int malthusian_cond_wait(malthusian_cond_t *cond, malthusian_mutex_t *lock,
malthusian_node_t *me) {
return malthusian_cond_timedwait(cond, lock, me, 0);
}

int malthusian_cond_signal(malthusian_cond_t *cond) {
#ifdef COND_VAR
return REAL(pthread_cond_signal)(cond);
#else
fprintf(stderr, "Error cond_var not supported.");
assert(0);
#endif
}

int malthusian_cond_broadcast(malthusian_cond_t *cond) {
#ifdef COND_VAR
DEBUG("[%d] Broadcast cond=%p\n", cur_thread_id, cond);
return REAL(pthread_cond_broadcast)(cond);
#else
fprintf(stderr, "Error cond_var not supported.");
assert(0);
#endif
}

int malthusian_cond_destroy(malthusian_cond_t *cond) {
#ifdef COND_VAR
return REAL(pthread_cond_destroy)(cond);
#else
fprintf(stderr, "Error cond_var not supported.");
assert(0);
#endif
}

void malthusian_thread_start(void) {
}

void malthusian_thread_exit(void) {
}

void malthusian_application_init(void) {
}

void malthusian_application_exit(void) {
}

void malthusian_init_context(lock_mutex_t *UNUSED(impl),
lock_context_t *UNUSED(context),
int UNUSED(number)) {
}