国产秋霞理论久久久电影-婷婷色九月综合激情丁香-欧美在线观看乱妇视频-精品国avA久久久久久久-国产乱码精品一区二区三区亚洲人-欧美熟妇一区二区三区蜜桃视频

@Async異步任務與線程池

共 48000字,需瀏覽 96分鐘

 ·

2021-06-28 02:39

寫在前面:本篇文章是關于使用@Async進行異步任務,并且關于線程池做了一個初步的梳理和總結,包括遇到過的一些坑

在工作中用到的一些線程池

以下代碼已做脫敏處理

1.newCachedThreadPool

    private void startTask(List<String> usersList){
ExecutorService executor = Executors.newCachedThreadPool();
executor.submit(()->{
//do someting
});
}

復制代碼

2.newScheduledThreadPool


@Configuration
public class ScheduleConfig implements SchedulingConfigurer {

@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
//當然了,這里設置的線程池是corePoolSize也是很關鍵了,自己根據業(yè)務需求設定
taskRegistrar.setScheduler(Executors.newScheduledThreadPool(10));
}

}

復制代碼

如果在idea中安裝了阿里規(guī)范插件,就會發(fā)現(xiàn)上面兩種創(chuàng)建線程池的方式都會報紅。原因是:

線程池不允許使用Executors去創(chuàng)建,而是通過ThreadPoolExecutor的方式,這樣的處理方式讓寫的同學更加明確線程池的運行規(guī)則,規(guī)避資源耗盡的風險。說明:Executors返回的線程池對象的弊端如下:

  1. FixedThreadPool和SingleThreadPool:

    允許的請求隊列長度為Integer.MAX_VALUE,可能會堆積大量的請求,從而導致OOM。

  2. CachedThreadPool:

    允許的創(chuàng)建線程數(shù)量為Integer.MAX_VALUE,可能會創(chuàng)建大量的線程,從而導致OOM。

其實這里CachedThreadPool和newScheduledThreadPool是一樣的,都是因為最大線程數(shù)被設置成了Integer.MAX_VALUE。


public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
復制代碼
    public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
復制代碼

在源碼中可以看的出newCachedThreadPool使用的是synchronousqueue隊列,也可以看作是一個長度為1的BlockingQueue所以,再加上最大允許線程數(shù)為Integer.MAX_VALUE,就導致可能會創(chuàng)建大量線程導致OOM。

同理ScheduledThreadPoolExecutor使用的是DelayedWorkQueue,初始化大小為16。當隊列滿后就會創(chuàng)建新線程,就導致可能會創(chuàng)建大量線程導致OOM。

我們不妨實際來測試一下,以newCachedThreadPool為例,jvm參數(shù)-Xms64m -Xmx192m -Xss1024K -XX:MetaspaceSize=64m -XX:MaxMetaspaceSize=128m。


@PostMapping("/newCachedThreadPoolExample")
@ResponseBody
public void newCachedThreadPoolExample(){
ExecutorService executorService = Executors.newCachedThreadPool();
while (true){
executorService.submit(()->{
log.info("submit:"+LocalDateTime.now());
try {
Thread.sleep(1000);
}catch (InterruptedException e){
e.printStackTrace();
}
});
}

}

復制代碼

剛啟動時的情況:

請求接口后就開始爆炸

然后就開始卡著不動了

比較尷尬的是一直沒有出現(xiàn)報錯OOM的情況,就直接卡死了。

總結

以上的線程池雖然可以在外部限制的情況下避免OOM等情況,但是還是建議盡量根據自己的業(yè)務情況自定義線程池。

使用@Async快速創(chuàng)建一個異步任務

1. application.yml

這里是線程池相關配置,先不詳細說,同理可以在代碼里面配置config。

線程池緩沖隊列的選擇

以上發(fā)生的問題大多數(shù)都和線程池的緩沖隊列有關,選擇一個符合自己業(yè)務特點的緩沖隊列也十分重要。

spring:
task:
execution:
pool:
# 最大線程數(shù)
max-size: 16
# 核心線程數(shù)
core-size: 16
# 存活時間
keep-alive: 10s
# 隊列大小
queue-capacity: 100
# 是否允許核心線程超時
allow-core-thread-timeout: true
# 線程名稱前綴
thread-name-prefix: async-task-

復制代碼

2.ThreadpoolApplication

這里需要在 Application上添加 @EnableAsync注解,開啟異步任務。如果是選擇在代碼里面寫config,則需要在config文件上添加@EnableAsync注解。

@EnableAsync
@SpringBootApplication
public class ThreadpoolApplication {

public static void main(String[] args) {
SpringApplication.run(ThreadpoolApplication.class, args);
}

}
復制代碼

3.AsyncTask

編寫一個異步任務處理類,在需要開啟異步的方法上面添加@Async

@Component
@Slf4j
public class AsyncTask {
@Async
public void asyncRun() throws InterruptedException {
Thread.sleep(10);
log.info(Thread.currentThread().getName()+":處理完成");
}
}

復制代碼

4.AsyncService

編寫一個調用異步方法的service

@Service
@Slf4j
public class AsyncService {
@Autowired
private AsyncTask asyncTask;

public void asyncSimpleExample() {
try {
log.info("service start");
asyncTask.asyncRun();
log.info("service end");
}catch (InterruptedException e){
e.printStackTrace();
}
}


}
復制代碼

5.AsyncController

編寫一個Controller去調用AsyncService


/**
* @author kurtl
*/

@Controller
@RequestMapping("/")
public class AsyncController {
@Autowired
private AsyncService asyncService;
@PostMapping("/asyncSimpleExample")
@ResponseBody
public void asyncSimpleExample(){
asyncService.asyncSimpleExample();
}
}

復制代碼

最后請求這個接口

可以看到,先輸出了asyncSimpleExample里面打印的service start與service end,表示service方法先執(zhí)行完畢了,而異步方法則在調用后進行了一個sleep,service沒有同步等待sleep完成,而是直接返回,表示這個是異步任務。至此我們已經通過@Async成功創(chuàng)建的異步任務。

關于@Async和@EnableAsync的原理

個人覺得源碼中很重要的一部分就是源碼中的注釋,閱讀注釋也可以幫你快速了解源碼的作用等,所有我會把重要的注釋稍微翻譯一下

1.@Async源碼




@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Async {

/**
* A qualifier value for the specified asynchronous operation(s).
* <p>May be used to determine the target executor to be used when executing
* the asynchronous operation(s), matching the qualifier value (or the bean
* name) of a specific {@link java.util.concurrent.Executor Executor} or
* {@link org.springframework.core.task.TaskExecutor TaskExecutor}
* bean definition.
* <p>When specified on a class-level {@code @Async} annotation, indicates that the
* given executor should be used for all methods within the class. Method-level use
* of {@code Async#value} always overrides any value set at the class level.
* @since 3.1.2
*/


/**
* 在這些注釋中有三個非常重要的部分
* 1.使用@Async的方法只能返回Void 或者 Future類型
* 2.表明了@Async是通過org.springframework.core.task.TaskExecutor
* 或者java.util.concurrent.Executor來創(chuàng)建線程池
* 3.寫了@Async的作用范圍 在類上使用@Async會覆蓋方法上的@Async
*/


String value() default "";

}

復制代碼

2.@EnableAsync源碼




