基于netty的rpc框架

基于netty的rpc框架

[TOC]

如果你已经对以下东东有所了解,那么你就可以完成一个rpc框架了

  • Java的反射技术
  • java的动态代理机制
  • 基于nio的框架netty
  • 全世界最好的框架-spring
  • java的序列化

神马是rpc?

  • 在这个大数据时代,很多公司的服务器都是以集群的方式存在的。在我们传统的mvc后台开发中,我们就需要把不同层的服务部署到不同的服务器上面,这个每个服务器的的压力就会比较小了。

    但是这样也会带来一个问题——我在这台机器上如何才能调用到另一台机器的代码呢?这是个问题。

    我们先来举个栗子:

    这里写图片描述

    比如我们在一个传统mvc项目中,我们有一个UserController处理用户的请求,假如它长的这个样子:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    @RestController
    @RequestMapping(value = "/user")
    public class UserController {
    @Autowired
    UserService userService;
    @RequestMapping(value = "/current")
    public AjaxResponse login(String name,String password) {
    return AjaxResponse.success(userService.login(name,password));
    }
    }

    正常情况下,这个UserService的实现肯定是在同一个项目或者是本地的,早就已经被加入到spring容器中了,不过加入我们为了减少服务器的压力,我们将UserService的实现放到另一台服务器上,加入我们有一个膜法,可以在本地的Controller像调用本地方法一样调用另一台的userServiceImpl就好了,Rpc就是这样一种技术。

实现思路

  • 表面上看这是一个很难完成的任务,本机怎么可能可以调用到远程的方法呢?不过如果我们这个任务拆分开来,就会发现只要一步一步来,其实还是挺简单的。

    我们可以换一种思路,既然直接调用不行,我们可以曲线救国呀,我们只要把调用方法的对象的名称,方法的名字,方法的参数与方法的类型都通过网络发送到另一台机器上,另一条机器接收到之后根据请求信息调用该对象的方法,然后在把执行结果通过网路直接返回回来不就ok了。其实,rpc框架的大体思路就是如此。

  • 大体实现流程如下:

    1. 通过java的动态代理机制为我们UserService创建代理对象,在代理对象执行方法的时候实际上已经被我们定制的方法拦截。
    2. 在拦截的逻辑里面,我们在获取到调用的方法的所有接口,方法名,参数集合,参数类型集合后封装到一个JavaBean——request中去,然后我们将这个对象序列化之后通过网络传输到另一台机器上。
    3. 另一台机器接受到这个网络请求后,将数据反序列化为Request对象,从而了解我们请求的是具体是什么对象的什么方法,然后服务器通过反射的方式调用,并将执行结果通过另一个JavaBean——Response返回。
    4. 本机收到服务端的返回。整个rpc调用就完成了。
  • 如下图所示,由于画图水平有限,不过大致就是这个意思:

    这里写图片描述

代码具体实现

  1. 首先我们需要为我们的网络请求分装两个JavaBean,分别为Request与Response.。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    //在Request中应有的属性
    private String requestId;
    private String className;
    private String methodName;
    private Class<?>[] parameterTypes;
    private Object[] parameters;
    //在response应该有的属性
    private String requestId;
    private Throwable error;
    private Object result;
  2. 创建RpcClient,封装我们的网络请求流程。其中最重要的是这个方法:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    public Response send(Request request) throws InterruptedException {
    ClientBootstrap bootstrap = new ClientBootstrap();
    ExecutorService boss = Executors.newCachedThreadPool();
    ExecutorService work = Executors.newCachedThreadPool();
    bootstrap.setFactory(new NioClientSocketChannelFactory(boss,work));
    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
    @Override
    public ChannelPipeline getPipeline() throws Exception {
    ChannelPipeline pipeline = Channels.pipeline();
    pipeline.addLast("decoder",new ResponseDecoder());
    pipeline.addLast("encoder",new RequestEncoder());
    pipeline.addLast("handler",RpcClient.this);
    return pipeline;
    }
    });
    ChannelFuture connect = bootstrap.connect(new InetSocketAddress(address, port)).sync();
    connect.getChannel().write(request).sync();
    //阻塞线程直到完成请求或者请求失败
    synchronized (obj){
    obj.wait();
    }
    connect.getChannel().close().sync();
    return this.response;
    }

    这里用netty3进行的网咯请求,这里ResponseDecoderRequestEncoder是对Response与Request进行的序列化与反序列化,采用的谷歌的Protostuff序列化框架实现(为啥不用java自带的序列化工具呢?因为java自定的序列化附带了很多其他信息,序列化的字节长度比谷歌的长好几倍,所以是为了节约带宽,同时Protostuff的序列化支持多种编程语言)

  3. 创建代理的工具类,返回代理对象。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    public <T>T proxy(Class<?> clazz){
    return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[] { clazz }, new InvocationHandler() {
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    Request request = new Request();
    request.setClassName(method.getDeclaringClass().getName());
    request.setMethodName(method.getName());
    request.setParameters(args);
    request.setRequestId(UUID.randomUUID().toString());
    request.setParameterTypes(method.getParameterTypes());
    RpcClient client =new RpcClient(address,port);
    //通过封装的网络框架进行网络请求
    Response response = client.send(request);
    if (response.getError()!=null){
    throw response.getError();
    }
    else{
    return response;
    }
    }
    });
    }
  4. 服务端在开启服务的时候就需要通过spring扫描所有的service实现类,将其装进spring的容器中。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
    Map<String, Object> beansWithAnnotation = applicationContext.getBeansWithAnnotation(RPCService.class);
    for(Map.Entry<String,Object> entry :beansWithAnnotation.entrySet()){
    String interfaceName = entry.getValue().getClass()
    .getAnnotation(RPCService.class).value().getName();
    serviceMap.put(interfaceName,entry.getValue());
    }
    startServer();
    }

    需要发布的服务类都需要使用@RPCService注解,这是一个自定义的注解。

  5. 在服务端收到客户端的网络请求之后,我们就需要从spring容器中找到请求的服务类完成调用并返回执行结果。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) throws Exception {
    Request request = (Request) event.getMessage();
    Response response = new Response();
    //调用请求类的请求方法执行并返回执行结果
    Object invoke = null;
    try {
    Object requestBean = serviceMap.get(request.getClassName());
    Class<?> requestClazz = Class.forName(request.getClassName());
    Method method = requestClazz.getMethod(request.getMethodName(), request.getParameterTypes());
    invoke = method.invoke(requestBean, request.getParameters());
    response.setRequestId(UUID.randomUUID().toString());
    response.setResult(invoke);
    } catch (Exception e) {
    response.setError(e);
    response.setRequestId(UUID.randomUUID().toString());
    }
    System.out.println(request+""+response);
    //返回执行结果
    ctx.getChannel().write(response);
    }

总结

  • 整体的流程还是比较简单的,就是具体实现的时候会有一些细节问题需要好好处理。虽然是第一次写这种轮子程序,不过感觉还是不错的。完整代码已上传到我的GitHub仓库里面,有兴趣的小伙伴可以去看看。