diff --git a/kadai3-2/int128/.gitignore b/kadai3-2/int128/.gitignore new file mode 100644 index 0000000..7db9bab --- /dev/null +++ b/kadai3-2/int128/.gitignore @@ -0,0 +1 @@ +/.work/ \ No newline at end of file diff --git a/kadai3-2/int128/README.md b/kadai3-2/int128/README.md new file mode 100644 index 0000000..3a74b32 --- /dev/null +++ b/kadai3-2/int128/README.md @@ -0,0 +1,58 @@ +# kadai3-2 + +> 分割ダウンロードを行う +> +> - Rangeアクセスを用いる +> - いくつかのゴルーチンでダウンロードしてマージする +> - エラー処理を工夫する: golang.org/x/sync/errgourpパッケージなどを使ってみる +> - キャンセルが発生した場合の実装を行う + +実行例: + +``` +% go run main.go https://upload.wikimedia.org/wikipedia/en/a/a9/Example.jpg +2018/07/04 17:14:28 Downloading https://upload.wikimedia.org/wikipedia/en/a/a9/Example.jpg to Example.jpg +2018/07/04 17:14:29 Total 27661 bytes +2018/07/04 17:14:29 Get 20748-27660 bytes of content +2018/07/04 17:14:29 Get 6916-13831 bytes of content +2018/07/04 17:14:29 Get 0-6915 bytes of content +2018/07/04 17:14:29 Get 13832-20747 bytes of content +2018/07/04 17:14:30 Wrote 6916-13831 bytes of content +2018/07/04 17:14:30 Wrote 20748-27660 bytes of content +2018/07/04 17:14:30 Wrote 0-6915 bytes of content +2018/07/04 17:14:30 Wrote 13832-20747 bytes of content +2018/07/04 17:14:30 Wrote 27661 bytes +``` + +大きいファイルの場合: + +``` +% go run main.go https://storage.googleapis.com/kubernetes-release/release/v1.11.0/bin/darwin/amd64/kubectl +2018/07/04 19:11:22 Downloading https://storage.googleapis.com/kubernetes-release/release/v1.11.0/bin/darwin/amd64/kubectl to kubectl +2018/07/04 19:11:23 Total 54949920 bytes +2018/07/04 19:11:23 Get 41212440-54949919 bytes of content +2018/07/04 19:11:23 Get 0-13737479 bytes of content +2018/07/04 19:11:23 Get 13737480-27474959 bytes of content +2018/07/04 19:11:23 Get 27474960-41212439 bytes of content +2018/07/04 19:11:27 Wrote 27474960-41212439 bytes of content +2018/07/04 19:11:28 Wrote 13737480-27474959 bytes of content +2018/07/04 19:11:30 Wrote 0-13737479 bytes of content +2018/07/04 19:11:30 Wrote 41212440-54949919 bytes of content +2018/07/04 19:11:30 Wrote 54949920 bytes +``` + +途中でWifi接続を切断した場合: + +``` +% go run main.go https://storage.googleapis.com/kubernetes-release/release/v1.11.0/bin/darwin/amd64/kubectl +2018/07/04 19:13:22 Downloading https://storage.googleapis.com/kubernetes-release/release/v1.11.0/bin/darwin/amd64/kubectl to kubectl +2018/07/04 19:13:23 Total 54949920 bytes +2018/07/04 19:13:23 Get 41212440-54949919 bytes of content +2018/07/04 19:13:23 Get 13737480-27474959 bytes of content +2018/07/04 19:13:23 Get 0-13737479 bytes of content +2018/07/04 19:13:23 Get 27474960-41212439 bytes of content +2018/07/04 19:13:30 Wrote 27474960-41212439 bytes of content +2018/07/04 19:13:30 Wrote 13737480-27474959 bytes of content +2018/07/04 19:13:49 Could not download https://storage.googleapis.com/kubernetes-release/release/v1.11.0/bin/darwin/amd64/kubectl: Could not write partial content: read tcp 172.16.3.103:59942->172.217.161.208:443: read: network is down +exit status 1 +``` diff --git a/kadai3-2/int128/download/download.go b/kadai3-2/int128/download/download.go new file mode 100644 index 0000000..b0cae7b --- /dev/null +++ b/kadai3-2/int128/download/download.go @@ -0,0 +1,123 @@ +// Package download supports parallel downloading with Range requests of RFC7233. +package download + +import ( + "context" + "fmt" + "io" + "log" + "net/http" + + "golang.org/x/sync/errgroup" +) + +// ErrRangeNotSupported shows server does not support Range request. +type ErrRangeNotSupported error + +// Download represents a download for the HTTP resource. +type Download struct { + URL string + Client *http.Client +} + +// New returns a new Download. +func New(url string) *Download { + return &Download{ + URL: url, + Client: &http.Client{}, + } +} + +// GetContent performs parallel download. +func (d *Download) GetContent(ctx context.Context, w io.WriterAt) (*Range, error) { + complete, err := d.GetCompleteRange(ctx) + switch err.(type) { + case nil: + case ErrRangeNotSupported: + return nil, err // TODO: Fallback to single download. + default: + return nil, fmt.Errorf("Could not download %s: %s", d.URL, err) + } + log.Printf("Total %d bytes", complete.Length()) + eg, ctx := errgroup.WithContext(ctx) + parts := complete.Split(4) // TODO: Parameterize number of parallelism. + for _, part := range parts { + part := part + eg.Go(func() error { + log.Printf("Get %d-%d bytes of content", part.Start, part.End) + c, err := d.GetPartialContent(ctx, part) + if err != nil { + return fmt.Errorf("Could not get partial content: %s", err) + } + defer c.Body.Close() + if _, err := io.Copy(NewRangeWriter(w, c.ContentRange.Partial), c.Body); err != nil { + return fmt.Errorf("Could not write partial content: %s", err) + } + log.Printf("Wrote %d-%d bytes of content", part.Start, part.End) + return nil + }) + } + if err := eg.Wait(); err != nil { + return nil, err + } + return complete, nil +} + +// GetCompleteRange returns complete range of the HTTP content. +// This sends a request with Range: bytes 0-0 and parse the Content-Range response. +func (d *Download) GetCompleteRange(ctx context.Context) (*Range, error) { + c, err := d.GetPartialContent(ctx, Range{0, 0}) + if err != nil { + return nil, fmt.Errorf("Could not determine content length: %s", err) + } + defer c.Body.Close() + if c.ContentRange.Complete == nil { + header := c.Header.Get("Content-Range") + return nil, ErrRangeNotSupported(fmt.Errorf("Unknown length: Content-Range: %s", header)) + } + return c.ContentRange.Complete, nil +} + +// PartialContentResponse represents a response of 206 Partial Content. +type PartialContentResponse struct { + *http.Response // HTTP response + ContentRange *ContentRange // Content-Range header +} + +// GetPartialContent sends a Range request. +// If the status code is 206, it returns the content. +// If the status code is 200 or 416, it returns the ErrRangeNotSupported. +// Caller must close the body when the returned error is nil. +func (d *Download) GetPartialContent(ctx context.Context, rng Range) (*PartialContentResponse, error) { + req, err := http.NewRequest("GET", d.URL, nil) + if err != nil { + return nil, fmt.Errorf("Could not create a request for %s: %s", d.URL, err) + } + req = req.WithContext(ctx) + req.Header.Add("Range", rng.HeaderValue()) + logHTTPRequest(req) + res, err := d.Client.Do(req) + if err != nil { + return nil, fmt.Errorf("Could not send a request for %s: %s", d.URL, err) + } + logHTTPResponse(res) + + switch res.StatusCode { + case http.StatusPartialContent: + crng, err := ParseContentRange(res.Header.Get("Content-Range")) + if err != nil { + res.Body.Close() + return nil, fmt.Errorf("Invalid Content-Range header: %s", err) + } + return &PartialContentResponse{res, crng}, nil + case http.StatusOK: + res.Body.Close() + return nil, ErrRangeNotSupported(fmt.Errorf("Server does not support Range request: %s", res.Status)) + case http.StatusRequestedRangeNotSatisfiable: + res.Body.Close() + return nil, ErrRangeNotSupported(fmt.Errorf("Server does not support Range request: %s", res.Status)) + default: + res.Body.Close() + return nil, fmt.Errorf("HTTP error: %s", res.Status) + } +} diff --git a/kadai3-2/int128/download/log.go b/kadai3-2/int128/download/log.go new file mode 100644 index 0000000..9edc021 --- /dev/null +++ b/kadai3-2/int128/download/log.go @@ -0,0 +1,29 @@ +package download + +import ( + "log" + "net/http" + "os" +) + +func logHTTPRequest(req *http.Request) { + if os.Getenv("DEBUG") != "" { + log.Printf("<- %s %s", req.Method, req.URL) + for key, values := range req.Header { + for _, value := range values { + log.Printf("<- %s: %s", key, value) + } + } + } +} + +func logHTTPResponse(res *http.Response) { + if os.Getenv("DEBUG") != "" { + log.Printf("-> %s %s", res.Proto, res.Status) + for key, values := range res.Header { + for _, value := range values { + log.Printf("-> %s: %s", key, value) + } + } + } +} diff --git a/kadai3-2/int128/download/range.go b/kadai3-2/int128/download/range.go new file mode 100644 index 0000000..323a41c --- /dev/null +++ b/kadai3-2/int128/download/range.go @@ -0,0 +1,76 @@ +package download + +import ( + "fmt" +) + +// ContentRange represents a HTTP Content-Range header. +// See also RFC7233. +type ContentRange struct { + Partial Range // Received range of the content + Complete *Range // Complete range of the content (may be nil) +} + +// ParseContentRange parses a Content-Range header and returns the Range. +func ParseContentRange(header string) (*ContentRange, error) { + rng := Range{} + if _, err := fmt.Sscanf(header, "bytes %d-%d/*", &rng.Start, &rng.End); err == nil { + return &ContentRange{rng, nil}, nil + } + var length int64 + if _, err := fmt.Sscanf(header, "bytes %d-%d/%d", &rng.Start, &rng.End, &length); err == nil { + return &ContentRange{rng, &Range{0, length - 1}}, nil + } + return nil, fmt.Errorf("Invalid Content-Range header: %s", header) +} + +// Range represents range for [start, end]. +// See also RFC7233. +type Range struct { + Start int64 // Start position + End int64 // End position +} + +// HeaderValue returns a value of Range header, e.g. bytes=100-200. +func (r *Range) HeaderValue() string { + return fmt.Sprintf("bytes=%d-%d", r.Start, r.End) +} + +// Length returns length. +func (r *Range) Length() int64 { + return r.End - r.Start + 1 +} + +// Split splits the range into chunks. +// If count is 0, it returns an empty slice. +// If count is too big, it returns a possible longest slice. +func (r *Range) Split(count int) []Range { + if count < 1 { + return []Range{} + } + unit := divCeil(r.Length(), int64(count)) + chunks := make([]Range, 0, count) + for p := r.Start; p <= r.End; p += unit { + rng := Range{ + Start: p, + End: min(p+unit-1, r.End), + } + chunks = append(chunks, rng) + } + return chunks +} + +// divCeil(a, b) = ceil(a / b) +func divCeil(a int64, b int64) int64 { + if a%b > 0 { + return a/b + 1 + } + return a / b +} + +func min(a int64, b int64) int64 { + if a < b { + return a + } + return b +} diff --git a/kadai3-2/int128/download/range_test.go b/kadai3-2/int128/download/range_test.go new file mode 100644 index 0000000..6b24e85 --- /dev/null +++ b/kadai3-2/int128/download/range_test.go @@ -0,0 +1,65 @@ +package download + +import ( + "fmt" + "reflect" + "testing" +) + +func TestParseContentRange(t *testing.T) { + matrix := []struct { + header string + expected *ContentRange + }{ + {"bytes 0-0/100", &ContentRange{Range{0, 0}, &Range{0, 99}}}, + {"bytes 0-0/*", &ContentRange{Range{0, 0}, nil}}, + {"bytes 12345-67890/123456", &ContentRange{Range{12345, 67890}, &Range{0, 123455}}}, + {"bytes 12345-67890/*", &ContentRange{Range{12345, 67890}, nil}}, + } + for _, m := range matrix { + t.Run(m.header, func(t *testing.T) { + rng, err := ParseContentRange(m.header) + if err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(m.expected, rng) { + t.Errorf("range wants %+v but %+v", m.expected, rng) + } + }) + } +} + +func TestParseWrongContentRange(t *testing.T) { + rng, err := ParseContentRange("foo") + if err == nil { + t.Errorf("wants error but %+v", rng) + } +} + +func TestRangeSplit(t *testing.T) { + matrix := []struct { + r Range + count int + expected []Range + }{ + {Range{1, 1}, 0, []Range{}}, + {Range{1, 1}, 1, []Range{Range{1, 1}}}, + + {Range{1, 2}, 1, []Range{Range{1, 2}}}, + {Range{1, 2}, 2, []Range{Range{1, 1}, Range{2, 2}}}, + {Range{1, 2}, 3, []Range{Range{1, 1}, Range{2, 2}}}, + + {Range{1, 3}, 1, []Range{Range{1, 3}}}, + {Range{1, 3}, 2, []Range{Range{1, 2}, Range{3, 3}}}, + {Range{1, 3}, 3, []Range{Range{1, 1}, Range{2, 2}, Range{3, 3}}}, + {Range{1, 3}, 4, []Range{Range{1, 1}, Range{2, 2}, Range{3, 3}}}, + } + for _, m := range matrix { + t.Run(fmt.Sprintf("%+v/Count:%d", m.r, m.count), func(t *testing.T) { + chunks := m.r.Split(m.count) + if !reflect.DeepEqual(m.expected, chunks) { + t.Errorf("chunks wants %+v but %+v", m.expected, chunks) + } + }) + } +} diff --git a/kadai3-2/int128/download/writer.go b/kadai3-2/int128/download/writer.go new file mode 100644 index 0000000..cb2f18c --- /dev/null +++ b/kadai3-2/int128/download/writer.go @@ -0,0 +1,47 @@ +package download + +import ( + "fmt" + "io" +) + +// Position represents relative position in the range. +type Position struct { + Range Range + Offset int64 // offset in the range +} + +// Absolute returns absolute position. +func (p *Position) Absolute() int64 { + return p.Range.Start + p.Offset +} + +// Forward forwards the position. +func (p *Position) Forward(n int64) { + p.Offset += n +} + +// CanForward returns true if incremented position is in the range. +func (p *Position) CanForward(n int64) bool { + return p.Absolute()+n-1 <= p.Range.End +} + +// RangeWriter supports partial write. +type RangeWriter struct { + io.WriterAt + position Position // relative position in the range +} + +// NewRangeWriter returns a new RangeWriter. +func NewRangeWriter(w io.WriterAt, r Range) *RangeWriter { + return &RangeWriter{w, Position{r, 0}} +} + +func (w *RangeWriter) Write(p []byte) (int, error) { + if !w.position.CanForward(int64(len(p))) { + return 0, fmt.Errorf("Write position exceeds the range: len(p)=%d, position=%+v", len(p), w.position) + } + n, err := w.WriteAt(p, w.position.Absolute()) + w.position.Forward(int64(n)) + return n, err +} diff --git a/kadai3-2/int128/main.go b/kadai3-2/int128/main.go new file mode 100644 index 0000000..bab6f9c --- /dev/null +++ b/kadai3-2/int128/main.go @@ -0,0 +1,44 @@ +package main + +import ( + "context" + "fmt" + "log" + "os" + "path/filepath" + + "github.com/gopherdojo/dojo2/kadai3-2/int128/download" +) + +func main() { + // TODO: support flags + switch len(os.Args) { + case 2: + doDownload(os.Args[1]) + default: + fmt.Fprintf(os.Stderr, "usage: %s URL\n", os.Args[0]) + os.Exit(1) + } +} + +func doDownload(url string) { + filename := filepath.Base(url) + if filename == "" { + filename = "file" + } + + w, err := os.Create(filename) + if err != nil { + log.Fatalf("Could not create file %s: %s", filename, err) + } + defer w.Close() + + log.Printf("Downloading %s to %s", url, filename) + d := download.New(url) + ctx := context.Background() + rng, err := d.GetContent(ctx, w) + if err != nil { + log.Fatalf("Could not download %s: %s", url, err) + } + log.Printf("Wrote %d bytes", rng.Length()) +}