侧边栏壁纸
  • 累计撰写 101 篇文章
  • 累计创建 89 个标签
  • 累计收到 9 条评论

JUC原子操作之原子操作增强类

bearjun
2021-11-19 / 0 评论 / 0 点赞 / 1,379 阅读 / 11,656 字 / 正在检测是否收录...
温馨提示:
本文最后更新于 2021-11-19,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

原子操作类相信大家或多或少都用过?像AtomicInteger这样,保证一个变量在多线程的操作中正确的运算出我们希望的数值?这又是为什么呢?底层又是怎样实现的呢?我们一起来慢慢深入。。。
64725-eilt7c8vzku.png

CAS

在没有CAS之前,我们操作多线程数据运算的时候,一般都要加上锁。但是用了CAS原子操作之后,只需要调用对用的API就可以准确无误的运算数值。

CAS是什么

Compare And Swap的缩写,中文翻译成比较并交换,实现并发算法时常用到的一种技术。它包含三个操作数——内存位置、预期原值及更新值。

执行CAS操作的时候,将内存位置的值与预期原值比较:如果相匹配,那么处理器会自动将该位置值更新为新值;如果不匹配,处理器不做任何操作。多个线程同时执行CAS操作只有一个会成功。

底层原理

CAS有3个操作数,位置内存值V,旧的预期值A,要修改的更新值B。当且仅当旧的预期值A和内存值V相同时,将内存值V修改为B,否则什么都不做或重来。

// AtomicInteger.calss源码部分内容:
public class AtomicInteger extends Number implements java.io.Serializable {
   static {
        try {
            valueOffset = unsafe.objectFieldOffset
                (AtomicInteger.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
    }

    private volatile int value;

    public final boolean compareAndSet(int expect, int update) {
        return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }
}
    

// Unsafe.class源码部分内容:
public final class Unsafe {

public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);

public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);
}
/**
*var1:表示要操作的对象
*var2:表示要操作对象中属性地址的偏移量
*var4:表示需要修改数据的期望的值
*var5/var6:表示需要修改为的新值
*/

CAS是JDK提供的非阻塞原子性操作,它通过硬件保证了比较-更新的原子性。它是非阻塞的且自身原子性,也就是说CAS效率更高且通过硬件保证,说明CAS更可靠。
 
CAS是一条CPU的原子指令(cmpxchg指令),不会造成所谓的数据不一致问题,Unsafe提供的CAS方法(如compareAndSwapXXX)底层实现即为CPU指令cmpxchg。执行cmpxchg指令的时候,会判断当前系统是否为多核系统,如果是就给总线加锁,只有一个线程会对总线加锁成功,加锁成功之后会执行CAS操作,也就是说CAS的原子性实际上是CPU实现的, 其实在这一点上还是有排他锁的,只是比起用synchronized,这里的排他时间要短的多,所以在多线程情况下性能会比较好。

CAS缺点

我们都知道,CAS通过Unsafe类的底层CPU指定来保证原子性。其缺点也很明显:

  • 循环时间长开销很大。
  • 引出来ABA问题。

循环时间长开销很大。
63223-l1l3krfj0fo.png
如果CAS失败,会一直进行尝试。如果CAS长时间一直不成功,可能会给CPU带来很大的开销。

引出来ABA问题。

public static void main(String[] args) {
        AtomicInteger ai = new AtomicInteger();

        new Thread(() -> {
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 第三次操作 0 -> 1
            System.out.println(Thread.currentThread().getName() + "==>" + ai.compareAndSet(0, 1));
        }, "one").start();


        new Thread(() -> {
            // 第一次操作 0 -> 100
            System.out.println(Thread.currentThread().getName() + "==>" + ai.compareAndSet(0, 100));
            // 第二次操作 100 -> 0
            System.out.println(Thread.currentThread().getName() + "==>" + ai.compareAndSet(100, 0));
        }, "two").start();

    }

