My Java Concurrent Note

以下大部分是java Concurrency in Practice的笔记,以JCIP简称之

对象的状态

  • (非正式意义上),对象的状态是指存储在状态变量中的数据,可能包括其他依赖对象的域(例如HashMap的Map.Entry对象)
  • 对象的状态中包含了任何可能影响其外部可见行为的数据

线程安全性的定义

  • 正确性的含义

    • 某个类的行为与其规范完全一致。在良好的规范中通常会定义各种不变性条件(Invariant)来约束对象的状态,以及定义各种后验条件来描述对象操作的结果

    • a postcondition is a condition or predicate that must always be true just after the execution of some section of code or after an operation in a formal specification.

  • 当多个线程访问某个类时,这个类始终能够表现出正确的行为,那么就称这个类是线程安全的

  • 当多个线程访问一个对象时, 如果不用考虑这些线程在运行时环境下的调度和交替执行, 也不需要进行额外的同步, 或者在调用方进行任何其他的协调操作, 调用这个对象的行为都可以获得正确的结果, 那这个对象是线程安全的

JCIP中“同步”的含义

  • synchronized关键字
  • volatile变量
  • 显式锁
  • 原子变量

竞态条件

  • 在并发编程中,由于不恰当的执行时序而出现不正确的结果(这不是正式的定义)
  • 竞态条件类型举例
    • 先检查后执行(比如if(i==1) {i=10;})。大多数竞态条件的本质:基于一种可能失效的观察结果来做出判断或执行计算
  • 并非所有的竞态条件都是数据竞争,同样并非所有的数据竞争都是竞态条件

数据竞争

  • 访问共享的非final类型的域时没有采用同步来协同,则会出现数据竞争

    A data race occurs when:

    • two or more threads in a single process access the same memory location concurrently, and
    • at least one of the accesses is for writing, and
    • the threads are not using any exclusive locks to control their accesses to that memory.

    When these three conditions hold, the order of accesses is non-deterministic, and the computation may give different results from run to run depending on that order. Some data-races may be benign (for example, when the memory access is used for a busy-wait), but many data-races are bugs in the program.

    ref from

  • 在java的内存模型中,如果在代码中存在数据竞争,那么这段代码就没有确定的语义

原子性

  • 假定有两个操作A、B,从执行A的线程来看,当另一个线程执行B时,要么将B完全执行完,要么完全不执行B,那么A和B对彼此来说是原子的
  • 原子操作是指,对于访问同一个状态的所有操作(包括这个操作本身)来说,这个操作是以一种原子方式执行
  • 与事务应用程序中的和原子性有相同的含义:一组语句作为一个不可分割的单元被执行

volatile变量

  • 只有成员变量才能使用它

  • 用来确保变量的更新操作通知到其他线程

    以下代码可以复现,无论是否有server模式

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    class VolatileTest {
    // 去掉volatile,则无限循环
    static volatile boolean sleep = false;

    public static void main(String[] args) throws InterruptedException {
    new Thread(() -> {
    while (!sleep) {
    }
    System.out.println("out of sleep");
    }).start();
    Thread.sleep(100);
    sleep = true;
    }
    }
  • anything that was visible to thread A when it writes to volatile field f becomes visible to thread B when it reads f.

  • synchronized在内存可见性上的作用比volatile变量更强

  • 依赖volatile变量来控制状态的可见性,通常比使用锁的代码更加脆弱,更加难以理解

  • 无法确保原子性(锁可以确保原子性和可见性)

  • 正确的使用方式

    • 确保他们状态的可见性
    • 确保它们所引用的状态的可见性

      This declares a volatile reference to an array, which might not be what you want. With a volatile reference to an array, reads and writes of the reference to the array are treated as volatile, but the array elements are non-volatile. To get volatile array elements, you will need to use one of the atomic array classes in java.util.concurrent (provided in Java 5.0). (ref from FindBugs)

    • 标志一些重要的程序生命周期事件的发生
  • 满足以下所有条件时,才应该使用

    • 对变量的写入不依赖于当前值,或者只有单个线程该变量
    • 该变量不会与其他状态变量一起纳入不变性条件中
    • 在访问变量时不需要加锁
  • 关于重排序(来自于)

    • 旧内存模型

      • Under the old memory model, accesses to volatile variables could not be reordered with each other, but they could be reordered with nonvolatile variable accesses.
    • 新内存模型(新内存模型是JSR133)

      • Under the new memory model, it is still true that volatile variables cannot be reordered with each other. The difference is that it is now no longer so easy to reorder normal field accesses around them.

      • Writing to a volatile field has the same memory effect as a monitor release, and reading from a volatile field has the same memory effect as a monitor acquire. In effect, because the new memory model places stricter constraints on reordering of volatile field accesses with other field accesses, volatile or not, anything that was visible to thread A when it writes to volatile field f becomes visible to thread B when it reads f.

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        class VolatileExample {
        int x = 0;
        volatile boolean v = false;
        public void writer() {
        x = 42;
        v = true;
        }

        public void reader() {
        if (v == true) {
        // uses x - guaranteed to see 42.
        // This would not have been true under the old memory model.
        }
        }
        }
      • Effectively, the semantics of volatile have been strengthened substantially, almost to the level of synchronization. Each read or write of a volatile field acts like “half” a synchronization, for purposes of visibility.

      • Note that it is important for both threads to access the same volatile variable in order to properly set up the happens-before relationship. It is not the case that everything visible to thread A when it writes volatile field f becomes visible to thread B after it reads volatile field g. The release and acquire have to “match”(i.e., be performed on the same volatile field) to have the right semantics

文档

  • 应该详细记载的内容
    • 同步策略(什么状态由什么锁保护、可见性如何维护)
    • 加锁顺序
    • 没有加锁情况下的同步策略(比如使用一个flag阻止其他线程在flag为true之前使用某资源)
    • 资源池的配置(避免资源依赖死锁)
    • 如果不能通过open call来避免死锁,那么应该详细记载如何避免在调用时出现死锁
  • 应该将条件队列相关联的条件谓词以及在这些谓词上等待的操作都写入文档

对象的初始化

  • 即使某个对象的引用对于其他线程是可见的,也并不意味着对象状态对于使用该对象的线程一定是可见的
  • 为了确保对象状态能够呈现出一致的视图,必须使用同步

this逸出

  • 只有当对象的构造函数返回时,对象才处于可预测的和一致的状态。因此,从构造函数发布对象时,发布了一个尚未构造完成的对象。
  • 例子:
    • 在构造函数启动线程
    • 在构造函数调用一个可以override的方法(非private、非final的方法,可以被子类override)
  • 可以通过使用start函数来启动线程、使用工厂方法和私有构造函数(即不要再构造函数启动,而是在工厂方法中,构造函数返回后才启动)
  • 通过避免在构造函数启动新线程,可以使得使用该对象的对象可以把该对象作为finalfield(因为这需要在构造函数等 new 该对象,从而其要求被构造的对象不触发新线程启动)

Final域

  • 还有一种情况可以安全地访问一个共享域, 即这个域声明为 final 时

  • 在java内存模型中,final域能确保初始化过程的安全性,从而可以不受限制的访问不可变对象,并在共享这些对象时无需同步。为了维持这种初始化安全性的保证,需要满足不可变性的所有需求

    • 状态不可修改
    • 所有域都是final类型
    • 正确的构造

    这种保证还延伸到正确创建的对象中所有final类型的域,在没有额外同步的情况下,也可以安全的访问final类型的域。不过,如果这些final域指向的是可变对象,那么访问这些域所指向的对象的状态时还是需要同步

  • 在发布不可变对象时,没有使用同步,也可以安全地访问该对象

原子变量

  • 如果有大量线程要访问相同的原子值, 性能会大幅下降, 因为乐观更新需要太多次重试。
  • Java SE 8 提供了 LongAdder 和 LongAccumulator 类来解决这个问题。LongAdder 包括多个变量(加数), 其总和为当前值。可以有多个线程更新不同的加数,线程个数增加时会自动提供新的加数。通常情况下, 只有当所有工作都完成之后才需要总和的值, 对于这种情况,这种方法会很高效。性能会有显著的提升。

