-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathring_buffer.c
170 lines (143 loc) · 4.09 KB
/
ring_buffer.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
#include <stdlib.h>
#include <stddef.h>
#include <assert.h>
#include <stdbool.h>
#include <string.h>
#include <pthread.h>
#include "ring_buffer.h"
// Private state behind the opaque ring buffer handle handed out by RBufferCreate.
// Every field is read and written by the RBuffer* functions in this file while
// queue_locker is held.
typedef struct rbhandle_t_prv {
uint8_t* buf; // backing storage: max slots of entry_size bytes each
ecomp_t cmp; // comparator called by RBufferInsert to decide whether an entry is accepted
int entry_size; // size in bytes of a single entry (the amount copied per insert/get)
size_t head; // index of the slot the next insert writes to
size_t tail; // index of the slot the next get reads from
size_t max; // capacity of the buffer, in entries
bool full; // tells "full" apart from "empty", since head == tail in both states
pthread_mutex_t queue_locker; // mutex for all queue related operations
pthread_cond_t prod_locker; // suspends the producer when the upper limit is reached; resumed by the consumer's signal
pthread_cond_t cons_locker; // suspends the consumer when the buffer is empty; resumed by the producer's signal
} rbhandle_t_prv;
// Tear down the synchronisation objects embedded in the queue: both condition
// variables first, then the mutex. No memory is released here; the return
// codes of the destroy calls are deliberately not inspected.
static void
free_queue(struct rbhandle_t_prv *q)
{
    (void)pthread_cond_destroy(&q->prod_locker);
    (void)pthread_cond_destroy(&q->cons_locker);
    (void)pthread_mutex_destroy(&q->queue_locker);
}
// Enter the queue's critical section. Meant for short manipulations only;
// long suspensions go through the prod_locker / cons_locker condition variables.
// The result of pthread_mutex_lock is not checked.
static void
lock_queue(struct rbhandle_t_prv *q)
{
    (void)pthread_mutex_lock(&q->queue_locker);
}
// Leave the queue's critical section (counterpart of lock_queue).
// The result of pthread_mutex_unlock is not checked.
static void
unlock_queue(struct rbhandle_t_prv *q)
{
    (void)pthread_mutex_unlock(&q->queue_locker);
}
// Move the write index one slot forward after an entry has been stored.
// If the buffer was already full, the read index is pushed forward as well,
// which drops the oldest entry. Afterwards the full flag is recomputed:
// the buffer is full exactly when the write index has caught up with the
// read index.
static void
advance_pointer(rbhandle_t self)
{
    rbhandle_t_prv *rb = (rbhandle_t_prv *)self;
    const size_t capacity = rb->max;

    if (rb->full) {
        rb->tail = (rb->tail + 1) % capacity;
    }

    rb->head = (rb->head + 1) % capacity;
    rb->full = (rb->tail == rb->head);
}
// Report whether the buffer currently holds no entries.
// head == tail is ambiguous on its own (it is also true when the buffer is
// full), so the full flag is consulted first.
static bool
ring_buf_empty(rbhandle_t self)
{
    rbhandle_t_prv *rb = (rbhandle_t_prv *)self;

    if (rb->full) {
        return false;
    }
    return rb->head == rb->tail;
}
// Consume one entry: step the read index forward by one slot (wrapping at the
// capacity) and clear the full flag, since a slot has just become free.
static void
retreat_pointer(rbhandle_t self)
{
    rbhandle_t_prv *rb = (rbhandle_t_prv *)self;

    rb->tail = (rb->tail + 1) % rb->max;
    rb->full = false;
}
// Return the address of slot `num` inside the backing storage, or NULL when
// `num` is not a valid slot index (negative, or >= the buffer capacity).
//
// The bound is the capacity, not the write index: in a ring buffer the read
// index is legitimately greater than the write index once the indices have
// wrapped, so comparing against head would reject live slots.
// The byte offset is computed in size_t to avoid int overflow for large buffers.
static void*
get_entry_ptr(rbhandle_t self, int num)
{
    rbhandle_t_prv *h = (rbhandle_t_prv *)self;

    if (num < 0 || (size_t)num >= h->max) {
        return NULL;
    }
    return h->buf + (size_t)num * (size_t)h->entry_size;
}
/**
 * Create a new ring buffer instance
 *
 * @param buf_size   capacity of the buffer, in entries; must be > 0
 * @param comp       comparator stored in the handle; RBufferInsert calls it to
 *                   decide whether a new entry may be appended
 * @param entry_size size of one entry in bytes; must be > 0
 * @return handle to the new buffer, or NULL if an argument is invalid, memory
 *         cannot be allocated, or a synchronisation object cannot be
 *         initialised. Release the handle with RBufferDestroy.
 */