CAS算法实现一个重要前提需要取出内存中某时刻的数据并在当下时刻比较并替换,那么在这个时间差类会导致数据的变化。
比如说一个线程one从内存位置V中取出A,这时候另一个线程two也从内存中取出A,并且线程two进行了一些操作将值变成了B,然后线程two又将V位置的数据变成A,这时候线程one进行CAS操作发现内存中仍然是A,然后线程one操作成功。 
尽管线程one的CAS操作成功,但是不代表这个过程就是没有问题的。

那需要怎么解决呢?
我们可以使用AtomicStampedReference或者AtomicMarkableReference来解决。

原子操作类

原子操作类是指java.util.concurrent.atomic下面的类。它支持对单个变量进行无锁线程安全编程。正如开篇的那张图片所示,总共16个原子类,我们分为5大类。现在让我们慢慢来揭秘。

  • 基本类型
  • 数组类型
  • 引用类型
  • 对象的属性修改
  • 原子操作增强类原理深度解析

基本类型原子类

这个应该是我们经常使用或者是使用最多的原子类了。分别为:

  • AtomicInteger
  • AtomicLong
  • AtomicBoolean

常用的API为:

  • public final int get() //获取当前的值
  • public final int getAndSet(int newValue)//获取当前的值,并设置新的值
  • public final int getAndIncrement()//获取当前的值,并自增
  • public final int getAndDecrement() //获取当前的值,并自减
  • public final int getAndAdd(int delta) //获取当前的值,并加上预期的值
  • boolean compareAndSet(int expect, int update) //如果输入的数值等于预期值,则以原子方式将该值设置为输入值(update)
public static void main(String[] args) throws InterruptedException {
        AtomicInteger ai = new AtomicInteger();

        new Thread(() -> {
            ai.getAndIncrement();// +1
            System.out.println(Thread.currentThread().getName() + "=>" + ai.get());
            ai.getAndSet(10);// = 10
            System.out.println(Thread.currentThread().getName() + "=>" + ai.get());
        }, "one").start();

        TimeUnit.SECONDS.sleep(2);

        new Thread(() -> {
            ai.getAndDecrement();// -1
            System.out.println(Thread.currentThread().getName() + "=>" + ai.get());
            ai.getAndAdd(10);// +10
            System.out.println(Thread.currentThread().getName() + "=>" + ai.get());
        }, "two").start();
    }

数组类型原子类

刚才操作的都是基本的类型的单个,现在来看看原子类型的数组类。等价于数组的操作。

  • AtomicIntegerArray
  • AtomicLongArray
  • AtomicReferenceArray
public static void main(String[] args) throws InterruptedException {
        /**
         * 1、初始的构造方法有三种,除了第三种,其他数组的每一项默认值都为0
         */
        //AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(6);
        //AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(new int[6]);
        AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(new int[]{1, 2, 3, 4, 5, 6});

        /**
         * 赋值
         */
        // 给数组第0项 + 10
        atomicIntegerArray.getAndAdd(0, 10);

        // 给数组第1项 + 1
        atomicIntegerArray.getAndIncrement(1);

        // 给数组第2项 - 1
        atomicIntegerArray.getAndDecrement(2);

        // 给数组第3项赋值
        atomicIntegerArray.set(3, 111);

        for (int i = 0; i < atomicIntegerArray.length(); i++) {
            System.out.println(atomicIntegerArray.get(i));
        }
    }

引用类型原子类

刚刚都是一些基本数据类型,那复杂类型呢?这不:

  • AtomicReference 原子更新的引用类型对象,泛型可以设置任意类型
  • AtomicStampedReference 原子更新带有版本号的引用类型对象
  • AtomicMarkableReference 原子更新带有标记位的引用类型对象

AtomicReference<E>是一个可以自定义类型的原子更新的类。