/**
* Enables Spring's asynchronous method execution capability, similar to functionality
* found in Spring's {@code <task:*>} XML namespace.
*
* <p>To be used together with @{@link Configuration Configuration} classes as follows,
* enabling annotation-driven async processing for an entire Spring application context:
*
* <pre class="code">
* &#064;Configuration
* &#064;EnableAsync
* public class AppConfig {
*
* }</pre>
* 這里表示需要聯(lián)合@Configuration注解一起使用,所以@EnableAsync應該
* 添加在線程池Config或者SpringBootApplication 上
* {@code MyAsyncBean} is a user-defined type with one or more methods annotated with
* either Spring's {@code @Async} annotation, the EJB 3.1 {@code @javax.ejb.Asynchronous}
* annotation, or any custom annotation specified via the {@link #annotation} attribute.
* The aspect is added transparently for any registered bean, for instance via this
* configuration:
*
* <pre class="code">
* &#064;Configuration
* public class AnotherAppConfig {
*
* &#064;Bean
* public MyAsyncBean asyncBean() {
* return new MyAsyncBean();
* }
* }</pre>
*
* <p>By default, Spring will be searching for an associated thread pool definition:
* either a unique {@link org.springframework.core.task.TaskExecutor} bean in the context,
* or an {@link java.util.concurrent.Executor} bean named "taskExecutor" otherwise. If
* neither of the two is resolvable, a {@link org.springframework.core.task.SimpleAsyncTaskExecutor}
* 默認情況下spring會先搜索TaskExecutor類型的bean或者名字為
* taskExecutor的Executor類型的bean,都不存在使用
* SimpleAsyncTaskExecutor執(zhí)行器但是這個SimpleAsyncTaskExecutor實際
* 上是有很大的坑的,建議是自定義一個線程池,這個后面會說
* will be used to process async method invocations. Besides, annotated methods having
*
* @author Chris Beams
* @author Juergen Hoeller
* @author Stephane Nicoll
* @author Sam Brannen
* @since 3.1
* @see Async
* @see AsyncConfigurer
* @see AsyncConfigurationSelector
*/

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {

/**
* Indicate the 'async' annotation type to be detected at either class
* or method level.
* <p>By default, both Spring's @{@link Async} annotation and the EJB 3.1
* {@code @javax.ejb.Asynchronous} annotation will be detected.
* <p>This attribute exists so that developers can provide their own
* custom annotation type to indicate that a method (or all methods of
* a given class) should be invoked asynchronously.
*/

Class<? extends Annotation> annotation() default Annotation.class;

/**
* Indicate whether subclass-based (CGLIB) proxies are to be created as opposed
* to standard Java interface-based proxies.
* <p><strong>Applicable only if the {@link #mode} is set to {@link AdviceMode#PROXY}</strong>.
* <p>The default is {@code false}.
* <p>Note that setting this attribute to {@code true} will affect <em>all</em>
* Spring-managed beans requiring proxying, not just those marked with {@code @Async}.
* For example, other beans marked with Spring's {@code @Transactional} annotation
* will be upgraded to subclass proxying at the same time. This approach has no
* negative impact in practice unless one is explicitly expecting one type of proxy
* vs. another &mdash; for example, in tests.
*
* 這個字段用來表示,是否要創(chuàng)建基于CGLIB的代理,實際上在高版本
* 的spring 上(大概3.x)是自動選擇使用jdk動態(tài)代理還是CGLIB.
* 設置為true時,其它spring管理的bean也會升級到CGLIB代理
*/

boolean proxyTargetClass() default false;

/**
* Indicate how async advice should be applied.
* <p><b>The default is {@link AdviceMode#PROXY}.</b>
* Please note that proxy mode allows for interception of calls through the proxy
* only. Local calls within the same class cannot get intercepted that way; an
* {@link Async} annotation on such a method within a local call will be ignored
* since Spring's interceptor does not even kick in for such a runtime scenario.
* For a more advanced mode of interception, consider switching this to
* {@link AdviceMode#ASPECTJ}.
* 這個字段用來標識異步通知的模式,默認PROXY,當這個字段為
* PROXY的時候,在同一個類中,非異步方法調用異步方法,會導致異
* 步不生效,相反如果,想實現(xiàn)同一個類非異步方法調用異步方法就應
* 該設置為ASPECTJ
*/

AdviceMode mode() default AdviceMode.PROXY;

/**
* Indicate the order in which the {@link AsyncAnnotationBeanPostProcessor}
* should be applied.
* <p>The default is {@link Ordered#LOWEST_PRECEDENCE} in order to run
* after all other post-processors, so that it can add an advisor to
* existing proxies rather than double-proxy.
* 標明異步注解bean處理器應該遵循的執(zhí)行順序,默認最低的優(yōu)先級
*(Integer.MAX_VALUE,值越小優(yōu)先級越高)
*/

int order() default Ordered.LOWEST_PRECEDENCE;

}


復制代碼

在上面的源碼中,其實最核心的代碼只有一句,@Import(AsyncConfigurationSelector.class),引入了相關的配置。




/**
* Selects which implementation of {@link AbstractAsyncConfiguration} should
* be used based on the value of {@link EnableAsync#mode} on the importing
* {@code @Configuration} class.
*
* @author Chris Beams
* @author Juergen Hoeller
* @since 3.1
* @see EnableAsync
* @see ProxyAsyncConfiguration
*/

public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {

private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME =
"org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";


/**
* Returns {@link ProxyAsyncConfiguration} or {@code AspectJAsyncConfiguration}
* for {@code PROXY} and {@code ASPECTJ} values of {@link EnableAsync#mode()},
* respectively.
*/

/**
* 這整個方法其實就是一個選擇器和ImportSelector接口的selectImports()方法很像,基于不同的代理模式,加載不同的配置類
*/

@Override
@Nullable
public String[] selectImports(AdviceMode adviceMode) {

switch (adviceMode) {
case PROXY:
return new String[] {ProxyAsyncConfiguration.class.getName()};
case ASPECTJ:
return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
default:
return null;
}
}

}

復制代碼

接下來我們看看默認的ProxyAsyncConfiguration.class




/**
* {@code @Configuration} class that registers the Spring infrastructure beans necessary
* to enable proxy-based asynchronous method execution.
*
* @author Chris Beams
* @author Stephane Nicoll
* @author Juergen Hoeller
* @since 3.1
* @see EnableAsync
* @see AsyncConfigurationSelector
*/

@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
//繼承了AbstractAsyncConfiguration類
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {

@Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
//初始化AsyncAnnotationBeanPostProcessor類型的bean
AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
//設置執(zhí)行器和異常處理器
bpp.configure(this.executor, this.exceptionHandler);
Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
//設置annotation
if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
bpp.setAsyncAnnotationType(customAsyncAnnotation);
}
//設置注解屬性
bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
return bpp;
}

}



復制代碼

這一個類繼承了AbstractAsyncConfiguration類,其實也就做了一件事初始化AsyncAnnotationBeanPostProcessor,@Async注解的就是通過AsyncAnnotationBeanPostProcessor這個后置處理器生成一個代理對象來實現(xiàn)異步的,我們先看繼承的config。




/**
* Abstract base {@code Configuration} class providing common structure for enabling
* Spring's asynchronous method execution capability.
*
* @author Chris Beams
* @author Juergen Hoeller
* @author Stephane Nicoll
* @since 3.1
* @see EnableAsync
*/

@Configuration
public abstract class AbstractAsyncConfiguration implements ImportAware {

@Nullable
protected AnnotationAttributes enableAsync; //;//enableAsync的注解屬性

@Nullable
protected Supplier<Executor> executor; //線程執(zhí)行器

@Nullable
protected Supplier<AsyncUncaughtExceptionHandler> exceptionHandler; //異常處理器 和上面的代碼對應


@Override
//設置注解屬性
public void setImportMetadata(AnnotationMetadata importMetadata) {
this.enableAsync = AnnotationAttributes.fromMap(
importMetadata.getAnnotationAttributes(EnableAsync.class.getName(), false));
if (this.enableAsync == null) {
throw new IllegalArgumentException(
"@EnableAsync is not present on importing class " + importMetadata.getClassName());
}
}

/**
* Collect any {@link AsyncConfigurer} beans through autowiring.
*/

@Autowired(required = false)
//設置執(zhí)行器和異常處理器
void setConfigurers(Collection<AsyncConfigurer> configurers) {
if (CollectionUtils.isEmpty(configurers)) {
return;
}
if (configurers.size() > 1) {
throw new IllegalStateException("Only one AsyncConfigurer may exist");
}
AsyncConfigurer configurer = configurers.iterator().next();
this.executor = configurer::getAsyncExecutor;
this.exceptionHandler = configurer::getAsyncUncaughtExceptionHandler;
}

}


復制代碼

整個代碼的結構其實非常明確,我們回到上一個類,看他設置的bean AsyncAnnotationBeanPostProcessor。這個bean很復雜,所以干脆先生成類圖。弄清楚baen的生命周期。AsyncAnnotationBeanPostProcessor是一個后置處理器,所以我們先找父類AbstractAdvisingBeanPostProcessor中。




/**
* Base class for {@link BeanPostProcessor} implementations that apply a
* Spring AOP {@link Advisor} to specific beans.
*
* @author Juergen Hoeller
* @since 3.2
*/

@SuppressWarnings("serial")
public abstract class AbstractAdvisingBeanPostProcessor extends ProxyProcessorSupport implements BeanPostProcessor {

@Nullable
protected Advisor advisor;

protected boolean beforeExistingAdvisors = false;

private final Map<Class<?>, Boolean> eligibleBeans = new ConcurrentHashMap<>(256);



public void setBeforeExistingAdvisors(boolean beforeExistingAdvisors) {
this.beforeExistingAdvisors = beforeExistingAdvisors;
}


@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) {
return bean;
}

@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
// 沒有通知,或者是AopInfrastructureBean,那么不進行代理
if (this.advisor == null || bean instanceof AopInfrastructureBean) {
// Ignore AOP infrastructure such as scoped proxies.
return bean;
}
// 添加advisor
if (bean instanceof Advised) {
Advised advised = (Advised) bean;
if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) {
// Add our local Advisor to the existing proxy's Advisor chain...
// 這里通過beforeExistingAdvisors決定是將通知添加到所有通知之前還是添加到所有通知之后
// 默認false 在@Async中被設置為true
if (this.beforeExistingAdvisors) {
advised.addAdvisor(0, this.advisor);
}
else {
advised.addAdvisor(this.advisor);
}
return bean;
}
}
//構造ProxyFactory代理工廠
if (isEligible(bean, beanName)) {
ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
//添加代理的接口
if (!proxyFactory.isProxyTargetClass()) {
evaluateProxyInterfaces(bean.getClass(), proxyFactory);
}
//設置切面
proxyFactory.addAdvisor(this.advisor);
customizeProxyFactory(proxyFactory);
//返回代理類
return proxyFactory.getProxy(getProxyClassLoader());
}

// No proxy needed.
return bean;
}

//isEligible用于判斷這個類或者這個類中的某個方法是否含有注解
protected boolean isEligible(Object bean, String beanName) {
return isEligible(bean.getClass());
}


}


復制代碼

在上面代碼中可以看出來,proxyFactory.addAdvisor(this.advisor);這里持有一個AsyncAnnotationAdvisor類的對象advisor:buildAdvice()方法生成通知,buildPointcut生成切點。定位到這個類的buildPointcut方法中,看看他的切點匹配規(guī)則。




