pigeon学习笔记

pigeon学习笔记本文详细介绍了 PigeonRPC 框架的配置 接口代理实现 集群策略和负载均衡策略

与dubbo,springcloud类似的rpc框架,网上资料比较少,记录一下学习笔记,学习其思想。

配置示例:代码1

	<bean id="orderQueryFacade" class="com.dianping.dpsf.spring.ProxyBeanFactory" init-method="init">
		<property name="serviceName" value="http://service.xxx.com/trade/om/orderQueryFacade_1.0.0"/>
		<property name="iface" value="com.xxx.trade.om.api.facade.OrderQueryFacade"/>
		<property name="serialize" value="hessian"/>
		<property name="callMethod" value="sync"/>
		<property name="timeout" value="5000"/>
	</bean>

配置说明:

serviceName

iface:指定接口

serialize:序列化方式

callMethod:请求方式,对负载均衡策略有影响。sync同步请求。callback 回调,需要配置callback属性并实现ServiceCallback接口。future 将请求结果通过Future与ThreadLocal存储在发起请求的实现中,可以通过ServiceFutureFactory.getResult()获取返回结果,oneway异步请求后不关心返回结果,无返回值。

timeout: 请求超时时间。

接口代理bean注册:

在spring根据配置创建bean时,会执行init方法。通过jdk的Proxy创建指定接口的代理类。ProxyBeanFactory本事实现了FactoryBean接口,所以spring在调用FactoryBean的getObjectType方法获取类型与getObject获取实体时,得到的是指定接口与指定接口的代理。ProxyBeanFactory关键代码如下:代码2

    public void init() throws Exception {
        if (StringUtils.isBlank(this.iface)) {
            throw new IllegalArgumentException("invalid interface:" + this.iface);
        } else {
            this.objType = ClassUtils.loadClass(this.classLoader, this.iface.trim());
            InvokerConfig invokerConfig = new InvokerConfig(this.objType, this.serviceName, this.timeout, this.callMethod, this.serialize, this.callback, this.group, this.writeBufferLimit, this.loadBalance, this.cluster, this.retries, this.timeoutRetry, this.vip, this.version, this.protocol);
            invokerConfig.setClassLoader(this.classLoader);
            this.obj = ServiceFactory.getService(invokerConfig);
            this.configLoadBalance(invokerConfig);
        }
    }

    public Object getObject() {
        return this.obj;
    }

    public Class<?> getObjectType() {
        return this.objType;
    }

创建代理:

在创建接口代理时已经通过zk获取了服务提供者的ip地址,封装在Client类中,方便在调用时使用。代码就是上文中的ServiceFactory.getService(invokerConfig),在此方法中创建了接口代理和进行了服务发现。图1。

跟踪上图59行代码,观察代理创建。代码如下代码3。


    public Object proxyRequest(InvokerConfig<?> invokerConfig) throws SerializationException {
        return Proxy.newProxyInstance(ClassUtils.getCurrentClassLoader(invokerConfig.getClassLoader()), new Class[]{invokerConfig.getServiceInterface()}, new ServiceInvocationProxy(invokerConfig, InvokerProcessHandlerFactory.selectInvocationHandler(invokerConfig)));
    }

代理创建时传入的ServiceInvocationProxy对象很重要,服务的实际调用就是依赖于ServiceInvocationProxy对象。在我们通过代理类代用服务提供者提供的服务时,实际上调用的ServiceInvocationProxy的invoke方法,而此方法最后调用的是在构建时传入其构造方法的handler参数的handle方法。代码4。

  public ServiceInvocationProxy(InvokerConfig<?> invokerConfig, ServiceInvocationHandler handler) {
        this.invokerConfig = invokerConfig;
        this.handler = handler;
    }

    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(this.handler, args);
        } else if ("toString".equals(methodName) && parameterTypes.length == 0) {
            return this.handler.toString();
        } else if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
            return this.handler.hashCode();
        } else {
            return "equals".equals(methodName) && parameterTypes.length == 1 ? this.handler.equals(args[0]) : this.extractResult(this.handler.handle(new DefaultInvokerContext(this.invokerConfig, methodName, parameterTypes, args)), method.getReturnType());
        }
    }