public static void main(String[] args) throws InterruptedException {
        /**
         * 如果AtomicReference的泛型为Integer基本类型,等价于 AtomicInteger
         */
        AtomicReference<Integer> ref = new AtomicReference<>();

        ref.set(10); // get() 结果为10
        ref.getAndUpdate(t -> {
            return t + 11; // get() 结果为21
        });
        /**
         * 参数t1:ref.get()的值
         * 参数t2:就是传入进来的5
         */
        ref.getAndAccumulate(5, (t1, t2) -> {
            return t1 + t2; // get() 结果为21+5
        });

        /**
         * 当然,泛型也可以是一个对象
         */
        AtomicReference<User> userRef = new AtomicReference<>();
        User user1 = new User(1, "bearjun", 25);
        User user2 = new User(100, "bear", 25);
        userRef.set(user1);

        userRef.getAndUpdate(u -> {
            u.setId(10);
            return u; //重新赋值
        });

        System.out.println(userRef.compareAndSet(user2, user2));
        // 如果当前值==预期值user1,则原子地将值设置为给定的更新值user2。返回值:是否更新成功
        System.out.println(userRef.compareAndSet(user1, user2));
    }

@Data
@AllArgsConstructor
class User{
    private Integer id;
    private String name;
    private Integer age;
}

再来看看AtomicStampedReferenceAtomicMarkableReference。前面说过ABA问题,这两个都可以解决ABA问题,AtomicStampedReference是用的一个版本号,而AtomicMarkableReference是true或者false。

public static void main(String[] args) throws InterruptedException {
        /**
         * 参数1:初始值,任意类型(基本类型或者引用类型都可以)
         * 参数2:版本号初始值,只能是int
         */
        AtomicStampedReference sRef = new AtomicStampedReference(10000,0);

        int stamp = sRef.getStamp();
        Integer reference = (Integer) sRef.getReference();
        /**
         * 参数1 是否预期的值  参数2 是预期的值则替换为该参数的值
         * 参数3 是否为预期的版本 参数4 是预期的版本则替换为当前的版本
         * 比较参数1是否为预期的值并且同时比较参数3是否为预期的版本,如果都是则把参数2赋值为最新的值,参数4为最新的版本,返回值是否替换成功
         */
        System.out.println(sRef.compareAndSet(reference, 10010, stamp, stamp + 1));

        System.out.println(sRef.getReference()); // 10010
        System.out.println(sRef.getStamp()); //1

        /**
         * 参数1:初始值,任意类型(基本类型或者引用类型都可以)
         * 参数2:默认的标记位的值
         */
        AtomicMarkableReference mRef = new AtomicMarkableReference(10,false);

        boolean marked = mRef.isMarked();
        /**
         * 如果当前引用是==期望引用,则自动将标记的值设置为给定的更新值
         */
        mRef.attemptMark(10, marked);
        System.out.println(mRef.getReference());

        /**
         * 如果当前引用是期望引用的==且当前标记等于期望标记,则将引用和标记的值都自动设置为给定的更新值。
         */
        System.out.println(mRef.compareAndSet(10, 10010, marked, !marked));

    }

对象的属性修改原子类

以一种线程安全的方式操作非线程安全对象内的某些字段。这就6批了。兄弟们。

  • AtomicIntegerFieldUpdater 原子更新对象中int类型字段的值
  • AtomicLongFieldUpdater 原子更新对象中Long类型字段的值
  • AtomicReferenceFieldUpdater 原子更新引用类型字段的值

但是6归6,但是一些使用要求也是需要注意的:
更新的对象属性必须使用public volatile修饰符。
因为对象的属性修改类型原子类都是抽象类,所以每次使用都必须使用静态方法newUpdater()创建一个更新器,并且需要设置想要更新的类和属性。

先来看看AtomicIntegerFieldUpdaterAtomicLongFieldUpdater的使用:

class test {
    public static void main(String[] args) throws InterruptedException {

        AtomicIntegerFieldUpdater<Student> sUpdater = AtomicIntegerFieldUpdater.newUpdater(Student.class,"score");
        Student bearjun = new Student(1, "bearjun", 0);
        for (int i = 0; i<100; i++){
            new Thread(()->{
                sUpdater.incrementAndGet(bearjun);
            }).start();
        }
        // 此处等待所有的线程都执行完成。你也可以使用countDownLatch来使用。
        TimeUnit.SECONDS.sleep(5);
        System.out.println(bearjun);// Student(number=1, name=bearjun, score=100)
        System.out.println(sUpdater.get(bearjun));// 100
    }
}

