Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 164 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
package easytcp

import (
"errors"
"fmt"
"net"
"time"
)

type Client struct {
Conn net.Conn

// Packer is the message packer, will be passed to session.
Packer Packer

// Codec is the message codec, will be passed to session.
Codec Codec

// OnSessionCreate is an event hook, will be invoked when session's created.
OnSessionCreate func(sess Session)

// OnSessionClose is an event hook, will be invoked when session's closed.
OnSessionClose func(sess Session)

socketReadBufferSize int
socketWriteBufferSize int
socketSendDelay bool
readTimeout time.Duration
writeTimeout time.Duration
respQueueSize int
router *Router
printRoutes bool
stopped chan struct{}
writeAttemptTimes int
asyncRouter bool

Sess *session
notifyChan chan interface{}
}

type ClientOption struct {
ServerOption
NotifyChan chan interface{}
}

func NewClient(opt *ClientOption) *Client {
if opt.Packer == nil {
opt.Packer = NewDefaultPacker()
}
if opt.RespQueueSize < 0 {
opt.RespQueueSize = DefaultRespQueueSize
}
if opt.WriteAttemptTimes <= 0 {
opt.WriteAttemptTimes = DefaultWriteAttemptTimes
}
return &Client{
socketReadBufferSize: opt.SocketReadBufferSize,
socketWriteBufferSize: opt.SocketWriteBufferSize,
socketSendDelay: opt.SocketSendDelay,
respQueueSize: opt.RespQueueSize,
readTimeout: opt.ReadTimeout,
writeTimeout: opt.WriteTimeout,
Packer: opt.Packer,
Codec: opt.Codec,
printRoutes: !opt.DoNotPrintRoutes,
router: newRouter(),
stopped: make(chan struct{}),
writeAttemptTimes: opt.WriteAttemptTimes,
asyncRouter: opt.AsyncRouter,
notifyChan: opt.NotifyChan,
}
}

func (c *Client) Run(addr string) error {
dial, err := net.Dial("tcp", addr)
if err != nil {
return err
}
c.Conn = dial
go c.handleConn(c.Conn)
return nil
}

func (c *Client) handleConn(conn net.Conn) {
defer conn.Close() // nolint

sess := newSession(conn, &sessionOption{
Packer: c.Packer,
Codec: c.Codec,
respQueueSize: c.respQueueSize,
asyncRouter: c.asyncRouter,
notifyChan: c.notifyChan,
})
if c.OnSessionCreate != nil {
c.OnSessionCreate(sess)
}
close(sess.afterCreateHook)
c.Sess = sess

go sess.readInbound(c.router, c.readTimeout) // start reading message packet from connection.
go sess.writeOutbound(c.writeTimeout, c.writeAttemptTimes) // start writing message packet to connection.

select {
case <-sess.closed: // wait for session finished.
case <-c.stopped: // or the server is stopped.
}

if c.OnSessionClose != nil {
c.OnSessionClose(sess)
}
close(sess.afterCloseHook)
}

// Stop stops server. Closing Listener and all connections.
func (c *Client) Stop() error {
close(c.stopped)
return c.Conn.Close()
}

// AddRoute registers message handler and middlewares to the router.
func (c *Client) AddRoute(msgID interface{}, handler HandlerFunc, middlewares ...MiddlewareFunc) {
c.router.register(msgID, handler, middlewares...)
}

// Use registers global middlewares to the router.
func (c *Client) Use(middlewares ...MiddlewareFunc) {
c.router.registerMiddleware(middlewares...)
}

// NotFoundHandler sets the not-found handler for router.
func (c *Client) NotFoundHandler(handler HandlerFunc) {
c.router.setNotFoundHandler(handler)
}

func (c *Client) IsStopped() bool {
select {
case <-c.stopped:
return true
default:
return false
}
}

func (c *Client) Send(id, v interface{}) error {
if c.Codec == nil {
return errors.New("codec is nil")
}
data, err := c.Codec.Encode(v)
if err != nil {
return fmt.Errorf("encode message failed: %v", err)
}
return c.SendMsg(NewMessage(id, data))
}

func (c *Client) SendMsg(msg *Message) error {
ctx := c.Sess.AllocateContext().SetResponseMessage(msg)
if c.IsStopped() {
return errors.New("client is stopped")
}
if ok := c.Sess.Send(ctx); !ok {
return errors.New("send message failed")
}
return nil
}
88 changes: 88 additions & 0 deletions internal/examples/tcp/custom_packet/client_router/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package main

import (
"github.com/DarthPestilane/easytcp"
"github.com/DarthPestilane/easytcp/internal/examples/fixture"
"github.com/DarthPestilane/easytcp/internal/examples/tcp/custom_packet/common"
"github.com/sirupsen/logrus"
"time"
)

var log *logrus.Logger

func init() {
log = logrus.New()
log.SetLevel(logrus.DebugLevel)
}

