Skip to content

Commit 604b37b

Browse files
authored
Merge pull request #8 from 30x/XAPID-1070
Xapid 1070 add JWT token to download requests from apid to blobserver
2 parents 6cad27e + 5efac39 commit 604b37b

File tree

3 files changed

+38
-29
lines changed

3 files changed

+38
-29
lines changed

bundle.go

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -48,12 +48,12 @@ type bundleManager struct {
4848
apiMan apiManagerInterface
4949
concurrentDownloads int
5050
markDeploymentFailedAfter time.Duration
51-
bundleDownloadConnTimeout time.Duration
5251
bundleRetryDelay time.Duration
5352
bundleCleanupDelay time.Duration
5453
downloadQueue chan *DownloadRequest
5554
isClosed *int32
5655
workers []*BundleDownloader
56+
client *http.Client
5757
}
5858

5959
type blobServerResponse struct {
@@ -103,7 +103,7 @@ func (bm *bundleManager) makeDownloadRequest(id string) *DownloadRequest {
103103
blobId: id,
104104
backoffFunc: createBackoff(retryIn, maxBackOff),
105105
markFailedAt: markFailedAt,
106-
connTimeout: bm.bundleDownloadConnTimeout,
106+
client: bm.client,
107107
}
108108
}
109109

@@ -158,8 +158,8 @@ type DownloadRequest struct {
158158
blobId string
159159
backoffFunc func()
160160
markFailedAt time.Time
161-
connTimeout time.Duration
162161
blobServerURL string
162+
client *http.Client
163163
}
164164

165165
func (r *DownloadRequest) downloadBundle() error {
@@ -172,7 +172,7 @@ func (r *DownloadRequest) downloadBundle() error {
172172
}
173173
}
174174

175-
downloadedFile, err := downloadFromURI(r.blobServerURL, r.blobId, r.connTimeout)
175+
downloadedFile, err := downloadFromURI(r.client, r.blobServerURL, r.blobId)
176176

