Golang 从零到一开发实现 RPC 框架

技术岁月

共 8745字,需浏览 18分钟

 · 2022-04-01

内容提要

RPC 框架是分布式领域核心组件,也是微服务的基础。今天尝试从零撸一个 RPC 框架,剖析其核心原理及代码实现,后续还会逐步迭代追加微服务治理等功能,将之前文章覆盖的熔断、限流、负载均衡、注册发现等功能融合进来,打造一个五脏俱全的 RPC 框架。本文主要内容包括

  • RPC 实现原理
  • RPC 协议设计

  • RPC 服务端实现

  • RPC 客户端实现

实现原理

RPC (Remote Procedure Call)全称是远程过程调用,相对于本地方法调用,在同一内存空间可以直接通过方法栈实现调用,远程调用则跨了不同的服务终端,并不能直接调用。


32b4a3bb202645fb820775df802b8121.webp


RPC框架 要解决的就是远程方法调用的问题,并且实现调用远程服务像调用本地服务一样简单,框架内部封装实现了网络调用的细节(透明化远程调用),其核心过程原理如下图所示。

62ca7491c4ad206252956fdc78af7255.webp

这个版本可以称为 “P2P RPC” ,而生产环境部署往往会将服务提供者(Server)部署多个实例(集群部署),那么客户端就需要具备发现服务端的能力和负载均衡的支持,所以有了服务注册发现负载均衡

485e57b79c841683f1e3986183bce2ff.webp


再然后,为了保障 RPC 调用的可靠性和稳定性,增加了服务监控和服务容错治理的能力,考虑性能提升的异步化能力以及考虑可扩展性的插件化管理,这些完善构成了更完整的微服务 RPC 框架。

RPC 协议实现

协议设计

协议设计算是 RPC 最重要的一部分了,它主要解决服务端与客户端通信的问题。一般来说通讯要解决如下问题:

5dff3a3a03cf042554502e081617691e.webp


1. 网络传输协议

基于 TCP、UDP 还是 HTTP,UDP 要自己解决可靠性传输问题,而 HTTP 又太重,包含很多没必要的头信息,所以一般 RPC 框架会优先选择
TCP 协议

(当然也有大名鼎鼎的 gRPC 基于 HTTP2)


2. 序列化协议

网络传输数据必须是二进制数据,而执行过程是编程语言的对象方法,那么就涉及到如何将对象序列化成可传输消息(二进制),并可反序列化还原。常见的通用型协议如 XML、 JSON、Protobuf、Thrift 等,也有语言绑定的如 Python 原生支持的 pickle 协议, Java 实现的 Serializbale 接口及 Hessian 协议,Golang 原生支持的 Gob 协议等。


3. 消息编码协议

它是一种客户端和服务端的调用约定,比如请求和参数如何组织,Header 放置什么内容。这部分每个框架设计均不同,有时也称这一层为狭义的 RPC 协议层。


另外客户端发起调用一般来说要知道调用的具体类方法(请求标识符)以及入参(Payload),而网络传输的是二进制字节流,如何能从这些字节中找出哪些是方法名,哪些是参数?进一步如果客户端不断的发送消息,如何将每一条
消息分割?(解决 TCP 粘包问题)


采用定长消息很容易解决,但事先并不能确定要固定多长,所以这种方式并不可行。消息加分隔符可以实现,但要确保分隔符不会与正文冲突。而最常用的实现方案就是
定长的头标识出不定长的体,比如用 int32 (定长 4 字节)标识后面的内容长度,这样就能较优雅实现消息分割了。

(注:这个方案中如果消息体的长度大于 2^32 会发生溢出而导致解析失败,可以换更长类型,但理论上总会有溢出风险,设计使用时应该限制避免传输过大数据体)

协议实现

网络传输协议,这里使用 TCP 协议即可,没有太多争议,可预留接口支持。

序列化协议,这里使用 Golang 专有的 Gob 协议,保留接口后期可以扩展支持 JSON、Protobuf 等协议。