Double Checked Locking(DCL)

  • 来自于

  • 示例代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    // double-checked-locking - don't do this!

    private static Something instance = null;

    public Something getInstance() {
    if (instance == null) {
    synchronized (this) {
    if (instance == null)
    instance = new Something();
    }
    }
    return instance;
    }
  • There’s only one problem with it – it doesn’t work. Why not? The most obvious reason is that the writes which initialize instance and the write to the instance field can be reordered by the compiler or the cache, which would have the effect of returning what appears to be a partially constructed Something. The result would be that we read an uninitialized object.

  • There is no way to fix it using the old Java memory model.

  • Under the new memory model, making the instance field volatile will “fix” the problems with double-checked locking, because then there will be a happens-before relationship between the initialization of the Something by the constructing thread and the return of its value by the thread that reads it.(我猜,这个happen-before是不是就是程序顺序规则(each action in a thread haappens-before every action in that thread that come later in the program order)和传递性volatile 变量规则的结合——就是instance=new Something()在单线程中必须保证构造操作在赋值给instance操作之前,然后volatile又保证了写入在读取之前,所以一旦另一个线程读取了instance,则根据传递性,构造操作已经完成了)

  • 1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    public class Something {
    private Something() {}

    private static class LazyHolder {
    static final Something INSTANCE = new Something();
    }

    public static Something getInstance() {
    return LazyHolder.INSTANCE;
    }
    }

    This code is guaranteed to be correct because of the initialization guarantees for static fields; if a field is set in a static initializer, it is guaranteed to be made visible, correctly, to any thread that accesses that class.

  • 作用
    • 互斥
    • 原子性(复合操作的执行过程中持有一个锁,则这组操作成为原子操作)
    • 内存可见性(所有线程都能看到共享变量的最新值——只需要所有线程都在同一个锁上同步)

synchronized

  • 用于实现原子性
  • 可重入
  • 确定临界区
  • Constructors in Java can not use the synchronized keyword,但是可以synchronized(this){}
  • 内存可见性——某个线程修改了对象状态后,其他线程可以看到发生的状态的变化
  • 每个对象都有一个内部锁,该锁有一个内部条件(条件对应Object对象的final方法waitnotifynotifyAll方法)(所以一旦某个对象的一个synchronized方法被调用后,该对象的另一个synchronized方法会被阻塞)
  • 调用synchronized静态方法获得相关的类对象(Class对象)的内部锁。要注意,使用不同的classLoader加载出来的Class对象是不同的
  • 监视器
    • 用Java的术语来讲,监视器具有如下特性:
      • 监视器是只包含私有域的类。
      • 每个监视器类的对象有一个相关的锁。
      • 使用该锁对所有的方法进行加锁。换句话说,如果客户端调用obj.meth0d(),那么obj对象的锁是在方法调用开始时自动获得(这是获得锁的唯一途径),并且当方法返回时自动释放该锁(无论是正常退出还是异常退出)。因为所有的域是私有的,这样的安排可以确保一个线程在对对象操作时,没有其他线程能访问该域
      • 该锁可以有任意多个相关条件
    • Java设计者以不是很精确的方式采用了监视器概念,Java中的每一个对象有一个内部的锁和内部的条件。如果一个方法用synchronized关键字声明,那么,它表现的就像是一个监视器方法。通过调用wait/notifyAll/notify来访问条件变量。然而,在下述的3个方面Java对象不同于监视器,从而使得线程的安全性下降:
      • 域不要求必须是private。
      • 方法不要求必须是synchronized。
      • 内部锁对客户是可用的。
  • 局限
    • 不能中断一个正在试图获得锁的线程
    • 没有超时
    • 只有单一的条件
    • 不提供公平的锁

内存可见性

  • 最低安全性
    • 某个线程没有同步情况下读取变量,可能会得到一个失效值,但这个值至少是之前某个线程设置的,而不是随机的
    • Java内存模型要求,变量的读取和写入必须是原子操作,但是非volatilelongdouble例外
    • JVM允许将64bit的读或写操作分解为两个32bit的操作,因此可能无法满足最低安全性(除非用volatile声明或是用锁保护)
  • Volatile Arrays in Java

    • 对于volatile int[] arr = new int[len]arr[i] = x这个操作没有volatile地写入arr这个变量,只是写入了arr[i]这个非volatile的变量,从而,即使x=arr[i]有volatile地读取arr这个volatile变量,也无法构成acquire-release关系
    • arr = arr, int x = arr[0], arr[0] = 1这些都有volatile地readarr本身。其中,第三个需要先读arr,然后再写入0th位置
    • 如下代码提供对arr元素的volatile读写

      1
      2
      3
      4
      5
       // I wouldn't do this if I were you.
      volatile int [] arr = new int[SIZE];

      arr[0] = 1;
      arr = arr;
    • 以上代码的问题在于

      The virtue of a volatile write is that a corresponding read can detect that it happened, and do something based on the new value. In this case, you can’t actually detect that another thread performed the write, because it is writing the same value to the variable.
      不过我认为,如果我们只需要保证arr里的元素的更新被及时传播,那么完全可以使用这个方案,因为我们并不需要等待一个flag的变更再动作,而只是期望读到最新的值

不可变对象

需要满足的条件

  • 对象创建后其状态不能修改
  • 对象的所有域是final类型的(可以不用,比如StringhashCode这个field)
  • 对象是正确创建的(创建期间,this没有逸出)

misc

  • 对象内部可以使用可变对象来管理状态

  • 例子

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    public final class ThreeStooges {
    private final Set<String> stooges = new HashSet<String>();

    public ThreeStooges() {
    stooges.add("Moe");
    stooges.add("Larry");
    stooges.add("Curly");
    }

    public boolean isStooge(String name) {
    return stooges.contains(name);
    }
    }
  • 使用volatile+不可变对象来实现一组状态的原子操作

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    class OneValueCache {
    private final BigInteger lastNumber;
    private final BigInteger[] lastFactors;

    public OneValueCache(BigInteger i,
    BigInteger[] factors) {
    lastNumber = i;
    // 如果没有这个copyOf,就不是不可变的
    lastFactors = Arrays.copyOf(factors, factors.length);
    }

    public BigInteger[] getFactors(BigInteger i) {
    if (lastNumber == null || !lastNumber.equals(i))
    return null;
    else
    return Arrays.copyOf(lastFactors, lastFactors.length);
    }
    }

    public class VolatileCachedFactorizer implements Servlet {
    private volatile OneValueCache cache =
    new OneValueCache(null, null);

    public void service(ServletRequest req, ServletResponse resp) {
    BigInteger i = extractFromRequest(req);
    // 如果这里多个请求到来,某个请求调用getFactors调用到一半
    // 然后另一个请求则把cache赋值成另外一个对象,那么有没有问题?
    // 似乎c++就不可以这样做,因为这个需要自动垃圾收集
    BigInteger[] factors = cache.getFactors(i);
    if (factors == null) {
    factors = factor(i);
    cache = new OneValueCache(i, factors);
    }
    encodeIntoResponse(resp, factors);
    }
    }

正确的发布

  • 发布(publish):使对象能够在当前作用域之外的代码中使用

  • 逸出(escape):某个不该发布的对象被发布

  • 不可变对象可以通过任意的机制发布

    • 3.4节,不可变对象那里,代码3-13使用了volatile来保证其他线程看到最新的引用。另外,3.5.2节说,即使发布不可变对象时,没有使用同步,仍然可以安全的访问该对象(上面那一段说,“即使某个对象的引用对于其他线程可见,也不意味着对象的状态对于使用该对象的线程一定是可见的”)。这里的“不需要同步”的含义是指,不需要同步即可以保证对象状态的可见性,而不是说,每次更新引用,都会让其他线程看到新的引用值。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      class SynTest {
      static FC fc = null;

      public static void main(String[] args) throws InterruptedException {
      Thread t1 = new Thread(() -> {
      fc = new FC();
      });
      Thread t2 = new Thread(() -> {
      while (fc == null) {
      }
      int n = fc.n * 10;
      assert n * 10 == 50 * 20;
      });
      t2.start();
      Thread.sleep(100);
      t1.start();
      }

      static class FC {
      final int n = 10;
      }
      }
    • 第二个线程没有退出。所以,其实不可变对象的发布时不需要同步的含义应该是“另一个线程可以看到完整的对象状态”。(很奇怪,如果把两个start的位置颠倒,或者是去掉sleep,则会导致成功结束)

    • Java内存模型为不可变对象的共享提供了一种特殊的初始化安全性保证
    • 即使在发布不可变对象的引用时没有使用同步,也可以安全的访问该对象。为了实现这种安全,对象必须满足上面提到的不可变对象的所有要求
    • 这种保证还包括被正确创建对象中的所有final类型的域。但是如果这些域是可变对象,那么在访问时还是需要同步
  • 事实不可变对象(比如某个Date对象被构造后,就只执行读操作,类似于java要求lambda可以读取的lambda外变量是“事实不可变的”)必须通过安全的方式发布
  • 可变对象必须通过安全的方式发布,并且必须是线程安全的或者某个锁保护起来

