# flink-java **Repository Path**: chenqianwen/flink-java ## Basic Information - **Project Name**: flink-java - **Description**: flink - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2019-12-24 - **Last Updated**: 2021-04-26 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # Spring Cloud Plugin Framework 核心框架 ### I've become familiar with you leaving,I had to be better. And looking forward to the next good girl. ### 我已经慢慢熟悉你离开,我只好变得更好。 期待遇见下个美好。 ## 总览 ### 【1】[**I**]核心适配器 CoreAdapter 1. 通过服务注册的元数据获取服务信息(版本,区域,环境等) 2. 本地缓存中获取规则信息 ### 【2】[**I**]本地缓存 caffeine 存储远程配置中心的规则信息 ### 【3】[**I**]服务注册-白名单过滤 向注册中心注册服务时,执行RegisterListener对应注册方法。完成白名单过滤。 ### 【4】[**I**]ribbon负载均衡-获取可用的服务列表进行规则的过滤 ribbon根据定义的规则、过滤,进行负载均衡: 1. 规则匹配: 1.1 版本、区域、环境匹配: 由于是固定在缓存中的,所以从注册中心获取服务列表时,可直接通过自定义规则进行过滤,只留下匹配的服务。 1.2 版本、区域权重匹配: 利用PredicateBasedRule进行选择。 2. 策略匹配:由于是灵活的配置,所以需要在规则匹配之后的服务列表中,PredicateBasedRule进行匹配。 Header传值????????????? 1. 规则匹配落地实现: 从注册中心获取更新的服务列表时: servers = serverListImpl.getUpdatedListOfServers(); 执行LoadBalanceListener对应的onGetServers,进行配置的规则过滤 ### Ribbon 负载均衡原理 ``` ribbon的自动配置只有RibbonAutoConfiguration org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ org.springframework.cloud.netflix.ribbon.RibbonAutoConfiguration 其中声明了 @Bean @ConditionalOnMissingBean(LoadBalancerClient.class) public LoadBalancerClient loadBalancerClient() { return new RibbonLoadBalancerClient(springClientFactory()); } springcloudgateway负载均衡过程 负载均衡的拦截器:LoadBalancerClientFilter(将通过网关的请求负载均衡到、分发到对应的微服务) org.springframework.cloud.gateway.filter.LoadBalancerClientFilter.filter org.springframework.cloud.gateway.filter.LoadBalancerClientFilter.choose org.springframework.cloud.netflix.ribbon.RibbonLoadBalancerClient.choose(java.lang.String) org.springframework.cloud.netflix.ribbon.RibbonLoadBalancerClient.choose(java.lang.String, java.lang.Object) public ServiceInstance choose(String serviceId, Object hint) { Server server = getServer(getLoadBalancer(serviceId), hint); if (server == null) { return null; } return new RibbonServer(serviceId, server, isSecure(server, serviceId), serverIntrospector(serviceId).getMetadata(server)); } org.springframework.cloud.netflix.ribbon.SpringClientFactory.getLoadBalancer public ILoadBalancer getLoadBalancer(String name) { return getInstance(name, ILoadBalancer.class); } org.springframework.cloud.context.named.NamedContextFactory.createContext context.register(PropertyPlaceholderAutoConfiguration.class,this.defaultConfigType) this.defaultConfigType = class org.springframework.cloud.netflix.ribbon.RibbonClientConfiguration 向上下文容器中注册RibbonClientConfiguration配置。 其中包含了ribbon的负载均衡器:ribbonLoadBalancer(ZoneAwareLoadBalancer) 和默认的规则ZoneAvoidanceRule @Bean @ConditionalOnMissingBean public ILoadBalancer ribbonLoadBalancer(IClientConfig config, ServerList serverList, ServerListFilter serverListFilter, IRule rule, IPing ping, ServerListUpdater serverListUpdater) { if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) { return this.propertiesFactory.get(ILoadBalancer.class, config, name); } return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList, serverListFilter, serverListUpdater); } @Bean @ConditionalOnMissingBean public IRule ribbonRule(IClientConfig config) { if (this.propertiesFactory.isSet(IRule.class, name)) { return this.propertiesFactory.get(IRule.class, config, name); } ZoneAvoidanceRule rule = new ZoneAvoidanceRule(); rule.initWithNiwsConfig(config); return rule; } 实例化DynamicServerListLoadBalancer时 public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping, ServerList serverList, ServerListFilter filter, ServerListUpdater serverListUpdater) { super(clientConfig, rule, ping); this.serverListImpl = serverList; this.filter = filter; this.serverListUpdater = serverListUpdater; if (filter instanceof AbstractServerListFilter) { ((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats()); } restOfInit(clientConfig); } restOfInit==>enableAndInitLearnNewServersFeature和updateListOfServers=>serverListUpdater.start(updateAction) com.netflix.loadbalancer.PollingServerListUpdater.start定时执行updateListOfServers private static int LISTOFSERVERS_CACHE_REPEAT_INTERVAL = 30 * 1000; // msecs;默认定时时间间隔30秒 更新服务的列表---- updateAllServerList并且更新allServerList列表 public void updateListOfServers() { List servers = new ArrayList(); if (serverListImpl != null) { servers = serverListImpl.getUpdatedListOfServers(); LOGGER.debug("List of Servers for {} obtained from Discovery client: {}", getIdentifier(), servers); if (filter != null) { servers = filter.getFilteredListOfServers(servers); LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}", getIdentifier(), servers); } } updateAllServerList(servers); } servers = serverListImpl.getUpdatedListOfServers();这个由注册中心实现,本例时nacos实现。com.alibaba.cloud.nacos.ribbon.NacosServerList 继续上述Server server = getServer(getLoadBalancer(serviceId), hint);方法 getLoadBalancer(serviceId)获得到ZoneAwareLoadBalancer负载均衡器。 此时的loadBalancer是ZoneAwareLoadBalancer继承=>DynamicServerListLoadBalancer继承=>BaseLoadBalancer com.netflix.loadbalancer.ZoneAwareLoadBalancer.chooseServer(Object key) 。参数为“default” com.netflix.loadbalancer.BaseLoadBalancer.chooseServer => return rule.choose(key); 根据规则选择对应的服务 默认的规则是上述ZoneAvoidanceRule继承自com.netflix.loadbalancer.PredicateBasedRule com.netflix.loadbalancer.PredicateBasedRule.choose public Server choose(Object key) { ILoadBalancer lb = getLoadBalancer(); Optional server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key); if (server.isPresent()) { return server.get(); } else { return null; } } com.netflix.loadbalancer.AbstractServerPredicate.chooseRoundRobinAfterFiltering(java.util.List, java.lang.Object) /** * Choose a server in a round robin fashion after the predicate filters a given list of servers and load balancer key. */ public Optional chooseRoundRobinAfterFiltering(List servers, Object loadBalancerKey) { List eligible = getEligibleServers(servers, loadBalancerKey); if (eligible.size() == 0) { return Optional.absent(); } return Optional.of(eligible.get(incrementAndGetModulo(eligible.size()))); } com.netflix.loadbalancer.AbstractServerPredicate.getEligibleServers(java.util.List, java.lang.Object) public List getEligibleServers(List servers, Object loadBalancerKey) { if (loadBalancerKey == null) { return ImmutableList.copyOf(Iterables.filter(servers, this.getServerOnlyPredicate())); } else { List results = Lists.newArrayList(); for (Server server: servers) { if (this.apply(new PredicateKey(loadBalancerKey, server))) { results.add(server); } } return results; } } 通过Predicate的实现apply方法过滤服务列表,得到最终的servers. 上述 return Optional.of(eligible.get(incrementAndGetModulo(eligible.size()))); 轮询的算法。 /** * Referenced from RoundRobinRule * Inspired by the implementation of {@link AtomicInteger#incrementAndGet()}. * * @param modulo The modulo to bound the value of the counter. * @return The next value. */ private int incrementAndGetModulo(int modulo) { for (;;) { int current = nextIndex.get(); int next = (current + 1) % modulo; if (nextIndex.compareAndSet(current, next) && current < modulo) return current; } } chooseRoundRobinAfterFiltering经过规则过滤,轮询得到最终的一个Optional 最终构造return new RibbonServer(serviceId, server, isSecure(server, serviceId),serverIntrospector(serviceId).getMetadata(server));返回。 @LoadBalanced注解restTemplate ribbon负载均衡的过程 org.springframework.cloud.client.loadbalancer.LoadBalancerAutoConfiguration==>loadBalancedRestTemplateInitializerDeprecated() LoadBalancerInterceptor拦截器的作用就是对请求的URI进行转换获取到具体应该请求哪个服务实例. org.springframework.cloud.netflix.ribbon.RibbonLoadBalancerClient.execute(java.lang.String, org.springframework.cloud.client.loadbalancer.LoadBalancerRequest, java.lang.Object) public T execute(String serviceId, LoadBalancerRequest request, Object hint) throws IOException { ILoadBalancer loadBalancer = getLoadBalancer(serviceId); Server server = getServer(loadBalancer, hint); if (server == null) { throw new IllegalStateException("No instances available for " + serviceId); } RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server, serviceId), serverIntrospector(serviceId).getMetadata(server)); return execute(serviceId, ribbonServer, request); } getLoadBalancer 原理同上。 org.springframework.cloud.netflix.ribbon.RibbonLoadBalancerClient.getServer(com.netflix.loadbalancer.ILoadBalancer, java.lang.Object) protected Server getServer(ILoadBalancer loadBalancer, Object hint) { if (loadBalancer == null) { return null; } // Use 'default' on a null hint, or just pass it on? return loadBalancer.chooseServer(hint != null ? hint : "default"); } getServer原理同上。 ```