Skip to content

Commit

Permalink
feat: support buffer type
Browse files Browse the repository at this point in the history
  • Loading branch information
siyul-park committed Jan 4, 2025
1 parent b2a1601 commit eb7f2cf
Show file tree
Hide file tree
Showing 27 changed files with 456 additions and 129 deletions.
13 changes: 13 additions & 0 deletions examples/loopback.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
- kind: listener
name: listener
protocol: http
port: '{{ .PORT }}'
ports:
out:
- name: loopback
port: in

- kind: snippet
name: loopback
language: cel
code: self
6 changes: 3 additions & 3 deletions examples/system.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@

- kind: if
name: specs_read_or_watch
when: '!has(self.query.watch) || !self.query.watch.all(x, x == "true") || !has(self.header.Connection) || !has(self.header.Upgrade)'
when: '!has(self.header.Connection) || !has(self.header.Upgrade)'
ports:
out[0]:
- name: specs_read_with_query
Expand Down Expand Up @@ -357,7 +357,7 @@

- kind: if
name: values_read_or_watch
when: '!has(self.query.watch) || !self.query.watch.all(x, x == "true") || !has(self.header.Connection) || !has(self.header.Upgrade)'
when: '!has(self.header.Connection) || !has(self.header.Upgrade)'
ports:
out[0]:
- name: values_read_with_query
Expand Down Expand Up @@ -557,7 +557,7 @@

- kind: if
name: charts_read_or_watch
when: '!has(self.query.watch) || !self.query.watch.all(x, x == "true") || !has(self.header.Connection) || !has(self.header.Upgrade)'
when: '!has(self.header.Connection) || !has(self.header.Upgrade)'
ports:
out[0]:
- name: charts_read_with_query
Expand Down
14 changes: 8 additions & 6 deletions ext/pkg/mime/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func Encode(writer io.Writer, value types.Value, header textproto.MIMEHeader) er

flush := func() {
if c, ok := w.(io.Closer); ok && w != cwriter {
c.Close()
_ = c.Close()
}
header.Set(HeaderContentLength, strconv.Itoa(count))
}
Expand Down Expand Up @@ -219,6 +219,12 @@ func Encode(writer io.Writer, value types.Value, header textproto.MIMEHeader) er
}

switch v := value.(type) {
case types.Buffer:
if _, err := io.Copy(w, v); err != nil {
return err
}
flush()
return nil
case types.Binary:
if _, err := w.Write(v.Bytes()); err != nil {
return err
Expand Down Expand Up @@ -327,11 +333,7 @@ func Decode(reader io.Reader, header textproto.MIMEHeader) (types.Value, error)
})
}

data, err := io.ReadAll(r)
if err != nil {
return nil, err
}
return types.NewBinary(data), nil
return types.NewBuffer(r), nil
}