@Data
@AllArgsConstructor
class Student{
    private Integer number;
    private String name;
    /**
     * 如果用AtomicIntegerFieldUpdater,必须是int
     * 如果用AtomicLongFieldUpdater,必须是long
     * 我也不知道为啥,谁知道能告诉我?
     */
    public volatile int score;
}

AtomicReferenceFieldUpdater是怎么来操作上面一样的问题呢?

class test {
    public static void main(String[] args) throws InterruptedException {
        /**
         * 泛型中分别为对象和需要修改的属性的类型
         * newUpdater:分别为对象的字节码,需要修改类型的字节码,字段名称
         */
        AtomicReferenceFieldUpdater<Student, Integer> refUpdater = AtomicReferenceFieldUpdater.newUpdater(Student.class, Integer.class, "score");
        Student bearjun = new Student(1, "bearjun", 0);
        for (int i = 0; i < 100; i++) {
            new Thread(() -> {
                /**
                 * 参数1:对象
                 * 参数2:传入需要操作的数值
                 * 参数3:x: 当前return的值(此处为:return x+y)  y:就是上面的参数2
                 */
                refUpdater.getAndAccumulate(bearjun, 1, (x, y) -> {
                    System.out.println(x + "======>" + y);
                    return x + y;
                });
            }).start();
        }
        TimeUnit.SECONDS.sleep(5);
        System.out.println(bearjun);// Student(number=1, name=bearjun, score=100)
    }
}

@Data
@AllArgsConstructor
class Student {
    private Integer number;
    private String name;
    /**
     * 如果用AtomicIntegerFieldUpdater,必须是int
     * 如果用AtomicLongFieldUpdater,必须是long
     */
    public volatile Integer score;
}

原子操作增强类原理深度解析

前面我们说了AtomicXXX的一些原子操作类,但是为什么Java还定义了一些XXXAdder或者XXXAccumulator的呢?
首先我们看看这个几个:

  • LongAdder 长整型原子+1/-1/+number计算
  • LongAccumulator 长整型原子可做复杂计算
  • DoubleAdder 浮点数原子+1/-1/+number计算
  • DoubleAccumulator 浮点数原子可做复杂计算

通过官方API可以知道,Adder只能做一些简单的操作(i++,i--),而Accumulator用于做一些复杂的操作。

public static void main(String[] args) {
        // 初始化为0
        LongAdder add = new LongAdder();

        add.increment();// +1
        add.decrement();// -1

        add.add(10); // +10

        System.out.println(add.longValue());// 结果为:10

        /**
         * 可以进行复杂的操作
         * 参数1:LongBinaryOperator接口两个参数: x:当前LongAccumulator这次计算结果的值  y:每次需要加的值 返回值:想怎么操作就怎么操作
         * 参数2:初始化值
         */
        LongAccumulator acc = new LongAccumulator((x, y) -> {
            return x + y;
        }, 0);

        /**
         * 解释当前操作:
         * 第一步计算:参数为1,则当前的的计算为 0(x) + 1(y)
         * 第二步计算:参数为2,则当前的的计算为1(x) + 2(y)
         */
        acc.accumulate(1);
        acc.accumulate(2);

        System.out.println(acc.longValue()); // 结果为:3
    }

好了,学到这。那大家一定有疑惑?为什么学了Atomic,还会有Adder呢?先来看一张图:
12794-u9ssak5su39.png
java开发手册-嵩山版 里面关于并发处理有明确的的规定。如果是JDK8,推荐使用LongAdder对象,比AtomicLong性能更好(减少乐观锁的重试次数)。

那么问题来了?为什么LongAdder比AtomicLong性能更好?好在哪里?底层又是怎样的原理呢?

