My Java Concurrent Note(2)

串行线程的封闭

  • 线程封闭对象的要求:
    • 只能由单个线程拥有
    • 通过安全的发布来转移所有权(安全的发布确保对象状态对于新的所有者来说是可见的(不止这个作用?))
    • 所有权转以后,只有接受所有权的线程才可以访问(独占的所有权)
  • 可以使用阻塞队列、ConcurrentHashMap的remove方法、AtomicReference的compareAndSet来完成

Interrupt

  • 每个线程都有一个boolean的中断标志
  • Thread.interrupted方法将返回当前线程是否被中断,并清除中断标志
  • Thread.currentThread().interrupt()将中断当前线程
  • 阻塞库方法,例如Thread.sleepObject.wait会检查线程何时中断,并在发现时提前返回

InterruptedException

  • 当一个方法会抛出这个异常时,表示这是一个阻塞方法,如果这个方法被中断,那么它将努力提前结束阻塞状态
  • 当在代码中调用将抛出该异常的方法时,我们的方法也就成了一个阻塞方法,必须要对中断进行响应。有两种选择
    • 传递InterruptedException:不捕获该异常,或者捕获后重新抛出
    • 恢复中断:调用当前线程的interrupt方法恢复中断状态
  • 只有在对Thread进行扩展并且可以控制调用栈上的所有更高层代码时才可以屏蔽该异常

不可中断的操作

  • 请求内置锁(synchronized)

Thread

MISC

  • 一个Thread对象即使没有引用了,但是已经start,这个线程还是在运行

安全策略

  • 使用Executors.privilegedThreadFactory()可以创建出线程工厂,用这种方式创建出来的线程,将于创建privilegedThreadFactory的线程拥有相同的访问权限、AccessControlContextcontextClassLoader
  • 如果不使用该方法,线程池创建的线程将从在需要新线程时调用execute或submit的客户程序中继承访问权限

Daemon

  • Thread.setDaemon(): This method must be invoked before the thread is started
  • 当只剩下守护线程时,虚拟机就退出了,由于如果只剩下守护线程,就没必要继续运行程序了。
  • 守护线程应该永远不去访问固有资源, 如文件、 数据库,因为它会在任何时候甚至在一个操作的中间发生中断。

Thread.suspend

  • 与 stop 不同,suspend 不会破坏对象。但是,如果用 suspend 挂起一个持有一个锁的线程, 那么,该锁在恢复之前是不可用的。 如果调用suspend 方法的线程试图获得同一个锁, 那么程序死锁

Thread状态

  • 状态转换

    • 当一个线程被阻塞或等待时(或终止时) ,另一个线程被调度为运行状态
    • 当一个线程被重新激活(例如, 因为超时期满或成功地获得了一个锁), 调度器检查它是否具有比当前运行线程更高的优先级。 如果是这样,调度器从当前运行线程中挑选一个, 剥夺其运行权,选择一个新的线程运行。
  • new:当用 new 操作符创建一个新线程时, 如 new Thread(r), 该线程还没有开始运行。这意味着它的状态是 new。当一个线程处于新创建状态时, 程序还没有开始运行线程中的代码。在线程运行之前还有一些基础工作要做

  • Runnable:一旦调用 start 方法,线程处于 runnable 状态。一个可运行的线桿可能正在运行也可能没有运行, 这取决于操作系统给线程提供运行的时间。(A thread in the runnable state is executing in the Java virtual machine but it may be waiting for other resources from the operating system such as processor)(Java 的规范说明没有将它作为一个单独状态。一个正在运行中的线程仍然处于可运行状态)
  • Blocked:当线程处于被阻塞或等待状态时,它暂时不活动。它不运行任何代码且消耗最少的资源。直到线程调度器重新激活它。( 细节取决于它是怎样达到非活动状态的) 。当所有其他线程释放该锁,并且线程调度器允许本线程持有它的时候,该线程将变成非阻塞状态

    • 当一个线程试图获取一个内部的对象锁(而不是 java.util.concurrent 库中的锁,) 而该锁被其他线程持有, 则该线程进人阻塞状态。(Thread state for a thread blocked waiting for a monitor lock. A thread in the blocked state is waiting for a monitor lock to enter a synchronized block/method or reenter a synchronized block/method after calling Object.wait.)。当所有其他线程释放该锁,并且线程调度器允许本线程持有它的时候,该线程将变成非阻塞状态
  • WAITING

    • 当线程等待另一个线程通知调度器一个条件时, 它自己进入等待状态。(使用Object.waitThread.join方法,或是等待java.util.concurrent库中的Lock或Condition时,就会出现这种情况)

    • Thread state for a waiting thread. A thread is in the waiting state due to calling one of the following methods:

      • Object.wait with no timeout
      • Thread.join with no timeout
      • LockSupport.park

      A thread in the waiting state is waiting for another thread to perform a particular action.

  • TIMED_WAITING

    • Thread state for a waiting thread with a specified waiting time. A thread is in the timed waiting state due to calling one of the following methods with a specified positive waiting time:
      • Thread.sleep
      • Object.wait with timeout
      • Thread.join with timeout
      • LockSupport.parkNanos
      • LockSupport.parkUntil
  • TERMINATED

    • Thread state for a terminated thread. The thread has completed execution
    • 线程因如下两个原因之一而被终止
      • 因为 run 方法正常退出而自然死亡。
      • 因为一个没有捕获的异常终止了 nm 方法而意外死亡
      • 可以调用线程的 stop 方法杀死一个线程。 该方法抛出 ThreadDeath 错误对象,由此杀死线程。但是,stop 方法已过时