177177
if err != nil {
178178
log.Errorf("Unable to download blob file blobId=%s err:%v", r.blobId, err)
@@ -210,7 +210,7 @@ func getBlobFilePath(blobId string) string {
210210
return path.Join(bundlePath, base64.StdEncoding.EncodeToString([]byte(blobId)))
211211
}
212212

213-
func getSignedURL(blobServerURL string, blobId string, bundleDownloadConnTimeout time.Duration) (string, error) {
213+
func getSignedURL(client *http.Client, blobServerURL string, blobId string) (string, error) {
214214

215215
blobUri, err := url.Parse(blobServerURL)
216216
if err != nil {
@@ -224,7 +224,7 @@ func getSignedURL(blobServerURL string, blobId string, bundleDownloadConnTimeout
224224

225225
uri := blobUri.String()
226226

227-
surl, err := getURIReader(uri, bundleDownloadConnTimeout)
227+
surl, err := getUriReaderWithAuth(client, uri)
228228
if err != nil {
229229
log.Errorf("Unable to get signed URL from BlobServer %s: %v", uri, err)
230230
return "", err
@@ -248,12 +248,12 @@ func getSignedURL(blobServerURL string, blobId string, bundleDownloadConnTimeout
248248

249249
// downloadFromURI involves retrieving the signed URL for the blob, and storing the resource locally
250250
// after downloading the resource from GCS (via the signed URL)
251-
func downloadFromURI(blobServerURL string, blobId string, bundleDownloadConnTimeout time.Duration) (tempFileName string, err error) {
251+
func downloadFromURI(client *http.Client, blobServerURL string, blobId string) (tempFileName string, err error) {
252252

253253
var tempFile *os.File
254254
log.Debugf("Downloading bundle: %s", blobId)
255255

256-
uri, err := getSignedURL(blobServerURL, blobId, bundleDownloadConnTimeout)
256+
uri, err := getSignedURL(client, blobServerURL, blobId)
257257
if err != nil {
258258
log.Errorf("Unable to get signed URL for blobId {%s}, error : {%v}", blobId, err)
259259
return
@@ -268,7 +268,7 @@ func downloadFromURI(blobServerURL string, blobId string, bundleDownloadConnTime
268268
tempFileName = tempFile.Name()
269269

270270
var confReader io.ReadCloser
271-
confReader, err = getURIReader(uri, bundleDownloadConnTimeout)
271+
confReader, err = getUriReaderWithAuth(client, uri)
272272
if err != nil {
273273
log.Errorf("Unable to retrieve bundle %s: %v", uri, err)
274274
return
@@ -286,12 +286,14 @@ func downloadFromURI(blobServerURL string, blobId string, bundleDownloadConnTime
286286
}
287287

288288
// retrieveBundle retrieves bundle data from a URI
289-
func getURIReader(uriString string, bundleDownloadConnTimeout time.Duration) (io.ReadCloser, error) {
290-
291-
client := http.Client{
292-
Timeout: bundleDownloadConnTimeout,
289+
func getUriReaderWithAuth(client *http.Client, uriString string) (io.ReadCloser, error) {
290+
req, err := http.NewRequest("GET", uriString, nil)
291+
if err != nil {
292+
return nil, err
293293
}
294-
res, err := client.Get(uriString)
294+
// add Auth
295+
req.Header.Add("Authorization", getBearerToken())
296+
res, err := client.Do(req)
295297
if err != nil {
296298
return nil, err
297299
}

bundle_test.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package apiGatewayConfDeploy
1717
import (
1818
. "github.com/onsi/ginkgo"
1919
. "github.com/onsi/gomega"
20+
"net/http"
2021
"sync/atomic"
2122
"time"
2223
)
@@ -65,11 +66,16 @@ var _ = Describe("api", func() {
6566
apiMan: dummyApiMan,
6667
concurrentDownloads: concurrentDownloads,
6768
markDeploymentFailedAfter: 5 * time.Second,
68-
bundleDownloadConnTimeout: time.Second,
6969
bundleRetryDelay: time.Second,
7070
bundleCleanupDelay: 5 * time.Second,
7171
downloadQueue: make(chan *DownloadRequest, downloadQueueSize),
7272
isClosed: new(int32),
73+
client: &http.Client{
74+
Timeout: time.Second,
75+
Transport: &http.Transport{
76+
MaxIdleConnsPerHost: 10,
77+
},
78+
},
7379
}
7480
testBundleMan.initializeBundleDownloading()
7581
time.Sleep(100 * time.Millisecond)
@@ -94,7 +100,7 @@ var _ = Describe("api", func() {
94100
// setup timeout
95101
atomic.StoreInt32(blobServer.signedTimeout, 1)
96102
atomic.StoreInt32(blobServer.blobTimeout, 1)
97-
testBundleMan.bundleDownloadConnTimeout = 500 * time.Millisecond
103+
testBundleMan.client.Timeout = 500 * time.Millisecond
98104
testBundleMan.bundleRetryDelay = 50 * time.Millisecond
99105

100106
// download blobs
@@ -109,7 +115,7 @@ var _ = Describe("api", func() {
109115
// setup timeout
110116
atomic.StoreInt32(blobServer.signedTimeout, 1)
111117
atomic.StoreInt32(blobServer.blobTimeout, 1)
112-
testBundleMan.bundleDownloadConnTimeout = 100 * time.Millisecond
118+
testBundleMan.client.Timeout = 100 * time.Millisecond
113119
testBundleMan.bundleRetryDelay = 100 * time.Millisecond
114120
testBundleMan.markDeploymentFailedAfter = 200 * time.Millisecond
115121

init.go

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ const (
4141
configDownloadQueueSize = "apigeesync_download_queue_size"
4242
configBlobServerBaseURI = "apigeesync_blob_server_base"
4343
configStoragePath = "local_storage_path"
44-
maxIdleConnsPerHost = 10
44+
maxIdleConnsPerHost = 50
4545
httpTimeout = time.Minute
4646
)
4747

@@ -118,20 +118,21 @@ func initPlugin(s apid.Services) (apid.PluginData, error) {
118118
apidClusterId = config.GetString(configApidClusterID)
119119

120120
// initialize tracker client
121+
httpClient := &http.Client{
122+
Transport: &http.Transport{
123+
MaxIdleConnsPerHost: maxIdleConnsPerHost,
124+
},
125+
Timeout: httpTimeout,
126+
CheckRedirect: func(req *http.Request, _ []*http.Request) error {
127+
req.Header.Set("Authorization", getBearerToken())
128+
return nil
129+
},
130+
}
121131

122132
client := &trackerClient{
123133
trackerBaseUrl: configApiServerBaseURI,
124134
clusterId: apidClusterId,
125-
httpclient: &http.Client{
126-
Transport: &http.Transport{
127-
MaxIdleConnsPerHost: maxIdleConnsPerHost,
128-
},
129-
Timeout: httpTimeout,
130-
CheckRedirect: func(req *http.Request, _ []*http.Request) error {
131-
req.Header.Set("Authorization", getBearerToken())
132-
return nil
133-
},
134-
},
135+
httpclient: httpClient,
135136
}
136137

137138
// initialize db manager
@@ -176,11 +177,11 @@ func initPlugin(s apid.Services) (apid.PluginData, error) {
176177
apiMan: apiMan,
177178
concurrentDownloads: concurrentDownloads,
178179
markDeploymentFailedAfter: markDeploymentFailedAfter,
179-
bundleDownloadConnTimeout: bundleDownloadConnTimeout,
180180
bundleRetryDelay: time.Second,
181181
bundleCleanupDelay: bundleCleanupDelay,
182182
downloadQueue: make(chan *DownloadRequest, downloadQueueSize),
183183
isClosed: new(int32),
184+
client: httpClient,
184185
}
185186

186187
bundleMan.initializeBundleDownloading()

0 commit comments

Comments
 (0)