JUC常用类笔记

JUC常用类

指的是import java.util.concurrent包下的类,通常用于解决多线程协调问题

  • lock和衍生的ReentrantLock
  • 各种容器的安全类:CopyOnWriteArrayList,ConcurrentHashMap…
  • 不安全集合转安全集合:Collections.synchronizedLis()

生产者和消费者的问题(面试)

  • 如果生产者不为空,通知消费者消费
  • 当商品被消费完了,通知消费者消费
  • 生产者和消费者的拉锯

使用wait与notify

//生产者
class Producer {
    final Produce produce;

    public Producer(Produce produce) {
        this.produce = produce;
    }

    //生产
    public void produce() {
        for (int i = 0; i < 100; i++) {
            produce.add();
        }
    }
}

//消费者
class Consumer {
    final Produce produce;

    public Consumer(Produce produce) {
        this.produce = produce;
    }

    //消费
    public void consume() {
        for (int i = 0; i < 1000; i++) {
            produce.dec();
        }
    }
}

//产品
class Produce {
    Queue<Integer> data = new LinkedList<>();
    int count = 0;

    public synchronized void add(){
        while (data.size()>=10){
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        data.offer(count);
        System.out.println("生产了产品:" + count++);
        notifyAll();
    }

    public synchronized void dec(){
        while (data.size()==0){
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("消费了产品:" + data.poll());
        notifyAll();
    }
}

Condition

  • 通常是通过lock.newCondition创建
//产品
class Produce {
    Lock lock = new ReentrantLock();
    Condition condition = lock.newCondition();
    Queue<Integer> data = new LinkedList<>();
    int count = 0;

    public void add() {
        lock.lock();
        try {
            while (data.size() >= 10) {
                try {
                    condition.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            data.offer(count);
            System.out.println("生产了产品:" + count++);
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public void dec() {
        lock.lock();
        try {
            while (data.size() == 0) {
                try {
                    condition.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("消费了产品:" + data.poll());
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }
}
  • 多个Condition可以实现精准唤醒
package com.hzy.juc.pc;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class C {
    public static void main(String[] args) {
        Data3 data3=new Data3();
        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                data3.printA();
            }
        },"A").start();
        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                data3.printB();
            }
        },"B").start();
        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                data3.printC();
            }
        },"C").start();
    }
}
class Data3{

