Skip to content

Commit

Permalink
Rearrange code in files
Browse files Browse the repository at this point in the history
  • Loading branch information
anacrolix committed Nov 12, 2021
1 parent 7461c0f commit a320dde
Show file tree
Hide file tree
Showing 5 changed files with 460 additions and 422 deletions.
60 changes: 60 additions & 0 deletions confluence.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package main

import (
"context"
"fmt"
"io"
"net/http"
"net/http/httputil"
"net/url"
)

type confluenceHandler struct {
confluenceHost string
confluenceScheme string
confluenceTransport http.Transport
}

func (ch *confluenceHandler) data(w http.ResponseWriter, r *http.Request, ih string, path string) {
(&httputil.ReverseProxy{
Director: func(r *http.Request) {
r.URL.Host = ch.confluenceHost
r.URL.Scheme = ch.confluenceScheme
r.URL.Path = "/data"
r.URL.RawQuery = url.Values{"ih": {ih}, "path": {path}}.Encode()
},
Transport: &ch.confluenceTransport,
}).ServeHTTP(w, r)
}

func (ch *confluenceHandler) do(ctx context.Context, path string, q url.Values) (resp *http.Response, err error) {
hc := http.Client{
Transport: &ch.confluenceTransport,
}
u := url.URL{
Scheme: ch.confluenceScheme,
Host: ch.confluenceHost,
Path: path,
RawQuery: q.Encode(),
}
req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil)
if err != nil {
panic(err)
}
resp, err = hc.Do(req)
return
}

func (ch *confluenceHandler) dhtGet(ctx context.Context, target, salt string) (b []byte, err error) {
resp, err := ch.do(ctx, "/bep44", url.Values{"target": {target}, "salt": {salt}})
if err != nil {
return
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
err = fmt.Errorf("unexpected response status code: %v", resp.StatusCode)
return
}
b, err = io.ReadAll(resp.Body)
return
}
240 changes: 240 additions & 0 deletions gateway.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
package main

import (
"bytes"
"context"
"encoding/gob"
"encoding/hex"
"fmt"
"html/template"
"log"
"net/http"
"net/url"
"strings"
"time"

"github.com/anacrolix/dht/v2/bep44"
"github.com/anacrolix/dht/v2/krpc"
"github.com/anacrolix/torrent/bencode"
"github.com/anacrolix/torrent/metainfo"
"github.com/dgraph-io/ristretto"
"github.com/multiformats/go-base36"
"golang.org/x/sync/singleflight"
)

type dhtItemCacheValue struct {
updated time.Time
updating bool
payload krpc.Bep46Payload
}

type handler struct {
dirPageTemplate *template.Template
confluence confluenceHandler
dhtItemCache *ristretto.Cache
dhtItemCacheGetDedup singleflight.Group
dhtGetDedup singleflight.Group
infoCache *ristretto.Cache
}

func reverse(ss []string) {
for i := 0; i < len(ss)/2; i++ {
j := len(ss) - i - 1
ss[i], ss[j] = ss[j], ss[i]
}
}

func (h *handler) serveBtLink(w http.ResponseWriter, r *http.Request) bool {
log.Printf("considering %q for btlink handling", r.Host)
ss := strings.Split(r.Host, ".")
reverse(ss)
if ss[0] != "bt" {
return false
}
log.Printf("handling .bt request for %q", requestUrl(r))
ss = ss[1:]
if len(ss) == 0 {
http.Error(w, "not implemented yet", http.StatusNotImplemented)
return true
}
switch ss[0] {
case "ih":
ss = ss[1:]
reverse(ss)
h.serveTorrentPath(w, r, strings.Join(ss, "."))
return true
case "pk":
ss = ss[1:]
var salt, pk []byte
switch len(ss) {
case 2:
salt = []byte(ss[1])
fallthrough
case 1:
pk, _ = base36.DecodeString(ss[0])
default:
http.Error(w, "bad host", http.StatusBadRequest)
return true
}
target := bep44.MakeMutableTarget(*(*[32]byte)(pk), salt)
log.Printf("looking up infohash for %q at %x", r.Host, target)
bep46, err := h.getMutableInfohash(target, string(salt))
if err != nil {
log.Printf("error resolving %q: %v", r.Host, err)
http.Error(w, err.Error(), http.StatusNotFound)
return true
}
log.Printf("resolved %q to %x", r.Host, bep46.Ih)
h.serveTorrentPath(w, r, hex.EncodeToString(bep46.Ih[:]))
return true
}
panic("unimplemented")
}

func (h *handler) getTorrentInfo(w http.ResponseWriter, r *http.Request, ihHex string) (info metainfo.Info, ok bool) {
cacheVal, ok := h.infoCache.Get(ihHex)
if ok {
info = cacheVal.(metainfo.Info)
return
}
resp, err := h.confluence.do(r.Context(), "/info", url.Values{"ih": {ihHex}})
if err != nil {
log.Printf("error getting info from confluence [ih: %q]: %v", ihHex, err)
http.Error(w, "error getting torrent info", http.StatusBadGateway)
return
}
defer resp.Body.Close()
err = bencode.NewDecoder(resp.Body).Decode(&info)
if err != nil {
log.Printf("error decoding info: %v", err)
http.Error(w, "error decoding torrent info", http.StatusBadGateway)
return
}
ok = true
cost := estimateRecursiveMemoryUse(info)
log.Printf("store info for %v in cache with estimated cost %v", ihHex, cost)
h.infoCache.Set(ihHex, info, int64(cost))
return
}

