Using ZooKeeper to Monitor a Server Cluster and Its Load [Repost]

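This is a simple example of monitoring a server cluster. It relies on two ZooKeeper features: the file-system-like node hierarchy and the event notification (watch) mechanism. In real projects you can build your own cluster load-balancing manager on the same idea.
Approach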

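  • Create a persistent node named /sgroup in ZooKeeper. It is the root node of the whole server cluster.
  • When a server starts, it creates its own EPHEMERAL node under /sgroup to show that it is running. Every 10 seconds it uploads its current load as that node's data. EPHEMERAL nodes have one important property: ZooKeeper deletes them automatically when the session of the client that created them ends, either because the client closed the connection or because the session expired. This property lets us track servers going online and offline.
  • The monitor keeps watching the children of /sgroup, so it always knows each server's running state and load. If something goes wrong, it can raise an alarm immediately.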
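To make the ephemeral-node mechanism above concrete, here is a toy in-memory model of the /sgroup layout. It is only a sketch of the semantics. `ToyRegistry` and its methods (`createEphemeral`, `closeSession`, `childPaths`) are hypothetical names and are not the ZooKeeper client API. Each child node is tied to the session that created it, and ending that session removes the node. That removal is exactly what the monitor observes when a server goes offline.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy, in-memory model of the /sgroup layout; this is NOT the ZooKeeper API.
// Each child node is owned by a session and is removed when that session closes.
class ToyRegistry {
    // child path -> id of the session that created it
    private final Map<String, Long> owners = new TreeMap<>();
    // child path -> node data (the server's reported load)
    private final Map<String, String> data = new TreeMap<>();

    // Like creating an EPHEMERAL child: fails if the path already exists.
    void createEphemeral(String path, String value, long sessionId) {
        if (owners.containsKey(path)) {
            throw new IllegalStateException("node already exists: " + path);
        }
        owners.put(path, sessionId);
        data.put(path, value);
    }

    // Like setData: overwrite the data of an existing node.
    void setData(String path, String value) {
        if (!data.containsKey(path)) {
            throw new IllegalStateException("no such node: " + path);
        }
        data.put(path, value);
    }

    // Ending a session deletes every ephemeral node it created.
    void closeSession(long sessionId) {
        owners.entrySet().removeIf(e -> {
            boolean owned = e.getValue() == sessionId;
            if (owned) {
                data.remove(e.getKey());
            }
            return owned;
        });
    }

    // Child paths currently present, in sorted order.
    List<String> childPaths() {
        return new ArrayList<>(data.keySet());
    }

    String getData(String path) {
        return data.get(path);
    }
}

class ToyRegistryDemo {
    public static void main(String[] args) {
        ToyRegistry registry = new ToyRegistry();
        registry.createEphemeral("/sgroup/server001", "0", 1L);
        registry.createEphemeral("/sgroup/server002", "0", 2L);
        registry.setData("/sgroup/server001", "64581"); // periodic load upload
        registry.closeSession(1L);                      // server001 shuts down
        System.out.println(registry.childPaths());      // [/sgroup/server002]
    }
}
```

Real ZooKeeper also deletes ephemeral nodes when a session *expires*, for example after a crash. This model collapses that case into `closeSession`.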

First, let's create our server. The code is as follows:
[su_spoiler title=”CODE”]

package com.laizs.test.zookeeper;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Random;
import java.util.Scanner;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class AppServer {
    /**
     * Root node of the server cluster in ZooKeeper
     */
    private String groupNode = "sgroup";
    private ZooKeeper zooKeeper;
    /**
     * Path of the node created by this server
     */
    private volatile String serverNodePath = "";
    /**
     * Current load of this server (updated by the upload thread)
     */
    private volatile int loadBalance = 0;

    /**
     * Connects to ZooKeeper and creates an EPHEMERAL child under the cluster root node,
     * storing the server's current load as the child's data
     * @param zookeeperServerHost
     * @param serverName
     * @throws IOException
     * @throws KeeperException
     * @throws InterruptedException
     */
    public void connectZookeeper(String zookeeperServerHost, String serverName)
            throws IOException, KeeperException, InterruptedException {
        zooKeeper = new ZooKeeper(zookeeperServerHost, 5000, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                // Do nothing
            }
        });
        // Create the persistent /sgroup node if it does not exist yet
        String groupNodePath = "/" + groupNode;
        Stat stat = zooKeeper.exists(groupNodePath, false);
        if (null == stat) {
            try {
                zooKeeper.create(groupNodePath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            } catch (KeeperException.NodeExistsException e) {
                // Another server or the monitor created it first; nothing to do
            }
        }
        // Create this server's ephemeral node, with the initial load as its data
        serverNodePath = zooKeeper.create(groupNodePath + "/" + serverName,
                String.valueOf(loadBalance).getBytes(StandardCharsets.UTF_8),
                Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        System.out.println("Created server node: " + serverNodePath);
        // Start uploading the server's load periodically
        uploadBalance();
    }

    /**
     * Closes the connection to the ZooKeeper server
     * @throws InterruptedException
     */
    public void closeZookeeper() throws InterruptedException {
        if (null != zooKeeper) {
            zooKeeper.close();
        }
    }

    /**
     * Uploads the load every 10 seconds
     */
    private void uploadBalance() {
        Thread uploader = new Thread(new Runnable() {
            public void run() {
                Random random = new Random();
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        Thread.sleep(10000);
                        // Simulated load: a random number stands in for a real metric
                        loadBalance = random.nextInt(100000);
                        System.out.println("Server uploading load: " + loadBalance);
                        zooKeeper.setData(serverNodePath,
                                String.valueOf(loadBalance).getBytes(StandardCharsets.UTF_8), -1);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt(); // ends the loop
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        uploader.setDaemon(true); // do not keep the JVM alive on its own
        uploader.start();
    }

    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
        System.out.print("Enter the server name (e.g. server001): ");
        Scanner scan = new Scanner(System.in);
        String serverName = scan.nextLine();
        AppServer appServer = new AppServer();
        appServer.connectZookeeper("192.168.0.5:2181", serverName);
        while (true) {
            System.out.println("Enter a command (exit to quit):");
            String command = scan.nextLine();
            if ("exit".equals(command)) {
                System.out.println("Shutting down the server....");
                // Closing the session makes ZooKeeper delete this server's ephemeral node
                appServer.closeZookeeper();
                System.exit(0);
            }
        }
    }
}

[/su_spoiler]
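AppServer reports its load from a hand-written `while(true)` loop with `Thread.sleep`. As an alternative, a `ScheduledExecutorService` can run the upload at a fixed rate and shut it down cleanly before the session is closed. The sketch below is one possible design. `LoadReporter` is a hypothetical class, and the `publish` callback stands in for `zooKeeper.setData(serverNodePath, bytes, -1)`. That real call throws checked exceptions, so the lambda passed in would need its own try/catch.

```java
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical replacement for the hand-written while(true)/sleep loop.
class LoadReporter {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "load-reporter");
                t.setDaemon(true); // the reporter alone should not keep the JVM running
                return t;
            });
    private final Random random = new Random();

    // 'publish' stands in for zooKeeper.setData(serverNodePath, bytes, -1).
    void start(Consumer<String> publish, long periodMillis) {
        scheduler.scheduleAtFixedRate(() -> {
            try {
                int load = random.nextInt(100000); // simulated load, as in AppServer
                publish.accept(String.valueOf(load));
            } catch (RuntimeException e) {
                // If an exception escaped, the scheduler would cancel all later runs.
                e.printStackTrace();
            }
        }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    // Stops future uploads (call before closing the ZooKeeper session).
    void stop() {
        scheduler.shutdownNow();
    }
}

class LoadReporterDemo {
    public static void main(String[] args) throws InterruptedException {
        LoadReporter reporter = new LoadReporter();
        reporter.start(load -> System.out.println("reported load: " + load), 100);
        Thread.sleep(350); // roughly three reports
        reporter.stop();
    }
}
```

In AppServer, the period would be 10000 ms. The try/catch inside the task matters: `scheduleAtFixedRate` suppresses all subsequent executions once a run throws.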
Next, let's write the monitor:
[su_spoiler title=”CODE”]

package com.laizs.test.zookeeper;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

/**
 * Server cluster monitor
 * @author laizs
 * @time 2016-03-17 11:02:01
 * @file AppServerMonitor.java
 */
public class AppServerMonitor implements Watcher {
    private String groupNode = "sgroup";
    private ZooKeeper zk;
    private Stat stat = new Stat();
    // Server info (name and load), keyed by the server's node path
    private volatile Map<String, ServerInfo> serverList = new TreeMap<String, ServerInfo>();

    /**
     * Connects to the ZooKeeper server
     *
     * @throws IOException
     * @throws InterruptedException
     * @throws KeeperException
     */
    public void connectZookeeper() throws IOException, KeeperException, InterruptedException {
        zk = new ZooKeeper("192.168.0.5:2181", 5000, this);
        // Create the root node of the monitored cluster if it does not exist yet
        if (null == zk.exists("/" + groupNode, false)) {
            try {
                zk.create("/" + groupNode, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            } catch (KeeperException.NodeExistsException e) {
                // Created concurrently by another client; nothing to do
            }
        }
        updateServerList();
    }

    /**
     * Rebuilds the server list
     * @throws KeeperException
     * @throws InterruptedException
     */
    private synchronized void updateServerList() throws KeeperException, InterruptedException {
        Map<String, ServerInfo> newServerList = new TreeMap<String, ServerInfo>();
        // Get the children of groupNode and watch for child changes.
        // watch=true registers the default watcher (this object).
        // A watch fires only once, so it must be re-registered on every call to keep monitoring.
        List<String> subList = zk.getChildren("/" + groupNode, true);
        for (String subNode : subList) {
            ServerInfo serverInfo = new ServerInfo();
            serverInfo.setPath("/" + groupNode + "/" + subNode);
            serverInfo.setName(subNode);
            try {
                // Read the load stored in each child and watch it for data changes
                byte[] data = zk.getData(serverInfo.getPath(), true, stat);
                serverInfo.setLoadBalance(new String(data, StandardCharsets.UTF_8));
            } catch (KeeperException.NoNodeException e) {
                // The server went offline between getChildren and getData; skip it
                continue;
            }
            newServerList.put(serverInfo.getPath(), serverInfo);
        }
        // Swap in the new server list
        serverList = newServerList;
        System.out.println("$$$ Server list updated: " + serverList);
    }

    /**
     * Updates the load data of one server node
     * @param serverNodePath
     * @throws InterruptedException
     * @throws KeeperException
     */
    private synchronized void updateServerLoadBalance(String serverNodePath) throws KeeperException, InterruptedException {
        ServerInfo serverInfo = serverList.get(serverNodePath);
        if (null != serverInfo) {
            try {
                // Read the new load and re-register the data watch
                byte[] data = zk.getData(serverInfo.getPath(), true, stat);
                serverInfo.setLoadBalance(new String(data, StandardCharsets.UTF_8));
            } catch (KeeperException.NoNodeException e) {
                // Deleted in the meantime; the child watch on /sgroup will update the list
                return;
            }
            System.out.println("@@@ Server load updated:" + serverInfo);
            System.out.println("------");
            System.out.println("### Server list after load update: " + serverList);
        }
    }

    @Override
    public void process(WatchedEvent event) {
        System.out.println("Received ZooKeeper event-----eventType:" + event.getType() + ",path:" + event.getPath());
        // Children of the cluster root changed: a server came online or went offline
        if (event.getType() == EventType.NodeChildrenChanged
                && ("/" + groupNode).equals(event.getPath())) {
            // Rebuild the server list, which also re-registers the watches
            try {
                updateServerList();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        if (event.getType() == EventType.NodeDataChanged
                && event.getPath().startsWith("/" + groupNode + "/")) {
            // A server's data (its load) changed: refresh it and re-register the watch
            try {
                updateServerLoadBalance(event.getPath());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * The client's working logic goes in this method.
     * Nothing is done here; the client just sleeps.
     * @throws InterruptedException
     */
    public void handle() throws InterruptedException {
        Thread.sleep(Long.MAX_VALUE);
    }

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        AppServerMonitor ac = new AppServerMonitor();
        ac.connectZookeeper();
        ac.handle();
    }

    /**
     * Nested class holding server information
     */
    static class ServerInfo {
        // Path of the server node in ZooKeeper
        private String path;
        // Server name
        private String name;
        // Server load
        private String loadBalance;

        public String getPath() {
            return path;
        }
        public void setPath(String path) {
            this.path = path;
        }
        public String getName() {
            return name;
        }
        public void setName(String name) {
            this.name = name;
        }
        public String getLoadBalance() {
            return loadBalance;
        }
        public void setLoadBalance(String loadBalance) {
            this.loadBalance = loadBalance;
        }
        @Override
        public String toString() {
            return " [node path=" + path + ", server name=" + name + ", server load=" + loadBalance + "]";
        }
    }
}

[/su_spoiler]
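The comments in `updateServerList()` stress that a ZooKeeper watch fires only once and must be re-registered. The toy model below shows the consequence of skipping that step. A watcher that is armed once sees only the first change. A watcher that re-reads with a new watch on every notification sees every change, which is what `updateServerLoadBalance()` does via `getData(path, true, stat)`. `OneShotNode` and `RearmingWatcher` are hypothetical names, and this is not the ZooKeeper API.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

// Toy model of a one-shot data watch; this is NOT the ZooKeeper API.
class OneShotNode {
    private String data;
    // A Set: arming the same watcher object twice before a change registers it once.
    private final Set<Consumer<String>> watchers = new LinkedHashSet<>();

    OneShotNode(String initial) {
        this.data = initial;
    }

    // Roughly what getData(path, true, stat) does: read, and optionally arm a watch.
    String read(Consumer<String> watcher) {
        if (watcher != null) {
            watchers.add(watcher);
        }
        return data;
    }

    void write(String newData) {
        data = newData;
        // Detach the armed watchers before notifying, so each arming fires at most once.
        List<Consumer<String>> fired = new ArrayList<>(watchers);
        watchers.clear();
        for (Consumer<String> w : fired) {
            w.accept(newData);
        }
    }
}

// Re-arms its watch on every notification, like updateServerLoadBalance().
class RearmingWatcher implements Consumer<String> {
    private final OneShotNode node;
    final List<String> seen = new ArrayList<>();

    RearmingWatcher(OneShotNode node) {
        this.node = node;
    }

    @Override
    public void accept(String ignored) {
        seen.add(node.read(this)); // read again AND register the watch again
    }
}

class OneShotDemo {
    public static void main(String[] args) {
        OneShotNode node = new OneShotNode("0");
        List<String> once = new ArrayList<>();
        node.read(once::add);                 // armed once, never re-armed
        RearmingWatcher rearming = new RearmingWatcher(node);
        node.read(rearming);
        node.write("64581");
        node.write("38594");
        System.out.println(once);             // [64581]
        System.out.println(rearming.seen);    // [64581, 38594]
    }
}
```

Real ZooKeeper does not guarantee that a client sees every intermediate value. Changes made between a notification and the re-read can be coalesced, which is why the monitor always re-reads the current data instead of trusting the event alone.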
With the code in place, we run AppServerMonitor as the monitor, then start AppServer twice to simulate two servers, and finally shut one of them down.
The AppServerMonitor console output is as follows:
[su_spoiler title=”CODE”]

INFO : (ClientCnxn.java:1235)    Session establishment complete on server 192.168.0.5/192.168.0.5:2181, sessionid = 0x153a16c686e000b, negotiated timeout = 5000
Received ZooKeeper event-----eventType:None,path:null
$$$ Server list updated: {}
Received ZooKeeper event-----eventType:NodeChildrenChanged,path:/sgroup
$$$ Server list updated: {/sgroup/server001= [node path=/sgroup/server001, server name=server001, server load=0]}
Received ZooKeeper event-----eventType:NodeDataChanged,path:/sgroup/server001
@@@ Server load updated: [node path=/sgroup/server001, server name=server001, server load=64581]
------
### Server list after load update: {/sgroup/server001= [node path=/sgroup/server001, server name=server001, server load=64581]}
Received ZooKeeper event-----eventType:NodeChildrenChanged,path:/sgroup
$$$ Server list updated: {/sgroup/server001= [node path=/sgroup/server001, server name=server001, server load=64581], /sgroup/server002= [node path=/sgroup/server002, server name=server002, server load=0]}
Received ZooKeeper event-----eventType:NodeDataChanged,path:/sgroup/server001
@@@ Server load updated: [node path=/sgroup/server001, server name=server001, server load=38594]
------
### Server list after load update: {/sgroup/server001= [node path=/sgroup/server001, server name=server001, server load=38594], /sgroup/server002= [node path=/sgroup/server002, server name=server002, server load=0]}
Received ZooKeeper event-----eventType:NodeDeleted,path:/sgroup/server001
Received ZooKeeper event-----eventType:NodeChildrenChanged,path:/sgroup
$$$ Server list updated: {/sgroup/server002= [node path=/sgroup/server002, server name=server002, server load=0]}
Received ZooKeeper event-----eventType:NodeDataChanged,path:/sgroup/server002
@@@ Server load updated: [node path=/sgroup/server002, server name=server002, server load=43131]
------
### Server list after load update: {/sgroup/server002= [node path=/sgroup/server002, server name=server002, server load=43131]}

[/su_spoiler]
As the output shows, AppServerMonitor promptly picks up servers joining, servers leaving, and changes in each server's load.
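As mentioned at the start, the same idea can feed a load-balancing manager. One minimal sketch picks the server with the smallest reported load. Its assumptions are that the caller copies the monitor's `serverList` into a plain map of node path to `getLoadBalance()` string, and that a lower number means a less loaded server. `LeastLoaded.pick` is a hypothetical helper, not part of the original code.

```java
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical helper, not part of the original code: picks the node path
// with the smallest load from a path -> load map built from serverList.
class LeastLoaded {
    static Optional<String> pick(Map<String, String> loads) {
        String best = null;
        long bestLoad = Long.MAX_VALUE;
        for (Map.Entry<String, String> e : loads.entrySet()) {
            String raw = e.getValue();
            if (raw == null) {
                continue; // no data yet
            }
            long load;
            try {
                load = Long.parseLong(raw.trim());
            } catch (NumberFormatException ex) {
                continue; // skip servers whose data is not a number
            }
            if (load < bestLoad) { // strict '<': ties keep the first path in map order
                bestLoad = load;
                best = e.getKey();
            }
        }
        return Optional.ofNullable(best);
    }

    public static void main(String[] args) {
        Map<String, String> loads = new TreeMap<>();
        loads.put("/sgroup/server001", "38594");
        loads.put("/sgroup/server002", "0");
        System.out.println(pick(loads).orElse("none")); // /sgroup/server002
    }
}
```

Returning `Optional` makes the "no servers online" case explicit, which is the state the monitor reports as `{}` right after startup.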

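About xmsg

Everyone is equal before technology, and no technology is noble or lowly. As the saying goes, learning knows no seniority; whoever has mastered it leads.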