    private Lock lock=new ReentrantLock();
    private Condition condition1 = lock.newCondition();
    private Condition condition2 = lock.newCondition();
    private Condition condition3 = lock.newCondition();
    private int number=1; //1a 2b 3c
    public void printA(){
        lock.lock();
        try {
            while (number!=1)
                condition1.await();
            {
        }
            System.out.println(Thread.currentThread().getName()+"=>AAAAAA");
            number=2;
            condition2.signal();//唤醒指定的人干活
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
    public void printB(){
        lock.lock();
        try {
            while (number!=2)
                condition2.await();
            {
            }
            System.out.println(Thread.currentThread().getName()+"=>BBBBBB");
            number=3;
            condition3.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
    public void printC(){
        lock.lock();
        try {
            while (number!=3)
                condition3.await();
            {
            }
            System.out.println(Thread.currentThread().getName()+"=>CCCCCC");
            number=1;
            condition1.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}

计数器

CountDownLatch(减法计数器)

  • 线程安全
  • 多个线程countDown,为0await向下执行
public class CountDownLatchDemo {
    public static void main(String[] args) throws InterruptedException {
        //总数是6,必须要执行任务的时候,再使用!
        CountDownLatch countDownLatch=new CountDownLatch(6);
        for (int i = 1; i <= 6; i++) {
            new Thread(()->{
                System.out.println(Thread.currentThread().getName()+"go out");
                countDownLatch.countDown();
            },String.valueOf(i)).start();
        }
        countDownLatch.await();//等待计数器归0然后再向下执行
        System.out.println("Close Door");
    }
}

CyclicBarrier(加法计数器)

public class CyclicBarrierDemo {
    public static void main(String[] args) {
        /**
         * 集齐7颗龙珠召唤神龙
         * 召唤龙珠的线程
         * */
        CyclicBarrier cyclicBarrier=new CyclicBarrier(7,()->{
            System.out.println("召唤神龙成功!");
        });
        for (int i = 1; i <= 7 ; i++) {
            final int temp=i;
            new Thread(()->{
                System.out.println(Thread.currentThread().getName()+"收集了"+temp+"颗龙珠");
                try {
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}

Semaphore(信号量)

多个信号量可以让多个线程交替使用

多个共享资源互斥的使用!并发限流,控制最大的线程数!

acquire();获取,假设已经满了,等待被释放为止!
release();释放,会将当前的信号量释放+1,然后唤醒等待的线程

//最多同时三个访问
Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < 10; i++) {
    new Thread(()->{
        try {
            //拿到许可证,若没有则等待
            semaphore.acquire();
            //一秒后释放许可证
            System.out.println(Thread.currentThread().getName()+"执行中");
            TimeUnit.SECONDS.sleep(1);
            semaphore.release();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }).start();
}

读写锁

  • 读是共享锁,写是排它锁
  • 分为writeLock().lock()readLock().lock();
private ReadWriteLock readWriteLock=new ReentrantReadWriteLock();

队列

阻塞队列

  • FIFO(先入先出)
  • 满了阻塞,空了阻塞
  • 场景:并发处理,线程池
  • add&remove
  • offer&poll
ArrayBlockingQueue blockingQueue=new ArrayBlockingQueue(int num);

同步队列

  • 只能放一个,存进去必须等待取出来(交替打印)
public class SynchronousQueueDemo {
    public static void main(String[] args) {
        BlockingQueue<String> synchronousQueue= new SynchronousQueue();
        new Thread(()->{
            try {
                System.out.println(Thread.currentThread().getName()+"=>"+"put 1");
                synchronousQueue.put("1");
                System.out.println(Thread.currentThread().getName()+"=>"+"put 2");
                synchronousQueue.put("2");
                System.out.println(Thread.currentThread().getName()+"=>"+"put 3");
                synchronousQueue.put("3");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"T1").start();
        new Thread(()->{
            try {
                TimeUnit.SECONDS.sleep(3);
                System.out.println(Thread.currentThread().getName()+"=>"+synchronousQueue.take());
                TimeUnit.SECONDS.sleep(3);
                System.out.println(Thread.currentThread().getName()+"=>"+synchronousQueue.take());
                TimeUnit.SECONDS.sleep(3);
                System.out.println(Thread.currentThread().getName()+"=>"+synchronousQueue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"T2").start();
    }
}

线程池(重点)

池化技术:事先准备好一些资源,用完之后还给我(不回收)

  • 节省开辟线程的消耗
  • 提高想要速度
  • 并与管理

看阿里巴巴开发手册并发编程这块有一条:线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,通过源码分析禁用的原因。

阅读Executors源码,发现只不过是写好了参数。

  1. newFixedThreadPool

    作用:该方法返回一个固定线程数量的线程池,线程数量自定义。该方法创建的线程池最大线程数量等于核心线程数量。如果新提交的任务没有空闲的线程去处理,就会被放入阻塞队列中。

    缺点:该线程池使用的阻塞队列是LinkedBlockingQueue:链表阻塞队列,默认容量为Integer.MAX_VALUE,容量过大,可能会堆积大量的任务,从而造成OOM(内存溢出)

  2. newSingleThreadExecutor

    作用:该方法创建了只有一个线程的线程池,如果提交的任务没有空闲的线程去处理,就会被放入阻塞队列中

    缺点:该线程池使用的阻塞队列是LinkedBlockingQueue:链表阻塞队列,默认容量为Integer.MAX_VALUE,容量过大,可能会堆积大量的任务,从而造成OOM(内存溢出)

  3. newCachedThreadPool

    作用:该方法返回一个可根据实际需求调整线程数量的线程池。如果提交的任务没有空闲的线程处理,就会创建新的线程去处理该任务。如果有线程空闲时间超过60秒,就会被销毁

    缺点:该线程池允许创建的最大线程数量为Integer.MAX_VALUE,可能会创建出大量线程,导致OOM(内存溢出)

  4. newScheduleThreadPool

    作用:该方法可以创建自定义核心线程容量的线程池,而且该线程池支持定时以及周期性的任务执行。

    缺点:该线程池允许创建的最大线程数量为Integer.MAX_VALUE,可能会创建出大量线程,导致OOM(内存溢出)

FixedThreadPool

FixedThreadPool是复用固定数量的线程处理一个共享的无边界队列

  • 它是一种固定大小的线程池;
  • corePoolSize和maximunPoolSize都为用户设定的线程数量nThreads;
  • keepAliveTime为0,意味着一旦有多余的空闲线程,就会被立即停止掉;但这里keepAliveTime无效;
  • 阻塞队列采用了LinkedBlockingQueue,它是一个*队列;
  • 由于阻塞队列是一个*队列,因此永远不可能拒绝任务;
  • 由于采用了*队列,实际线程数量将永远维持在nThreads,因此maximumPoolSize和keepAliveTime将无效。

SingleThreadExecutor

  • 它只会创建一条工作线程处理任务;
  • 采用的阻塞队列为LinkedBlockingQueue;

CachedThreadPool

  • 它是一个可以无限扩大的线程池;
  • 它比较适合处理执行时间比较小的任务
  • corePoolSize为0,maximumPoolSize为无限大,意味着线程数量可以无限大;
  • keepAliveTime为60S,意味着线程空闲时间超过60S就会被杀死;
  • 采用SynchronousQueue装等待的任务,这个阻塞队列没有存储空间,这意味着只要有请求到来,就必须要找到一条工作线程处理他,如果当前没有空闲的线程,那么就会再创建一条新的线程。

ScheduledThreadPool(延时任务)

scheduledThreadPool.schedule(() -> System.out.println("hello"), int time, TimeUnit.MINUTES);:time分钟后打印hello

  • 它接收SchduledFutureTask类型的任务,有两种提交任务的方式:
  1. scheduledAtFixedRate
  2. scheduledWithFixedDelay
  • SchduledFutureTask接收的参数:
  1. time:任务开始的时间
  2. sequenceNumber:任务的序号
  3. period:任务执行的时间间隔
  • 它采用DelayQueue存储等待的任务
  • DelayQueue内部封装了一个PriorityQueue,它会根据time的先后时间排序,若time相同则根据sequenceNumber排序;
  • DelayQueue也是一个*队列;
  • 工作线程的执行过程:
  • 工作线程会从DelayQueue取已经到期的任务去执行;
  • 执行结束后重新设置任务的到期时间,再次放回DelayQueue

ForkJoin

ForkJoin在JDK1.7,并行执行任务!提高效率,大数据!(把大任务拆分成小任务)

public class Main {
    public static void main(String[] args) throws Exception {
        // 创建2000个随机数组成的数组:
        long[] array = new long[2000];
        long expectedSum = 0;
        for (int i = 0; i < array.length; i++) {
            array[i] = random();
            expectedSum += array[i];
        }
        System.out.println("Expected sum: " + expectedSum);
        // fork/join:
        ForkJoinTask<Long> task = new SumTask(array, 0, array.length);
        long startTime = System.currentTimeMillis();
        Long result = ForkJoinPool.commonPool().invoke(task);
        long endTime = System.currentTimeMillis();
        System.out.println("Fork/join sum: " + result + " in " + (endTime - startTime) + " ms.");
    }

    static Random random = new Random(0);

    static long random() {
        return random.nextInt(10000);
    }
}

class SumTask extends RecursiveTask<Long> {
    static final int THRESHOLD = 500;
    long[] array;
    int start;
    int end;

    SumTask(long[] array, int start, int end) {
        this.array = array;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        if (end - start <= THRESHOLD) {
            // 如果任务足够小,直接计算:
            long sum = 0;
            for (int i = start; i < end; i++) {
                sum += this.array[i];
                // 故意放慢计算速度:
                try {
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                }
            }
            return sum;
        }
        // 任务太大,一分为二:
        int middle = (end + start) / 2;
        System.out.println(String.format("split %d~%d ==> %d~%d, %d~%d", start, end, start, middle, middle, end));
        SumTask subtask1 = new SumTask(this.array, start, middle);
        SumTask subtask2 = new SumTask(this.array, middle, end);
        invokeAll(subtask1, subtask2);
        Long subresult1 = subtask1.join();
        Long subresult2 = subtask2.join();
        Long result = subresult1 + subresult2;
        System.out.println("result = " + subresult1 + " + " + subresult2 + " ==> " + result);
        return result;
    }
}

Future

  • 类似Ajax
  • 对将来的某个事件的结果进行建模
 CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(()->{
     System.out.println(Thread.currentThread().getName()+"supplyAsync"+":Integer");
     return 1024;
 });
System.out.println(completableFuture.whenComplete((t, u) -> {
    //t:正常的返回结果
    //u:错误信息
    System.out.println(t + "\t" + u);
}).exceptionally((e) -> {
    e.getMessage();//打印错误信息
    return 233;// 可以获取到错误的返回结果
}).get());

CompletableFuture

Java8引入的,针对Future做了优化,可以传入回调对象,当任务结束时,会自动回调某个对象的方法,并且可以串行执行,使用静态方法即可创建一个异步任务:

CompletableFuture<String> completableFuture1 = CompletableFuture.supplyAsync(() -> {
    System.out.println("hello");
    return "hello";
});
CompletableFuture<String> completableFuture2 = CompletableFuture.supplyAsync(() -> {
    System.out.println("world");
    return "world";
});
CompletableFuture<Void> completableFuture = CompletableFuture.allOf(completableFuture1, completableFuture2);
completableFuture.thenAccept(c -> {
    System.out.println("任务都完成了");
});
上一篇:Codeforces Round 536 (Div. 2) (E)


下一篇:JUC学习(二)- 线程间的通信