func estimateRecursiveMemoryUse(val interface{}) int {
var buf bytes.Buffer
err := gob.NewEncoder(&buf).Encode(val)
if err != nil {
panic(err)
}
return buf.Len()
}

type dirPageItem struct {
Href string
Name string
}

func (h *handler) serveTorrentDir(w http.ResponseWriter, r *http.Request, ihHex string) {
info, ok := h.getTorrentInfo(w, r, ihHex)
if !ok {
return
}
var subFiles []dirPageItem
baseDisplayPath := r.URL.Path[1:]
uniqFiles := make(map[dirPageItem]bool)
for _, f := range info.UpvertedFiles() {
dp := f.DisplayPath(&info)
if strings.HasPrefix(dp, baseDisplayPath) {
relPath := dp[len(baseDisplayPath):]
nextSep := strings.Index(relPath, "/")
if nextSep != -1 {
relPath = relPath[:nextSep+1]
}
item := dirPageItem{
Href: relPath,
Name: relPath,
}
if !uniqFiles[item] {
subFiles = append(subFiles, dirPageItem{
Href: relPath,
Name: relPath,
})
}
uniqFiles[item] = true
}
}
if len(subFiles) == 0 {
http.NotFound(w, r)
return
}
if baseDisplayPath != "" {
subFiles = append([]dirPageItem{
{"../", "../"},
}, subFiles...)
}
dirPath := r.URL.Path
w.Header().Set("Content-Type", "text/html")
h.dirPageTemplate.Execute(w, dirPageData{
Path: dirPath,
Children: subFiles,
})
}

type dirPageData struct {
Path string
Children []dirPageItem
}

func (h *handler) serveTorrentPath(w http.ResponseWriter, r *http.Request, ihHex string) {
if strings.HasSuffix(r.URL.Path, "/") {
h.serveTorrentDir(w, r, ihHex)
return
}
h.confluence.data(w, r, ihHex, r.URL.Path[1:])
}

func (h *handler) getMutableInfohash(target bep44.Target, salt string) (_ krpc.Bep46Payload, err error) {
ret, err, _ := h.dhtItemCacheGetDedup.Do(string(target[:]), func() (interface{}, error) {
v, ok := h.dhtItemCache.Get(target[:])
if ok {
v := v.(*dhtItemCacheValue)
stale := time.Since(v.updated) >= time.Minute
if !v.updating && stale {
log.Printf("initiating async refresh of cached dht item [target=%x]", target)
v.updating = true
go h.getMutableInfohashFromDht(target, salt)
}
log.Printf("served dht item from cache [target=%x, stale=%v]", target, stale)
return v.payload, nil
}
return h.getMutableInfohashFromDht(target, salt)
})
if err != nil {
return
}
return ret.(krpc.Bep46Payload), err
}

func (h *handler) getMutableInfohashFromDht(target bep44.Target, salt string) (_ krpc.Bep46Payload, err error) {
ret, err, _ := h.dhtGetDedup.Do(string(target[:]), func() (_ interface{}, err error) {
b, err := h.confluence.dhtGet(context.Background(), hex.EncodeToString(target[:]), salt)
if err != nil {
err = fmt.Errorf("getting from dht via confluence: %w", err)
return
}
var bep46 krpc.Bep46Payload
err = bencode.Unmarshal(b, &bep46)
if err != nil {
err = fmt.Errorf("unmarshalling bep46 payload from confluence response: %w", err)
return
}
stored := h.dhtItemCache.Set(target[:], &dhtItemCacheValue{
updated: time.Now(),
updating: false,
payload: bep46,
}, 1)
log.Printf("caching dht item [target=%x, stored=%v]", target, stored)
return bep46, err
})
if err != nil {
return
}
return ret.(krpc.Bep46Payload), err
}
59 changes: 59 additions & 0 deletions logging.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package main

import (
"bytes"
"io"
"net/http"
"unicode"
)

type indentWriter struct {
w io.Writer
newLine bool
indent []byte
}

func newIndentWriter(w io.Writer, indent string) io.Writer {
return &indentWriter{w, true, []byte(indent)}
}

func (me *indentWriter) Write(p []byte) (n int, err error) {
for len(p) != 0 {
if me.newLine {
_, err = me.w.Write(me.indent)
// We intentionally do not include the inserted indent in the return count due to the
// io.Writer contract.
if err != nil {
return
}
me.newLine = false
}
var nn int
nn, err = me.w.Write(p[:1])
n += nn
if err != nil {
return
}
if p[0] == '\n' {
me.newLine = true
}
p = p[1:]
}
return
}

func requestLogString(r *http.Request) []byte {
var buf bytes.Buffer
r.Write(newIndentWriter(&buf, " "))
return bytes.TrimRightFunc(buf.Bytes(), unicode.IsSpace)
}

func requestUrl(r *http.Request) string {
u := *r.URL
u.Host = r.Host
u.Scheme = "http"
if r.TLS != nil {
u.Scheme = "https"
}
return u.String()
}
Loading

0 comments on commit a320dde

Please sign in to comment.