MENU

Java并发编程之J.U.C中Atomic原子包总结

类型

基本类型

  • AtomicInteger:整型原子类
  • AtomicLong:长整型原子类
  • AtomicBoolean:布尔型原子类
# 示例
import java.util.concurrent.atomic.AtomicInteger;

class AtomicTest {

    public static void main(String[] args) {
        int temp;
        AtomicInteger atomicInteger = new AtomicInteger(0);
        temp = atomicInteger.getAndIncrement(); // 相当于i++
        System.out.println("Old Value:" + temp + " New Value:" + atomicInteger); // Old Value:0 New Value:1
        temp = atomicInteger.incrementAndGet(); // 相当于++i
        System.out.println("Old Value:" + temp + " New Value:" + atomicInteger); // Old Value:2 New Value:2
    }
}

数组类型

  • AtomicIntegerArray:整型数组原子类
  • AtomicLongArray:长整型数组原子类
  • AtomicBoolean:布尔型原子类
# 示例
import java.util.concurrent.atomic.AtomicIntegerArray;

class AtomicTest {

    public static void main(String[] args) {
        int[] num = {1, 2, 3, 4, 5};
        AtomicIntegerArray array = new AtomicIntegerArray(num);
        array.getAndSet(1,5);
        System.out.println(array); // [1, 5, 3, 4, 5]
        array.compareAndSet(1,5,7);
        System.out.println(array); // [1, 7, 3, 4, 5]
    }
}

引用类型

  • AtomicReference:引用类型原子类
  • AtomicStampedRerence:原子更新带有版本号的引用类型(可用于解决CAS中ABA问题)
  • AtomicMarkableReference:原子更新带有标记位的引用类型
# 示例
import lombok.AllArgsConstructor;
import lombok.Data;

import java.util.concurrent.atomic.AtomicReference;

class AtomicTest {

    public static void main(String[] args) {
        AtomicReference<User> atomicReference = new AtomicReference<>();
        User user = new User("icharle", 18);
        atomicReference.set(user);
        System.out.println("name:" + atomicReference.get().getName() + " age:" + atomicReference.get().getAge()); // name:icharle age:18
        User newUser = new User("mlui", 19);
        atomicReference.compareAndSet(user, newUser);
        System.out.println("name:" + atomicReference.get().getName() + " age:" + atomicReference.get().getAge()); // name:mlui age:19
    }
}

对象属性修改类型

  • AtomicIntegerFieldUpdater:原子更新整型字段的更新
  • AtomicLongFieldUpdater:原子更新长整型字段的更新
# 示例
import lombok.AllArgsConstructor;
import lombok.Data;

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

class AtomicTest {

    public static void main(String[] args) {
        AtomicIntegerFieldUpdater fieldUpdater = AtomicIntegerFieldUpdater.newUpdater(User.class, "age");

        User user = new User("icharle", 18);
        System.out.println(fieldUpdater.incrementAndGet(user));  // 19
    }
}

@AllArgsConstructor
@Data
class User {
    private String name;
    public volatile int age;
}

AtomicXXX

这里就用AtomicInteger做例子,当然还有原子类均java.util.concurrent.atomic包下,这里用几个经典的例子分析说明。
201930

AtomicInteger count = new AtomicInteger(0);
count.getAndIncrement();

# 上面一段相当于count++
# 但是在多线程并发情况下count++是不安全的,因为++操作并非是原子操作。

AtomicInteger原理分析

# AtomicInteger类
/**
 * Atomically increments by one the current value.
 *
 * @return the previous value
 */
 public final int getAndIncrement() {
    return unsafe.getAndAddInt(this, valueOffset, 1);
 }
 
 # unsafe类
 public final int getAndAddInt(Object var1, long var2, int var4) {
        int var5;
        do {
            var5 = this.getIntVolatile(var1, var2);
        } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

        return var5;
    }

AtomicXXX在多线程并发情况下,是通过CAS方式来保证。其中unsafe类就是CAS的核心。

  • getAndAddInt(Object var1, long var2, int var4),var1为当前对象、var2为当前对象内存值、var4为需要更新的值。
  • var5 = this.getIntVolatile(var1, var2);,取到当前对象的内存值。
  • while循环直到var2 = var5 时候才更新值。

CAS缺点: 存在一个do···while语句,如果一直更新不成功,则会出现自旋操作。再则容易出现ABA问题。

AtomicStampReference解决CAS中ABA问题(原子引用、版本号机制)

class VolatileDemo {

    public static void main(String[] args) {
        AtomicStampedReference<Integer> reference = new AtomicStampedReference<>(10, 1);

        new Thread(() -> {
            int stamp = reference.getStamp();
            System.out.println("T1拿到的第一次的版本号:" + stamp);
            // 先暂停1秒,等T2线程拿到相同的初始版本号
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            reference.compareAndSet(10, 101, reference.getStamp(), reference.getStamp() + 1);
            System.out.println("T1线程第一次操作后的版本号为:"  + reference.getStamp());
            reference.compareAndSet(101, 10, reference.getStamp(), reference.getStamp() + 1);
            System.out.println("T1线程第二次操作后的版本号为:"  + reference.getStamp());
        }, "T1").start();

        new Thread(() -> {
            int stamp = reference.getStamp();
            System.out.println("T2拿到的第一次的版本号:" + stamp);
            // 先暂停3秒,等T1线程有充分的时候做一次ABA操作
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            boolean b = reference.compareAndSet(10, 2019, stamp, stamp + 1);
            System.out.println("当前内存中的最新值为:" + reference.getReference());
            System.out.println("T2线程在T1线程执行完ABA问题后在执行的结果为:" + b);
        }, "T2").start();
    }
}

