文章资讯
RPC中实现提供者信息变化后通知消费者
2022-07-22 10:14  浏览:6
目录,Zookeeper的Watcher,消费者端的事件监听(观察者模式),

目录

Zookeeper的Watcher

消费者端的事件监听(观察者模式)


Zookeeper的Watcher

使用Java可以链接Zookeeper并且进行节点的增删改查以及监听,目前有三套API可以使用。

这里用的是CuratorFramework。 这个client就是一个CuratorFramework的实现。

 zk收到节点变更后就会执行 process方法。这样在process方法中执行消费者本地的缓存修改即可。为了解耦,采用了事件监听器。 在process中需要注意zk节点的消息通知是一次性的,所以在执行完process之后 需要重新监听该节点。

    public void watchNodeDataChange(String newServerNodePath) {
        zkClient.watchNodeData(newServerNodePath, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                String path = watchedEvent.getPath();
                System.out.println("[watchNodeDataChange] 监听到zk节点下的" + path + "节点数据发生变更");
                String nodeData = zkClient.getNodeData(path);
                // 把 url字符串 转化为一个 providerNodeInfo
                ProviderNodeInfo providerNodeInfo = URL.buildURLFromUrlStr(nodeData);
                IRpcEvent iRpcEvent = new IRpcNodeChangeEvent(providerNodeInfo);
                IRpcListenerLoader.sendEvent(iRpcEvent);
                //因为zk节点的消息通知其实是只具有一次性的功效,所以
                // 可能会出现第一次修改节点之后发送一次通知,
                // 之后再次修改节点不再会发送节点变更通知操作。
                ZookeeperRegister.this.watchNodeDataChange(newServerNodePath);
            }
        });
    }

消费者端的事件监听(观察者模式)

事件机制三大要素是事件源,事件监听器和事件对象,事件监听器注册到事件源上,如果事件源上发生了变化就会产生某些事件,会交给相应的事件监听器去处理。

public static void sendEvent(IRpcEvent iRpcEvent) {
        if (CommonUtils.isEmptyList(iRpcListenerList)) {
            //iRpcListenerList 里面有多种事件监听器
            return;
        }
        for (IRpcListener
   iRpcListener : iRpcListenerList) {
            //通过 事件监听器的参数类型确定该事件应该倍哪一个事件监听器处理
            Class
   type = getInterfaceT(iRpcListener);
            if (type.equals(iRpcEvent.getClass())) {
                // 使用线程池进行异步事件处理
                eventThreadPool.execute(() -> {
                    try {
                        iRpcListener.callBack(iRpcEvent.getData());
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                });
            }
        }
    }

 

,目录