本文正在参与 “性能优化实战记录”话题征文活动
业务场景
消费Kafka订阅的数据,经过匹配拆分计算,把数据推送到多个外网用户HTTP Server地址。比例属于 1:N的关系。
性能瓶颈
在压测过程中发现,Kafka顺序消费场景下极易出现由于某个客户端超时导致整体积压。转发任务对于实时性要求比较高,需要进行优化改造。
性能优化StepByStep
1. 批量消费Kafka
对于推送到外部客户端场景下,瓶颈主要是在等待客户响应IO耗时中,Kafka在Spring中虽然也是默认批量消费,但是本身封装的批量框架对一批消费转换为了单线程顺序处理。因此首先需要批处理消息,提高并行度。参考代码如下:
@Data
@Configuration
public class KafkaBatchConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.enable-auto-commit}")
private Boolean autoCommit;
@Value("${spring.kafka.consumer.group-id}")
private String batchGroupId;
@Value("${spring.kafka.consumer.max-poll-records}")
private Integer maxPollRecords;
@Value("${spring.kafka.consumer.auto-offset-reset}")
private String autoOffsetReset;
@Value("${spring.kafka.consumer.properties.session.timeout.ms}")
private Integer sessionTimeOut;
@Value("${spring.kafka.consumer.properties.partition:1}")
private Integer partition;
/** * 批量消费者配置信息 */
@Bean
public Map<String, Object> consumerBatchConfigs() {
Map<String, Object> props = new HashMap<>(16);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, sessionTimeOut);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeOut);
props.put(ConsumerConfig.GROUP_ID_CONFIG, batchGroupId);
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
/** * 消费者批量工程 */
@Bean
public KafkaListenerContainerFactory<?> batchFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerBatchConfigs()));
// 根据分区数来设置消费的线程提高性能,多余分区数没有意义
factory.setConcurrency(partition);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
//设置为批量消费,每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG
factory.setBatchListener(true);
return factory;
}
}
2. 线程池转发处理
Apache的HttpClient是单例模式,但是HttpClient有线程池的概念。可以同步并发转发该批请求。 需要理解单例模式和线程池之间的关系,HttpClient虽然是单个类实例,但是在处理请求的时候可以使用PoolingHttpClientConnectionManager 线程池来分别处理不同的连接。 比如,同时执行10个Http请求,此时会去PoolingHttpClientConnectionManager中获取连接池资源。获取成功后会执行HTTP请求。
同时为了保证Kafka消息消费的可靠性,Kafka需要在HTTP转发成功后手工提交Offset,需要引入多线程编程中使用的CountDownLatch来做协同。保证消息顺利提交到Kafka。 因此模式修改为: 在Consumer主线程计算出N条消息需要转发的N个客户端,主线程起一个CountDownLatch,用来等待所有转发的任务完成。防止消费速度过快导致客户端积压。
@Slf4j
public class HttpClientSyncServiceImpl implements HttpClientService {
private CloseableHttpClient closeableHttpClient;
private HttpClientConfig httpClientConfig;
private String pemBody = null;
public HttpClientSyncServiceImpl() {
}
public HttpClientSyncServiceImpl(HttpClientConfig httpClientConfig) {
this.httpClientConfig = httpClientConfig;
try {
log.info("init http client start, default config is {}", httpClientConfig);
SSLContext sslcontext = HttpClientUtils.buildSSLContext();
SSLConnectionSocketFactory trustAll = HttpClientUtils.buildSSLSocketFactory(sslcontext);
// 配置同时支持 HTTP 和 HTTPS
// 一个httpClient对象对于https仅会选用一个SSLConnectionSocketFactory
Registry<ConnectionSocketFactory> socketFactoryRegistry = RegistryBuilder.<ConnectionSocketFactory>create().
register("http", PlainConnectionSocketFactory.getSocketFactory()).
register("https", trustAll).build();
// 初始化连接管理器
PoolingHttpClientConnectionManager poolConnManager = buildPoolConnManager(socketFactoryRegistry);
RequestConfig config = buildHttpClient(poolConnManager);
closeableHttpClient = HttpClients.custom()
// 设置连接池管理
.setConnectionManager(poolConnManager)
.setDefaultRequestConfig(config).build();
log.info("init default http client finish");
} catch (Exception e) {
log.error("", e);
}
}
public HttpClientSyncServiceImpl(HttpClientConfig httpClientConfig, String pem) {
this.pemBody = pem;
this.httpClientConfig = httpClientConfig;
try {
log.info("build new httpclient {}", httpClientConfig);
closeableHttpClient = buildHttpClientByKeyStore(pem);
log.info("init new http client finish");
} catch (Exception e) {
log.error("", e);
}
}
private PoolingHttpClientConnectionManager buildPoolConnManager(Registry<ConnectionSocketFactory> socketFactoryRegistry) {
PoolingHttpClientConnectionManager poolConnManager = new PoolingHttpClientConnectionManager(socketFactoryRegistry);
poolConnManager.setMaxTotal(httpClientConfig.getPollMaxTotal());// 同时最多连接数
// 设置最大路由
poolConnManager.setDefaultMaxPerRoute(httpClientConfig.getPollMaxPeerRouter());
return poolConnManager;
}
private CloseableHttpClient buildHttpClientByKeyStore(String pem) {
try {
SSLContext sslContext = HttpClientUtils.buildSSLContext(pem);
SSLConnectionSocketFactory sslConnectionSocketFactory = HttpClientUtils.buildSSLSocketFactory(sslContext);
// 初始化连接管理器
Registry<ConnectionSocketFactory> socketFactoryRegistry = RegistryBuilder.<ConnectionSocketFactory>create().
register("http", PlainConnectionSocketFactory.getSocketFactory()).
register("https", sslConnectionSocketFactory).build();
PoolingHttpClientConnectionManager poolConnManager = new PoolingHttpClientConnectionManager(socketFactoryRegistry);
// 同时最多连接数,自定义线程池设置为每路连接数
poolConnManager.setMaxTotal(httpClientConfig.getPollMaxPeerRouter());
poolConnManager.setDefaultMaxPerRoute(httpClientConfig.getPollMaxPeerRouter());
RequestConfig config = HttpClientUtils.buildRequestConfig(httpClientConfig);
return HttpClients.custom()
// 设置连接池管理
.setConnectionManager(poolConnManager)
.setDefaultRequestConfig(config).build();
} catch (Exception e) {
log.error("build httpclient failed, use default", e);
return null;
}
}
private RequestConfig buildHttpClient(PoolingHttpClientConnectionManager poolConnManager) {
return RequestConfig.custom().setConnectTimeout(httpClientConfig.getConnectTimeout())
.setConnectionRequestTimeout(httpClientConfig.getConnectionRequestTimeout())
.setSocketTimeout(httpClientConfig.getResponseTimeout())
.build();
}
}
3. 计算优化并行度
性能压测中发现单条消息处理匹配查询的速度为2-4ms,对于100条数据,串行计算总共需要0.4s才能计算完成。此部分时间消耗可以通过Java8的parallelStream并行计算来优化。注意使用parallelStream要注意线程安全,返回的数据是无序数据。
此时继续出现多个问题:
1. JVM出现Full GC
[Full GC (Allocation Failure) 507M->382M(512M), 0.8299883 secs] [Eden: 0.0B(25.0M)->0.0B(69.0M) Survivors: 0.0B->0.0B Heap: 507.3M(512.0M)->382.7M(512.0M)], [Metaspace: 109047K->109047K(1153024K)]
调试环境设置GC 512MB 太小,修改为2G后可以。
Full GC 触发条件
- concurrent mode failure:并发模式失败,CMS 收集器也有同样的概念。G1 并发标记期间,如果在标记结束前,老年代被填满,G1 会放弃标记。
- 晋升失败:并发周期结束后,是混合垃圾回收周期,伴随着年轻代垃圾收集,进行清理老年代空间,如果这个时候清理的速度小于消耗的速度,导致老年代不够用,那么会发生晋升失败。
- 疏散失败:年轻代垃圾收集的时候,如果 Survivor 和 Old 区没有足够的空间容纳所有的存活对象。这种情况肯定是非常致命的,因为基本上已经没有多少空间可以用了,这个时候会触发 Full GC 也是很合理的。
- 大对象分配失败,我们应该尽可能地不创建大对象,尤其是大于一个区块大小的那种对象。
2. 转发队列出现卡顿,部分转发需要等待之前转发完成后继续转发
在压测中发现转发队列会出现卡顿,停顿一下后才会继续处理任务,此时推送任务并没有延迟返回。不会存在占用线程的情况。此时观察CPU和内存还有GC情况都正常。
问题原因:一个线程池执行的任务属于IO
密集型,CPU
大多属于闲置状态,系统资源未充分利用。如果一瞬间来了大量请求,如果线程池数量大于coreSize
时,多余的请求都会放入到等待队列中。等待着corePool
中的线程执行完成后再来执行等待队列中的任务。
线程池中代码执行顺序是:corePool->workQueue->maxPool
因此修改线程池工作顺序,在核心线程满的时候不先跳入队列中,而是新开辟工作线程。这样保证队列消息不会堆积在内存中。
修改方式为线程在执行时记录正在运行的任务数量,线程在队列入队的时候首先判断正在工作的任务是否大于核心线程,如果不大于核心,在核心线程创建并执行,如果未超过最大线程数量,则创建新的线程执行任务。
- 提交任务的线程A 在提交任务的时候,首先线程对executor线程池管理器计数+1,提交后线程池异步运行。此时直接对计数-1。线程池仍在运行任务。此时提交任务的线程A在等待线程异步运行完成。
- 线程异步完成后,通知线程A,线程A此时跟着完成。
如果使用PollExecutor 的afterExecute方法内对计数-1,此时操作方为线程池内线程。线程异步完成后已经通知A,接受新的任务。如果此时又有新的任务提交,则计数来不及更新,此时线程池判断核心线程无空闲线程,会新建线程。
发现:多线程并发调用execute方法,线程池创建超限
观察线程名字,发现多线程并发调用execute方法,线程数不停增加。旧的线程不会被复用,导致线程队列占满。
线程池中线程回收原理
线程池运行的状态,并不是用户显式设置的,而是伴随着线程池的运行,由内部来维护。线程池内部使用一个变量维护两个值:运行状态(runState)和线程数量 (workerCount)。在具体实现中,线程池将运行状态(runState)、线程数量 (workerCount)两个关键参数的维护放在了一起,如下代码所示:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static int runStateOf(int c) { return c & ~CAPACITY; } //计算当前运行状态
private static int workerCountOf(int c) { return c & CAPACITY; } //计算当前线程数量
private static int ctlOf(int rs, int wc) { return rs | wc; } //通过状态和线程数生成ctl
ctl
这个AtomicInteger类型,是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段, 它同时包含两部分的信息:线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount),高3位保存runState,低29位保存workerCount,两个变量之间互不干扰。用一个变量去存储两个值,可避免在做相关决策时,出现不一致的情况,不必为了维护两者的一致,而占用锁资源。通过阅读线程池源代码也可以发现,经常出现要同时判断线程池运行状态和线程数量的情况。线程池也提供了若干方法去供用户获得线程池当前的运行状态、线程个数。这里都使用的是位运算的方式,相比于基本运算,速度也会快很多。
线程池需要管理线程的生命周期,需要在线程长时间不运行的时候进行回收。线程池使用一张Hash表去持有线程的引用,这样可以通过添加引用、移除引用这样的操作来控制线程的生命周期。这个时候重要的就是如何判断线程是否在运行。
Worker是通过继承AQS,使用AQS来实现独占锁这个功能。没有使用可重入锁ReentrantLock,而是使用AQS,为的就是实现不可重入的特性去反应线程现在的执行状态。
- lock方法一旦获取了独占锁,表示当前线程正在执行任务中。 2.如果正在执行任务,则不应该中断线程。 3.如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断。 4.线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers方法来中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程池中的线程是否是空闲状态;如果线程是空闲状态则可以安全回收。 在线程回收过程中就使用到了这种特性,回收过程如下图所示:
线程池中线程的销毁依赖JVM自动的回收,线程池做的工作是根据当前线程池的状态维护一定数量的线程引用,防止这部分线程被JVM回收,当线程池决定哪些线程需要回收时,只需要将其引用消除即可。Worker被创建出来后,就会不断地进行轮询,然后获取任务去执行,核心线程可以无限等待获取任务,非核心线程要限时获取任务。当Worker无法获取到任务,也就是获取的任务为空时,循环会结束,Worker会主动消除自身在线程池内的引用。
此时可以看到,线程池中已经工作完成的线程并不会随着任务结束立即释放。
比如核心线程是20,最大线程是40。 线程池膨胀到40的时候。会卡顿一下。回收一批线程后才会继续处理。此时怀疑多线程execute提交速度过快导致线程数不停增加,没有复用。
测试发现:
-
批量提交任务会有新线程生成。即使每批提交的线程数远小于core线程数。并且每批提交设置5S间隔也会不断生成新线程。
-
一批任务循环提交,如果每次提交延迟0.1nS,则不会有新线程生成。
此时可以判断提交速度与线程池分配新线程和复用老线程有关系。
解决方式尝试1:
取消countdownlatch发令枪协同机制,可以使用线程池activeCount检测任务是否完成。线程池中自带activeCount方法,用来检测活跃的线程池数。当activeCount为0的时候,认为线程池全部都释放了,效果等同于countdownlatch。
源码答疑
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */
int c = ctl.get();
// workerCountOf(c) 线程池正在工作的线程 小于核心线程数,会新建核心线程数
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// isRunning(c) 代表executor是否正在运行
// 关键代码: workQueue是正在运行的队列,offer操作为入队,如果入队失败,则返回false!
// 返回false会创建新的线程。
// 线程池生产-消费模型借助workQueue队列实现,消费速度由于锁、CAS的限制,在小于入队速度的时候。队列会不断增加,最终导致入队失败。
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
此时重新压测发现线程池中线程仍在不停增加,未复用老的线程。说明activeCount与JVM实际释放线程并没关系。
解决方式尝试2:
线程池队列模型
ThreadPoolExecutor执行execute方法分下面4种情况。
1)如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(注意,执行这一步骤需要获取全局锁)。
2)如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue。
3)如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来处理任务(注意,执行这一步骤需要获取全局锁)。
4)如果创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution()方法。
ThreadPoolExecutor采取上述步骤的总体设计思路,是为了在执行execute()方法时,尽可能地避免获取全局锁(那将会是一个严重的可伸缩瓶颈) 。在ThreadPoolExecutor完成预热之后(当前运行的线程数大于等于corePoolSize),几乎所有的execute()方法调用都是执行步骤2,而步骤2不需要获取全局锁。
此时再查看线程池的任务调度机制,线程池是生产者消费者模型,生产和消费借助的是线程池设置的队列模型。
runnableTaskQueue(任务队列):用于保存等待执行的任务的阻塞队列。可以选择以下几个阻塞队列:
❑ ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按FIFO(先进先出)原则对元素进行排序。
❑ LinkedBlockingQueue:基于链表的阻塞队列,可以指定最大长度,但默认是无界的。此队列按FIFO排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
❑ SynchronousQueue:没有实际存储空间的同步阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于Linked-BlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。
❑ PriorityBlockingQueue:基于堆的无界阻塞优先级队列。\
联想之前的业务流程,Kafka每批消费100的转发消息,线程池设置为100,首次成功转发,接着消费第二批,但是此时核心线程并未全部释放,消息加入到了等待队列中。因此出现了卡顿。
考虑出现卡顿的场景,如果调低了等待队列,会不停的创建新线程,直到超过maxSize线程数抛出异常。如果控制的线程数总数始终大于要执行的任务,就会导致不停的创建新线程,旧的线程随着keepAlive到期销毁。
此时考虑更换线程池队列模型,SynchronousQueue并没有实际的存储空间,必须等待上一个任务被消费后才会入队下一条任务。此时核心线程会不断消费任务,不会存在队列满了新建线程的情况。
线程池各种参数下性能测试记录
- 线程池提交休眠1ns,使用发令枪协同。
线程池不会创建新线程,每分钟请求量稳定在6.6K左右。
-
线程池 countDownlatch,不休眠。 线程池会不断新增,存在队列排入线程池的情况。速度不稳定。持续下降。
-
线程池使用同步队列,消费一批250 稳定在13.2K左右。
-
线程池使用同步队列,消费一批10
稳定在17K,CPU使用率稳定在73%
- 线程池使用同步队列,消费一批50
稳定在7K,且CPU使用率较高。位置在85%
- 线程池使用同步队列,消费一批8
波动较大,但是性能较高,平均22K。猜测推送队列受到毛刺影响较大,且CPU使用率较低。50%
- 线程池使用同步队列,消费一批4
性能较高,平均25K。猜测推送队列受到毛刺影响较大,且CPU使用率较低,28%。可见对于IO密集型任务,CPU增大反而优势并未增大。
性能优化点总结
- 优化了Kafka消费模式,使用批量消费Kafka提高并发度。
- 使用多线程配合HTTPClient PollManager机制并发提高转发率。
- 优化线程池任务队列模式,由之前的LinkList修改为SynchronousQueue,线程不再重新创建。
下一步重点
可以看到即使优化了线程池,HttpClient转发的性能也就在400条/s左右。性能并不高,因此下一步考虑使用Netty/Vertx重写转发线程。
今天的文章Httpclient性能优化实战分享到此就结束了,感谢您的阅读。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/19164.html