参考文章死磕Java——CAS

AtomicLong与LongAdder区别

在AtomicLong中add()方法是通过CAS不断自旋方式更新值,当在多线程同时竞争激烈,更新值得过程不断自旋尝试CAS会造成CPU很大开销。而LongAdder则以“空间换时间”的思想,LongAdder存在一个Cell数组,因此它会针对Cell数组中的值进行CAS操作。(有点类似于jdk1.7中ConcurrentHashMap分段锁原理)

通过源码分析

先从add()方法入手,add方法中存在一个Cell数组,是Striped64的一个内部类,官方的解释为AtomicLong的填充变体仅支持原始访问和CAS

public void add(long x) {
    Cell[] as; long b, v; int m; Cell a;
    // ①判断cells是否被还没初始化 ②尝试对值直接进行cas操作
    if ((as = cells) != null || !casBase(b = base, b + x)) {
        boolean uncontended = true;
        //①cell[]数组是否初始化
        //②cell[]数组虽然已经初始化但是数组长度是否为0
        //③该线程对应的cell是否为null
        //④尝试对该线程对应的cell单元进行cas更新失败
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[getProbe() & m]) == null ||
            !(uncontended = a.cas(v = a.value, v + x)))
            longAccumulate(x, null, uncontended);
    }
}

if条件中有一个条件符合则进入到longAccumulate方法进行更新值。

final void longAccumulate(long x, LongBinaryOperator fn,
                              boolean wasUncontended) {
        int h;
        // getProbe()作用是根据当前线程hash出一个int值
        // 如果getProbe()为0表示还未初始化,则进行强制初始化
        if ((h = getProbe()) == 0) {
            ThreadLocalRandom.current(); // force initialization
            h = getProbe();
            wasUncontended = true;
        }
        boolean collide = false;                // True if last slot nonempty
        for (; ; ) {
            Cell[] as;
            Cell a;
            int n;
            long v;
            // cell[]数组已初始化但是数组并且长度是否为大于0
            if ((as = cells) != null && (n = as.length) > 0) {
                // 该线程对应的cell是否为null
                if ((a = as[(n - 1) & h]) == null) {
                    // 如果busy锁没有被占用,则进行新建一个cell
                    if (cellsBusy == 0) {       // Try to attach new Cell
                        Cell r = new Cell(x);   // Optimistically create
                        // 检测busy是否为0,并且尝试锁busy
                        if (cellsBusy == 0 && casCellsBusy()) {
                            boolean created = false;
                            try {               // Recheck under lock
                                Cell[] rs;
                                int m, j;
                                // 再次确认当前线程hash对应的cell是否为null,并且将新建cell赋值
                                if ((rs = cells) != null &&
                                        (m = rs.length) > 0 &&
                                        rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                // 释放锁
                                cellsBusy = 0;
                            }
                            if (created)
                                break;
                                
                            continue;           // Slot is now non-empty
                        }
                    }
                    collide = false;
                }
                //  针对“尝试对该线程对应的cell单元进行cas更新失败”置为true后交给循环重试
                else if (!wasUncontended)       // CAS already known to fail
                    wasUncontended = true;      // Continue after rehash
                else if (a.cas(v = a.value, ((fn == null) ? v + x :
                        fn.applyAsLong(v, x))))
                    break;
                else if (n >= NCPU || cells != as)
                    collide = false;            // At max size or stale
                else if (!collide)
                    collide = true;
                else if (cellsBusy == 0 && casCellsBusy()) {
                    try {
                        if (cells == as) {      // Expand table unless stale
                        // 尝试扩大cell 并将前n个copy进新cell数组
                            Cell[] rs = new Cell[n << 1];
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            cells = rs;
                        }
                    } finally {
                        cellsBusy = 0;
                    }
                    collide = false;
                    continue;                   // Retry with expanded table
                }
                // 线程竞争过于激烈 重新hash当前线程中HashCode值分配槽  
                h = advanceProbe(h);
            }
            // cells还未初始化情况并且能够获得锁情况 
            else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
                boolean init = false;
                try {                           // Initialize table
                    if (cells == as) {
                        // 初始化Cell为2的数组
                        Cell[] rs = new Cell[2];
                        // 计算hashcode值
                        rs[h & 1] = new Cell(x);
                        cells = rs;
                        init = true;
                    }
                } finally {
                    // 释放锁
                    cellsBusy = 0;
                }
                if (init)
                    break;
            }
            // 重试一次casBase对值直接累加
            else if (casBase(v = base, ((fn == null) ? v + x :
                    fn.applyAsLong(v, x))))
                break;                          // Fall back on using base
        }
    }

sum方法中,将base值以及遍历Cell数组累加和。

public long sum() {
        Cell[] as = cells; Cell a;
        long sum = base;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }

LongAdder对比AtomicLong,使用Cell数组去承接并发cas以提升性能,但LongAdder在统计的时候如果有并发更新,可能会导致统计的数据有误差。

参考文章从 LongAdder 中窥见并发组件的设计思路

返回文章列表 文章二维码 打赏
本页链接的二维码
打赏二维码