type Codec interface {
    Encode(i interface{}) ([]byte, error)
    Decode(data []byte, i interface{}) error
}
type GobCodec struct{}
func (c GobCodec) Encode(i interface{}) ([]byte, error) {
    var buffer bytes.Buffer
    encoder := gob.NewEncoder(&buffer)
    if err := encoder.Encode(i); err != nil {
        return nil, err 
    }   
    return buffer.Bytes(), nil 
}
func (c GobCodec) Decode(data []byte, i interface{}) error {
    buffer := bytes.NewBuffer(data)
    decoder := gob.NewDecoder(buffer)
    return decoder.Decode(i)
}

codec/codec.go

RPC 消息格式编码设计如下,协议消息头定义定长 5 字节(byte),依次放置魔术数(用于校验),协议版本,消息类型(区分请求/响应),压缩类型,序列化协议类型,每个占 1 个字节(8 个 bit)。可扩展追加 消息 ID 以及 元数据 等信息用于做服务治理。83d26cac629fdbfdae87cd48e8f2dfa9.webp
const (
    HEADER_LEN = 5
)
const (
    magicNumber byte = 0x06
)
type MsgType byte
const (
    Request MsgType = iota
    Response
)
type CompressType byte
const (
    None CompressType = iota
    Gzip
)
type SerializeType byte
const (
    Gob SerializeType = iota
    JSON
)
type Header [HEADER_LEN]byte
func (h *Header) CheckMagicNumber() bool {
    return h[0] == magicNumber
}
func (h *Header) Version() byte {
    return h[1]
}
func (h *Header) SetVersion(version byte) {
    h[1] = version
}
//省略 MsgType,CompressType,SerializeType

protocol/header.go

定义协议消息格式,除了协议头,还包括调用的服务类名、方法名以及参数(Payload)。
type RPCMsg struct {
    *Header
    ServiceClass  string
    ServiceMethod string
    Payload       []byte
}
func NewRPCMsg() *RPCMsg {
    header := Header([HEADER_LEN]byte{})
    header[0] = magicNumber
    return &RPCMsg{
        Header: &header,
    }
}

protocol/msg.go

实现传输

定义好协议后,要解决的问题就是如何通过网络(IO)发送和接收,实现通信的目的。

func (msg *RPCMsg) Send(writer io.Writer) error {
    //send header
    _, err := writer.Write(msg.Header[:])
    if err != nil {
        return err
    }
    //写入消息体总长度,方便一次性解析
    dataLen := SPLIT_LEN + len(msg.ServiceClass) + SPLIT_LEN + len(msg.ServiceMethod) + SPLIT_L
EN + len(msg.Payload)
    err = binary.Write(writer, binary.BigEndian, uint32(dataLen)) //4
    if err != nil {
        return err
    }
    //write service.class len
    err = binary.Write(writer, binary.BigEndian, uint32(len(msg.ServiceClass)))
    if err != nil {
        return err
    }
    //write service.class content
    err = binary.Write(writer, binary.BigEndian, util.StringToByte(msg.ServiceClass))
    if err != nil {
        return err
    }
    //省略 service.method,payload 
}

protocol/msg.go

其中类名、方法名、payload 均为不定长部分,要想顺利解析就需要一一对应的长度字段标识不定长的长度,也就是 SPLIT_LEN 代表各部分长度,是 int32 类型(32 bit),正好相当于 4 个 byte,所以 SPLIT_LEN 为 4


另外要注意网络传输一般使用大端字节序。先理解字节序即为字节(byte)的组成顺序,分为大端序(最高有效位放低地址)和小端序(最低有效位放低地址)。CPU 一般采用小端序读写,而 TCP 网络传输采用大端序则更方便。对应这里的 binary.BigEndian 代码实现大端序。


消息读取后反解析,按发送顺序依次还原 Header、类名、方法名、Payload,不定长部分都有对应的长度保存,因此可以顺利解析到所有数据。

