Susumu Yata
null+****@clear*****
Thu Jul 27 11:31:51 JST 2017
Susumu Yata 2017-07-27 11:31:51 +0900 (Thu, 27 Jul 2017) New Revision: 9551bfdc3d82048f71b1a94b3521bf8d261ec76a https://github.com/groonga/grnci/commit/9551bfdc3d82048f71b1a94b3521bf8d261ec76a Message: Rename GQTPConn to gqtpConn and add BufferSize to GQTPClientOptions. Modified files: v2/gqtp.go Modified: v2/gqtp.go (+60 -88) =================================================================== --- v2/gqtp.go 2017-07-27 11:12:18 +0900 (f967e72) +++ v2/gqtp.go 2017-07-27 11:31:51 +0900 (d375e2c) @@ -48,7 +48,7 @@ type gqtpHeader struct { // gqtpResponse is a GQTP response. type gqtpResponse struct { - conn *GQTPConn // Connection + conn *gqtpConn // Connection head gqtpHeader // Current header err error // Error response left int // Number of bytes left in the current chunk @@ -56,7 +56,7 @@ type gqtpResponse struct { } // newGQTPResponse returns a new GQTP response. -func newGQTPResponse(conn *GQTPConn, head gqtpHeader, name string) *gqtpResponse { +func newGQTPResponse(conn *gqtpConn, head gqtpHeader, name string) *gqtpResponse { resp := &gqtpResponse{ conn: conn, head: head, @@ -122,7 +122,7 @@ func (r *gqtpResponse) Close() error { return nil } var err error - if _, e := io.CopyBuffer(ioutil.Discard, r, r.conn.getBuffer()); e != nil { + if _, e := io.CopyBuffer(ioutil.Discard, r, r.conn.buf); e != nil { r.conn.broken = true err = NewError(NetworkError, map[string]interface{}{ "method": "io.CopyBuffer", @@ -156,19 +156,30 @@ func (r *gqtpResponse) Err() error { return r.err } -// GQTPConn is a thread-unsafe GQTP client. -type GQTPConn struct { - client *GQTPClient // Owner client if available - conn net.Conn // Connection to a GQTP server - buf []byte // Copy buffer - bufSize int // Copy buffer size - ready bool // Whether or not the connection is ready to send a command - broken bool // Whether or not the connection is broken +// gqtpConnOptions is options of gqtpConn. +type gqtpConnOptions struct { + BufferSize int } -// DialGQTP returns a new GQTPConn connected to a GQTP server. +// newGQTPConnOptions returns the default gqtpConnOptions. +func newGQTPConnOptions() *gqtpConnOptions { + return &gqtpConnOptions{ + BufferSize: gqtpDefaultBufferSize, + } +} + +// gqtpConn is a thread-unsafe GQTP client. +type gqtpConn struct { + client *GQTPClient // Owner client if available + conn net.Conn // Connection to a GQTP server + buf []byte // Copy buffer + ready bool // Whether or not the connection is ready to send a command + broken bool // Whether or not the connection is broken +} + +// dialGQTP returns a new gqtpConn connected to a GQTP server. // The expected address format is [scheme://][host][:port]. -func DialGQTP(addr string) (*GQTPConn, error) { +func dialGQTP(addr string, options *gqtpConnOptions) (*gqtpConn, error) { a, err := ParseGQTPAddress(addr) if err != nil { return nil, err @@ -181,20 +192,23 @@ func DialGQTP(addr string) (*GQTPConn, error) { "error": err.Error(), }) } - return NewGQTPConn(conn), nil + return newGQTPConn(conn, options), nil } -// NewGQTPConn returns a new GQTPConn using an existing connection. -func NewGQTPConn(conn net.Conn) *GQTPConn { - return &GQTPConn{ - conn: conn, - bufSize: gqtpDefaultBufferSize, - ready: true, +// newGQTPConn returns a new gqtpConn using an existing connection. +func newGQTPConn(conn net.Conn, options *gqtpConnOptions) *gqtpConn { + if options == nil { + options = newGQTPConnOptions() + } + return &gqtpConn{ + conn: conn, + buf: make([]byte, options.BufferSize), + ready: true, } } // Close closes the connection. -func (c *GQTPConn) Close() error { +func (c *gqtpConn) Close() error { if err := c.conn.Close(); err != nil { return NewError(NetworkError, map[string]interface{}{ "method": "net.Conn.Close", @@ -204,24 +218,8 @@ func (c *GQTPConn) Close() error { return nil } -// SetBufferSize updates the size of the copy buffer. -func (c *GQTPConn) SetBufferSize(n int) { - if n <= 0 || n > gqtpMaxChunkSize { - n = gqtpDefaultBufferSize - } - c.bufSize = n -} - -// getBuffer returns the copy buffer. -func (c *GQTPConn) getBuffer() []byte { - if len(c.buf) != c.bufSize { - c.buf = make([]byte, c.bufSize) - } - return c.buf -} - // sendHeader sends a GQTP header. -func (c *GQTPConn) sendHeader(flags byte, size int) error { +func (c *gqtpConn) sendHeader(flags byte, size int) error { head := gqtpHeader{ Protocol: gqtpProtocol, Flags: flags, @@ -237,7 +235,7 @@ func (c *GQTPConn) sendHeader(flags byte, size int) error { } // sendChunkBytes sends data with flags. -func (c *GQTPConn) sendChunkBytes(data []byte, flags byte) error { +func (c *gqtpConn) sendChunkBytes(data []byte, flags byte) error { if err := c.sendHeader(flags, len(data)); err != nil { return err } @@ -251,7 +249,7 @@ func (c *GQTPConn) sendChunkBytes(data []byte, flags byte) error { } // sendChunkString sends data with flags. -func (c *GQTPConn) sendChunkString(data string, flags byte) error { +func (c *gqtpConn) sendChunkString(data string, flags byte) error { if err := c.sendHeader(flags, len(data)); err != nil { return err } @@ -265,7 +263,7 @@ func (c *GQTPConn) sendChunkString(data string, flags byte) error { } // recvHeader receives a GQTP header. -func (c *GQTPConn) recvHeader() (gqtpHeader, error) { +func (c *gqtpConn) recvHeader() (gqtpHeader, error) { var head gqtpHeader if err := binary.Read(c.conn, binary.BigEndian, &head); err != nil { return head, NewError(NetworkError, map[string]interface{}{ @@ -277,7 +275,7 @@ func (c *GQTPConn) recvHeader() (gqtpHeader, error) { } // execNoBody sends a command without body and receives a response. -func (c *GQTPConn) execNoBody(cmd string) (Response, error) { +func (c *gqtpConn) execNoBody(cmd string) (Response, error) { name := strings.TrimLeft(cmd, " \t\r\n") if idx := strings.IndexAny(name, " \t\r\n"); idx != -1 { name = name[:idx] @@ -293,7 +291,7 @@ func (c *GQTPConn) execNoBody(cmd string) (Response, error) { } // execBody sends a command with body and receives a response. -func (c *GQTPConn) execBody(cmd string, body io.Reader) (Response, error) { +func (c *gqtpConn) execBody(cmd string, body io.Reader) (Response, error) { name := strings.TrimLeft(cmd, " \t\r\n") if idx := strings.IndexAny(name, " \t\r\n"); idx != -1 { name = name[:idx] @@ -309,12 +307,11 @@ func (c *GQTPConn) execBody(cmd string, body io.Reader) (Response, error) { return newGQTPResponse(c, head, name), nil } n := 0 - buf := c.getBuffer() for { - m, err := body.Read(buf[n:]) + m, err := body.Read(c.buf[n:]) n += m if err != nil { - if err := c.sendChunkBytes(buf[:n], gqtpFlagTail); err != nil { + if err := c.sendChunkBytes(c.buf[:n], gqtpFlagTail); err != nil { return nil, err } head, err = c.recvHeader() @@ -323,8 +320,8 @@ func (c *GQTPConn) execBody(cmd string, body io.Reader) (Response, error) { } return newGQTPResponse(c, head, name), nil } - if n == len(buf) { - if err := c.sendChunkBytes(buf, 0); err != nil { + if n == len(c.buf) { + if err := c.sendChunkBytes(c.buf, 0); err != nil { return nil, err } head, err = c.recvHeader() @@ -339,8 +336,8 @@ func (c *GQTPConn) execBody(cmd string, body io.Reader) (Response, error) { } } -// exec sends a command and receives a response. -func (c *GQTPConn) exec(cmd string, body io.Reader) (Response, error) { +// Exec sends a command and receives a response. +func (c *gqtpConn) Exec(cmd string, body io.Reader) (Response, error) { if c.broken { return nil, NewError(OperationError, map[string]interface{}{ "error": "The connection is broken.", @@ -364,38 +361,9 @@ func (c *GQTPConn) exec(cmd string, body io.Reader) (Response, error) { return c.execBody(cmd, body) } -// Exec assembles cmd and body into a Command and calls Query. -// The GQTPConn must not be used until the response is closed. -func (c *GQTPConn) Exec(cmd string, body io.Reader) (Response, error) { - command, err := ParseCommand(cmd) - if err != nil { - return nil, err - } - command.SetBody(body) - return c.Query(command) -} - -// Invoke assembles name, params and body into a command and calls Query. -func (c *GQTPConn) Invoke(name string, params map[string]interface{}, body io.Reader) (Response, error) { - cmd, err := NewCommand(name, params) - if err != nil { - return nil, err - } - cmd.SetBody(body) - return c.Query(cmd) -} - -// Query sends a command and receives a response. -// It is the caller's responsibility to close the response. -func (c *GQTPConn) Query(cmd *Command) (Response, error) { - if err := cmd.Check(); err != nil { - return nil, err - } - return c.exec(cmd.String(), cmd.Body()) -} - // GQTPClientOptions is options of GQTPClient. type GQTPClientOptions struct { + BufferSize int // Buffer size MaxIdleConns int // Maximum number of idle connections } @@ -408,8 +376,9 @@ func NewGQTPClientOptions() *GQTPClientOptions { // GQTPClient is a thread-safe GQTP client. type GQTPClient struct { - addr string - idleConns chan *GQTPConn + addr string // Server address + connOptions *gqtpConnOptions // Options for connections + idleConns chan *gqtpConn // Idle connections } // NewGQTPClient returns a new GQTPClient connected to a GQTP server. @@ -418,13 +387,16 @@ func NewGQTPClient(addr string, options *GQTPClientOptions) (*GQTPClient, error) if options == nil { options = NewGQTPClientOptions() } - conn, err := DialGQTP(addr) + connOptions := newGQTPConnOptions() + connOptions.BufferSize = options.BufferSize + conn, err := dialGQTP(addr, connOptions) if err != nil { return nil, err } c := &GQTPClient{ - addr: addr, - idleConns: make(chan *GQTPConn, options.MaxIdleConns), + addr: addr, + connOptions: connOptions, + idleConns: make(chan *gqtpConn, options.MaxIdleConns), } c.idleConns <- conn conn.client = c @@ -450,17 +422,17 @@ func (c *GQTPClient) Close() error { // exec sends a request and receives a response. func (c *GQTPClient) exec(cmd string, body io.Reader) (Response, error) { - var conn *GQTPConn + var conn *gqtpConn var err error select { case conn = <-c.idleConns: default: - conn, err = DialGQTP(c.addr) + conn, err = dialGQTP(c.addr, c.connOptions) if err != nil { return nil, err } } - resp, err := conn.exec(cmd, body) + resp, err := conn.Exec(cmd, body) if err != nil { conn.Close() return nil, err -------------- next part -------------- HTML����������������������������... Download