func randomMultipartBoundary() string {
Expand Down
15 changes: 10 additions & 5 deletions ext/pkg/mime/encoding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func TestDecode(t *testing.T) {
{
whenValue: []byte("--MyBoundary\r\n" +
"Content-Disposition: form-data; name=\"test\"; filename=\"test\"\r\n" +
"Content-Type: application/octet-stream\r\n" +
"Content-Type: text/plain\r\n" +
"\r\n" +
"test\r\n" +
"--MyBoundary\r\n" +
Expand All @@ -186,11 +186,11 @@ func TestDecode(t *testing.T) {
),
types.NewString("files"), types.NewMap(
types.NewString("test"), types.NewSlice(types.NewMap(
types.NewString("data"), types.NewBinary([]byte("test")),
types.NewString("data"), types.NewString("test"),
types.NewString("filename"), types.NewString("test"),
types.NewString("header"), types.NewMap(
types.NewString("Content-Disposition"), types.NewSlice(types.NewString("form-data; name=\"test\"; filename=\"test\"")),
types.NewString("Content-Type"), types.NewSlice(types.NewString("application/octet-stream")),
types.NewString("Content-Type"), types.NewSlice(types.NewString("text/plain")),
),
types.NewString("size"), types.NewInt64(4),
)),
Expand All @@ -200,7 +200,7 @@ func TestDecode(t *testing.T) {
{
whenValue: []byte("testtesttest"),
whenType: ApplicationOctetStream,
expect: types.NewBinary([]byte("testtesttest")),
expect: types.NewBuffer(bytes.NewBuffer([]byte("testtesttest"))),
},
}

Expand All @@ -210,7 +210,12 @@ func TestDecode(t *testing.T) {
HeaderContentType: []string{tt.whenType},
})
assert.NoError(t, err)
assert.Equal(t, tt.expect.Interface(), decode.Interface())

var expect any
var actual any
_ = types.Unmarshal(tt.expect, &expect)
_ = types.Unmarshal(decode, &actual)
assert.Equal(t, expect, actual)
})
}
}
10 changes: 5 additions & 5 deletions ext/pkg/mime/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,16 @@ func DetectTypesFromBytes(value []byte) []string {
// DetectTypesFromValue determines the content types based on the type of types passed.
func DetectTypesFromValue(value types.Value) []string {
switch value.(type) {
case types.Binary:
case types.Binary, types.Buffer:
return []string{ApplicationOctetStream}
case types.String:
return []string{TextPlainCharsetUTF8, ApplicationJSONCharsetUTF8}
return []string{TextPlainCharsetUTF8, ApplicationOctetStream, ApplicationJSONCharsetUTF8, ApplicationXMLCharsetUTF8, ApplicationFormURLEncoded, MultipartFormData}
case types.Slice:
return []string{ApplicationJSONCharsetUTF8}
return []string{ApplicationJSONCharsetUTF8, ApplicationXMLCharsetUTF8, ApplicationFormURLEncoded}
case types.Map, types.Error:
return []string{ApplicationJSONCharsetUTF8, ApplicationFormURLEncoded, MultipartFormData}
return []string{ApplicationJSONCharsetUTF8, ApplicationXMLCharsetUTF8, ApplicationFormURLEncoded, MultipartFormData}
default:
return []string{ApplicationJSONCharsetUTF8}
return []string{ApplicationJSONCharsetUTF8, ApplicationXMLCharsetUTF8, TextPlainCharsetUTF8, ApplicationOctetStream}
}
}

Expand Down
12 changes: 8 additions & 4 deletions ext/pkg/mime/type_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,21 +53,25 @@ func TestDetectTypesFromValue(t *testing.T) {
when: types.NewBinary(nil),
expect: []string{ApplicationOctetStream},
},
{
when: types.NewBuffer(nil),
expect: []string{ApplicationOctetStream},
},
{
when: types.NewString(""),
expect: []string{TextPlainCharsetUTF8, ApplicationJSONCharsetUTF8},
expect: []string{TextPlainCharsetUTF8, ApplicationOctetStream, ApplicationJSONCharsetUTF8, ApplicationXMLCharsetUTF8, ApplicationFormURLEncoded, MultipartFormData},
},
{
when: types.NewSlice(),
expect: []string{ApplicationJSONCharsetUTF8},
expect: []string{ApplicationJSONCharsetUTF8, ApplicationXMLCharsetUTF8, ApplicationFormURLEncoded},
},
{
when: types.NewMap(),
expect: []string{ApplicationJSONCharsetUTF8, ApplicationFormURLEncoded, MultipartFormData},
expect: []string{ApplicationJSONCharsetUTF8, ApplicationXMLCharsetUTF8, ApplicationFormURLEncoded, MultipartFormData},
},
{
when: types.NewError(nil),
expect: []string{ApplicationJSONCharsetUTF8, ApplicationFormURLEncoded, MultipartFormData},
expect: []string{ApplicationJSONCharsetUTF8, ApplicationXMLCharsetUTF8, ApplicationFormURLEncoded, MultipartFormData},
},
}

