欢迎来到DIVCSS5查找CSS资料与学习DIV CSS布局技术!
前言
 
在实际生产环境中中,通常生产者和消费者会是两个独立的应用,这样才能通过消息队列实现了服务解耦和广播。因为此项目仅是一个案例,为了方便期间,生产和消费定义在了同一个项目中。
 
基础配置
 
pom.xml 添加依赖:
 
<!-- activemq -->
 
<dependency>
 
 <groupId>org.springframework.boot</groupId>
 
 <artifactId>spring-boot-starter-activemq</artifactId>
 
</dependency>
 
application.properties 基础配置:
 
# activemq 基础配置
 
#spring.activemq.broker-url=tcp://47.94.232.109:61616
 
# 生产环境设置密码
 
#spring.activemq.user=admin
 
#spring.activemq.password=123456
 
#spring.activemq.in-memory=true
 
#spring.activemq.pool.enabled=false
 
项目集成
 
定义生产者:
 
import javax.jms.Destination;
 
import org.springframework.beans.factory.annotation.Autowired;
 
import org.springframework.jms.core.JmsMessagingTemplate;
 
import org.springframework.stereotype.Component;
 
@Component
 
public class ActiveMQSender {
 
 @Autowired
 
 private JmsMessagingTemplate jmsTemplate; 
 
 /*
 
 * 发送消息,destination是发送到的队列,message是待发送的消息
 
 */
 
 public void sendChannelMess(Destination destination, final String message){
 
 jmsTemplate.convertAndSend(destination, message);
 
 }
 
}
 
定义消费者:
 
import org.springframework.beans.factory.annotation.Autowired;
 
import org.springframework.jms.annotation.JmsListener;
 
import org.springframework.stereotype.Service;
 
import com.itstyle.seckill.common.entity.Result;
 
import com.itstyle.seckill.common.enums.SeckillStatEnum;
 
import com.itstyle.seckill.common.redis.RedisUtil;
 
import com.itstyle.seckill.common.webSocket.WebSocketServer;
 
import com.itstyle.seckill.service.ISeckillService;
 
@Service
 
public class ActiveMQConsumer {
 
  @Autowired
 
 private ISeckillService seckillService;
 
 @Autowired
 
 private RedisUtil redisUtil; 
 
 // 使用JmsListener配置消费者监听的队列,其中text是接收到的消息
 
 @JmsListener(destination = "seckill.queue")
 
 public void receiveQueue(String message) {
 
 //收到通道的消息之后执行秒杀操作(超卖)
 
 String[] array = message.split(";"); 
 
 Result result = seckillService.startSeckilDBPCC_TWO(Long.parseLong(array[0]), Long.parseLong(array[1]));
 
 if(result.equals(Result.ok(SeckillStatEnum.SUCCESS))){
 
 WebSocketServer.sendInfo(array[0].toString(), "秒杀成功");//推送给前台
 
 }else{
 
 WebSocketServer.sendInfo(array[0].toString(), "秒杀失败");//推送给前台
 
 redisUtil.cacheValue(array[0], "ok");//秒杀结束
 
 }
 
 }
 
}
 
测试案例:
 
@ApiOperation(value="秒杀五(ActiveMQ分布式队列)",nickname="科帮网")
 
@PostMapping("/startActiveMQQueue")
 
public Result startActiveMQQueue(long seckillId){
 
 seckillService.deleteSeckill(seckillId);
 
 final long killId = seckillId;
 
 LOGGER.info("开始秒杀五");
 
 for(int i=0;i<1000;i++){
 
 final long userId = i;
 
 Runnable task = new Runnable() {
 
 @Override
 
 public void run() {
 
 if(redisUtil.getValue(killId+"")==null){
 
 Destination destination = new ActiveMQQueue("seckill.queue");
 
 //思考如何返回给用户信息ws
 
 activeMQSender.sendChannelMess(destination,killId+";"+userId);
 
 }else{
 
 //秒杀结束
 
 }
 
 }
 
 };
 
 executor.execute(task);
 
 }
 
 try {
 
 Thread.sleep(10000);
 
 redisUtil.cacheValue(killId+"", null);
 
 Long seckillCount = seckillService.getSeckillCount(seckillId);
 
 LOGGER.info("一共秒杀出{}件商品",seckillCount);
 
 } catch (InterruptedException e) {
 
 e.printStackTrace();
 
 }
 
 return Result.ok();
 
}
 
注意事项
 
spring-boot-starter-activemq 依赖即可默认采用内嵌的 ActiveMQ,这个跟 elasticsearch 是一样的,测试的小伙伴可以不用安装,注释掉相关参数,使用默认即可。
 
如果自行安装 ActiveMQ 记得配置防火墙/安全组,配置web访问密码以及连接密码。
 
在生产环境下尽量还是采用外部 activemq 服务,提高扩展性、稳定性、可维护性。

如需转载,请注明文章出处和来源网址:http://www.divcss5.com/html/h63209.shtml