SpringBoot:Event实现发布/订阅模式

如图所示支付业务中,用户支付成功之后,后续还有很多的业务流程,但是对于用户来讲是透明的,所以为了提高接口的响应速率,提高用户体验,后续操作都会选择异步执行

新区网站建设公司创新互联,新区网站设计制作,有大型网站制作公司丰富经验。已为新区上千余家提供企业网站建设服务。企业网站搭建\成都外贸网站制作要多少钱,请找那个售后服务好的新区做网站的公司定做!

异步执行方式

异步执行主体

@Service
public class OrderService {
public void orderSuccess(){

// 订单完成异步任务开启 可以再统一封装
Order order = new Order();
order.setOrderNo(String.valueOf(System.currentTimeMillis()));
Map orderSuccessServiceMap = SpringContextUtil.getBeansOfType(OrderSuccessService.class);
orderSuccessServiceMap.values().forEach(service -> {
service.orderSuccess(order);
});
}
}

异步执行接口

public interface OrderSuccessService {
/**
* 订单支付成功
* @param order
*/
public CompletableFuture orderSuccess(Order order);
}
@Slf4j
@Service
public class MerchantNoticeServiceImpl implements OrderSuccessService {
@Override
@Async("taskExecutor")
public CompletableFuture orderSuccess(Order order) {
log.info("{}商户通知:{}",Thread.currentThread(),order);
// 返回异步调用的结果
return CompletableFuture.completedFuture(true);
}
}
@Slf4j
@Service
public class MerchantNoticeServiceImpl implements OrderSuccessService {
@Override
@Async("taskExecutor")
public CompletableFuture orderSuccess(Order order) {
log.info("{}商户通知:{}",Thread.currentThread(),order);
// 返回异步调用的结果
return CompletableFuture.completedFuture(true);
}
}
@Slf4j
@Service
public class MerchantNoticeServiceImpl implements OrderSuccessService {
@Override
@Async("taskExecutor")
public CompletableFuture orderSuccess(Order order) {
log.info("{}商户通知:{}",Thread.currentThread(),order);
// 返回异步调用的结果
return CompletableFuture.completedFuture(true);
}
}

自定义线程池,线程池隔离,开启异步任务执行

@Configuration // 配置类
@EnableAsync // @Async注解能够生效
public class TaskConfiguration {
@Bean("taskExecutor")
public Executor taskExecutor(){
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 线程池创建时候初始化的线程数
executor.setCorePoolSize(5);
// 线程池最大的线程数,只有在缓冲队列满了之后,才会申请超过核心线程数的线程
executor.setMaxPoolSize(10);
// 用来缓冲执行任务的队列
executor.setQueueCapacity(200);
// 当超过了核心线程之外的线程,在空闲时间到达之后会被销毁
executor.setKeepAliveSeconds(60);
// 可以用于定位处理任务所在的线程池
executor.setThreadNamePrefix("taskExecutor-orderSuccess-");
// 这里采用CallerRunsPolicy策略,当线程池没有处理能力的时候,该策略会直接在execute方法的调用线程中运行被拒绝的任务;
// 如果执行程序已关闭,则会丢弃该任务
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 设置 线程池关闭 的时候 等待 所有任务都完成后,再继续 销毁 其他的 Bean,
// 这样这些 异步任务 的 销毁 就会先于 数据库连接池对象 的销毁。
executor.setWaitForTasksToCompleteOnShutdown(true);
// 该方法用来设置线程池中 任务的等待时间,如果超过这个时间还没有销毁就 强制销毁,以确保应用最后能够被关闭,而不是阻塞住。
executor.setAwaitTerminationSeconds(60);
return executor;
}
}

Spring Event实现发布/订阅模式

自定义事件:通过继承ApplicationEve​

nt,并重写构造函数,实现事件扩展。

public class OrderApplicationEvent extends ApplicationEvent {
public OrderApplicationEvent(OrderData orderData){
super(orderData);
}
}

定义事件的消息体

@Data
public class OrderData {
/**
* 订单号
*/
private String orderNo;
}

事件监听

@Slf4j
@Service
public class MerchantNoticeListener {
@Async("asyncEventTaskExecutor")
@EventListener
public CompletableFuture orderSuccess(OrderApplicationEvent event) {
log.info("{}商户通知:{}",Thread.currentThread(),event);
// 返回异步调用的结果
return CompletableFuture.completedFuture(true);
}
}
@Slf4j
@Service
public class UserNoticeListener implements ApplicationListener {
@Override
@Async("asyncEventTaskExecutor")
public void onApplicationEvent(OrderApplicationEvent event) {
log.info("{}用户通知:{}",Thread.currentThread(),event);
}
}
@Slf4j
@Service
public class UserNoticeListener implements ApplicationListener {
@Override
@Async("asyncEventTaskExecutor")
public void onApplicationEvent(OrderApplicationEvent event) {
log.info("{}用户通知:{}",Thread.currentThread(),event);
}
}
@Slf4j
@Service
public class UserNoticeListener implements ApplicationListener {
@Override
@Async("asyncEventTaskExecutor")
public void onApplicationEvent(OrderApplicationEvent event) {
log.info("{}用户通知:{}",Thread.currentThread(),event);
}
}

自定义线程池

@Configuration
@Slf4j
@EnableAsync // @Async注解能够生效
public class AsyncConfiguration implements AsyncConfigurer {
@Bean("asyncEventTaskExecutor")
public ThreadPoolTaskExecutor executor(){
//Spring封装的一个线程池
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(50);
executor.setQueueCapacity(30);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setThreadNamePrefix("asyncEventTaskExecutor--orderSuccess-");
executor.initialize();
return executor;
}

@Override
public Executor getAsyncExecutor(){
return executor();
}

/**
* 异常处理
* @return
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler(){
return (ex, method, params) -> log.error(String.format("[async] task{} error:", method), ex);
}
}

事件发布

@Service
@Slf4j
public class OrderEventService {
private final ApplicationEventPublisher applicationEventPublisher;

public OrderEventService(ApplicationEventPublisher applicationEventPublisher){
this.applicationEventPublisher = applicationEventPublisher;
}
public void success(){
OrderData orderData = new OrderData();
orderData.setOrderNo(String.valueOf(System.currentTimeMillis()));
// 消息
OrderApplicationEvent orderApplicationEvent = new OrderApplicationEvent(orderData);
// 发布事件
applicationEventPublisher.publishEvent(orderApplicationEvent);
}
}

写在最后:不管是否基于spring boot 的发布订阅模型,最终都是开启了线程执行任务,和使用第三方的MQ消息组件,问题在于重启服务器或者未知原因崩溃的时候,消息的恢复机制要自行处理。

建议使用在一些边缘业务,比如记录日志,这些要求没有那么高的业务。

网页标题:SpringBoot:Event实现发布/订阅模式
网页URL:http://www.hantingmc.com/qtweb/news33/117233.html

网站建设、网络推广公司-创新互联,是专注品牌与效果的网站制作,网络营销seo公司;服务项目有等

广告

声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 创新互联