那么handler才是真正执行的执行的方法,再看一下handler是怎么构建的吧。回到代码3,在创建ServiceInvocationProxy时传入的参数为,ServiceInvocationProxy(invokerConfig, InvokerProcessHandlerFactory.selectInvocationHandler(invokerConfig))。将Filter包装成ServiceInvocationHandler ,并通过责任连的方式在代码4中提到的handler.handle触发执行Filter的invoke方法,Filter封装了对服务提供者的选取(ClusterInvokeFilter),远程调用(RemoteCallInvokeFilter)等逻辑。ClusterInvokeFilter的逻辑设计到负载均衡,在下面说到集群策略时再说。Filter的关键代码如下代码5。

 public static void init() {
        if (!isInitialized) {
            registerBizProcessFilter(new InvokerDataFilter());
            if (Constants.MONITOR_ENABLE) {
                registerBizProcessFilter(new RemoteCallMonitorInvokeFilter());
            }

            registerBizProcessFilter(new DegradationFilter());
            registerBizProcessFilter(new FlowControlPigeonClientFilter());
            registerBizProcessFilter(new ClusterInvokeFilter());
            registerBizProcessFilter(new GatewayInvokeFilter());
            registerBizProcessFilter(new ContextPrepareInvokeFilter());
            registerBizProcessFilter(new RemoteCallInvokeFilter());
            bizInvocationHandler = createInvocationHandler(bizProcessFilters);
            isInitialized = true;
        }

    }

    public static ServiceInvocationHandler selectInvocationHandler(InvokerConfig<?> invokerConfig) {
        return bizInvocationHandler;
    }

    private static <V extends ServiceInvocationFilter> ServiceInvocationHandler createInvocationHandler(List<V> internalFilters) {
        final ServiceInvocationHandler last = null;
        List<V> filterList = new ArrayList();
        filterList.addAll(internalFilters);

        for(int i = filterList.size() - 1; i >= 0; --i) {
            final V filter = (ServiceInvocationFilter)filterList.get(i);
            last = new ServiceInvocationHandler() {
                public InvocationResponse handle(InvocationContext invocationContext) throws Throwable {
                    InvocationResponse resp = filter.invoke(last, invocationContext);
                    return resp;
                }
            };
        }

        return last;
    }

服务发现:

再回到图1中的74行代码,看一下服务发现的逻辑。由于调用链比较长,直接贴关键代码,在CuratorRegistry类中,client为CuratorClient负责与ZK进行沟通,watch ZK事件等,如下,图2。拿到的adress为服务提供的者的ip与端口。

将获取到的服务提供者信息封装Client对象,放入serviceClients中,在实际请求服务提供时从serviceClients获取Client。图3。

集群策略:

集群策略范围4种。

Failfast:调用一个节点失败后抛出异常,可以同时配置重试timeoutRetry和retries属性。

Failover调用服务的一个节点失败后会尝试调用另外的一个节点,可以同时配置重试timeoutRetry和retries属性

Failsafe调用服务的一个节点失败后不会抛出异常,按配置默认值。

Forking同时调用服务的所有可用节点,返回调用最快的节点结果数据。pigeon 在实现时使用线程池实现异步对所有连接发送请求,LinkedBlockingQueue实现发布消费模式,获取第一个返回的结果。

