何为CMQ?
腾讯云消息队列(Cloud Message Queue,CMQ)是一种分布式消息队列服务,它能够提供可靠的基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)之间的收发消息,存储在可靠有效的 CMQ 队列中,防止消息丢失。 CMQ 支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。——来源以及更多内容推荐看官方文档。
之前公司内部使用rabbitMQ,但是运维调整部署全部迁移到腾讯云上,如果继续使用rabbitMQ,还需要运维自主去搭建环境,维护之类,而且经考察对rabbitMQ维护成本相比直接使用腾讯云的CQM高很多,所以最近技术部门对CMQ进行研究发现基本可以替代rabbitMQ,但是同时也发现一个比较严重的问题,使用cmq的mq功能,无法实现完全实现自动触发消息消费,因为cmq的消息监听基于长连接的,长时间没有消息推送会造成长连接断开,无法实现自动触发消息消费了。本文目的主要解决CQM自动触发消息消费。
利用spring中可以根据注解获取bean,调用对应通知方法,实现多线程自动拉取消息。
自定义注解Queue
@Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Component public @interface IzkQueue { String value() default ""; String queueName() default ""; }
消息处理器抽象统一接口
/ * 消息处理器抽象统一接口 */ public interface IBaseCmqHandler { / * 处理从cmq中获取的消息 * * @param queueName : 队列名 * @param message : 消息体 * @return */ boolean onMessage(String queueName, Message message); }
CMQ消息监听类
@Slf4j @Component public class CmqListener implements ApplicationContextAware, ApplicationListener<ApplicationEvent> { @Setter private ApplicationContext applicationContext; @Autowired private TaskExecutor taskExecutor; private boolean isStart = false; / * 获取所有的需要监听mq的类,以及注册的mq * @param applicationEvent */ @Override public void onApplicationEvent(ApplicationEvent applicationEvent) { HashMap<String, IBaseCmqHandler> map = new HashMap<>(16); Map<String, Object> beanMap = applicationContext.getBeansWithAnnotation(IzkQueue.class); beanMap.forEach((key, value) -> { IzkQueue annotation = value.getClass().getAnnotation(IzkQueue.class); String queue = annotation.queueName(); map.put(queue, (IBaseCmqHandler) value); }); if (!isStart) { isStart = true; if (!CollectionUtils.isEmpty(map)) { taskExecutor.execute(() -> executeQueueHandler(map)); } } } private void executeQueueHandler(HashMap<String, IBaseCmqHandler> map) { map.forEach((queueName, bean) -> { taskExecutor.execute(() -> receiveCmqMessage(queueName, bean)); }); } / * 功能描述 : 将队列与对应的消息处理器进行匹配,并进行消息消费 * * @param queueName : queue name * @param cmqHandler : 具体的消息处理器 * @return * @created 2019-07-14 16:55 */ private void receiveCmqMessage(String queueName, IBaseCmqHandler cmqHandler) { try { while (true) { // 睡眠 释放cpu资源 Thread.sleep(10); CmqQueue cmqQueue = applicationContext.getBean(queueName, CmqQueue.class); Message message = cmqQueue.receiveMsg(); if (null != message) { log.info("时间:{}, 队列:{}, 收到消息:{}", LocalDateTime.now(), queueName, message.msgBody); if (!StringUtils.isEmpty(message.msgBody) && !StringUtils.isEmpty(message.receiptHandle)) { taskExecutor.execute(() -> { try { // 处理消息 if (cmqHandler.onMessage(queueName, message)) { // 消费成功 删除消息 cmqQueue.deleteMsg(message.receiptHandle); } else { taskExecutor.execute(() -> receiveCmqMessage(queueName, cmqHandler)); } } catch (Exception e) { log.error("消息处理失败 --> 队列名:{}, 已进行自动补偿,Exception:", queueName, e); taskExecutor.execute(() -> receiveCmqMessage(queueName, cmqHandler)); } }); } } } } catch (Exception e) { log.error("消息执行失败 --> 队列名:{}, 已进行自动补偿,Exception:", queueName, e); taskExecutor.execute(() -> receiveCmqMessage(queueName, cmqHandler)); } } }
关于上述涉及到类CmqQueue是公司内部封装类,将queue队列和cmq的账号绑定,只是大概展示一下,仅供参考。
账号信息类
@Data public class MqAccount { private String host; private String port; private String username; private String password; private String vhost; private String secretId; private String secretKey; private String endpoint; private String queueEndpoint; }
CmqQueue的信息类
public class CmqQueue extends AbstractMq { private static final Logger LOGGER = LoggerFactory.getLogger(CmqQueue.class); private Account account; private Queue queue; public CmqQueue(MqAccount mqAccount, String queueName) { mqAccount = (MqAccount)Preconditions.checkNotNull(mqAccount); Preconditions.checkNotNull(queueName); queueName = this.getNameWithSuffix(queueName); this.init(mqAccount, queueName); } private void init(MqAccount mqAccount, String queueName) { this.account = new Account(mqAccount.getQueueEndpoint(), mqAccount.getSecretId(), mqAccount.getSecretKey()); ArrayList list = Lists.newArrayList(); try { this.account.listQueue(queueName, -1, -1, list); long count = list.stream().filter((name) -> { return queueName.equalsIgnoreCase(name); }).count(); if (count == 0L) { QueueMeta meta = new QueueMeta(); this.account.createQueue(queueName, meta); } else { LOGGER.warn("cmq queueName {} has exist", queueName); } this.queue = this.account.getQueue(queueName); } catch (Exception var7) { LOGGER.error("cmq createQueue error", var7); throw new RuntimeException(var7); } } public void setQueueAttr(QueueMeta meta) { try { this.queue.setQueueAttributes(meta); } catch (Exception var3) { LOGGER.error("cmq setQueueAttr error", var3); } } public String sendMsg(String msg) { try { return this.queue.sendMessage(msg); } catch (Exception var3) { LOGGER.error("cmq queuename:{},sendMsg errorinfo:{},error:", new Object[]{this.exchangeName, var3.toString(), var3}); return null; } } public List<String> batchSendMsg(List<String> msgs) { try { return this.queue.batchSendMessage(msgs); } catch (Exception var3) { LOGGER.error("cmq queuename:{},batchSendMsg errorinfo:{},error:", new Object[]{this.exchangeName, var3.toString(), var3}); return null; } } public Message receiveMsg() { Message message = null; try { message = this.queue.receiveMessage(10); } catch (Exception var3) { LOGGER.error("cmq queuename:{},receiveMsg errorinfo:{},error:", new Object[]{this.exchangeName, var3.toString(), var3}); } return message; } public List<Message> batchReceiveMsg(int numOfMsg) { try { return this.queue.batchReceiveMessage(numOfMsg, 10); } catch (Exception var3) { LOGGER.error("cmq queuename:{},batchReceiveMsg errorinfo:{},error:", new Object[]{this.exchangeName, var3.toString(), var3}); return null; } } public void deleteMsg(String receiHandle) { try { this.queue.deleteMessage(receiHandle); } catch (Exception var3) { LOGGER.error("cmq queuename:{},deleteMsg errorinfo:{},error:", new Object[]{this.exchangeName, var3.toString(), var3}); } } public void batchDeleteMsg(List<String> receiHandles) { try { this.queue.batchDeleteMessage(receiHandles); } catch (Exception var3) { LOGGER.error("cmq queuename:{},batchDeleteMsg errorinfo:{},error:", new Object[]{this.exchangeName, var3.toString(), var3}); } } } public abstract class AbstractMq { protected String exchangeName; protected String exchangeType = "topic"; public AbstractMq() { } protected String getExchangeType() { return this.exchangeType; } protected String getNameWithSuffix(String name) { return !DeveloperUtil.isLocalDebug() ? name + "_" + Util.runEvn : name + "_local"; } }
Demo案例
@IzkQueue(queueName = "queueDemo",value = "demo") public class MessageDemo implements IBaseCmqHandler { @Override public boolean onMessage(String queueName, Message message) { //todo return false; } }
总结
不将就是发现的原动力,多思考多动手。
今天的文章 CMQ——多线程实现自动拉取消息分享到此就结束了,感谢您的阅读。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/bian-cheng-ji-chu/91870.html