2021年7月27日星期二

Dubbo的优雅下线原理分析

一、注销注册中心

public static void destroyAll() { if (LOGGER.isInfoEnabled()) {  LOGGER.info("Close all registries " + getRegistries()); } //加锁,防止关闭多次 LOCK.lock(); try {  Iterator var0 = getRegistries().iterator();  //关闭所有已创建的注册中心  while(var0.hasNext()) {   Registry registry = (Registry)var0.next();   try {    registry.destroy();   } catch (Throwable var6) {    LOGGER.error(var6.getMessage(), var6);   }  }  REGISTRIES.clear(); } finally {  //释放锁  LOCK.unlock(); }}

首先获取到所有的注册中心连接,封装成迭代器模式

Iterator var0 = getRegistries().iterator();

接下来,迭代获取每一个注册连接对象进行关闭:

registry.destroy();

该destroy方法定义在接口Node当中,其具体实现将会在对应的Dubbo注册对象里:

public interface Node { URL getUrl(); boolean isAvailable(); void destroy();}

这里Dubbo使用的注册中心是Zookeeper,故而destroy会在ZookeeperRegistry类中具体实现:
image

进入到ZookeeperRegistry类,找到registry.destroy()对应的destroy()方法,可以看到,调用destroy(),其本质是关闭zk客户端连接,当客户端关闭之后,其注册到zk里的生产者或者消费者信息,都会被自动删除。

public void destroy() { super.destroy(); try {  // 关闭zk客户端  this.zkClient.close(); } catch (Exception var2) {  logger.warn("Failed to close zookeeper client " + this.getUrl() + ", cause: " + var2.getMessage(), var2); }}

在这里,还有一个需要进一步研究的地方,即 super.destroy(),这个方法实现了什么功能呢?从源码当中,可以看出,其有一行这样的 this.retryFuture.cancel(true)代码,这行代码大概意思是,将失败重试取消方式设置为true,即取消了失败重试的操作,我的理解是,这里是关闭了失败重试,可以在下线过程当中,避免出现因RPC生产者接口缺少而发生反复的失败重试操作,因为到这一步,已经不需要再有失败重试的操作了。

public void destroy() { //移除内存中已经注册的服务,取消所有服务订阅 super.destroy(); try {  //取消失败重试  this.retryFuture.cancel(true); } catch (Throwable var2) {  this.logger.warn(var2.getMessage(), var2); }}

注意一点,这里在取消失败重试机制之前,还执行了一行 super.destroy()代码,这行代码的主要功能包括两个:

第一是移除内存中已经注册的服务,第二是取消所有服务订阅。

我们先来看一下其方法详情:

public void destroy() { if (this.logger.isInfoEnabled()) {  this.logger.info("Destroy registry:" + this.getUrl()); } // 1.移除内存中已经注册的服务 Set<URL> destroyRegistered = new HashSet(this.getRegistered()); if (!destroyRegistered.isEmpty()) {  Iterator var2 = (new HashSet(this.getRegistered())).iterator();  while(var2.hasNext()) {   URL url = (URL)var2.next();   if (url.getParameter("dynamic", true)) {    try {     this.unregister(url);     if (this.logger.isInfoEnabled()) {      this.logger.info("Destroy unregister url " + url);     }    } catch (Throwable var10) {     this.logger.warn("Failed to unregister url " + url + " to registry " + this.getUrl() + " on destroy, cause: " + var10.getMessage(), var10);    }   }  } } //2.取消所有的服务订阅 Map<URL, Set<NotifyListener>> destroySubscribed = new HashMap(this.getSubscribed()); if (!destroySubscribed.isEmpty()) {  Iterator var12 = destroySubscribed.entrySet().iterator();  while(var12.hasNext()) {   Map.Entry<URL, Set<NotifyListener>> entry = (Map.Entry)var12.next();   URL url = (URL)entry.getKey();   Iterator var6 = ((Set)entry.getValue()).iterator();   while(var6.hasNext()) {    NotifyListener listener = (NotifyListener)var6.next();    try {     this.unsubscribe(url, listener);     if (this.logger.isInfoEnabled()) {      this.logger.info("Destroy unsubscribe url " + url);     }    } catch (Throwable var9) {     this.logger.warn("Failed to unsubscribe url " + url + " to registry " + this.getUrl() + " on destroy, cause: " + var9.getMessage(), var9);    }   }  } }}

1.移除内存中已经注册的服务

 // 1.移除内存中已经注册的服务 Set<URL> destroyRegistered = new HashSet(this.getRegistered()); if (!destroyRegistered.isEmpty()) {  Iterator var2 = (new HashSet(this.getRegistered())).iterator();  while(var2.hasNext()) {   URL url = (URL)var2.next();   if (url.getParameter("dynamic", true)) {    try {     this.unregister(url);     if (this.logger.isInfoEnabled()) {      this.logger.info("Destroy unregister url " + url);     }    } catch (Throwable var10) {     this.logger.warn("Failed to unregister url " + url + " to registry " + this.getUrl() + " on destroy, cause: " + var10.getMessage(), var10);    }   }  } }

这部分代码主要是将内存当中的注册信息移除,这部分缓存记录,是在容器启动时,当向注册中心订阅成功后,会同步缓存一份到内存当中。可见,若注册中心挂掉了,Dubbo仍然可以通过缓存获取到远程RPC服务,但是无法获取到新增的RPC服务。

这里主要分析两个方法:this.getRegistered()和 this.unregister(url)。

this.getRegistered()——

private final Set<URL> registered = new ConcurrentHashSet();public Set<URL> getRegistered() { return this.registered;}

这是获取缓存URL的集合。

this.unregister(url)——

public void unregister(URL url) { if (url == null) {  throw new IllegalArgumentException("unregister url == null"); } else {  if (this.logger.isInfoEnabled()) {   this.logger.info("Unregister: " + url);  }  this.registered.remove(url); }}

这是将URL从Set集合当中移除的操作。这部分代码其实我有点想明白,为何还需要从Set获取到所有URL,然后再通过迭代器方式一个一个取出去进行移除,直接将Set置空不是更好些吗?当然,这里面应该还有一些我没有考虑到的细节,还有待进一步进行研究。

2.取消所有服务订阅

 //2.取消所有的服务订阅 Map<URL, Set<NotifyListener>> destroySubscribed = new HashMap(this.getSubscribed()); if (!destroySubscribed.isEmpty()) {  Iterator var12 = destroySubscribed.entrySet().iterator();  while(var12.hasNext()) {   Map.Entry<URL, Set<NotifyListener>> entry = (Map.Entry)var12.next();   URL url = (URL)entry.getKey();   Iterator var6 = ((Set)entry.getValue()).iterator();   while(var6.hasNext()) {    NotifyListener listener = (NotifyListener)var6.next();    try {     this.unsubscribe(url, listener);     if (this.logger.isInfoEnabled()) {      this.logger.info("Destroy unsubscribe url " + url);     }    } catch (Throwable var9) {     this.logger.warn("Failed to unsubscribe url " + url + " to registry " + this.getUrl() + " on destroy, cause: " + var9.getMessage(), var9);    }   }  } }

这部分逻辑与移除内存url都很类型,都是先从缓存里把所有订阅信息都取出来,然后再跌代移除。


二、关闭protocol协议

这部分个关闭,主要是关闭provider和consumer,即对应前边提到的,服务提供方会先标记不再接受新请求,新请求过来直接报错,然后,检查线程池中的线程是否还在运行,如果有,等待线程完成,若超时,则强制关闭;服务消费者则不再发起新请求,同时检测看还有没有请求的响应没有返回,若有,等待返回,若超时,则强制关闭。

下面大概分析一下其源码逻辑。

protocol.destroy(),其方法在接口里定义,具体实现是在RegistryProtocol当中。

@SPI("dubbo")public interface Protocol { int getDefaultPort(); @Adaptive <T> Exporter<T> export(Invoker<T> var1) throws RpcException; @Adaptive <T> Invoker<T> refer(Class<T> var1, URL var2) throws RpcException; void destroy();}

RegistryProtocol的具体实现如下:

public void destroy() { List<Exporter<?>> exporters = new ArrayList(this.bounds.values()); Iterator var2 = exporters.iterator(); while(var2.hasNext()) {  Exporter<?> exporter = (Exporter)var2.next();  e......

原文转载:http://www.shaoqun.com/a/899490.html

跨境电商:https://www.ikjzd.com/

贸发局:https://www.ikjzd.com/w/1621

飞书互动:https://www.ikjzd.com/w/1319

邮乐网购:https://www.ikjzd.com/w/1776


一、注销注册中心publicstaticvoiddestroyAll(){if(LOGGER.isInfoEnabled()){LOGGER.info("Closeallregistries"+getRegistries());}//加锁,防止关闭多次LOCK.lock();try{Iteratorvar0=getRegistries().iterator();//关闭所有已创
网易考拉海购大促:https://www.ikjzd.com/w/1052
扬帆计划:https://www.ikjzd.com/w/1581
shirley:https://www.ikjzd.com/w/1684
一线天在哪_一线天在哪里_一线天在哪个省 :http://www.30bags.com/a/413125.html
一小时之旅 在吉林你想去哪儿:http://www.30bags.com/a/416599.html
一眼沦陷!"山城花冠" 的寒冬,美成了一幅五彩画卷:http://www.30bags.com/a/225000.html
一眼三国 感受吉林的朝鲜族风情(图) - :http://www.30bags.com/a/406628.html
嗯啊,,别舔了,,啊,轻点 那晚我再也忍不住强上了表姐:http://lady.shaoqun.com/m/a/274802.html
深圳锦绣中华火把节活动亮点:http://www.30bags.com/a/527681.html
深圳8月展会排期表2021:http://www.30bags.com/a/527682.html
深圳京剧新龙门客栈演出地点(附交通指南):http://www.30bags.com/a/527683.html
深圳京剧新龙门客栈什么时候演出:http://www.30bags.com/a/527684.html

没有评论:

发表评论