-
Notifications
You must be signed in to change notification settings - Fork 0
/
intra.go
101 lines (95 loc) · 2.32 KB
/
intra.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package streisand
import (
"bytes"
"encoding/hex"
"fmt"
"io"
"net/http"
"net/url"
"os"
"time"
)
func init() {
// We know that the only server we send Expect:100-continue to is ourselves, so there's no need for this fallback for servers which don't support it properly.
// This prevents us from unintentionally sending an internal put to an overloaded server which didn't respond in 1s.
http.DefaultTransport.(*http.Transport).ExpectContinueTimeout = 30 * time.Second
}
// pushBlob sends a blob to a target server (given as host:port)
// Passing in an fh is optional, but if you do, it will be closed before returning.
func (s *server) pushBlob(target string, hash []byte, fh *os.File) error {
// TODO: locking
if fh == nil {
var err error
fh, err = s.store.Get(hash)
if err != nil {
return err
}
}
defer fh.Close()
st, err := fh.Stat()
if err != nil {
return err
}
u := url.URL{
Scheme: "http",
Host: target,
Path: "/internal/upload",
}
req, err := http.NewRequest("POST", u.String(), fh)
if err != nil {
return err
}
req.Header.Set("Expect", "100-continue")
req.Header.Set("X-StreiSANd-Hash", hex.EncodeToString(hash))
req.Header.Set("Content-Length", fmt.Sprint(st.Size()))
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
if resp.StatusCode != 200 {
return fmt.Errorf("HTTP error: %s", resp.Status)
}
return nil
}
func (s *server) pullBlob(target string, hash []byte) error {
u := url.URL{
Scheme: "http",
Host: target,
Path: "/internal/blob/" + hex.EncodeToString(hash),
}
req, err := http.NewRequest("GET", u.String(), nil)
if err != nil {
return err
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
return fmt.Errorf("HTTP error: %s", resp.Status)
}
w, err := s.store.NewWriter()
if err != nil {
return err
}
defer w.Abort()
if _, err := io.Copy(w, resp.Body); err != nil {
return err
}
if err := resp.Body.Close(); err != nil {
return err
}
s.mutex.Lock()
defer s.mutex.Unlock()
if err := w.Close(); err != nil {
return err
}
if bytes.Compare(hash, w.Hash()) != 0 {
return fmt.Errorf("remote returned incorrect file: want %q, got %q", hex.EncodeToString(hash), hex.EncodeToString(w.Hash()))
}
if w.IsNew() {
s.xors.Add((*Hash)(hash))
}
return nil
}