func Read(r io.Reader) (*RPCMsg, error) {
    msg := NewRPCMsg()
    err := msg.Decode(r)
    if err != nil {
        return nil, err
    }
    return msg, nil
}
func (msg *RPCMsg) Decode(r io.Reader) error {
    //read header
    _, err := io.ReadFull(r, msg.Header[:])
    if !msg.Header.CheckMagicNumber() { //magicNumber
        return fmt.Errorf("magic number error: %v", msg.Header[0])
    }
    //total body len    
headerByte := make([]byte4)
    _, err = io.ReadFull(r, headerByte)
    if err != nil {
        return err
    }
    bodyLen := binary.BigEndian.Uint32(headerByte)
    //一次将整个body读取,再依次拆解
    data := make([]byte, bodyLen)
    _, err = io.ReadFull(r, data)
    //service.class len
    start := 0
    end := start + SPLIT_LEN
    classLen := binary.BigEndian.Uint32(data[start:end]) //0,4
    //service.class
    start = end
    end = start + int(classLen)
    msg.ServiceClass = util.ByteToString(data[start:end]) //4,x
    //省略 method,payload}

protocol/msg.go

解决了最复杂的协议部分,下面依次来看服务端和客户端的实现。

服务端实现

服务端实现主要包括服务启停(端口监听)、服务注册、响应连接和处理请求几部分。

定义服务接口

服务接口提供服务启停和处理方法注册的能力。

type Server interface {
    Register(stringinterface{})
    Run()
    Close()
}

provider/server.go

服务启停

实现服务启停,关键在于通过 ip 和端口开启监听,这里通过 Listener 封装 net 包开启 tcp Listen。

type RPCServer struct {
    listener Listener
}
func NewRPCServer(ip string, port int) *RPCServer {
    return &RPCServer{
        listener: NewRPCListener(ip, port),
    }   
}
func (svr *RPCServer) Run() {
    go svr.listener.Run()
}
func (svr *RPCServer) Close() {
    if svr.listener != nil {
        svr.listener.Close()
    }
}

provider/server.go

type Listener interface {
    Run()
    SetHandler(string, Handler)
    Close()
}
type RPCListener struct {
    ServiceIp   string
    ServicePort int
    Handlers    map[string]Handler
    nl          net.Listener
}
func NewRPCListener(serviceIp string, servicePort int) *RPCListener {
    return &RPCListener{ServiceIp: serviceIp,
        ServicePort: servicePort,
        Handlers:    make(map[string]Handler)}
}
func (l *RPCListener) Run() {
    addr := fmt.Sprintf("%s:%d", l.ServiceIp, l.ServicePort)
    nl, err := net.Listen(config.NET_TRANS_PROTOCOL, addr) //tcp
    if err != nil {
        panic(err)
    }   
    l.nl = nl
    for {
        conn, err := l.nl.Accept()
        if err != nil {
            continue
        }  
        go l.handleConn(conn)
    }
}
func (l *RPCListener) Close() { if l.nl != nil {    
l.nl.Close()
}
}

provider/listener.go


这里通过为每个连接创建一个协程处理请求,得益于 Golang 的协程优势,Thread-Per-Message 模式来满足并发请求更容易实现(Java 线程成本太大,一般采用线程池实现 Worker Thread 模式)。

服务注册

服务注册就是在内存中维护一个映射关系,map (key=服务名,value=对象实例),通过 interface{} 泛化,可以反射还原。
func (svr *RPCServer) Register(class interface{}) {
    name := reflect.Indirect(reflect.ValueOf(class)).Type().Name()
    svr.RegisterName(name, class)
}
func (svr *RPCServer) RegisterName(name string, class interface{}) {
    handler := &RPCServerHandler{class: reflect.ValueOf(class)}
    svr.listener.SetHandler(name, handler)
    log.Printf("%s registered success!\n", name)
}
func (l *RPCListener) SetHandler(name string, handler Handler) {
    if _, ok := l.Handlers[name]; ok {
        log.Printf("%s is registered!\n", name)
        return
    }
    l.Handlers[name] = handler
}

provider/server.go

(1)由于服务启动初始化时进行所有服务注册,所以用 map 没考虑并发,否则如果有动态注册就需要考虑并发问题了。
(2)这里没有注册到服务注册中心,设计考虑将以应用服务(系统)为单位进行注册,而具体的服务接口通过应用内存映射。这种注册粒度大,优点就是减少对注册中心的依赖和注册实例数量,提高服务发现资源利用率。(dubbo 3 重要改进就是将接口级服务发现切换为应用级服务发现)

响应连接请求

整个过程依次涉及从网络连接读取数据,反序列化获得请求结构体 (RPCMsg),根据注册类和方法找到目标函数并执行,将执行结果序列化后封装成 RPCMsg 通过网络发送,整个过程是同步 io 模型。

func (l *RPCListener) handleConn(conn net.Conn) {
    defer catchPanic()
    for {
        msg, err := l.receiveData(conn)
        if err != nil || msg == nil {
            return
        }
        coder := global.Codecs[msg.Header.SerializeType()]
        if coder == nil {
            return
        }
        inArgs := make([]interface{}, 0)
        err = coder.Decode(msg.Payload, &inArgs)
        if err != nil {
            return
        }
        handler, ok := l.Handlers[msg.ServiceClass]
        if !ok {
            return
        }
        result, err := handler.Handle(msg.ServiceMethod, inArgs)
        encodeRes, err := coder.Encode(result) 
        if err != nil {
            return
        }
        err = l.sendData(conn, encodeRes)
        if err != nil {
            return
        }
    }
}

provider/listener.go

其中实际执行本地方法过程如下:
func (handler *RPCServerHandler) Handle(method string, params []interface{}) ([]interface{}, error) {
    args := make([]reflect.Value, len(params))
    for i := range params {
        args[i] = reflect.ValueOf(params[i])
    } 
    reflectMethod := handler.class.MethodByName(method)    
    result := reflectMethod.Call(args)
    resArgs := make([]interface{}, len(result))
    for i := 0; i < len(result); i++ {
        resArgs[i] = result[i].Interface()
    } 
    var err error
    if _, ok := result[len(result)-1].Interface().(error); ok {
        err = result[len(result)-1].Interface().(error)
    }
    return resArgs, err
}

provider/handler.go

收发网络请求通过调用之前封装的 Protocol 来完成。
func (l *RPCListener) receiveData(conn net.Conn) (*protocol.RPCMsg, error) {
    msg, err := protocol.Read(conn)
    if err != nil {
        if err != io.EOF { //close
            return nil, err
        }
    }
    return msg, nil
}
func (l *RPCListener) sendData(conn net.Conn, payload []byte) error {
    resMsg := protocol.NewRPCMsg()
    resMsg.SetVersion(config.Protocol_MsgVersion)
    resMsg.SetMsgType(protocol.Response)
    resMsg.SetCompressType(protocol.None)
    resMsg.SetSerializeType(protocol.Gob)
    resMsg.Payload = payload
    return  resMsg.Send(conn)
}

provider/listener.go

测试服务端

通过环境变量注入 ip 和 port,开启服务监听,依次注册几个服务。

func main() {
    flag.Parse()
    if ip == "" || port == 0 {
        panic("init ip and port error")
    }
    srv := provider.NewRPCServer(ip, port)
    srv.RegisterName("User", &UserHandler{})
    srv.RegisterName("Test", &TestHandler{})
    gob.Register(User{})
    go srv.Run()
    quit := make(chan os.Signal)
    signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP, syscall.SIGQUIT)
    <-quit
    srv.Close()
}