@SuppressWarnings("serial")
public class AsyncAnnotationAdvisor extends AbstractPointcutAdvisor implements BeanFactoryAware {

private Advice advice;

private Pointcut pointcut;


/**
* Create a new {@code AsyncAnnotationAdvisor} for bean-style configuration.
*/

public AsyncAnnotationAdvisor() {
this((Supplier<Executor>) null, (Supplier<AsyncUncaughtExceptionHandler>) null);
}


@SuppressWarnings("unchecked")
public AsyncAnnotationAdvisor(
@Nullable Executor executor, @Nullable AsyncUncaughtExceptionHandler exceptionHandler)
{

this(SingletonSupplier.ofNullable(executor), SingletonSupplier.ofNullable(exceptionHandler));
}


@SuppressWarnings("unchecked")
public AsyncAnnotationAdvisor(
@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler)
{

Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
asyncAnnotationTypes.add(Async.class);
try {
asyncAnnotationTypes.add((Class<? extends Annotation>)
ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));
}
catch (ClassNotFoundException ex) {
// If EJB 3.1 API not present, simply ignore.
}
this.advice = buildAdvice(executor, exceptionHandler);
this.pointcut = buildPointcut(asyncAnnotationTypes);
}



public void setAsyncAnnotationType(Class<? extends Annotation> asyncAnnotationType) {
Assert.notNull(asyncAnnotationType, "'asyncAnnotationType' must not be null");
Set<Class<? extends Annotation>> asyncAnnotationTypes = new HashSet<>();
asyncAnnotationTypes.add(asyncAnnotationType);
this.pointcut = buildPointcut(asyncAnnotationTypes);
}

/**
* Set the {@code BeanFactory} to be used when looking up executors by qualifier.
*/

@Override
public void setBeanFactory(BeanFactory beanFactory) {
if (this.advice instanceof BeanFactoryAware) {
((BeanFactoryAware) this.advice).setBeanFactory(beanFactory);
}
}


@Override
public Advice getAdvice() {
return this.advice;
}

@Override
public Pointcut getPointcut() {
return this.pointcut;
}

//構建通知,一個簡單的攔截器
protected Advice buildAdvice(
@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler)
{

AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
interceptor.configure(executor, exceptionHandler);
return interceptor;
}


protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) {
ComposablePointcut result = null;
for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {
// 根據cpc和mpc 匹配器進行匹配
//檢查類上是否有@Async注解
Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true);
//檢查方法是是否有@Async注解。
Pointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true);
if (result == null) {
result = new ComposablePointcut(cpc);
}
else {
result.union(cpc);
}
result = result.union(mpc);
}
return (result != null ? result : Pointcut.TRUE);
}

}


復制代碼

再找到它的通知邏輯buildAdvice,就是一個攔截器,生成AnnotationAsyncExecutionInterceptor對象,對于Interceptor,關注它的核心方法invoke就行了。它的父類AsyncExecutionInterceptor重寫了AsyncExecutionInterceptor接口的invoke方法。代碼如下





public class AsyncExecutionInterceptor extends AsyncExecutionAspectSupport implements MethodInterceptor, Ordered {


public AsyncExecutionInterceptor(@Nullable Executor defaultExecutor) {
super(defaultExecutor);
}

public AsyncExecutionInterceptor(@Nullable Executor defaultExecutor, AsyncUncaughtExceptionHandler exceptionHandler) {
super(defaultExecutor, exceptionHandler);
}



@Override
@Nullable
//
public Object invoke(final MethodInvocation invocation) throws Throwable {
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
// 獲取到一個線程池
AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
if (executor == null) {
throw new IllegalStateException(
"No executor specified and no default executor set on AsyncExecutionInterceptor either");
}
// 然后將這個方法封裝成一個 Callable對象傳入到線程池中執(zhí)行
Callable<Object> task = () -> {
try {
Object result = invocation.proceed();
if (result instanceof Future) {
//阻塞等待執(zhí)行完畢得到結果
return ((Future<?>) result).get();
}
}
catch (ExecutionException ex) {
handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
}
catch (Throwable ex) {
handleError(ex, userDeclaredMethod, invocation.getArguments());
}
return null;
};

return doSubmit(task, executor, invocation.getMethod().getReturnType());
}


@Override
@Nullable
protected String getExecutorQualifier(Method method) {
return null;
}


@Override
@Nullable
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
}

@Override
public int getOrder() {
return Ordered.HIGHEST_PRECEDENCE;
}

}


復制代碼

可以看到,invoke首先是包裝了一個Callable的對象,然后傳入doSubmit,所以代碼的核心就在doSubmit這個方法中。


@Nullable
protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
//先判斷是否存在CompletableFuture這個類,優(yōu)先使用CompletableFuture執(zhí)行任務
if (CompletableFuture.class.isAssignableFrom(returnType)) {
return CompletableFuture.supplyAsync(() -> {
try {
return task.call();
}
catch (Throwable ex) {
throw new CompletionException(ex);
}
}, executor);
}
else if (ListenableFuture.class.isAssignableFrom(returnType)) {
return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
}
else if (Future.class.isAssignableFrom(returnType)) {
return executor.submit(task);
}
else {
executor.submit(task);
return null;
}
}

復制代碼

這里主要是判斷不同的返回值,最終都走進了submit方法,而submit根據線程池的不同,其實現(xiàn)也有區(qū)別,下面是SimpleAsyncTaskExecutor的實現(xiàn)方式。


/**
* Template method for the actual execution of a task.
* <p>The default implementation creates a new Thread and starts it.
* @param task the Runnable to execute
* @see #setThreadFactory
* @see #createThread
* @see java.lang.Thread#start()
*/

protected void doExecute(Runnable task) {
Thread thread = (this.threadFactory != null ? this.threadFactory.newThread(task) : createThread(task));
thread.start();
}


復制代碼

@Async的默認線程池

1.使用@Async一定要定義線程池

在上面的源碼中寫的很清楚,默認情況下spring會先搜索TaskExecutor類型的bean或者名字為taskExecutor的Executor類型的bean,都不存在使 SimpleAsyncTaskExecutor執(zhí)行器。但是這個SimpleAsyncTaskExecutor不是真的線程池,這個類不重用線程,每次調用都會創(chuàng)建一個新的線程。很有可能導致OOM。





@SuppressWarnings("serial")
public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator
implements AsyncListenableTaskExecutor, Serializable
{

/**
* Permit any number of concurrent invocations: that is, don't throttle concurrency.
* @see ConcurrencyThrottleSupport#UNBOUNDED_CONCURRENCY
*/

public static final int UNBOUNDED_CONCURRENCY = ConcurrencyThrottleSupport.UNBOUNDED_CONCURRENCY;

/**
* Switch concurrency 'off': that is, don't allow any concurrent invocations.
* @see ConcurrencyThrottleSupport#NO_CONCURRENCY
*/

public static final int NO_CONCURRENCY = ConcurrencyThrottleSupport.NO_CONCURRENCY;


/** Internal concurrency throttle used by this executor. */
private final ConcurrencyThrottleAdapter concurrencyThrottle = new ConcurrencyThrottleAdapter();

@Nullable
private ThreadFactory threadFactory;

@Nullable
private TaskDecorator taskDecorator;


/**
* Create a new SimpleAsyncTaskExecutor with default thread name prefix.
*/

public SimpleAsyncTaskExecutor() {
super();
}

/**
* Create a new SimpleAsyncTaskExecutor with the given thread name prefix.
* @param threadNamePrefix the prefix to use for the names of newly created threads
*/

public SimpleAsyncTaskExecutor(String threadNamePrefix) {
super(threadNamePrefix);
}

/**
* Create a new SimpleAsyncTaskExecutor with the given external thread factory.
* @param threadFactory the factory to use for creating new Threads
*/

public SimpleAsyncTaskExecutor(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
}


/**
* Specify an external factory to use for creating new Threads,
* instead of relying on the local properties of this executor.
* <p>You may specify an inner ThreadFactory bean or also a ThreadFactory reference
* obtained from JNDI (on a Java EE 6 server) or some other lookup mechanism.
* @see #setThreadNamePrefix
* @see #setThreadPriority
*/

public void setThreadFactory(@Nullable ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
}

/**
* Return the external factory to use for creating new Threads, if any.
*/

@Nullable
public final ThreadFactory getThreadFactory() {
return this.threadFactory;
}


public final void setTaskDecorator(TaskDecorator taskDecorator) {
this.taskDecorator = taskDecorator;
}


//這里可以設置最大線程數(shù),通過限流去限制線程數(shù)
public void setConcurrencyLimit(int concurrencyLimit) {
this.concurrencyThrottle.setConcurrencyLimit(concurrencyLimit);
}

/**
* Return the maximum number of parallel accesses allowed.
*/

public final int getConcurrencyLimit() {
return this.concurrencyThrottle.getConcurrencyLimit();
}

/**
* Return whether this throttle is currently active.
* @return {@code true} if the concurrency limit for this instance is active
* @see #getConcurrencyLimit()
* @see #setConcurrencyLimit
*/

public final boolean isThrottleActive() {
return this.concurrencyThrottle.isThrottleActive();
}


/**
* Executes the given task, within a concurrency throttle
* if configured (through the superclass's settings).
* @see #doExecute(Runnable)
*/

@Override
public void execute(Runnable task) {
execute(task, TIMEOUT_INDEFINITE);
}

/**
* Executes the given task, within a concurrency throttle
* if configured (through the superclass's settings).
* <p>Executes urgent tasks (with 'immediate' timeout) directly,
* bypassing the concurrency throttle (if active). All other
* tasks are subject to throttling.
* @see #TIMEOUT_IMMEDIATE
* @see #doExecute(Runnable)
*/

//
@Override
public void execute(Runnable task, long startTimeout) {
Assert.notNull(task, "Runnable must not be null");
Runnable taskToUse = (this.taskDecorator != null ? this.taskDecorator.decorate(task) : task);
if (isThrottleActive() && startTimeout > TIMEOUT_IMMEDIATE) {
this.concurrencyThrottle.beforeAccess();
doExecute(new ConcurrencyThrottlingRunnable(taskToUse));
}
else {
doExecute(taskToUse);
}
}

@Override
public Future<?> submit(Runnable task) {
FutureTask<Object> future = new FutureTask<>(task, null);
execute(future, TIMEOUT_INDEFINITE);
return future;
}

@Override
public <T> Future<T> submit(Callable<T> task) {
FutureTask<T> future = new FutureTask<>(task);
execute(future, TIMEOUT_INDEFINITE);
return future;
}

@Override
public ListenableFuture<?> submitListenable(Runnable task) {
ListenableFutureTask<Object> future = new ListenableFutureTask<>(task, null);
execute(future, TIMEOUT_INDEFINITE);
return future;
}

@Override
public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
ListenableFutureTask<T> future = new ListenableFutureTask<>(task);
execute(future, TIMEOUT_INDEFINITE);
return future;
}