可变对象的正确的发布

  • 即使某个对象的引用对于其他线程是可见的,也不意味着对象状态对于使用该对象的线程是一定可见的。为了确保对象状态呈现出一致性视图(对象的引用与对象的状态必须同时对其他线程可见),必须要使用同步(包括发布时和使用时)

  • 常用的发布模式

    • 在静态初始化函数中初始化一个对象引用(不知道“静态初始化函数”是否等价于“静态初始化器”)(因为静态初始化器由JVM在类的初始化阶段执行,JVM内部存在同步机制)

      1
      public static Holder holder = new Holder(42);

      虚拟机会保证一个类的<clinit>()方法在多线程环境中被正确地加锁、同步,如果多个线程同时去初始化一个类,那么只会有一个线程去执行这个类的<clinit>()方法,其他线程都需要阻塞等待,直到活动线程执行<clinit>()方法完毕

      <clinit>()方法是由编译器自动收集类中的所有类变量的赋值动作和静态语句块(static{}块)中的语句合并产生的

      来自深入理解JVM

    • 将对象的引用保存在volatile类型的域或者AtomicReference对象中

    • 将对象的引用保存在某个正确构造对象的final类型域中

    • 将对象的引用保存在一个由锁保护的域中

      • 线程安全容器(比如HashtablesynchronizedMapConcurrentMapVectorCopyOnWriteArrayListCopyOnWriteArraySetsynchronizedListsynchronizedSetBlockingQueueConcurrentLinkedQueue)内部的同步意味着将对象放入到某个容器中,将满足这一条要求)
      • FutureExchanger也可以

lambda使用外部的对象(或value type的变量)

以下均为我个人根据final域的特性认为的——不知道final域与final变量的差别是什么

  • lambda可以使用外部的final或事实final的变量
  • 如果是这个final对象引用的变量,没有问题,可见(就是引用对于其他线程可见时,其内部的状态也是,可以保证一致性)
  • 如果引用的是可变对象,则要同步

内部类与final变量

  • TODO:内部类引用外部的变量时,不要求其为final的,那么怎么办,可见性如何保证

volatile vs 锁

  • 锁可以确保可见性和原子性
  • volatile只确保可见性

MISC

  • 开发服务器应用时,要始终加上-server参数,client模式的JVM优化与server的不一样

内存屏障

  • MFENCE, LFENCE, SFENCE (Ref from intel manual 325462-sdm-vol-1-2abcd-3abcd)
    • MFENCE: Performs a serializing operation on all load-from-memory and store-to-memory instructions that were issued prior the MFENCE instruction. This serializing operation guarantees that every load and store instruction that precedes the MFENCE instruction in program order becomes globally visible before any load or store instruction that follows the MFENCE instruction. The MFENCE instruction is ordered with respect to all load and store instructions, other MFENCE instructions, any LFENCE and SFENCE instructions, and any serializing instructions (such as the CPUID instruction). MFENCE does not serialize the instruction stream.
    • SFENCE: Orders processor execution relative to all memory stores prior to the SFENCE instruction. The processor ensures that every store prior to SFENCE is globally visible before any store after SFENCE becomes globally visible. The SFENCE instruction is ordered with respect to memory stores, other SFENCE instructions, MFENCE instructions, and any serializing instructions (such as the CPUID instruction). It is not ordered with respect to memory loads or the LFENCE instruction.
    • LFENCE: Performs a serializing operation on all load-from-memory instructions that were issued prior the LFENCE instruction. Specifically, LFENCE does not execute until all prior instructions have completed locally, and no later instruction begins execution until LFENCE completes. In particular, an instruction that loads from memory and that precedes an LFENCE receives data from memory prior to completion of the LFENCE. (An LFENCE that follows an instruction that stores to memory might complete before the data being stored have become globally visible.) Instructions following an LFENCE may be fetched from memory before the LFENCE, but they will not execute until the LFENCE completes.
  • read barrier, write barrier, (Ref)
    • Read barrier: A read barrier is a data dependency barrier plus a guarantee that all the LOAD operations specified before the barrier will appear to happen before all the LOAD operations specified after the barrier with respect to the other components of the system. A read barrier is a partial ordering on loads only; it is not required to have any effect on stores. Read memory barriers imply data dependency barriers, and so can substitute for them. [!] Note that read barriers should normally be paired with write barriers; see the “SMP barrier pairing” subsection.
    • Data dependency barrier: A data dependency barrier is a weaker form of read barrier. In the case where two loads are performed such that the second depends on the result of the first (eg: the first load retrieves the address to which the second load will be directed), a data dependency barrier would be required to make sure that the target of the second load is updated after the address obtained by the first load is accessed.
    • General memory barriers. A general memory barrier gives a guarantee that all the LOAD and STORE operations specified before the barrier will appear to happen before all the LOAD and STORE operations specified after the barrier with respect to the other components of the system.
    • Write memory barrier: A write memory barrier gives a guarantee that all the STORE operations specified before the barrier will appear to happen before all the STORE operations specified after the barrier with respect to the other components of the system.

设计线程安全类

  • 三个要素
    • 找出构成对象状态的所有变量
      • 如果在对象域中引用了其他对象,那么该对象的状态就包含被引用对象的域
      • 在定义哪些变量将构成对象的状态时,只考虑对象拥有的数据
    • 找出约束状态变量的不变性条件
    • 建立对象状态的并发访问管理策略
      • 同步策略定义了如何在不违背对象不变条件或后验条件(在操作中会包含一些后验条件来判断状态的迁移是否有效——比如要求某变量如果当前是17,那么下一次更新是18)的情况下对其状态的访问操作进行协同
      • 同步策略规定了如何将不可变性、线程封闭、加锁机制等结合起来维护线程的安全性。还规定了哪些变量由哪些锁来保护
      • 必须要将同步策略写为正式文档
  • 尽可能使用final域以缩小状态空间
  • 如果变量的某些状态是无效的,那么就要对该变量进行封装以避免用户的修改使得该变量处于无效状态。如果某个操作中存在无效的状态转换那么该操作就必须是原子的
  • 如果一个类是由多个独立且线程安全的状态变量组成,并且所有的操作都不包含无效的状态转换,那么可以将线程安全性委托给底层的状态变量
  • 如果一个状态变量是线程安全的,并且没有任何不变性条件来约束它的值,在变量的操作上也不存在任何不允许的状态转换(然而按照线程安全的定义,似乎如果满足以上要求,那么线程安全也就必然满足),那么就可以安全地发布这个变量
  • 当把线程安全委托给底层的线程安全对象时,如果这些变量不是final,那么会导致一个问题——引用可能会引用了另一个对象。

基于现有的线程安全类添加特性

  • 最好的方法是直接修改原始类,这需要理解代码中的同步策略。好处在于所有实现同步策略的代码在同一个源文件中
  • 另一种方法是extend这个类,但是可能有的类的一些状态是private,子类访问不到,所以行不通。不好的地方在于同步策略的实现被分布到多个单独维护的源文件中,如果底层的类改变了同步策略那么子类就不安全了。
  • 客户端加锁:对于使用对象X的用户代码,在用户代码中,使用X本身用于保护其状态的锁来保护这段用户代码。这里的问题在于,如果文档没有明确指出X使用的锁,那么这个锁在以后版本中可能会修改,从而导致用户代码相对于X的代码不是原子的(在用户代码执行过程中,X内的代码可能执行)。这种会破坏同步策略的封装性(类似于extend会破坏实现的封装性)。与前一种一样,都是将派生类的行为与基类的行为耦合在一起
  • 组合:类似于Collections.synchronizedXXX,用户传递一个对象给该方法(转移所有权,以后都通过这个工厂方法返回的对象来操作),新对象内使用委托和加锁来实现。这种不依赖于被委托对象的线程安全性和使用的锁策略

依赖状态的操作

  • 类的不变性条件和后验条件约束了在对象上有哪些状态和状态转换是有效的。在某些对象的方法中还包含一些基于状态的先验条件(比如,从队列中pop时,队列必须非空)。如果某个操作中包含有基于状态的先验条件,则这个操作就叫做依赖状态的操作

使用私有的锁 vs 使用对象的内置锁

  • 优点
    • 私有锁可以将锁封闭起来,避免用户代码得到锁,以避免用户代码参与到同步策略中
    • 如果使用内置锁,要想验证锁是否被正确使用,需要检查整个程序

实例分析