Expand Down
5 changes: 4 additions & 1 deletion ext/pkg/network/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,10 @@ func (n *HTTPNode) action(proc *process.Process, inPck *packet.Packet) (*packet.
if err != nil {
return nil, packet.New(types.NewError(err))
}
defer w.Body.Close()

proc.AddExitHook(process.ExitFunc(func(err error) {
_ = w.Body.Close()
}))

body, err := mime.Decode(w.Body, textproto.MIMEHeader(w.Header))
if err != nil {
Expand Down
15 changes: 5 additions & 10 deletions ext/pkg/network/listener.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
package network

import (
"bytes"
"crypto/tls"
"fmt"
"io"
"net"
"net/http"
"net/textproto"
Expand Down Expand Up @@ -261,6 +259,9 @@ func (n *HTTPListenNode) negotiate(req *HTTPPayload, res *HTTPPayload) {
accept := req.Header.Get(mime.HeaderAccept)
offers := mime.DetectTypesFromValue(res.Body)
contentType := mime.Negotiate(accept, offers)
if contentType == "" && len(offers) > 0 {
contentType = offers[0]
}
if contentType != "" {
res.Header.Set(mime.HeaderContentType, contentType)
}
Expand Down Expand Up @@ -302,17 +303,11 @@ func (n *HTTPListenNode) write(w http.ResponseWriter, res *HTTPPayload) error {
}
}

buf := bytes.NewBuffer(nil)
if err := mime.Encode(buf, res.Body, textproto.MIMEHeader(h)); err != nil {
return err
}

status := res.Status
if status == 0 {
status = http.StatusOK
}
w.WriteHeader(status)

_, err := io.Copy(w, buf)
return err
w.WriteHeader(status)
return mime.Encode(w, res.Body, textproto.MIMEHeader(h))
}
30 changes: 15 additions & 15 deletions ext/pkg/network/websocket.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package network

import (
"bytes"
"context"
"net/http"
"net/textproto"
Expand Down Expand Up @@ -202,28 +201,29 @@ func (n *WebSocketConnNode) consume(proc *process.Process) {
}

for inPck := range inReader.Read() {
var inPayload *WebSocketPayload
if err := types.Unmarshal(inPck.Payload(), &inPayload); err != nil {
var inPayload WebSocketPayload
_ = types.Unmarshal(inPck.Payload(), &inPayload)
if inPayload.Type == 0 && inPayload.Data == nil {
inPayload.Data = inPck.Payload()
if _, ok := inPayload.Data.(types.Binary); !ok {
kind := types.KindOf(inPayload.Data)
if kind != types.KindBuffer && kind != types.KindBinary {
inPayload.Type = websocket.TextMessage
} else {
inPayload.Type = websocket.BinaryMessage
}
}

w := mime.WriterFunc(func(b []byte) (int, error) {
if err := conn.WriteMessage(inPayload.Type, b); err != nil {
return 0, err
}
return len(b), nil
})
w, err := conn.NextWriter(inPayload.Type)
if err != nil {
errPck := packet.New(types.NewError(err))
inReader.Receive(packet.Send(errWriter, errPck))
continue
}

if err := mime.Encode(w, inPayload.Data, textproto.MIMEHeader{}); err != nil {
errPck := packet.New(types.NewError(err))
if errWriter.Write(errPck) > 0 {
<-errWriter.Receive()
}
inReader.Receive(packet.Send(errWriter, errPck))
continue
}

inReader.Receive(packet.None)
Expand All @@ -237,7 +237,7 @@ func (n *WebSocketConnNode) produce(proc *process.Process) {
}

for {
typ, p, err := conn.ReadMessage()
typ, reader, err := conn.NextReader()
if err != nil || typ == websocket.CloseMessage {
outWriter := n.outPort.Open(proc)

Expand All @@ -262,7 +262,7 @@ func (n *WebSocketConnNode) produce(proc *process.Process) {
child := proc.Fork()
outWriter := n.outPort.Open(child)

data, err := mime.Decode(bytes.NewReader(p), textproto.MIMEHeader{})
data, err := mime.Decode(reader, textproto.MIMEHeader{})
if err != nil {
data = types.NewString(err.Error())
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/resource/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ type Stream interface {

// Event represents a change event for a Resource.
type Event struct {
OP EventOP // Operation type (Store, Swap, Delete)
ID uuid.UUID // ID of the changed Resource
ID uuid.UUID `json:"id" map:"id"`
OP EventOP `json:"op" map:"op"`
}

// EventOP represents the type of operation that triggered an Event.
Expand Down
2 changes: 1 addition & 1 deletion pkg/types/binary.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func newBinaryDecoder() encoding2.DecodeCompiler[Value] {
}
return errors.WithStack(encoding2.ErrUnsupportedType)
}), nil
} else if typ.Elem().Kind() == reflect.Interface {
} else if typ.Elem() == types[KindUnknown] {
return encoding2.DecodeFunc(func(source Value, target unsafe.Pointer) error {
if s, ok := source.(Binary); ok {
*(*any)(target) = s.Interface()
Expand Down
1 change: 0 additions & 1 deletion pkg/types/binary_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,6 @@ func TestBinary_Decode(t *testing.T) {

d, err := base64.StdEncoding.DecodeString(decoded)
assert.NoError(t, err)

assert.Equal(t, source, d)
})

Expand Down
2 changes: 1 addition & 1 deletion pkg/types/boolean.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func newBooleanDecoder() encoding.DecodeCompiler[Value] {
}
return errors.WithStack(encoding.ErrUnsupportedType)
}), nil
} else if typ.Elem().Kind() == reflect.Interface {
} else if typ.Elem() == types[KindUnknown] {
return encoding.DecodeFunc(func(source Value, target unsafe.Pointer) error {
if s, ok := source.(Boolean); ok {
*(*any)(target) = s.Interface()
Expand Down
Loading

0 comments on commit eb7f2cf

Please sign in to comment.