/**
* Template method for the actual execution of a task.
* <p>The default implementation creates a new Thread and starts it.
* @param task the Runnable to execute
* @see #setThreadFactory
* @see #createThread
* @see java.lang.Thread#start()
*/

//判斷是否有工廠,沒有的話調用父類創(chuàng)建線程
protected void doExecute(Runnable task) {
Thread thread = (this.threadFactory != null ? this.threadFactory.newThread(task) : createThread(task));
thread.start();
}


/**
* Subclass of the general ConcurrencyThrottleSupport class,
* making {@code beforeAccess()} and {@code afterAccess()}
* visible to the surrounding class.
*/

private static class ConcurrencyThrottleAdapter extends ConcurrencyThrottleSupport {

@Override
protected void beforeAccess() {
super.beforeAccess();
}

@Override
protected void afterAccess() {
super.afterAccess();
}
}


/**
* This Runnable calls {@code afterAccess()} after the
* target Runnable has finished its execution.
*/

private class ConcurrencyThrottlingRunnable implements Runnable {

private final Runnable target;

public ConcurrencyThrottlingRunnable(Runnable target) {
this.target = target;
}

@Override
public void run() {
try {
this.target.run();
}
finally {
concurrencyThrottle.afterAccess();
}
}
}

}

復制代碼

最主要的就是這段代碼


/**
* Template method for the actual execution of a task.
* <p>The default implementation creates a new Thread and starts it.
* @param task the Runnable to execute
* @see #setThreadFactory
* @see #createThread
* @see java.lang.Thread#start()
*/

//判斷是否有工廠,沒有的話調用父類創(chuàng)建線程
protected void doExecute(Runnable task) {
Thread thread = (this.threadFactory != null ? this.threadFactory.newThread(task) : createThread(task));
thread.start();
}

復制代碼

這里并不是用線程池,而是直接創(chuàng)建新的線程,所以會大量創(chuàng)建線程導致OOM。其實這個類是可以通過setConcurrencyLimit設置最大線程數(shù),通過synchronized和wati and notify去進行限流,這里不展開講。所以結論是在使用@Async一定要設置線程池。

@Async異步失效

以下代碼已做脫敏處理

在看公司代碼的時候,發(fā)現(xiàn)這樣一段代碼

    public UserVO saveUser(HttpServletRequest request,
String source)
{
String token = RequestUtils.getToken(request);
String uid = checkUserLoginReturnUid(token);
log.info("注冊登錄, token : {}, uid : {}", token, uid);
//獲取用戶信息
User User = getLoginUser(uid);
if(User == null){
User = new User();
//獲取用戶信息
Map<String,String> userMap = redisTemplateMain.getUserMapByToken(token);
//保存用戶
saveUser(User, userMap, source);
sendUserSystem(Integer.valueOf(userMap.get("id")));
}
//用戶信息放進緩存
setAuth2Redis(User);
return setUser2Redis(User);
}


//通知用戶系統(tǒng),我們這邊成功注冊了一個用戶
@Async
public void sendUserSystem(Integer userId){
Map<String,Object> map = new HashMap<>();
map.put("mainUid", userId);
map.put("source", "");
String json = HttpUtil.post(property.userRegisterSendSystem, map);
log.info("sendUserSystem userId : {}, json : {}", userId, json);
}

復制代碼

在之前我們看源碼的時候已經知道了,由于@Async的AdviceMode默認為PROXY,所以當調用方和被調用方是在同一個類中,無法產生切面,@Async沒有被Spring容器管理。所以這個方法跑了這么久一直是同步。

我們可以寫一個方法去測試一下。


public void asyncInvalid() {
try {
log.info("service start");
asyncInvalidExample();
log.info("service end");
}catch (InterruptedException e){
e.printStackTrace();
}
}


@Async
public void asyncInvalidExample() throws InterruptedException{
Thread.sleep(10);
log.info(Thread.currentThread().getName()+":處理完成");
}


復制代碼

調用結果很明顯,沒有進行異步操作,而是同步。

線程池拒絕導致線程丟失

既然線程池都已一個緩沖隊列來保存未被消費的任務,那么就一定存在隊列被塞滿,導致線程丟失的情況。我們寫一段代碼模擬一下。

配置文件

spring:
task:
execution:
pool:
# 最大線程數(shù)
max-size: 16
# 核心線程數(shù)
core-size: 16
# 存活時間
keep-alive: 10s
# 隊列大小
queue-capacity: 100
# 是否允許核心線程超時
allow-core-thread-timeout: true
# 線程名稱前綴
thread-name-prefix: async-task-

復制代碼

異步方法


@Async
public void asyncRefuseRun() throws InterruptedException {
Thread.sleep(5000000);
}
復制代碼

調用方法



public void asyncRefuseRun() {
for (int t = 0;t<2000;t++){
log.info(""+t);
try {
asyncTask.asyncRefuseRun();
}catch (InterruptedException e){
e.printStackTrace();
}
}
}

復制代碼

這里我循環(huán)了2000個線程,理論上來說當線程到達maxPoolSize + queueCapacity時會進行拒絕,也就是16+100。

到了116的時候果然拋出了異常java.util.concurrent.RejectedExecutionException。證明線程執(zhí)行了它的拒絕策略。

要理解線程池的拒絕策略,先來看看它的接口。


public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

復制代碼

當線程池出現(xiàn)拒絕的情況,就會來調用你設置的拒絕策略,將當前提交的任務以及線程池實例本身傳遞給你處理。這里建議根據自己的業(yè)務場景,去實現(xiàn)拒絕策略。

當然如果JDK內置的實現(xiàn)可以滿足當前業(yè)務,可以直接用jdk實現(xiàn)的。

AbortPolicy(中止策略)

這個中止策略就是我們剛剛演示的,觸發(fā)拒絕策略后,直接中止任務,拋出異常,這個也是ThreadPoolExecutor默認實現(xiàn)。

   /**
* A handler for rejected tasks that throws a
* {@code RejectedExecutionException}.
*/

public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/

public AbortPolicy() { }

/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}


復制代碼

DiscardPolicy(丟棄策略)

    /**
* A handler for rejected tasks that silently discards the
* rejected task.
*/

public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardPolicy}.
*/

public DiscardPolicy() { }

/**
* Does nothing, which has the effect of discarding task r.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}

復制代碼

很明顯,啥也不干,就是一個空實現(xiàn)。

DiscardOldestPolicy(棄老策略)

    /**
* A handler for rejected tasks that discards the oldest unhandled
* request and then retries {@code execute}, unless the executor
* is shut down, in which case the task is discarded.
*/

public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardOldestPolicy} for the given executor.
*/

public DiscardOldestPolicy() { }

/**
* Obtains and ignores the next task that the executor
* would otherwise execute, if one is immediately available,
* and then retries execution of task r, unless the executor
* is shut down, in which case task r is instead discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}

復制代碼

如果線程池未關閉,就彈出隊列頭部的元素,然后嘗試執(zhí)行。實際上還是會丟棄任務,如果頭部元素執(zhí)行失敗,就丟棄了。區(qū)別是優(yōu)先丟棄的是老的元素。

CallerRunsPolicy(調用者運行策略)

    /**
* A handler for rejected tasks that runs the rejected task
* directly in the calling thread of the {@code execute} method,
* unless the executor has been shut down, in which case the task
* is discarded.
*/

public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code CallerRunsPolicy}.
*/

public CallerRunsPolicy() { }

/**
* Executes task r in the caller's thread, unless the executor
* has been shut down, in which case the task is discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}

復制代碼

當觸發(fā)拒絕策略時,判斷線程池有沒有關閉,沒有關閉就由提交任務的當前線程處理。但是當大量提交后就會阻塞線程,導致性能降低。

hutool中的線程池拒絕策略實現(xiàn)

hutool作為我們經常使用的一個工具類,也有線程池工具,我們不如來看看它是如何實現(xiàn)的。


/**
* 構建ThreadPoolExecutor
*
* @param builder {@link ExecutorBuilder}
* @return {@link ThreadPoolExecutor}
*/

