-
Notifications
You must be signed in to change notification settings - Fork 59
/
stream.go
183 lines (166 loc) · 4.74 KB
/
stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
// Copyright © Go Opus Authors (see AUTHORS file)
//
// License for use of this code is detailed in the LICENSE file
// +build !nolibopusfile
package opus
import (
"fmt"
"io"
"unsafe"
)
/*
#cgo pkg-config: opusfile
#include <opusfile.h>
#include <stdint.h>
#include <string.h>
OggOpusFile *my_open_callbacks(uintptr_t p, int *error);
*/
import "C"
// Stream wraps a io.Reader in a decoding layer. It provides an API similar to
// io.Reader, but it provides raw PCM data instead of the encoded Opus data.
//
// This is not the same as directly decoding the bytes on the io.Reader; opus
// streams are Ogg Opus audio streams, which package raw Opus data.
//
// This wraps libopusfile. For more information, see the api docs on xiph.org:
//
// https://www.opus-codec.org/docs/opusfile_api-0.7/index.html
type Stream struct {
id uintptr
oggfile *C.OggOpusFile
read io.Reader
// Preallocated buffer to pass to the reader
buf []byte
}
var streams = newStreamsMap()
//export go_readcallback
func go_readcallback(p unsafe.Pointer, cbuf *C.uchar, cmaxbytes C.int) C.int {
streamId := uintptr(p)
stream := streams.Get(streamId)
if stream == nil {
// This is bad
return -1
}
maxbytes := int(cmaxbytes)
if maxbytes > cap(stream.buf) {
maxbytes = cap(stream.buf)
}
// Don't bother cleaning up old data because that's not required by the
// io.Reader API.
n, err := stream.read.Read(stream.buf[:maxbytes])
// Go allows returning non-nil error (like EOF) and n>0, libopusfile doesn't
// expect that. So return n first to indicate the valid bytes, let the
// subsequent call (which will be n=0, same-error) handle the actual error.
if n == 0 && err != nil {
if err == io.EOF {
return 0
} else {
return -1
}
}
C.memcpy(unsafe.Pointer(cbuf), unsafe.Pointer(&stream.buf[0]), C.size_t(n))
return C.int(n)
}
// NewStream creates and initializes a new stream. Don't call .Init() on this.
func NewStream(read io.Reader) (*Stream, error) {
var s Stream
err := s.Init(read)
if err != nil {
return nil, err
}
return &s, nil
}
// Init initializes a stream with an io.Reader to fetch opus encoded data from
// on demand. Errors from the reader are all transformed to an EOF, any actual
// error information is lost. The same happens when a read returns succesfully,
// but with zero bytes.
func (s *Stream) Init(read io.Reader) error {
if s.oggfile != nil {
return fmt.Errorf("opus stream is already initialized")
}
if read == nil {
return fmt.Errorf("Reader must be non-nil")
}
s.read = read
s.buf = make([]byte, maxEncodedFrameSize)
s.id = streams.NextId()
var errno C.int
// Immediately delete the stream after .Init to avoid leaking if the
// caller forgets to (/ doesn't want to) call .Close(). No need for that,
// since the callback is only ever called during a .Read operation; just
// Save and Delete from the map around that every time a reader function is
// called.
streams.Save(s)
defer streams.Del(s)
oggfile := C.my_open_callbacks(C.uintptr_t(s.id), &errno)
if errno != 0 {
return StreamError(errno)
}
s.oggfile = oggfile
return nil
}
// Read a chunk of raw opus data from the stream and decode it. Returns the
// number of decoded samples per channel. This means that a dual channel
// (stereo) feed will have twice as many samples as the value returned.
//
// Read may successfully read less bytes than requested, but it will never read
// exactly zero bytes succesfully if a non-zero buffer is supplied.
//
// The number of channels in the output data must be known in advance. It is
// possible to extract this information from the stream itself, but I'm not
// motivated to do that. Feel free to send a pull request.
func (s *Stream) Read(pcm []int16) (int, error) {
if s.oggfile == nil {
return 0, fmt.Errorf("opus stream is uninitialized or already closed")
}
if len(pcm) == 0 {
return 0, nil
}
streams.Save(s)
defer streams.Del(s)
n := C.op_read(
s.oggfile,
(*C.opus_int16)(&pcm[0]),
C.int(len(pcm)),
nil)
if n < 0 {
return 0, StreamError(n)
}
if n == 0 {
return 0, io.EOF
}
return int(n), nil
}
// ReadFloat32 is the same as Read, but decodes to float32 instead of int16.
func (s *Stream) ReadFloat32(pcm []float32) (int, error) {
if s.oggfile == nil {
return 0, fmt.Errorf("opus stream is uninitialized or already closed")
}
if len(pcm) == 0 {
return 0, nil
}
streams.Save(s)
defer streams.Del(s)
n := C.op_read_float(
s.oggfile,
(*C.float)(&pcm[0]),
C.int(len(pcm)),
nil)
if n < 0 {
return 0, StreamError(n)
}
if n == 0 {
return 0, io.EOF
}
return int(n), nil
}
func (s *Stream) Close() error {
if s.oggfile == nil {
return fmt.Errorf("opus stream is uninitialized or already closed")
}
C.op_free(s.oggfile)
if closer, ok := s.read.(io.Closer); ok {
return closer.Close()
}
return nil
}