1. 引言
在设备联动中,有些场景需要保持设备继续工作一段时间再关机。比如在厨房场景下,存在燃气灶和烟机的联动场景,以燃气灶为条件,烟机为结果。当打开燃气灶后,烟机自动打开;当关掉燃气灶后,烟机需要延时3分钟再关机。
物联网设备,打开或者关闭燃气灶的时候,设备的状态会进行上报,当云端收到状态变化信号后,就能去触发烟机的动作。对于设备的联动链路可以阅读之前的文章:
那对于延时关机,就不能收到状态变化后,直接就去控制设备啦。这时候就需要定时服务来介入。
2. 延时关机的实现方式
2.1 自建定时服务实现
在微服务的时候,很多的项目根据业务的不同划分了单独的服务,因此,在物联网领域就需要有个定时服务,这个服务就只做定时;当要做延时时,就向定时服务中插入一条带有未来时间的数据,当定时服务轮询到该数据时,就执行回调,再执行设备控制。具体的时序图如下:

在这张图中,共有4个模块,条件-设备作为条件设备,进行状态上报;结果-设备作为触发设备,是被控制设备;主服务,即进行逻辑判读和设备控制的服务;定时作为任务轮询,到期回调。
具体的判断逻辑:当时设备-条件上报关机到主服务,主服务进行逻辑判断是否需要延时关机,如果不需要就直接控制设备,当需要延时关机时,就插入预约任务到到定时任务,主服务等待定时服务的回调,当收到回调后,再去控制结果-设备。
2.2 RocketMQ中间件实现
RocketMQ作为分布式消息中间件,深受各个行业的广泛使用,之前就介绍过物联网中RocketMQ的使用。在之前的介绍中,主要集中于消费端那一块,但其生产端的功能也很多,比如可以发送延时消息。因此当设备需要延时关机时,就可以利用到延时消息功能,在生产端输入控制参数的时候,加上一个时间戳,当这个时间戳到了之后,消费者才进行设备控制,这样就实现了设备的延时控制。
2.2.1 生成端demo
为了发送一个延时消息,RocketMQ提供了delayTimeLevel属性,对于生产端的实现方式:
其工作流程如下:

其代码demo如下:
import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; public class DelayedMessageProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("example_group"); producer.setNamesrvAddr("localhost:9876"); producer.start(); // Create a message with a 10 second delay Message message = new Message("example_topic", "Hello, RocketMQ!".getBytes(RemotingHelper.DEFAULT_CHARSET)); message.setDelayTimeLevel(3); // Send the message producer.send(message); producer.shutdown(); } }
在RocketMQ中提供了多个延时级别:
1:1s 2:5s 3:10s 4:30s 5:1m 6:2m 7:3m 8:4m 9:5m 10:6m 11:7m 12:8m 13:9m 14:10m 15:20m 16:30m 17:1h
如果要发送一个延迟 3 分钟的消息,可以设置 DelayTimeLevel 属性为 7,即message.setDelayTimeLevel(7)。
2.2.2 消费端demo
在消费者端,就需要去监听,延时消息是否已经到了,如果到了就进行消费,如果没有到就进行等待。
其工作流程如下:

其代码demo如下:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.consumer.listener.*; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; public class DelayedMessageConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group"); consumer.setNamesrvAddr("localhost:9876"); consumer.subscribe("example_topic", "*"); // Set the message listener consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
for (MessageExt message : messages) {
// Check if message is delayed if (message.getProperty("DELAY_TIME_LEVEL") != null) {
long delayTimeLevel = Long.parseLong(message.getProperty("DELAY_TIME_LEVEL")); long diff = System.currentTimeMillis() - message.getStoreTimestamp(); if (diff >= delayTimeLevel * 1000) {
// Process the message System.out.println("Received delayed message: " + new String(message.getBody())); // todo 实现延时控制设备 } else {
// Re-consume the message later return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } else {
// Process the message System.out.println("Received message: " + new String(message.getBody())); } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("Consumer started"); } }
在上面的代码中,创建一个DefaultMQPushConsumer的实例,并且订阅监听topic;在registerMessageListener方法中进行逻辑判断,延时时间是否已经到了,如果到了就进行消费,如果没有到就进行等待。
3. 结尾
综上所述,介绍了两种实现延时关机的方式,第一种需要自己建立一个单独的定时服务,需要自己开发,难度较大,但是灵活性很高,可以自定义规则;第二种是利用了RocketMQ中延时消息方式实现的,其延时时间是固定的,不支持自定义。对于量很大的设备控制,其上的两种方式都需要搭建集群方式,保障系统的稳定性。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/bian-cheng-ji-chu/4392.html