欢迎来到DIVCSS5查找CSS资料与学习DIV CSS布局技术!
根据一贯的风格,我们先来梳理下项目目录结构,结构如下:
 
.
 
|__ bin/                   # 用于存放编译后生成的二进制文件
 
|__ config/                # 用于存放配置文件
 
|__ connection/            # 存放连接相关的文件
 
|   |__ proxy.go           # 代理组件
 
|   |__ pool.go            # 连接池组件
 
|   |__ repl_set.go        # 复制集组件
 
|   |__ conn.go            # 连接对象组件
 
|__ internal/              # 存放 mongo 内部协议相关文件
 
|   |__ auth.go            # 握手鉴权组件
 
|   |__ protocol.go        # 协议解析组件
 
|   |__ request.go         # 请求重写组件
 
|   |__ response.go        # 响应重写组件
 
|__ statistics/            # 存放指标统计上报组件
 
|__ test/                  # 存放各种语言驱动测试代码的文件夹
 
|__ utils/                 # 工具函数文件夹
 
|__ glide.yaml             # 依赖包配置文件
 
|__ main.go                # 入口文件
 
限于篇幅的原因,不可能把上面的细节一一讲个遍,我只挑选 proxy、pool 两个组件来讲...想了解更多实现细节的童鞋,可以私信我。
 
proxy 实现
 
最简单的 proxy 实现套路就像下面这样:
 
// main.go
 
func main() {
 
  // 传入配置参数,实例化一个代理对象
 
  p := NewProxy(conf)
 
  // 卡住,循环接受客户端请求
 
  p.LoopAccept()
 
}
 
接着来实现 NewProxy、LoopAccept 方法:
 
// connection/proxy.go
 
type Proxy struct {
 
  sync.RWMutex
 
  listener            net.Listener
 
  writePool, readPool *pool
 
}
 
func NewProxy(conf config.UserConf) *Proxy {
 
  // 开始监听本地端口
 
  listener, err := net.Listen("tcp", ":"+conf.GetString("port"))
 
  if err != nil {
 
    log.Fatalln(err)
 
  }
 
  p := &Proxy{
 
    listener: listener,
 
  }
 
  // 实例化连接池
 
  p.readPool, p.writePool, err = newPool(p)
 
  if err != nil {
 
    panic(err)
 
  }
 
  return p 
 
}
 
func (p *Proxy) LoopAccept() {
 
  for {
 
    client, err := p.listener.Accept()
 
    go func(c net.Conn) {
 
      defer c.Close()
 
      // 一个连接在多次 messageHandler 中共用一个 Reader 对象
 
      cr := bufio.NewReader(c)
 
      // 因为一个连接可能会进行多次读或写操作
 
      for {
 
        // 将客户端请求代理给服务端,服务端响应代理回客户端
 
        // 同时中间对请求或响应进行重写操作
 
        err := p.messageHandler(cr, c)
 
        if err != nil {
 
          // 只要出现错误,就执行到上面的 defer c.Close() 来关闭连接
 
          return
 
        }
 
      }
 
    }(client)
 
  }
 
}
 
接着来实现核心逻辑 messageHandler:
 
// connection/proxy.go
 
func (p *Proxy) messageHandler(cr *bufio.Reader, c net.Conn) error {
 
  // 对请求报文进行解析操作
 
  req, err := internal.Decode(clientReader)
 
  if err != nil {
 
        return errors.New("decode error")
 
    }
 
  // 将客户端请求发送给数据库服务器
 
  res, err := p.clientToServer(req)
 
  if err != nil {
 
    return errors.New("request error")
 
  }
 
  // 将数据库服务器响应返回给客户端
 
  return res.WriteTo(c)
 
}
 
func (p *Proxy) clientToServer(req *internal.Message) (*internal.Message, error) {
 
  var server net.Conn
 
  // 如果是读操作,就从读池中取出连接
 
  if req.IsReadOp() {
 
    host := req.GetHost()
 
    // 某些读操作需要发送到指定的读库上,所以需要传 host,来获取指定读库连接
 
    server = p.readPool.Acquire(host)
 
  // 反之,写操作从写池中取出连接
 
  } else {
 
    // 由于写库只有一个,所以不用传 host 参数了
 
    server = p.writePool.Acquire()
 
  }
 
  // 将客户端请求发送给数据库服务器
 
  err := req.WriteTo(server)
 
  if err != nil {
 
    return nil, err
 
  }
 
  // 获取解析数据库服务器响应
 
  res, err := internal.Decode(bufio.NewReader(server))
 
  return res, err
 
}
 
大致逻辑就是,客户端通过代理把请求发给服务端,服务端响应也通过代理响应回客户端。
 
------------  request  -----------  request  ------------
 
|          | --------> |         | --------> |          |
 
|  client  |           |  proxy  |           | repl_set |
 
|          | <-------- |         | <-------- |          |
 
------------  response -----------  response ------------
 
呐——,当然还有非常多的细节,由于篇幅原因不得不省略...
 
pool 实现
 
由 proxy 的代码逻辑来看,我们取读或写库连接是通过读或写池的 Acquire 方法来取的:
 
// connection/pool.go
 
type pool struct {
 
  sync.RWMutex
 
  connCh   chan net.Conn
 
  newConn  func(string) (net.Conn, error)
 
  freeConn func(net.Conn) error
 
}
 
func (p *pool) Acquire(opts ...interface{}) (net.Conn, error) {
 
  host := ""
 
  if len(opts) > 0 {
 
    host, _ = (opts[0]).(string)
 
  }
 
  chLen := len(p.connCh)
 
  // 从 channel 中遍历剩余数量的 conn
 
  for i := 0; i < chLen; i++ {
 
    select {
 
    case conn, ok := <- ch:
 
      if ok {
 
        if len(host) > 0 {
 
          if conn.RemoteAddr().String() == host {
 
            return conn, nil
 
          }
 
          // 没有找到对应 host 的 conn,则把 conn 重新放回 channel
 
          // 你可以简单理解为只是执行了 p.connCh <- conn 操作
 
          p.freeConn(conn)
 
        } else {
 
          return conn, nil
 
        }
 
      }
 
    // 避免数量不足而导致 channel 阻塞等待
 
    default:
 
    }
 
  }
 
  // 若还没有从 channel 中取到 conn,则立马 new 一个
 
  conn, err := p.newConn(host)
 
  if err != nil {
 
    return nil, err
 
  }
 
  return conn, nil
 
}
 
池的实现大致就是实现了一个循环队列,连接从池中取,取出的连接在使用完后,可以放回池中。
 
总结
 
聪明的童鞋可能已经看出,我在定义各种 struct 的时候,基本没有添加什么状态量,因为在并发场景下,对状态量的把控不好会导致一些很严重的问题,读者可以自由发挥设计功底,使用 atomic 或 go 1.9 提供的 sync.Map 等无锁操作来解决这些问题。
 
结束语
 
一溜写下来,看过抓包篇的童鞋可能会说,mmp 你根本就没讲如何实现自动主备切换的逻辑。我表示确实是立了个大 flag (老脸一红...
 
但我要真的一字一句写下来,恐怕很多人看都不想看,文章篇幅就是要简短明了,才有看下去的勇气。当然你真想知道细节,可以私信我,我一定知而不答(233。

如需转载,请注明文章出处和来源网址:http://www.divcss5.com/html/h63558.shtml