Skip to content

Commit

Permalink
feat: support buffer type
Browse files Browse the repository at this point in the history
  • Loading branch information
siyul-park committed Jan 4, 2025
1 parent b2a1601 commit e62abc7
Show file tree
Hide file tree
Showing 40 changed files with 772 additions and 623 deletions.
4 changes: 2 additions & 2 deletions examples/httpproxy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@
- name: proxy
port: in

- kind: proxy
- kind: http
name: proxy
urls: [https://www.google.com/]
url: https://echo.free.beeceptor.com/
13 changes: 13 additions & 0 deletions examples/loopback.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
- kind: listener
name: listener
protocol: http
port: '{{ .PORT }}'
ports:
out:
- name: loopback
port: in

- kind: snippet
name: loopback
language: cel
code: self
6 changes: 3 additions & 3 deletions examples/system.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@

- kind: if
name: specs_read_or_watch
when: '!has(self.query.watch) || !self.query.watch.all(x, x == "true") || !has(self.header.Connection) || !has(self.header.Upgrade)'
when: '!has(self.header.Connection) || !has(self.header.Upgrade)'
ports:
out[0]:
- name: specs_read_with_query
Expand Down Expand Up @@ -357,7 +357,7 @@

- kind: if
name: values_read_or_watch
when: '!has(self.query.watch) || !self.query.watch.all(x, x == "true") || !has(self.header.Connection) || !has(self.header.Upgrade)'
when: '!has(self.header.Connection) || !has(self.header.Upgrade)'
ports:
out[0]:
- name: values_read_with_query
Expand Down Expand Up @@ -557,7 +557,7 @@

- kind: if
name: charts_read_or_watch
when: '!has(self.query.watch) || !self.query.watch.all(x, x == "true") || !has(self.header.Connection) || !has(self.header.Upgrade)'
when: '!has(self.header.Connection) || !has(self.header.Upgrade)'
ports:
out[0]:
- name: charts_read_with_query
Expand Down
1 change: 0 additions & 1 deletion ext/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ Facilitates smooth execution of network-related tasks across various protocols.
- **[WebSocket Node](./docs/websocket_node.md)**: Establishes WebSocket connections and handles message sending and receiving.
- **[Gateway Node](./docs/gateway_node.md)**: Upgrades HTTP connections to WebSocket for real-time data communication.
- **[Listener Node](./docs/listener_node.md)**: Receives network requests on specified protocols and ports.
- **[Proxy Node](./docs/proxy_node.md)**: Proxies HTTP requests to other servers and returns their responses.
- **[Router Node](./docs/router_node.md)**: Routes input packets to multiple output ports based on conditions.

### **System**
Expand Down
1 change: 0 additions & 1 deletion ext/README_kr.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
- **[WebSocket 노드](./docs/websocket_node_kr.md)**: WebSocket 연결을 설정하고 메시지를 송수신합니다.
- **[Gateway 노드](./docs/gateway_node_kr.md)**: HTTP 연결을 WebSocket으로 업그레이드하여 실시간 데이터 통신을 지원합니다.
- **[Listener 노드](./docs/listener_node_kr.md)**: 지정된 프로토콜과 포트에서 네트워크 요청을 수신합니다.
- **[Proxy 노드](./docs/proxy_node_kr.md)**: HTTP 요청을 다른 서버로 프록시하여 응답을 반환합니다.
- **[Router 노드](./docs/router_node_kr.md)**: 입력 패킷을 조건에 따라 여러 출력 포트로 라우팅합니다.

### **System**
Expand Down
52 changes: 0 additions & 52 deletions ext/docs/proxy_node.md

This file was deleted.

52 changes: 0 additions & 52 deletions ext/docs/proxy_node_kr.md

This file was deleted.

81 changes: 75 additions & 6 deletions ext/pkg/mime/compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,35 @@ import (
"github.com/andybalholm/brotli"
)

type multiWriter struct {
pipe []io.Writer
}

type multiReader struct {
pipe []io.Reader
}

const (
EncodingGzip = "gzip"
EncodingDeflate = "deflate"
EncodingBr = "br"
EncodingIdentity = "identity"
)

var _ io.Writer = (*multiWriter)(nil)
var _ io.Closer = (*multiWriter)(nil)
var _ io.Reader = (*multiReader)(nil)
var _ io.Closer = (*multiReader)(nil)

// Compress compresses input data using the specified encoding, returns original if unsupported.
func Compress(writer io.Writer, encoding string) (io.Writer, error) {
switch encoding {
case EncodingGzip:
return gzip.NewWriter(writer), nil
return newMultiWriter(writer, gzip.NewWriter(writer)), nil
case EncodingDeflate:
return zlib.NewWriter(writer), nil
return newMultiWriter(writer, zlib.NewWriter(writer)), nil
case EncodingBr:
return brotli.NewWriter(writer), nil
return newMultiWriter(writer, brotli.NewWriter(writer)), nil
default:
return writer, nil
}
Expand All @@ -33,12 +46,68 @@ func Compress(writer io.Writer, encoding string) (io.Writer, error) {
func Decompress(reader io.Reader, encoding string) (io.Reader, error) {
switch encoding {
case EncodingGzip:
return gzip.NewReader(reader)
r, err := gzip.NewReader(reader)
if err != nil {
return nil, err
}
return newMultiReader(reader, r), nil
case EncodingDeflate:
return zlib.NewReader(reader)
r, err := zlib.NewReader(reader)
if err != nil {
return nil, err
}
return newMultiReader(reader, r), nil
case EncodingBr:
return brotli.NewReader(reader), nil
return newMultiReader(reader, brotli.NewReader(reader)), nil
default:
return reader, nil
}
}

// newMultiWriter creates a writer that writes to multiple writers in sequence.
func newMultiWriter(pipe ...io.Writer) io.Writer {
return &multiWriter{pipe: pipe}
}

// newMultiReader creates a reader that reads from multiple readers in sequence.
func newMultiReader(pipe ...io.Reader) io.Reader {
return &multiReader{pipe: pipe}
}

func (w *multiWriter) Write(p []byte) (n int, err error) {
if len(w.pipe) == 0 {
return 0, io.ErrClosedPipe
}
return w.pipe[len(w.pipe)-1].Write(p)
}

func (w *multiWriter) Close() error {
for i := len(w.pipe) - 1; i >= 0; i-- {
if c, ok := w.pipe[i].(io.Closer); ok {
if err := c.Close(); err != nil {
return err
}
}
}
w.pipe = nil
return nil
}

func (r *multiReader) Read(p []byte) (n int, err error) {
if len(r.pipe) == 0 {
return 0, io.ErrClosedPipe
}
return r.pipe[len(r.pipe)-1].Read(p)
}

func (r *multiReader) Close() error {
for i := len(r.pipe) - 1; i >= 0; i-- {
if c, ok := r.pipe[i].(io.Closer); ok {
if err := c.Close(); err != nil {
return err
}
}
}
r.pipe = nil
return nil
}
Loading

0 comments on commit e62abc7

Please sign in to comment.