server.go

这里注册两个结构体 User 和 Test,特别注意:只有可导出的类方法(首字母大写)才能被客户端调用执行,否则会找不到对应类方法而失败。此外 User 作为接口值实现传输必须注册才行(gob.Register(User{}))。

type TestHandler struct{}
func (t *TestHandler) Hello() string {
    return "hello world"
}
type User struct {
    ID   int    `json:"id"`
    Name string `json:"name"`
    Age  int    `json:"age"`
}
var userList = map[int]User{
    1: User{1"hero"11},
    2: User{2"kavin"12},
}
type UserHandler struct{}
func (u *UserHandler) GetUserById(id int) (User, error) {
    if u, ok := userList[id]; ok {
        return u, nil
    }
    return User{}, fmt.Errorf("id %d not found", id)
}

server.go

c66f533472097552d95617aa60ea282f.webp

客户端实现

客户端发起 RPC 调用,就像调本地服务一样,所以需要定义一个 stub,该 stub 同请求服务端方法签名一致,然后通过代理实现网络请求和解析。

var Hello func() string
r, err := cli.Call(ctx, "UserService.Test.Hello", &Hello)

var GetUserById func(id int) (User, error)
_, err := cli.Call(ctx, "UserService.User.GetUserById", &GetUserById)
u, err := GetUserById(2)