LongAdder性能的探讨

首先我们来上一段代码?看看各种情况的运行时间。

class test {
    public static void main(String[] args) throws InterruptedException {
        // 书 50w
        final Integer book = 50 * 10000;
        // 线程数 10
        final Integer threadNumber = 100;

        SellBook sellBook = new SellBook();

        CountDownLatch c1 = new CountDownLatch(threadNumber);
        CountDownLatch c2 = new CountDownLatch(threadNumber);
        CountDownLatch c3 = new CountDownLatch(threadNumber);

        long l1 = System.currentTimeMillis();

        for (Integer integer = 0; integer < threadNumber; integer++) {
            new Thread(() -> {
                try {
                    for (Integer i = 0; i < book; i++) {
                        sellBook.sellBookBySynchronized();
                    }
                } finally {
                    c1.countDown();
                }
            }, "synchronized").start();
        }
        c1.await();
        System.out.println("运行时间:" + ((System.currentTimeMillis()) - l1) + ",卖出去数量:" + sellBook.bookNumber);

        long l2 = System.currentTimeMillis();
        for (Integer integer = 0; integer < threadNumber; integer++) {
            new Thread(() -> {
                try {
                    for (Integer i = 0; i < book; i++) {
                        sellBook.sellBookByAtomicInteger();
                    }
                } finally {
                    c2.countDown();
                }
            }, "AtomicInteger").start();
        }
        c2.await();
        System.out.println("运行时间:" + ((System.currentTimeMillis()) - l2) + ",卖出去数量:" + sellBook.bookAtomic.get());

        long l3 = System.currentTimeMillis();
        for (Integer integer = 0; integer < threadNumber; integer++) {
            new Thread(() -> {
                try {
                    for (Integer i = 0; i < book; i++) {
                        sellBook.sellBookByLongAdder();
                    }
                } finally {
                    c3.countDown();
                }
            }, "LongAdder").start();
        }
        c3.await();
        System.out.println("运行时间:" + ((System.currentTimeMillis()) - l3) + ",卖出去数量:" + sellBook.bookAdder.longValue());
    }
}

class SellBook {

    Integer bookNumber = 0;
    AtomicInteger bookAtomic = new AtomicInteger(0);
    LongAdder bookAdder = new LongAdder();

    public synchronized void sellBookBySynchronized() {
        bookNumber++;
    }

    public void sellBookByAtomicInteger() {
        bookAtomic.incrementAndGet();
    }

    public void sellBookByLongAdder() {
        bookAdder.increment();
    }
}

那现在直接上结果吧:

运行时间:2374,卖出去数量:50000000
运行时间:1461,卖出去数量:50000000
运行时间:194,卖出去数量:50000000

简单的说,Atomic的父类是Number,而LongAdder的父类Striped64,Striped64父类是Number。所以LongAdder有比Atomic更多的操作方法。而其中,Number中有一个base的变量来存放Atomic原子计算的值,而LongAdder不仅仅有base,还有一个类似于excel的cell数组来存放原子变量的值。Atomic的最终结果就是base。而LongAdder在无竞争的情况,跟AtomicLong一样,对同一个base进行操作,当出现竞争关系时则是采用化整为零的做法,从空间换时间,用一个数组cells,将一个value拆分进这个数组cells。多个线程需要同时对value进行操作时候,可以对线程id进行hash得到hash值,再根据hash值映射到这个数组cells的某个下标,再对该下标所对应的值进行自增操作。当所有线程操作完毕,将数组cells的所有值和无竞争值base都加起来作为最终结果。
85434-kearzd6z7tf.png

使用小结:
Atomic:线程安全,可允许一些性能损耗,要求高精度时可使用。AtomicLong是多个线程针对单个热点值value进行原子操作。
LongAdder:当需要在高并发下有较好的性能表现,且对值的精确度要求不高时,可以使用。LongAdder是每个线程拥有自己的槽,各个线程一般只对自己槽中的那个值进行CAS操作。

0

评论区