大橙子网站建设,新征程启航
为企业提供网站建设、域名注册、服务器等服务
一般我们常见的RPC框架都包含如下三个部分:
目前创新互联公司已为上1000+的企业提供了网站建设、域名、网页空间、网站托管维护、企业网站设计、东湖网站维护等服务,公司将坚持客户导向、应用为本的策略,正道将秉承"和谐、参与、激情"的文化,与客户和合作伙伴齐心协力一起成长,共同发展。
注册中心,用于服务端注册远程服务以及客户端发现服务
服务端,对外提供后台服务,将自己的服务信息注册到注册中心
客户端,从注册中心获取远程服务的注册信息,然后进行远程过程调用
上面提到的注册中心其实属于服务治理,即使没有注册中心,RPC的功能也是完整的。之前我大多接触的是基于zookeeper的注册中心,这里基于consul来实现注册中心的基本功能。
Raft相比Paxos直接
此外不多描述,还没研究raft
支持数据中心,可以用来解决单点故障之类的问题
集成相比zookeeper更加简单(代码量少,逻辑清晰简单)
支持健康检查,支持http以及tcp
自带UI管理功能,不需要额外第三方支持。(zookeeper需要单独部署zkui之类的第三方工具)
支持key/value存储
启动consul之后访问管理页面
提取出服务注册与服务发现两个接口,然后使用Consul实现,这里主要通过consul-client来实现(也可以是consul-api),需要在pom中引入:
com.orbitz.consul consul-client 0.14.1
RegistryService
提供服务的注册与删除功能
public interface RegistryService { void register(RpcURL url); void unregister(RpcURL url); }
AbstractConsulService
consul的基类,用于构建Consl对象,服务于服务端以及客户端。
public class AbstractConsulService { private static final Logger logger = LoggerFactory.getLogger(AbstractConsulService.class); protected final static String CONSUL_NAME="consul_node_jim"; protected final static String CONSUL_ID="consul_node_id"; protected final static String CONSUL_TAGS="v3"; protected final static String CONSUL_HEALTH_INTERVAL="1s"; protected Consul buildConsul(String registryHost, int registryPort){ return Consul.builder().withHostAndPort(HostAndPort.fromString(registryHost+":"+registryPort)).build(); } }
ConsulRegistryService
服务注册实现类,在注册服务的同时,指定了健康检查。
服务的删除暂时未实现
public class ConsulRegistryService extends AbstractConsulService implements RegistryService { private final static int CONSUL_CONNECT_PERIOD=1*1000; @Override public void register(RpcURL url) { Consul consul = this.buildConsul(url.getRegistryHost(),url.getRegistryPort()); AgentClient agent = consul.agentClient(); ImmutableRegCheck check = ImmutableRegCheck.builder().tcp(url.getHost()+":"+url.getPort()).interval(CONSUL_HEALTH_INTERVAL).build(); ImmutableRegistration.Builder builder = ImmutableRegistration.builder(); builder.id(CONSUL_ID).name(CONSUL_NAME).addTags(CONSUL_TAGS).address(url.getHost()).port(url.getPort()).addChecks(check); agent.register(builder.build()); } @Override public void unregister(RpcURL url) { } }
由于我实现的RPC是基于TCP的,所以服务注册的健康检查也指定为TCP,consul会按指定的IP以及端口建立连接以此判断服务的健康状态。如果是http,则需要调用http方法,同时指定健康检查地址。
ImmutableRegCheck check = ImmutableRegCheck.builder().tcp(url.getHost()+":"+url.getPort()).interval(CONSUL_HEALTH_INTERVAL).build();
后台的监控信息如下:
虽然只是指定了TCP,可能出于某种机制后台依然会发起HTTP的健康检查请求,上图第一条请求日志。
DiscoveryService
获取所有注册的有效的服务信息。
public interface DiscoveryService { ListgetUrls(String registryHost, int registryPort); }
ConsulDiscoveryService
首先是获取有效的服务列表:
Listurls= Lists.newArrayList();Consul consul = this.buildConsul(registryHost,registryPort);HealthClient client = consul.healthClient();String name = CONSUL_NAME;ConsulResponse object= client.getAllServiceInstances(name);List serviceHealths=(List )object.getResponse();for(ImmutableServiceHealth serviceHealth:serviceHealths){ RpcURL url=new RpcURL(); url.setHost(serviceHealth.getService().getAddress()); url.setPort(serviceHealth.getService().getPort()); urls.add(url); }
服务更新监听,当可用服务列表发现变化时需要通知调用端。
try { ServiceHealthCache serviceHealthCache = ServiceHealthCache.newCache(client, name); serviceHealthCache.addListener(new ConsulCache.Listener() { @Override public void notify(Map map) { logger.info("serviceHealthCache.addListener notify"); RpcClientInvokerCache.clear(); } }); serviceHealthCache.start(); } catch (Exception e) { logger.info("serviceHealthCache.start error:",e); }
由于之前对客户端的Invoker有缓存,所以当服务列表有变化时需要对缓存信息进行更新。
这里简单的直接对缓存做清除处理,其实好一点的方法应该只对有变化的做处理。
RpcClientInvokerCache
对客户端实例化后的Invoker的缓存类
public class RpcClientInvokerCache { private static CopyOnWriteArrayListconnectedHandlers = new CopyOnWriteArrayList<>(); public static CopyOnWriteArrayList getConnectedHandlersClone(){ return (CopyOnWriteArrayList ) RpcClientInvokerCache.getConnectedHandlers().clone(); } public static void addHandler(RpcClientInvoker handler) { CopyOnWriteArrayList newHandlers = getConnectedHandlersClone(); newHandlers.add(handler); connectedHandlers=newHandlers; } public static CopyOnWriteArrayList getConnectedHandlers(){ return connectedHandlers; } public static RpcClientInvoker get(int i){ return connectedHandlers.get(i); } public static int size(){ return connectedHandlers.size(); } public static void clear(){ CopyOnWriteArrayList newHandlers = getConnectedHandlersClone(); newHandlers.clear(); connectedHandlers=newHandlers; } }
代码中取服务列表的方法有小问题,未按接口信息取,后续再完成
public class RoundRobinLoadbalanceService implements LoadbalanceService { private AtomicInteger roundRobin = new AtomicInteger(0); private static final int MAX_VALUE=1000; private static final int MIN_VALUE=1; private AtomicInteger getRoundRobinValue(){ if(this.roundRobin.getAndAdd(1)>MAX_VALUE){ this.roundRobin.set(MIN_VALUE); } return this.roundRobin; } @Override public int index(int size) { return (this.getRoundRobinValue().get() + size) % size; } }