通过zookeeper完成动态感知分布式服务器务器上下线的功能

通过zookeeper完成动态感知分布式服务器上下线的功能

[TOC]

业务描述

  • 某分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知到主节点服务器的上下线。当主节点下线的时候,客户端会收到通知,并更新目前在还在线的主节点host信息,可以防止客户端向已经挂掉的节点进行请求。

服务器端的实现

  • 要想完成上述功能,在我们可以想到通过zookeeper的短暂态的节点完成,在服务器启动的时候,我们连接到zookeeper并向其注册一个短暂态的节点,当服务器因为某种意外宕机的时候,这个节点也会被删除,这样客户端访问所有的注册节点的信息,就是仍然在正常工作的主节点。

  • 服务器端的代码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    import java.io.IOException;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooDefs.Ids;
    import org.apache.zookeeper.ZooKeeper;
    public class ClusterServer {
    //zookeeper的ip与端口,这里是zookeeper是一个集群
    private String ZK_SERVER_LIST = "mini1:2181,mini2:2181,mini3:2181";
    //设置超时时间,当主节点2s后没反应就认该节点已经挂了
    private static final int sessionTimeout = 2000;
    //所有的注册节点信息当道/servers下面,方便管理
    private String SERVER_DIR = "/servers";
    private ZooKeeper zk;
    public void connect() throws IOException {
    zk = new ZooKeeper(ZK_SERVER_LIST, sessionTimeout, new Watcher() {
    //当zookeeper服务器集群有断线时会调用
    @Override
    public void process(WatchedEvent event) {
    // TODO Auto-generated method stub
    System.out.println(event.toString());
    }
    });
    }
    public void register(String hontname) throws IOException, KeeperException, InterruptedException {
    //创建瞬时态且带序号的节点,这样在节点下线后该节点就会被删除
    String result = zk.create(SERVER_DIR+"/server", hontname.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    System.out.println("创建结果:"+result);
    }
    public void diy(){
    System.out.println("just do it");
    try {
    Thread.sleep(Integer.MAX_VALUE);
    } catch (InterruptedException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
    }
    }
    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
    ClusterServer server =new ClusterServer();
    server.connect();
    server.register(args[0]);
    server.diy();
    }
    }
  • 程序启动的时候就向服务器把当前主节点的host信息注册到zookeeper中。

  • 这里的hostname本来可以通过java api进行动态获取的,不过为了方便实验就省略了。

客户端的实现

  • 客户端所需要的工作是获取正常工作的主节点并且当主节点发生变化的时候可以收到信息,获取最新的正常工作的主节点。所以我们可以对servers下面的子节点信息进行监听,

  • 代码实现

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;
    public class ClusterClient {
    private String ZK_SERVER_LIST = "mini1:2181,mini2:2181,mini3:2181";
    private static final int sessionTimeout = 2000;
    private String SERVER_DIR = "/servers";
    private ZooKeeper zk;
    private List<String> servers=new ArrayList<>();
    public void connect() throws IOException {
    zk = new ZooKeeper(ZK_SERVER_LIST, sessionTimeout, new Watcher() {
    @Override
    public void process(WatchedEvent event) {
    System.out.println(event.toString());
    try {
    //收到通知后再次对服务器设置监听。
    getServerList();
    System.out.println(servers);
    } catch (KeeperException | InterruptedException e) {
    }
    }
    });
    }
    //获取服务器列表并设置监听
    public void getServerList() throws KeeperException, InterruptedException{
    List<String> nodeList = zk.getChildren(SERVER_DIR,true);
    servers.clear();
    for(String node:nodeList){
    byte []data = zk.getData(SERVER_DIR+"/"+node, false, null);
    String hostName = new String(data);
    servers.add(hostName);
    }
    }
    //做自己爱做的事同时完成守护进程的功能
    public void diy(){
    new Thread(new Runnable() {
    @Override
    public void run() {
    // TODO Auto-generated method stub
    while (true){
    }
    }
    }).start();;
    }
    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
    ClusterClient client =new ClusterClient();
    client.connect();
    client.getServerList();
    client.diy();
    }
    }

测试

  • 我们可以在多个终端分别运行服务器程序,用来模拟多个主节点,直接关闭终端模仿主节点的宕机。

  • 实验结果如下(当我们动态增加节点的结果):

    这里写图片描述

  • 所有需要的jar包在zookeeper的解压包里面都有。或者直接在我的github里面有代码以及jar包。