目录
1.用过哪些MQ,怎么用的,和其他mq比较有什么优缺点,MQ的连接是线程安全的吗?
9.消息中间件如何保证消息不被重复消费?或者说,如何保证消息消费的幂等性?(必考)
13.Kafka消息是采用Pull模式,还是Push模式?(必考)
19.Netty和kafka都是用了零拷贝,那么零拷贝究竟是怎么实现的?底层原理是?
21.如何解决消息队列的延时以及过期失效问题?消息队列满了以后该怎么处理?有几百万消息持续积压几小时,说说怎么解决?
22.如果让你写一个消息队列,该如何进行架构设计?说一下你的思路。
干货分享,感谢您的阅读!备注:针对基本问题做一些基本的总结,不是详细解答!
1.用过哪些MQ,怎么用的,和其他mq比较有什么优缺点,MQ的连接是线程安全的吗?
可以将常见的消息中间件的特性、优缺点总结成表格,这样可以更加直观地进行比较:
MQ | 主要特性 | 优点 | 缺点 |
---|---|---|---|
Kafka | - 分布式系统,支持自动负载均衡 - 快速持久化,支持批量操作 - 高吞吐量,每秒可处理百万级消息 - 支持数据复制和容错 |
- 高性能,支持多种客户端语言 - 支持海量消息堆积 - 良好的扩展性和容错性 - 社区活跃,广泛使用 |
- 超过64个队列时负载飙升 - 短轮询方式影响实时性 - 消费失败不支持重试 - 宕机后可能导致消息乱序 |
RabbitMQ | - 基于AMQP协议,消息路由灵活 - 支持消息集群和高可用性 - 支持多种协议和编程语言 - 提供丰富的插件和管理界面 |
- 高并发,稳定性强 - 支持复杂路由 - 消息确认机制和持久化 - 管理界面友好 |
- 性能不如Kafka - 学习曲线较陡峭 - 消息封装大,中央节点增加延迟 |
RocketMQ | - 支持分布式队列模型 - 提供丰富的消息拉取模式 - 高效订阅,支持亿级消息堆积 - 实现了严格的消息顺序 |
- 性能高,支持大量消息堆积 - 提供集群和广播消费模式 - 支持主从和高可用 - 社区更新快,适合国内场景 |
- 支持的客户端语言少 - 缺少Web管理界面 - 社区关注度不及Kafka和RabbitMQ |
ActiveMQ | - 完全支持JMS规范 - 提供广泛的连接协议支持 - 支持多种客户端语言和协议 - 提供灵活的持久化和安全机制 |
- 跨平台,支持JDBC持久化 - 支持自动重连和错误重试 - 监控和管理工具完善 - 界面友好,易于使用 |
- 社区活跃度较低 - 可能出现丢失消息的情况 - 维护集中在新版本Apollo,5.x维护较少 |
对于选择哪种消息中间件,关键在于项目需求:
- 如果需要处理高吞吐量的日志数据流,Kafka是首选。
- 如果需要可靠的消息传递和复杂的消息路由,RabbitMQ则更适合。
- RocketMQ则适用于需要高可靠性和严格消息顺序的场景。
- ActiveMQ在传统企业应用和JMS兼容场景下具有优势。
在实际应用中,了解每种MQ的特性及其适用场景是至关重要的,可以根据需求选择最合适的解决方案。
MQ连接的线程安全性
不同的MQ实现中,连接的线程安全性可能有所不同。一般来说,大多数MQ客户端的连接是线程安全的,例如Kafka和RabbitMQ的官方客户端都支持多线程访问。也就是说,多个线程可以共享同一个连接对象,而不会导致数据冲突或不一致的问题。但在具体实现中,仍然建议查看官方文档,确认所使用的客户端库是否支持线程安全。
2.消息中间件的组成有哪些?
组件名称 | 作用 | 特点 |
---|---|---|
消息生产者(Producer) | 创建并发送消息到消息中间件 | 与业务逻辑紧密结合,根据需求选择不同类型的消息(如持久消息、瞬时消息等) |
消息队列/主题(Message Queue/Topic) | 暂存消息,确保消息在消费前安全存储 | 消息队列用于点对点通信,确保消息一次消费;主题用于发布-订阅模式,允许多个订阅者消费 |
消息消费者(Consumer) | 接收并处理消息 | 可以同步或异步处理消息,与多个生产者/消费者协同工作 |
消息代理/服务器(Message Broker/Server) | 作为中转站,接收、存储、路由和分发消息 | 支持多种协议、路由规则和消息格式,具备高可用性和扩展性 |
消息存储(Message Store) | 提供持久化存储,确保消息不丢失 | 持久化消息适用于需要高可靠性和数据回溯的场景 |
消息路由(Message Routing) | 决定消息传递路径 | 路由策略可以是静态(预定义)或动态(基于消息内容或条件) |
管理控制台(Management Console) | 提供可视化界面进行管理和监控 | 集成监控、报警、日志分析功能,方便运维人员管理 |
安全组件(Security Components) | 确保消息传输和存储安全 | 包括身份认证、权限控制、加密传输,满足数据隐私和合规性要求 |
3.消息中间件模式分类有哪些?(必考)
模式名称 | 特点 | 应用场景 |
---|---|---|
点对点模式(P2P) | 消息通过队列传递,每条消息只能被一个消费者消费一次,消费后消息从队列中删除。 | 任务处理、工作队列(如订单处理、任务分发) |
发布-订阅模式(Pub/Sub) | 消息通过主题发布,所有订阅该主题的消费者都能接收到消息。一个消息可以被多个消费者同时消费。 | 广播消息、事件通知(如新闻推送、日志收集) |
请求-响应模式(Request-Response) | 生产者发送请求消息,消费者处理后返回响应消息。用于同步通信,确保请求方可以获取处理结果。 | 同步任务执行、远程过程调用(RPC) |
流式处理模式(Streaming Processing) | 消息作为数据流的一部分连续发布,消费者以流的形式实时处理数据。 | 实时日志分析、金融交易处理 |
事务消息模式(Transactional Messaging) | 支持事务的消息传递,确保一组消息要么全部成功发送,要么全部回滚,保证消息传递的一致性。 | 分布式事务、银行转账 |
延迟消息模式(Delayed Messaging) | 允许消息在指定时间后被消费,用于需要延迟处理的场景。 | 定时任务、延迟执行的业务逻辑 |
死信队列模式(DLQ) | 存储无法被正常消费的消息,方便后续进行分析或人工干预处理。 | 错误消息处理、重试机制 |
负载均衡模式(Load Balancing) | 多个消费者从同一队列中消费消息,负载均衡策略确保每个消费者获得相对均衡的处理任务。 | 高并发任务处理、均衡工作负载 |
4.分析消息中间件的普遍优势(必考)
- 系统解耦
交互系统之间没有直接的调用关系,只是通过消息传输,故系统侵入性不强,耦合度低。
- 提高系统响应时间
例如原来的一套逻辑,完成支付可能涉及先修改订单状态、计算会员积分、通知物流配送几个逻辑才能完成;通过MQ架构设计,就可将紧急重要(需要立刻响应)的业务放到该调用方法中,响应要求不高的使用消息队列,放到MQ队列中,供消费者处理。
- 为大数据处理架构提供服务
通过消息作为整合,大数据背景下,消息队列还与实时处理架构整合,为数据处理提供性能支持。
- Java消息服务——JMS
Java消息服务(Java Message Service,JMS)应用程序接口是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。
JMS中的P2P和Pub/Sub消息模式:点对点(point to point, queue)与发布订阅(publish/subscribe,topic)最初是由JMS定义的。这两种模式主要区别或解决的问题就是发送到队列的消息能否重复消费(多订阅)。
5.消息中间件常用协议分析
以下是常用的几种消息中间件协议的分析:
协议 | 简介 | 特点 | 应用场景 |
---|---|---|---|
AMQP | Advanced Message Queuing Protocol (AMQP) 是一种开放标准的应用层协议,旨在提供统一的消息服务,允许不同平台和语言的客户端与消息中间件之间进行可靠的通信。 | - 可靠性:提供消息确认、事务性消息、持久性支持。 - 通用性:支持多种消息传递模式,如Direct、Topic、Fanout。 |
适用于金融交易系统、物流平台等需要高可靠性和跨平台通信的系统。 |
MQTT | MQTT (Message Queuing Telemetry Transport) 是一种轻量级的即时通讯协议,设计初衷是为低带宽、不稳定网络下的设备通信提供支持。 | - 轻量级:占用带宽小,适合资源受限的设备。 - 支持PUSH机制:适合移动端应用和嵌入式系统。 |
物联网(IoT)设备通信,如智能家居、环境监测传感器网络。 |
STOMP | STOMP (Streaming Text Orientated Messaging Protocol) 是一种简单的基于文本的消息协议,旨在提供一个通用的消息传输方式,便于不同语言的客户端与消息代理进行交互。 | - 简单易用:基于文本格式,易于实现和调试。 - 通用性强:支持多种语言客户端,适用于跨平台的消息传递。 |
适用于快速实现消息通信的应用,如实时数据推送、WebSocket通信。 |
XMPP | XMPP (Extensible Messaging and Presence Protocol) 是一种基于XML的协议,广泛用于即时消息传递和在线状态检测。它的设计目标是提供一个开放的、可扩展的通讯协议。 | - 可扩展性:基于XML,支持广泛的扩展功能。 - 高安全性:支持加密和认证机制,适合安全性要求高的应用场景。 - 通用性:兼容性强,适用于服务器之间的准即时操作。 |
适用于即时通讯系统、在线状态检测,如Google Talk(早期)、Jabber等。 |
不同的协议适用于不同的场景。AMQP更适合需要高可靠性和复杂消息传递模式的系统;MQTT在物联网和移动端应用中表现出色;STOMP因其简单易用的特点适合轻量级应用;XMPP则因其可扩展性和安全性,在即时通讯和在线状态检测方面具有优势。在实际应用中,选择合适的协议需要根据具体的需求和系统架构来决定。
6.消息队列应用场景分析(必考)
在实际应用中常用的使用场景:异步处理、应用解耦、流量削锋和消息通讯四个场景
场景一:异步处理
场景说明:用户注册后,需要发注册邮件和注册短信。传统的做法有两种 1.串行的方式;2.并行方式
- 串行方式:将注册信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端。
- 并行方式:将注册信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的时间。
假设三个业务节点每个使用50毫秒钟,不考虑网络等其他开销,则串行方式的时间是150毫秒,并行的时间可能是100毫秒。
因为CPU在单位时间内处理的请求数是一定的,假设CPU1秒内吞吐量是100次。则串行方式1秒内CPU可处理的请求量是7次(1000/150),并行方式处理的请求量是10次(1000/100)
小结:如以上案例描述,传统的方式系统的性能(并发量,吞吐量,响应时间)会有瓶颈。如何解决这个问题呢?
引入消息队列,将不是必须的业务逻辑,异步处理。改造后的架构如下:
按照以上约定,用户的响应时间相当于是注册信息写入数据库的时间,也就是50毫秒。注册邮件,发送短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略,因此用户的响应时间可能是50毫秒。因此架构改变后,系统的吞吐量提高到每秒20 QPS。比串行提高了3倍,比并行提高了两倍。
场景二:应用解耦
场景说明:用户下单后,订单系统需要通知库存系统。传统的做法是,订单系统调用库存系统的接口。假如库存系统无法访问,则订单减库存将失败,从而导致订单失败,缺点在于订单系统与库存系统耦合。
如何解决以上问题呢?引入应用消息队列后的方案,如下图:
- 订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功
- 库存系统:订阅下单的消息,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行库存操作
假如:在下单时库存系统不能正常使用,也不影响正常下单,因为下单后,订单系统写入消息队列就不再关心其他的后续操作了。实现订单系统与库存系统的应用解耦。
场景三:流量削锋
应用场景:秒杀活动,一般会因为流量过大,导致流量暴增,应用挂掉。为解决这个问题,一般需要在应用前端加入消息队列:可以控制活动的人数、可以缓解短时间内高流量压垮应用。用户的请求,服务器接收后,首先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面,秒杀业务根据消息队列中的请求信息,再做后续处理。
场景四:日志处理
日志处理是指将消息队列用在日志处理中,比如Kafka的应用,解决大量日志传输的问题。架构简化如下,
- 日志采集客户端:负责日志数据采集,定时写受写入Kafka队列
- Kafka消息队列:负责日志数据的接收,存储和转发
- 日志处理应用:订阅并消费kafka队列中的日志数据
查看新浪kafka日志处理应用案例:转自(http://cloud.51cto.com/art/201507/484338.htm)
- (1)Kafka:接收用户日志的消息队列
- (2)Logstash:做日志解析,统一成JSON输出给Elasticsearch
- (3)Elasticsearch:实时日志分析服务的核心技术,一个schemaless,实时的数据存储服务,通过index组织数据,兼具强大的搜索和统计功能
- (4)Kibana:基于Elasticsearch的数据可视化组件,超强的数据可视化能力是众多公司选择ELK stack的重要原因
场景五:消息通讯
消息通讯是指,消息队列一般都内置了高效的通信机制,因此也可以用在纯的消息通讯。比如实现点对点消息队列,或者聊天室等
- 点对点通讯:客户端A和客户端B使用同一队列,进行消息通讯。
- 聊天室通讯:客户端A,客户端B,客户端N订阅同一主题,进行消息发布和接收。实现类似聊天室效果。
以上实际是消息队列的两种消息模式,点对点或发布订阅模式。模型为示意图,供参考。
7.消息中间件数据如何保证不丢失?(必考)
数据的丢失问题,可能出现在生产者、MQ、消费者中,咱们从 RabbitMQ 和 Kafka 分别来分析一下吧。
RabbitMQ
1.生产者弄丢了数据
生产者将数据发送到 RabbitMQ 的时候,可能数据就在半路给搞丢了,因为网络问题啥的,都有可能。
此时可以选择用 RabbitMQ 提供的事务功能,就是生产者发送数据之前开启 RabbitMQ 事务channel.txSelect
,然后发送消息,如果消息没有成功被 RabbitMQ 接收到,那么生产者会收到异常报错,此时就可以回滚事务channel.txRollback
,然后重试发送消息;如果收到了消息,那么可以提交事务channel.txCommit
。
// 开启事务 channel.txSelect try { // 这里发送消息 } catch (Exception e) { channel.txRollback // 这里再次重发这条消息 } // 提交事务 channel.txCommit
但是问题是,RabbitMQ 事务机制(同步)一搞,基本上吞吐量会下来,因为太耗性能。
所以一般来说,如果你要确保说写 RabbitMQ 的消息别丢,可以开启 confirm
模式,在生产者那里设置开启 confirm
模式之后,你每次写的消息都会分配一个唯一的 id,然后如果写入了 RabbitMQ 中,RabbitMQ 会给你回传一个 ack
消息,告诉你说这个消息 ok 了。如果 RabbitMQ 没能处理这个消息,会回调你的一个 nack
接口,告诉你这个消息接收失败,你可以重试。而且你可以结合这个机制自己在内存里维护每个消息 id 的状态,如果超过一定时间还没接收到这个消息的回调,那么你可以重发。
事务机制和 confirm
机制最大的不同在于,事务机制是同步的,你提交一个事务之后会阻塞在那儿,但是 confirm
机制是异步的,你发送个消息之后就可以发送下一个消息,然后那个消息 RabbitMQ 接收了之后会异步回调你的一个接口通知你这个消息接收到了。
所以一般在生产者这块避免数据丢失,都是用 confirm
机制的。
2.RabbitMQ 弄丢了数据
就是 RabbitMQ 自己弄丢了数据,这个你必须开启 RabbitMQ 的持久化,就是消息写入之后会持久化到磁盘,哪怕是 RabbitMQ 自己挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢。除非极其罕见的是,RabbitMQ 还没持久化,自己就挂了,可能导致少量数据丢失,但是这个概率较小。
设置持久化有两个步骤:
- 创建 queue 的时候将其设置为持久化
这样就可以保证 RabbitMQ 持久化 queue 的数据,但是它是不会持久化 queue 里的数据的。 - 第二个是发送消息的时候将消息的
deliveryMode
设置为 2
就是将消息设置为持久化的,此时 RabbitMQ 就会将消息持久化到磁盘上去。
必须要同时设置这两个持久化才行,RabbitMQ 哪怕是挂了,再次重启,也会从磁盘上重启恢复 queue,恢复这个 queue 里的数据。
注意,哪怕是你给 RabbitMQ 开启了持久化机制,也有一种可能,就是这个消息写到了 RabbitMQ 中,但是还没来得及持久化到磁盘上,结果不巧,此时 RabbitMQ 挂了,就会导致内存里的一点点数据丢失。
所以,持久化可以跟生产者那边的 confirm
机制配合起来,只有消息被持久化到磁盘之后,才会通知生产者 ack
了,所以哪怕是在持久化到磁盘之前,RabbitMQ 挂了,数据丢了,生产者收不到 ack
,你也是可以自己重发的。
3.消费端弄丢了数据
RabbitMQ 如果丢失了数据,主要是因为你消费的时候,刚消费到,还没处理,结果进程挂了,比如重启了,那么就尴尬了,RabbitMQ 认为你都消费了,这数据就丢了。
这个时候得用 RabbitMQ 提供的 ack
机制,简单来说,就是你必须关闭 RabbitMQ 的自动 ack
,可以通过一个 api 来调用就行,然后每次你自己代码里确保处理完的时候,再在程序里 ack
一把。这样的话,如果你还没处理完,不就没有 ack
了?那 RabbitMQ 就认为你还没处理完,这个时候 RabbitMQ 会把这个消费分配给别的 consumer 去处理,消息是不会丢的。
Kafka
1.消费端弄丢了数据
唯一可能导致消费者弄丢数据的情况,就是说,你消费到了这个消息,然后消费者那边自动提交了 offset,让 Kafka 以为你已经消费好了这个消息,但其实你才刚准备处理这个消息,你还没处理,你自己就挂了,此时这条消息就丢咯。
这不是跟 RabbitMQ 差不多吗,大家都知道 Kafka 会自动提交 offset,那么只要关闭自动提交 offset,在处理完之后自己手动提交 offset,就可以保证数据不会丢。但是此时确实还是可能会有重复消费,比如你刚处理完,还没提交 offset,结果自己挂了,此时肯定会重复消费一次,自己保证幂等性就好了。
生产环境碰到的一个问题,就是说我们的 Kafka 消费者消费到了数据之后是写到一个内存的 queue 里先缓冲一下,结果有的时候,你刚把消息写入内存 queue,然后消费者会自动提交 offset。然后此时我们重启了系统,就会导致内存 queue 里还没来得及处理的数据就丢失了。
2.Kafka 弄丢了数据
这块比较常见的一个场景,就是 Kafka 某个 broker 宕机,然后重新选举 partition 的 leader。大家想想,要是此时其他的 follower 刚好还有些数据没有同步,结果此时 leader 挂了,然后选举某个 follower 成 leader 之后,不就少了一些数据?这就丢了一些数据啊。
生产环境也遇到过,我们也是,之前 Kafka 的 leader 机器宕机了,将 follower 切换为 leader 之后,就会发现说这个数据就丢了。
所以此时一般是要求起码设置如下 4 个参数:
- 给 topic 设置
replication.factor
参数:这个值必须大于 1,要求每个 partition 必须有至少 2 个副本。 - 在 Kafka 服务端设置
min.insync.replicas
参数:这个值必须大于 1,这个是要求一个 leader 至少感知到有至少一个 follower 还跟自己保持联系,没掉队,这样才能确保 leader 挂了还有一个 follower 吧。 - 在 producer 端设置
acks=all
:这个是要求每条数据,必须是写入所有 replica 之后,才能认为是写成功了。 - 在 producer 端设置
retries=MAX
(很大很大很大的一个值,无限次重试的意思):这个是要求一旦写入失败,就无限重试,卡在这里了。
我们生产环境就是按照上述要求配置的,这样配置之后,至少在 Kafka broker 端就可以保证在 leader 所在 broker 发生故障,进行 leader 切换时,数据不会丢失。
3.生产者会不会弄丢数据?
如果按照上述的思路设置了 acks=all
,一定不会丢,要求是,你的 leader 接收到消息,所有的 follower 都同步到了消息之后,才认为本次写成功了。如果没满足这个条件,生产者会自动不断的重试,重试无限次。
8.消息中间件如何保证消息按顺序执行?(必考)
我举个例子,我们以前做过一个 mysql binlog
同步的系统,压力还是非常大的,日同步数据要达到上亿,就是说数据从一个 mysql 库原封不动地同步到另一个 mysql 库里面去(mysql -> mysql)。常见的一点在于说比如大数据 team,就需要同步一个 mysql 库过来,对公司的业务系统的数据做各种复杂的操作。
你在 mysql 里增删改一条数据,对应出来了增删改 3 条 binlog
日志,接着这三条 binlog
发送到 MQ 里面,再消费出来依次执行,起码得保证人家是按照顺序来的吧?不然本来是:增加、修改、删除;你楞是换了顺序给执行成删除、修改、增加,不全错了么。
本来这个数据同步过来,应该最后这个数据被删除了;结果你搞错了这个顺序,最后这个数据保留下来了,数据同步就出错了。
先看看顺序会错乱的俩场景:
- RabbitMQ:一个 queue,多个 consumer。比如,生产者向 RabbitMQ 里发送了三条数据,顺序依次是 data1/data2/data3,压入的是 RabbitMQ 的一个内存队列。有三个消费者分别从 MQ 中消费这三条数据中的一条,结果消费者2先执行完操作,把 data2 存入数据库,然后是 data1/data3。这不明显乱了。
- Kafka:比如说我们建了一个 topic,有三个 partition。生产者在写的时候,其实可以指定一个 key,比如说我们指定了某个订单 id 作为 key,那么这个订单相关的数据,一定会被分发到同一个 partition 中去,而且这个 partition 中的数据一定是有顺序的。
消费者从 partition 中取出来数据的时候,也一定是有顺序的。到这里,顺序还是 ok 的,没有错乱。接着,我们在消费者里可能会搞多个线程来并发处理消息。因为如果消费者是单线程消费处理,而处理比较耗时的话,比如处理一条消息耗时几十 ms,那么 1 秒钟只能处理几十条消息,这吞吐量太低了。而多个线程并发跑的话,顺序可能就乱掉了。
解决方案
RabbitMQ
拆分多个 queue,每个 queue 一个 consumer,就是多一些 queue 而已,确实是麻烦点;或者就一个 queue 但是对应一个 consumer,然后这个 consumer 内部用内存队列做排队,然后分发给底层不同的 worker 来处理。
Kafka
- 一个 topic,一个 partition,一个 consumer,内部单线程消费,单线程吞吐量太低,一般不会用这个。
- 分配内存队列(Memory Queue):为了解决多线程处理中的顺序问题,可以在
Consumer
内部分配多个内存队列,每个内存队列处理一个key
,从而保证具有相同key
的消息始终进入同一个内存队列。然后,再由各自的线程消费内存队列中的消息,确保消息处理顺序一致。
9.消息中间件如何保证消息不被重复消费?或者说,如何保证消息消费的幂等性?(必考)
回答这个问题,首先你别听到重复消息这个事儿,就一无所知吧,你先大概说一说可能会有哪些重复消费的问题。
首先,比如 RabbitMQ、RocketMQ、Kafka,都有可能会出现消息重复消费的问题,正常。因为这问题通常不是 MQ 自己保证的,是由我们开发来保证的。挑一个 Kafka 来举个例子,说说怎么重复消费吧。
Kafka 实际上有个 offset 的概念,就是每个消息写进去,都有一个 offset,代表消息的序号,然后 consumer 消费了数据之后,每隔一段时间(定时定期),会把自己消费过的消息的 offset 提交一下,表示“我已经消费过了,下次我要是重启啥的,你就让我继续从上次消费到的 offset 来继续消费吧”。
但是凡事总有意外,比如我们之前生产经常遇到的,就是你有时候重启系统,看你怎么重启了,如果碰到点着急的,直接 kill 进程了,再重启。这会导致 consumer 有些消息处理了,但是没来得及提交 offset,尴尬了。重启之后,少数消息会再次消费一次。
举个栗子。
有这么个场景。数据 1/2/3 依次进入 kafka,kafka 会给这三条数据每条分配一个 offset,代表这条数据的序号,我们就假设分配的 offset 依次是 152/153/154。消费者从 kafka 去消费的时候,也是按照这个顺序去消费。假如当消费者消费了 offset=153
的这条数据,刚准备去提交 offset 到 zookeeper,此时消费者进程被重启了。那么此时消费过的数据 1/2 的 offset 并没有提交,kafka 也就不知道你已经消费了 offset=153
这条数据。那么重启之后,消费者会找 kafka 说,嘿,哥儿们,你给我接着把上次我消费到的那个地方后面的数据继续给我传递过来。由于之前的 offset 没有提交成功,那么数据 1/2 会再次传过来,如果此时消费者没有去重的话,那么就会导致重复消费。
如果消费者干的事儿是拿一条数据就往数据库里写一条,会导致说,你可能就把数据 1/2 在数据库里插入了 2 次,那么数据就错啦。
其实重复消费不可怕,可怕的是你没考虑到重复消费之后,怎么保证幂等性。
举个例子吧。假设你有个系统,消费一条消息就往数据库里插入一条数据,要是你一个消息重复两次,你不就插入了两条,这数据不就错了?但是你要是消费到第二次的时候,自己判断一下是否已经消费过了,若是就直接扔了,这样不就保留了一条数据,从而保证了数据的正确性。
一条数据重复出现两次,数据库里就只有一条数据,这就保证了系统的幂等性。
幂等性,通俗点说,就一个数据,或者一个请求,给你重复来多次,你得确保对应的数据是不会改变的,不能出错。
所以第二个问题来了,怎么保证消息队列消费的幂等性?
其实还是得结合业务来思考,我这里给几个思路:
- 比如你拿个数据要写库,你先根据主键查一下,如果这数据都有了,你就别插入了,update 一下好吧。
- 比如你是写 Redis,那没问题了,反正每次都是 set,天然幂等性。
- 比如你不是上面两个场景,那做的稍微复杂一点,你需要让生产者发送每条数据的时候,里面加一个全局唯一的 id,类似订单 id 之类的东西,然后你这里消费到了之后,先根据这个 id 去比如 Redis 里查一下,之前消费过吗?如果没有消费过,你就处理,然后这个 id 写 Redis。如果消费过了,那你就别处理了,保证别重复处理相同的消息即可。
- 比如基于数据库的唯一键来保证重复数据不会重复插入多条。因为有唯一键约束了,重复数据插入只会报错,不会导致数据库中出现脏数据。
当然,如何保证 MQ 的消费是幂等性的,需要结合具体的业务来看。
10.消息中间件如何保证消息队列的高可用?(必考)
其实应该问的是 MQ 的高可用性怎么保证?这样就是你用过哪个 MQ,你就说说你对那个 MQ 的高可用性的理解。
RabbitMQ 的高可用性
RabbitMQ 是比较有代表性的,因为是基于主从(非分布式)做高可用性的,我们就以 RabbitMQ 为例子讲解第一种 MQ 的高可用性怎么实现。
RabbitMQ 有三种模式:单机模式、普通集群模式、镜像集群模式。
单机模式
单机模式,就是 Demo 级别的,一般就是你本地启动了玩玩儿的😄,没人生产用单机模式。
普通集群模式(无高可用性)
普通集群模式,意思就是在多台机器上启动多个 RabbitMQ 实例,每个机器启动一个。你创建的 queue,只会放在一个 RabbitMQ 实例上,但是每个实例都同步 queue 的数据(数据可以认为是 queue 的一些配置信息,通过数据,可以找到 queue 所在实例)。你消费的时候,实际上如果连接到了另外一个实例,那么那个实例会从 queue 所在实例上拉取数据过来。
这种方式确实很麻烦,也不怎么好,没做到所谓的分布式,就是个普通集群。因为这导致你要么消费者每次随机连接一个实例然后拉取数据,要么固定连接那个 queue 所在实例消费数据,前者有数据拉取的开销,后者导致单实例性能瓶颈。
而且如果那个放 queue 的实例宕机了,会导致接下来其他实例就无法从那个实例拉取,如果你开启了消息持久化,让 RabbitMQ 落地存储消息的话,消息不一定会丢,得等这个实例恢复了,然后才可以继续从这个 queue 拉取数据。
所以这个事儿就比较尴尬了,这就没有什么所谓的高可用性,这方案主要是提高吞吐量的,就是说让集群中多个节点来服务某个 queue 的读写操作。
镜像集群模式(高可用性)
这种模式,才是所谓的 RabbitMQ 的高可用模式。跟普通集群模式不一样的是,在镜像集群模式下,你创建的 queue,无论数据还是 queue 里的消息都会存在于多个实例上,就是说,每个 RabbitMQ 节点都有这个 queue 的一个完整镜像,包含 queue 的全部数据的意思。然后每次你写消息到 queue 的时候,都会自动把消息同步到多个实例的 queue 上。
那么如何开启这个镜像集群模式呢?其实很简单,RabbitMQ 有很好的管理控制台,就是在后台新增一个策略,这个策略是镜像集群模式的策略,指定的时候是可以要求数据同步到所有节点的,也可以要求同步到指定数量的节点,再次创建 queue 的时候,应用这个策略,就会自动将数据同步到其他的节点上去了。
这样的话,好处在于,你任何一个机器宕机了,没事儿,其它机器(节点)还包含了这个 queue 的完整数据,别的 consumer 都可以到其它节点上去消费数据。坏处在于,第一,这个性能开销也太大了吧,消息需要同步到所有机器上,导致网络带宽压力和消耗很重!第二,这么玩儿,不是分布式的,就没有扩展性可言了,如果某个 queue 负载很重,你加机器,新增的机器也包含了这个 queue 的所有数据,并没有办法线性扩展你的 queue。你想,如果这个 queue 的数据量很大,大到这个机器上的容量无法容纳了,此时该怎么办呢?
Kafka 的高可用性
Kafka 一个最基本的架构认识:由多个 broker 组成,每个 broker 是一个节点;你创建一个 topic,这个 topic 可以划分为多个 partition,每个 partition 可以存在于不同的 broker 上,每个 partition 就放一部分数据。
这就是天然的分布式消息队列,就是说一个 topic 的数据,是分散放在多个机器上的,每个机器就放一部分数据。
实际上 RabbmitMQ 之类的,并不是分布式消息队列,它就是传统的消息队列,只不过提供了一些集群、HA(High Availability, 高可用性) 的机制而已,因为无论怎么玩儿,RabbitMQ 一个 queue 的数据都是放在一个节点里的,镜像集群下,也是每个节点都放这个 queue 的完整数据。
Kafka 0.8 以前,是没有 HA 机制的,就是任何一个 broker 宕机了,那个 broker 上的 partition 就废了,没法写也没法读,没有什么高可用性可言。
比如说,我们假设创建了一个 topic,指定其 partition 数量是 3 个,分别在三台机器上。但是,如果第二台机器宕机了,会导致这个 topic 的 1/3 的数据就丢了,因此这个是做不到高可用的。
Kafka 0.8 以后,提供了 HA 机制,就是 replica(复制品) 副本机制。每个 partition 的数据都会同步到其它机器上,形成自己的多个 replica 副本。所有 replica 会选举一个 leader 出来,那么生产和消费都跟这个 leader 打交道,然后其他 replica 就是 follower。写的时候,leader 会负责把数据同步到所有 follower 上去,读的时候就直接读 leader 上的数据即可。只能读写 leader?很简单,要是你可以随意读写每个 follower,那么就要 care 数据一致性的问题,系统复杂度太高,很容易出问题。Kafka 会均匀地将一个 partition 的所有 replica 分布在不同的机器上,这样才可以提高容错性。
这么搞,就有所谓的高可用性了,因为如果某个 broker 宕机了,没事儿,那个 broker上面的 partition 在其他机器上都有副本的。如果这个宕机的 broker 上面有某个 partition 的 leader,那么此时会从 follower 中重新选举一个新的 leader 出来,大家继续读写那个新的 leader 即可。这就有所谓的高可用性了。
- 写数据的时候,生产者就写 leader,然后 leader 将数据落地写本地磁盘,接着其他 follower 自己主动从 leader 来 pull 数据。一旦所有 follower 同步好数据了,就会发送 ack 给 leader,leader 收到所有 follower 的 ack 之后,就会返回写成功的消息给生产者。(当然,这只是其中一种模式,还可以适当调整这个行为)
- 消费的时候,只会从 leader 去读,但是只有当一个消息已经被所有 follower 都同步成功返回 ack 的时候,这个消息才会被消费者读到。
看到这里,相信你大致明白了 Kafka 是如何保证高可用机制的了,对吧?不至于一无所知,现场还能给面试官画画图。要是遇上面试官确实是 Kafka 高手,深挖了问,那你只能说不好意思,太深入的你没研究过。
11.Kafka 如何保证可靠性?(必考)
Kafka 在设计上提供了多种机制来保证消息传递的可靠性。以下是 Kafka 通过不同层面来保障可靠性的具体原理说明:
数据复制(Replication)
- 原理:
- Kafka 的每个主题(Topic)可以配置多个分区(Partition),每个分区的数据会在多个 Kafka Broker 上进行复制。每个分区有一个主副本(Leader)和多个从副本(Follower)。
- Leader 负责处理生产者和消费者的读写请求,Follower 副本同步 Leader 的数据。
- 当 Leader 发生故障时,Kafka 会从 Follower 中选举一个新的 Leader,确保数据的高可用性和可靠性。
- 可靠性保证:即使某个 Broker 发生故障,消息仍然可以从其他副本中读取,避免数据丢失。
消息确认机制(Acknowledgments, acks)
- 原理:
- 生产者在发送消息时,可以配置消息的确认级别(acks)。常见的配置包括:
acks=0
: 不等待任何确认,生产者发送消息后即认为成功,最低可靠性。acks=1
: 只等待 Leader 副本的确认,即认为消息成功,适中可靠性。acks=all
(或-1
):等待所有副本(Leader 和 Follower)确认后,生产者才认为消息发送成功,最高可靠性。
- 生产者在发送消息时,可以配置消息的确认级别(acks)。常见的配置包括:
- 可靠性保证:使用
acks=all
可以确保消息被所有副本接收后才算成功,即使 Leader 出现故障,新 Leader 也可以提供一致的数据。
ISR(In-Sync Replicas)机制
- 原理:
- ISR 是指当前与 Leader 保持同步的 Follower 副本集合。当 Follower 复制滞后太久或故障时,会从 ISR 中移除,直到重新同步。
- Kafka 仅允许在 ISR 中的副本成为新的 Leader,确保新 Leader 始终持有最新的消息数据。
- 可靠性保证:保证在副本故障或滞后时,不会选举数据不完整的 Follower 作为新的 Leader,避免数据不一致。
日志留存与分段(Log Retention and Segmentation)
- 原理:
- Kafka 将消息存储在日志文件中,并支持基于时间或大小的日志留存策略。即使消息被消费,也可以在指定时间内保留,以供数据恢复或回溯使用。
- 日志文件被分为多个段(Segment),每个段可以独立管理,便于高效的日志清理和压缩。
- 可靠性保证:通过日志留存机制,Kafka 能够保留消息历史数据,防止因消费错误或其他原因导致的数据丢失。
生产者幂等性(Idempotence)
- 原理:
- Kafka 生产者可以配置幂等性(Idempotence)模式,在这种模式下,生产者会为每条消息生成唯一的序列号,并在发送时携带该序列号。Kafka 会检查该序列号,确保相同的消息不会被重复写入。
- 可靠性保证:避免网络抖动或重试机制导致的消息重复,从而保证消息的一致性和准确性。
事务支持(Transactions)
- 原理:
- Kafka 提供了事务支持,可以确保一组消息要么全部成功写入,要么全部失败回滚。生产者可以开启事务,将多个消息批次写入作为一个原子操作。
- Kafka 通过两个阶段提交协议(Two-Phase Commit Protocol, 2PC)实现事务一致性。
- 可靠性保证:确保在分布式环境下的多分区写入操作具有原子性,避免出现部分成功部分失败的情况,保证数据的一致性。
严格的顺序保证
- 原理:
- 在每个分区内,Kafka 保证消息按照生产者的发送顺序进行存储和消费,即消息的顺序性得到保证。
- 可靠性保证:保证在关键业务场景下(如金融交易系统)的消息顺序,避免因乱序导致的数据不一致问题。
这些机制共同作用,使得 Kafka 能够在各种复杂场景下保障消息的可靠传递和一致性,适应高可用、高性能的分布式系统需求。
12.Kafka的文件存储机制
Kafka使用文件存储机制来持久化消息数据。下面是对Kafka文件存储机制的分析:
- 分区文件:Kafka中的消息被组织成一个个分区(partition),每个分区对应一个磁盘上的文件。每个分区文件以固定大小的片段(segment)进行存储,通常是一系列连续的文件片段。
- 文件片段(Segment):每个分区文件由多个文件片段组成,每个文件片段对应一个时间范围内的消息。文件片段的大小可以配置,通常为一定的存储容量或时间长度。当一个文件片段达到指定的大小或时间限制时,Kafka会创建一个新的文件片段。
- 日志索引:为了快速定位消息在分区文件中的位置,Kafka使用了日志索引。每个文件片段都有对应的日志索引文件,它包含了消息在文件片段中的偏移量(offset)和物理位置的索引。通过使用日志索引,Kafka可以快速定位和查找消息,提高读写的效率。
- 文件清理和压缩:Kafka通过一系列的策略来管理分区文件,包括文件清理和压缩。文件清理是指删除已经过时的文件片段,即已经被所有消费者消费完的文件片段。文件压缩是将多个文件片段合并为一个更大的文件,以减少存储空间和提高读写性能。
- 零拷贝技术:Kafka使用零拷贝(Zero-Copy)技术来提高数据的传输效率。在消息写入和读取过程中,Kafka避免了数据在内核空间和用户空间之间的多次拷贝,减少了CPU和内存的开销,提高了性能和吞吐量。
- 快速写入和追加:Kafka采用追加写入(Append-Only)的方式来存储消息。新的消息会被追加到分区文件的末尾,不涉及文件的修改和移动操作,提高了写入的效率。
通过以上文件存储机制,Kafka能够高效地持久化消息数据,并提供快速的读写能力。文件分段、日志索引、文件清理和压缩等策略和优化措施,进一步提升了Kafka的性能和存储效率。这使得Kafka在大规模的消息处理场景下具备了良好的可扩展性和可靠性。
13.Kafka消息是采用Pull模式,还是Push模式?(必考)
Kafka 的消息传递机制采用的是 Pull 模式。
Pull 模式:在 Pull 模式下,消费者主动从 Kafka Broker 请求消息。消费者决定何时、从哪个分区获取消息。工作原理:
- 消费者请求: 消费者定期向 Kafka Broker 发送请求,指定要消费的主题和分区。
- 消息拉取: Kafka Broker 根据消费者的请求,将符合条件的消息返回给消费者。
- 消费进度: 消费者会维护自己的消费进度(offset),记录已消费到哪个位置,以便在下次拉取时继续从正确的位置开始消费。
特点:
- 灵活性: 消费者可以控制消息的拉取速率,根据自己的处理能力来调整拉取速度,避免因处理速度过慢而导致的积压。
- 容错性: 如果消费者由于网络问题或故障无法接收消息,消息会保留在 Kafka 中,直到消费者恢复并请求拉取。
- 负载均衡: 可以通过增加消费者实例来实现负载均衡,多个消费者可以共同消费同一个主题的不同分区,提高处理能力。
对比 Push 模式:在 Push 模式下,消息由生产者或中间件主动推送到消费者。消费者无需主动请求消息,消息会被自动推送到消费者。
特点:
- 实时性: 消息可以被立即推送到消费者,但如果消费者处理能力不足,可能会导致消息积压或丢失。
- 灵活性低: 消费者对消息的处理速率控制较少,可能需要额外的机制来应对处理能力不匹配的问题。
为什么 Kafka 选择 Pull 模式
- 适应高吞吐量: Pull 模式允许消费者以适当的速率拉取消息,适应各种处理能力的消费者,提升系统的整体吞吐量。
- 简单的消息确认: 消费者可以通过偏移量(offset)机制管理自己的消费进度,更容易实现消息确认和重试机制。
- 提高可靠性: 消息在 Kafka 中持久化存储,消费者故障恢复后可以从消息的最新位置继续消费,避免消息丢失。
Kafka 的设计选择 Pull 模式,以提高系统的灵活性、容错性和处理能力。消费者可以根据自己的处理能力来控制消息的拉取速率,从而更好地适应不同的业务场景和负载需求。
14.Kafka是如何实现高吞吐率的?
机制/原理 | 描述 |
---|---|
分布式架构 | 分区机制: 将每个主题分成多个分区,每个分区在不同 Broker 上存储,实现并行处理。 副本机制: 每个分区有一个主副本和多个从副本,主副本处理读写请求,从副本负责数据同步。 |
高效的磁盘存储 | 顺序写入: 消息顺序写入磁盘,减少磁盘寻道时间,提高 I/O 性能。 日志分段: 将日志文件分为多个段,提高日志管理效率,减少磁盘碎片化。 |
内存优化 | 批量处理: 生产者将多个消息打包成一个批次发送,消费者批量拉取消息,减少网络开销和磁盘 I/O。 内存缓存: 使用内存缓存消息索引和数据,提高读写性能。 |
高效的网络传输 | 压缩机制: 支持 GZIP、Snappy、LZ4 等压缩格式,减少网络带宽和磁盘存储占用。 网络传输优化: 使用高效的网络协议和传输方式,减少网络延迟和开销。 |
可扩展性 | 水平扩展: 通过增加 Broker 节点实现水平扩展,处理更多数据和负载。 负载均衡: 自动均衡分区和 Broker 之间的负载,避免单点瓶颈。 |
生产者和消费者优化 | 异步发送: 生产者异步发送消息,减少网络延迟,提高吞吐量。 消费者并行处理: 消费者并行处理不同分区的数据,多个消费者组成消费者组,共同消费数据。 |
数据压缩与索引 | 消息压缩: 支持消息压缩,减少消息大小,提高存储和传输效率。 索引优化: 使用高效的索引结构加速消息查找,索引存储在内存中,定期更新。 |
15.Kafka判断一个节点还活着的两个条件?
Kafka判断一个节点是否还活着通常依赖于两个条件:
- 心跳机制:Kafka使用心跳机制来检测节点的活跃状态。每个Broker节点会定期发送心跳信号给集群的控制器(Controller),以表示它仍然活着。如果一段时间内没有收到来自节点的心跳信号,Kafka会认为该节点不再活跃,并将其标记为失效。
- ISR(In-Sync Replicas)列表:ISR是指与Leader副本保持同步的副本集合。Kafka通过监测ISR列表来判断节点的活跃状态。如果一个节点的ISR列表中没有该节点的副本,说明该节点与集群的同步已经中断,Kafka会认为该节点不再活跃,并将其标记为失效。
这两个条件的结合可以帮助Kafka监测节点的状态,及时发现失效节点并进行相应的故障处理。当一个节点被标记为失效后,Kafka可以执行副本重分配和重新选举等操作,以确保集群的可用性和数据的一致性。
16.Kafka采用的拉取模式还是推送模式?两者有啥区别?
Kafka 采用的是 拉取模式(Pull Mode)。下面是 Kafka 拉取模式与推送模式的区别,以表格形式展示:
特性 | 拉取模式(Kafka) | 推送模式 |
---|---|---|
定义 | 消费者主动从 Kafka Broker 请求消息。 | 消息生产者或中间件主动将消息推送到消费者。 |
消息获取 | 消费者定期拉取消息,主动请求指定的主题和分区。 | 消息自动推送到消费者,消费者无需主动请求。 |
控制 | 消费者可以控制消息的拉取速率,根据处理能力调整拉取的频率和批量大小。 | 消费者对消息的接收速度控制较少,可能需要额外的机制来处理消息积压。 |
容错性 | 消息在 Kafka 中持久化存储,即使消费者暂时无法处理消息,消息不会丢失,消费者可以稍后重新拉取。 | 如果消费者处理能力不足或网络问题,可能会导致消息丢失或积压,需额外的处理机制来确保消息的可靠性。 |
负载均衡 | 通过增加消费者实例来实现负载均衡,多个消费者可以共同消费不同分区的消息,提高处理能力。 | 负载均衡通常需要在生产者或中间件端进行控制,消费者端可能难以实现高效的负载均衡。 |
实现复杂度 | 相对简单,消费者控制拉取的频率和批量,易于实现消息确认和处理。 | 需要处理消息推送的失败和重试机制,可能需要复杂的机制来确保消息的完整传输。 |
实时性 | 拉取模式的实时性可能受到消费者拉取间隔的影响。 | 推送模式通常具有更高的实时性,消息可以立即推送到消费者。 |
网络带宽使用 | 拉取模式可以根据需要批量拉取消息,减少网络带宽的使用。 | 推送模式可能导致网络带宽的高峰使用,尤其在高负载情况下。 |
- 拉取模式(Kafka):
- 优点: 灵活性高,消费者可以控制消息拉取的速度和批量,适应不同的处理能力,容错性好,消息持久化存储,负载均衡和扩展性较好。
- 缺点: 实时性可能稍差,取决于消费者的拉取频率。
- 推送模式:
- 优点: 实时性高,消息可以立即推送到消费者,适合需要实时处理的场景。
- 缺点: 控制和管理较复杂,容易出现消息丢失或积压,负载均衡和网络带宽使用可能较高。
17.如果流量突增导致MQ积压过高,该如何处理?
处理流量突增导致MQ(消息队列)积压过高的问题需要综合考虑多个因素,并采取适当的措施来解决。以下是一些可能的处理方法:
- 扩容MQ集群: 如果MQ集群的规模不够大,可以考虑增加节点或者扩容机器规格,以提高MQ集群的处理能力和吞吐量。
- 优化消费者端: 确保消费者端的处理能力足够强大,可以通过增加消费者数量或者优化消费者的代码逻辑来提高消费速度。
- 增加MQ Topic和分区: 将消息按照不同的Topic或分区进行划分,可以减少单一队列的压力,提高并行处理能力。
- 流量限制和流控机制: 在MQ生产者端和消费者端实施流量限制和流控机制,防止过多的消息涌入队列,同时防止消费者消费能力不足导致积压。
- 优化消息处理逻辑: 评估MQ消息处理逻辑,确保消费者处理消息的效率,避免不必要的复杂计算或资源浪费。
- 持久化设置: 确保MQ的消息持久化设置合理,避免消息丢失。
- 监控和报警: 设置监控和报警机制,实时监控MQ队列的积压情况,及时发现问题并采取措施。
- 灾备处理: 在极端情况下,如果MQ集群无法快速消化积压的消息,可以考虑设置灾备机制,将消息持久化到磁盘,保证数据不丢失,待MQ恢复正常后再进行处理。
以上方法中的选择和执行需要根据具体情况来决定,以及根据系统架构和业务需求来调整。同时,持续监控和性能优化也是保障系统稳定性的重要手段。
18. Kafka 什么情况下数据丢失?
Kafka 是一种分布式消息队列系统,具有高吞吐量和可靠性。但在某些情况下,数据可能会丢失。以下是一些导致 Kafka 数据丢失的常见情况:
- Producer 发送失败: 当 Kafka Producer 发送消息到 Kafka 集群时,如果遇到网络问题、Kafka 集群故障或其他异常情况,可能导致消息发送失败。在这种情况下,消息可能会丢失。
- Replica 副本不足: Kafka 通过复制数据来保障可靠性。如果某些分区的 Replica 副本数量不足,当 Leader 副本(主副本)出现故障时,无法保障数据可靠性。
- Producer 未设置acks=all: 如果 Kafka Producer 的 acks 配置未设置为 “all”,即等待所有 Replica 副本确认消息写入成功后再返回 ACK,那么在某些情况下可能出现数据丢失。
- 数据过期: 如果 Kafka Broker 配置了数据保留时间或者数据保留大小,当消息的时间戳超过保留时间或者数据大小超过限制时,消息会被删除,从而导致数据丢失。
- Producer 重试机制: 在某些情况下,Producer 可能会启用重试机制以确保消息发送成功。但是,如果设置的重试次数过少或者未启用重试机制,可能会导致数据在发送失败后被丢弃。
为了降低数据丢失的风险,可以考虑以下措施:
- 配置 Producer 的 acks 为 “all”,确保数据在 Leader 和所有 Replica 副本上写入成功后才返回 ACK。
- 设置合理的数据保留时间和大小,避免数据过期删除。
- 确保足够的 Replica 副本,保障数据的可靠性。
- 启用 Producer 的重试机制,设置适当的重试次数和重试时间间隔。
- 监控 Kafka 集群的状态,及时发现异常并采取相应措施。
19.Netty和kafka都是用了零拷贝,那么零拷贝究竟是怎么实现的?底层原理是?
零拷贝(Zero-Copy) 是一种优化技术,旨在减少数据在内存中的复制操作,从而提高系统的性能。Netty 和 Kafka 都使用了零拷贝技术,以提高网络数据的传输效率。
零拷贝的基本概念
零拷贝指的是在数据传输过程中避免将数据从一个内存区域复制到另一个内存区域。传统的数据传输通常涉及多个内存拷贝操作,例如,从用户空间到内核空间,再到网络接口,零拷贝技术通过减少这些拷贝操作来提高性能。
零拷贝的实现原理
零拷贝的实现通常依赖以下几个底层原理和技术:
1.文件映射(Memory-Mapped Files)
- 原理: 文件映射技术允许将文件内容直接映射到进程的内存空间,这样应用程序可以像操作内存一样操作文件数据。这种映射避免了将文件数据从磁盘读取到内核缓冲区,然后再从内核缓冲区复制到用户空间的过程。
- 使用: 在 Netty 和 Kafka 中,文件映射用于高效读取大文件或传输大量数据。
2. 直接内存访问(Direct Memory Access, DMA)
- 原理: 直接内存访问允许设备直接读写内存中的数据,而不需要经过 CPU 的干预。网络接口卡(NIC)可以使用 DMA 将数据直接写入内存缓冲区,从而避免了 CPU 参与数据的复制。
- 使用: Netty 使用直接内存来减少内存拷贝,特别是在处理大量的网络数据时,数据可以直接从网络接口卡写入应用程序的内存中。
3.sendfile 系统调用
- 原理:
sendfile
是一个系统调用,它允许将文件内容直接从磁盘发送到网络套接字,而不需要将数据读入用户空间。该调用通过内核空间中的直接内存拷贝来完成文件到网络的传输。 - 使用: Kafka 使用
sendfile
来优化日志文件的传输,从而减少数据的拷贝次数,提高写入性能。
4.内核级别的 I/O 操作
- 原理: 某些操作系统提供内核级别的 I/O 操作支持,例如
splice
系统调用,它允许将数据从一个文件描述符直接移动到另一个文件描述符,而不需要经过用户空间。 - 使用: Netty 利用这些内核级别的 I/O 操作来进一步减少内存拷贝的需求。
Netty 和 Kafka 的零拷贝实现
Netty
- ByteBuf: Netty 的
ByteBuf
类支持直接内存缓冲区,通过ByteBufAllocator
可以分配直接内存,从而避免数据拷贝。ByteBuf
的实现使得数据可以直接在内存中操作,并且支持零拷贝传输。 - 文件映射: Netty 通过
FileRegion
和FileRegion
对象利用文件映射技术进行零拷贝传输。
Kafka
- 直接内存: Kafka 使用
ByteBuffer
和直接内存来减少数据的拷贝,特别是在处理消息和日志时。 - sendfile: Kafka 使用
sendfile
系统调用将日志文件直接从磁盘传输到网络,减少了数据的拷贝和处理延迟。
零拷贝技术通过减少数据在内存中的复制次数,利用文件映射、直接内存访问、内核级 I/O 操作等技术来优化数据传输效率。Netty 和 Kafka 通过使用这些技术,提高了网络数据传输和日志处理的性能。
20.导致MQ积压的原因有哪些?怎么应对?
消息队列(MQ)积压是指在消息队列中累积了大量未处理的消息,导致系统处理能力不足,影响正常的消息传递和处理。导致消息队列积压的原因可以有很多,以下是一些常见的原因和应对方法:
原因:
- 消费者处理速度慢: 消费者处理消息的速度比生产者生成消息的速度慢,导致消息堆积。
- 消费者故障: 消费者出现故障,停止了正常的消息处理,导致消息堆积。
- 消费者数量不足: 消费者的数量不足以处理消息的产生速度,导致积压。
- 生产者发送过快: 生产者发送消息的速度过快,超过了消费者的处理能力。
- 消息处理失败重试: 消息处理失败后会重试,如果重试失败,会导致消息不断堆积。
- 消息体积大: 如果消息体很大,会占用更多的存储空间,导致消息队列积压。
应对方法:
- 增加消费者数量: 增加消费者的数量,以提高消息处理的速度。
- 优化消费者处理逻辑: 优化消费者的处理逻辑,提升处理效率,减少处理时间。
- 监控和报警: 部署监控系统,及时发现消息积压问题,并设置报警机制。
- 消息处理失败策略: 设计合理的消息处理失败策略,避免消息无限重试,可以将处理失败的消息暂时移到死信队列中。
- 增加消息队列容量: 增加消息队列的容量,以适应高峰期的消息产生。
- 限流和熔断: 实施限流和熔断策略,避免过多的请求导致消息队列积压。
- 消息过期处理: 设置消息的过期时间,对于长时间未处理的消息进行清理,避免积压。
- 水平扩展: 根据需求进行消息队列的水平扩展,以提高处理能力。
- 定期清理: 定期清理过期的、无法处理的消息,以避免长期的积压。
综合上述方法,需要根据实际情况进行合理的调整和应对,以确保消息队列的稳定运行,避免积压问题影响整体系统性能。
21.如何解决消息队列的延时以及过期失效问题?消息队列满了以后该怎么处理?有几百万消息持续积压几小时,说说怎么解决?
关于这个事儿,我们一个一个来梳理吧,先假设一个场景,我们现在消费端出故障了,然后大量消息在 mq 里积压,现在出事故了,慌了。
大量消息在 mq 里积压了几个小时了还没解决
几千万条数据在 MQ 里积压了七八个小时,从下午 4 点多,积压到了晚上 11 点多。这个是我们真实遇到过的一个场景,确实是线上故障了,这个时候要不然就是修复 consumer 的问题,让它恢复消费速度,然后傻傻的等待几个小时消费完毕。这个肯定不能在面试的时候说吧。
一个消费者一秒是 1000 条,一秒 3 个消费者是 3000 条,一分钟就是 18 万条。所以如果你积压了几百万到上千万的数据,即使消费者恢复了,也需要大概 1 小时的时间才能恢复过来。
一般这个时候,只能临时紧急扩容了,具体操作步骤和思路如下:
- 先修复 consumer 的问题,确保其恢复消费速度,然后将现有 consumer 都停掉。
- 新建一个 topic,partition 是原来的 10 倍,临时建立好原先 10 倍的 queue 数量。
- 然后写一个临时的分发数据的 consumer 程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的 10 倍数量的 queue。
- 接着临时征用 10 倍的机器来部署 consumer,每一批 consumer 消费一个临时 queue 的数据。这种做法相当于是临时将 queue 资源和 consumer 资源扩大 10 倍,以正常的 10 倍速度来消费数据。
- 等快速消费完积压数据之后,得恢复原先部署的架构,重新用原先的 consumer 机器来消费消息。
mq 中的消息过期失效了
假设你用的是 RabbitMQ,RabbtiMQ 是可以设置过期时间的,也就是 TTL。如果消息在 queue 中积压超过一定的时间就会被 RabbitMQ 给清理掉,这个数据就没了。那这就是第二个坑了。这就不是说数据会大量积压在 mq 里,而是大量的数据会直接搞丢。
这个情况下,就不是说要增加 consumer 消费积压的消息,因为实际上没啥积压,而是丢了大量的消息。我们可以采取一个方案,就是批量重导,这个我们之前线上也有类似的场景干过。就是大量积压的时候,我们当时就直接丢弃数据了,然后等过了高峰期以后,比如大家一起喝咖啡熬夜到晚上12点以后,用户都睡觉了。这个时候我们就开始写程序,将丢失的那批数据,写个临时程序,一点一点的查出来,然后重新灌入 mq 里面去,把白天丢的数据给他补回来。也只能是这样了。
假设 1 万个订单积压在 mq 里面,没有处理,其中 1000 个订单都丢了,你只能手动写程序把那 1000 个订单给查出来,手动发到 mq 里去再补一次。
mq 都快写满了
如果消息积压在 mq 里,你很长时间都没有处理掉,此时导致 mq 都快写满了,咋办?这个还有别的办法吗?没有,谁让你第一个方案执行的太慢了,你临时写程序,接入数据来消费,消费一个丢弃一个,都不要了,快速消费掉所有的消息。然后走第二个方案,到了晚上再补数据吧。
22.如果让你写一个消息队列,该如何进行架构设计?说一下你的思路。
其实回答这类问题,说白了,不求你看过那技术的源码,起码你要大概知道那个技术的基本原理、核心组成部分、基本架构构成,然后参照一些开源的技术把一个系统设计出来的思路说一下就好。
比如说这个消息队列系统,我们从以下几个角度来考虑一下:
- 首先这个 mq 得支持可伸缩性吧,就是需要的时候快速扩容,就可以增加吞吐量和容量,那怎么搞?设计个分布式的系统呗,参照一下 kafka 的设计理念,broker -> topic -> partition,每个 partition 放一个机器,就存一部分数据。如果现在资源不够了,简单啊,给 topic 增加 partition,然后做数据迁移,增加机器,不就可以存放更多数据,提供更高的吞吐量了?
- 其次你得考虑一下这个 mq 的数据要不要落地磁盘吧?那肯定要了,落磁盘才能保证别进程挂了数据就丢了。那落磁盘的时候怎么落啊?顺序写,这样就没有磁盘随机读写的寻址开销,磁盘顺序读写的性能是很高的,这就是 kafka 的思路。
- 其次你考虑一下你的 mq 的可用性啊?这个事儿,具体参考之前可用性那个环节讲解的 kafka 的高可用保障机制。多副本 -> leader & follower -> broker 挂了重新选举 leader 即可对外服务。
- 能不能支持数据 0 丢失啊?可以的,参考我们之前说的那个 kafka 数据零丢失方案。
mq 肯定是很复杂的,面试官问你这个问题,其实是个开放题,他就是看看你有没有从架构角度整体构思和设计的思维以及能力。确实这个问题可以刷掉一大批人,因为大部分人平时不思考这些东西。
23.kafka高性能的原因是啥?
简单回顾下Kafka消息,Kafka中的消息以主题(Topic)为单位进行分类,主题是一个逻辑上的概念,主题还可以细分为一个或多个分区,一个分区只属于单个主题,所以也可以把分区称为主题分区(Topic-Partition)。同一个主题下的不同分区包含的消息是不同的,每个分区还可以有多个副本用于容灾备份。分区在存储层面可以看作一个可追加的日志(Log)文件,消息在被追加到分区日志文件的时候都会分配一个特定的偏移量(offset)。offset是消息在分区中的唯一标识,Kafka通过它来保证消息在分区内的顺序性,不过offset并不跨越分区,也就是说,Kafka保证的是分区有序而不是主题有序。下图表示了主题、分区、副本、日志之间的关系:一个主题包含至少一个分区,一个分区可以有一个或多个副本,每个副本对应一个日志(实际上是个目录),每个日志里包含多个日志分段(是根据偏移量offset进行分段的),每个日志分段存储日志文件、索引文件等。
- 分区
- 日志分段存储
- 消息顺序追加
- 页缓存
- 零拷贝
参考书籍、文献和资料
1.https://www.cnblogs.com/Vito-Yan/p/10319826.html
2.消息中间件高频面试题整理(RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMQ)
3.https://www.bianchenghao.cn/p/70c5b8d51c75
4.新浪技术分享:我们如何扛下32亿条实时日志的分析处理-51CTO.COM
5.关于MQ的几件小事(四)如何保证消息不丢失 - 简书
今天的文章 消息中间件高频面试题整理(RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMQ)分享到此就结束了,感谢您的阅读。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/bian-cheng-ji-chu/86169.html