ThreadPoolExecutor

MISC

  • JVM只有在所有(非守护)线程全部终止后才会退出,因为如果无法正确的关闭Executor,JVM无法关闭

SingleThreadExecutor

  • 确保不会并发执行任务。为了避免用户代码修改返回的ThreadPoolExecutor对象,从而改变不并发执行的语义,所以对其进行包装使得无法类型转换为ThreadPoolExecutor来修改那些特性
  • Executors.unconfigurableExecutorService()包装的Executor就具有冻结配置的特性

afterExecute和beforeExecute方法

  • 以下是runWorker方法内的片段

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    try {
    beforeExecute(wt, task);
    Throwable thrown = null;
    try {
    task.run();
    } catch (RuntimeException x) {
    thrown = x; throw x;
    } catch (Error x) {
    thrown = x; throw x;
    } catch (Throwable x) {
    thrown = x; throw new Error(x);
    } finally {
    afterExecute(task, thrown);
    }
    } finally {
    task = null;
    w.completedTasks++;
    w.unlock();
    }

    可以看出

    • 如果beforeExecute抛出异常,则不会执行task和afterExecute
    • 无论task抛出什么异常,都会执行afterExecute

submit的策略

  • 源码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
    if (addWorker(command, true))
    return;
    c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    if (! isRunning(recheck) && remove(command))
    reject(command);
    else if (workerCountOf(recheck) == 0)
    addWorker(null, false);
    }
    else if (!addWorker(command, false))
    reject(command);
  • 如果运行的corePoolSize线程少于corePoolSize,则会创建一个新线程来处理请求,即使其他工作线程处于空闲状态也是如此。否则,如果正在运行少于maximumPoolSize的线程,则只有在队列已满时才会创建一个新线程来处理请求

  • 如果core线程还有剩(就是当前启动的线程数少于核心线程数),那么新加
    worker(等价于新加线程,加入线程后,会run firstTask——prestartAllCoreThreads方法加入的核心worker的firstTask都是null,从而run firstTask不会去运行task)

    1
    2
    3
    4
    5
    /** Delegates main run loop to outer runWorker  */
    public void run() {
    runWorker(this);
    }
    // 这是内部的Worker(实现了Runnable接口)的run方法,在addWorker后会调用该worker的thread的start方法,从而调用该run方法
  • 否则就尝试push到工作队列里(即使核心线程不忙,也是直接push到队列中,然后核心线程从队列中获得task)(worker运行的线程会阻塞的等待工作队列有东西返回)

    1
    2
    3
    4
    while (task != null || (task = getTask()) != null) {
    w.lock();
    // If pool is stopping, ensure thread is interrupted;
    // 这来自ThreadPoolExecutor的runWorker方法,worker将run方法委托给这个方法
  • 如果工作队列已经满了就会尝试去添加非核心线程

  • 因为在添加到工作队列时,使用的是offer,所以即使是SynchronousQueue,也不会阻塞,而是没有线程正在等待接受就立刻返回

    1
    2
    3
    4
    SynchronousQueue<E> public boolean offer
    Inserts the specified element into this queue, if another thread is waiting to receive it.
    Returns: true if the element was added to this queue, else false
    // 截取自SynchronousQueue的doc

    因为,如果核心线程已满,但是这个queue无法被offer,那么说明核心线程都忙(会不会出现executor.prestartAllCoreThreads跑完但是有的线程还没到等待点的情况,从而虽然有的线程不忙但是还是无法offer?),所以尝试启动新的非核心线程

  • 这意味着,queue不能太小:否则,如果无法添加新线程,而有的线程虽然不忙,但是可能没能及时得到cpu时间去把task从队列中拿出来,就会导致执行RejectedExecutionHandler

  • 要特别注意execute的这一个片段

    1
    2
    3
    4
    5
    6
    if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    if (! isRunning(recheck) && remove(command))
    reject(command);
    else if (workerCountOf(recheck) == 0)
    addWorker(null, false);

    即使coreSize==0(maxSize为0是非法的), 并且这是无界队列,线程池还是会保证至少有一个线程在跑

  • 如果使用了无界队列,其实maxSize也没什么用,因为总是会添加队列成功——除非超过了Integer.MAX_VALUE

  • 队列的作用在于

    • 防止core线程不忙但是没能及时从队列中pop出task导致添加多余的线程
    • 使得系统平缓应对小波峰——避免一有风吹草动系统就跟着波动(怎么感觉有点像惊群效应)。小波峰的含义是,对用户的响应的延迟在接受范围内
    • 削峰填谷

死锁

锁顺序死锁

  • 在持有锁时调用某个外部方法,那么将可能出现死锁,或阻塞时间过长

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    // Warning: deadlock-prone!
    class Taxi {
    @GuardedBy("this") private Point location, destination;
    private final Dispatcher dispatcher;
    public Taxi(Dispatcher dispatcher) {
    this.dispatcher = dispatcher;
    }
    public synchronized Point getLocation() {
    return location;
    }
    public synchronized void setLocation(Point location) {
    this.location = location;
    if (location.equals(destination))
    dispatcher.notifyAvailable(this);
    }
    }
    class Dispatcher {
    @GuardedBy("this") private final Set<Taxi> taxis;
    @GuardedBy("this") private final Set<Taxi> availableTaxis;
    public Dispatcher() {
    taxis = new HashSet<Taxi>();
    availableTaxis = new HashSet<Taxi>();
    }
    public synchronized void notifyAvailable(Taxi taxi) {
    availableTaxis.add(taxi);
    }
    public synchronized Image getImage() {
    Image image = new Image();
    for (Taxi t : taxis)
    image.drawMarker(t.getLocation());
    return image;
    }
    }

    某个线程调用setLocation,然后在持有该Taxis的锁时调用Dispatcher的notifyAvaiable。同时,另一个线程调用getImage,持有dispatcher的锁然后请求taxis的锁。从而死锁

  • 开放调用:调用某个方法时不需要持有锁。即可能使用开放调用,可以更容易找到需要获得多个锁的代码路径。如果因为开放调用而导致某些操作不是原子的,那么可以通过一些协议来避免该路径被并发执行——比如搞一个标志,然后其他线程看到该标志为xx时就不执行

    The need to rely on open calls and careful lock ordering reflects the fundamental messiness of composing synchronized objects rather than synchronizing composed objects.

  • 避免锁顺序死锁
    • 以固定的顺序加锁
    • 如果加锁顺序取决于调用的参数顺序呢(比如获得parm的obj1obj2内部锁,那么就取决于obj1obj2的顺序),那么利用System.identityHashCode(obj)来决定是那个obj先加锁,如果两个的hashCode相同,那么加一个“加时赛锁”(就是先加锁这个“加时赛锁”,然后再加锁两个对象),确保同一时间只有一个线程在对obj1obj2加锁

资源死锁

  • 比如某个线程持有某个链接池的链接等待另一个链接池的链接,另一个线程顺序颠倒地持有和请求
  • 线程依赖死锁
    • 当线程池不够大,而线程池里的任务依赖于同一线程池的任务,那么会导致,某个任务正在跑,然后其依赖于处于队列中的任务,从而死锁
    • 这种就要求线程池要经过一定的配置——比如不能太小等。所以要将配置策略文档化
    • 只有当任务相互独立时,为线程池或工作队列设置界限才是合理的。任务之间存在依赖性,有界的线程池或队列就可能导致线程饥饿死锁
    • 解决方法
      • 使用无界的线程池,比如newCachedThreadPool
      • 使用有界的线程池,并使用SynchronousQueue作为工作队列(这样当线程池满的时候调用者就会被阻塞从而知道子任务无法运行),以及Caller-Runs饱和策略
      • 为什么不使用0长度的其他blockingQueue而使用SynchronousQueue(以下均为个人观点)
        • 一方面,其他blockingQueue不支持0长度。所以有可能有一个被等待的子任务就死锁了
        • 另一方面,如果无法增加核心worker(新建线程),那么task都跑到队列中,这时候,如果没有线程在等待从队列中获得task,队列就会立刻返回(因为使用的是offer而不是会阻塞的put),从而在可能死锁时(就是子任务没线程能运行时),可以执行拒绝策略。

消除锁的方法

  • 比如要求在遍历一个集合时,集合保持一致性,那么可以通过加锁复制该集合,然后遍历副本

避免死锁的其他方法

  • 使用有定时功能的锁,并在失败时记录日志、适当rollback
  • threadDump可以提供synchronizedReentrantLock等的死锁信息。通过定时触发threadDump可以有效的获知加锁信息

饥饿和优先级

  • 在 Java 程序设计语言中,每一个线程有一个优先级。
  • 默认情况下,一个线程继承它的父线程的优先级。
  • 可以用setPriority 方法提高或降低任何一个线程的优先级
  • 可以将优先级设置为在 MIN_PRIORITY (在 Thread 类中定义为 1 ) 与 MAX_PRIORITY (定义为 10 ) 之间的任何值。NORM_PRIORITY 被定义为 5
  • 每当线程调度器有机会选择新线程时, 它首先选择具有较高优先级的线程。但是,线程优先级是高度依赖于系统的
  • 如果有几个高优先级的线程没有进入非活动状态, 低优先级的线程可能永远也不能执行

  • 尽量不要改变线程优先级,如果改变了,那么程序的行为就与平台相关了。并且可能导致饥饿

  • 在大多数java应用程序中,所有线程都具有相同的Thread.NORM_PRIORITY,我测试了Daemon线程和非Daemon线程,都是5(linux 4.18.0-13)
  • Thread.yieldThread.sleep(0)的语义都是未定义的,JVM可以将他们实现为空操作,也可以把他们视为线程调度的参考。在unix系统中并不要求他们拥有sleep(0)的语义——将当前线程放在该优先级对应的运行队列末尾,并将执行全交给相同优先级的其他线程,尽管有些JVM是按照这种方法来实现yield的

活锁

  • 不会阻塞线程但是也无法继续执行,因为线程总是不断重复执行相同的操作,而且总是失败
    • 比如:某个消息被处理时一定会失败,然而开发者错误的将这种错误当做可以修复的错误,从而把该消息又加入了消息队列头部,从而消息处理器会反复调用,并返回相同的结果,从而程序无法继续执行下去
  • Livelock can also occur when multiple cooperating threads change their state in response to the others in such a way that no thread can ever make progress
    • This is similar to what happens when two overly polite people are walking in opposite directions in a hallway: each steps out of the other’s way, and now they are again in each other’s way.so they both step aside again, and again, and again
    • 解决方法:重试时引入随机性,比如信道上冲突时,随机退避一段时间

可伸缩性

加速比的计算

  • $\text{speedup}\leq \frac{1}{F+\frac{(1-F)}{N}}$
  • 其中$F$是串行化的比例,$N$是CPU数目,比如$F=10%$,$N=10$,则在加速比为5.3

锁竞争

  • 随着线程数目的增加,性能下降,原因是上下文切换开销和调度开销(比如CFS,两个nice值相同的非实时进程,100ms可以各自分得50ms,如果是100个进程,则只有1ms)
  • 如果某个线程需要获取多个锁,那么可能获取了一个锁后又需要block,从而导致无效的调度

上下文切换

  • vmstat可以看到上下文切换次数、内核执行的时间占比
  • 如果内核占用率比较高(超过10%),那么说明调度发生得很频繁,可能是由于IO或锁竞争导致的阻塞引起的

非竞争同步和竞争同步

  • 非竞争同步可以完全在JVM中处理,竞争同步可能需要操作系统的介入
  • When locking is contended, the losing thread(s) must block. The JVM can implement blocking either via spin-waiting (repeatedly trying to acquire the lock until it succeeds) or by suspending the blocked thread through the operating system.

  • volatile通常是非竞争同步
  • synchronized针对无竞争同步进行了优化
  • 在java5时(作者编写JCIP时)一个Fast-Path的非竞争同步将消耗20~250个时钟周期
  • 其对程序整体性能的影响很低

同步的开销

  • 会增加总线的通信量,从而对其他线程造成影响

    This aspect is sometimes used to argue against the use of nonblocking algorithms without some sort of backoff, because under heavy contention,nonblocking algorithms generate more synchronization traffic than lock-based ones. See Chapter 15.

排队理论

  • Little定律:在一个稳定的系统中,顾客的平均数量等于他们的平均达到率乘以他们在系统中的平均停留时间
  • 所以考察锁的性能时要考考察锁的请求频率乘以每次持有锁的时间

降低锁的竞争程度

  • 减少持有锁的时间
    • 将线程安全性委托给线程安全对象而不是直接对该对象加锁也可以缩小需要同步的块大小。比如synchronized(a HashMap)就比Hashtable的同步块大,因为hashtable内部只有到必要时才加锁,而不是整个操作都加锁。
  • 降低锁的请求频率
    • 锁分解:一个锁保护一个变量,不要多个变量共用一个锁
    • 锁分段:比如ConcurrentHashMap使用16个锁,保护1/16散列桶。劣势是有时候需要持有所有的锁来执行操作(比如resize)
  • 使用有允许更高的并发性的协调的机制代替独占锁

    Replace exclusive locks with coordination mechanisms that permit greater concurrency.

  • 合并过小的锁(锁粗化,因为加锁需要代价,尤其是基于OS资源的锁)
  • 避免热点域,比如多线程下的Map如果每次写入操作都需要更新size这个变量,那么size就成了热点域。可以通过使用多个size变量记录局部size,还可以维护一个全局的volatile size,每次写入操作导致size被更新时,将这个全局size设置为-1,每次调用size()时,把结果缓存在全局size

对象池

  • 缺点
    • 如果太大,则可用内存会变小
    • 有可能有旧对象到新对象的引用,从而对GC不利
    • 需要同步,这个代价可能比new新对象更大,即使是非竞争同步,开销也比new新对象大

直接在业务线程写日志 vs 分离日志到后台线程

  • 前者的缺点
    • IO会阻塞,阻塞需要上下文切换,而可能阻塞的时间其实小于上下文切换的时间
    • 阻塞结束后换入前,可能需要等待其他线程,从而延迟服务时间
    • 在输出流上发生竞争(因为有多个业务线程直接往流上写日志,并且像c的printf其实是有锁的)
    • 锁竞争会导致上下文切换、调度开销,从而进一步增加服务时间和内核工作的时间(即减少了跑业务的时间)
  • 后者的优点
    • 写入日志其实是写内存——写到日志队列,这是很快的
    • 因为会发生竞争的动作(写日志)很快,从而减少了锁竞争,也就减少了发生上下文切换的次数
    • 在日志队列未满之前,几乎不阻塞,因此也几乎不需要被调度出去
    • 把一条包含锁竞争、IO的复杂路径变成一条简单的路径
    • 只有一个日志线程,所以在流上没有锁和锁竞争,也就没有加锁的用户态内核态切换代价和锁竞争导致的上下文切换
    • 削峰填谷

性能指标

指标

  • 资源利用率
  • 响应时间
  • 服务时间

测量方法

  • 测量等待外部服务的时间
  • 测量IO时间
  • 测量网络流量
  • 使用ThreadDump获得锁竞争的信息,竞争激烈的锁,会频繁出现在ThreadDump中

测试

  • 有安全性测试和活跃性测试

    • 安全性测试:不发生任何错误的行为
    • 活跃性测试:某个良好的行为终究发生
  • 测试时可以使用以下随机数发生器

    1
    2
    3
    4
    5
    6
    static int xorShift(int y) {
    y ^= (y << 6);
    y ^= (y >>> 21);
    y ^= (y << 7);
    return y;
    }
  • 可以使用/home/hzx/MyStudy/notes/java/JSR166TestCase.java作为测试基类(来自于http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/test/tck/JSR166TestCase.java)

  • 使用那些容易检查、容易出错的属性来测试
  • 测试代码应该避免引入过多的同步、避免限制并发性,理想的情况是,测试属性中不需要任何同步(比如,如果代码中内存可见性有一些问题,但是测试代码中有一个printf(C的printf似乎可以充当内存屏障,毕竟printf需要lock),那么就会掩盖内存可见性问题)
  • Random是线程安全的

    Instances of java.util.Random are threadsafe.(ref from Random’s javadoc)
    应该使用纯函数来计算随机数

    1
    2
    3
    4
    5
    6
    7
    8
    // 计算出本线程私有的seed,然后使用下面这个xorShift不断迭代计算下一个seed
    int seed = (this.hashCode() ^ (int) System.nanoTime());
    private int xorShift(int y) {
    y ^= (y << 6);
    y ^= (y >>> 21);
    y ^= (y << 7);
    return y;
    }
  • 如果创建线程开销比较大,而每个线程运行的时间比较短,那么就可能导致循环里创建的线程其实变成了串行执行。可以通过引入CyclicBarrier来解决

  • 为了避免在判断测试何时结束时需要与其他线程协调,可以使用确定性的结束条件
  • 测量噪声
    • 可能来源(以map为例)
      • hashCode的交错
      • 线程的调度
      • rehash
      • GC、malloc的开销
      • OS的辅助任务
    • 不应该尝试在测试时去掉噪声,因为实际使用过程中也是有各种噪声的
  • 提高错误发生的概率

    • 在多处理器系统上,开大于处理器数目的线程数目,可以使得线程的交错更加不可预测
    • 在不同的处理器数量、操作系统、处理器架构的系统上测试
    • 访问共享状态的操作中,使用Thread.yield将产生更多的上下文切换(与平台相关,JVM可能将其实现为空操作),短时间的sleep
  • 实例分析:测试阻塞队列

    • 如果使用一个类库提供的阻塞队列作为对照,那么就会引入过多的同步
    • 更好的方法是,使用一个对顺序敏感(只有一个生产者、消费者的情况下)(比如不支持交换率的运算)的校验和计算函数来计算所有入列元素和出列元素的校验和,并进行比较,这样还可以测试出取出的元素的顺序
    • 因为在每个消费者那里,不可能说每取出一个元素就要对应的操作全局的checksum,而是说边取出边操作一个局部的checksum,等全部取出后才去用局部checksum操作全局checksum,这意味着,只有一个生产者、消费者是才可以使用顺序敏感的函数,否则不应该使用这种函数
    • 代码
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      65
      66
      67
      68
      69
      70
      71
      72
      73
      74
      public static class PutTakeTest {
      private final ExecutorService pool = Executors.newCachedThreadPool();
      private final AtomicInteger putSum = new AtomicInteger(0);
      private final AtomicInteger takeSum = new AtomicInteger(0);
      private final CyclicBarrier barrier;
      private final MyBlockingQueue<Integer> bb;
      private final int nTrials, nPairs;


      public PutTakeTest(int capacity, int nPairs, int nTrials) {
      this.bb = new MyBlockingQueue<>(capacity);
      this.nTrials = nTrials;
      this.nPairs = nPairs;
      this.barrier = new CyclicBarrier(nPairs * 2 + 1);
      }

      public void test() {
      try {
      for (int i = 0; i < nPairs; i++) {
      pool.execute(new Producer());
      pool.execute(new Consumer());
      }
      barrier.await();
      barrier.await();
      assertEquals(putSum.get(), takeSum.get());
      } catch (Exception e) {
      throw new RuntimeException(e);
      } finally {
      pool.shutdown();
      }
      }

      private class Producer implements Runnable {
      public void run() {
      try {
      int seed = (this.hashCode() ^ (int) System.nanoTime());
      int sum = 0;
      barrier.await();
      for (int i = nTrials; i > 0; --i) {
      bb.put(seed);
      sum += seed;
      seed = xorShift(seed);
      }
      putSum.getAndAdd(sum);
      barrier.await();
      } catch (Exception e) {
      throw new RuntimeException(e);
      }
      }
      }

      private class Consumer implements Runnable {
      public void run() {
      try {
      barrier.await();
      int sum = 0;
      for (int i = nTrials; i > 0; --i) {
      sum += bb.get();
      }
      takeSum.getAndAdd(sum);
      barrier.await();
      } catch (Exception e) {
      throw new RuntimeException(e);
      }
      }
      }

      private int xorShift(int y) {
      y ^= (y << 6);
      y ^= (y >>> 21);
      y ^= (y << 7);
      return y;
      }
      }

Lock类

  • Lock实现中必须提供与内部锁相同的内存可见性语义

    All Lock implementations must enforce the same memory synchronization semantics as provided by the built-in monitor lock, as described in The Java Language Specification (17.4 Memory Model) :

    • A successful lock operation has the same memory synchronization effects as a successful Lock action.
    • A successful unlock operation has the same memory synchronization effects as a successful Unlock action.
      (ref from Lock’s javadoc)
  • 加锁语义、调度算法、顺序保证、性能特性方面可以与内置所有所不同

ReentrantLock

  • 获取ReentrantLock时,有着与进入同步代码块相同的内存语义
  • 释放时,有着与退出同步代码块相同的内存语义
  • 非公平的版本,在某个线程请求时,如果该锁变为可用,那么其可以直接获得锁,无需考虑正在排队的线程。其并不提倡插队,但是无法避免插队

    • 非公平版本的代码

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      static final class NonfairSync extends Sync {
      private static final long serialVersionUID = 7316153563782823691L;

      /**
      * Performs lock. Try immediate barge, backing up to normal
      * acquire on failure.
      */
      final void lock() {
      if (compareAndSetState(0, 1))
      setExclusiveOwnerThread(Thread.currentThread());
      else
      acquire(1);
      }

      protected final boolean tryAcquire(int acquires) {
      return nonfairTryAcquire(acquires);
      }
      }
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      final boolean nonfairTryAcquire(int acquires) {
      final Thread current = Thread.currentThread();
      int c = getState();
      if (c == 0) {
      if (compareAndSetState(0, acquires)) {
      setExclusiveOwnerThread(current);
      return true;
      }
      }
      else if (current == getExclusiveOwnerThread()) {
      int nextc = c + acquires;
      if (nextc < 0) // overflow
      throw new Error("Maximum lock count exceeded");
      setState(nextc);
      return true;
      }
      return false;
      }
    • 公平版本

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      final void lock() {
      acquire(1); // 其会调用tryAcquire
      }
      protected final boolean tryAcquire(int acquires) {
      final Thread current = Thread.currentThread();
      int c = getState();
      if (c == 0) {
      if (!hasQueuedPredecessors() &&
      compareAndSetState(0, acquires)) {
      setExclusiveOwnerThread(current);
      return true;
      }
      }
      else if (current == getExclusiveOwnerThread()) {
      int nextc = c + acquires;
      if (nextc < 0)
      throw new Error("Maximum lock count exceeded");
      setState(nextc);
      return true;
      }
      return false;
      }
    • 即使是公平版本的锁,tryLock也是不公平的

      Even when this lock has been set to use a fair ordering policy, a call to tryLock() will immediately acquire the lock if it is available, whether or not other threads are currently waiting for the lock. This “barging” behavior can be useful in certain circumstances, even though it breaks fairness. If you want to honor the fairness setting for this lock, then use tryLock(0, TimeUnit.SECONDS) which is almost equivalent (it also detects interruption).

    • 公平的版本,在JCIP的性能测试中性能比非公平的低两个数量级
    • 如果请求锁的时间时间间隔比较长,或者持有锁的时间间隔比较长,那么使用公锁是ok的
    • 内置锁是非公平的

      读写锁

  • 如果读并不是非常多,那么其实会降低性能,因为该锁算法复杂。所以使用前要对程序的瓶颈有明确的分析、测试

Fork-Join

  • fork-join 框架使用了一种有效的智能方法来平衡可用线程的工作负载,这种方法称为工作密取(work stealing)。每个工作线程都有一个双端队列 ( deque ) 来完成任务。一个工作线程将子任务压人其双端队列的队头。(只有一个线程可以访问队头,所以不需要加锁。)一个工作线程空闲时,它会从另一个双端队列的队尾“ 密取” 一个任务

构建自定义的同步工具

前置条件不满足的处理方案

  • 可以Thread.sleep()
  • 可以park
  • 可以自旋
  • 可以yield,对于很快(但是使用自旋又太浪费的地方)就可以满足前置条件的情况,这一个比较好的选择(JCIP说比较park好(14.1.1最后),但是我觉得似乎不尽然,毕竟让出了时间片不就等于park

条件变量

  • 构成前提条件的变量必须由对象的锁来保护,从而使得它们在测试前提条件的同时保持不变。如果前提条件尚未满足,就必须释放锁
  • 条件队列:每个对象都可以作为一个条件队列。其使得一组线程(等待线程集合)能够通过某种方式来等待特定的条件变为真
  • 如果某个功能无法通过“轮询+休眠”实现,那么其也无法通过条件队列实现(公平的条件队列除外,条件队列可以实现公平,但是“轮询+休眠”没法实现,除非搞个队列,但是这样子不就成了条件队列)
  • 条件谓词:使得某个操作成为状态依赖操作的前提条件(比如阻塞队列的take操作的条件谓词就是“队列不为空”),其是类中各个状态变量构成的表达式
  • 应该将条件队列相关联的条件谓词以及在这些谓词上等待的操作都写入文档

  • Monitor Condition

    • 如果调用同步方法的线程必须被阻塞或是其他原因不能立刻进行,它能够在一个监视条件上等待,这将导致该客户线程暂时释放监视锁,并挂起(WAITING / TIMED_WAITING)在监视条件上
    • 同步方法线程恢复。一旦早先挂起在某监视条件上的同步方法线程获取通知,它将继续在最初的等待监视条件的点上执行,执行的条件是唤醒后抢占到监视锁。当线程从wait方法中被唤醒时,他在重新请求锁时不具有任何特殊的优先级,而要与任何其他尝试进入同步代码块的线程一起正常的在锁上竞争
    • 当调用wait时,需要持有锁,否则会抛出IllegalMonitorStateException
    • 不提供公平的队列
    • 只有一个条件变量
  • 调用wait之前要测试条件谓词,并且从wait中返回时再次进行测试。要在循环中调用wait

    • 信号丢失:如果线程A通知了一个条件队列,但是线程B随后调用wait,那么wait并不会立刻醒来,而是要等待另一个通知来唤醒它。所以在wait之前要检查条件谓词
    • Spurious wakeup:(来自wiki。内容可能有争议)。比如,当只有一个资源变得可用时,本来我们调用notify,那么可以相信醒来的那个一定可以拿到资源。但是,并不是如此,实际操作系统中,有时会出现即使是上面那种情况,醒来后还是发现资源不可用
    • 有个问题:wait唤醒后需要抢占锁,那么,是否有可能某线程虽然被notify唤醒,也就是只有一个等待该条件的线程醒来获得资源,但是同时外界来了一个线程,并且竞争锁先成功,那么这个被唤醒的不就没资源了吗,从而类似假醒
  • 确保使用与条件队列相关联的锁来保护条件谓词的各个状态变量

  • 调用wait、notify、notifyAll时,要持有与条件队列相关的锁,否则有lost-wake-up问题,比如说代码如下

    1
    2
    cnt+=1;
    notify();
    1
    2
    3
    while(cnt<=0) {
    wait();
    }

    然后,在检查了cnt<=0这个条件后、调用wait之前,有个线程完成了cnt++;notify()的操作,那么消费者就丢失了wake-up,陷入了无限等待(如果其在等待时没有被唤醒的话)

  • 检查条件谓词之后以及开始执行相关操作之前,不要释放锁

  • 调用notify时,JVM会从条件队列上等待的多个线程中选择一个来唤醒,而调用notifyAll时则会唤醒所有在这个条件队列上等待的线程。调用notify或notifyAll时必须持有条件队列对象的锁,而如果这些等待的线程此时不能重新获得锁,那么无法从wait返回,因此发出通知的线程要尽快释放锁

  • 在一个条件队列上等待同一个条件谓词

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    abstract class BaseBoundedBuffer<V> {
    private final V[] buf;
    private int tail;
    private int head;
    private int count;

    protected BaseBoundedBuffer(int capacity) {
    this.buf = (V[]) new Object[capacity];
    }

    protected synchronized final void doPut(V v) {
    buf[tail] = v;
    if (++tail == buf.length)
    tail = 0;
    ++count;
    }

    protected synchronized final V doTake() {
    V v = buf[head];
    buf[head] = null;
    if (++head == buf.length)
    head = 0;
    --count;
    return v;
    }

    public synchronized final boolean isFull() {
    return count == buf.length;
    }

    public synchronized final boolean isEmpty() {
    return count == 0;
    }
    }

    class BoundedBuffer<V> extends BaseBoundedBuffer<V> {
    // CONDITION PREDICATE: not-full (!isFull())
    // CONDITION PREDICATE: not-empty (!isEmpty())
    public BoundedBuffer(int size) {
    super(size);
    }

    // BLOCKS-UNTIL: not-full
    public synchronized void put(V v) throws InterruptedException {
    while (isFull())
    wait();
    doPut(v);
    notifyAll();
    }

    // BLOCKS-UNTIL: not-empty
    public synchronized V take() throws InterruptedException {
    while (isEmpty())
    wait();
    V v = doTake();
    notifyAll();
    return v;
    }
    }

    这种情况下有个问题:调用notify后,take被叫醒,然而还是empty——此时isFull不满足,所以本应该put方法被得到通知被执行,所以这个信号就好像丢失了

  • 只有在同时满足以下两个条件时,才能用单一的notify而不是notifyAll

    • 所有等待的线程类型相同。只有一个条件谓词与条件队列相关,并且每个线程从wait返回后将执行相同的操作
    • 在条件变量上每次通知最多只能唤醒一个线程来执行
  • 使用notifyAll比notify更容易正确,所以除非notifyAll对性能影响过大,否则应该优先使用notifyAll

原子变量与非阻塞同步机制

非阻塞算法

  • 基于底层原子机器指令来代替锁
  • 多个线程竞争时不会发生阻塞
  • 极大地减少调度开销(因为使用自旋而不是挂起?)
  • 不存在死锁和其他活跃性问题(饥饿呢?活锁呢?)
    • “在不常见的情况下,存在活锁风险”
    • 如果其他线程每次在CAS竞争中都成功,那么本线程就会饥饿,但是实际中很少发生
  • 不会受到单个线程失败的影响

锁的劣势

  • 调度开销
  • 线程恢复执行时,需要等待其他线程执行完他们的时间片,才能被调度执行
  • 如果一个线程在持有锁时发生了page fault、调度延迟之类的情况,那么其他线程都会被延迟
  • 优先级反转:被阻塞的高优先级线程因为需要等待低优先级线程持有的锁,所以导致它的优先级降低到低优先级线程的那个级别(不知道是OS真的降低了优先级还是说虽然有高优先级但是表现上跟低优先级差不多)

原子变量

  • 内存可见性(Ref from atomic javadoc)
    • The memory effects for accesses and updates of atomics generally follow the rules for volatiles, as stated in The Java Language Specification (17.4 Memory Model) :
    • get has the memory effects of reading a volatile variable.
    • set has the memory effects of writing (assigning) a volatile variable.
    • lazySet has the memory effects of writing (assigning) a volatile variable except that it permits reorderings with subsequent (but not previous) memory actions that do not themselves impose reordering constraints with ordinary non-volatile writes. Among other usage contexts, lazySet may apply when nulling out, for the sake of garbage collection, a reference that is never accessed again.
    • weakCompareAndSet atomically reads and conditionally writes a variable but does not create any happens-before orderings, so provides no guarantees with respect to previous or subsequent reads and writes of any variables other than the target of the weakCompareAndSet.
    • compareAndSet and all other read-and-update operations such as getAndIncrement have the memory effects of both reading and writing volatile variables.

内存模型

  • JMM为程序中所有的操作(包括变量读写、monitor的加锁/释放、线程的启动和join)定义了一个偏序关系,称之为Happen-before,如果要保证执行操作B的线程看到操作A的结果,那么A和B之间就必须满足Happen-Before的关系。如果两个操作之间没有Happen-Before的关系,那么JVM就可以对它们任意地重排序
  • 以下引用自Java Concurrency in Practice
    • A data race occurs when a variable is read by more than one thread, and written by at least one thread, but the reads and writes are not ordered by happens-before. A correctly synchronized program is one with no data races; correctly synchronized programs exhibit sequential consistency, meaning that all actions within the program appear to happen in a fixed, global order.
    • The rules for happens-before are:
      • Program order rule. Each action in a thread happens-before every action in that thread that comes later in the program order.
      • Monitor lock rule. An unlock on a monitor lock happens-before every subsequent lock on that same monitor lock. [3]
      • Volatile variable rule.A write to a volatile field happens-before every subsequent read of that same field. [4]
      • Thread start rule. A call to Thread start on a thread happens-before every action in the started thread.
      • Thread termination rule. Any action in a thread happens-before any other thread detects that thread has terminated, either by successfully return from Thread.join or by Thread.isAlive returning false.
      • Interruption rule. A thread calling interrupt on another thread happens-before the interrupted thread detects the interrupt (either by having InterruptedException thrown, or invoking isInterrupted or interrupted).
      • Finalizer rule. The end of a constructor for an object happens-before the start of the finalizer for that object.
      • Transitivity. If A happens-before B, and B happens-before C, then A happens-before C.

具体问题

  • 哲学家进餐(5人)
    • 原子的获取两个(Semaphore.tryAcquire(2)),获取失败则回避随机长度的时间,避免活锁
    • 获取一个后,获取第二个失败,则释放第一个,随机退避,避免活锁
    • 奇数编号的哲学家先拿起左边的筷子,接着拿起右边的,偶数编号的颠倒过来
    • 只允许四个人同时就餐

MISC

  • 伪共享(false sharing)(Ref
    • RFO:Request for owner,如果某cache line是shared的状态,然后需要写该cache line,那么就会导致发送RFO,使得其他的该cache line的副本失效
    • 如果两个无关的变量在同一cache line,那么就会导致写其中一个变量,另一个变量的本地缓存也失效了,从而要读取时得去L3、内存取新数据
  • 如果保证资源只有一个线程去访问?通过限制只有一个线程(比如单线程的日志服务,然后所有日志工作都交给该服务)、使用锁(粒度可以是call或是thread,后者可重入,前者不可以)
  • UncaughtExceptionHandler:可以用 setUncaughtExceptionHandler 方法为任何线程安装一个处理器。也可以用 Thread类的静态方法 setDefaultUncaughtExceptionHandler 为所有线程安装一个默认的处理器

问题

  • 对象的正确发布的责任是否是在使用者而不是在线程安全类,即线程安全类也要求被正确发布

  • 代码3-13那里,有个cache.getFactors(i),这个操作应该不是原子的吧,那么如果.操作符完成之前,cache值被更新(即引用被更新),含义是什么