generated from RTradeLtd/repo-template
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathconverter_chunked.go
148 lines (141 loc) · 3.47 KB
/
converter_chunked.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package ipldeml
import (
"bytes"
"errors"
xpb "github.com/RTradeLtd/TxPB/v3/go"
"github.com/RTradeLtd/ipld-eml/pb"
)
// This file contains the Converter methods that deal with chunked messages.
// GetEmailChunked returns an email from its chunked storage format.
//
// hash identifies the stored ChunkedEmail manifest, which is loaded with
// GetChunkedEmail. The manifest's parts are fetched one at a time with a
// DAG_GET request, in ascending index order starting at 0, and their raw data
// is concatenated and unmarshaled into a pb.Email.
//
// An error is returned if the manifest cannot be loaded, if an expected part
// index is absent from the manifest, if fetching a part fails, or if the
// reassembled data cannot be unmarshaled.
func (c *Converter) GetEmailChunked(hash string) (*pb.Email, error) {
	manifest, err := c.GetChunkedEmail(hash)
	if err != nil {
		return nil, err
	}
	var data []byte
	for i := 0; i < len(manifest.Parts); i++ {
		// Parts is keyed by chunk index. A manifest whose keys are not exactly
		// 0..len-1 cannot be reassembled, so fail here instead of issuing a
		// request with an empty hash.
		partHash, ok := manifest.Parts[int32(i)]
		if !ok {
			return nil, errors.New("chunked email is missing a part")
		}
		resp, err := c.xclient.Dag(c.ctx, &xpb.DagRequest{
			RequestType: xpb.DAGREQTYPE_DAG_GET,
			Hash:        partHash,
		})
		if err != nil {
			return nil, err
		}
		data = append(data, resp.GetRawData()...)
	}
	email := new(pb.Email)
	if err := email.Unmarshal(data); err != nil {
		return nil, err
	}
	return email, nil
}
// PutEmailChunked stores an email as a custom ipld dag object, as opposed to
// a unixfs object type.
//
// The email is marshaled and split into consecutive chunks. Each chunk is sent
// through the Dag API and the first hash of each response is recorded under
// the chunk's index. The resulting pb.ChunkedEmail manifest is then marshaled
// and uploaded with UploadFile, and the hash of that upload is returned.
//
// An error is returned if marshaling fails, if the marshaled email is at
// least maxSize bytes, if any request fails, or if a Dag response carries no
// hashes.
func (c *Converter) PutEmailChunked(email *pb.Email) (string, error) {
	data, err := email.Marshal()
	if err != nil {
		return "", err
	}
	// maxSize is both the upper bound on the marshaled email and the chunk
	// size used below.
	// NOTE(review): because anything >= maxSize is rejected here, the loop
	// below can produce at most one chunk - confirm whether a smaller chunk
	// size was intended.
	const maxSize = (1024 * 1024 * 1024) - 1024
	if len(data) >= maxSize {
		return "", errors.New("do normal uplaod")
	}
	parts := make(map[int32]string)
	for idx, start := 0, 0; start < len(data); idx++ {
		end := start + maxSize
		if end > len(data) {
			end = len(data)
		}
		resp, err := c.xclient.Dag(c.ctx, &xpb.DagRequest{
			Data: data[start:end],
		})
		if err != nil {
			return "", err
		}
		hashes := resp.GetHashes()
		if len(hashes) == 0 {
			// Indexing an empty result would panic; report it instead.
			return "", errors.New("dag response contained no hashes")
		}
		parts[int32(idx)] = hashes[0]
		start = end
	}
	manifest := &pb.ChunkedEmail{
		Parts: parts,
	}
	manifestData, err := manifest.Marshal()
	if err != nil {
		return "", err
	}
	upload, err := c.xclient.UploadFile(c.ctx, bytes.NewReader(manifestData), 0, nil, false)
	if err != nil {
		return "", err
	}
	return upload.GetHash(), nil
}
// GetChunkedEmail downloads the object stored under hash and decodes it as a
// pb.ChunkedEmail. Download and decode errors are returned unchanged.
func (c *Converter) GetChunkedEmail(hash string) (*pb.ChunkedEmail, error) {
	req := &xpb.DownloadRequest{Hash: hash}
	buf, err := c.xclient.DownloadFile(c.ctx, req, false)
	if err != nil {
		return nil, err
	}
	var chunked pb.ChunkedEmail
	if err = chunked.Unmarshal(buf.Bytes()); err != nil {
		return nil, err
	}
	return &chunked, nil
}
// CalculateChunkedEmailSize calculates the size of chunked ipld eml objects.
//
// For every hash in hashes the chunked manifest and the reassembled email are
// loaded, and the hashes of the manifest's parts, the email's embedded files
// and the email's attachments are collected, each at most once. A DAG_STAT
// request is then issued for every input hash followed by every collected
// hash, and the cumulative sizes reported for them are summed.
//
// It returns an error if no hashes are given or if any lookup fails.
func (c *Converter) CalculateChunkedEmailSize(hashes ...string) (int64, error) {
	if len(hashes) == 0 {
		return 0, errors.New("no hashes provided")
	}
	seen := make(map[string]bool)
	// Collect into a fresh slice: appending to the variadic parameter could
	// write into the backing array of a slice the caller passed as `s...`.
	toStat := make([]string, 0, len(hashes))
	toStat = append(toStat, hashes...)
	// track queues h for a stat request the first time it is encountered.
	track := func(h string) {
		if !seen[h] {
			seen[h] = true
			toStat = append(toStat, h)
		}
	}
	for _, hash := range hashes {
		manifest, err := c.GetChunkedEmail(hash)
		if err != nil {
			return 0, err
		}
		for _, partHash := range manifest.GetParts() {
			track(partHash)
		}
		// NOTE(review): GetEmailChunked loads the manifest again internally.
		email, err := c.GetEmailChunked(hash)
		if err != nil {
			return 0, err
		}
		for _, embed := range email.EmbeddedFiles {
			track(embed.DataHash)
		}
		for _, attach := range email.Attachments {
			track(attach.DataHash)
		}
	}
	var size int64
	for _, hash := range toStat {
		resp, err := c.xclient.Dag(c.ctx, &xpb.DagRequest{
			RequestType: xpb.DAGREQTYPE_DAG_STAT,
			Hash:        hash,
		})
		if err != nil {
			return 0, err
		}
		size += resp.GetNodeStats()[hash].GetCumulativeSize()
	}
	return size, nil
}