消息队列
什么是消息:A要通知B,发送的东西叫做消息。
什么是队列:先进先出,顺序。
存放消息的队列。
为什么会产生消息队列思想呢?是为了解决什么问题产生的。
比如淘宝服务
如果哪个环节经常导致网络波荡出错误就会报错,导致用户下单失败。
假设网络传输是100ms,理想情况下。访问的服务越多耗时越长。
传统串行化服务的缺点:
1. 耦合性强(系统各个模块之间的耦合性)
2. 系统吞吐量不大,耗时多
优点:系统架构简单,排查错误比较方便
看一下消息队列的形式
客户端往消息队列放消息,放成功之后就返回成功了。
优点:解耦,提升性能
缺点:分布式事务解决方案
事务
只要涉及到消息队列,肯定要处理事务问题。
事务:原子性。我所有的动作,要么不执行,要么不全部执行成功。
比如说:张三给我转账100元。可以拆分为两个动作:张三账户减100,我的账户加100。
优秀的程序架构设计需要遵循的守则:低耦合,高内聚。
我们写的这四个服务并不是顺序执行的,是随机执行的。
吞吐量是:系统发送请求给服务器,到服务器返回客户端这是一次吞吐量。
架构设计:
从功能块的访问量来选择。消息队列,redis,zookeeper,锁,jvm,Spring,servlet
cas:就是操作系统里的cas,并发编程就是操作系统里的线程。
操作系统,计算机网络,数据结构,计算机组成原理。
什么是MQ
(详细讲解查看此博客:
什么是MQ?_清醒温柔的博客-CSDN博客)
Message Query(MQ),消息队列中间件,很多初学者认为,MQ通过消息的发送和接受来实现程序的异步和解耦,mq主要用于异步操作,这个不是mq的真正目的,只不过是mq的应用,mq真正的目的是为了通讯。他屏蔽了复杂的通讯协议,像常用的dubbo,http协议都是同步的。
这两种协议很难实现 双端通讯,A调用B,B也可以主动调用A,而且不支持长连接。mq做的就是在这些协议上构建一个简单协议——生产者、消费者模型,mq带给我们的不是底层的通讯协议,而是更高层次的通讯模型。他定义了两个对象:发送数据的叫做生产者,接受消息的叫做消费者,我们可以无视底层的通讯协议,我们可以自己定义生产者消费者。
MQ的两种流派
1.有broker的
broker是什么,可以理解为是一个中转站。生产者将消息发送给他就结束自己的任务了,broker将消息主动推送给消费者(具体的将消息推送到哪个队列,或者说消费者主动请求)
重topic
必须要有topic
kafka:全球消息处理性能最快的一款mq
rocket:阿里内部的一个大神根据kafka的执行原理手写的,性能与kafka差不多,但是功能上比kafka要多,比如说顺序消费。
轻topic
可以没有topic,topic只是一种中转模式
rabbitmq
2.无broker的
zeromq:没有使用broker,是直接使用socket进行通信。
穿插知识点
并行和并发的区别
事务简单介绍
消息队列和普通串行程序相比有哪些优缺点
一、kafka介绍(详解如下链接博客)
https://blog.csdn.net/weixin_64881460/article/details/123974331
1.kafka简单介绍
kafka是一款分布式、支持分区的、多副本,基于zookeeper协调的分布式消息系统。最大的特性就是可以实时处理大量数据来满足需求。
2.kafka使用场景
1,日志收集:可以用kafka收集各种服务的日志 ,通过已统一接口的形式开放给各种消费者。
2,消息系统:解耦生产和消费者,缓存消息。
3,用户活动追踪:kafka可以记录webapp或app用户的各种活动,如浏览网页,点击等活动,这些活动可以发送到kafka,然后订阅者通过订阅这些消息来做监控。
4,运营指标:可以用于监控各种数据。
3.kafka基本概念
kafka是一个分布式的分区的消息,提供消息系统应该具备的功能。
名称 | 解释 |
---|---|
broker | 消息中间件处理节点,一个broker就是一个kafka节点,多个broker构成一个kafka集群。 |
topic | kafka根据消息进行分类,发布到kafka的每个消息都有一个对应的topic |
producer | 消息生产(发布)者 |
consumer | 消息消费(订阅)者 |
consumergroup | 消息订阅集群,一个消息可以被多个consumergroup消费,但是一个consumergroup只有一个consumer可以消费消息。 |
4.kafka的安装
#下载安装包并解压
tar -xzvf
#修改配置文件
#默认端口号
#修改日志位置
#zk地址
#启动
./kafka-server-start.sh -daemon ../config/server.properties
5.java实现消息的生产和消费
引入maven依赖
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.1.0</version>
</dependency>
</dependencies>
生产者代码
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
/**
* @author zhencong
* @title: Kafkapro
* @projectName kafkapro
* @description: TODO
* @date 2022/3/24下午 02:55
*/
public class Kafkapro {
public static void main(String[] s){
Properties properties = new Properties();
properties.put("bootstrap.servers", "192.168.129.129:9092");
properties.put("acks", "all");
properties.put("retries", 0);
properties.put("batch.size", 16384);
properties.put("linger.ms", 1);
properties.put("buffer.memory", 33554432);
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = null;
try {
producer = new KafkaProducer<>(properties);
for (int i = 0; i < 100; i++) {
String msg = "Message " + i;
producer.send(new ProducerRecord<>("test", msg));
System.out.println("Sent:" + msg);
}
producer.send(new ProducerRecord<>("test", "msg"));
System.out.println("Sent:" + "msg");
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.close();
}
}
}
消息生产成功
消费者代码
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
/**
* @author zhencong
* @title: Kafkacus
* @projectName kafkacus
* @description: TODO
* @date 2022/3/24下午 03:07
*/
public class Kafkacus {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("bootstrap.servers", "192.168.129.129:9092");
properties.put("group.id", "group-1");
properties.put("enable.auto.commit", "true");
properties.put("auto.commit.interval.ms", "1000");
properties.put("auto.offset.reset", "earliest");
properties.put("session.timeout.ms", "30000");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
kafkaConsumer.subscribe(Arrays.asList("test"));
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(10000);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, value = %s", record.offset(), record.value());
System.out.println();
}
}
}
}
消息消费成功
目录
今天的文章MQ,KAFKA,消息队列分享到此就结束了,感谢您的阅读,如果确实帮到您,您可以动动手指转发给其他人。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/27199.html