rbhandle_t
RBufferCreate(int buf_size, ecomp_t comp, int entry_size)
{
    if (buf_size <= 0 || entry_size <= 0) {
        return NULL;
    }

    rbhandle_t_prv *r = malloc(sizeof *r);
    if (r == NULL) {
        return NULL;
    }

    // calloc rejects a buf_size * entry_size product that would overflow and
    // hands back zeroed storage, so no slot ever holds indeterminate bytes.
    r->buf = calloc((size_t)buf_size, (size_t)entry_size);
    if (r->buf == NULL) {
        goto fail_handle;
    }

    r->cmp = comp;
    r->entry_size = entry_size;
    r->max = (size_t)buf_size;
    r->head = 0;
    r->tail = 0;
    r->full = false;

    if (pthread_mutex_init(&r->queue_locker, NULL) != 0) {
        goto fail_buf;
    }
    if (pthread_cond_init(&r->prod_locker, NULL) != 0) {
        goto fail_mutex;
    }
    if (pthread_cond_init(&r->cons_locker, NULL) != 0) {
        goto fail_prod;
    }
    return r;

    // Unwind in reverse order of acquisition.
fail_prod:
    pthread_cond_destroy(&r->prod_locker);
fail_mutex:
    pthread_mutex_destroy(&r->queue_locker);
fail_buf:
    free(r->buf);
fail_handle:
    free(r);
    return NULL;
}
/**
 * Insert new entry to the ring buffer
 *
 * The buffer is kept sorted: unless it is empty, the new entry is accepted only
 * when the comparator returns a negative value for (newest stored entry, e).
 * A rejected entry never blocks. An accepted entry is copied into the buffer;
 * if the buffer is full the caller is suspended until a consumer frees a slot.
 *
 * @param self handle returned by RBufferCreate
 * @param e    pointer to entry_size bytes to copy into the buffer
 * @return 0 if the entry was stored, -1 if it was rejected by the ordering
 *         check or an argument is NULL
 */
int
RBufferInsert(rbhandle_t self, void* e)
{
    rbhandle_t_prv *h = (rbhandle_t_prv *)self;
    int r = -1;

    if (h == NULL || e == NULL) {
        return r;
    }

    const size_t entry_bytes = (size_t)h->entry_size;

    lock_queue(h); //enter critical section
    for (;;) {
        // The ordering check is repeated after every wake-up: the mutex is
        // released while waiting, so the newest entry may have changed.
        if (!ring_buf_empty(h)) {
            // The newest stored entry sits one slot behind the write index;
            // the slot at head itself is free (or the oldest entry when full).
            const size_t newest = (h->head + h->max - 1) % h->max;
            void *last = h->buf + newest * entry_bytes;
            if (!(0 > h->cmp(last, e))) {
                break; //out of order - reject without blocking
            }
        }

        if (!h->full) {
            // Copy only once a slot is free, so unread data is never overwritten.
            memcpy(h->buf + h->head * entry_bytes, e, entry_bytes);
            advance_pointer(h);
            r = 0;
            //wake up any of suspended consumer - there is fresh data to read
            pthread_cond_signal(&h->cons_locker);
            break;
        }

        //suspend - queue is saturated (consumer signal will resume)
        pthread_cond_wait(&h->prod_locker, &h->queue_locker);
    }
    unlock_queue(h); //leave the critical section
    return r;
}
/**
 * Get an entry from the ring buffer
 * Blocking call. If the buffer is empty the caller is suspended until a
 * producer inserts an entry.
 *
 * @param self handle returned by RBufferCreate
 * @param e    address of a pointer to caller-owned storage of at least
 *             entry_size bytes; the oldest entry is copied into *e
 * @return 0 once an entry has been copied out, -1 if an argument is NULL
 */
int
RBufferGet(rbhandle_t self, void** e)
{
    rbhandle_t_prv *h = (rbhandle_t_prv *)self;

    if (h == NULL || e == NULL || *e == NULL) {
        return -1;
    }

    const size_t entry_bytes = (size_t)h->entry_size;

    lock_queue(h); //enter critical section
    // Re-test the predicate after every wake-up: condition variables may wake
    // spuriously, and another consumer may have taken the entry first.
    while (ring_buf_empty(h)) {
        pthread_cond_wait(&h->cons_locker, &h->queue_locker); //suspend - queue is empty (producer signal will resume)
    }
    // Address the slot directly from tail; tail may be greater than head once
    // the indices have wrapped, and that slot is still valid.
    memcpy(*e, h->buf + h->tail * entry_bytes, entry_bytes);
    retreat_pointer(h);
    pthread_cond_signal(&h->prod_locker); //wake up any of suspended producers - new free space is available
    unlock_queue(h); //leave the critical section
    return 0;
}
/**
 * Destroy the ring buffer
 *
 * Destroys the mutex and condition variables, then releases the entry storage
 * and the handle itself. The handle must not be used afterwards, and no other
 * thread may still be operating on it.
 *
 * @param self handle returned by RBufferCreate; NULL trips the assert in debug
 *             builds and is ignored when asserts are compiled out
 */
void
RBufferDestroy(rbhandle_t self)
{
    assert(self);
    rbhandle_t_prv *h = (rbhandle_t_prv *)self;
    if (h == NULL) {
        return;
    }
    free_queue(h);
    free(h->buf); // the entry storage is a separate allocation from the handle
    free(h);
}