Remove log package prefix
zhangxu19830126 committed Nov 22, 2019
1 parent 71b39b7 commit 5fa142e
Showing 16 changed files with 64 additions and 64 deletions.
log.go (2 changes: 1 addition & 1 deletion)
@@ -70,5 +70,5 @@ func init() {
// SetLogger set the log for prophet
func SetLogger(l Logger) {
log = l
log.Infof("prophet: logger set")
log.Infof("logger set")
}
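The SetLogger change above is representative of the whole commit: every call site drops its hard-coded "prophet: " prefix. If the prefix is still wanted, it can be supplied once by the Logger handed to SetLogger instead of being repeated in every message. The sketch below is not part of this commit; prophet's Logger interface is not shown in this diff, so the method set (Infof, Warningf, Errorf, Debugf, Fatalf) is assumed from the calls that appear in it, and the SetLogger call is shown only as a hypothetical usage.

```go
// prefixLogger is a hypothetical adapter: it prepends a fixed prefix in one
// place, so individual log calls no longer need to hard-code it.
package main

import stdlog "log"

type prefixLogger struct {
	prefix string
}

func (l prefixLogger) Infof(format string, args ...interface{})    { stdlog.Printf(l.prefix+format, args...) }
func (l prefixLogger) Warningf(format string, args ...interface{}) { stdlog.Printf(l.prefix+format, args...) }
func (l prefixLogger) Errorf(format string, args ...interface{})   { stdlog.Printf(l.prefix+format, args...) }
func (l prefixLogger) Debugf(format string, args ...interface{})   { stdlog.Printf(l.prefix+format, args...) }
func (l prefixLogger) Fatalf(format string, args ...interface{})   { stdlog.Fatalf(l.prefix+format, args...) }

func main() {
	l := prefixLogger{prefix: "prophet: "}
	// prophet.SetLogger(l) // hypothetical call site, assuming l satisfies the full Logger interface
	l.Infof("logger set") // logged as "prophet: logger set"
}
```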
prophet_coordinator.go (6 changes: 3 additions & 3 deletions)
@@ -38,7 +38,7 @@ func newCoordinator(cfg *Cfg, runner *Runner, rt *Runtime) *Coordinator {

func (c *Coordinator) start() {
if c.running {
log.Warningf("prophet: coordinator is already started.")
log.Warningf("coordinator is already started.")
return
}

@@ -130,7 +130,7 @@ func (c *Coordinator) runScheduler(ctx context.Context, s *scheduleController) {
for {
select {
case <-ctx.Done():
log.Infof("prophet: scheduler %s stopped", s.Name())
log.Infof("scheduler %s stopped", s.Name())
return
case <-timer.C:
timer.Reset(s.Interval())
@@ -158,7 +158,7 @@ func (c *Coordinator) runScheduler(ctx context.Context, s *scheduleController) {
// dispatch is used for coordinator resource,
// it will coordinator when the heartbeat arrives
func (c *Coordinator) dispatch(target *ResourceRuntime) *resourceHeartbeatRsp {
log.Debugf("prophet: dispatch resource %d, %+v",
log.Debugf("dispatch resource %d, %+v",
target.meta.ID(),
target.meta)

prophet_etcd.go (34 changes: 17 additions & 17 deletions)
@@ -102,43 +102,43 @@ func initWithEmbedEtcd(ecfg *EmbeddedEtcdCfg, opts *options) {

err := prepareJoin(ecfg)
if err != nil {
log.Fatalf("prophet: prepare join embed etcd failed with %+v",
log.Fatalf("prepare join embed etcd failed with %+v",
err)
}

log.Info("prophet: start embed etcd")
log.Info("start embed etcd")
cfg, err := ecfg.getEmbedEtcdConfig()
if err != nil {
log.Fatalf("prophet: start embed etcd failed with %+v",
log.Fatalf("start embed etcd failed with %+v",
err)
}

etcd, err := embed.StartEtcd(cfg)
if err != nil {
log.Fatalf("prophet: start embed etcd failed with %+v",
log.Fatalf("start embed etcd failed with %+v",
err)
}

select {
case <-etcd.Server.ReadyNotify():
log.Info("prophet: embed etcd is ready")
log.Info("embed etcd is ready")
doAfterEmbedEtcdServerReady(etcd, cfg, ecfg, opts)
case <-time.After(time.Minute * 5):
log.Fatalf("prophet: start embed etcd timeout")
log.Fatalf("start embed etcd timeout")
}
}

func doAfterEmbedEtcdServerReady(etcd *embed.Etcd, cfg *embed.Config, ecfg *EmbeddedEtcdCfg, opts *options) {
checkEtcdCluster(etcd, ecfg)

id := uint64(etcd.Server.ID())
log.Infof("prophet: embed server ids, current %X, leader %X",
log.Infof("embed server ids, current %X, leader %X",
id,
etcd.Server.Leader())

client, err := initEtcdClient(ecfg)
if err != nil {
log.Fatalf("prophet: init embed etcd client failure, errors:\n %+v",
log.Fatalf("init embed etcd client failure, errors:\n %+v",
err)
}

@@ -149,7 +149,7 @@ func doAfterEmbedEtcdServerReady(etcd *embed.Etcd, cfg *embed.Config, ecfg *Embe
// all etcds in initial_cluster at same time, so here just log
// an error.
// Note that pd can not work correctly if we don't start all etcds.
log.Fatalf("prophet: etcd start failure, errors:\n%+v", err)
log.Fatalf("etcd start failure, errors:\n%+v", err)
}

opts.client = client
@@ -158,15 +158,15 @@ func doAfterEmbedEtcdServerReady(etcd *embed.Etcd, cfg *embed.Config, ecfg *Embe
for {
members, err := getCurrentClusterMembers(client)
if err != nil {
log.Fatalf("prophet: get current members of etcd cluster failed with %+v", err)
log.Fatalf("get current members of etcd cluster failed with %+v", err)
}

var eps []string
for _, m := range members.Members {
eps = append(eps, m.GetClientURLs()...)
}
client.SetEndpoints(eps...)
log.Debugf("prophet: etcd client endpoints set to %+v", eps)
log.Debugf("etcd client endpoints set to %+v", eps)

time.Sleep(time.Second * 10)
}
@@ -179,7 +179,7 @@ func initEtcdClient(ecfg *EmbeddedEtcdCfg) (*clientv3.Client, error) {
clientAddrs = strings.Split(ecfg.Join, ",")
}

log.Infof("prophet: create etcd v3 client with endpoints <%v>", clientAddrs)
log.Infof("create etcd v3 client with endpoints <%v>", clientAddrs)

client, err := clientv3.New(clientv3.Config{
Endpoints: clientAddrs,
@@ -195,14 +195,14 @@ func initEtcdClient(ecfg *EmbeddedEtcdCfg) (*clientv3.Client, error) {
func updateAdvertisePeerUrls(id uint64, client *clientv3.Client, cfg *EmbeddedEtcdCfg) {
members, err := getCurrentClusterMembers(client)
if err != nil {
log.Fatalf("prophet: update current members of etcd cluster")
log.Fatalf("update current members of etcd cluster")
}

for _, m := range members.Members {
if id == m.ID {
etcdPeerURLs := strings.Join(m.PeerURLs, ",")
if cfg.URLsAdvertisePeer != etcdPeerURLs {
log.Infof("prophet: update advertise peer urls from <%s> to <%s>",
log.Infof("update advertise peer urls from <%s> to <%s>",
cfg.URLsAdvertisePeer,
etcdPeerURLs)
cfg.URLsAdvertisePeer = etcdPeerURLs
@@ -237,7 +237,7 @@ func endpointStatus(cfg *embed.Config, c *clientv3.Client) (*clientv3.StatusResp
cancel()

if cost := time.Since(start); cost > DefaultSlowRequestTime {
log.Warningf("prophet: check etcd status failed, endpoint=<%s> resp=<%+v> cost<%s> errors:\n %+v",
log.Warningf("check etcd status failed, endpoint=<%s> resp=<%+v> cost<%s> errors:\n %+v",
endpoint,
resp,
cost,
@@ -250,13 +250,13 @@ func checkEtcdCluster(etcd *embed.Etcd, cfg *EmbeddedEtcdCfg) {
func checkEtcdCluster(etcd *embed.Etcd, cfg *EmbeddedEtcdCfg) {
um, err := types.NewURLsMap(cfg.InitialCluster)
if err != nil {
log.Fatalf("prophet: check embed etcd failure, errors:\n %+v",
log.Fatalf("check embed etcd failure, errors:\n %+v",
err)
}

err = checkClusterID(etcd.Server.Cluster().ID(), um)
if err != nil {
log.Fatalf("prophet: check embed etcd failure, errors:\n %+v",
log.Fatalf("check embed etcd failure, errors:\n %+v",
err)
}
}
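Several hunks above touch the loop that keeps the etcd client's endpoints in sync with the current cluster members (getCurrentClusterMembers, SetEndpoints, then a sleep). For reference, a minimal self-contained sketch of that pattern against the public clientv3 API is shown below; it is an illustration rather than prophet's code, it assumes the go.etcd.io/etcd/clientv3 import path for the etcd version in use, and it substitutes a direct MemberList call for prophet's getCurrentClusterMembers helper (it also logs and retries on error where the original calls log.Fatalf).

```go
package main

import (
	"context"
	"log"
	"time"

	"go.etcd.io/etcd/clientv3"
)

// refreshEndpoints periodically rebuilds the client's endpoint list from the
// cluster's current member list, mirroring the loop in doAfterEmbedEtcdServerReady.
func refreshEndpoints(client *clientv3.Client, interval time.Duration) {
	for {
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		members, err := client.MemberList(ctx)
		cancel()
		if err != nil {
			log.Printf("get current members of etcd cluster failed with %+v", err)
			time.Sleep(interval)
			continue
		}

		var eps []string
		for _, m := range members.Members {
			eps = append(eps, m.GetClientURLs()...)
		}
		client.SetEndpoints(eps...)
		log.Printf("etcd client endpoints set to %+v", eps)

		time.Sleep(interval)
	}
}

func main() {
	client, err := clientv3.New(clientv3.Config{Endpoints: []string{"http://127.0.0.1:2379"}})
	if err != nil {
		log.Fatalf("create etcd v3 client failed with %+v", err)
	}
	defer client.Close()

	go refreshEndpoints(client, 10*time.Second)
	select {} // keep the example process alive
}
```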
prophet_heartbeat.go (14 changes: 7 additions & 7 deletions)
@@ -63,7 +63,7 @@ func (p *defaultProphet) doResourceHeartbeatLoop() {
if err != nil {
conn.Close()
conn = nil
log.Errorf("prophet: send resource heartbeat failed, errors: %+v", err)
log.Errorf("send resource heartbeat failed, errors: %+v", err)
break
}

@@ -72,16 +72,16 @@ func (p *defaultProphet) doResourceHeartbeatLoop() {
if err != nil {
conn.Close()
conn = nil
log.Errorf("prophet: read heartbeat rsp failed, errors: %+v", err)
log.Errorf("read heartbeat rsp failed, errors: %+v", err)
break
}

log.Debugf("prophet: read rpc response (%T)%+v", msg, msg)
log.Debugf("read rpc response (%T)%+v", msg, msg)

if rsp, ok := msg.(*errorRsp); ok {
conn.Close()
conn = nil
log.Infof("prophet: read heartbeat rsp with error %s", rsp.Err)
log.Infof("read heartbeat rsp with error %s", rsp.Err)
break
} else if rsp, ok := msg.(*resourceHeartbeatRsp); ok {
if rsp.NewLeader != nil {
@@ -124,7 +124,7 @@ func (p *defaultProphet) startContainerHeartbeatLoop() {
if err != nil {
conn.Close()
conn = nil
log.Errorf("prophet: send container heartbeat failed, errors: %+v", err)
log.Errorf("send container heartbeat failed, errors: %+v", err)
continue
}

@@ -133,14 +133,14 @@ func (p *defaultProphet) startContainerHeartbeatLoop() {
if err != nil {
conn.Close()
conn = nil
log.Errorf("prophet: read container rsp failed, errors: %+v", err)
log.Errorf("read container rsp failed, errors: %+v", err)
continue
}

if rsp, ok := msg.(*errorRsp); ok {
conn.Close()
conn = nil
log.Infof("prophet: read container rsp with error %s", rsp.Err)
log.Infof("read container rsp with error %s", rsp.Err)
}
}
}
prophet_join.go (4 changes: 2 additions & 2 deletions)
@@ -132,14 +132,14 @@ func listEtcdMembers(client *clientv3.Client) (*clientv3.MemberListResponse, err
func isDataExist(d string) bool {
dir, err := os.Open(d)
if err != nil {
log.Error("open directory %s failed with %+v", d, err)
log.Errorf("open directory %s failed with %+v", d, err)
return false
}
defer dir.Close()

names, err := dir.Readdirnames(-1)
if err != nil {
log.Error("list directory %s failed with %+v", d, err)
log.Errorf("list directory %s failed with %+v", d, err)
return false
}
return len(names) != 0
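Note that the two edits in isDataExist above are more than a prefix removal: the original code passed printf-style arguments to log.Error, which is not a formatting call, so the %s and %+v verbs would never have been interpolated. The standard-library analogy below (not prophet's Logger, whose exact print semantics aren't shown in this diff) illustrates what the switch to log.Errorf fixes.

```go
package main

import (
	"fmt"
	"os"
)

func main() {
	d := "/tmp/data"
	err := os.ErrNotExist

	// Like the old log.Error(...): the format string is treated as just another
	// operand, so the verbs are printed literally.
	fmt.Println("open directory %s failed with %+v", d, err)
	// open directory %s failed with %+v /tmp/data file does not exist

	// Like the new log.Errorf(...): the verbs are interpolated.
	fmt.Printf("open directory %s failed with %+v\n", d, err)
	// open directory /tmp/data failed with file does not exist
}
```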
prophet_leader.go (8 changes: 4 additions & 4 deletions)
@@ -22,7 +22,7 @@ func mustUnmarshal(data []byte) *Node {
value := &Node{}
err := json.Unmarshal(data, value)
if err != nil {
log.Fatalf("prophet: unmarshal leader node failed with %+v", err)
log.Fatalf("unmarshal leader node failed with %+v", err)
}

return value
@@ -48,7 +48,7 @@ func (p *defaultProphet) startLeaderLoop() {
}

func (p *defaultProphet) enableLeader() {
log.Infof("prophet: ********become to leader now********")
log.Infof("********become to leader now********")
p.leader = p.node

p.rt = newRuntime(p)
@@ -69,11 +69,11 @@ func (p *defaultProphet) enableLeader() {

func (p *defaultProphet) disableLeader() {
atomic.StoreInt64(&p.leaderFlag, 0)
log.Infof("prophet: ********become to follower now********")
log.Infof("********become to follower now********")

value, err := p.elector.CurrentLeader(math.MaxUint64)
if err != nil {
log.Fatalf("prophet: get current leader failed with %+v", err)
log.Fatalf("get current leader failed with %+v", err)
}
p.leader = nil
if len(value) > 0 {
prophet_rpc.go (4 changes: 2 additions & 2 deletions)
@@ -77,7 +77,7 @@ func (rpc *simpleRPC) AllocID() (uint64, error) {

if rsp, ok := value.(*allocIDRsp); ok {
if rpc.notLeaderErr(rsp.Err) {
log.Warningf("prophet: alloc id failed with not leader, retry")
log.Warningf("alloc id failed with not leader, retry")
rpc.wait(conn)
break
}
@@ -115,7 +115,7 @@ func (rpc *simpleRPC) AskSplit(res Resource) (uint64, []uint64, error) {

if rsp, ok := value.(*askSplitRsp); ok {
if rpc.notLeaderErr(rsp.Err) {
log.Warningf("prophet: ask split failed with not leader, retry")
log.Warningf("ask split failed with not leader, retry")
rpc.wait(conn)
break
}
prophet_transport.go (8 changes: 4 additions & 4 deletions)
@@ -327,14 +327,14 @@ func (p *defaultProphet) startListen() {
go func() {
err := p.tcpL.Start(p.doConnection)
if err != nil {
log.Fatalf("prophet: rpc listen at %s failed, errors:\n%+v",
log.Fatalf("rpc listen at %s failed, errors:\n%+v",
p.node.Addr,
err)
}
}()

<-p.tcpL.Started()
log.Infof("prophet: start rpc listen at %s", p.node.Addr)
log.Infof("start rpc listen at %s", p.node.Addr)
}

func (p *defaultProphet) doConnection(conn goetty.IOSession) error {
@@ -383,11 +383,11 @@ func (p *defaultProphet) getLeaderClient() goetty.IOSession {

conn, err := p.createLeaderClient(addr)
if err == nil {
log.Infof("prophet: create leader connection to %s", addr)
log.Infof("create leader connection to %s", addr)
return conn
}

log.Errorf("prophet: create leader connection failed, errors: %+v", err)
log.Errorf("create leader connection failed, errors: %+v", err)
time.Sleep(time.Second)
}
}
runtime.go (6 changes: 3 additions & 3 deletions)
@@ -38,7 +38,7 @@ func (rc *Runtime) load() {
rc.resources[meta.ID()] = newResourceRuntime(meta, nil)
})
if err != nil {
log.Fatalf("prophet: load resources failed, errors:%+v", err)
log.Fatalf("load resources failed, errors:%+v", err)
}

err = rc.p.store.LoadContainers(batchLimit, func(meta Container) {
@@ -48,7 +48,7 @@ func (rc *Runtime) load() {
rc.containers[meta.ID()] = newContainerRuntime(meta)
})
if err != nil {
log.Fatalf("prophet: load containers failed, errors:%+v", err)
log.Fatalf("load containers failed, errors:%+v", err)
}
}

@@ -163,7 +163,7 @@ func (rc *Runtime) getResourceWithoutLock(id uint64) *ResourceRuntime {
func randResource(resources map[uint64]*ResourceRuntime) *ResourceRuntime {
for _, res := range resources {
if res.leaderPeer == nil {
log.Fatalf("prophet: rand resource %d without leader", res.meta.ID())
log.Fatalf("rand resource %d without leader", res.meta.ID())
}

if len(res.downPeers) > 0 {
runtime_update.go (2 changes: 1 addition & 1 deletion)
@@ -49,7 +49,7 @@ func (rc *Runtime) handleResource(source *ResourceRuntime) error {
if current.leaderPeer == nil ||
(current.leaderPeer != nil &&
current.leaderPeer.ID != source.leaderPeer.ID) {
log.Infof("prophet: resource %d leader changed to peer %d",
log.Infof("resource %d leader changed to peer %d",
current.meta.ID(),
source.leaderPeer.ID)
rc.p.notifyEvent(newLeaderChangerEvent(source.meta.ID(), source.leaderPeer.ID))
scheduler_balancer_replica.go (2 changes: 1 addition & 1 deletion)
@@ -83,7 +83,7 @@ func (s *balanceReplicaScheduler) transferPeer(rt *Runtime, res *ResourceRuntime

id, err := checker.rt.p.store.AllocID()
if err != nil {
log.Errorf("prophet: allocate peer failure, %+v", err)
log.Errorf("allocate peer failure, %+v", err)
return nil
}
newPeer.ID = id
scheduler_op_aggregation.go (4 changes: 2 additions & 2 deletions)
@@ -17,7 +17,7 @@ type aggregationOperator struct {

func newAggregationOp(cfg *Cfg, target *ResourceRuntime, ops ...Operator) Operator {
if len(ops) == 0 {
log.Fatal("prophet: create new resource aggregation operator use empty opts")
log.Fatal("create new resource aggregation operator use empty opts")
}

return &aggregationOperator{
@@ -42,7 +42,7 @@ func (op *aggregationOperator) ResourceKind() ResourceKind {

func (op *aggregationOperator) Do(target *ResourceRuntime) (*resourceHeartbeatRsp, bool) {
if time.Since(op.StartAt) > op.cfg.TimeoutWaitOperatorComplete {
log.Errorf("prophet: operator %s timeout", op)
log.Errorf("operator %s timeout", op)
return nil, true
}

(diffs for the remaining changed files are not included here)
