以往学习多线程总是很零碎,不写就忘,然后一段时间又要翻各种资料,就打算最近写一写多线程内容,当然自己可能了解的都是基础,记录的话呢还是准备以实战为主,理论为辅
1、Java中创建线程池 一、 ThreadPoolExecutor类介绍 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 package java.util.concurrent;public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {}
二、ThreadPoolExecutor的执行流程如下:
1.主线程提交新任务到线程池 2.线程池判断当前线程池的线程数和核心线程数的大小,小于就新建线程处理请求;否则继续判断当前工作队列是否已满. 3.如果当前工作队列未满就将任务放到工作队列中;否则继续判断当前线程池的线程数和最大线程数的大小. 4.如果当前线程池的线程数小于最大线程数就新建线程处理请求;否则就调用RejectedExecutionHandler来做拒绝处理。
三、jdk提供四种拒绝策略 1. AbortPolicy 直接抛出RejectedExecutionException异常
2. CallerRunsPolicy 交由主线程执行
3. DiscardOldestPolicy 抛弃工作队列中旧的任务,将新任务添加进队列;会导致被丢弃的任务无法再次被执行
4. DiscardPolicy 抛弃当前任务;会导致被抛弃的任务无法再次被执行
当然你也可以自定义拒绝策略,只需要实现RejectedExecutionHandler接口即可
2、Spring中创建线程池 一、ThreadPoolTaskExecutor类介绍 1 2 3 4 5 6 7 8 9 10 11 12 package org.springframework.scheduling.concurrent;public class ThreadPoolTaskExecutor { private final Object poolSizeMonitor = new Object(); private int corePoolSize = 1 ; private int maxPoolSize = 2147483647 ; private int keepAliveSeconds = 60 ; private int queueCapacity = 2147483647 ; private boolean allowCoreThreadTimeOut = false ; private TaskDecorator taskDecorator; private ThreadPoolExecutor threadPoolExecutor; }
从源码中可以看出ThreadPoolTaskExecutor就是在java中ThreadPoolExecutor的基础上封装的
3、线程池使用示例 一、使用ThreadPoolTaskExecutor
定义配置类:我们需要通过SpringBoot的配置类来配置线程池的Bean和对应的参数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 import java.util.concurrent.Executor;import java.util.concurrent.ThreadPoolExecutor;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.scheduling.annotation.EnableAsync;import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;@Configuration @EnableAsync public class ThreadPoolConfig { @Bean public Executor threadPoolTaskExecutor () { ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor(); threadPoolTaskExecutor.setCorePoolSize(5 ); threadPoolTaskExecutor.setMaxPoolSize(5 ); threadPoolTaskExecutor.setQueueCapacity(2000 ); threadPoolTaskExecutor.setThreadNamePrefix("threadPoolTaskExecutor-->" ); threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); threadPoolTaskExecutor.initialize(); return threadPoolTaskExecutor; } }
调用方法 sevice
1 2 3 4 5 6 7 8 9 10 11 @Override @Async() public void testThread () { log.info("start test thread" ); System.out.println(Thread.currentThread().getName()); log.info("end test thread" ); }
测试类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 package com.coder.lion.test;import com.coder.lion.CoderLionApplication;import com.coder.lion.demo.service.ImportService;import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.junit4.SpringRunner;import javax.annotation.Resource;@RunWith(SpringRunner.class) @SpringBootTest(classes = CoderLionApplication.class) public class ThreadTest { @Resource ImportService importService; @Test public void test1 () { importService.testThread(); } }
输出结果:
从测试的结果可以清晰的看到sayHello方法是由我们定义的线程池中的线程执行的
注意 因为显示名称长度限制的原因我们看到的是askExecutor–>1, 但是通过在方法中打印当前线程的名字得知确实是我们设置的线程threadPoolTaskExecutor–>1
二、使用ThreadPoolExecutor 在配置类中增加如下配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 @Bean public Executor myThreadPool () { log.info("创建线程池 -- myThreadPool" ); int corePoolSize = 5 ; int maxPoolSize = 5 ; int queueCapacity = 2000 ; long keepAliveTime = 30 ; String threadNamePrefix = "myThreadPool-->" ; RejectedExecutionHandler rejectedExecutionHandler = new RejectedExecutionHandler() { @Override public void rejectedExecution (Runnable r, ThreadPoolExecutor executor) { throw new RejectedExecutionException("自定义的RejectedExecutionHandler" ); } }; ThreadFactory threadFactory = new ThreadFactory() { private int i = 1 ; @Override public Thread newThread (Runnable r) { Thread thread = new Thread(r); thread.setName(threadNamePrefix + i); i++; return thread; } }; ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue<>(queueCapacity), threadFactory, rejectedExecutionHandler); return threadPoolExecutor; }
可以看到我们在配置类中配置了两个线程池,如果我们想要指定使用其中一个线程池的需使用如下方式
当未指明使用哪个线程池的时候会优先使用ThreadPoo lTaskExecutor,当定义了多个或未定义ThreadPoolTaskExecutor时,默认使用的是SimpleAsyncTaskExecutor
SimpleAsyncTaskExecutor:不是真的线程池,这个类不重用线程,每次调用都会创建一个新的线程。并发大的时候会产生严重的性能问题。
1 2 3 4 5 6 7 @Override @Async("myThreadPool") public void testThread () { log.info("start test thread" ); System.out.println(Thread.currentThread().getName()); log.info("end test thread" ); }
二、 自定义ThreadPoolTaskExecutor
创建 MyThreadPoolTaskExecutor
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 package com.coder.lion.demo.config;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import org.springframework.util.concurrent.ListenableFuture;import java.util.concurrent.Callable;import java.util.concurrent.Future;public class MyThreadPoolTaskExecutor extends ThreadPoolTaskExecutor { Logger logger = LoggerFactory.getLogger(MyThreadPoolTaskExecutor.class); @Override public void execute (Runnable task) { logThreadPoolStatus(); super .execute(task); } @Override public void execute (Runnable task, long startTimeout) { logThreadPoolStatus(); super .execute(task, startTimeout); } @Override public Future<?> submit(Runnable task) { logThreadPoolStatus(); return super .submit(task); } @Override public <T> Future<T> submit (Callable<T> task) { logThreadPoolStatus(); return super .submit(task); } @Override public ListenableFuture<?> submitListenable(Runnable task) { logThreadPoolStatus(); return super .submitListenable(task); } @Override public <T> ListenableFuture<T> submitListenable (Callable<T> task) { logThreadPoolStatus(); return super .submitListenable(task); } private void logThreadPoolStatus () { logger.info("核心线程数:{}, 最大线程数:{}, 当前线程数: {}, 活跃的线程数: {}" , getCorePoolSize(), getMaxPoolSize(), getPoolSize(), getActiveCount()); } }
我们可以在自定义的ThreadPoolTaskExecutor中,输出一些线程池的当前状态,包括所有上面介绍的参数。
在配置类增加 使用 MyThreadPoolTaskExecutor 的 bean
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @Bean public Executor myThreadPoolTaskExecutor () { log.info("创建线程池 -- myThreadPoolTaskExecutor" ); ThreadPoolTaskExecutor threadPoolTaskExecutor = new MyThreadPoolTaskExecutor(); threadPoolTaskExecutor.setCorePoolSize(5 ); threadPoolTaskExecutor.setMaxPoolSize(5 ); threadPoolTaskExecutor.setQueueCapacity(2000 ); threadPoolTaskExecutor.setThreadNamePrefix("myThreadPoolTaskExecutor-->" ); threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); threadPoolTaskExecutor.initialize(); return threadPoolTaskExecutor; }
Java 并发编程:线程池的使用
参考二