Skip to content

Commit

Permalink
update group ci
Browse files Browse the repository at this point in the history
  • Loading branch information
fatedier committed May 23, 2018
1 parent f56b49a commit 495b577
Show file tree
Hide file tree
Showing 7 changed files with 124 additions and 21 deletions.
25 changes: 25 additions & 0 deletions server/group/group.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright 2018 fatedier, [email protected]
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package group

import (
"errors"
)

var (
ErrGroupAuthFailed = errors.New("group auth failed")
ErrGroupParamsInvalid = errors.New("group params invalid")
ErrListenerClosed = errors.New("group listener closed")
)
15 changes: 5 additions & 10 deletions server/group.go → server/group/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package server
package group

import (
"errors"
"fmt"
"net"
"sync"

gerr "github.com/fatedier/golib/errors"
)
"github.com/fatedier/frp/server/ports"

var (
ErrGroupAuthFailed = errors.New("group auth failed")
ErrGroupParamsInvalid = errors.New("group params invalid")
ErrListenerClosed = errors.New("group listener closed")
gerr "github.com/fatedier/golib/errors"
)

type TcpGroupListener struct {
Expand Down Expand Up @@ -173,11 +168,11 @@ func (tg *TcpGroup) CloseListener(ln *TcpGroupListener) {
type TcpGroupCtl struct {
groups map[string]*TcpGroup

portManager *PortManager
portManager *ports.PortManager
mu sync.Mutex
}

func NewTcpGroupCtl(portManager *PortManager) *TcpGroupCtl {
func NewTcpGroupCtl(portManager *ports.PortManager) *TcpGroupCtl {
return &TcpGroupCtl{
groups: make(map[string]*TcpGroup),
portManager: portManager,
Expand Down
2 changes: 1 addition & 1 deletion server/ports.go → server/ports/ports.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package server
package ports

import (
"errors"
Expand Down
14 changes: 8 additions & 6 deletions server/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"github.com/fatedier/frp/assets"
"github.com/fatedier/frp/g"
"github.com/fatedier/frp/models/msg"
"github.com/fatedier/frp/server/group"
"github.com/fatedier/frp/server/ports"
"github.com/fatedier/frp/utils/log"
frpNet "github.com/fatedier/frp/utils/net"
"github.com/fatedier/frp/utils/util"
Expand Down Expand Up @@ -66,13 +68,13 @@ type Service struct {
visitorManager *VisitorManager

// Manage all tcp ports
tcpPortManager *PortManager
tcpPortManager *ports.PortManager

// Manage all udp ports
udpPortManager *PortManager
udpPortManager *ports.PortManager

// Tcp Group Controller
tcpGroupCtl *TcpGroupCtl
tcpGroupCtl *group.TcpGroupCtl

// Controller for nat hole connections
natHoleController *NatHoleController
Expand All @@ -84,10 +86,10 @@ func NewService() (svr *Service, err error) {
ctlManager: NewControlManager(),
pxyManager: NewProxyManager(),
visitorManager: NewVisitorManager(),
tcpPortManager: NewPortManager("tcp", cfg.ProxyBindAddr, cfg.AllowPorts),
udpPortManager: NewPortManager("udp", cfg.ProxyBindAddr, cfg.AllowPorts),
tcpPortManager: ports.NewPortManager("tcp", cfg.ProxyBindAddr, cfg.AllowPorts),
udpPortManager: ports.NewPortManager("udp", cfg.ProxyBindAddr, cfg.AllowPorts),
}
svr.tcpGroupCtl = NewTcpGroupCtl(svr.tcpPortManager)
svr.tcpGroupCtl = group.NewTcpGroupCtl(svr.tcpPortManager)

// Init assets.
err = assets.Load(cfg.AssetsDir)
Expand Down
16 changes: 16 additions & 0 deletions tests/conf/auto_test_frpc.ini
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,22 @@ remote_port = 10901
use_encryption = true
use_compression = true

[tcp_group1]
type = tcp
local_ip = 127.0.0.1
local_port = 10701
remote_port = 10802
group = test1
group_key = 123

[tcp_group2]
type = tcp
local_ip = 127.0.0.1
local_port = 10702
remote_port = 10802
group = test1
group_key = 123

[udp_normal]
type = udp
local_ip = 127.0.0.1
Expand Down
40 changes: 40 additions & 0 deletions tests/echo_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,24 @@ func StartTcpEchoServer() {
}
}

func StartTcpEchoServer2() {
l, err := frpNet.ListenTcp("127.0.0.1", TEST_TCP2_PORT)
if err != nil {
fmt.Printf("echo server2 listen error: %v\n", err)
return
}

for {
c, err := l.Accept()
if err != nil {
fmt.Printf("echo server2 accept error: %v\n", err)
return
}

go echoWorker2(c)
}
}

func StartUdpEchoServer() {
l, err := frpNet.ListenUDP("127.0.0.1", TEST_UDP_PORT)
if err != nil {
Expand Down Expand Up @@ -85,3 +103,25 @@ func echoWorker(c net.Conn) {
c.Write(buf[:n])
}
}

func echoWorker2(c net.Conn) {
buf := make([]byte, 2048)

for {
n, err := c.Read(buf)
if err != nil {
if err == io.EOF {
c.Close()
break
} else {
fmt.Printf("echo server read error: %v\n", err)
return
}
}

var w []byte
w = append(w, buf[:n]...)
w = append(w, buf[:n]...)
c.Write(w)
}
}
33 changes: 29 additions & 4 deletions tests/func_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/stretchr/testify/assert"

"github.com/fatedier/frp/client"
"github.com/fatedier/frp/server"
"github.com/fatedier/frp/server/ports"

gnet "github.com/fatedier/golib/net"
)
Expand All @@ -25,7 +25,9 @@ var (

TEST_STR = "frp is a fast reverse proxy to help you expose a local server behind a NAT or firewall to the internet."
TEST_TCP_PORT int = 10701
TEST_TCP2_PORT int = 10702
TEST_TCP_FRP_PORT int = 10801
TEST_TCP2_FRP_PORT int = 10802
TEST_TCP_EC_FRP_PORT int = 10901
TEST_TCP_ECHO_STR string = "tcp type:" + TEST_STR

Expand Down Expand Up @@ -62,6 +64,7 @@ var (

func init() {
go StartTcpEchoServer()
go StartTcpEchoServer2()
go StartUdpEchoServer()
go StartUnixDomainServer()
go StartHttpServer()
Expand Down Expand Up @@ -226,19 +229,19 @@ func TestAllowPorts(t *testing.T) {
status, err := getProxyStatus(ProxyTcpPortNotAllowed)
if assert.NoError(err) {
assert.Equal(client.ProxyStatusStartErr, status.Status)
assert.True(strings.Contains(status.Err, server.ErrPortNotAllowed.Error()))
assert.True(strings.Contains(status.Err, ports.ErrPortNotAllowed.Error()))
}

status, err = getProxyStatus(ProxyUdpPortNotAllowed)
if assert.NoError(err) {
assert.Equal(client.ProxyStatusStartErr, status.Status)
assert.True(strings.Contains(status.Err, server.ErrPortNotAllowed.Error()))
assert.True(strings.Contains(status.Err, ports.ErrPortNotAllowed.Error()))
}

status, err = getProxyStatus(ProxyTcpPortUnavailable)
if assert.NoError(err) {
assert.Equal(client.ProxyStatusStartErr, status.Status)
assert.True(strings.Contains(status.Err, server.ErrPortUnAvailable.Error()))
assert.True(strings.Contains(status.Err, ports.ErrPortUnAvailable.Error()))
}

// Port normal
Expand Down Expand Up @@ -310,3 +313,25 @@ func TestRangePortsMapping(t *testing.T) {
}
}
}

func TestGroup(t *testing.T) {
assert := assert.New(t)

var (
p1 int
p2 int
)
addr := fmt.Sprintf("127.0.0.1:%d", TEST_TCP2_FRP_PORT)

for i := 0; i < 6; i++ {
res, err := sendTcpMsg(addr, TEST_TCP_ECHO_STR)
assert.NoError(err)
switch res {
case TEST_TCP_ECHO_STR:
p1++
case TEST_TCP_ECHO_STR + TEST_TCP_ECHO_STR:
p2++
}
}
assert.True(p1 > 0 && p2 > 0, "group proxies load balancing")
}

0 comments on commit 495b577

Please sign in to comment.