与dubbo,springcloud类似的rpc框架,网上资料比较少,记录一下学习笔记,学习其思想。
配置示例:代码1
<bean id="orderQueryFacade" class="com.dianping.dpsf.spring.ProxyBeanFactory" init-method="init">
<property name="serviceName" value="http://service.xxx.com/trade/om/orderQueryFacade_1.0.0"/>
<property name="iface" value="com.xxx.trade.om.api.facade.OrderQueryFacade"/>
<property name="serialize" value="hessian"/>
<property name="callMethod" value="sync"/>
<property name="timeout" value="5000"/>
</bean>
配置说明:
serviceName
iface:指定接口
serialize:序列化方式
callMethod:请求方式,对负载均衡策略有影响。sync同步请求。callback 回调,需要配置callback属性并实现ServiceCallback接口。future 将请求结果通过Future与ThreadLocal存储在发起请求的实现中,可以通过ServiceFutureFactory.getResult()获取返回结果,oneway异步请求后不关心返回结果,无返回值。
timeout: 请求超时时间。
接口代理bean注册:
在spring根据配置创建bean时,会执行init方法。通过jdk的Proxy创建指定接口的代理类。ProxyBeanFactory本事实现了FactoryBean接口,所以spring在调用FactoryBean的getObjectType方法获取类型与getObject获取实体时,得到的是指定接口与指定接口的代理。ProxyBeanFactory关键代码如下:代码2
public void init() throws Exception {
if (StringUtils.isBlank(this.iface)) {
throw new IllegalArgumentException("invalid interface:" + this.iface);
} else {
this.objType = ClassUtils.loadClass(this.classLoader, this.iface.trim());
InvokerConfig invokerConfig = new InvokerConfig(this.objType, this.serviceName, this.timeout, this.callMethod, this.serialize, this.callback, this.group, this.writeBufferLimit, this.loadBalance, this.cluster, this.retries, this.timeoutRetry, this.vip, this.version, this.protocol);
invokerConfig.setClassLoader(this.classLoader);
this.obj = ServiceFactory.getService(invokerConfig);
this.configLoadBalance(invokerConfig);
}
}
public Object getObject() {
return this.obj;
}
public Class<?> getObjectType() {
return this.objType;
}
创建代理:
在创建接口代理时已经通过zk获取了服务提供者的ip地址,封装在Client类中,方便在调用时使用。代码就是上文中的ServiceFactory.getService(invokerConfig),在此方法中创建了接口代理和进行了服务发现。图1。
跟踪上图59行代码,观察代理创建。代码如下代码3。
public Object proxyRequest(InvokerConfig<?> invokerConfig) throws SerializationException {
return Proxy.newProxyInstance(ClassUtils.getCurrentClassLoader(invokerConfig.getClassLoader()), new Class[]{invokerConfig.getServiceInterface()}, new ServiceInvocationProxy(invokerConfig, InvokerProcessHandlerFactory.selectInvocationHandler(invokerConfig)));
}
代理创建时传入的ServiceInvocationProxy对象很重要,服务的实际调用就是依赖于ServiceInvocationProxy对象。在我们通过代理类代用服务提供者提供的服务时,实际上调用的ServiceInvocationProxy的invoke方法,而此方法最后调用的是在构建时传入其构造方法的handler参数的handle方法。代码4。
public ServiceInvocationProxy(InvokerConfig<?> invokerConfig, ServiceInvocationHandler handler) {
this.invokerConfig = invokerConfig;
this.handler = handler;
}
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
if (method.getDeclaringClass() == Object.class) {
return method.invoke(this.handler, args);
} else if ("toString".equals(methodName) && parameterTypes.length == 0) {
return this.handler.toString();
} else if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
return this.handler.hashCode();
} else {
return "equals".equals(methodName) && parameterTypes.length == 1 ? this.handler.equals(args[0]) : this.extractResult(this.handler.handle(new DefaultInvokerContext(this.invokerConfig, methodName, parameterTypes, args)), method.getReturnType());
}
}
那么handler才是真正执行的执行的方法,再看一下handler是怎么构建的吧。回到代码3,在创建ServiceInvocationProxy时传入的参数为,ServiceInvocationProxy(invokerConfig, InvokerProcessHandlerFactory.selectInvocationHandler(invokerConfig))。将Filter包装成ServiceInvocationHandler ,并通过责任连的方式在代码4中提到的handler.handle触发执行Filter的invoke方法,Filter封装了对服务提供者的选取(ClusterInvokeFilter),远程调用(RemoteCallInvokeFilter)等逻辑。ClusterInvokeFilter的逻辑设计到负载均衡,在下面说到集群策略时再说。Filter的关键代码如下代码5。
public static void init() {
if (!isInitialized) {
registerBizProcessFilter(new InvokerDataFilter());
if (Constants.MONITOR_ENABLE) {
registerBizProcessFilter(new RemoteCallMonitorInvokeFilter());
}
registerBizProcessFilter(new DegradationFilter());
registerBizProcessFilter(new FlowControlPigeonClientFilter());
registerBizProcessFilter(new ClusterInvokeFilter());
registerBizProcessFilter(new GatewayInvokeFilter());
registerBizProcessFilter(new ContextPrepareInvokeFilter());
registerBizProcessFilter(new RemoteCallInvokeFilter());
bizInvocationHandler = createInvocationHandler(bizProcessFilters);
isInitialized = true;
}
}
public static ServiceInvocationHandler selectInvocationHandler(InvokerConfig<?> invokerConfig) {
return bizInvocationHandler;
}
private static <V extends ServiceInvocationFilter> ServiceInvocationHandler createInvocationHandler(List<V> internalFilters) {
final ServiceInvocationHandler last = null;
List<V> filterList = new ArrayList();
filterList.addAll(internalFilters);
for(int i = filterList.size() - 1; i >= 0; --i) {
final V filter = (ServiceInvocationFilter)filterList.get(i);
last = new ServiceInvocationHandler() {
public InvocationResponse handle(InvocationContext invocationContext) throws Throwable {
InvocationResponse resp = filter.invoke(last, invocationContext);
return resp;
}
};
}
return last;
}
服务发现:
再回到图1中的74行代码,看一下服务发现的逻辑。由于调用链比较长,直接贴关键代码,在CuratorRegistry类中,client为CuratorClient负责与ZK进行沟通,watch ZK事件等,如下,图2。拿到的adress为服务提供的者的ip与端口。
将获取到的服务提供者信息封装Client对象,放入serviceClients中,在实际请求服务提供时从serviceClients获取Client。图3。
集群策略:
集群策略范围4种。
Failfast:调用一个节点失败后抛出异常,可以同时配置重试timeoutRetry和retries属性。
Failover:调用服务的一个节点失败后会尝试调用另外的一个节点,可以同时配置重试timeoutRetry和retries属性
Failsafe:调用服务的一个节点失败后不会抛出异常,按配置默认值。
Forking:同时调用服务的所有可用节点,返回调用最快的节点结果数据。pigeon 在实现时使用线程池实现异步对所有连接发送请求,LinkedBlockingQueue实现发布消费模式,获取第一个返回的结果。
在代码5中我们看到了ClusterInvokeFilter,在其invoke方法中实现了集群策略的选择,由于我们在示例配置中没有设置集群策略,则使用默认的failfast策略。代码6。
public InvocationResponse invoke(ServiceInvocationHandler handler, InvokerContext invocationContext) throws Throwable {
invocationContext.getTimeline().add(new TimePoint(TimePhase.CL));
InvokerConfig<?> invokerConfig = invocationContext.getInvokerConfig();
Cluster cluster = ClusterFactory.selectCluster(invokerConfig.getCluster());
if (cluster == null) {
throw new IllegalArgumentException("Unsupported cluster type:" + cluster);
} else {
try {
return cluster.invoke(handler, invocationContext);
} catch (Throwable var6) {
if (this.monitor != null) {
this.monitor.logError("invoke remote call failed", var6);
}
logger.error("invoke remote call failed", var6);
throw var6;
}
}
}
集群策略的实现比较简单,贴一下快速失败的代码
可以看到timeoutRetry=true时,配置retries才会对网络超时异常生效,重试是在集群策略上生效的,具体选择那一个连接还是由ClientManager决定。在发生网络异常和可重试异常都会进行重试。
负责均衡:
在跟踪ClientManager的getClient方法发现,在该方法中会找到所有的可使用的连接,然后具体选择哪一个连接是由负载均衡决定的。最后一句代码。
public Client getClient(InvokerConfig<?> invokerConfig, InvocationRequest request, List<Client> excludeClients) {
InvokeClientListInfo clientListInfo = this.clusterListener.getInvokeClient(invokerConfig, request);
List<Client> clientsToRoute = Collections.emptyList();
if (!CollectionUtils.isEmpty(clientListInfo.getClientList())) {
clientsToRoute = new ArrayList(clientListInfo.getClientList());
if (excludeClients != null) {
((List)clientsToRoute).removeAll(excludeClients);
}
}
List<Client> fallbackClientsToRoute = Collections.emptyList();
if (!CollectionUtils.isEmpty(clientListInfo.getFallBackClients())) {
fallbackClientsToRoute = new ArrayList(clientListInfo.getFallBackClients());
if (excludeClients != null) {
((List)fallbackClientsToRoute).removeAll(excludeClients);
}
}
return this.routerManager.route((List)clientsToRoute, invokerConfig, request, (List)fallbackClientsToRoute, clientListInfo.getTrafficFlag());
}
负载均衡有4中策略。默认WeightedAutoaware。
AutoawareLoadBalance:感知服务端负载情况, 将请求路由到负载较低的服务端。对每一个连接上的请求进行统计,请求多的任务负载大。
RoundRobinLoadBalance:加权轮询法,依次轮询各节点,若该节点大于所有节点的最大公约数则返回该节点,否则轮询到下一个权重等于最大权重的节点。权重值由服务提供者在注册时放入zk。
RandomLoadBalance:随机负载均衡策略。
Client client = (Client)clients.get(this.random.nextInt(clientSize));
WeightedAutoawareLoadBalance:与AutoawareLoadBalance相同都会统计每一个连接上的负载,但是在使用负载值进行计算选择连接时使用的算法不同。
总结:
1、pigeon 与dubbo比较类似,使用zk 进行负载注册发现,pigeon 根据配置bean时指定的serviceName 参数值是@HTTP@ 则使用http 协议,否则的netty。
2、pigeon 代码在集群策略、负载均衡结构清晰、耦合程度低。使用代理伪装接口将真正的请求逻辑封装通过责任链模式实现。
3、重试需要配置timeoutRetry和retries,只针对网络超时异常重试。重试是在集群策略层面,具体使用哪个连接重试是由负载均衡决定。
4、负载均衡策略都是基于消费端实现包括AutoawareLoadBalance与WeightedAutoawareLoadBalance并不一定真正能反应出服务提供者的真实负载情况。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/bian-cheng-ji-chu/107294.html