Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add variables and comments #938

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,8 @@ func (cn *Peer) request(r RequestIndex) (more bool, err error) {
return cn.peerImpl._request(ppReq), nil
}

var peerUpdateRequestsPeerCancelReason = "Peer.cancel"

func (me *Peer) cancel(r RequestIndex) {
if !me.deleteRequest(r) {
panic("request not existing should have been guarded")
Expand All @@ -480,7 +482,7 @@ func (me *Peer) cancel(r RequestIndex) {
}
me.decPeakRequests()
if me.isLowOnRequests() {
me.updateRequests("Peer.cancel")
me.updateRequests(peerUpdateRequestsPeerCancelReason)
}
}

Expand Down Expand Up @@ -566,6 +568,8 @@ func runSafeExtraneous(f func()) {
}
}

var peerUpdateRequestsRemoteRejectReason = "Peer.remoteRejectedRequest"

// Returns true if it was valid to reject the request.
func (c *Peer) remoteRejectedRequest(r RequestIndex) bool {
if c.deleteRequest(r) {
Expand All @@ -574,7 +578,7 @@ func (c *Peer) remoteRejectedRequest(r RequestIndex) bool {
return false
}
if c.isLowOnRequests() {
c.updateRequests("Peer.remoteRejectedRequest")
c.updateRequests(peerUpdateRequestsRemoteRejectReason)
}
c.decExpectedChunkReceive(r)
return true
Expand Down
11 changes: 9 additions & 2 deletions requesting.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,13 +312,20 @@ func (p *Peer) applyRequestState(next desiredRequestState) {
panic("changed")
}

if p.needRequestUpdate == "Peer.remoteRejectedRequest" {
// don't add requests on reciept of a reject - because this causes request back
// to potentially permanently unresponive peers - which just adds network noise. If
// the peer can handle more requests it will send an "unchoked" message - which
// will cause it to get added back to the request queue
if p.needRequestUpdate == peerUpdateRequestsRemoteRejectReason {
continue
}

existing := t.requestingPeer(req)
if existing != nil && existing != p {
if p.needRequestUpdate == "Peer.cancel" {
// don't steal on cancel - because this is triggered by t.cancelRequest below
// which means that the cancelled can immediately try to steal back a request
// it has lost which can lead to circular cancel/add processing
if p.needRequestUpdate == peerUpdateRequestsPeerCancelReason {
continue
}

Expand Down
41 changes: 36 additions & 5 deletions webseed-peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type webseedPeer struct {
client webseed.Client
activeRequests map[Request]webseed.Request
requesterCond sync.Cond
updateRequestor *time.Timer
lastUnhandledErr time.Time
}

Expand Down Expand Up @@ -72,7 +73,6 @@ func (ws *webseedPeer) intoSpec(r Request) webseed.RequestSpec {
}

func (ws *webseedPeer) _request(r Request) bool {
ws.requesterCond.Signal()
return true
}

Expand All @@ -91,15 +91,17 @@ func (ws *webseedPeer) doRequest(r Request) error {
func (ws *webseedPeer) requester(i int) {
ws.requesterCond.L.Lock()
defer ws.requesterCond.L.Unlock()
start:

for !ws.peer.closed.IsSet() {
// Restart is set if we don't need to wait for the requestCond before trying again.
restart := false

ws.peer.requestState.Requests.Iterate(func(x RequestIndex) bool {
r := ws.peer.t.requestIndexToRequest(x)
if _, ok := ws.activeRequests[r]; ok {
return true
}

err := ws.doRequest(r)
ws.requesterCond.L.Unlock()
if err != nil && !errors.Is(err, context.Canceled) {
Expand All @@ -117,10 +119,38 @@ start:
ws.requesterCond.L.Lock()
return false
})
if restart {
goto start

if !restart {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something here doesn't feel right. I don't think this part of the code should be determining if updating is appropriate. Sorry to block on this so long, but a lot of the other changes seem worthwhile so I'm trying to minimize this change to a minimum so it can make sense in isolation.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mh0lt let's break it to 2 PR's?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you rebase you should find less changes are needed too. Thanks for your efforts.

if !ws.peer.t.dataDownloadDisallowed.Bool() && ws.peer.isLowOnRequests() && len(ws.peer.getDesiredRequestState().Requests.requestIndexes) > 0 {
if ws.updateRequestor == nil {
ws.updateRequestor = time.AfterFunc(updateRequestsTimerDuration, func() { requestUpdate(ws) })
}
}

ws.requesterCond.Wait()

if ws.updateRequestor != nil {
ws.updateRequestor.Stop()
ws.updateRequestor = nil
}
}
}
}

func requestUpdate(ws *webseedPeer) {
if ws != nil {
if !ws.peer.closed.IsSet() {
if len(ws.peer.getDesiredRequestState().Requests.requestIndexes) > 0 {
if ws.peer.isLowOnRequests() {
if time.Since(ws.peer.lastRequestUpdate) > updateRequestsTimerDuration {
ws.peer.updateRequests(peerUpdateRequestsTimerReason)
return
}
}

ws.requesterCond.Signal()
}
}
ws.requesterCond.Wait()
}
}

Expand All @@ -142,6 +172,7 @@ func (ws *webseedPeer) handleUpdateRequests() {
ws.peer.t.cl.lock()
defer ws.peer.t.cl.unlock()
ws.peer.maybeUpdateActualRequestState()
ws.requesterCond.Signal()
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like it's very likely appropriate.

}()
}

Expand Down
Loading