在代码5中我们看到了ClusterInvokeFilter,在其invoke方法中实现了集群策略的选择,由于我们在示例配置中没有设置集群策略,则使用默认的failfast策略。代码6。

 public InvocationResponse invoke(ServiceInvocationHandler handler, InvokerContext invocationContext) throws Throwable {
        invocationContext.getTimeline().add(new TimePoint(TimePhase.CL));
        InvokerConfig<?> invokerConfig = invocationContext.getInvokerConfig();
        Cluster cluster = ClusterFactory.selectCluster(invokerConfig.getCluster());
        if (cluster == null) {
            throw new IllegalArgumentException("Unsupported cluster type:" + cluster);
        } else {
            try {
                return cluster.invoke(handler, invocationContext);
            } catch (Throwable var6) {
                if (this.monitor != null) {
                    this.monitor.logError("invoke remote call failed", var6);
                }

                logger.error("invoke remote call failed", var6);
                throw var6;
            }
        }
    }

集群策略的实现比较简单,贴一下快速失败的代码

可以看到timeoutRetry=true时,配置retries才会对网络超时异常生效,重试是在集群策略上生效的,具体选择那一个连接还是由ClientManager决定。在发生网络异常和可重试异常都会进行重试。

负责均衡:

在跟踪ClientManager的getClient方法发现,在该方法中会找到所有的可使用的连接,然后具体选择哪一个连接是由负载均衡决定的。最后一句代码。

public Client getClient(InvokerConfig<?> invokerConfig, InvocationRequest request, List<Client> excludeClients) {
        InvokeClientListInfo clientListInfo = this.clusterListener.getInvokeClient(invokerConfig, request);
        List<Client> clientsToRoute = Collections.emptyList();
        if (!CollectionUtils.isEmpty(clientListInfo.getClientList())) {
            clientsToRoute = new ArrayList(clientListInfo.getClientList());
            if (excludeClients != null) {
                ((List)clientsToRoute).removeAll(excludeClients);
            }
        }

        List<Client> fallbackClientsToRoute = Collections.emptyList();
        if (!CollectionUtils.isEmpty(clientListInfo.getFallBackClients())) {
            fallbackClientsToRoute = new ArrayList(clientListInfo.getFallBackClients());
            if (excludeClients != null) {
                ((List)fallbackClientsToRoute).removeAll(excludeClients);
            }
        }

        return this.routerManager.route((List)clientsToRoute, invokerConfig, request, (List)fallbackClientsToRoute, clientListInfo.getTrafficFlag());
    }
 

负载均衡有4中策略。默认WeightedAutoaware。

AutoawareLoadBalance:感知服务端负载情况, 将请求路由到负载较低的服务端。对每一个连接上的请求进行统计,请求多的任务负载大。

RoundRobinLoadBalance:加权轮询法,依次轮询各节点,若该节点大于所有节点的最大公约数则返回该节点,否则轮询到下一个权重等于最大权重的节点。权重值由服务提供者在注册时放入zk。

RandomLoadBalance:随机负载均衡策略。

Client client = (Client)clients.get(this.random.nextInt(clientSize));

WeightedAutoawareLoadBalance:与AutoawareLoadBalance相同都会统计每一个连接上的负载,但是在使用负载值进行计算选择连接时使用的算法不同。

总结:

1、pigeon 与dubbo比较类似,使用zk 进行负载注册发现,pigeon 根据配置bean时指定的serviceName 参数值是@HTTP@ 则使用http 协议,否则的netty。

2、pigeon 代码在集群策略、负载均衡结构清晰、耦合程度低。使用代理伪装接口将真正的请求逻辑封装通过责任链模式实现。

3、重试需要配置timeoutRetry和retries,只针对网络超时异常重试。重试是在集群策略层面,具体使用哪个连接重试是由负载均衡决定。

4、负载均衡策略都是基于消费端实现包括AutoawareLoadBalance与WeightedAutoawareLoadBalance并不一定真正能反应出服务提供者的真实负载情况。

编程小号
上一篇 2025-01-10 12:17
下一篇 2025-01-10 12:06

相关推荐

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/bian-cheng-ji-chu/107294.html