定义客户端

定义客户端接口,其中 Invoke 代理执行 RPC 请求。

type Client interface {
    Connect(string) error
    Invoke(context.Context, *Service, interface{}, ...interface{}) (interface{}, error)
    Close()
}

consumer/client.go

定义连接参数,设置重试次数、超时时间、序列化协议、压缩类型等。
type Option struct {
    Retries           int
    ConnectionTimeout time.Duration
    SerializeType     protocol.SerializeType
    CompressType      protocol.CompressType
}
var DefaultOption = Option{
    Retries:           3,
    ConnectionTimeout: 5 * time.Second,
    SerializeType:     protocol.Gob,
    CompressType:      protocol.None,
}
type RPCClient struct {
    conn   net.Conn
    option Option
}
func NewClient(option Option) Client {
    return &RPCClient{option: option}
}

consumer/client.go

执行请求

实现网络连接、关闭以及执行部分。

func (cli *RPCClient) Connect(addr string) error {
    conn, err := net.DialTimeout(config.NET_TRANS_PROTOCOL, addr, cli.option.ConnectionTimeout)
    if err != nil {
        return err
    }
    cli.conn = conn
    return nil
}
func (cli *RPCClient) Invoke(ctx context.Context, service *Service, stub interface{}, params ...interface{}) (interface{}, error) {
    cli.makeCall(service, stub)
    return cli.wrapCall(ctx, stub, params...)
}
func (cli *RPCClient) Close() {
    if cli.conn != nil {
        cli.conn.Close()
    }
}

consumer/client.go

执行代理过程,主要依赖反射实现。这里 cli.makeCall() 主要是通过反射来生成代理函数,在代理函数中完成网络连接、请求数据序列化、网络传输、响应返回数据解析的工作,然后通过 cli.wrapCall() 发起实际调用。
func (cli *RPCClient) makeCall(service *Service, methodPtr interface{}) {
    container := reflect.ValueOf(methodPtr).Elem() 
    coder := global.Codecs[cli.option.SerializeType]

    handler := func(req []reflect.Value) []reflect.Value {  
        numOut := container.Type().NumOut()
        errorHandler := func(err error) []reflect.Value {
            outArgs := make([]reflect.Value, numOut)
            for i := 0; i < len(outArgs)-1; i++ {
                outArgs[i] = reflect.Zero(container.Type().Out(i))
            }
            outArgs[len(outArgs)-1] = reflect.ValueOf(&err).Elem()
            return outArgs
        }
        inArgs := make([]interface{}, 0len(req))
        for _, arg := range req {
            inArgs = append(inArgs, arg.Interface())
        }
        payload, err := coder.Encode(inArgs) //[]byte
        if err != nil {
            log.Printf("encode err:%v\n", err)
            return errorHandler(err)
        }
        msg := protocol.NewRPCMsg()
        msg.SetVersion(config.Protocol_MsgVersion)
        msg.SetMsgType(protocol.Request)
        msg.SetCompressType(cli.option.CompressType)
        msg.SetSerializeType(cli.option.SerializeType)
        msg.ServiceClass = service.Class
        msg.ServiceMethod = service.Method
        msg.Payload = payload
        err = msg.Send(cli.conn)
        if err != nil {
            log.Printf("send err:%v\n", err)
            return errorHandler(err)
        }
        respMsg, err := protocol.Read(cli.conn)
        if err != nil {
            return errorHandler(err)
        }
        respDecode := make([]interface{}, 0)
        err = coder.Decode(respMsg.Payload, &respDecode)
        if err != nil {
            log.Printf("decode err:%v\n", err)
            return errorHandler(err)
        }
        if len(respDecode) == 0 {
            respDecode = make([]interface{}, numOut)
        }
        outArgs := make([]reflect.Value, numOut)
        for i := 0; i < numOut; i++ {
            if i != numOut {
                if respDecode[i] == nil {
                    outArgs[i] = reflect.Zero(container.Type().Out(i))
                } else {
                    outArgs[i] = reflect.ValueOf(respDecode[i])
                }
            } else {
                outArgs[i] = reflect.Zero(container.Type().Out(i))
            }
        }
        return outArgs
    }
    container.Set(reflect.MakeFunc(container.Type(), handler))
}