一个线程安全的车辆追踪器的代码解析

  • 不可变的Point类,因为不可变,所以复制map时只需要复制map的结构,无需复制Point的内容。返回的unmodifableMap也类似如此。(代码来源,JCIP代码清单4.7和4.6)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    class DelegatingVehicleTracker {
    private final ConcurrentMap<String, Point> locations;
    private final Map<String, Point> unmodifiableMap;

    // 构造函数的安全依赖于用户——如果用户在构造函数运行时,
    // 修改参数points的内容,那么locations的视图就可能是不一致的。
    // 并且,用户需要自己保证这个DelegatingVehicleTracker类是安全的发布的,
    // 从而,保证在构造函数运行完之前,类内的其他非静态函数不会被调用
    public DelegatingVehicleTracker(Map<String, Point> points) {
    locations = new ConcurrentHashMap<>(points);
    unmodifiableMap = Collections.unmodifiableMap(locations);
    }

    // 返回的unmodifiableMap内部是没有同步的,
    // 那么,locations的修改是如何反映到unmodifiableMap的?
    // 通过ConcurrentMap来保证
    public Map<String, Point> getLocations() {
    return unmodifiableMap;
    }

    public Point getLocation(String id) {
    return locations.get(id);
    }

    public void setLocation(String id, int x, int y) {
    if (locations.replace(id, new Point(x, y)) == null)
    throw new IllegalArgumentException(
    "invalid vehicle name: " + id);
    }

    class Point {
    final int x, y;

    public Point(int x, int y) {
    this.x = x;
    this.y = y;
    }
    }
    }
  • 可变的线程安全的Point,如果在point的状态上施加不变性任何约束,那么以下就不是线程安全的,因为用户可以修改返回的point对象。(代码来源,JCIP代码清单4.11,4.12)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    class PublishingVehicleTracker {
    private final Map<String, SafePoint> locations;
    private final Map<String, SafePoint> unmodifiableMap;

    public PublishingVehicleTracker(
    Map<String, SafePoint> locations) {
    this.locations
    = new ConcurrentHashMap<>(locations);
    this.unmodifiableMap
    = Collections.unmodifiableMap(this.locations);
    }

    public Map<String, SafePoint> getLocations() {
    return unmodifiableMap;
    }

    public SafePoint getLocation(String id) {
    return locations.get(id);
    }

    public void setLocation(String id, int x, int y) {
    if (!locations.containsKey(id))
    throw new IllegalArgumentException(
    "invalid vehicle name: " + id);
    locations.get(id).set(x, y);
    }

    public class SafePoint {
    private int x, y;

    private SafePoint(int[] a) {
    this(a[0], a[1]);
    }

    // 此处,如果实现为`this(p.x, p.y)`就会有竞态条件,
    public SafePoint(SafePoint p) {
    this(p.get());
    }

    public SafePoint(int x, int y) {
    this.x = x;
    this.y = y;
    }

    public synchronized int[] get() {
    return new int[]{x, y};
    }

    public synchronized void set(int x, int y) {
    this.x = x;
    this.y = y;
    }
    }
    }

ConcurrentHashMap

来自JCIP的5.2.1,不清楚JDK5之后有没有修改

  • 使用一种粒度更细的加锁机制——分段锁
  • 一定数量的写入线程可以并发的修改Map
  • 迭代器具有弱一致性,而不是fail-fast(HashMap等的是fail-fast,从而如果迭代时发现数量变化了,就会抛出ConcurrentModificationException),不会抛出ConcurrentModificationException。可以容忍并发的修改
  • size返回的只是估计值
  • 内部没有实现对map加锁以独占访问(对比之下,同步容器HashtablesynchronizedMap中可以通过获取map的锁来实现独占),所以不能在用户代码通过获取map的锁来独占(所以如果确实需要独占,应该放弃ConcurrentHashMap)

condition

Condition.signal要在持有锁时才能调用

  • lost-wake-up问题,比如说代码如下

    1
    2
    cnt+=1;
    notify();
    1
    2
    3
    while(cnt<=0) {
    wait();
    }
  • 然后,在检查了cnt<=0这个条件后、调用wait之前,有个线程完成了cnt++;notify()的操作,那么消费者就丢失了wake-up,陷入了无限等待(如果其在等待时没有被唤醒的话)

串行线程的封闭

  • 线程封闭对象的要求:
    • 只能由单个线程拥有
    • 通过安全的发布来转移所有权(安全的发布确保对象状态对于新的所有者来说是可见的(不止这个作用?))
    • 所有权转以后,只有接受所有权的线程才可以访问(独占的所有权)
  • 可以使用阻塞队列、ConcurrentHashMap的remove方法、AtomicReference的compareAndSet来完成

Interrupt

  • 每个线程都有一个boolean的中断标志
  • Thread.interrupted方法将返回当前线程是否被中断,并清除中断标志
  • Thread.currentThread().interrupt()将中断当前线程
  • 阻塞库方法,例如Thread.sleepObject.wait会检查线程何时中断,并在发现时提前返回

InterruptedException

  • 当一个方法会抛出这个异常时,表示这是一个阻塞方法,如果这个方法被中断,那么它将努力提前结束阻塞状态
  • 当在代码中调用将抛出该异常的方法时,我们的方法也就成了一个阻塞方法,必须要对中断进行响应。有两种选择
    • 传递InterruptedException:不捕获该异常,或者捕获后重新抛出
    • 恢复中断:调用当前线程的interrupt方法恢复中断状态
  • 只有在对Thread进行扩展并且可以控制调用栈上的所有更高层代码时才可以屏蔽该异常

不可中断的操作

  • 请求内置锁(synchronized)

Thread

MISC

  • 一个Thread对象即使没有引用了,但是已经start,这个线程还是在运行

安全策略

  • 使用Executors.privilegedThreadFactory()可以创建出线程工厂,用这种方式创建出来的线程,将于创建privilegedThreadFactory的线程拥有相同的访问权限、AccessControlContextcontextClassLoader
  • 如果不使用该方法,线程池创建的线程将从在需要新线程时调用execute或submit的客户程序中继承访问权限

Daemon

  • Thread.setDaemon(): This method must be invoked before the thread is started
  • 当只剩下守护线程时,虚拟机就退出了,由于如果只剩下守护线程,就没必要继续运行程序了。
  • 守护线程应该永远不去访问固有资源, 如文件、 数据库,因为它会在任何时候甚至在一个操作的中间发生中断。

Thread.suspend

  • 与 stop 不同,suspend 不会破坏对象。但是,如果用 suspend 挂起一个持有一个锁的线程, 那么,该锁在恢复之前是不可用的。 如果调用suspend 方法的线程试图获得同一个锁, 那么程序死锁

Thread状态

  • 状态转换

    • 当一个线程被阻塞或等待时(或终止时) ,另一个线程被调度为运行状态
    • 当一个线程被重新激活(例如, 因为超时期满或成功地获得了一个锁), 调度器检查它是否具有比当前运行线程更高的优先级。 如果是这样,调度器从当前运行线程中挑选一个, 剥夺其运行权,选择一个新的线程运行。
  • new:当用 new 操作符创建一个新线程时, 如 new Thread(r), 该线程还没有开始运行。这意味着它的状态是 new。当一个线程处于新创建状态时, 程序还没有开始运行线程中的代码。在线程运行之前还有一些基础工作要做

  • Runnable:一旦调用 start 方法,线程处于 runnable 状态。一个可运行的线桿可能正在运行也可能没有运行, 这取决于操作系统给线程提供运行的时间。(A thread in the runnable state is executing in the Java virtual machine but it may be waiting for other resources from the operating system such as processor)(Java 的规范说明没有将它作为一个单独状态。一个正在运行中的线程仍然处于可运行状态)
  • Blocked:当线程处于被阻塞或等待状态时,它暂时不活动。它不运行任何代码且消耗最少的资源。直到线程调度器重新激活它。( 细节取决于它是怎样达到非活动状态的) 。当所有其他线程释放该锁,并且线程调度器允许本线程持有它的时候,该线程将变成非阻塞状态

    • 当一个线程试图获取一个内部的对象锁(而不是 java.util.concurrent 库中的锁,) 而该锁被其他线程持有, 则该线程进人阻塞状态。(Thread state for a thread blocked waiting for a monitor lock. A thread in the blocked state is waiting for a monitor lock to enter a synchronized block/method or reenter a synchronized block/method after calling Object.wait.)。当所有其他线程释放该锁,并且线程调度器允许本线程持有它的时候,该线程将变成非阻塞状态
  • WAITING

    • 当线程等待另一个线程通知调度器一个条件时, 它自己进入等待状态。(使用Object.waitThread.join方法,或是等待java.util.concurrent库中的Lock或Condition时,就会出现这种情况)

    • Thread state for a waiting thread. A thread is in the waiting state due to calling one of the following methods:

      • Object.wait with no timeout
      • Thread.join with no timeout
      • LockSupport.park

      A thread in the waiting state is waiting for another thread to perform a particular action.

  • TIMED_WAITING

    • Thread state for a waiting thread with a specified waiting time. A thread is in the timed waiting state due to calling one of the following methods with a specified positive waiting time:
      • Thread.sleep
      • Object.wait with timeout
      • Thread.join with timeout
      • LockSupport.parkNanos
      • LockSupport.parkUntil
  • TERMINATED

    • Thread state for a terminated thread. The thread has completed execution
    • 线程因如下两个原因之一而被终止
      • 因为 run 方法正常退出而自然死亡。
      • 因为一个没有捕获的异常终止了 nm 方法而意外死亡
      • 可以调用线程的 stop 方法杀死一个线程。 该方法抛出 ThreadDeath 错误对象,由此杀死线程。但是,stop 方法已过时