private static ThreadPoolExecutor build(ExecutorBuilder builder) {
final int corePoolSize = builder.corePoolSize;
final int maxPoolSize = builder.maxPoolSize;
final long keepAliveTime = builder.keepAliveTime;
final BlockingQueue<Runnable> workQueue;
if (null != builder.workQueue) {
workQueue = builder.workQueue;
} else {
// corePoolSize為0則要使用SynchronousQueue避免無限阻塞
workQueue = (corePoolSize <= 0) ? new SynchronousQueue<>() : new LinkedBlockingQueue<>(DEFAULT_QUEUE_CAPACITY);
}
final ThreadFactory threadFactory = (null != builder.threadFactory) ? builder.threadFactory : Executors.defaultThreadFactory();
RejectedExecutionHandler handler = ObjectUtil.defaultIfNull(builder.handler, new ThreadPoolExecutor.AbortPolicy());

final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(//
corePoolSize, //
maxPoolSize, //
keepAliveTime, TimeUnit.NANOSECONDS, //
workQueue, //
threadFactory, //
handler//
);
if (null != builder.allowCoreThreadTimeOut) {
threadPoolExecutor.allowCoreThreadTimeOut(builder.allowCoreThreadTimeOut);
}
return threadPoolExecutor;
}

復制代碼

可以很清晰的看到,會判斷是否傳入線程池拒絕策略,如果沒有就用默認的AbortPolicy。

dubbo中的拒絕策略

public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {

protected static final Logger logger = LoggerFactory.getLogger(AbortPolicyWithReport.class);

private final String threadName;

private final URL url;

private static volatile long lastPrintTime = 0;

private static Semaphore guard = new Semaphore(1);

public AbortPolicyWithReport(String threadName, URL url) {
this.threadName = threadName;
this.url = url;
}

@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
String msg = String.format("Thread pool is EXHAUSTED!" +
" Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," +
" Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",
threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),
e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),
url.getProtocol(), url.getIp(), url.getPort());
logger.warn(msg);
dumpJStack();
throw new RejectedExecutionException(msg);
}

private void dumpJStack() {
//省略實現(xiàn)
}
}

復制代碼

dubbo的策略實現(xiàn)主要就是想讓開發(fā)人員知道拒絕任務的情況以及原因。它先輸出了線程池的詳細設置參數(shù),以及線程池當前的狀態(tài),還有當前拒絕任務的信息。然后又輸出了當前線程堆棧詳情在dumpJStack中實現(xiàn),最后拋出RejectedExecutionException。

Netty中的線程池拒絕策略

    private static final class NewThreadRunsPolicy implements RejectedExecutionHandler {
NewThreadRunsPolicy() {
super();
}

public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
final Thread t = new Thread(r, "Temporary task executor");
t.start();
} catch (Throwable e) {
throw new RejectedExecutionException(
"Failed to start a new thread", e);
}
}
}

復制代碼

Netty的線程池拒絕策略很像CallerRunsPolicy(調用者運行策略),都是不會直接丟棄任務而是繼續(xù)處理任務,不同的地方是CallerRunsPolicy(調用者運行策略)是在調用線程繼續(xù)處理,而Netty是new了一個新線程去處理。

activeMq中的線程池拒絕策略


new RejectedExecutionHandler() {
@Override
public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {
try {
executor.getQueue().offer(r, 60, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RejectedExecutionException("Interrupted waiting for BrokerService.worker");
}

throw new RejectedExecutionException("Timed Out while attempting to enqueue Task.");
}
});

復制代碼

activeMq中的策略屬于最大努力執(zhí)行任務型,當觸發(fā)拒絕策略時,在嘗試一分鐘的時間重新將任務塞進任務隊列,當一分鐘超時還沒成功時,就拋出異常。

監(jiān)控線程池

在開發(fā)中,我們線程池的運行狀態(tài),線程狀態(tài),對我們來說都非常重要。所以我們應該把線程池監(jiān)控起來。我們可以通過擴展beforeExecute、afterExecute和terminated這三個方法去在執(zhí)行前或執(zhí)行后增加一些新的操作。用來記錄線程池的情況。

方法含義
shutdown()線程池延遲關閉時(等待線程池里的任務都執(zhí)行完畢),統(tǒng)計已執(zhí)行任務、正在執(zhí)行任務、未執(zhí)行任務數(shù)量
shutdownNow()任務執(zhí)行之前,記錄任務開始時間,startTimes這個HashMap以任務的hashCode為key,開始時間為值
beforeExecute(Thread t, Runnable r)線程池延遲關閉時(等待線程池里的任務都執(zhí)行完畢),統(tǒng)計已執(zhí)行任務、正在執(zhí)行任務、未執(zhí)行任務數(shù)量
afterExecute(Runnable r, Throwable t)任務執(zhí)行之后,計算任務結束時間。統(tǒng)計任務耗時、初始線程數(shù)、核心線程數(shù)、正在執(zhí)行的任務數(shù)量、已完成任務數(shù)量、任務總數(shù)、隊列里緩存的任務數(shù)量、池中存在的最大線程數(shù)、最大允許的線程數(shù)、線程空閑時間、線程池是否關閉、線程池是否終止信息

package com.example.threadpool;

import lombok.extern.slf4j.Slf4j;

import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
* @author kurtl
*/

@Slf4j
public class ThreadPoolMonitor extends ThreadPoolExecutor {


/**
* 保存任務開始執(zhí)行的時間,當任務結束時,用任務結束時間減去開始時間計算任務執(zhí)行時間
*/

private final ConcurrentHashMap<String, Date> startTimes;

/**
* 線程池名稱,一般以業(yè)務名稱命名,方便區(qū)分
*/

private final String poolName;

/**
* 調用父類的構造方法,并初始化HashMap和線程池名稱
*
* @param corePoolSize 線程池核心線程數(shù)
* @param maximumPoolSize 線程池最大線程數(shù)
* @param keepAliveTime 線程的最大空閑時間
* @param unit 空閑時間的單位
* @param workQueue 保存被提交任務的隊列
* @param poolName 線程池名稱
*/

public ThreadPoolMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue, String poolName)
{
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), poolName);
}


/**
* 調用父類的構造方法,并初始化HashMap和線程池名稱
*
* @param corePoolSize 線程池核心線程數(shù)
* @param maximumPoolSize 線程池最大線程數(shù)
* @param keepAliveTime 線程的最大空閑時間
* @param unit 空閑時間的單位
* @param workQueue 保存被提交任務的隊列
* @param threadFactory 線程工廠
* @param poolName 線程池名稱
*/

public ThreadPoolMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory, String poolName)
{
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
this.startTimes = new ConcurrentHashMap<>();
this.poolName = poolName;
}

/**
* 線程池延遲關閉時(等待線程池里的任務都執(zhí)行完畢),統(tǒng)計線程池情況
*/

@Override
public void shutdown() {
// 統(tǒng)計已執(zhí)行任務、正在執(zhí)行任務、未執(zhí)行任務數(shù)量
log.info("{} Going to shutdown. Executed tasks: {}, Running tasks: {}, Pending tasks: {}",
this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size());
super.shutdown();
}

/**
* 線程池立即關閉時,統(tǒng)計線程池情況
*/

@Override
public List<Runnable> shutdownNow() {
// 統(tǒng)計已執(zhí)行任務、正在執(zhí)行任務、未執(zhí)行任務數(shù)量
log.info("{} Going to immediately shutdown. Executed tasks: {}, Running tasks: {}, Pending tasks: {}",
this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size());
return super.shutdownNow();
}

/**
* 任務執(zhí)行之前,記錄任務開始時間
*/

@Override
protected void beforeExecute(Thread t, Runnable r) {
startTimes.put(String.valueOf(r.hashCode()), new Date());
}

/**
* 任務執(zhí)行之后,計算任務結束時間
*/

@Override
protected void afterExecute(Runnable r, Throwable t) {
Date startDate = startTimes.remove(String.valueOf(r.hashCode()));
Date finishDate = new Date();
long diff = finishDate.getTime() - startDate.getTime();
// 統(tǒng)計任務耗時、初始線程數(shù)、核心線程數(shù)、正在執(zhí)行的任務數(shù)量、
// 已完成任務數(shù)量、任務總數(shù)、隊列里緩存的任務數(shù)量、池中存在的最大線程數(shù)、
// 最大允許的線程數(shù)、線程空閑時間、線程池是否關閉、線程池是否終止
log.info("{}-pool-monitor: " +
"Duration: {} ms, PoolSize: {}, CorePoolSize: {}, Active: {}, " +
"Completed: {}, Task: {}, Queue: {}, LargestPoolSize: {}, " +
"MaximumPoolSize: {}, KeepAliveTime: {}, isShutdown: {}, isTerminated: {}",
this.poolName,
diff, this.getPoolSize(), this.getCorePoolSize(), this.getActiveCount(),
this.getCompletedTaskCount(), this.getTaskCount(), this.getQueue().size(), this.getLargestPoolSize(),
this.getMaximumPoolSize(), this.getKeepAliveTime(TimeUnit.MILLISECONDS), this.isShutdown(), this.isTerminated());
}

/**
* 創(chuàng)建固定線程池,代碼源于Executors.newFixedThreadPool方法,這里增加了poolName
*
* @param nThreads 線程數(shù)量
* @param poolName 線程池名稱
* @return ExecutorService對象
*/

public static ExecutorService newFixedThreadPool(int nThreads, String poolName) {
return new ThreadPoolMonitor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), poolName);
}

