diff --git a/datacap/datacap_client.go b/datacap/datacap_client.go new file mode 100644 index 00000000..93574d7d --- /dev/null +++ b/datacap/datacap_client.go @@ -0,0 +1,138 @@ +package datacap + +import ( + "context" + "encoding/json" + "net/http" + "strings" + "time" + + "github.com/getlantern/errors" +) + +type DatacapSidecarClient interface { + TrackDatacapUsage(ctx context.Context, deviceID string, bytesUsed int64, countryCode, platform string) (usage *TrackDatacapResponse, err error) + + GetDatacapUsage(ctx context.Context, deviceID string) (usage *TrackDatacapResponse, err error) +} + +type Config struct { + SidecarAddr string + HTTPClient *http.Client +} + +type datacapClient struct { + config Config +} + +// TrackDatacapRequest represents the request to track data usage +type TrackDatacapRequest struct { + DeviceID string `json:"deviceId"` + BytesUsed int64 `json:"bytesUsed"` + CountryCode string `json:"countryCode"` + Platform string `json:"platform"` +} + +// TrackDatacapResponse represents the response from tracking data usage +type TrackDatacapResponse struct { + Allowed bool `json:"allowed"` + RemainingBytes int64 `json:"remainingBytes"` + CapLimit int64 `json:"capLimit"` + ExpiryTime int64 `json:"expiryTime"` +} + +func NewClient(config Config) DatacapSidecarClient { + if config.HTTPClient == nil { + config.HTTPClient = &http.Client{ + Timeout: 10 * time.Second, + } + } + + return &datacapClient{ + config: config, + } +} + +func (c *datacapClient) TrackDatacapUsage(ctx context.Context, deviceID string, bytesUsed int64, countryCode, platform string) (*TrackDatacapResponse, error) { + req := TrackDatacapRequest{ + DeviceID: deviceID, + BytesUsed: bytesUsed, + CountryCode: countryCode, + Platform: platform, + } + + // Ensure the sidecar address has a trailing slash + sidecarAddr := c.config.SidecarAddr + if !strings.HasSuffix(sidecarAddr, "/") { + sidecarAddr += "/" + } + + url := sidecarAddr + "data-cap/usage" + + reqBody, err := json.Marshal(req) + if err != nil { + return nil, errors.New("failed to marshal request for tracking datacap usage: %v", err) + } + + httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, strings.NewReader(string(reqBody))) + if err != nil { + return nil, errors.New("failed to create request for tracking datacap usage: %v", err) + } + + httpReq.Header.Set("Content-Type", "application/json") + + resp, err := c.config.HTTPClient.Do(httpReq) + if err != nil { + return nil, errors.New("failed to send request to sidecar: %v", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return nil, errors.New("sidecar returned non-200 status: %d", resp.StatusCode) + } + + var trackResp TrackDatacapResponse + if err := json.NewDecoder(resp.Body).Decode(&trackResp); err != nil { + return nil, errors.New("failed to decode response for tracking datacap usage: %v", err) + } + + log.Debugf("Track response for device %s: allowed=%v, remaining=%d", + deviceID, trackResp.Allowed, trackResp.RemainingBytes) + + return &trackResp, nil +} + +func (c *datacapClient) GetDatacapUsage(ctx context.Context, deviceID string) (*TrackDatacapResponse, error) { + // Ensure the sidecar address has a trailing slash + sidecarAddr := c.config.SidecarAddr + if !strings.HasSuffix(sidecarAddr, "/") { + sidecarAddr += "/" + } + + url := sidecarAddr + "data-cap/device/" + deviceID + + httpReq, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return nil, errors.New("failed to create request for tracking datacap usage: %v", err) + } + + resp, err := c.config.HTTPClient.Do(httpReq) + if err != nil { + return nil, errors.New("failed to send request to sidecar: %v", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return nil, errors.New("sidecar returned non-200 status: %d", resp.StatusCode) + } + + var trackResp TrackDatacapResponse + if err := json.NewDecoder(resp.Body).Decode(&trackResp); err != nil { + return nil, errors.New("failed to decode response for tracking datacap usage: %v", err) + } + + log.Debugf("Track response for device %s: allowed=%v, remaining=%d", + deviceID, trackResp.Allowed, trackResp.RemainingBytes) + + return &trackResp, nil +} diff --git a/datacap/device_filter.go b/datacap/device_filter.go new file mode 100644 index 00000000..17d1b27b --- /dev/null +++ b/datacap/device_filter.go @@ -0,0 +1,179 @@ +package datacap + +import ( + "fmt" + "net/http" + "net/http/httputil" + "sync" + "time" + + "github.com/dustin/go-humanize" + "github.com/getlantern/http-proxy-lantern/v2/common" + "github.com/getlantern/http-proxy-lantern/v2/domains" + "github.com/getlantern/http-proxy-lantern/v2/instrument" + "github.com/getlantern/http-proxy-lantern/v2/listeners" + "github.com/getlantern/http-proxy-lantern/v2/usage" + "github.com/getlantern/proxy/v3/filters" +) + +var ( + epoch = time.Date(2016, 1, 1, 0, 0, 0, 0, time.UTC) + + alwaysThrottle = listeners.NewRateLimiter(10, 10) // this is basically unusably slow, only used for malicious or really old/broken clients + + defaultThrottleRate = int64(5000 * 1024 / 8) // 5 Mbps +) + +// deviceFilter handles filtering and throttling of requests based on datacap +type deviceFilter struct { + datacapClient DatacapSidecarClient + instrument instrument.Instrument + sendXBQHeader bool + limitersByDevice map[string]*listeners.RateLimiter + limitersByDeviceMx sync.Mutex +} + +// Settings represents the datacap settings for a device +type Settings struct { + Threshold int64 +} + +// NewFilter creates a new datacap filter +func NewFilter(datacapClient DatacapSidecarClient, instrument instrument.Instrument, sendXBQHeader bool) *deviceFilter { + return &deviceFilter{ + datacapClient: datacapClient, + instrument: instrument, + sendXBQHeader: sendXBQHeader, + limitersByDevice: make(map[string]*listeners.RateLimiter, 0), + } +} + +// Apply applies the datacap filter to the request +func (f *deviceFilter) Apply(cs *filters.ConnectionState, req *http.Request, next filters.Next) (*http.Response, *filters.ConnectionState, error) { + + if log.IsTraceEnabled() { + reqStr, _ := httputil.DumpRequest(req, true) + log.Tracef("DeviceFilter Middleware received request:\n%s", reqStr) + } + + wc := cs.Downstream().(listeners.WrapConn) + lanternDeviceID := req.Header.Get(common.DeviceIdHeader) + if lanternDeviceID == "" { + // Old lantern versions and possible cracks do not include the device + // ID. Just throttle them. + f.instrument.Throttle(req.Context(), true, "no-device-id") + wc.ControlMessage("throttle", alwaysThrottle) + return next(cs, req) + } + if lanternDeviceID == "~~~~~~" { + // This is checkfallbacks, don't throttle it + f.instrument.Throttle(req.Context(), false, "checkfallbacks") + return next(cs, req) + } + + // Even if a device hasn't hit its data cap, we always throttle to a default throttle rate to + // keep bandwidth hogs from using too much bandwidth. Note - this does not apply to pro proxies + // which don't use the devicefilter at all. + throttleDefault := func(message string) { + if defaultThrottleRate <= 0 { + f.instrument.Throttle(req.Context(), false, message) + } + limiter := f.rateLimiterForDevice(lanternDeviceID, defaultThrottleRate, defaultThrottleRate) + if log.IsTraceEnabled() { + log.Tracef("Throttling connection to %v per second by default", + humanize.Bytes(uint64(defaultThrottleRate))) + } + f.instrument.Throttle(req.Context(), true, "default") + wc.ControlMessage("throttle", limiter) + } + + // Some domains are excluded from being throttled and don't count towards the + // bandwidth cap. + if domains.ConfigForRequest(req).Unthrottled { + throttleDefault("domain-excluded") + return next(cs, req) + } + + // Check usage from cache only - no eager fetching + u := usage.Get(lanternDeviceID) + if u == nil { + // No usage data available yet, allow the request + f.instrument.Throttle(req.Context(), false, "no-usage-data") + return next(cs, req) + } + + settings, err := f.datacapClient.GetDatacapUsage(req.Context(), lanternDeviceID) + if err != nil { + log.Errorf("failed to get datacap usage for device %s: %v", lanternDeviceID, err) + f.instrument.Throttle(req.Context(), false, "datacap-error") + //allow the request to proceed if we fail to get datacap usage + settings = &TrackDatacapResponse{ + Allowed: true, + } + } + + measuredCtx := map[string]interface{}{ + "throttled": false, + } + + var capOn bool + + // To turn the datacap off we simply set the threshold to 0 or below + if settings.Allowed { + log.Tracef("Got datacap settings: %v", settings) + capOn = settings.CapLimit > 0 + + measuredCtx["datacap_settings"] = settings + if capOn { + measuredCtx["datacap_threshold"] = settings.CapLimit + measuredCtx["datacap_usage"] = u.Bytes + } + } + + if capOn && u.Bytes > settings.CapLimit { + f.instrument.Throttle(req.Context(), true, "over-datacap") + measuredCtx["throttled"] = true + limiter := f.rateLimiterForDevice(lanternDeviceID, defaultThrottleRate, defaultThrottleRate) + if log.IsTraceEnabled() { + log.Tracef("Throttling connection from device %s to %v per second", lanternDeviceID, + humanize.Bytes(uint64(defaultThrottleRate))) + } + f.instrument.Throttle(req.Context(), true, "datacap") + wc.ControlMessage("throttle", limiter) + measuredCtx["throttled"] = true + } else { + throttleDefault("") + } + + wc.ControlMessage("measured", measuredCtx) + + resp, nextCtx, err := next(cs, req) + if resp == nil || err != nil { + return resp, nextCtx, err + } + if !capOn || !f.sendXBQHeader { + return resp, nextCtx, err + } + if resp.Header == nil { + resp.Header = make(http.Header, 1) + } + uMiB := u.Bytes / (1024 * 1024) + xbq := fmt.Sprintf("%d/%d/%d", uMiB, settings.CapLimit/(1024*1024), int64(u.AsOf.Sub(epoch).Seconds())) + xbqv2 := fmt.Sprintf("%s/%d", xbq, u.TTLSeconds) + resp.Header.Set(common.XBQHeader, xbq) // for backward compatibility with older clients + resp.Header.Set(common.XBQHeaderv2, xbqv2) // for new clients that support different bandwidth cap expirations + f.instrument.XBQHeaderSent(req.Context()) + return resp, nextCtx, err +} + +func (f *deviceFilter) rateLimiterForDevice(deviceID string, rateLimitRead, rateLimitWrite int64) *listeners.RateLimiter { + f.limitersByDeviceMx.Lock() + defer f.limitersByDeviceMx.Unlock() + + limiter := f.limitersByDevice[deviceID] + if limiter == nil || limiter.GetRateRead() != rateLimitRead || limiter.GetRateWrite() != rateLimitWrite { + limiter = listeners.NewRateLimiter(rateLimitRead, rateLimitWrite) + f.limitersByDevice[deviceID] = limiter + } + return limiter +} diff --git a/datacap/measured_reporter.go b/datacap/measured_reporter.go new file mode 100644 index 00000000..fb16b018 --- /dev/null +++ b/datacap/measured_reporter.go @@ -0,0 +1,133 @@ +package datacap + +import ( + "context" + "math/rand" + "net" + "time" + + "github.com/getlantern/geo" + "github.com/getlantern/golog" + "github.com/getlantern/http-proxy-lantern/v2/common" + "github.com/getlantern/http-proxy-lantern/v2/listeners" + "github.com/getlantern/http-proxy-lantern/v2/usage" + "github.com/getlantern/measured" +) + +var ( + log = golog.LoggerFor("datacap") +) + +type statsAndContext struct { + ctx map[string]interface{} + stats *measured.Stats +} + +func (sac *statsAndContext) add(other *statsAndContext) *statsAndContext { + newStats := *other.stats + if sac != nil { + newStats.SentTotal += sac.stats.SentTotal + newStats.RecvTotal += sac.stats.RecvTotal + } + return &statsAndContext{other.ctx, &newStats} +} + +// NewMeasuredReporter creates a new reporter that sends usage data to the sidecar service +func NewMeasuredReporter(countryLookup geo.CountryLookup, sidecarClient DatacapSidecarClient, reportInterval time.Duration) listeners.MeasuredReportFN { + // Provide some buffering so that we don't lose data while submitting to the sidecar + statsCh := make(chan *statsAndContext, 10000) + go reportPeriodically(countryLookup, sidecarClient, reportInterval, statsCh) + + return func(ctx map[string]interface{}, stats *measured.Stats, deltaStats *measured.Stats, final bool) { + select { + case statsCh <- &statsAndContext{ctx, deltaStats}: + // submitted successfully + default: + // data lost, probably because sidecar submission is taking longer than expected + log.Error("Failed to queue stats for sidecar submission - queue is full") + } + } +} + +func reportPeriodically(countryLookup geo.CountryLookup, sidecarClient DatacapSidecarClient, reportInterval time.Duration, statsCh chan *statsAndContext) { + // randomize the interval to evenly distribute traffic + randomized := time.Duration(reportInterval.Nanoseconds()/2 + rand.Int63n(reportInterval.Nanoseconds())) + log.Debugf("Will report data usage to sidecar every %v", randomized) + ticker := time.NewTicker(randomized) + statsByDeviceID := make(map[string]*statsAndContext) + + for { + select { + case sac := <-statsCh: + _deviceID := sac.ctx[common.DeviceID] + if _deviceID == nil { + // ignore requests without device ID + continue + } + deviceID := _deviceID.(string) + statsByDeviceID[deviceID] = statsByDeviceID[deviceID].add(sac) + + case <-ticker.C: + if log.IsTraceEnabled() { + log.Tracef("Submitting %d stats to sidecar", len(statsByDeviceID)) + } + + // Submit stats to sidecar + err := submitToSidecar(context.Background(), countryLookup, sidecarClient, statsByDeviceID) + if err != nil { + log.Errorf("Unable to submit stats to sidecar: %v", err) + } + + // Reset stats + statsByDeviceID = make(map[string]*statsAndContext) + } + } +} + +func submitToSidecar(ctx context.Context, countryLookup geo.CountryLookup, client DatacapSidecarClient, statsByDeviceID map[string]*statsAndContext) error { + now := time.Now() + + for deviceID, sac := range statsByDeviceID { + stats := sac.stats + + // Extract client IP for country lookup + _clientIP := sac.ctx[common.ClientIP] + if _clientIP == nil { + log.Error("Missing client_ip in context, this shouldn't happen. Ignoring.") + continue + } + clientIP := _clientIP.(string) + countryCode := countryLookup.CountryCode(net.ParseIP(clientIP)) + + // Extract platform + platform := "unknown" + _platform := sac.ctx[common.Platform] + if _platform != nil { + platform = _platform.(string) + } + + // Calculate total bytes used + totalBytes := stats.RecvTotal + stats.SentTotal + if totalBytes <= 0 { + continue + } + + // Call sidecar to track usage + trackResp, err := client.TrackDatacapUsage(ctx, deviceID, int64(totalBytes), countryCode, platform) + if err != nil { + log.Errorf("Failed to track usage for device %s: %v", deviceID, err) + continue + } + + // Store the result in local usage cache + // TTL will be provided by the sidecar, default to 1 day if not provided + ttl := int64(24 * 60 * 60) // 1 day in seconds as default + usage.Set(deviceID, countryCode, int64(trackResp.RemainingBytes), now, ttl) + + if !trackResp.Allowed { + log.Debugf("Device %s has reached its data cap", deviceID) + } + } + + return nil +} diff --git a/http-proxy/main.go b/http-proxy/main.go index 36c7d771..565f655e 100644 --- a/http-proxy/main.go +++ b/http-proxy/main.go @@ -27,6 +27,7 @@ import ( proxy "github.com/getlantern/http-proxy-lantern/v2" "github.com/getlantern/http-proxy-lantern/v2/blacklist" + "github.com/getlantern/http-proxy-lantern/v2/datacap" "github.com/getlantern/http-proxy-lantern/v2/googlefilter" "github.com/getlantern/http-proxy-lantern/v2/obfs4listener" lanternredis "github.com/getlantern/http-proxy-lantern/v2/redis" @@ -108,6 +109,7 @@ var ( proxiedSitesTrackingId = flag.String("proxied-sites-tracking-id", "UA-21815217-16", "The Google Analytics property id for tracking proxied sites") reportingRedisAddr = flag.String("reportingredis", "", "The address of the reporting Redis instance in \"redis[s]://host:port\" format") + datacapSidecarUrl = flag.String("datacap-sidecar-url", "", "The address of the Lantern sidecar service in \"http[s]://host:port\" format") // default value of tunnelPorts matches ports in flashlight/client/client.go tunnelPorts = flag.String("tunnelports", "80,443,22,110,995,143,993,8080,8443,5222,5223,5224,5228,5229,7300,19302,19303,19304,19305,19306,19307,19308,19309", "Comma seperated list of ports allowed for HTTP CONNECT tunnel. Allow all ports if empty.") @@ -379,13 +381,25 @@ func main() { go periodicallyForceGC() var reportingRedisClient *redis.Client - if *reportingRedisAddr != "" { + var datacapSidecarClient datacap.DatacapSidecarClient + + // Configure either datacap Sidecar or Redis reporting based on provided configuration + if *datacapSidecarUrl != "" { + log.Debugf("Configuring datacap sidecar client from command line flag: %s", *datacapSidecarUrl) + datacapSidecarClient = datacap.NewClient(datacap.Config{ + SidecarAddr: *datacapSidecarUrl, + HTTPClient: &http.Client{ + Timeout: 10 * time.Second, + }, + }) + } else if *reportingRedisAddr != "" { + log.Debugf("Configuring Redis reporting client: %s", *reportingRedisAddr) reportingRedisClient, err = lanternredis.NewClient(*reportingRedisAddr) if err != nil { log.Errorf("failed to initialize redis client, will not be able to perform bandwidth limiting: %v", err) } } else { - log.Debug("no redis address configured for bandwidth reporting") + log.Debug("no redis or datacap sidecar configuration found for bandwidth reporting") } p := &proxy.Proxy{ @@ -410,6 +424,7 @@ func main() { ProxiedSitesSamplePercentage: *proxiedSitesSamplePercentage, ProxiedSitesTrackingID: *proxiedSitesTrackingId, ReportingRedisClient: reportingRedisClient, + DatacapSidecarClient: datacapSidecarClient, Token: *token, TunnelPorts: *tunnelPorts, Obfs4Addr: *obfs4Addr, diff --git a/http_proxy.go b/http_proxy.go index eb06d9a5..4e945866 100644 --- a/http_proxy.go +++ b/http_proxy.go @@ -29,6 +29,7 @@ import ( "github.com/getlantern/kcpwrapper" "github.com/getlantern/http-proxy-lantern/v2/broflake" + "github.com/getlantern/http-proxy-lantern/v2/datacap" "github.com/getlantern/http-proxy-lantern/v2/opsfilter" "github.com/getlantern/http-proxy-lantern/v2/otel" "github.com/getlantern/http-proxy-lantern/v2/shadowsocks" @@ -111,6 +112,7 @@ type Proxy struct { ProxiedSitesSamplePercentage float64 ProxiedSitesTrackingID string ReportingRedisClient *rclient.Client + DatacapSidecarClient datacap.DatacapSidecarClient ThrottleRefreshInterval time.Duration Token string TunnelPorts string @@ -513,13 +515,15 @@ func (p *Proxy) createFilterChain(bl *blacklist.Blacklist) (filters.Chain, proxy filterChain = filterChain.Append(proxy.OnFirstOnly(tokenfilter.New(p.Token, p.instrument))) } - if p.ReportingRedisClient == nil { - log.Debug("Not enabling bandwidth limiting") - } else { + if p.DatacapSidecarClient != nil { + filterChain = filterChain.Append(proxy.OnFirstOnly(datacap.NewFilter(p.DatacapSidecarClient, p.instrument, !p.Pro))) + } else if p.ReportingRedisClient != nil { filterChain = filterChain.Append( proxy.OnFirstOnly(devicefilter.NewPre( redis.NewDeviceFetcher(p.ReportingRedisClient), p.throttleConfig, !p.Pro, p.instrument)), ) + } else { + log.Debug("Not enabling bandwidth limiting") } filterChain = filterChain.Append( @@ -682,7 +686,7 @@ func (p *Proxy) buildOTELOpts(endpoint string, includeProxyName bool) *otel.Opts } func (p *Proxy) configureBandwidthReporting() *reportingConfig { - return newReportingConfig(p.CountryLookup, p.ReportingRedisClient, p.instrument, p.throttleConfig) + return newReportingConfig(p.CountryLookup, p.ReportingRedisClient, p.DatacapSidecarClient, p.instrument, p.throttleConfig) } func (p *Proxy) loadThrottleConfig() { diff --git a/instrument/instrument_test.go b/instrument/instrument_test.go index cd6fb3e0..76fb19ee 100644 --- a/instrument/instrument_test.go +++ b/instrument/instrument_test.go @@ -24,7 +24,7 @@ func TestOriginRoot(t *testing.T) { requireSuccess("facebook.com", "sub.facebook.com") requireSuccess("facebook.com", "facebook.com") requireSuccess("facebook", "facebook") - requireSuccess("facebook.com", "157.240.221.48") + requireSuccess("facebook.com", "157.240.229.35") requireSuccess("AS62041", ipWithASN) // Telegram IP addresses don't resolve, but we can get their ASN } diff --git a/reporting.go b/reporting.go index 76c3089d..4b8d7d68 100644 --- a/reporting.go +++ b/reporting.go @@ -10,12 +10,12 @@ import ( "github.com/getlantern/geo" "github.com/getlantern/http-proxy-lantern/v2/common" - "github.com/getlantern/http-proxy-lantern/v2/listeners" - "github.com/getlantern/measured" - + "github.com/getlantern/http-proxy-lantern/v2/datacap" "github.com/getlantern/http-proxy-lantern/v2/instrument" + "github.com/getlantern/http-proxy-lantern/v2/listeners" "github.com/getlantern/http-proxy-lantern/v2/redis" "github.com/getlantern/http-proxy-lantern/v2/throttle" + "github.com/getlantern/measured" ) var ( @@ -27,7 +27,7 @@ type reportingConfig struct { wrapper func(ls net.Listener) net.Listener } -func newReportingConfig(countryLookup geo.CountryLookup, rc *rclient.Client, instrument instrument.Instrument, throttleConfig throttle.Config) *reportingConfig { +func newReportingConfig(countryLookup geo.CountryLookup, rc *rclient.Client, dc datacap.DatacapSidecarClient, instrument instrument.Instrument, throttleConfig throttle.Config) *reportingConfig { proxiedBytesReporter := func(ctx map[string]interface{}, stats *measured.Stats, deltaStats *measured.Stats, final bool) { if deltaStats.SentTotal == 0 && deltaStats.RecvTotal == 0 { // nothing to report @@ -66,13 +66,23 @@ func newReportingConfig(countryLookup geo.CountryLookup, rc *rclient.Client, ins var reporter listeners.MeasuredReportFN if throttleConfig == nil { - log.Debug("No throttling configured, don't bother reporting bandwidth usage to Redis") + log.Debug("No throttling configured, don't bother reporting bandwidth usage") reporter = func(ctx map[string]interface{}, stats *measured.Stats, deltaStats *measured.Stats, final bool) { // noop } + } else if dc != nil { + log.Debug("Using datacap sidecar client for bandwidth usage reporting") + reporter = datacap.NewMeasuredReporter(countryLookup, dc, measuredReportingInterval) } else if rc != nil { + log.Debug("Using redis client for bandwidth usage reporting") reporter = redis.NewMeasuredReporter(countryLookup, rc, measuredReportingInterval, throttleConfig) + } else { + log.Debug("No reporting client configured, using noop reporter") + reporter = func(ctx map[string]interface{}, stats *measured.Stats, deltaStats *measured.Stats, + final bool) { + // noop + } } reporter = combineReporter(reporter, proxiedBytesReporter) wrapper := func(ls net.Listener) net.Listener {