ThreadPoolExecutor

MISC

  • JVM只有在所有(非守护)线程全部终止后才会退出,因为如果无法正确的关闭Executor,JVM无法关闭

SingleThreadExecutor

  • 确保不会并发执行任务。为了避免用户代码修改返回的ThreadPoolExecutor对象,从而改变不并发执行的语义,所以对其进行包装使得无法类型转换为ThreadPoolExecutor来修改那些特性
  • Executors.unconfigurableExecutorService()包装的Executor就具有冻结配置的特性

afterExecute和beforeExecute方法

  • 以下是runWorker方法内的片段

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    try {
    beforeExecute(wt, task);
    Throwable thrown = null;
    try {
    task.run();
    } catch (RuntimeException x) {
    thrown = x; throw x;
    } catch (Error x) {
    thrown = x; throw x;
    } catch (Throwable x) {
    thrown = x; throw new Error(x);
    } finally {
    afterExecute(task, thrown);
    }
    } finally {
    task = null;
    w.completedTasks++;
    w.unlock();
    }

    可以看出

    • 如果beforeExecute抛出异常,则不会执行task和afterExecute
    • 无论task抛出什么异常,都会执行afterExecute

submit的策略

  • 源码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
    if (addWorker(command, true))
    return;
    c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    if (! isRunning(recheck) && remove(command))
    reject(command);
    else if (workerCountOf(recheck) == 0)
    addWorker(null, false);
    }
    else if (!addWorker(command, false))
    reject(command);
  • 如果运行的corePoolSize线程少于corePoolSize,则会创建一个新线程来处理请求,即使其他工作线程处于空闲状态也是如此。否则,如果正在运行少于maximumPoolSize的线程,则只有在队列已满时才会创建一个新线程来处理请求

  • 如果core线程还有剩(就是当前启动的线程数少于核心线程数),那么新加
    worker(等价于新加线程,加入线程后,会run firstTask——prestartAllCoreThreads方法加入的核心worker的firstTask都是null,从而run firstTask不会去运行task)

    1
    2
    3
    4
    5
    /** Delegates main run loop to outer runWorker  */
    public void run() {
    runWorker(this);
    }
    // 这是内部的Worker(实现了Runnable接口)的run方法,在addWorker后会调用该worker的thread的start方法,从而调用该run方法
  • 否则就尝试push到工作队列里(即使核心线程不忙,也是直接push到队列中,然后核心线程从队列中获得task)(worker运行的线程会阻塞的等待工作队列有东西返回)

    1
    2
    3
    4
    while (task != null || (task = getTask()) != null) {
    w.lock();
    // If pool is stopping, ensure thread is interrupted;
    // 这来自ThreadPoolExecutor的runWorker方法,worker将run方法委托给这个方法
  • 如果工作队列已经满了就会尝试去添加非核心线程

  • 因为在添加到工作队列时,使用的是offer,所以即使是SynchronousQueue,也不会阻塞,而是没有线程正在等待接受就立刻返回

    1
    2
    3
    4
    SynchronousQueue<E> public boolean offer
    Inserts the specified element into this queue, if another thread is waiting to receive it.
    Returns: true if the element was added to this queue, else false
    // 截取自SynchronousQueue的doc

    因为,如果核心线程已满,但是这个queue无法被offer,那么说明核心线程都忙(会不会出现executor.prestartAllCoreThreads跑完但是有的线程还没到等待点的情况,从而虽然有的线程不忙但是还是无法offer?),所以尝试启动新的非核心线程

  • 这意味着,queue不能太小:否则,如果无法添加新线程,而有的线程虽然不忙,但是可能没能及时得到cpu时间去把task从队列中拿出来,就会导致执行RejectedExecutionHandler

  • 要特别注意execute的这一个片段

    1
    2
    3
    4
    5
    6
    if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    if (! isRunning(recheck) && remove(command))
    reject(command);
    else if (workerCountOf(recheck) == 0)
    addWorker(null, false);

    即使coreSize==0(maxSize为0是非法的), 并且这是无界队列,线程池还是会保证至少有一个线程在跑

  • 如果使用了无界队列,其实maxSize也没什么用,因为总是会添加队列成功——除非超过了Integer.MAX_VALUE

  • 队列的作用在于

    • 防止core线程不忙但是没能及时从队列中pop出task导致添加多余的线程
    • 使得系统平缓应对小波峰——避免一有风吹草动系统就跟着波动(怎么感觉有点像惊群效应)。小波峰的含义是,对用户的响应的延迟在接受范围内
    • 削峰填谷

死锁

锁顺序死锁

  • 在持有锁时调用某个外部方法,那么将可能出现死锁,或阻塞时间过长

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    // Warning: deadlock-prone!
    class Taxi {
    @GuardedBy("this") private Point location, destination;
    private final Dispatcher dispatcher;
    public Taxi(Dispatcher dispatcher) {
    this.dispatcher = dispatcher;
    }
    public synchronized Point getLocation() {
    return location;
    }
    public synchronized void setLocation(Point location) {
    this.location = location;
    if (location.equals(destination))
    dispatcher.notifyAvailable(this);
    }
    }
    class Dispatcher {
    @GuardedBy("this") private final Set<Taxi> taxis;
    @GuardedBy("this") private final Set<Taxi> availableTaxis;
    public Dispatcher() {
    taxis = new HashSet<Taxi>();
    availableTaxis = new HashSet<Taxi>();
    }
    public synchronized void notifyAvailable(Taxi taxi) {
    availableTaxis.add(taxi);
    }
    public synchronized Image getImage() {
    Image image = new Image();
    for (Taxi t : taxis)
    image.drawMarker(t.getLocation());
    return image;
    }
    }

    某个线程调用setLocation,然后在持有该Taxis的锁时调用Dispatcher的notifyAvaiable。同时,另一个线程调用getImage,持有dispatcher的锁然后请求taxis的锁。从而死锁

  • 开放调用:调用某个方法时不需要持有锁。即可能使用开放调用,可以更容易找到需要获得多个锁的代码路径。如果因为开放调用而导致某些操作不是原子的,那么可以通过一些协议来避免该路径被并发执行——比如搞一个标志,然后其他线程看到该标志为xx时就不执行

    The need to rely on open calls and careful lock ordering reflects the fundamental messiness of composing synchronized objects rather than synchronizing composed objects.

  • 避免锁顺序死锁
    • 以固定的顺序加锁
    • 如果加锁顺序取决于调用的参数顺序呢(比如获得parm的obj1obj2内部锁,那么就取决于obj1obj2的顺序),那么利用System.identityHashCode(obj)来决定是那个obj先加锁,如果两个的hashCode相同,那么加一个“加时赛锁”(就是先加锁这个“加时赛锁”,然后再加锁两个对象),确保同一时间只有一个线程在对obj1obj2加锁

资源死锁

  • 比如某个线程持有某个链接池的链接等待另一个链接池的链接,另一个线程顺序颠倒地持有和请求
  • 线程依赖死锁
    • 当线程池不够大,而线程池里的任务依赖于同一线程池的任务,那么会导致,某个任务正在跑,然后其依赖于处于队列中的任务,从而死锁
    • 这种就要求线程池要经过一定的配置——比如不能太小等。所以要将配置策略文档化
    • 只有当任务相互独立时,为线程池或工作队列设置界限才是合理的。任务之间存在依赖性,有界的线程池或队列就可能导致线程饥饿死锁
    • 解决方法
      • 使用无界的线程池,比如newCachedThreadPool
      • 使用有界的线程池,并使用SynchronousQueue作为工作队列(这样当线程池满的时候调用者就会被阻塞从而知道子任务无法运行),以及Caller-Runs饱和策略
      • 为什么不使用0长度的其他blockingQueue而使用SynchronousQueue(以下均为个人观点)
        • 一方面,其他blockingQueue不支持0长度。所以有可能有一个被等待的子任务就死锁了
        • 另一方面,如果无法增加核心worker(新建线程),那么task都跑到队列中,这时候,如果没有线程在等待从队列中获得task,队列就会立刻返回(因为使用的是offer而不是会阻塞的put),从而在可能死锁时(就是子任务没线程能运行时),可以执行拒绝策略。

消除锁的方法

  • 比如要求在遍历一个集合时,集合保持一致性,那么可以通过加锁复制该集合,然后遍历副本

避免死锁的其他方法

  • 使用有定时功能的锁,并在失败时记录日志、适当rollback
  • threadDump可以提供synchronizedReentrantLock等的死锁信息。通过定时触发threadDump可以有效的获知加锁信息

饥饿和优先级

  • 在 Java 程序设计语言中,每一个线程有一个优先级。
  • 默认情况下,一个线程继承它的父线程的优先级。
  • 可以用setPriority 方法提高或降低任何一个线程的优先级
  • 可以将优先级设置为在 MIN_PRIORITY (在 Thread 类中定义为 1 ) 与 MAX_PRIORITY (定义为 10 ) 之间的任何值。NORM_PRIORITY 被定义为 5
  • 每当线程调度器有机会选择新线程时, 它首先选择具有较高优先级的线程。但是,线程优先级是高度依赖于系统的
  • 如果有几个高优先级的线程没有进入非活动状态, 低优先级的线程可能永远也不能执行

  • 尽量不要改变线程优先级,如果改变了,那么程序的行为就与平台相关了。并且可能导致饥饿

  • 在大多数java应用程序中,所有线程都具有相同的Thread.NORM_PRIORITY,我测试了Daemon线程和非Daemon线程,都是5(linux 4.18.0-13)
  • Thread.yieldThread.sleep(0)的语义都是未定义的,JVM可以将他们实现为空操作,也可以把他们视为线程调度的参考。在unix系统中并不要求他们拥有sleep(0)的语义——将当前线程放在该优先级对应的运行队列末尾,并将执行全交给相同优先级的其他线程,尽管有些JVM是按照这种方法来实现yield的

活锁

  • 不会阻塞线程但是也无法继续执行,因为线程总是不断重复执行相同的操作,而且总是失败
    • 比如:某个消息被处理时一定会失败,然而开发者错误的将这种错误当做可以修复的错误,从而把该消息又加入了消息队列头部,从而消息处理器会反复调用,并返回相同的结果,从而程序无法继续执行下去
  • Livelock can also occur when multiple cooperating threads change their state in response to the others in such a way that no thread can ever make progress
    • This is similar to what happens when two overly polite people are walking in opposite directions in a hallway: each steps out of the other’s way, and now they are again in each other’s way.so they both step aside again, and again, and again
    • 解决方法:重试时引入随机性,比如信道上冲突时,随机退避一段时间

可伸缩性

加速比的计算

  • $\text{speedup}\leq \frac{1}{F+\frac{(1-F)}{N}}$
  • 其中$F$是串行化的比例,$N$是CPU数目,比如$F=10%$,$N=10$,则在加速比为5.3

锁竞争

  • 随着线程数目的增加,性能下降,原因是上下文切换开销和调度开销(比如CFS,两个nice值相同的非实时进程,100ms可以各自分得50ms,如果是100个进程,则只有1ms)
  • 如果某个线程需要获取多个锁,那么可能获取了一个锁后又需要block,从而导致无效的调度

上下文切换

  • vmstat可以看到上下文切换次数、内核执行的时间占比
  • 如果内核占用率比较高(超过10%),那么说明调度发生得很频繁,可能是由于IO或锁竞争导致的阻塞引起的

非竞争同步和竞争同步

  • 非竞争同步可以完全在JVM中处理,竞争同步可能需要操作系统的介入
  • When locking is contended, the losing thread(s) must block. The JVM can implement blocking either via spin-waiting (repeatedly trying to acquire the lock until it succeeds) or by suspending the blocked thread through the operating system.

  • volatile通常是非竞争同步
  • synchronized针对无竞争同步进行了优化
  • 在java5时(作者编写JCIP时)一个Fast-Path的非竞争同步将消耗20~250个时钟周期
  • 其对程序整体性能的影响很低

同步的开销

  • 会增加总线的通信量,从而对其他线程造成影响

    This aspect is sometimes used to argue against the use of nonblocking algorithms without some sort of backoff, because under heavy contention,nonblocking algorithms generate more synchronization traffic than lock-based ones. See Chapter 15.

排队理论

  • Little定律:在一个稳定的系统中,顾客的平均数量等于他们的平均达到率乘以他们在系统中的平均停留时间
  • 所以考察锁的性能时要考考察锁的请求频率乘以每次持有锁的时间

降低锁的竞争程度

  • 减少持有锁的时间
    • 将线程安全性委托给线程安全对象而不是直接对该对象加锁也可以缩小需要同步的块大小。比如synchronized(a HashMap)就比Hashtable的同步块大,因为hashtable内部只有到必要时才加锁,而不是整个操作都加锁。
  • 降低锁的请求频率
    • 锁分解:一个锁保护一个变量,不要多个变量共用一个锁
    • 锁分段:比如ConcurrentHashMap使用16个锁,保护1/16散列桶。劣势是有时候需要持有所有的锁来执行操作(比如resize)
  • 使用有允许更高的并发性的协调的机制代替独占锁

    Replace exclusive locks with coordination mechanisms that permit greater concurrency.

  • 合并过小的锁(锁粗化,因为加锁需要代价,尤其是基于OS资源的锁)
  • 避免热点域,比如多线程下的Map如果每次写入操作都需要更新size这个变量,那么size就成了热点域。可以通过使用多个size变量记录局部size,还可以维护一个全局的volatile size,每次写入操作导致size被更新时,将这个全局size设置为-1,每次调用size()时,把结果缓存在全局size

对象池

  • 缺点
    • 如果太大,则可用内存会变小
    • 有可能有旧对象到新对象的引用,从而对GC不利
    • 需要同步,这个代价可能比new新对象更大,即使是非竞争同步,开销也比new新对象大

直接在业务线程写日志 vs 分离日志到后台线程

  • 前者的缺点
    • IO会阻塞,阻塞需要上下文切换,而可能阻塞的时间其实小于上下文切换的时间
    • 阻塞结束后换入前,可能需要等待其他线程,从而延迟服务时间
    • 在输出流上发生竞争(因为有多个业务线程直接往流上写日志,并且像c的printf其实是有锁的)
    • 锁竞争会导致上下文切换、调度开销,从而进一步增加服务时间和内核工作的时间(即减少了跑业务的时间)
  • 后者的优点
    • 写入日志其实是写内存——写到日志队列,这是很快的
    • 因为会发生竞争的动作(写日志)很快,从而减少了锁竞争,也就减少了发生上下文切换的次数
    • 在日志队列未满之前,几乎不阻塞,因此也几乎不需要被调度出去
    • 把一条包含锁竞争、IO的复杂路径变成一条简单的路径
    • 只有一个日志线程,所以在流上没有锁和锁竞争,也就没有加锁的用户态内核态切换代价和锁竞争导致的上下文切换
    • 削峰填谷

性能指标

指标

  • 资源利用率
  • 响应时间
  • 服务时间

测量方法

  • 测量等待外部服务的时间
  • 测量IO时间
  • 测量网络流量
  • 使用ThreadDump获得锁竞争的信息,竞争激烈的锁,会频繁出现在ThreadDump中

测试

  • 有安全性测试和活跃性测试

    • 安全性测试:不发生任何错误的行为
    • 活跃性测试:某个良好的行为终究发生
  • 测试时可以使用以下随机数发生器

    1
    2
    3
    4
    5
    6
    static int xorShift(int y) {
    y ^= (y << 6);
    y ^= (y >>> 21);
    y ^= (y << 7);
    return y;
    }
  • 可以使用/home/hzx/MyStudy/notes/java/JSR166TestCase.java作为测试基类(来自于http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/test/tck/JSR166TestCase.java)

  • 使用那些容易检查、容易出错的属性来测试
  • 测试代码应该避免引入过多的同步、避免限制并发性,理想的情况是,测试属性中不需要任何同步(比如,如果代码中内存可见性有一些问题,但是测试代码中有一个printf(C的printf似乎可以充当内存屏障,毕竟printf需要lock),那么就会掩盖内存可见性问题)
  • Random是线程安全的

    Instances of java.util.Random are threadsafe.(ref from Random’s javadoc)
    应该使用纯函数来计算随机数

    1
    2
    3
    4
    5
    6
    7
    8
    // 计算出本线程私有的seed,然后使用下面这个xorShift不断迭代计算下一个seed
    int seed = (this.hashCode() ^ (int) System.nanoTime());
    private int xorShift(int y) {
    y ^= (y << 6);
    y ^= (y >>> 21);
    y ^= (y << 7);
    return y;
    }
  • 如果创建线程开销比较大,而每个线程运行的时间比较短,那么就可能导致循环里创建的线程其实变成了串行执行。可以通过引入CyclicBarrier来解决

  • 为了避免在判断测试何时结束时需要与其他线程协调,可以使用确定性的结束条件
  • 测量噪声
    • 可能来源(以map为例)
      • hashCode的交错
      • 线程的调度
      • rehash
      • GC、malloc的开销
      • OS的辅助任务
    • 不应该尝试在测试时去掉噪声,因为实际使用过程中也是有各种噪声的
  • 提高错误发生的概率

    • 在多处理器系统上,开大于处理器数目的线程数目,可以使得线程的交错更加不可预测
    • 在不同的处理器数量、操作系统、处理器架构的系统上测试
    • 访问共享状态的操作中,使用Thread.yield将产生更多的上下文切换(与平台相关,JVM可能将其实现为空操作),短时间的sleep
  • 实例分析:测试阻塞队列

    • 如果使用一个类库提供的阻塞队列作为对照,那么就会引入过多的同步
    • 更好的方法是,使用一个对顺序敏感(只有一个生产者、消费者的情况下)(比如不支持交换率的运算)的校验和计算函数来计算所有入列元素和出列元素的校验和,并进行比较,这样还可以测试出取出的元素的顺序
    • 因为在每个消费者那里,不可能说每取出一个元素就要对应的操作全局的checksum,而是说边取出边操作一个局部的checksum,等全部取出后才去用局部checksum操作全局checksum,这意味着,只有一个生产者、消费者是才可以使用顺序敏感的函数,否则不应该使用这种函数
    • 代码
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      65
      66
      67
      68
      69
      70
      71
      72
      73
      74
      public static class PutTakeTest {
      private final ExecutorService pool = Executors.newCachedThreadPool();
      private final AtomicInteger putSum = new AtomicInteger(0);
      private final AtomicInteger takeSum = new AtomicInteger(0);
      private final CyclicBarrier barrier;
      private final MyBlockingQueue<Integer> bb;
      private final int nTrials, nPairs;


      public PutTakeTest(int capacity, int nPairs, int nTrials) {
      this.bb = new MyBlockingQueue<>(capacity);
      this.nTrials = nTrials;
      this.nPairs = nPairs;
      this.barrier = new CyclicBarrier(nPairs * 2 + 1);
      }

      public void test() {
      try {
      for (int i = 0; i < nPairs; i++) {
      pool.execute(new Producer());
      pool.execute(new Consumer());
      }
      barrier.await();
      barrier.await();
      assertEquals(putSum.get(), takeSum.get());
      } catch (Exception e) {
      throw new RuntimeException(e);
      } finally {
      pool.shutdown();
      }
      }

      private class Producer implements Runnable {
      public void run() {
      try {
      int seed = (this.hashCode() ^ (int) System.nanoTime());
      int sum = 0;
      barrier.await();
      for (int i = nTrials; i > 0; --i) {
      bb.put(seed);
      sum += seed;
      seed = xorShift(seed);
      }
      putSum.getAndAdd(sum);
      barrier.await();
      } catch (Exception e) {
      throw new RuntimeException(e);
      }
      }
      }

      private class Consumer implements Runnable {
      public void run() {
      try {
      barrier.await();
      int sum = 0;
      for (int i = nTrials; i > 0; --i) {
      sum += bb.get();
      }
      takeSum.getAndAdd(sum);
      barrier.await();
      } catch (Exception e) {
      throw new RuntimeException(e);
      }
      }
      }

      private int xorShift(int y) {
      y ^= (y << 6);
      y ^= (y >>> 21);
      y ^= (y << 7);
      return y;
      }
      }

Lock类

  • Lock实现中必须提供与内部锁相同的内存可见性语义

    All Lock implementations must enforce the same memory synchronization semantics as provided by the built-in monitor lock, as described in The Java Language Specification (17.4 Memory Model) :

    • A successful lock operation has the same memory synchronization effects as a successful Lock action.
    • A successful unlock operation has the same memory synchronization effects as a successful Unlock action.
      (ref from Lock’s javadoc)
  • 加锁语义、调度算法、顺序保证、性能特性方面可以与内置所有所不同

ReentrantLock

  • 获取ReentrantLock时,有着与进入同步代码块相同的内存语义
  • 释放时,有着与退出同步代码块相同的内存语义
  • 非公平的版本,在某个线程请求时,如果该锁变为可用,那么其可以直接获得锁,无需考虑正在排队的线程。其并不提倡插队,但是无法避免插队

    • 非公平版本的代码

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      static final class NonfairSync extends Sync {
      private static final long serialVersionUID = 7316153563782823691L;

      /**
      * Performs lock. Try immediate barge, backing up to normal
      * acquire on failure.
      */
      final void lock() {
      if (compareAndSetState(0, 1))
      setExclusiveOwnerThread(Thread.currentThread());
      else
      acquire(1);
      }

      protected final boolean tryAcquire(int acquires) {
      return nonfairTryAcquire(acquires);
      }
      }
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      final boolean nonfairTryAcquire(int acquires) {
      final Thread current = Thread.currentThread();
      int c = getState();
      if (c == 0) {
      if (compareAndSetState(0, acquires)) {
      setExclusiveOwnerThread(current);
      return true;
      }
      }
      else if (current == getExclusiveOwnerThread()) {
      int nextc = c + acquires;
      if (nextc < 0) // overflow
      throw new Error("Maximum lock count exceeded");
      setState(nextc);
      return true;
      }
      return false;
      }
    • 公平版本

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      final void lock() {
      acquire(1); // 其会调用tryAcquire
      }
      protected final boolean tryAcquire(int acquires) {
      final Thread current = Thread.currentThread();
      int c = getState();
      if (c == 0) {
      if (!hasQueuedPredecessors() &&
      compareAndSetState(0, acquires)) {
      setExclusiveOwnerThread(current);
      return true;
      }
      }
      else if (current == getExclusiveOwnerThread()) {
      int nextc = c + acquires;
      if (nextc < 0)
      throw new Error("Maximum lock count exceeded");
      setState(nextc);
      return true;
      }
      return false;
      }
    • 即使是公平版本的锁,tryLock也是不公平的

      Even when this lock has been set to use a fair ordering policy, a call to tryLock() will immediately acquire the lock if it is available, whether or not other threads are currently waiting for the lock. This “barging” behavior can be useful in certain circumstances, even though it breaks fairness. If you want to honor the fairness setting for this lock, then use tryLock(0, TimeUnit.SECONDS) which is almost equivalent (it also detects interruption).

    • 公平的版本,在JCIP的性能测试中性能比非公平的低两个数量级
    • 如果请求锁的时间时间间隔比较长,或者持有锁的时间间隔比较长,那么使用公锁是ok的
    • 内置锁是非公平的

      读写锁

  • 如果读并不是非常多,那么其实会降低性能,因为该锁算法复杂。所以使用前要对程序的瓶颈有明确的分析、测试

Fork-Join

  • fork-join 框架使用了一种有效的智能方法来平衡可用线程的工作负载,这种方法称为工作密取(work stealing)。每个工作线程都有一个双端队列 ( deque ) 来完成任务。一个工作线程将子任务压人其双端队列的队头。(只有一个线程可以访问队头,所以不需要加锁。)一个工作线程空闲时,它会从另一个双端队列的队尾“ 密取” 一个任务

构建自定义的同步工具

前置条件不满足的处理方案

  • 可以Thread.sleep()
  • 可以park
  • 可以自旋
  • 可以yield,对于很快(但是使用自旋又太浪费的地方)就可以满足前置条件的情况,这一个比较好的选择(JCIP说比较park好(14.1.1最后),但是我觉得似乎不尽然,毕竟让出了时间片不就等于park

条件变量

  • 构成前提条件的变量必须由对象的锁来保护,从而使得它们在测试前提条件的同时保持不变。如果前提条件尚未满足,就必须释放锁
  • 条件队列:每个对象都可以作为一个条件队列。其使得一组线程(等待线程集合)能够通过某种方式来等待特定的条件变为真
  • 如果某个功能无法通过“轮询+休眠”实现,那么其也无法通过条件队列实现(公平的条件队列除外,条件队列可以实现公平,但是“轮询+休眠”没法实现,除非搞个队列,但是这样子不就成了条件队列)
  • 条件谓词:使得某个操作成为状态依赖操作的前提条件(比如阻塞队列的take操作的条件谓词就是“队列不为空”),其是类中各个状态变量构成的表达式
  • 应该将条件队列相关联的条件谓词以及在这些谓词上等待的操作都写入文档

  • Monitor Condition

    • 如果调用同步方法的线程必须被阻塞或是其他原因不能立刻进行,它能够在一个监视条件上等待,这将导致该客户线程暂时释放监视锁,并挂起(WAITING / TIMED_WAITING)在监视条件上
    • 同步方法线程恢复。一旦早先挂起在某监视条件上的同步方法线程获取通知,它将继续在最初的等待监视条件的点上执行,执行的条件是唤醒后抢占到监视锁。当线程从wait方法中被唤醒时,他在重新请求锁时不具有任何特殊的优先级,而要与任何其他尝试进入同步代码块的线程一起正常的在锁上竞争
    • 当调用wait时,需要持有锁,否则会抛出IllegalMonitorStateException
    • 不提供公平的队列
    • 只有一个条件变量
  • 调用wait之前要测试条件谓词,并且从wait中返回时再次进行测试。要在循环中调用wait

    • 信号丢失:如果线程A通知了一个条件队列,但是线程B随后调用wait,那么wait并不会立刻醒来,而是要等待另一个通知来唤醒它。所以在wait之前要检查条件谓词
    • Spurious wakeup:(来自wiki。内容可能有争议)。比如,当只有一个资源变得可用时,本来我们调用notify,那么可以相信醒来的那个一定可以拿到资源。但是,并不是如此,实际操作系统中,有时会出现即使是上面那种情况,醒来后还是发现资源不可用
    • 有个问题:wait唤醒后需要抢占锁,那么,是否有可能某线程虽然被notify唤醒,也就是只有一个等待该条件的线程醒来获得资源,但是同时外界来了一个线程,并且竞争锁先成功,那么这个被唤醒的不就没资源了吗,从而类似假醒
  • 确保使用与条件队列相关联的锁来保护条件谓词的各个状态变量

  • 调用wait、notify、notifyAll时,要持有与条件队列相关的锁,否则有lost-wake-up问题,比如说代码如下

    1
    2
    cnt+=1;
    notify();
    1
    2
    3
    while(cnt<=0) {
    wait();
    }

    然后,在检查了cnt<=0这个条件后、调用wait之前,有个线程完成了cnt++;notify()的操作,那么消费者就丢失了wake-up,陷入了无限等待(如果其在等待时没有被唤醒的话)

  • 检查条件谓词之后以及开始执行相关操作之前,不要释放锁

  • 调用notify时,JVM会从条件队列上等待的多个线程中选择一个来唤醒,而调用notifyAll时则会唤醒所有在这个条件队列上等待的线程。调用notify或notifyAll时必须持有条件队列对象的锁,而如果这些等待的线程此时不能重新获得锁,那么无法从wait返回,因此发出通知的线程要尽快释放锁

  • 在一个条件队列上等待同一个条件谓词

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    abstract class BaseBoundedBuffer<V> {
    private final V[] buf;
    private int tail;
    private int head;
    private int count;

    protected BaseBoundedBuffer(int capacity) {
    this.buf = (V[]) new Object[capacity];
    }

    protected synchronized final void doPut(V v) {
    buf[tail] = v;
    if (++tail == buf.length)
    tail = 0;
    ++count;
    }

    protected synchronized final V doTake() {
    V v = buf[head];
    buf[head] = null;
    if (++head == buf.length)
    head = 0;
    --count;
    return v;
    }

    public synchronized final boolean isFull() {
    return count == buf.length;
    }

    public synchronized final boolean isEmpty() {
    return count == 0;
    }
    }

    class BoundedBuffer<V> extends BaseBoundedBuffer<V> {
    // CONDITION PREDICATE: not-full (!isFull())
    // CONDITION PREDICATE: not-empty (!isEmpty())
    public BoundedBuffer(int size) {
    super(size);
    }

    // BLOCKS-UNTIL: not-full
    public synchronized void put(V v) throws InterruptedException {
    while (isFull())
    wait();
    doPut(v);
    notifyAll();
    }

    // BLOCKS-UNTIL: not-empty
    public synchronized V take() throws InterruptedException {
    while (isEmpty())
    wait();
    V v = doTake();
    notifyAll();
    return v;
    }
    }

    这种情况下有个问题:调用notify后,take被叫醒,然而还是empty——此时isFull不满足,所以本应该put方法被得到通知被执行,所以这个信号就好像丢失了

  • 只有在同时满足以下两个条件时,才能用单一的notify而不是notifyAll

    • 所有等待的线程类型相同。只有一个条件谓词与条件队列相关,并且每个线程从wait返回后将执行相同的操作
    • 在条件变量上每次通知最多只能唤醒一个线程来执行
  • 使用notifyAll比notify更容易正确,所以除非notifyAll对性能影响过大,否则应该优先使用notifyAll

原子变量与非阻塞同步机制

非阻塞算法

  • 基于底层原子机器指令来代替锁
  • 多个线程竞争时不会发生阻塞
  • 极大地减少调度开销(因为使用自旋而不是挂起?)
  • 不存在死锁和其他活跃性问题(饥饿呢?活锁呢?)
    • “在不常见的情况下,存在活锁风险”
    • 如果其他线程每次在CAS竞争中都成功,那么本线程就会饥饿,但是实际中很少发生
  • 不会受到单个线程失败的影响

锁的劣势

  • 调度开销
  • 线程恢复执行时,需要等待其他线程执行完他们的时间片,才能被调度执行
  • 如果一个线程在持有锁时发生了page fault、调度延迟之类的情况,那么其他线程都会被延迟
  • 优先级反转:被阻塞的高优先级线程因为需要等待低优先级线程持有的锁,所以导致它的优先级降低到低优先级线程的那个级别(不知道是OS真的降低了优先级还是说虽然有高优先级但是表现上跟低优先级差不多)

原子变量

  • 内存可见性(Ref from atomic javadoc)
    • The memory effects for accesses and updates of atomics generally follow the rules for volatiles, as stated in The Java Language Specification (17.4 Memory Model) :
    • get has the memory effects of reading a volatile variable.
    • set has the memory effects of writing (assigning) a volatile variable.
    • lazySet has the memory effects of writing (assigning) a volatile variable except that it permits reorderings with subsequent (but not previous) memory actions that do not themselves impose reordering constraints with ordinary non-volatile writes. Among other usage contexts, lazySet may apply when nulling out, for the sake of garbage collection, a reference that is never accessed again.
    • weakCompareAndSet atomically reads and conditionally writes a variable but does not create any happens-before orderings, so provides no guarantees with respect to previous or subsequent reads and writes of any variables other than the target of the weakCompareAndSet.
    • compareAndSet and all other read-and-update operations such as getAndIncrement have the memory effects of both reading and writing volatile variables.

内存模型

  • JMM为程序中所有的操作(包括变量读写、monitor的加锁/释放、线程的启动和join)定义了一个偏序关系,称之为Happen-before,如果要保证执行操作B的线程看到操作A的结果,那么A和B之间就必须满足Happen-Before的关系。如果两个操作之间没有Happen-Before的关系,那么JVM就可以对它们任意地重排序
  • 以下引用自Java Concurrency in Practice
    • A data race occurs when a variable is read by more than one thread, and written by at least one thread, but the reads and writes are not ordered by happens-before. A correctly synchronized program is one with no data races; correctly synchronized programs exhibit sequential consistency, meaning that all actions within the program appear to happen in a fixed, global order.
    • The rules for happens-before are:
      • Program order rule. Each action in a thread happens-before every action in that thread that comes later in the program order.
      • Monitor lock rule. An unlock on a monitor lock happens-before every subsequent lock on that same monitor lock. [3]
      • Volatile variable rule.A write to a volatile field happens-before every subsequent read of that same field. [4]
      • Thread start rule. A call to Thread start on a thread happens-before every action in the started thread.
      • Thread termination rule. Any action in a thread happens-before any other thread detects that thread has terminated, either by successfully return from Thread.join or by Thread.isAlive returning false.
      • Interruption rule. A thread calling interrupt on another thread happens-before the interrupted thread detects the interrupt (either by having InterruptedException thrown, or invoking isInterrupted or interrupted).
      • Finalizer rule. The end of a constructor for an object happens-before the start of the finalizer for that object.
      • Transitivity. If A happens-before B, and B happens-before C, then A happens-before C.

具体问题

  • 哲学家进餐(5人)
    • 原子的获取两个(Semaphore.tryAcquire(2)),获取失败则回避随机长度的时间,避免活锁
    • 获取一个后,获取第二个失败,则释放第一个,随机退避,避免活锁
    • 奇数编号的哲学家先拿起左边的筷子,接着拿起右边的,偶数编号的颠倒过来
    • 只允许四个人同时就餐

MISC

  • 伪共享(false sharing)(Ref
    • RFO:Request for owner,如果某cache line是shared的状态,然后需要写该cache line,那么就会导致发送RFO,使得其他的该cache line的副本失效
    • 如果两个无关的变量在同一cache line,那么就会导致写其中一个变量,另一个变量的本地缓存也失效了,从而要读取时得去L3、内存取新数据
  • 如果保证资源只有一个线程去访问?通过限制只有一个线程(比如单线程的日志服务,然后所有日志工作都交给该服务)、使用锁(粒度可以是call或是thread,后者可重入,前者不可以)
  • UncaughtExceptionHandler:可以用 setUncaughtExceptionHandler 方法为任何线程安装一个处理器。也可以用 Thread类的静态方法 setDefaultUncaughtExceptionHandler 为所有线程安装一个默认的处理器

问题

  • 对象的正确发布的责任是否是在使用者而不是在线程安全类,即线程安全类也要求被正确发布

  • 代码3-13那里,有个cache.getFactors(i),这个操作应该不是原子的吧,那么如果.操作符完成之前,cache值被更新(即引用被更新),含义是什么