Go daily rpcx

Posted May 26, 20209 min read

Introduction

In the previous two articles rpc and json-rpc , we introduced the implementation of rpc provided by the Go standard library. In actual development, the function of rpc library is still lacking. Today we introduce a very good Go RPC library-rpcx. rpcx was developed by a Chinese Daniel. The detailed development process can be found in rpcx Official Blog rpcx has a performance comparable to, or even surpassing to some extent, gRPC, has perfect Chinese documentation, and provides plug-ins for service discovery and governance.

Quick to use

This example uses go modules.

The first is to install:

$go get -v -tags "reuseport quic kcp zookeeper etcd consul ping" github.com/smallnest/rpcx/...

It can be seen that the installation of rpcx is a bit special. Using the go get -v github.com/smallnest/rpcx/... command will only install the basic functions of rpcx. Extended functions are specified through build tags. For ease of use, all tags are generally installed, as shown in the above command. This is also the officially recommended installation method.

Let's write the server program first. In fact, this program is almost exactly the same as the program written with rpc standard library:

package main

import(
  "context"
  "errors"

  "github.com/smallnest/rpcx/server"

)

type Args struct {
  A, B int
}

type Quotient struct {
  Quo, Rem int
}

type Arith int

func(t * Arith) Mul(cxt context.Context, args * Args, reply * int) error {
  * reply = args.A * args.B
  return nil
}

func(t * Arith) Div(cxt context.Context, args * Args, quo * Quotient) error {
  if args.B == 0 {
    return errors.New("divide by 0")
  }

  quo.Quo = args.A/args.B
  quo.Rem = args.A%args.B
  return nil
}

func main() {
  s:= server.NewServer()
  s.RegisterName("Arith", new(Arith), "")
  s.Serve("tcp", ":8972")
}

First create a Server object and call itsRegisterName()method to register the Mul and Div methods under the service path Arith. Compared with the standard library, rpcx requires that the first parameter of the registration method must be of type context.Context. Finally call s.Serve(" tcp ",":8972 ") to listen on TCP port 8972. Is not it simple? Start the server:

$go run main.go

Then there is the client program:

package main

import(
  "context"
  "flag"
  "log"

  "github.com/smallnest/rpcx/client"

)

var(
  addr = flag.String("addr", ":8972", "service address")

)

func main() {
  flag.Parse()

  d:= client.NewPeer2PeerDiscovery("tcp @" + * addr, "")
  xclient:= client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
  defer xclient.Close()

  args:= & Args {A:10, B:20}
  var reply int

  err:= xclient.Call(context.Background(), "Mul", args, & reply)
  if err! = nil {
    log.Fatalf("failed to call:%v", err)
  }

  fmt.Printf("%d *%d =%d \ n", args.A, args.B, reply)

  args = & Args {50, 20}
  var quo Quotient
  err = xclient.Call(context.Background(), "Div", args, & quo)
  if err! = nil {
    log.Fatalf("failed to call:%v", err)
  }

  fmt.Printf("%d *%d =%d ...%d \ n", args.A, args.B, quo.Quo, quo.Rem)
}

rpcx supports multiple service discovery methods for clients to find servers. In the above code, we use the simplest point-to-point method, which is direct connection. To call methods on the server, you must first create a Client object. Use the Client object to call remote methods. Run the client:

$go run main.go
10 * 20 = 200
50 * 20 = 2 ... 10

Note that the parameters for creating a Client object are client.Failtry and client.RandomSelect. These two parameters are failure mode and how to choose a server.

transmission

rpcx supports multiple transmission protocols:

  • TCP:TCP protocol, the network name is tcp;
  • HTTP:HTTP protocol, the network name is http;
  • UnixDomain:unix domain protocol, the network name is unix;
  • QUIC:short for Quick UDP Internet Connections, which means fast UDP network connection. The bottom layer of HTTP/3 is the QUIC protocol, produced by Google. The network name is quic;
  • KCP:fast and reliable ARQ protocol, the network name is kcp.

rpcx encapsulates these protocols very well. In addition to the need to specify the protocol name when creating a server and client connection, the use of other times is basically transparent. We modified the above example to use the http protocol:

Server changes:

s.Serve("http", ":8972")

Client changes:

d:= client.NewPeer2PeerDiscovery("http @" + * addr, "")

The use of QUIC and KCP is a bit special. QUIC must be used together with TLS. KCP also needs to be encrypted for transmission. Using Go language, we can easily generate a certificate and private key:

package main

import(
  "crypto/rand"
  "crypto/rsa"
  "crypto/x509"
  "crypto/x509/pkix"
  "encoding/pem"
  "math/big"
  "net"
  "os"
  "time"

)

