Construct a Thread-Safe ThreadPool

Code

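Two key points of thread safety

  • Atomicity: a group of operations either does not run at all or runs to completion

  • Memory visibility: whether, and when, a modification that one thread makes to an object or variable becomes visible to another thread, and which other data is visible at the moment that data becomes visible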

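Ensuring memory visibility when using pthread

  • I am not familiar with the pthread memory model and do not know whether the semantics of volatile in C++ are compiler-dependent, so I did not dare to rely on volatile or similar mechanisms for memory visibility when building this ThreadPool. Instead, I take locks and let them act as memory barriers. This costs some performance, but it at least guarantees correctness

  • When pthread creates a thread, the start_routine argument must be a static method, which means the ThreadPool's this pointer has to be passed along. This raises a serious question: the this pointer is initialized in the original thread, and the ThreadPool object is constructed in the original thread too, so how can the new thread be guaranteed to see a consistent, correctly constructed ThreadPool object? In the Java memory model, an object handed to another thread without synchronization carries no such guarantee, and it has to be published safely by other means (volatile, final, AtomicReference, and so on). I do not know whether pthread guarantees the memory visibility of the argument passed across. In my experiments with clang++ 7 at -O3, the other thread saw the correct object in every one of more than 5,000 runs, but I cannot tell whether that behavior is compiler-dependent. So I use a global mutex as a memory barrier: every write to that variable is made while holding the mutex, and every read holds it as well, so that the new thread sees a correctly constructed, consistent object

  • There is another question: how is the memory visibility of the mutex object itself guaranteed? According to JCIP (Java Concurrency in Practice), even a thread-safe object must be safely published (publish: making an object available to code outside its current scope). The mutex protecting the this pointer is a global variable, so I believe its visibility is guaranteed, because the mutex is already fully initialized at compile time (corrections welcome!)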

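    static pthread_mutex_t poolPtrAndSigToBlockMutex = PTHREAD_MUTEX_INITIALIZER;

    As for the mutexes that are (non-static) class members: every time the threadPool object is obtained, the this pointer is dereferenced while holding the mutex that protects it, so I believe the other thread is guaranteed to see a complete, consistent object (corrections welcome!)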

  • (Update, 2019.4.16)

    • Section 3.4 of Programming with POSIX Threads contains this passage:

      Pthreads provides a few basic rules about memory visibility. You can count on all implementations of the standard to follow these rules:

      1. Whatever memory values a thread can see when it calls pthread_create can also be seen by the new thread when it starts. Any data written to memory after the call to pthread_create may not necessarily be seen by the new thread, even if the write occurs before the thread starts.
      2. Whatever memory values a thread can see when it unlocks a mutex, either directly or by waiting on a condition variable, can also be seen by any thread that later locks the same mutex. Again, data written after the mutex is unlocked may not necessarily be seen by the thread that locks the mutex, even if the write occurs before the lock.
      3. Whatever memory values a thread can see when it terminates, either by cancellation, returning from its start function, or by calling pthread_exit, can also be seen by the thread that joins with the terminated thread by calling pthread_join. And, of course, data written after the thread terminates may not necessarily be seen by the thread that joins, even if the write occurs before the join.
      4. Whatever memory values a thread can see when it signals or broadcasts a condition variable can also be seen by any thread that is awakened by that signal or broadcast. And, one more time, data written after the signal or broadcast may not necessarily be seen by the thread that wakes up, even if the write occurs before it awakens.
    • The Open Group Base Specifications Issue 7, 2018 edition, contains this passage:

      4.12 Memory Synchronization

      Applications shall ensure that access to any memory location by more than one thread of control (threads or processes) is restricted such that no thread of control can read or modify a memory location while another thread of control may be modifying it. Such access is restricted using functions that synchronize thread execution and also synchronize memory with respect to other threads. The following functions synchronize memory (is this the same thing as a memory barrier?) with respect to other threads:

      fork()
      pthread_barrier_wait()
      pthread_cond_broadcast()
      pthread_cond_signal()
      pthread_cond_timedwait()
      pthread_cond_wait()
      pthread_create()
      pthread_join()
      pthread_mutex_lock()
      pthread_mutex_timedlock()
      pthread_mutex_trylock()
      pthread_mutex_unlock()
      pthread_spin_lock()
      pthread_spin_trylock()
      pthread_spin_unlock()
      pthread_rwlock_rdlock()
      pthread_rwlock_timedrdlock()
      pthread_rwlock_timedwrlock()
      pthread_rwlock_tryrdlock()
      pthread_rwlock_trywrlock()
      pthread_rwlock_unlock()
      pthread_rwlock_wrlock()
      sem_post()
      sem_timedwait()
      sem_trywait()
      sem_wait()
      semctl()
      semop()
      wait()
      waitpid()
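
The two quoted passages suggest that pthread_create and pthread_join already synchronize memory by themselves. A minimal sketch of rules 1 and 3, assuming a hypothetical Job struct and the illustrative helpers doubleInput and runJob (none of these names come from the original ThreadPool code):

```cpp
#include <pthread.h>
#include <cassert>

// Hypothetical payload, used only for illustration.
struct Job {
    int input;   // written by the creating thread before pthread_create
    int output;  // written by the new thread, read after pthread_join
};

// Start routine with the signature pthread_create expects.
static void* doubleInput(void* arg) {
    Job* job = static_cast<Job*>(arg);
    // Rule 1: values stored before pthread_create are visible here.
    job->output = job->input * 2;
    return nullptr;
}

// Runs one job on a fresh thread and returns its result.
int runJob(int input) {
    Job job{input, 0};  // fully initialized before the thread exists
    pthread_t tid;
    int rc = pthread_create(&tid, nullptr, doubleInput, &job);
    assert(rc == 0);
    rc = pthread_join(tid, nullptr);
    assert(rc == 0);
    // Rule 3: values stored by the thread are visible after pthread_join.
    return job.output;
}
```

Under these rules, `runJob(21)` returns 42 without any extra mutex around `job`. Anything written to `job` after the pthread_create call would still need a mutex or another of the listed synchronizing functions.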

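The this pointer escape problem

  • According to JCIP, escape means publishing an object before its construction has completed, so that another thread sees an incompletely constructed object
  • Starting a thread inside the constructor is the classic this pointer escape: before the constructor has finished running, another thread already sees the this pointer and therefore an incompletely constructed object
  • The fix is to provide a start function: after obtaining the ThreadPool instance, the user calls start manually to launch the worker threads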
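
The constructor/start split can be sketched roughly as follows. StartablePool, workerEntry and startedWorkers are hypothetical names chosen for this illustration, and the "work" each thread does is just incrementing a counter. The sketch also assumes start() and join() are only called from the owning thread, and it does not guard against start() being called twice.

```cpp
#include <pthread.h>
#include <cstddef>
#include <vector>

// Hypothetical class: only the constructor/start() split is taken from the text.
class StartablePool {
public:
    explicit StartablePool(std::size_t workerCount)
        : workerCount_(workerCount), startedWorkers_(0) {
        pthread_mutex_init(&mutex_, nullptr);
        // Deliberately no pthread_create here: `this` must not reach another
        // thread while the constructor is still running.
    }

    ~StartablePool() {
        join();
        pthread_mutex_destroy(&mutex_);
    }

    StartablePool(const StartablePool&) = delete;
    StartablePool& operator=(const StartablePool&) = delete;

    // Called by the user once the object is fully constructed.
    // Returns false if any worker thread could not be created.
    bool start() {
        for (std::size_t i = 0; i < workerCount_; ++i) {
            pthread_t tid;
            if (pthread_create(&tid, nullptr, &StartablePool::workerEntry, this) != 0) {
                return false;
            }
            workers_.push_back(tid);
        }
        return true;
    }

    // Waits for every worker created so far to finish.
    void join() {
        for (pthread_t tid : workers_) {
            pthread_join(tid, nullptr);
        }
        workers_.clear();
    }

    // Number of workers that have run, read under the mutex.
    std::size_t startedWorkers() {
        pthread_mutex_lock(&mutex_);
        std::size_t n = startedWorkers_;
        pthread_mutex_unlock(&mutex_);
        return n;
    }

private:
    // Static trampoline matching the signature pthread_create expects.
    static void* workerEntry(void* arg) {
        StartablePool* self = static_cast<StartablePool*>(arg);
        pthread_mutex_lock(&self->mutex_);
        ++self->startedWorkers_;  // stand-in for real work
        pthread_mutex_unlock(&self->mutex_);
        return nullptr;
    }

    std::size_t workerCount_;
    std::size_t startedWorkers_;
    pthread_mutex_t mutex_;
    std::vector<pthread_t> workers_;
};
```

A caller would construct the pool, call `start()`, and later `join()`. Because no thread exists until `start()` runs, no worker can observe the object before its constructor has returned.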

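Proof of the state transitions

  • The ThreadPool must track which state it is currently in

    NEW, RUNNING, GRACEFUL_SHUTDOWN, IMMEDIATE_SHUTDOWN

    and guarantee that only legal state transitions occur in the program. This needs a proof.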

  • In my thread pool implementation, the state transitions are

    NEW->RUNNING->GRACEFUL_SHUTDOWN or IMMEDIATE_SHUTDOWN
    or
    NEW->GRACEFUL_SHUTDOWN or IMMEDIATE_SHUTDOWN

    The proof is

    In the constructor, the state is set to NEW.
    In the start method, the state is set to RUNNING.
    In the shutdown method, the state is set to GRACEFUL_SHUTDOWN or IMMEDIATE_SHUTDOWN.

    Proof of the safety of the state transitions.
    For any instance of this ThreadPool,
    the constructor runs only once,
    so for any instance, once it has left the NEW state,
    it will never be in the NEW state again.
    After the shutdown method is called,
    no other public method except the destructor and getInstance can be called,
    so once it is in a shutdown state, it will always be in a shutdown state.
    Every time the state is modified, it is protected by taskQueAndStateMutex,
    so there is no thread safety problem.
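
The transitions in the proof can be sketched as a small mutex-protected state holder. This is an illustration, not the original implementation: PoolState and StateHolder are hypothetical names, and unlike the proof, which relies on callers not invoking other methods after shutdown, this variant rejects illegal transitions itself by returning false.

```cpp
#include <pthread.h>

enum class PoolState { NEW, RUNNING, GRACEFUL_SHUTDOWN, IMMEDIATE_SHUTDOWN };

// Hypothetical helper that holds only the state and the mutex guarding it.
class StateHolder {
public:
    StateHolder() : state_(PoolState::NEW) {
        pthread_mutex_init(&taskQueAndStateMutex_, nullptr);
    }
    ~StateHolder() { pthread_mutex_destroy(&taskQueAndStateMutex_); }

    StateHolder(const StateHolder&) = delete;
    StateHolder& operator=(const StateHolder&) = delete;

    // NEW -> RUNNING is the only transition start() may perform.
    bool start() {
        pthread_mutex_lock(&taskQueAndStateMutex_);
        bool ok = (state_ == PoolState::NEW);
        if (ok) state_ = PoolState::RUNNING;
        pthread_mutex_unlock(&taskQueAndStateMutex_);
        return ok;
    }

    // NEW or RUNNING -> one of the two shutdown states; never back out.
    bool shutdown(bool graceful) {
        pthread_mutex_lock(&taskQueAndStateMutex_);
        bool ok = (state_ == PoolState::NEW || state_ == PoolState::RUNNING);
        if (ok) {
            state_ = graceful ? PoolState::GRACEFUL_SHUTDOWN
                              : PoolState::IMMEDIATE_SHUTDOWN;
        }
        pthread_mutex_unlock(&taskQueAndStateMutex_);
        return ok;
    }

    // Reads also take the mutex so they observe the latest write.
    PoolState state() {
        pthread_mutex_lock(&taskQueAndStateMutex_);
        PoolState s = state_;
        pthread_mutex_unlock(&taskQueAndStateMutex_);
        return s;
    }

private:
    PoolState state_;
    pthread_mutex_t taskQueAndStateMutex_;
};
```

Because every read and write of `state_` happens between a lock and an unlock of the same mutex, the check and the assignment in each method form one atomic step.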

Singleton pattern

  • Because a global mutex variable is used, this class can only be a singleton

  • Java has an idiom called DCL (double-checked locking), whose code is as follows

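    private static Object instance = null;

    // Broken on purpose: instance is not volatile, so the unsynchronized
    // read below can observe a reference to a partially constructed object.
    public Object getInstance() {
        if (instance == null) {              // first check, no lock held
            synchronized (this) {
                if (instance == null)        // second check, under the lock
                    instance = new Object();
            }
        }
        return instance;
    }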

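    The problem with this idiom is that another thread may see an instance reference that has already been assigned while the object's constructor has not yet finished: because of memory reordering, the compiler or the processor may assign the reference before the constructor finishes running

  • Since I do not know whether pthread has the same problem, the getInstance method that returns the ThreadPool instance takes the lock on every call and only then checks the instance pointer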
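
A rough sketch of the "lock on every call" approach, assuming a hypothetical LockedSingleton class with a statically initialized mutex named instanceMutex (both names are invented for this illustration and do not come from the original code):

```cpp
#include <pthread.h>

// Hypothetical singleton that never reads `instance` without the lock.
class LockedSingleton {
public:
    static LockedSingleton* getInstance() {
        // Both the null check and the construction happen under the mutex,
        // so no thread can see the pointer before the constructor is done.
        pthread_mutex_lock(&instanceMutex);
        if (instance == nullptr) {
            instance = new LockedSingleton();
        }
        LockedSingleton* result = instance;
        pthread_mutex_unlock(&instanceMutex);
        return result;
    }

    LockedSingleton(const LockedSingleton&) = delete;
    LockedSingleton& operator=(const LockedSingleton&) = delete;

private:
    LockedSingleton() {}

    static pthread_mutex_t instanceMutex;
    static LockedSingleton* instance;  // intentionally never deleted in this sketch
};

// Static initialization, so the mutex is usable before any thread starts.
pthread_mutex_t LockedSingleton::instanceMutex = PTHREAD_MUTEX_INITIALIZER;
LockedSingleton* LockedSingleton::instance = nullptr;
```

The price is one lock/unlock pair per call, which is the trade-off the text accepts in exchange for not having to reason about reordering.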

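Notes on using condition variables

  • Check whether the condition already holds before waiting on a condition variable. Otherwise the following can happen: the condition is already satisfied, so the producer never notifies again, and the consumer hangs in cond wait forever

  • Use while, not if, when checking the condition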

    // Good: re-check the condition after every wakeup
    while (condition not satisfied) {
        cond.wait()
    }

    // Bad: the condition is checked only once
    if (condition not satisfied) {
        cond.wait()
    }

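    One important reason: a thread is waiting and gets woken up, but at that moment a new thread (one that was not previously waiting on this cond) comes in and takes the resource. The awakened thread then finds no resource available, so it should go back to waiting. The Wikipedia article on spurious wakeups gives another reason: on modern processors, guaranteeing that no spurious wakeups occur is relatively expensive. JCIP (Java Concurrency in Practice) does offer the analogy of a toaster whose bell goes off early because of faulty wiring, but it does not explicitly state that such hardware-induced spurious wakeups exist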
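
With pthread, the while-loop pattern might look like the sketch below. IntQueue, produceThree and consumeThreeAndSum are hypothetical names used only for this example. The consumer checks the predicate before its first wait and again after every wakeup.

```cpp
#include <pthread.h>
#include <queue>

// Hypothetical blocking queue of ints guarded by one mutex and one condition variable.
class IntQueue {
public:
    IntQueue() {
        pthread_mutex_init(&mutex_, nullptr);
        pthread_cond_init(&notEmpty_, nullptr);
    }
    ~IntQueue() {
        pthread_cond_destroy(&notEmpty_);
        pthread_mutex_destroy(&mutex_);
    }
    IntQueue(const IntQueue&) = delete;
    IntQueue& operator=(const IntQueue&) = delete;

    void push(int value) {
        pthread_mutex_lock(&mutex_);
        items_.push(value);
        pthread_cond_signal(&notEmpty_);
        pthread_mutex_unlock(&mutex_);
    }

    int pop() {
        pthread_mutex_lock(&mutex_);
        // while, not if: the queue may be empty again by the time this thread
        // reacquires the mutex, and the wakeup itself may be spurious.
        while (items_.empty()) {
            pthread_cond_wait(&notEmpty_, &mutex_);
        }
        int value = items_.front();
        items_.pop();
        pthread_mutex_unlock(&mutex_);
        return value;
    }

private:
    std::queue<int> items_;
    pthread_mutex_t mutex_;
    pthread_cond_t notEmpty_;
};

// Producer thread body: pushes 1, 2, 3.
static void* produceThree(void* arg) {
    IntQueue* q = static_cast<IntQueue*>(arg);
    for (int i = 1; i <= 3; ++i) q->push(i);
    return nullptr;
}

// Starts one producer, consumes three items, and returns their sum
// (or -1 if the producer thread could not be created).
int consumeThreeAndSum() {
    IntQueue q;
    pthread_t producer;
    if (pthread_create(&producer, nullptr, produceThree, &q) != 0) return -1;
    int sum = 0;
    for (int i = 0; i < 3; ++i) sum += q.pop();
    pthread_join(producer, nullptr);
    return sum;
}
```

If the producer has already pushed everything before the consumer's first `pop()`, the loop condition is false and the consumer never waits at all, which avoids the hang described in the first note.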