Skip to content
This repository was archived by the owner on Apr 15, 2025. It is now read-only.

Commit 4e70836

Browse files
committed
Bugfixes for the OSDF staging code.
With this, end-to-end tests of the staging binary work for a simple job ad.
1 parent f4b09af commit 4e70836

File tree

4 files changed

+68
-27
lines changed

4 files changed

+68
-27
lines changed

cmd/osdf_stage/main.go

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ type Options struct {
4444
OriginPrefix string `long:"origin-prefix" short:"o" description:"Prefix corresponding to the local origin"`
4545

4646
// Shadow origin prefix; e.g., osdf://ospool/osgconnect-shadow/
47-
ShadowOriginPrefix string `long:"shadow-prefix" short:"s" description:"Prefix corresponding to the shadow origin"`
47+
ShadowOriginPrefix string `long:"shadow-prefix" short:"s" description:"Prefix corresponding to the shadow origin" required:"true"`
4848

4949
// Sources to ingest
5050
Sources []string `positional-arg-name:"sources" short:"i" long:"input" description:"Source file(s)" default:"-"`
@@ -59,6 +59,22 @@ func main() {
5959
// Capture the start time of the transfer
6060
if _, err := parser.Parse(); err != nil {
6161
if flagsErr, ok := err.(*flags.Error); ok && flagsErr.Type == flags.ErrHelp {
62+
fmt.Fprintln(os.Stderr, `
63+
This utility parses a job ClassAd and, for each "osdf://" URL found in
64+
the input files that is in a locally-mounted origin, copies the file
65+
over to a "shadow origin". The files in the shadow origin are given a
66+
unique name based on their last modification time; this means that local
67+
files can be modified without causing cache consistency issues.
68+
69+
Terminology:
70+
- Origin prefix: Where in the OSDF namespace the origin exports its
71+
files. Example: osdf://osg-connect/protected
72+
- Mount prefix: The location in the locally-mounted filesystem that
73+
corresponds to the files in the origin prefix. Example:
74+
/mnt/cephfs/protected
75+
- Shadow prefix: Where in the OSDF namespace the resulting files should
76+
be uploaded. Example: osdf://osg-connect-shadow/protected
77+
`);
6278
os.Exit(0)
6379
} else {
6480
log.Errorln(err)
@@ -89,7 +105,8 @@ func main() {
89105
log.Errorln("Origin prefix scheme must be osdf://:", originPrefixUri.Scheme)
90106
os.Exit(1)
91107
}
92-
originPrefixPath := path.Clean("/" + originPrefixUri.Path)
108+
originPrefixPath := path.Clean("/" + originPrefixUri.Host + "/" + originPrefixUri.Path)
109+
log.Debugln("Local origin prefix:", originPrefixPath)
93110

94111
if options.Version {
95112
fmt.Println("Version:", version)
@@ -126,7 +143,7 @@ func main() {
126143
os.Exit(1)
127144
}
128145
inputList, err := classad.Get("TransferInput")
129-
if err != nil {
146+
if err != nil || inputList == nil {
130147
// No TransferInput, no need to transform...
131148
os.Exit(0)
132149
}
@@ -137,14 +154,15 @@ func main() {
137154
}
138155
re := regexp.MustCompile("[,\\s]+")
139156
for _, source := range re.Split(inputListStr, -1) {
157+
log.Debugln("Examining transfer input file", source)
140158
if (strings.HasPrefix(source, options.MountPrefix)) {
141159
sources = append(sources, source)
142160
} else {
143161
// Replace the osdf:// prefix with the local mount path
144162
source_uri, err := url.Parse(source)
145163
source_uri_scheme := strings.SplitN(source_uri.Scheme, "+", 2)[0]
146-
if err != nil && source_uri_scheme == "osdf" {
147-
source_path := path.Clean("/" + source_uri.Path)
164+
if err == nil && source_uri_scheme == "osdf" {
165+
source_path := path.Clean("/" + source_uri.Host + "/" + source_uri.Path)
148166
if (strings.HasPrefix(source_path, originPrefixPath)) {
149167
sources = append(sources, options.MountPrefix + source_path[len(originPrefixPath):])
150168
continue
@@ -169,7 +187,14 @@ func main() {
169187
for _, src := range sources {
170188
_, newSource, result := stashcp.DoShadowIngest(src, options.MountPrefix, options.ShadowOriginPrefix)
171189
if result != nil {
172-
break
190+
// What's the correct behavior on failure? For now, we silently put the transfer
191+
// back on the original list. This is arguably the wrong approach as it might
192+
// give the user surprising semantics -- but keeping this until we have a bit more
193+
// confidence in the approach.
194+
extraSources = append(extraSources, src)
195+
log.Errorf("Failed to ingest %s: %s. Adding original back to the transfer list",
196+
src, result.Error())
197+
continue
173198
}
174199
xformSources = append(xformSources, newSource)
175200
}

handle_http.go

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,16 @@ func (e *StoppedTransferError) Error() string {
3939
}
4040

4141

42+
type HttpErrResp struct {
43+
Code int
44+
Err string
45+
}
46+
47+
func (e *HttpErrResp) Error() string {
48+
return e.Err
49+
}
50+
51+
4252
// SlowTransferError is an error that is returned when a transfer takes longer than the configured timeout
4353
type SlowTransferError struct {
4454
BytesTransferred int64
@@ -898,9 +908,12 @@ func walkDir(path string, client *gowebdav.Client) ([]string, error) {
898908
return files, nil
899909
}
900910

901-
func stat_http(dest *url.URL, namespace Namespace) (uint64, error) {
911+
func StatHttp(dest *url.URL, namespace Namespace) (uint64, error) {
902912

903-
token_name := getTokenName(dest)
913+
scitoken_contents, err := getToken(dest, namespace, false, "")
914+
if err != nil {
915+
return 0, err
916+
}
904917

905918
// Parse the writeback host as a URL
906919
writebackhostUrl, err := url.Parse(namespace.WriteBackHost)
@@ -910,19 +923,9 @@ func stat_http(dest *url.URL, namespace Namespace) (uint64, error) {
910923
dest.Host = writebackhostUrl.Host
911924
dest.Scheme = "https"
912925

913-
var token string
914-
if namespace.UseTokenOnRead {
915-
token, err = getToken(token_name)
916-
if err != nil {
917-
log.Errorln("Failed to get token though required to read from this namespace:", err)
918-
return 0, err
919-
}
920-
}
921-
922-
_, canDisableProxy := os.LookupEnv("OSG_DISABLE_PROXY_FALLBACK")
923-
canDisableProxy = !canDisableProxy
926+
canDisableProxy := CanDisableProxy()
927+
disableProxy := !IsProxyEnabled()
924928

925-
disableProxy := false
926929
var resp *http.Response
927930
for {
928931
defaultTransport := http.DefaultTransport.(*http.Transport).Clone()
@@ -940,8 +943,8 @@ func stat_http(dest *url.URL, namespace Namespace) (uint64, error) {
940943
return 0, err
941944
}
942945

943-
if token != "" {
944-
req.Header.Set("Authorization", "Bearer " + token)
946+
if scitoken_contents != "" {
947+
req.Header.Set("Authorization", "Bearer " + scitoken_contents)
945948
}
946949

947950
resp, err = client.Do(req)
@@ -979,7 +982,7 @@ func stat_http(dest *url.URL, namespace Namespace) (uint64, error) {
979982
return 0, err
980983
}
981984
defer resp.Body.Close()
982-
return 0, errors.New(fmt.Sprintf("Request failed (HTTP status %d): %s", resp.StatusCode, string(response_b)))
985+
return 0, &HttpErrResp{resp.StatusCode, fmt.Sprintf("Request failed (HTTP status %d): %s", resp.StatusCode, string(response_b))}
983986
}
984987
}
985988

handle_ingest.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ func generate_destination(filePath string, originPrefix string, shadowOriginPref
3737
func DoShadowIngest(sourceFile string, originPrefix string, shadowOriginPrefix string) (int64, string, error) {
3838
for idx := 0; idx < 10; idx++ {
3939
shadowFile, localSize, err := generate_destination(sourceFile, originPrefix, shadowOriginPrefix)
40+
log.Debugln("Resulting shadow URL:", shadowFile)
4041
if err != nil {
4142
return 0, "", err
4243
}
@@ -48,12 +49,19 @@ func DoShadowIngest(sourceFile string, originPrefix string, shadowOriginPrefix s
4849
maxRuntime := float64(localSize / 10*1024*1024) + 300
4950
for {
5051
remoteSize, err := CheckOSDF(shadowFile, methods)
51-
if err != nil {
52+
if httpErr, ok := err.(*HttpErrResp); ok {
53+
if httpErr.Code == 404 {
54+
break
55+
} else {
56+
return 0, "", err
57+
}
58+
} else if err != nil {
5259
return 0, "", err
5360
}
5461
if (localSize == remoteSize) {
5562
return 0, shadowFile, err
5663
}
64+
log.Debugf("Remote file exists but it is incorrect size; actual size %v, expected %v.", remoteSize, localSize)
5765

5866
// If the remote file size is growing, then wait a bit; perhaps someone
5967
// else is uploading the same file concurrently.
@@ -71,8 +79,9 @@ func DoShadowIngest(sourceFile string, originPrefix string, shadowOriginPrefix s
7179
log.Warnln("Remote uploader took too long to upload file; will do upload from this client")
7280
break
7381
}
82+
log.Debugf("Will sleep for 5 seconds to see if another client is currently uploading the file")
7483
// TODO: Could use a clever backoff scheme here.
75-
time.Sleep(5)
84+
time.Sleep(5 * time.Second)
7685
}
7786

7887
uploadBytes, err := DoStashCPSingle(sourceFile, shadowFile, methods, false)

main.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,10 @@ func CheckOSDF(destination string, methods []string) (remoteSize uint64, err err
227227
if dest_uri.Scheme == "" {
228228
dest_uri.Scheme = "osdf"
229229
}
230+
if dest_uri.Host != "" {
231+
dest_uri.Path = path.Clean("/" + dest_uri.Host + "/" + dest_uri.Path)
232+
dest_uri.Host = ""
233+
}
230234

231235
ns, err := MatchNamespace(dest_uri.Path)
232236
if err != nil {
@@ -238,7 +242,7 @@ func CheckOSDF(destination string, methods []string) (remoteSize uint64, err err
238242
switch method {
239243
case "http":
240244
log.Info("Trying HTTP...")
241-
if remoteSize, err = stat_http(dest_uri, ns); err == nil {
245+
if remoteSize, err = StatHttp(dest_uri, ns); err == nil {
242246
return remoteSize, nil
243247
}
244248
default:
@@ -339,7 +343,7 @@ func DoStashCPSingle(sourceFile string, destination string, methods []string, re
339343
}
340344

341345
if dest_url.Host != "" {
342-
dest_url.Path = path.Join(dest_url.Host, dest_url.Path)
346+
dest_url.Path = "/" + path.Join(dest_url.Host, dest_url.Path)
343347
}
344348

345349
sourceScheme, _ := getTokenName(source_url)

0 commit comments

Comments
 (0)