func main() {
notify := make(chan interface{})
client := easytcp.NewClient(&easytcp.ClientOption{
// specify codec and packer
ServerOption: easytcp.ServerOption{
Codec: &easytcp.JsonCodec{},
Packer: &common.CustomPacker{},
},
NotifyChan: notify,
})
err := client.Run(fixture.ServerAddr)
if err != nil {
panic(err)
}
client.AddRoute("json01-resp", respHandler, logMiddleware)
log = logrus.New()
go func() {
// write loop
i := 0
for {
time.Sleep(time.Second)
req := &common.Json01Req{
Key1: "hello",
Key2: i,
Key3: true,
}
err = client.Send("json01-req", req)
if err != nil {
panic(err)
}
i++
}
}()
i := 0
for true {
if client.IsStopped() {
log.Infof("stop")
break
}
select {
case v := <-notify:
if i == 10 {
_ = client.Stop()
}
i++
log.Infof("recv notify %v", v)
}
}
}

func respHandler(ctx easytcp.Context) {
var data common.Json01Resp
_ = ctx.Bind(&data)
ctx.Notify(data.Data)
}

func logMiddleware(next easytcp.HandlerFunc) easytcp.HandlerFunc {
return func(ctx easytcp.Context) {
fullSize := ctx.Request().MustGet("fullSize")
req := ctx.Request()
log.Infof("recv request | fullSize:(%d) id:(%v) dataSize(%d) data: %s", fullSize, req.ID(), len(req.Data()), req.Data())

defer func() {
resp := ctx.Response()
if resp != nil {
log.Infof("send response | dataSize:(%d) id:(%v) data: %s", len(resp.Data()), resp.ID(), resp.Data())
}
}()
next(ctx)
}
}
10 changes: 10 additions & 0 deletions router_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ type Context interface {

// Copy returns a copy of Context.
Copy() Context

// Notify sends a notification to others.
Notify(v interface{})
}

// routeContext implements the Context interface.
Expand Down Expand Up @@ -255,3 +258,10 @@ func (c *routeContext) reset() {
c.respMsg = nil
c.storage = nil
}

func (c *routeContext) Notify(v interface{}) {
if c.session == nil {
return
}
c.session.Notify(v)
}
38 changes: 27 additions & 11 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,24 @@ type Session interface {

// AfterCloseHook blocks until session's on-close hook triggered.
AfterCloseHook() <-chan struct{}

// Notify sends a notification to others.
Notify(v interface{})
}

type session struct {
id interface{} // session's ID.
conn net.Conn // tcp connection
closed chan struct{} // to close()
afterCreateHook chan struct{} // to close after session's on-create hook triggered
afterCloseHook chan struct{} // to close after session's on-close hook triggered
closeOnce sync.Once // ensure one session only close once
respQueue chan Context // response queue channel, pushed in Send() and popped in writeOutbound()
packer Packer // to pack and unpack message
codec Codec // encode/decode message data
ctxPool sync.Pool // router context pool
asyncRouter bool // calls router HandlerFunc in a goroutine if false
id interface{} // session's ID.
conn net.Conn // tcp connection
closed chan struct{} // to close()
afterCreateHook chan struct{} // to close after session's on-create hook triggered
afterCloseHook chan struct{} // to close after session's on-close hook triggered
closeOnce sync.Once // ensure one session only close once
respQueue chan Context // response queue channel, pushed in Send() and popped in writeOutbound()
packer Packer // to pack and unpack message
codec Codec // encode/decode message data
ctxPool sync.Pool // router context pool
asyncRouter bool // calls router HandlerFunc in a goroutine if false
notifyChan chan interface{} // to others
}

// sessionOption is the extra options for session.
Expand All @@ -59,6 +63,7 @@ type sessionOption struct {
Codec Codec
respQueueSize int
asyncRouter bool
notifyChan chan interface{}
}

// newSession creates a new session.
Expand All @@ -77,6 +82,7 @@ func newSession(conn net.Conn, opt *sessionOption) *session {
codec: opt.Codec,
ctxPool: sync.Pool{New: func() interface{} { return NewContext() }},
asyncRouter: opt.asyncRouter,
notifyChan: opt.notifyChan,
}
}

Expand Down Expand Up @@ -181,6 +187,9 @@ func (s *session) readInbound(router *Router, timeout time.Duration) {
func (s *session) handleReq(router *Router, reqMsg *Message) {
ctx := s.AllocateContext().SetRequestMessage(reqMsg)
router.handleRequest(ctx)
if ctx.Response() == nil {
return
}
s.Send(ctx)
}

Expand Down Expand Up @@ -251,3 +260,10 @@ func (s *session) packResponse(ctx Context) ([]byte, error) {
}
return s.packer.Pack(ctx.Response())
}

func (s *session) Notify(v interface{}) {
if s.notifyChan == nil {
return
}
s.notifyChan <- v
}