consumer/client.go

wrapCall 执行实际函数调用。
func (cli *RPCClient) wrapCall(ctx context.Context, stub interface{}, params ...interface{}) (interface{}, error) {
    f := reflect.ValueOf(stub).Elem()
    if len(params) != f.Type().NumIn() {
        return nil, errors.New(fmt.Sprintf("params not adapted: %d-%d"len(params), f.Type().NumIn()))
    }
    in := make([]reflect.Value, len(params))
    for idx, param := range params {
        in[idx] = reflect.ValueOf(param)
    }
    result := f.Call(in)
    return result, nil
}

consumer/client.go

代理实现

到目前为止,客户端主要实现逻辑有了,但是客户端在发起调用前是需要先连接到服务端,然后执行调用,还有长连接管理、超时、重试甚至鉴权等功能没有实现,因此需要有一个代理类完成以上动作。

type RPCClientProxy struct {
    option Option
}
func (cp *RPCClientProxy) Call(ctx context.Context, servicePath string, stub interface{}, params ...interface{}) (interface{}, error) {
    service, err := NewService(servicePath)
    if err != nil {
        return nil, err
    }       
    client := NewClient(cp.option)
    addr := service.SelectAddr()
    err = client.Connect(addr) //TODO 长连接管理
    if err != nil {
        return nil, err
    }
    retries := cp.option.Retries
    for retries > 0 {
        retries--
        return client.Invoke(ctx, service, stub, params...)
    }
    return nil, errors.New("error")
}

consumer/client_proxy.go

这里通过服务路径拆分依次获取类名、方法名、服务 AppId,然后根据AppId 查找服务注册中心获取到服务端(服务提供者)地址。由于篇幅限制,这部分将在下一篇实现(包括注册发现、负载均衡、长连接管理等),测试方便这里直接写死服务端地址。
type Service struct {
    AppId  string
    Class  string
    Method string
    Addrs  []string
}
//demo: UserService.user.GetUser
func NewService(servicePath string) (*Service, error) {
    arr := strings.Split(servicePath, ".")
    service := &Service{}
    if len(arr) != 3 { 
        return service, errors.New("service path inlegal")
    }   
    service.AppId = arr[0]
    service.Class = arr[1]
    service.Method = arr[2]
    return service, nil 
}
func (service *Service) SelectAddr() string {
    return "ip:8811"
}

consumer/service.go

测试客户端

客户端通过 stub 发起调用,执行过程看到发起了远程执行并从服务端获取到了结果。

func main() {
    gob.Register(User{})
    cli := consumer.NewClientProxy(consumer.DefaultOption)
    ctx := context.Background()
    var GetUserById func(id int) (User, error)
    cli.Call(ctx, "UserService.User.GetUserById", &GetUserById)
    u, err := GetUserById(2)
    log.Println("result:", u, err)
    var Hello func() string
    r, err := cli.Call(ctx, "UserService.Test.Hello", &Hello)
    log.Println("result:", r, err)
}

client.go

2b16f3834e5d5680eeceab804190e5b7.webp


总结与补充

至此实现了简单的“P2P RPC”,后续可以迭代加入注册发现能力、长连接管理、异步调用、插件化扩展、负载均衡、认证授权、容错治理等能力,希望大家多多支持。


文章完整代码请关注公众号  技术岁月 ,发送关键字 RPC 获取。

感谢您的阅读,欢迎点赞、转发



浏览 77
点赞
评论
收藏
分享

手机扫一扫分享

举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

举报