func main() {
  max:= new(big.Int) .Lsh(big.NewInt(1), 128)
  serialNumber, _:= rand.Int(rand.Reader, max)
  subject:= pkix.Name {
    Organization:[]string {"Go Daily Lib"},
    OrganizationalUnit:[]string {"TechBlog"},
    CommonName:"go daily lib",
  }

  template:= x509.Certificate {
    SerialNumber:serialNumber,
    Subject:subject,
    NotBefore:time.Now(),
    NotAfter:time.Now(). Add(365 * 24 * time.Hour),
    KeyUsage:x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature,
    ExtKeyUsage:[]x509.ExtKeyUsage {x509.ExtKeyUsageServerAuth},
    IPAddresses:[]net.IP {net.ParseIP("127.0.0.1")},
  }

  pk, _:= rsa.GenerateKey(rand.Reader, 2048)

  derBytes, _:= x509.CreateCertificate(rand.Reader, & template, & template, & pk.PublicKey, pk)
  certOut, _:= os.Create("server.pem")
  pem.Encode(certOut, & pem.Block {Type:"CERTIFICATE", Bytes:derBytes})
  certOut.Close()

  keyOut, _:= os.Create("server.key")
  pem.Encode(keyOut, & pem.Block {Type:"RSA PRIVATE KEY", Bytes:x509.MarshalPKCS1PrivateKey(pk)})
  keyOut.Close()
}

The above code generates a certificate and private key, valid for 1 year. Run the program and get two files server.pem and server.key. Then we can write a program that uses the QUIC protocol. Server:

func main() {
  cert, _:= tls.LoadX509KeyPair("server.pem", "server.key")
  config:= & tls.Config {Certificates:[]tls.Certificate {cert}}

  s:= server.NewServer(server.WithTLSConfig(config))
  s.RegisterName("Arith", new(Arith), "")
  s.Serve("quic", "localhost:8972")
}

In fact, it is to load the certificate and key, and then pass in as an option when creating the Server object. Client changes:

conf:= & tls.Config {
  InsecureSkipVerify:true,
}

option:= client.DefaultOption
option.TLSConfig = conf
d:= client.NewPeer2PeerDiscovery("quic @" + * addr, "")
xclient:= client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, option)
defer xclient.Close()

The client also needs to configure TLS.

One thing to note is that rpcx support for quic/kcp these protocols is achieved through build tags. By default, files related to quic/kcp will not be compiled. If you want to use it, you must manually specify tags. Start the server program first:

$go run -tags quic main.go

Then switch to the client program directory and execute the following command:

$go run -tags quic main.go

One more thing to note, when using the tcp and http(the bottom layer is also tcp) protocols, we can shorten the address to:8972, because the default is the local address. But quic doesn't work, you must write the address completely:

//Server
s.Serve("quic", "localhost:8972")
//client
addr = flag.String("addr", "localhost:8972", "service address")

Registration function

The above examples are all methods of calling objects, we can also call functions. Compared with the object method, the function type has no recipient. The registration function needs to specify a service path. Server:

type Args struct {
  A, B int
}

type Quotient struct {
  Quo, Rem int
}


func Mul(cxt context.Context, args * Args, reply * int) error {
  * reply = args.A * args.B
  return nil
}

func Div(cxt context.Context, args * Args, quo * Quotient) error {
  if args.B == 0 {
    return errors.New("divide by 0")
  }

  quo.Quo = args.A/args.B
  quo.Rem = args.A%args.B
  return nil
}

func main() {
  s:= server.NewServer()
  s.RegisterFunction("function", Mul, "")
  s.RegisterFunction("function", Div, "")
  s.Serve("tcp", ":8972")
}

It's just that the registration method changed from RegisterName to RegisterFunction, and the parameter changed from an object to a function. We need to specify a service path for the registered function, and the client will find the corresponding method according to this path when it is called. Client:

func main() {
  flag.Parse()

  d:= client.NewPeer2PeerDiscovery("tcp @" + * addr, "")
  xclient:= client.NewXClient("function", client.Failtry, client.RandomSelect, d, client.DefaultOption)
  defer xclient.Close()

  args:= & Args {A:10, B:20}
  var reply int

  err:= xclient.Call(context.Background(), "Mul", args, & reply)
  if err! = nil {
    log.Fatalf("failed to call:%v", err)
  }

  fmt.Printf("%d *%d =%d \ n", args.A, args.B, reply)

  args = & Args {50, 20}
  var quo Quotient
  err = xclient.Call(context.Background(), "Div", args, & quo)
  if err! = nil {
    log.Fatalf("failed to call:%v", err)
  }

  fmt.Printf("%d *%d =%d ...%d \ n", args.A, args.B, quo.Quo, quo.Rem)
}

Registration Center

rpcx supports multiple registration centers:

  • Point-to-point:In fact, it is directly connected, there is no registration center;
  • Point to many:multiple servers can be configured;
  • zookeeper:commonly used registration center;
  • Etcd:Registration center written in Go language;
  • In-process call:Convenient debugging function, find service in the same process;
  • Consul/mDNS etc.

We have demonstrated point-to-point connections before. Next we introduce how to use zookeeper as a registration center. In rpcx, the registration center is integrated by means of plug-ins. Use ZooKeeperRegisterPlugin plugin to integrate Zookeeper. Server code:

type Args struct {
  A, B int
}

type Quotient struct {
  Quo, Rem int
}

var(
  addr = flag.String("addr", ":8972", "service address")
  zkAddr = flag.String("zkAddr", "127.0.0.1:2181", "zookeeper address")
  basePath = flag.String("basePath", "/services/math", "service base path")

)

type Arith int

func(t * Arith) Mul(cxt context.Context, args * Args, reply * int) error {
  fmt.Println("Mul on", * addr)
  * reply = args.A * args.B
  return nil
}

func(t * Arith) Div(cxt context.Context, args * Args, quo * Quotient) error {
  fmt.Println("Div on", * addr)
  if args.B == 0 {
    return errors.New("divide by 0")
  }

  quo.Quo = args.A/args.B
  quo.Rem = args.A%args.B
  return nil
}

func main() {
  flag.Parse()

  p:= & serverplugin.ZooKeeperRegisterPlugin {
    ServiceAddress:"tcp @" + * addr,
    ZooKeeperServers:[]string {* zkAddr},
    BasePath:* basePath,
    Metrics:metrics.NewRegistry(),
    UpdateInterval:time.Minute,
  }
  if err:= p.Start(); err! = nil {
    log.Fatal(err)
  }

  s:= server.NewServer()
  s.Plugins.Add(p)

  s.RegisterName("Arith", new(Arith), "")
  s.Serve("tcp", * addr)
}

In ZooKeeperRegisterPlugin, we specify the service address, zookeeper cluster address(which can be multiple), starting path, etc. When the server is started, the service information is automatically registered with zookeeper, and the client can directly pull the available service list from zookeeper.

First start the zookeeper server. For the installation and startup of zookeeper, please refer to my previous article. Start 3 servers in 3 consoles and specify different ports(note that you need to specify -tags zookeeper):

//console 1
$go run -tags zookeeper main.go -addr 127.0.0.1:8971
//console 2
$go run -tags zookeeper main.go -addr 127.0.0.1:8972
//console 3
$go run -tags zookeeper main.go -addr 127.0.0.1:8973

After starting, we observe the contents of the zookeeper path /services/math:

Great, the usable service address does not need us to maintain manually!

Next is the client:

var(
  zkAddr = flag.String("zkAddr", "127.0.0.1:2181", "zookeeper address")
  basePath = flag.String("basePath", "/services/math", "service base path")

)

func main() {
  flag.Parse()

  d:= client.NewZookeeperDiscovery(* basePath, "Arith", []string {* zkAddr}, nil)
  xclient:= client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
  defer xclient.Close()

  args:= & Args {A:10, B:20}
  var reply int

  err:= xclient.Call(context.Background(), "Mul", args, & reply)
  if err! = nil {
    log.Fatalf("failed to call:%v", err)
  }

  fmt.Printf("%d *%d =%d \ n", args.A, args.B, reply)

  args = & Args {50, 20}
  var quo Quotient
  err = xclient.Call(context.Background(), "Div", args, & quo)
  if err! = nil {
    log.Fatalf("failed to call:%v", err)
  }

  fmt.Printf("%d *%d =%d ...%d \ n", args.A, args.B, quo.Quo, quo.Rem)
}

We read the list of available Arith services through zookeeper, and then randomly select a service to send the request:

$go run -tags zookeeper main.go
2020/05/26 23:03:40 Connected to 127.0.0.1:2181
2020/05/26 23:03:40 authenticated:id = 72057658440744975, timeout = 10000
2020/05/26 23:03:40 re-submitting `0` credentials after reconnect
10 * 20 = 200
50 * 20 = 2 ... 10

Our client sent two requests. Because of the client.RandomSelect strategy, these two requests are randomly sent to a server. I added a print to the methods of Mul and Div, you can observe the output of each console!

If we shut down a server, the corresponding service address will be removed from zookeeper. I shut down server 1, and the zookeeper service list becomes:

Compared to the previous article, which requires manual maintenance of zookeeper, the automatic registration and maintenance of rpcx is obviously much more convenient!

to sum up

rpcx is the premier rpc library in Go, with rich features, outstanding performance, and rich documentation. It has been adopted by many companies and individuals. This article introduces only the most basic functions. Rpcx supports various advanced functions such as routing strategies, grouping, current limiting, and identity authentication. It is recommended to learn in depth!

If you find a fun and easy-to-use Go language library, please submit an issue to Go Daily GitHub on GitHub?

reference

  1. rpcx GitHub: https://github.com/smallnest/rpcx
  2. rpcx blog: https://blog.rpcx.io/
  3. rpcx official website: https://rpcx.io/
  4. rpcx documentation: https://doc.rpcx.io/
  5. Go daily one library GitHub: https://github.com/darjun/go-daily-lib

I

My blog: https://darjun.github.io

Welcome to pay attention to my WeChat public account [GoUpUp]to learn together and make progress together ~