Nsq principle analysis (1)

Posted May 27, 2020

Nsq is a lightweight distributed message queue written in Go. It is suitable for small projects, and its code is a good way to learn how a message queue is implemented, how Go channels are used, and how to write distributed systems in Go. Without the ability to do secondary development on it, however, Nsq still has many problems.


Nsq module introduction

nsqd: a process that listens on both HTTP and TCP. It creates topics and channels, distributes messages to consumers, and registers its own metadata (topic, channel, consumer) with nsqlookupd. It is the core module of nsq.

nsqlookupd: stores the metadata and service information (endpoint) of each nsqd, provides service discovery to consumers, and provides data queries to nsqadmin.

nsqadmin: a simple management interface that shows topics, channels, and the consumers on each channel. It can also create topics and channels.

[Figure: nsq.gif, excerpted from the official website]
The producer sends a message to a topic. If the topic has one or more channels, the message is copied and distributed to every channel. This is similar to the fanout exchange type in RabbitMQ, and a channel is similar to a queue.
The official documentation describes nsq as a distributed message queue service, but in my opinion only the channel-to-consumer part feels distributed. The nsqd module is actually a single point: nsqd stores topics, channels, and messages on its local disk. The official documentation also recommends one nsqd per producer, which wastes resources and gives no guarantee of data backup. Once the disk of the host running nsqd is damaged, the data is lost.
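The fanout behavior described above can be modeled with plain Go channels. This is a minimal sketch and not nsqd's implementation; the `topic` type and its methods are hypothetical names introduced only for illustration.

```go
package main

import "fmt"

// topic is a hypothetical stand-in for an nsqd topic: it owns one
// buffered Go channel per NSQ channel name.
type topic struct {
    channels map[string]chan string
}

// newTopic creates a topic with the given NSQ channel names.
func newTopic(names ...string) *topic {
    tp := &topic{channels: make(map[string]chan string)}
    for _, n := range names {
        tp.channels[n] = make(chan string, 16)
    }
    return tp
}

// publish copies msg to every channel (fanout).
func (tp *topic) publish(msg string) {
    for _, ch := range tp.channels {
        ch <- msg
    }
}

func main() {
    tp := newTopic("email", "audit")
    tp.publish("order-created")
    // Every channel holds its own copy of the message.
    for name, ch := range tp.channels {
        fmt.Printf("channel %s got %q\n", name, <-ch)
    }
}
```

Consumers attached to the same channel would then compete for that channel's copy, which is why a channel behaves like a queue.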

Nsq source code analysis

Deploy a simple environment first, taking CentOS as an example.

Download
wget https://s3.amazonaws.com/bitly-downloads/nsq/nsq-1.2.0.linux-amd64.go1.12.9.tar.gz
Unpack
tar xvf nsq-1.2.0.linux-amd64.go1.12.9.tar.gz
cd nsq-1.2.0.linux-amd64.go1.12.9/bin
cp * /bin

Open three terminals, one each for nsqd (the nsq core module: metadata, message storage, and message distribution), nsqlookupd (nsqd service and metadata management), and nsqadmin (the management interface). Replace the IP with your own real IP.

Terminal 1

/bin/nsqd --lookupd-tcp-address 192.168.1.1:4160 --tcp-address 0.0.0.0:4152 --http-address 0.0.0.0:4153 --broadcast-address 192.168.1.1

Terminal 2

/bin/nsqlookupd --broadcast-address 192.168.1.1

Terminal 3

/bin/nsqadmin --lookupd-http-address 192.168.1.1:4161

Take a look at the simple use of nsq

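cat producer.go
package main

import (
    "fmt"

    "github.com/nsqio/go-nsq"
)

func main() {
    config := nsq.NewConfig()
    // The address is the TCP address of the nsqd started above
    p, err := nsq.NewProducer("192.168.1.1:4152", config)
    if err != nil {
        fmt.Printf("create producer failed %s\n", err)
        return
    }
    if err := p.Publish("topic", []byte("message")); err != nil {
        fmt.Printf("dispatch task failed %s\n", err)
    }
}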

cat consumer.go
package main

import (
    "fmt"

    "github.com/nsqio/go-nsq"
)

type MyHandler struct{}

// HandleMessage is called for every message; returning nil marks it as finished.
func (h *MyHandler) HandleMessage(message *nsq.Message) error {
    fmt.Printf("consume message %+v\n", message)
    return nil
}

func main() {
    config := nsq.NewConfig()
    c, err := nsq.NewConsumer("topic", "channel", config)
    if err != nil {
        fmt.Printf("create consumer failed %+v\n", err)
        return
    }
    c.SetLoggerLevel(nsq.LogLevelDebug)
    c.AddHandler(&MyHandler{})
    // The port here is 4161, the HTTP port of nsqlookupd. Both nsqd and nsqlookupd listen on TCP and HTTP.
    if err := c.ConnectToNSQLookupd("192.168.1.1:4161"); err != nil {
        fmt.Printf("connect to nsqlookupd failed %+v\n", err)
        return
    }
    // Block until the consumer stops; otherwise main returns immediately
    <-c.StopChan
}

1. Producer code analysis

go-nsq/producer.go

// After Config is passed into NewProducer the values are no longer mutable (they are copied).
func NewProducer(addr string, config *Config) (*Producer, error) {
    err := config.Validate()
    if err != nil {
        return nil, err
    }

    p := &Producer{
        id: atomic.AddInt64(&instCount, 1),

        addr:   addr,
        config: *config,

        logger: make([]logger, int(LogLevelMax+1)),
        logLvl: LogLevelInfo,

        transactionChan: make(chan *ProducerTransaction),
        exitChan:        make(chan int),
        responseChan:    make(chan []byte),
        errorChan:       make(chan []byte),
    }

    // Set default logger for all log levels
    l := log.New(os.Stderr, "", log.Flags())
    for index := range p.logger {
        p.logger[index] = l
    }
    return p, nil
}

This initializes the Producer structure.

// Publish synchronously publishes a message body to the specified topic, returning
// an error if publish failed
func (w *Producer) Publish(topic string, body []byte) error {
    return w.sendCommand(Publish(topic, body))
}

Publish takes the topic to send to and the message body to send.

// Publish creates a new Command to write a message to a given topic
func Publish(topic string, body []byte) *Command {
    var params = [][]byte{[]byte(topic)}
    return &Command{[]byte("PUB"), params, body}
}

This wraps the topic and the body into a PUB command.
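To make the command structure concrete, here is a sketch of how a command with a name, parameters, and a body is laid out on the wire according to the NSQ TCP protocol: the name and parameters on one line, then a 4-byte big-endian body size, then the body. `encodeCommand` is a hypothetical helper written for this article, not a go-nsq function.

```go
package main

import (
    "bytes"
    "encoding/binary"
    "fmt"
)

// encodeCommand is a hypothetical helper that serializes a command as
// "NAME param1 param2\n" followed, when a body is present, by a
// 4-byte big-endian body size and the body itself.
func encodeCommand(name string, params []string, body []byte) []byte {
    var buf bytes.Buffer
    buf.WriteString(name)
    for _, p := range params {
        buf.WriteByte(' ')
        buf.WriteString(p)
    }
    buf.WriteByte('\n')
    if body != nil {
        var size [4]byte
        binary.BigEndian.PutUint32(size[:], uint32(len(body)))
        buf.Write(size[:])
        buf.Write(body)
    }
    return buf.Bytes()
}

func main() {
    // The same topic and body as in producer.go.
    fmt.Printf("%q\n", encodeCommand("PUB", []string{"topic"}, []byte("message")))
}
```

Commands without a body, such as NOP, consist of the command line only.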

func (w *Producer) sendCommand(cmd *Command) error {
    doneChan := make(chan *ProducerTransaction)
    // Asynchronous sending is used internally
    err := w.sendCommandAsync(cmd, doneChan, nil)
    if err != nil {
        close(doneChan)
        return err
    }
    // Wait for the asynchronous send to complete
    t := <-doneChan
    return t.Error
}

func (w *Producer) sendCommandAsync(cmd *Command, doneChan chan *ProducerTransaction,
    args []interface{}) error {
    // keep track of how many outstanding producers we're dealing with
    // in order to later ensure that we clean them all up...
    atomic.AddInt32(&w.concurrentProducers, 1)
    defer atomic.AddInt32(&w.concurrentProducers, -1)
    // Connect to nsqd unless the connection has already been established
    if atomic.LoadInt32(&w.state) != StateConnected {
        err := w.connect()
        if err != nil {
            return err
        }
    }

    t := &ProducerTransaction{
        cmd:      cmd,
        doneChan: doneChan,
        Args:     args,
    }

    select {
    case w.transactionChan <- t:
    case <-w.exitChan:
        return ErrStopped
    }
    return nil
}

The code above still does not send the PUB command to the nsqd process. Let's take a look at the connect function.

func (w *Producer) connect() error {
    w.guard.Lock()
    defer w.guard.Unlock()

    if atomic.LoadInt32(&w.stopFlag) == 1 {
        return ErrStopped
    }

    switch state := atomic.LoadInt32(&w.state); state {
    case StateInit:
    case StateConnected:
        return nil
    default:
        return ErrNotConnected
    }

    w.log(LogLevelInfo, "(%s) connecting to nsqd", w.addr)

    w.conn = NewConn(w.addr, &w.config, &producerConnDelegate{w})
    w.conn.SetLoggerLevel(w.getLogLevel())
    format := fmt.Sprintf("%3d (%%s)", w.id)
    for index := range w.logger {
        w.conn.SetLoggerForLevel(w.logger[index], LogLevel(index), format)
    }
    // Connect is shared with the consumer and is analyzed in detail in the consumer section
    _, err := w.conn.Connect()
    if err != nil {
        w.conn.Close()
        w.log(LogLevelError, "(%s) error connecting to nsqd - %s", w.addr, err)
        return err
    }
    atomic.StoreInt32(&w.state, StateConnected)
    w.closeChan = make(chan int)
    w.wg.Add(1)
    // The producer uses this goroutine to send commands to nsqd and receive its responses
    go w.router()

    return nil
}

func (w *Producer) router() {
    for {
        select {
        // sendCommandAsync above only wrapped the command and pushed it into a channel;
        // this is where it is received and actually written to nsqd
        case t := <-w.transactionChan:
            w.transactions = append(w.transactions, t)
            err := w.conn.WriteCommand(t.cmd)
            if err != nil {
                w.log(LogLevelError, "(%s) sending command - %s", w.conn.String(), err)
                w.close()
            }
        // Receive the response from nsqd
        case data := <-w.responseChan:
            w.popTransaction(FrameTypeResponse, data)
        case data := <-w.errorChan:
            w.popTransaction(FrameTypeError, data)
        case <-w.closeChan:
            goto exit
        case <-w.exitChan:
            goto exit
        }
    }

exit:
    w.transactionCleanup()
    w.wg.Done()
    w.log(LogLevelInfo, "exiting router")
}
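The pattern used by sendCommand and router, a synchronous call built on top of an asynchronous hand-off to a single goroutine, can be reduced to a few lines. The sketch below is a simplified model under stated assumptions: the `transaction` type and the `publish` function are hypothetical, and the router completes a transaction immediately instead of waiting for a response frame from nsqd.

```go
package main

import "fmt"

// transaction is a hypothetical, trimmed-down ProducerTransaction.
type transaction struct {
    cmd      string
    err      error
    doneChan chan *transaction
}

// router owns the "connection": it receives transactions one at a time
// and reports completion on each transaction's own doneChan.
func router(transactionChan <-chan *transaction, exitChan <-chan struct{}) {
    for {
        select {
        case tx := <-transactionChan:
            // A real producer writes tx.cmd to nsqd here and completes the
            // transaction only when the matching response arrives.
            tx.doneChan <- tx
        case <-exitChan:
            return
        }
    }
}

// publish hands the command to the router and blocks until it is done.
func publish(transactionChan chan<- *transaction, cmd string) error {
    done := make(chan *transaction)
    transactionChan <- &transaction{cmd: cmd, doneChan: done}
    tx := <-done
    return tx.err
}

func main() {
    transactionChan := make(chan *transaction)
    exitChan := make(chan struct{})
    go router(transactionChan, exitChan)
    fmt.Println("publish error:", publish(transactionChan, "PUB topic"))
    close(exitChan)
}
```

Because only the router goroutine touches the connection, callers on many goroutines can publish concurrently without locking around the socket.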

2. Consumer code analysis

// NewConsumer creates a new instance of Consumer for the specified topic/channel
//
// The only valid way to create a Config is via NewConfig, using a struct literal will panic.
// After Config is passed into NewConsumer the values are no longer mutable (they are copied).
// The topic and channel arguments specify the subscription to consume from
func NewConsumer(topic string, channel string, config *Config) (*Consumer, error) {
    if err := config.Validate(); err != nil {
        return nil, err
    }

    if !IsValidTopicName(topic) {
        return nil, errors.New("invalid topic name")
    }

    if !IsValidChannelName(channel) {
        return nil, errors.New("invalid channel name")
    }

    r := &Consumer{
        id: atomic.AddInt64(&instCount, 1),

        topic:   topic,
        channel: channel,
        config:  *config,

        logger:      make([]logger, LogLevelMax+1),
        logLvl:      LogLevelInfo,
        maxInFlight: int32(config.MaxInFlight),

        incomingMessages: make(chan *Message),

        rdyRetryTimers:     make(map[string]*time.Timer),
        pendingConnections: make(map[string]*Conn),
        connections:        make(map[string]*Conn),

        lookupdRecheckChan: make(chan int, 1),

        rng: rand.New(rand.NewSource(time.Now().UnixNano())),

        StopChan: make(chan int),
        exitChan: make(chan int),
    }

    // Set default logger for all log levels
    l := log.New(os.Stderr, "", log.Flags())
    for index := range r.logger {
        r.logger[index] = l
    }

    r.wg.Add(1)
    // nsqd pushes messages to consumers, so the consumer throttles the push rate
    // with RDY (flow control); rdyLoop updates the RDY count periodically
    go r.rdyLoop()
    return r, nil
}

This initializes the Consumer structure.
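The RDY flow control started by NewConsumer can be pictured from nsqd's side as follows. This is a simplified model, assuming that nsqd pushes to a client only while the number of in-flight messages is below the last RDY count it received; the `clientConn` type and its methods are hypothetical.

```go
package main

import "fmt"

// clientConn is a hypothetical model of how nsqd sees one consumer connection.
type clientConn struct {
    readyCount int      // last value received in an RDY command
    inFlight   int      // messages pushed but not yet answered with FIN or REQ
    backlog    []string // messages waiting in the channel
}

// setRDY models the consumer sending "RDY n".
func (c *clientConn) setRDY(n int) { c.readyCount = n }

// push delivers queued messages while the in-flight count is below the RDY count.
func (c *clientConn) push() []string {
    var sent []string
    for c.inFlight < c.readyCount && len(c.backlog) > 0 {
        sent = append(sent, c.backlog[0])
        c.backlog = c.backlog[1:]
        c.inFlight++
    }
    return sent
}

// finish models the consumer sending FIN for one message.
func (c *clientConn) finish() {
    if c.inFlight > 0 {
        c.inFlight--
    }
}

func main() {
    c := &clientConn{backlog: []string{"m1", "m2", "m3"}}
    fmt.Println(len(c.push())) // RDY is still 0, so nothing is pushed
    c.setRDY(2)
    fmt.Println(c.push()) // at most two messages may be in flight
    c.finish()
    fmt.Println(c.push()) // one slot was freed, so one more message is pushed
}
```

A consumer that never sends RDY receives nothing, which is why the consumer has to manage this value for every connection.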

After initialization, you need to register a message handler with AddHandler.

// AddHandler sets the Handler for messages received by this Consumer. This can be called
// multiple times to add additional handlers. Handler will have a 1:1 ratio to message handling goroutines.
//
// This panics if called after connecting to NSQD or NSQ Lookupd
//
// (see Handler or HandlerFunc for details on implementing this interface)
func (r *Consumer) AddHandler(handler Handler) {
    r.AddConcurrentHandlers(handler, 1)
}

// AddConcurrentHandlers sets the Handler for messages received by this Consumer. It
// takes a second argument which indicates the number of goroutines to spawn for
// message handling.
//
// This panics if called after connecting to NSQD or NSQ Lookupd
//
// (see Handler or HandlerFunc for details on implementing this interface)
func (r *Consumer) AddConcurrentHandlers(handler Handler, concurrency int) {
    if atomic.LoadInt32(&r.connectedFlag) == 1 {
        panic("already connected")
    }

    atomic.AddInt32(&r.runningHandlers, int32(concurrency))
    for i := 0; i < concurrency; i++ {
        // The number of handler goroutines is configurable
        go r.handlerLoop(handler)
    }
}

func (r *Consumer) handlerLoop(handler Handler) {
    r.log(LogLevelDebug, "starting Handler")

    for {
        // Continuously receive messages from nsqd; the readLoop goroutine,
        // discussed later, is what sends messages into this channel
        message, ok := <-r.incomingMessages
        if !ok {
            goto exit
        }

        if r.shouldFailMessage(message, handler) {
            message.Finish()
            continue
        }
        // Consume the message with the handler we registered
        err := handler.HandleMessage(message)
        if err != nil {
            r.log(LogLevelError, "Handler returned error (%s) for msg %s", err, message.ID)
            if !message.IsAutoResponseDisabled() {
                message.Requeue(-1)
            }
            continue
        }
        // Finishing a message removes it from the queue, which is equivalent to a commit.
        // By default a message is committed automatically once handled; automatic
        // responses can be disabled, for example to commit in batches
        if !message.IsAutoResponseDisabled() {
            message.Finish()
        }
    }

exit:
    r.log(LogLevelDebug, "stopping Handler")
    if atomic.AddInt32(&r.runningHandlers, -1) == 0 {
        r.exit()
    }
}
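AddConcurrentHandlers and handlerLoop together form a small worker pool: several goroutines receive from one shared channel, so each message is handled by exactly one of them. The sketch below shows only that mechanism; `runHandlers` is a hypothetical function, and the FIN and REQ responses are left out.

```go
package main

import (
    "fmt"
    "sync"
)

// runHandlers starts `concurrency` goroutines that all receive from the same
// channel, so each message is handled by exactly one goroutine.
func runHandlers(incoming <-chan string, concurrency int, handle func(string)) *sync.WaitGroup {
    var wg sync.WaitGroup
    for i := 0; i < concurrency; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            // The loop ends when incoming is closed, like the !ok branch in handlerLoop.
            for msg := range incoming {
                handle(msg)
            }
        }()
    }
    return &wg
}

func main() {
    incoming := make(chan string)
    var mu sync.Mutex
    handled := 0
    wg := runHandlers(incoming, 3, func(string) {
        mu.Lock()
        handled++
        mu.Unlock()
    })
    for i := 0; i < 10; i++ {
        incoming <- fmt.Sprintf("msg-%d", i)
    }
    close(incoming)
    wg.Wait()
    fmt.Println("handled:", handled)
}
```

Since the handlers run concurrently, any state they share needs its own synchronization, as the mutex in this example shows.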

func (r *Consumer) shouldFailMessage(message *Message, handler interface{}) bool {
    // message passed the max number of attempts
    if r.config.MaxAttempts > 0 && message.Attempts > r.config.MaxAttempts {
        r.log(LogLevelWarning, "msg %s attempted %d times, giving up",
            message.ID, message.Attempts)

        logger, ok := handler.(FailedMessageLogger)
        if ok {
            logger.LogFailedMessage(message)
        }

        return true
    }
    return false
}

func (r *Consumer) exit() {
    r.exitHandler.Do(func() {
        close(r.exitChan)
        r.wg.Wait()
        close(r.StopChan)
    })
}

// ConnectToNSQLookupd adds an nsqlookupd address to the list for this Consumer instance.
//
// If it is the first to be added, it initiates an HTTP request to discover nsqd
// producers for the configured topic.
//
// A goroutine is spawned to handle continual polling.
func (r *Consumer) ConnectToNSQLookupd(addr string) error {
    if atomic.LoadInt32(&r.stopFlag) == 1 {
        return errors.New("consumer stopped")
    }
    if atomic.LoadInt32(&r.runningHandlers) == 0 {
        return errors.New("no handlers")
    }

    if err := validatedLookupAddr(addr); err != nil {
        return err
    }

    atomic.StoreInt32(&r.connectedFlag, 1)

    r.mtx.Lock()
    for _, x := range r.lookupdHTTPAddrs {
        if x == addr {
            r.mtx.Unlock()
            return nil
        }
    }
    r.lookupdHTTPAddrs = append(r.lookupdHTTPAddrs, addr)
    numLookupd := len(r.lookupdHTTPAddrs)
    r.mtx.Unlock()

    // if this is the first one, kick off the go loop
    if numLookupd == 1 {
        r.queryLookupd()
        r.wg.Add(1)
        go r.lookupdLoop()
    }

    return nil
}

The consumer connects to nsqlookupd, queries it for the service information of the nsqd instances, and then connects to each nsqd.

// make an HTTP req to one of the configured nsqlookupd instances to discover
// which nsqd's provide the topic we are consuming.
//
// initiate a connection to any new producers that are identified.
func (r *Consumer) queryLookupd() {
    retries := 0

retry:
    endpoint := r.nextLookupdEndpoint()

    r.log(LogLevelInfo, "querying nsqlookupd %s", endpoint)

    var data lookupResp
    err := apiRequestNegotiateV1("GET", endpoint, nil, &data)
    if err != nil {
        r.log(LogLevelError, "error querying nsqlookupd (%s) - %s", endpoint, err)
        retries++
        if retries < 3 {
            r.log(LogLevelInfo, "retrying with next nsqlookupd")
            goto retry
        }
        return
    }

    var nsqdAddrs []string
    for _, producer := range data.Producers {
        broadcastAddress := producer.BroadcastAddress
        port := producer.TCPPort
        joined := net.JoinHostPort(broadcastAddress, strconv.Itoa(port))
        nsqdAddrs = append(nsqdAddrs, joined)
    }
    // apply filter
    if discoveryFilter, ok := r.behaviorDelegate.(DiscoveryFilter); ok {
        nsqdAddrs = discoveryFilter.Filter(nsqdAddrs)
    }
    // Connect to every nsqd address returned by nsqlookupd
    for _, addr := range nsqdAddrs {
        err = r.ConnectToNSQD(addr)
        if err != nil && err != ErrAlreadyConnected {
            r.log(LogLevelError, "(%s) error connecting to nsqd - %s", addr, err)
            continue
        }
    }
}
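The address-building step in queryLookupd can be tried in isolation. The sketch below decodes a lookup response and joins each producer's broadcast address with its TCP port. The JSON body is hand-written for this example and the structs keep only the two fields that are needed, so treat the exact response shape as an assumption.

```go
package main

import (
    "encoding/json"
    "fmt"
    "net"
    "strconv"
)

// peerInfo keeps only the fields needed to build a TCP address.
type peerInfo struct {
    BroadcastAddress string `json:"broadcast_address"`
    TCPPort          int    `json:"tcp_port"`
}

// lookupResp is a trimmed-down version of the lookup response.
type lookupResp struct {
    Producers []peerInfo `json:"producers"`
}

// nsqdAddrs extracts "host:port" TCP addresses from a lookup response body.
func nsqdAddrs(body []byte) ([]string, error) {
    var data lookupResp
    if err := json.Unmarshal(body, &data); err != nil {
        return nil, err
    }
    var addrs []string
    for _, p := range data.Producers {
        addrs = append(addrs, net.JoinHostPort(p.BroadcastAddress, strconv.Itoa(p.TCPPort)))
    }
    return addrs, nil
}

func main() {
    // Hand-written sample body, using the addresses from the deployment above.
    body := []byte(`{"channels":["channel"],"producers":[{"broadcast_address":"192.168.1.1","tcp_port":4152,"http_port":4153}]}`)
    addrs, err := nsqdAddrs(body)
    fmt.Println(addrs, err)
}
```

This also shows why the --broadcast-address passed to nsqd matters: it is the address consumers dial after discovery.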

The official documentation does not recommend that consumers connect directly to nsqd:

// ConnectToNSQD takes a nsqd address to connect directly to.
//
// It is recommended to use ConnectToNSQLookupd so that topics are discovered
// automatically. This method is useful when you want to connect to a single, local,
// instance.
func (r *Consumer) ConnectToNSQD(addr string) error {
    if atomic.LoadInt32(&r.stopFlag) == 1 {
        return errors.New("consumer stopped")
    }

    if atomic.LoadInt32(&r.runningHandlers) == 0 {
        return errors.New("no handlers")
    }

    atomic.StoreInt32(&r.connectedFlag, 1)
    // Initialize the connection object
    conn := NewConn(addr, &r.config, &consumerConnDelegate{r})
    conn.SetLoggerLevel(r.getLogLevel())
    format := fmt.Sprintf("%3d [%s/%s] (%%s)", r.id, r.topic, r.channel)
    for index := range r.logger {
        conn.SetLoggerForLevel(r.logger[index], LogLevel(index), format)
    }
    r.mtx.Lock()
    _, pendingOk := r.pendingConnections[addr]
    _, ok := r.connections[addr]
    if ok || pendingOk {
        r.mtx.Unlock()
        return ErrAlreadyConnected
    }
    r.pendingConnections[addr] = conn
    if idx := indexOf(addr, r.nsqdTCPAddrs); idx == -1 {
        r.nsqdTCPAddrs = append(r.nsqdTCPAddrs, addr)
    }
    r.mtx.Unlock()

    r.log(LogLevelInfo, "(%s) connecting to nsqd", addr)

    cleanupConnection := func() {
        r.mtx.Lock()
        delete(r.pendingConnections, addr)
        r.mtx.Unlock()
        conn.Close()
    }
    // The same Connect call seen in the producer analysis; this is where
    // the consumer establishes its connection with nsqd
    resp, err := conn.Connect()
    if err != nil {
        cleanupConnection()
        return err
    }

    if resp != nil {
        if resp.MaxRdyCount < int64(r.getMaxInFlight()) {
            r.log(LogLevelWarning,
                "(%s) max RDY count %d < consumer max in flight %d, truncation possible",
                conn.String(), resp.MaxRdyCount, r.getMaxInFlight())
        }
    }
    // The consumer sends a SUB command to nsqd, which registers it in the client list
    // of the channel under the topic. When a message arrives, the channel delivers it
    // to one randomly chosen client from that list
    cmd := Subscribe(r.topic, r.channel)
    err = conn.WriteCommand(cmd)
    if err != nil {
        cleanupConnection()
        return fmt.Errorf("[%s] failed to subscribe to %s:%s - %s",
            conn, r.topic, r.channel, err.Error())
    }

    r.mtx.Lock()
    delete(r.pendingConnections, addr)
    r.connections[addr] = conn
    r.mtx.Unlock()

    // pre-emptive signal to existing connections to lower their RDY count
    for _, c := range r.conns() {
        r.maybeUpdateRDY(c)
    }

    return nil
}

go-nsq/conn.go

// Connect dials and bootstraps the nsqd connection
// (including IDENTIFY) and returns the IdentifyResponse
func (c *Conn) Connect() (*IdentifyResponse, error) {
    dialer := &net.Dialer{
        LocalAddr: c.config.LocalAddr,
        Timeout:   c.config.DialTimeout,
    }
    // The producer or consumer establishes its TCP connection with nsqd here
    conn, err := dialer.Dial("tcp", c.addr)
    if err != nil {
        return nil, err
    }
    c.conn = conn.(*net.TCPConn)
    c.r = conn
    c.w = conn
    // After the connection is established, first send 4 bytes that indicate which
    // protocol version to use (there are currently two, V1 and V2)
    _, err = c.Write(MagicV2)
    if err != nil {
        c.Close()
        return nil, fmt.Errorf("[%s] failed to write magic - %s", c.addr, err)
    }
    // Tell nsqd some basic information about this client, such as the heartbeat
    // interval, the message processing timeout, and the client id
    resp, err := c.identify()
    if err != nil {
        return nil, err
    }

    if resp != nil && resp.AuthRequired {
        if c.config.AuthSecret == "" {
            c.log(LogLevelError, "Auth Required")
            return nil, errors.New("Auth Required")
        }
        err := c.auth(c.config.AuthSecret)
        if err != nil {
            c.log(LogLevelError, "Auth Failed %s", err)
            return nil, err
        }
    }

    c.wg.Add(2)
    atomic.StoreInt32(&c.readLoopRunning, 1)
    // These two goroutines are important
    go c.readLoop()
    go c.writeLoop()
    return resp, nil
}
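Once the magic bytes are sent, everything nsqd writes back arrives as frames: a 4-byte big-endian size, a 4-byte frame type, and then the data. The sketch below reads one such frame, which is roughly the job ReadUnpackedResponse does for readLoop. `readFrame` and `decodeFrame` are hypothetical helpers written for this article, not the go-nsq implementation.

```go
package main

import (
    "bytes"
    "encoding/binary"
    "fmt"
    "io"
)

// Frame types defined by the NSQ TCP protocol.
const (
    frameTypeResponse int32 = 0
    frameTypeError    int32 = 1
    frameTypeMessage  int32 = 2
)

// readFrame reads one frame: [4-byte size][4-byte frame type][size-4 bytes of data].
func readFrame(r io.Reader) (int32, []byte, error) {
    var size int32
    if err := binary.Read(r, binary.BigEndian, &size); err != nil {
        return -1, nil, err
    }
    if size < 4 {
        return -1, nil, fmt.Errorf("frame too small: %d", size)
    }
    buf := make([]byte, size)
    if _, err := io.ReadFull(r, buf); err != nil {
        return -1, nil, err
    }
    frameType := int32(binary.BigEndian.Uint32(buf[:4]))
    return frameType, buf[4:], nil
}

// decodeFrame is a convenience wrapper for frames that are already in memory.
func decodeFrame(wire []byte) (int32, []byte, error) {
    return readFrame(bytes.NewReader(wire))
}

func main() {
    // A hand-built response frame carrying "OK": size=6, type=0, data="OK".
    wire := []byte{0, 0, 0, 6, 0, 0, 0, 0, 'O', 'K'}
    frameType, data, err := decodeFrame(wire)
    fmt.Println(frameType == frameTypeResponse, string(data), err)
}
```

A heartbeat is simply a response frame whose data is `_heartbeat_`, which is the comparison readLoop performs before dispatching on the frame type.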

func (c *Conn) readLoop() {
    delegate := &connMessageDelegate{c}
    for {
        if atomic.LoadInt32(&c.closeFlag) == 1 {
            goto exit
        }
        // Read the next frame from nsqd
        frameType, data, err := ReadUnpackedResponse(c)
        if err != nil {
            if err == io.EOF && atomic.LoadInt32(&c.closeFlag) == 1 {
                goto exit
            }
            if !strings.Contains(err.Error(), "use of closed network connection") {
                c.log(LogLevelError, "IO error - %s", err)
                c.delegate.OnIOError(c, err)
            }
            goto exit
        }
        // Heartbeat check; by default a heartbeat arrives every 30s
        if frameType == FrameTypeResponse && bytes.Equal(data, []byte("_heartbeat_")) {
            c.log(LogLevelDebug, "heartbeat received")
            c.delegate.OnHeartbeat(c)
            err := c.WriteCommand(Nop())
            if err != nil {
                c.log(LogLevelError, "IO error - %s", err)
                c.delegate.OnIOError(c, err)
                goto exit
            }
            continue
        }

        switch frameType {
        // Handle a response to a command
        case FrameTypeResponse:
            c.delegate.OnResponse(c, data)
        // Receive a message to consume
        case FrameTypeMessage:
            msg, err := DecodeMessage(data)
            if err != nil {
                c.log(LogLevelError, "IO error - %s", err)
                c.delegate.OnIOError(c, err)
                goto exit
            }
            msg.Delegate = delegate
            msg.NSQDAddress = c.String()

            atomic.AddInt64(&c.messagesInFlight, 1)
            atomic.StoreInt64(&c.lastMsgTimestamp, time.Now().UnixNano())
            // The message received from nsqd is pushed into a channel here; it is the
            // same channel that handlerLoop above is waiting on
            c.delegate.OnMessage(c, msg)
        case FrameTypeError:
            c.log(LogLevelError, "protocol error - %s", data)
            c.delegate.OnError(c, data)
        default:
            c.log(LogLevelError, "IO error - %s", err)
            c.delegate.OnIOError(c, fmt.Errorf("unknown frame type %d", frameType))
        }
    }

exit:
    atomic.StoreInt32(&c.readLoopRunning, 0)
    // start the connection close
    messagesInFlight := atomic.LoadInt64(&c.messagesInFlight)
    if messagesInFlight == 0 {
        // if we exited readLoop with no messages in flight
        // we need to explicitly trigger the close because
        // writeLoop won't
        c.close()
    } else {
        c.log(LogLevelWarning, "delaying close, %d outstanding messages", messagesInFlight)
    }
    c.wg.Done()
    c.log(LogLevelInfo, "readLoop exiting")
}
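For a frame of type message, readLoop calls DecodeMessage on the frame data. According to the NSQ TCP protocol, that data is an 8-byte timestamp in nanoseconds, a 2-byte attempt counter, a 16-byte message ID, and then the body. The sketch below decodes this layout; the `message` type and `decodeMessage` are simplified, hypothetical versions and not the go-nsq code.

```go
package main

import (
    "encoding/binary"
    "errors"
    "fmt"
)

// message is a simplified, hypothetical version of the client-side message.
type message struct {
    Timestamp int64    // nanoseconds since the Unix epoch
    Attempts  uint16   // how many times delivery has been attempted
    ID        [16]byte // message ID used in FIN and REQ
    Body      []byte
}

// decodeMessage parses [8-byte timestamp][2-byte attempts][16-byte ID][body].
func decodeMessage(b []byte) (*message, error) {
    const headerLen = 8 + 2 + 16
    if len(b) < headerLen {
        return nil, errors.New("not enough data to decode a valid message")
    }
    m := &message{
        Timestamp: int64(binary.BigEndian.Uint64(b[:8])),
        Attempts:  binary.BigEndian.Uint16(b[8:10]),
        Body:      b[headerLen:],
    }
    copy(m.ID[:], b[10:headerLen])
    return m, nil
}

func main() {
    raw := []byte{0, 0, 0, 0, 0, 0, 0, 1}            // timestamp = 1ns
    raw = append(raw, 0, 2)                          // attempts = 2
    raw = append(raw, []byte("0123456789abcdef")...) // 16-byte ID
    raw = append(raw, []byte("hello")...)            // body
    m, err := decodeMessage(raw)
    if err != nil {
        fmt.Println("decode failed:", err)
        return
    }
    fmt.Println(m.Timestamp, m.Attempts, string(m.ID[:]), string(m.Body))
}
```

The Attempts field is what shouldFailMessage compares against MaxAttempts before the handler runs.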

func (c *Conn) writeLoop() {
    for {
        select {
        case <-c.exitChan:
            c.log(LogLevelInfo, "breaking out of writeLoop")
            // Indicate drainReady because we will not pull any more off msgResponseChan
            close(c.drainReady)
            goto exit
        case cmd := <-c.cmdChan:
            err := c.WriteCommand(cmd)
            if err != nil {
                c.log(LogLevelError, "error sending command %s - %s", cmd, err)
                c.close()
                continue
            }
        case resp := <-c.msgResponseChan:
            // Decrement this here so it is correct even if we can't respond to nsqd
            msgsInFlight := atomic.AddInt64(&c.messagesInFlight, -1)

            if resp.success {
                c.log(LogLevelDebug, "FIN %s", resp.msg.ID)
                c.delegate.OnMessageFinished(c, resp.msg)
                c.delegate.OnResume(c)
            } else {
                c.log(LogLevelDebug, "REQ %s", resp.msg.ID)
                c.delegate.OnMessageRequeued(c, resp.msg)
                if resp.backoff {
                    c.delegate.OnBackoff(c)
                } else {
                    c.delegate.OnContinue(c)
                }
            }

            err := c.WriteCommand(resp.cmd)
            if err != nil {
                c.log(LogLevelError, "error sending command %s - %s", resp.cmd, err)
                c.close()
                continue
            }

            if msgsInFlight == 0 &&
                atomic.LoadInt32(&c.closeFlag) == 1 {
                c.close()
                continue
            }
        }
    }

exit:
    c.wg.Done()
    c.log(LogLevelInfo, "writeLoop exiting")
}

When message processing completes, the consumer sends a FIN command to nsqd through writeLoop, telling nsqd that the message has been consumed and can be removed from the queue.
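The choice between FIN and REQ can be written down as a tiny function. In the NSQ TCP protocol, FIN carries the message ID, and REQ carries the message ID plus a requeue delay in milliseconds. `responseFor` and `errHandler` below are hypothetical names, and the delay is passed in directly rather than computed the way go-nsq does.

```go
package main

import (
    "errors"
    "fmt"
)

// errHandler is a sample handler error used only for this demonstration.
var errHandler = errors.New("handler failed")

// responseFor returns the command line a consumer would send for a handled
// message: FIN on success, REQ with a delay in milliseconds on failure.
func responseFor(id string, handlerErr error, requeueDelayMs int) string {
    if handlerErr != nil {
        return fmt.Sprintf("REQ %s %d\n", id, requeueDelayMs)
    }
    return fmt.Sprintf("FIN %s\n", id)
}

func main() {
    fmt.Printf("%q\n", responseFor("0123456789abcdef", nil, 0))
    fmt.Printf("%q\n", responseFor("0123456789abcdef", errHandler, 5000))
}
```

Either response frees one in-flight slot on the connection, which is why writeLoop decrements messagesInFlight before writing the command.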
Everything above is client code from go-nsq; we have not yet looked at the code of nsq itself. Let me summarize first, and then continue with the nsqd code.
producer

  1. The producer first initializes the Producer structure and applies the configuration
  2. The producer establishes a TCP connection with nsqd
  3. The protocol version is negotiated
  4. The producer starts a router goroutine, which continuously sends PUB commands carrying messages to nsqd

consumer

  1. The consumer initializes the Consumer structure
  2. The consumer discovers nsqd through nsqlookupd and establishes a TCP connection with each nsqd (there may be one or more)
  3. The protocol version is negotiated
  4. After the connection is established, the consumer sends its identification information to nsqd, carrying some basic configuration such as the heartbeat interval, the message consumption timeout, and the client id
  5. The RDY flow control mechanism is started
  6. readLoop and writeLoop are started