/**
* 創(chuàng)建緩存型線程池,代碼源于Executors.newCachedThreadPool方法,這里增加了poolName
*
* @param poolName 線程池名稱
* @return ExecutorService對象
*/

public static ExecutorService newCachedThreadPool(String poolName) {
return new ThreadPoolMonitor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), poolName);
}

/**
* 生成線程池所用的線程,只是改寫了線程池默認的線程工廠,傳入線程池名稱,便于問題追蹤
*/

static class EventThreadFactory implements ThreadFactory {
private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;

/**
* 初始化線程工廠
*
* @param poolName 線程池名稱
*/

EventThreadFactory(String poolName) {
SecurityManager s = System.getSecurityManager();
group = Objects.nonNull(s) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
namePrefix = poolName + "-pool-" + POOL_NUMBER.getAndIncrement() + "-thread-";
}

@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
}


復制代碼

線程池負載關注的核心問題是:基于當前線程池參數(shù)分配的資源夠不夠。對于這個問題,我們可以從事前和事中兩個角度來看。事前,線程池定義了“活躍度”這個概念,來讓用戶在發(fā)生Reject異常之前能夠感知線程池負載問題,線程池活躍度計算公式為:線程池活躍度 = activeCount/maximumPoolSize。這個公式代表當活躍線程數(shù)趨向于maximumPoolSize的時候,代表線程負載趨高。事中,也可以從兩方面來看線程池的過載判定條件,一個是發(fā)生了Reject異常,一個是隊列中有等待任務(支持定制閾值)。以上兩種情況發(fā)生了都會觸發(fā)告警,告警信息會通過大象推送給服務所關聯(lián)的負責人?!缊F技術文檔

核心線程數(shù) 最大線程數(shù) 如何配置

如何合理的配置線程池參數(shù),比較普遍的說法是。

IO密集型 = 2Ncpu(可以測試后自己控制大小,2Ncpu一般沒問題)(常出現(xiàn)于線程中:數(shù)據庫數(shù)據交互、文件上傳下載、網絡數(shù)據傳輸?shù)鹊龋?/p>

計算密集型 = Ncpu(常出現(xiàn)于線程中:復雜算法)

而這種方案沒有考慮多線程池的情況,實際使用上也有偏離。

圖來自美團技術博客

所以參數(shù)的設置應該根據自己實際的應用場景定制。

多線程池的使用

一般在實際業(yè)務中,我們會定義不同的線程池來處理不同的業(yè)務。利用我們之前完成的ThreadPoolMonitor可以很快的定義不同的線程。

ThreadPoolConfig


@EnableAsync
@Configuration
public class ThreadPoolConfig {

@Bean
public ThreadPoolExecutor test01(){
return new ThreadPoolMonitor(16,32,60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100),"test01");
}

@Bean
public ThreadPoolExecutor test02(){
return new ThreadPoolMonitor(8,16,60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100),"test02");
}
}


作者:西西弗的石頭
鏈接:https://juejin.cn/post/6976893903223914527
來源:掘金
著作權歸作者所有。商業(yè)轉載請聯(lián)系作者獲得授權,非商業(yè)轉載請注明出處。



瀏覽 103
點贊
評論
收藏
分享

手機掃一掃分享

分享
舉報
評論
圖片
表情
推薦
點贊
評論
收藏
分享

手機掃一掃分享

分享
舉報

感谢您访问我们的网站,您可能还对以下资源感兴趣:

国产秋霞理论久久久电影-婷婷色九月综合激情丁香-欧美在线观看乱妇视频-精品国avA久久久久久久-国产乱码精品一区二区三区亚洲人-欧美熟妇一区二区三区蜜桃视频 免费操B| 免费无码进口视频| 91丨九色丨东北熟女| 99热这里有精品| 日本一区二区三区四区| 亚洲av大片| 91污视频在线观看| 国产午夜无码视频在线观看| 一区二区三区不卡视频| 亚洲一区二区网站| 国产av播放| 中文字幕第69页| 一区二区三区四区视频在线| 怡春院国产| 波多野结衣成人网站| 精品免费一区二区三区四区| 欧美一区二区三区成人片在线| 香蕉操逼视频| 黄网国产手机在线观看| 2020无码| 久久草大香蕉| 日韩中文在线观看| 操B视频在线播放| 大鸡巴免费视频| 色哟哟精品| 日韩中文久久| 日韩精品丰满无码一级A片∴| 91人妻人人澡人人爽人人精品乱| 国产福利av| 一级成人片在线观看| 国产一级二级片| 精品资源成人| 欧美性爱视频网站| 亚洲成人动漫在线| 亚洲色图五月天| A片免费在线播放| 欧美精品久久久久久久多人混战| 人妻少妇精品视频一区二区三区| 亚洲一区二区三区在线播放| 欧美a在线观看| 亚洲福利影院| 色综合久久88色综合| 在线天堂AV| 99热91| 搡BBBB推BBBB推BBBB| 久草五月| 日韩欧美精品一区二区| 911亚洲精品| 日韩一及| 中文字幕在线播放视频| 色啪视频| 精品亚洲一区二区三区| 欧美老女人性爱视频| 欧美激情五月天| 国产一级a毛片| 日本老女人视频| 91久久| 国产精品视频免费观看| 亚洲无码AV免费观看| 3d动漫精品一区二区三区在线观看| 日本A级毛片| 国产欧美精品成人在线观看| 肏逼综合网| 日本色色色| 无码字幕| 日韩精品成人免费观看视频| 无码婷婷| 毛片9| 中文字幕免费高清网站| 日本黄色影视| 五月天婷婷综合网| 亚洲电影中文字幕| 亚洲美女网站在线观看| 午夜精品人妻无码| 国产小视频在线| 精品中文在线| 男人的天堂2019| 国产成人电影一区二区| 壁特壁视频在线观看| 久久精品免费看| 影音先锋AV资源网站| 欧美爱爱免费看| 欧美精品人妻| 国产九色91回来了| 思思热这里只有精品| 天天色色天天| 中文字幕成人网| 偷拍视频图片综合网| 免费做爱网站| 久久福利网| 波多野结衣东京热| 中文字幕在线看成人电影| 狠狠操2019| 超碰99在线| 精品码产区一区二亚洲国产| 中国熟妇XXXX18| 走光无码一区二区三区| 亚洲欧美国产毛片在线| 久九视频| 永久免费av| 亚洲色图偷拍| 婷婷伊人久操网| 91成人精品视频| 婷婷五月天在线电影| 老婆被黑人杂交呻吟视频| 98在线++传媒麻豆的视频| 亚洲区欧美区| 操逼99| 健身房被教练3p喷水了| av黄页| 国产美女自拍| 日本精品黄色| 五月丁香六月情| 久草欧美| 日韩精品成人无码| 三级大香蕉| 高清无码一区| 国产1区在线观看| 久久精品亚洲无码| 亚洲av资源在线观看| 欧美级毛片一进一出| 操屄视频免费观看| 日韩激情无码一区二区| 91视频熟女| 91丨熟女丨对白| 国产精品久久久久国产A级| 亚洲成人网在线观看| 青青草免费在线视| 黄色午夜| 情趣视频网站| 人人妻人人爽人人精品| 中文字幕超清在线观看| 肏屄视频免费观看| 神马午夜激情| 久久er热| 午夜做爱视频| 午夜操爽| 亚洲日韩影院| 亚洲vs无码秘蜜桃少妇小说| 中文字幕黄色片| 女孩自慰在线观看| 大香伊人中文字幕精品| h片免费网站| 免费一级无码婬片A片APP直播| 无码做爰欢H肉动漫网站在线看| 亚洲日韩第一页| 爱爱午夜福利| 精品乱伦| 人与禽一级A片一区二区三区| 中文字幕无码人妻| 草逼国产| 国产女人与禽zOz0性| 综合天堂AV久久久久久久| 婷婷狠狠操| 免费的av| 亚洲国产成人AV| 五月天婷婷在线无码| 午夜精品18视频国产17c| 日本电影一区二区三区| 高清无码在线观看视频| 黄色一级片免费观看| 中文字幕亚洲中文字幕| 91丨露脸丨熟女| 自拍超碰在线| 熟女人妻一区二区三区免费看| 在线中文字幕AV| 在线乱视频| 自拍偷拍第一页| 日韩精品久久久久久久酒店| 久久黄色视屏| 色婷婷视频在线| 91免费网站在线观看| 天天操夜夜操视频免费高清| 91在线看| 亚洲视频a| 色老板在线精品免费观看| 成人黄色一级片| 午夜天堂在线| 亚洲三区视频| 亚洲免费婷婷| av免费播放| 黄色A片网址| 久久久9999| 日本中文字幕在线播放| 尿在小sao货里面好不好| 一级大毛片| 成人久久久久一级大黄毛片中国| 婷婷在线观看免费| 国产成人AV免费观看| 日本爱爱小视频| 国产精品不卡一区二区三区| 亚洲天堂av在线免费观看| 伊人99热| 亚洲成人777| 黄色视频大全免费看| 999一区二区三区| 久久99精品国产.久久久久| 国产在线不卡| 黄片大全在线免费观看| 五月丁香婷婷开心| 美女大吊,网站视频| 亚洲久久久久| 国产日韩一区| 欧美男人的天堂| 超碰91在线| 亚洲成人av在线观看| 成人黄色视频免费| 中日毛片| 国产AV二区| 在线观看免费黄色视频| 九九九中文字幕| 豆花成人视频在线观看| 国产精品黑人ThePorn| 国产欧美日韩综合| 日逼片| 91久久免费视频| 青草久久久久| 亚洲黄色av网站| 国产亚洲色情| 亚洲AV成人无码| 日本乱伦网站| 黄色成人大片| 色婷婷欧美在线播放内射| 欧美婷婷五月| 成人精品18| 人人操人| 2018人人操| 日本黄色视频。| 91无码一区二区三区在线 | 无码日韩人妻精品久久蜜桃 | 乱伦91视频| 亚洲免费播放| 成人免费毛片蓝莓| 天天插天天狠天天透| 自拍偷拍激情视频| 东京热av一区二区| 久久久久久无码日韩欧美电影| 最近中文字幕免费| 中日韩在线视频| 无码AV电影| 久久久久久高清毛片一级| 精东av| 影音先锋中文字幕资源| a色视频| 激情人妻在线| www超碰在线| 三级无码视频| 人善交精品一区二区三区| 久久视频国产| 影音先锋成人AV资源| 亲孑伦XXXⅹ熟女| 丁香五月天激情网| www免费视频| 国产精品久久免费视频| 国产xxxx| 91性爱视频| 午夜色色福利| 婷婷五月丁香五月| 婷婷五月天中文字幕| 一本一道无码免费看视频| 再深点好爽灬轻点久久国产| 新妺妺窝窝777777野外| 熟女少妇一区二区三区| 国产精品成人在线观看| 国产精品成人影视| 婷婷国产精品视频| 人妻免费在线视频| 午夜成人无码视频| 激情小视频在线观看| 国产淫荡视频| A片欧美| 日韩小视频| 超碰中文字幕| 影音先锋av在线资源站| 亚洲操操操| 日韩一区二区三区四区| 熟女人妻人妻HD| 四虎黄色网址| 国产1页| 天天爽日日澡AAAA片| 亚洲性爱在线| av无码在线观看| 99热99精品| 一级黄色片视频| 夜夜操影院| 在线观看免费黄色视频| 婷婷伊人綜合中文字幕小说| 午夜精品久久久久久久久久久久| 中文一区二区| 都市激情亚洲| 日韩A视频| av玖玖| 日本中文字幕乱伦| 二区三区视频| 色播视频在线观看| 91插插网| 大香蕉75在线| 丁香五月婷婷五月天| 亚洲精品视频免费观看| 成人片免费看| 国产亚洲中文| 无码视频韩国| 一起操在线| 日韩免费一级| 91视频人妻| 中文人妻av| 色操人 | 伊人综合电影| 99热官方网站| 亚洲无吗在线观看| 亚洲一区二区三区视频| 色妞视频| 亚洲激情四射| 99综合| 樱桃av| 中文字幕韩日| 日韩AV大片| 大香蕉在线视频网| 乱子伦国产精品视频一级毛| 成人午夜av| 中文字幕性爱| 激情导航| 97资源网站| 韩国中文字幕HD久久| 无码视屏| 青青免费在线视频| 九九九中文字幕| 日韩成人影片| 丰满少妇一级片| 成人尤物网站| 国产变态另类| 高清人妻无码| 久久久xxx| 去干网欧美| 内射午夜福利在线免费观看视频| 日韩毛片在线免费观看| 五月婷网| 免费日比视频| 亚洲综合国产| 操逼操逼操逼| 精品欧美片在线观看步骤| 国产亚洲无码激情| 免费黄片在线| 先锋影音中文字幕| 香蕉伊人在线| 韩国成人无码视频| 一级黄色免费片| 色xxxx| 欧美三级电影在线观看| 91精品一区| 星空AV| 美腿丝袜中文字幕精品| 插菊花综合网1| 日本无码片| 无码人妻在线播放| 操操网站| 亚洲黄色大片| 中文字幕成人| 曰曰操| 国产色无码网站www色视频| 性欧美成人播放77777| 亚洲免费看黄| 国产不卡在线视频| 91在线无精精品秘白丝| 91福利资源| 国产成人精品无码片子的价格| 蜜臀久久99精品久久久| 成年人在线播放| 91日韩欧美| 天天日日干| 国产成人三级片| 精品国产乱码一区二区| chinese搡老熟老妇人| 中日韩欧美一级A片免费| 天天爽天天做| 青青热久| 天天干人人干| 日韩成人在线观看| 欧美日韩免费在线| 久久久久久久久久久高清毛片一级| 黄色A片网址| 蜜臀99久久精品久久久懂爱| 亚洲猛男操逼欧美国产视频| 免费在线性爱视频| 免费黄色视频网站在线观看| 亚洲AV成人无码精在线| 黄色视频免费在线看| 日韩码波多野结衣| 久草福利在线观看| 日本视频免费| 精品无码一区二区三区四区| 91视频熟女| 国产精品久久久久久久牛牛| 久热精品视频在线观看| 人人操人人操人人操人人操 | 日韩天堂av| 欧美在线免费视频| 久久久三级片| 97中文字幕在线| 久操视频网站| 欧美大香蕉伊人网| 依人综合网| 亚洲无码手机在线| 精品熟妇| 天堂av中文字幕| 黄色影院在线观看| 国产aaaaaaaaaaaaa| www五月天| 深爱婷婷网| 92久久| 北条麻妃一区二区三区在线| 日韩久久网站| 天天添夜夜添| 97在线观看免费视频| 99热1| 午夜成人爽| 91无码秘蜜桃一区二区三区-百度| 97热| 91久久久久久久| 视频国产区| 无码人妻一区二区三区免费n鬼沢| 欧美激情无码炮击| 熟妇人妻中文AV无码| 黄色免费在线观看视频| 激情一一区二区三区| 精品国产重口乱子伦| 97人妻精品| 黄色A片免费看| 老太奶性BBwBBw侧所| 国产综合一区二区| 性爱免费专区| 毛片毛片毛片毛片毛片| 91无码人妻精品一区二区三区四 | 少妇bbb搡bbbb搡bbbb| 免费在线观看Av| 在线观看高清无码| 成人伊人大香蕉| 大香蕉一本| 亚洲欧美动漫| 在线免费观看黄色视频网站 | 日韩小视频在线| 亚洲精品乱码久久久久久按摩观| 青在线视频| 亚洲AV无码一区二区三区少妇| 欧美在线天堂| 国产精品一区二区在线观看| 国产成人精品AV| 嫩草久久99www亚洲红桃| 91N视频| 影音先锋久久| 免费观看一区| 特级av| 男女av免费观看| 国产日韩欧美综合精品在线观看| 波多野结衣网址| 日本翔田千里奶水| 老熟女网站| 视频一区中文字幕| 久久婷婷五月综合伊人| 中文字幕第27页| 日本欧美一级片| www.91madou| 色人阁人妻中文字幕| 四季AV综合网站| 天天干欧美| 爱射网| 青草伊人av| 国产一区二区三区视频在线观看| 草久在线观看| 免费黄色网址啊不卡| 国产乱婬AAAA片视频| 国产在线观看国产精品产拍| 97国产精品| 日韩国产一区二区| 大肉大捧视频免费观看| 日韩高清无码免费看| 三级电影久久麻豆| 51成人网| 中国熟女HD| 欧洲黄网| 视频一区中文字幕| 少妇福利| 在线一区二区三区四区| 精国产品一区二区三区A片| 在线播放JUY-925被丈夫上司侵犯的第7天 | 特級西西444WWw高清大膽| 黄色午夜| 激情五月天在线视频| 久久精品偷拍视频| 无码在线免费观看视频| 亚洲欧美高清视频| 亚洲国产精品久久久| 久久黄色视频免费观看| 一区二区无码精品| 欧美日韩v| 日韩AⅤ无码一区二区三区| 无码电影网| 欧美久久视频| 成人免费三级| 日韩精品成人片| 激情五月天综合网| 日韩精品123| 国产口爆在线观看| 国产高清无码免费在线观看| 国产夫妻自拍av| 小H片在线观看| 国产精品久久久精品| www.俺去了| 超碰欧美在线| www在线播放| 天堂久久久久| 五月色视频| 亚洲天堂视频在线| 日皮视频在线看| 国产一区二三区| 超碰97老师| 国产精品五月天| 久草视频这里只有精品| 法国《少女日记》电影| 全国最大成人网| 国产插逼视频| 日韩成人在线免费观看| 色五月在线| 免费在线观看视频a| 亚洲视频456| 婷婷久久综合久色| 欧美日韩中文字幕在线| 无码熟妇| 国产中文字幕AV在线播放| 黄片大全在线观看| 欧洲性爱视频| 97干在线| 蜜桃免费| 91视频在| 自慰一区二区| 国产视频久久| 高清无码18| 在线无码人妻| 亚洲一级Av无码毛片久久精品| 无码一区精品久久久成人| 91狠狠综合| 婷婷99狠狠躁天天躁| 国产综合久久777777麻豆| 91成人无码视频| 26∪u∪成人网站| AA无码| 182在线视频| 黄页网站视频| 国产,亚洲91| 国产逼逼| 秋霞丝鲁片一区二区三区手机在绒免 | 新亚洲天堂男子Av-| 免费看成人747474九号视频在线观看| 亚洲天堂影院| 蜜桃视频成人版网站| 欧美狼友| 国产AA| 精品在线免费观看| 91久久综合亚洲鲁鲁五月天| 黄色片在线看| 狠狠干,狠狠操| 亚洲性爱手机版| 骚白虎一区| www.zaixianshipin| 无码人妻AⅤ一区二区三区A片一| 看国产毛片| 免费人成视频在线播放| 日韩毛片在线视频x| 草久久| 黄色片在线播放| 四房婷婷| 日韩一区二区AV| 欧美日韩国产成人综合| www99精品| 三级无码在线播放| 国产在线性爱视频| 亚洲无码高清在线视频| 久久区| 日本无码免费视频| 国产又粗又黄| 国产一级二级三级片| 国产性猛交╳XXX乱大交| 久久精品成人导航| 亚洲精品日韩无码| 亚洲欧美另类在线| 日韩综合色| 欧美日韩国产一区| 青误乐在线播放| av一区在线| 人人摸在线视频| 国产一级a毛一级a做免费的视频 | 欧美一级性爱视频| 91香蕉视频18| 久久这里有精品| 国产精品一麻了麻了| 亚洲无吗在线视频| 午夜视频在线| 无码国产av| 天天爽夜夜爽夜夜爽精品| 五月丁香人妻| 大香蕉中文视频| www.五月天.con| 色综合天天综合网国产成人网| 婷婷激情四射| 欧美成人视频18| 美女视频一区二区三区| 91吊逼| 噜噜色小说| 久久久亚洲熟妇熟女| 成人黄色视频网站在线观看| 欧美射精视频| 免费看黄A级毛片成人片| 麻豆mdapp01.tⅴ| 俺去了俺来也| 高潮喷水无码| 男女无套在线观看免费| 97黄片| 亚洲黄色成人网站| 天天看天天色| 狠狠躁日日躁夜夜躁A片男男视频 精品无码一区二区三区蜜桃李宗瑞 | 精品免费在线观看| 青青草成人网站| 在线免费A片| 成人毛片网| 成人黄色在线看| 亚洲一区2区| 大香蕉色伊人| 欧洲成人在线| 亚洲vs天堂vs成人vs无码| 国产一级A片在线观看| 北条麻妃波多波多野结衣| 国产成人在线免费视频| 青春草在线观看| 97香蕉久久夜色精品国产| 亚洲一区二区久久| 另类老妇极品BBWBBw| 成人无码人妻| 国产三级在线观看| 中文字幕无吗| 日韩中文无| 国产激情无码视频| 这里只有精品91| 美女黄视频网站| 内射无码视频| 中文字幕在线视频观看| 熟妇私拍| 国产精品免费观看久久久久久久久 | 大香蕉伊人综合| AV黄片| 久久精品禁一区二区三区四区五区| 成人免费AV| 日本熟妇高潮BBwBBwBBw| av中文字幕在线播放| 久久精品国产亚洲AV麻豆痴男| 黄色av免费网站| 日本親子亂子倫XXXX50路| 成人一级片| 黄色草莓视频| 婷婷亚洲天堂| 免费一区二区三区四区| 伊人网导航| 午夜蜜桃人妻一区二区| 久久国产精品电影| 欧美特黄AAAAAA| 日本免费一二三区| 亚洲五月激情| 未满十八18禁止免费无码网站| 九九天堂网| 欧美日韩中文在线视频| 岛国av无码免费| 老司机在线免费视频| 尻屄电影| 国产一区在线播放| 91传媒在线免费观看| 四川BBB搡BBB爽爽爽欧美| 国产第八页| 在线v片| 18成人在线观看| 中文字幕精品久久久久人妻红杏Ⅰ | 中文字幕在线播放av| 91网站在线免费观看| 91人妻人人澡人人爽人人精品一 | 粉嫩小泬BBBB免费看| AV第一页| 大香蕉伊人手机在线| 青草视频在线观看免费| 婷婷视频网| 欧美爱爱网站| 地表最强网红八月未央道具大秀| 人妻18无码人伦一区二区三区精品 | 色哟哟视频在线观看| 欧美老妇BBBBBBBBB| 少妇搡BBBB搡BBB搡18禁| 日本中文字幕视频| 欧美亚洲国产一区二区三区 | 免费播放婬乱男女婬视频国产| 手机看片1024旧版| 蜜芽成人在线| 人妻FrXXeeXXee护士| 熟女人妻人妻HD| 青青草视频在线免费观看| 91乱子伦国产乱子伦| 九九热只有精品| 男女日逼| 一区二区在线不卡| 天天添夜夜添| 中文字幕97| AV操逼网| 成人精品一区二区三区电影| a片在线观看视频| 北条麻妃AV在线播放| 黄色片网站免费观看| 亚洲精品69| jk在线观看| 骚逼操| 国产精品福利视频| 国产免费无码| caoporen| 日韩无码人妻一区二区三区| 可以免费观看的AV| 成人777777| 骚骚肥肥一区二区三区| 久久波多野结衣一区二区| 国产黄色小视频在线观看| 天天爽天天日| 永久免费av| 国产一区二区视频在线观看| 大香蕉久久爱| 第一页在线| 国产欧美综合视频一区二区在线 | 99精品国产热久久91色欲| 人人操超碰在线观看| 中文字幕一区二区三区在线观看| 黄片无码免费| 日本久久综合| 熟妇人妻中文AV| 成人一区二区三区四区| 欧美日韩国产在线观看| 成人片成人网久久蜜桃臀| 色老板免费精品无码免费视频| 国产成人精品国内自产拍免费看 | 曰韩一级A片| 欧美深夜福利视频| 91成人小视频| 久久久久久久成人| 日韩精品一区在线观看| 久久久9999| 日韩在线观看| 波多野结衣av在线观看窜天猴| 欧美激情中文字幕| 精品日韩中文字幕| 日本a在线免费观看| 中文字幕欧美视频| 内射在线| 成人免费观看视频| 操B网站| 无码精品一区二区三区在线播放 | 操逼不卡视频| 99精品一区二区| 一级A片黄色| 欧美日韩亚洲视频| 毛片毛片毛片毛片毛片毛片| 丁香五月激情视频| 国产精品9999| 色老板免费精品无码免费视频| 免费无码国产在线怀| 国产精品无码激情| 一区二区三区久久久久〖网:.〗| 蜜臀精品| 日本中文在线| 色婷婷天天操天天干| 日韩毛片在线视频x| 亚洲天堂一| 亚洲A片一区二区三区电影网| 亚洲无码色| 污网站免费在线观看| 欧美成人一区二区三区| 99色播| 欧亚一区二区| 亚洲无码久久精品| 丰满人妻精品一区二区在线| 婷婷久久综合久色综| 久久九九99| 精品九九九九九九| 欧美性猛交ⅩXXX乱大交| aaaaaa在线观看免费高清| AV天堂无码| 怡春院国产| 欧美成人视频电影无码高清| 五月天视频网| 91免费网站在线观看| 手机在线观看AV| 7777av| 日韩成人网站| 玖玖中文字幕| 精品国产区一区二| 国产色网站| 青青操B| 人人色人人操| 高清人妻无码| 日韩AV中文字幕在线播放| 免费AV在线播放| 黄色小视频免费看| 亚洲国产熟妇综合色专区| 91在线无码精品入口电车| 美日韩在线| 中日韩免费视频| 亚洲性网| 人妻无码HEYZO少妇精品| 欧美AA级毛片| 蜜臀久久99精品久久久兰草影视| 第一色影院| 奶大丰满一乱一视频一区二区三区在| 国产天堂av| 婷婷久久在线| 国产三级91| 亚洲小视频| 日韩中文字幕一区二区| A片大香蕉| 青青草激情视频| 欧美成人精品| 在线黄网站| 91人妻人人澡人人爽精品| 秋霞午夜| 黑人内射人妖| 91国在线视频| 五月天婷婷丁香网| 成人视频在线观看免费| 欧美黄色小说| 毛片9| 激情伊人五月天| 中文字幕日韩在线视频| 国产一卡二卡| 国产一级做a爱免费视频| 国产午夜视频在线| 欧美v在线| 91在线无码精品国产三年| 自慰一区二区| 91热久久| 女人天堂av| 国产日皮| 亚洲AⅤ无码一区二区波多野按摩| 国产啊啊啊啊| 婷婷久久综合久色| 久久久综合网| 蝌蚪久久| 翔田千里AV在线| 欧美综合亚洲图片综合区| 婷婷中文字幕亚洲| 久久久久一| 国产高清精品软件丝瓜软件| 伊人综合久久| 国产人人爱| 国产在线h| 欧美一级片内射| 高清无码一级片| 人人操人人摸人人爱| 亚洲午夜AV久久乱码| 亚洲精品国产精品乱码不卡√香蕉 | 天天爽天天| 久久婷婷婬片A片AAA| 日韩高清无码电影| 亚洲视频中文字母| 久久视频在线| 婷婷综合五月| 欧美在线不卡综合| 波多野结衣AV在线观看| 色婷婷在线无码精品秘人口传媒| 51黄片| 欧美一级高清片免费一级a| 色三区| 人人妻人人干| 色丁香在线| 国产AV中文| 国产黄色三级片| 人人色网站| 国产2页| 久久h| 亚洲第一黄| 簧片网站在线观看| 91老熟女| 中文字幕AV在线免费观看| 国产嫩草精品A88AV| 影音先锋久久| 亚洲欧美日韩在线| 91啦丨露脸丨熟女色啦| 黄片视频免费看| 美女被操面费网站| 99久久99久久| 亚洲视频在线播放| 日韩AV大片| 七十路の高齢熟女千代子| 成人三级电影网| 久久精品免费电影| 91av成人| 黄色av免费看| 成人AV中文字幕| 少妇一级| 亚洲无码69| 国产资源在线观看|