-
Notifications
You must be signed in to change notification settings - Fork 63
IPC
This page teaches you how processes communicate and synchronize with each other in MentOS.
Important: All code examples and APIs shown here are MentOS-specific. The implementation is in:
-
Header files:
lib/inc/sys/sem.h,lib/inc/sys/msg.h,lib/inc/sys/shm.h -
Kernel implementation:
kernel/src/ipc/sem.c,kernel/src/ipc/msg.c,kernel/src/ipc/shm.c -
System calls:
kernel/inc/system/syscall.h(sys_semget, sys_msgget, sys_shmget, etc.)
You can find working test programs in userspace/tests/ (t_msgget.c, t_shmget.c, etc.) that demonstrate these IPC mechanisms on real MentOS systems.
Imagine you're building a web server:
- Process A - Accepts connections from clients
- Process B - Processes requests
- Process C - Accesses the database
How do they coordinate? How does A tell B "here's a new request"? How does B tell C "fetch this data"?
The challenge: Processes are isolated - they have separate memory spaces and can't directly access each other's variables.
The solution: Inter-Process Communication (IPC) - mechanisms that let processes:
- Send messages to each other
- Share memory for fast data exchange
- Synchronize their actions (locks, barriers)
MentOS provides three classical System V IPC mechanisms:
┌──────────────────┬─────────────────────┬──────────────────────┐
│ Semaphores │ Message Queues │ Shared Memory │
├──────────────────┼─────────────────────┼──────────────────────┤
│ Synchronization │ Async messaging │ Fast data sharing │
│ (locks, signals) │ (send/receive) │ (direct memory) │
│ │ │ │
│ Use when: │ Use when: │ Use when: │
│ • Need mutex │ • Sending commands │ • Large data │
│ • Counting │ • Job queues │ • High speed │
│ • Barriers │ • Event passing │ • Low latency │
└──────────────────┴─────────────────────┴──────────────────────┘
Let's learn each one with practical examples!
A semaphore is a counter that processes can:
- Decrease (P operation / wait) - "Wait for resource"
- Increase (V operation / signal) - "Release resource"
If a process tries to decrease below 0, it blocks (sleeps) until another process increases it.
Real-world analogy: A parking lot with N spaces:
- sem_value = N means N spaces available
- Car enters → decrease (P operation)
- Car leaves → increase (V operation)
- If sem_value = 0, cars wait
Problem: Two processes want to write to the same file. If both write simultaneously, data gets corrupted.
Solution: Use a binary semaphore (0 or 1) as a lock.
// File: mutex_writer.c
#include <sys/sem.h>
#include <stdio.h>
#include <unistd.h>
#include <fcntl.h>
int main(void)
{
// 1. Create/get semaphore initialized to 1 (unlocked)
int semid = semget(1234, 1, IPC_CREAT | 0666);
union semun {
int val;
} arg;
arg.val = 1;
semctl(semid, 0, SETVAL, &arg); // Set to 1 (unlocked)
// 2. LOCK (P operation - decrease semaphore)
struct sembuf lock = {0, -1, 0};
printf("Trying to acquire lock...\n");
semop(semid, &lock, 1); // Blocks if sem = 0
printf("Lock acquired!\n");
// 3. CRITICAL SECTION
int fd = open("/shared_file.txt", O_WRONLY | O_APPEND);
write(fd, "My data\n", 8);
close(fd);
sleep(2); // Simulate work
// 4. UNLOCK (V operation - increase semaphore)
struct sembuf unlock = {0, 1, 0};
semop(semid, &unlock, 1);
printf("Lock released!\n");
return 0;
}What happens:
Process A Process B
│ │
├─ semop(-1) → sem=0 │
├─ Critical section ├─ semop(-1) → BLOCKS (sem=0)
├─ (writing file) │ (waits here)
├─ semop(+1) → sem=1 │
│ ├─ Unblocked! sem=0
│ ├─ Critical section
│ ├─ semop(+1) → sem=1
Problem: Producer creates items, consumer processes them. Need to track how many items are available.
// File: producer.c
#include <sys/sem.h>
#include <stdio.h>
int main(void)
{
// Create semaphore initialized to 0 (no items)
int semid = semget(5678, 1, IPC_CREAT | 0666);
union semun { int val; } arg;
arg.val = 0;
semctl(semid, 0, SETVAL, &arg);
// Produce 10 items
for (int i = 0; i < 10; i++) {
printf("Producing item %d\n", i);
// Signal that item is ready (V operation)
struct sembuf signal = {0, 1, 0};
semop(semid, &signal, 1); // sem++
sleep(1); // Simulate work
}
return 0;
}// File: consumer.c
#include <sys/sem.h>
#include <stdio.h>
int main(void)
{
int semid = semget(5678, 1, 0); // Get existing semaphore
// Consume 10 items
for (int i = 0; i < 10; i++) {
// Wait for item (P operation)
struct sembuf wait = {0, -1, 0};
semop(semid, &wait, 1); // Blocks if sem=0
printf("Consuming item %d\n", i);
}
// Clean up
semctl(semid, 0, IPC_RMID, NULL);
return 0;
}Behavior:
Time Producer Semaphore Consumer
0 Produces item 0 1 (blocked, waiting)
1 Produces item 1 2 Consumes item 0 (sem=1)
2 Produces item 2 2 Consumes item 1 (sem=1)
3 (paused) 1 Consumes item 2 (sem=0, blocks)
4 Produces item 3 1 Consumes item 3
...
Problem: 5 processes must all reach a checkpoint before any can continue.
// File: barrier.c
#include <sys/sem.h>
#include <stdio.h>
#include <unistd.h>
#define NUM_PROCS 5
int main(void)
{
int semid = semget(9999, 1, IPC_CREAT | 0666);
union semun { int val; } arg;
arg.val = 0;
semctl(semid, 0, SETVAL, &arg); // Initialize to 0
pid_t pid = fork();
if (pid == 0) {
// Child process
printf("Process %d: doing work...\n", getpid());
sleep(getpid() % 3); // Random work time
printf("Process %d: reached barrier\n", getpid());
// Signal arrival
struct sembuf signal = {0, 1, 0};
semop(semid, &signal, 1);
// Wait for all to arrive (sem must reach NUM_PROCS)
struct sembuf wait = {0, -NUM_PROCS, 0};
semop(semid, &wait, 1);
printf("Process %d: barrier released!\n", getpid());
return 0;
}
// Parent: wait for child
wait(NULL);
return 0;
}// Create/get semaphore set
long semid = semget(key_t key, int nsems, int flags);
// key: IPC key (use ftok() or hard-coded number)
// nsems: number of semaphores in set
// flags: IPC_CREAT | IPC_EXCL | 0666 (permissions)
// Operations
struct sembuf {
unsigned short sem_num; // Which semaphore in set
short sem_op; // Operation: <0=wait, >0=signal, 0=wait-for-zero
short sem_flg; // IPC_NOWAIT, SEM_UNDO
};
long semop(int semid, struct sembuf *ops, unsigned nsops);
// Control
union semun { int val; };
long semctl(int semid, int semnum, int cmd, union semun *arg);
// cmd: GETVAL, SETVAL, IPC_RMID (remove), IPC_STATMentOS Implementation Details:
- Header:
lib/inc/sys/sem.h- Defines semaphore structures and function declarations - Kernel:
kernel/src/ipc/sem.c- Implements semget, semop, semctl system calls - Syscalls:
kernel/inc/system/syscall.h- System call interface (sys_semget, sys_semop, sys_semctl) - See also:
userspace/tests/- Test programs demonstrating semaphore usage
A message queue is a mailbox where processes can:
- Send messages (like putting a letter in a mailbox)
- Receive messages (like checking your mailbox)
Messages are typed - you can send/receive specific message types.
Key features:
- Asynchronous - sender doesn't wait for receiver
- Ordered - FIFO (first in, first out)
- Persistent - messages stay until received
- Type-based - can select messages by type
Problem: A controller process wants to send commands to worker processes.
// File: controller.c
#include <sys/msg.h>
#include <stdio.h>
#include <string.h>
struct msg {
long mtype; // Message type (required!)
char command[100]; // Message data
};
int main(void)
{
// Create message queue
int msqid = msgget(1111, IPC_CREAT | 0666);
// Send commands
struct msg cmd;
cmd.mtype = 1; // Command type
strcpy(cmd.command, "START");
msgsnd(msqid, &cmd, strlen(cmd.command) + 1, 0);
printf("Sent: START\n");
sleep(2);
cmd.mtype = 1;
strcpy(cmd.command, "STOP");
msgsnd(msqid, &cmd, strlen(cmd.command) + 1, 0);
printf("Sent: STOP\n");
return 0;
}// File: worker.c
#include <sys/msg.h>
#include <stdio.h>
struct msg {
long mtype;
char command[100];
};
int main(void)
{
int msqid = msgget(1111, 0); // Get existing queue
struct msg cmd;
// Receive commands
while (1) {
msgrcv(msqid, &cmd, sizeof(cmd.command), 1, 0);
printf("Worker received: %s\n", cmd.command);
if (strcmp(cmd.command, "STOP") == 0) {
break;
}
}
// Clean up
msgctl(msqid, IPC_RMID, NULL);
return 0;
}Problem: Multiple clients submit jobs. Some are high-priority, some are low-priority.
// File: job_server.c
#include <sys/msg.h>
#include <stdio.h>
#include <string.h>
struct job_msg {
long mtype; // 1=high priority, 2=low priority
int job_id;
char data[256];
};
int main(void)
{
int msqid = msgget(2222, IPC_CREAT | 0666);
struct job_msg job;
while (1) {
// Receive high-priority jobs first
if (msgrcv(msqid, &job, sizeof(job) - sizeof(long), 1, IPC_NOWAIT) > 0) {
printf("[HIGH] Processing job %d: %s\n", job.job_id, job.data);
}
// Then low-priority
else if (msgrcv(msqid, &job, sizeof(job) - sizeof(long), 2, IPC_NOWAIT) > 0) {
printf("[LOW] Processing job %d: %s\n", job.job_id, job.data);
}
else {
sleep(1); // No jobs, wait
}
}
return 0;
}// File: client.c
#include <sys/msg.h>
#include <stdio.h>
#include <string.h>
struct job_msg {
long mtype;
int job_id;
char data[256];
};
int main(int argc, char *argv[])
{
int msqid = msgget(2222, 0);
struct job_msg job;
job.mtype = (argc > 1 && strcmp(argv[1], "high") == 0) ? 1 : 2;
job.job_id = getpid();
sprintf(job.data, "Task from process %d", getpid());
msgsnd(msqid, &job, sizeof(job) - sizeof(long), 0);
printf("Submitted %s priority job %d\n",
job.mtype == 1 ? "HIGH" : "LOW", job.job_id);
return 0;
}// Create/get message queue
int msqid = msgget(key_t key, int msgflg);
// Message structure (user-defined, but must start with mtype)
struct msgbuf {
long mtype; // Message type (MUST BE FIRST, > 0)
char mtext[...]; // Your data
};
// Send message
int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
// msgsz = size of mtext (NOT including mtype)
// msgflg: 0 or IPC_NOWAIT
// Receive message
ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);
// msgtyp: 0=any, >0=specific type, <0=any type <= abs(msgtyp)
// msgflg: 0 or IPC_NOWAIT
// Control
int msgctl(int msqid, int cmd, struct msqid_ds *buf);
// cmd: IPC_RMID (remove), IPC_STAT, IPC_SETMentOS Implementation Details:
- Header:
lib/inc/sys/msg.h- Defines message queue structures (msgbuf, msqid_ds, etc.) - Kernel:
kernel/src/ipc/msg.c- Implements msgget, msgsnd, msgrcv, msgctl system calls - Syscalls:
kernel/inc/system/syscall.h- System call interface (sys_msgget, sys_msgsnd, sys_msgrcv, sys_msgctl) - Constants: MSGMAX (8192), MSGMNB (16384) defined in
lib/inc/sys/msg.h - Example:
userspace/tests/t_msgget.c- Complete working test program on MentOS
Shared memory lets processes directly access the same physical RAM. It's the fastest IPC mechanism because no copying is involved.
Analogy: Instead of sending letters (message queue), you and your friend write on the same whiteboard.
Trade-off: Fast, but requires manual synchronization (use semaphores for locking!).
Process A Process B
┌──────────────────┐ ┌──────────────────┐
│ Virtual Memory │ │ Virtual Memory │
│ 0x40000000 │ │ 0x50000000 │
│ │ │ │ │ │
│ ↓ │ │ ↓ │
│ [shared region] │ │ [shared region] │
└───────┬──────────┘ └───────┬──────────┘
│ │
└──────────┬───────────────────────┘
↓
┌─────────────────┐
│ Physical RAM │
│ (same pages) │
└─────────────────┘
Both processes map the same physical memory into their address spaces (at potentially different virtual addresses).
// File: shm_producer.c
#include <sys/shm.h>
#include <sys/sem.h>
#include <stdio.h>
#include <string.h>
struct shared_data {
int count;
char buffer[1024];
};
int main(void)
{
// Create shared memory segment
int shmid = shmget(3333, sizeof(struct shared_data), IPC_CREAT | 0666);
// Attach to our address space
struct shared_data *data = shmat(shmid, NULL, 0);
// Create mutex semaphore
int semid = semget(4444, 1, IPC_CREAT | 0666);
union semun { int val; } arg;
arg.val = 1;
semctl(semid, 0, SETVAL, arg);
// Produce data
for (int i = 0; i < 10; i++) {
// Lock
struct sembuf lock = {0, -1, 0};
semop(semid, &lock, 1);
// Write to shared memory
data->count = i;
sprintf(data->buffer, "Item %d", i);
printf("Produced: %s\n", data->buffer);
// Unlock
struct sembuf unlock = {0, 1, 0};
semop(semid, &unlock, 1);
sleep(1);
}
// Detach
shmdt(data);
return 0;
}// File: shm_consumer.c
#include <sys/shm.h>
#include <sys/sem.h>
#include <stdio.h>
struct shared_data {
int count;
char buffer[1024];
};
int main(void)
{
// Get existing shared memory
int shmid = shmget(3333, sizeof(struct shared_data), 0);
struct shared_data *data = shmat(shmid, NULL, 0);
// Get existing semaphore
int semid = semget(4444, 1, 0);
int last_count = -1;
// Consume data
while (1) {
// Lock
struct sembuf lock = {0, -1, 0};
semop(semid, &lock, 1);
// Read from shared memory
if (data->count != last_count) {
printf("Consumed: %s\n", data->buffer);
last_count = data->count;
}
// Unlock
struct sembuf unlock = {0, 1, 0};
semop(semid, &unlock, 1);
if (last_count >= 9) break;
usleep(100000); // 100ms
}
// Clean up
shmdt(data);
shmctl(shmid, IPC_RMID, NULL);
semctl(semid, 0, IPC_RMID);
return 0;
}Problem: Process A generates 1MB of data that Process B needs to analyze.
// File: data_generator.c
#include <sys/shm.h>
#include <stdio.h>
#include <stdlib.h>
#define DATA_SIZE (1024 * 1024) // 1MB
int main(void)
{
// Create 1MB shared memory
int shmid = shmget(5555, DATA_SIZE, IPC_CREAT | 0666);
char *data = shmat(shmid, NULL, 0);
// Generate data
printf("Generating 1MB of data...\n");
for (int i = 0; i < DATA_SIZE; i++) {
data[i] = rand() % 256;
}
printf("Data ready in shared memory!\n");
// Don't detach - keep it alive
pause(); // Wait for signal
return 0;
}// File: data_analyzer.c
#include <sys/shm.h>
#include <stdio.h>
#define DATA_SIZE (1024 * 1024)
int main(void)
{
// Attach to existing shared memory
int shmid = shmget(5555, DATA_SIZE, 0);
char *data = shmat(shmid, NULL, 0);
// Analyze data (compute average)
long sum = 0;
for (int i = 0; i < DATA_SIZE; i++) {
sum += (unsigned char)data[i];
}
printf("Average value: %.2f\n", (double)sum / DATA_SIZE);
shmdt(data);
return 0;
}Why use shared memory here?
- No copying - 1MB stays in one place
- Fast - Direct memory access
- Alternative (message queue) would copy 1MB twice (send + receive)
// Create/get shared memory segment
int shmid = shmget(key_t key, size_t size, int shmflg);
// size: bytes to allocate
// shmflg: IPC_CREAT | 0666
// Attach to address space
void *shmat(int shmid, const void *shmaddr, int shmflg);
// shmaddr: NULL = kernel chooses address
// shmflg: SHM_RDONLY or 0
// Returns: pointer to shared memory
// Detach from address space
int shmdt(const void *shmaddr);
// Control
int shmctl(int shmid, int cmd, struct shmid_ds *buf);
// cmd: IPC_RMID (remove), IPC_STAT, IPC_SETMentOS Implementation Details:
- Header:
lib/inc/sys/shm.h- Defines shared memory structures (shmid_ds, shmatt_t, etc.) - Kernel:
kernel/src/ipc/shm.c- Implements shmget, shmat, shmdt, shmctl system calls - Syscalls:
kernel/inc/system/syscall.h- System call interface (sys_shmget, sys_shmat, sys_shmdt, sys_shmctl) - Flags: SHM_RDONLY, SHM_RND, SHM_REMAP, SHM_EXEC defined in
lib/inc/sys/shm.h - Example:
userspace/tests/t_shmget.c- Complete working test program on MentOS
┌────────────────────┬──────────────┬───────────────┬─────────────┐
│ Need │ Semaphore │ Message Queue │ Shared Mem │
├────────────────────┼──────────────┼───────────────┼─────────────┤
│ Synchronization │ ✓ Perfect │ ✗ No │ ✗ No │
│ Small messages │ ✗ No │ ✓ Good │ △ Overkill │
│ Large data (>4KB) │ ✗ No │ △ Slow (copy) │ ✓ Perfect │
│ Event notification │ ✓ Yes │ ✓ Yes │ ✗ No │
│ Atomic ops │ ✓ Yes │ ✗ No │ ✗ Manual │
│ FIFO ordering │ ✗ No │ ✓ Yes │ ✗ Manual │
│ Persistence │ ✓ Yes │ ✓ Yes │ ✓ Yes │
└────────────────────┴──────────────┴───────────────┴─────────────┘
General guidelines:
- Synchronization → Semaphores
- Commands/events → Message queues
- Large data → Shared memory (+ semaphores for locking)
// Multiple processes read, one writes
int readers = 0;
sem_wait(&mutex);
readers++;
if (readers == 1) sem_wait(&write_lock); // First reader locks writers
sem_post(&mutex);
// Read data
sem_wait(&mutex);
readers--;
if (readers == 0) sem_post(&write_lock); // Last reader unlocks writers
sem_post(&mutex);struct ring_buffer {
int head, tail;
char data[BUFFER_SIZE];
};
// Producer
shm->data[shm->head] = item;
shm->head = (shm->head + 1) % BUFFER_SIZE;
// Consumer
item = shm->data[shm->tail];
shm->tail = (shm->tail + 1) % BUFFER_SIZE;# List all IPC objects
ipcs
# Remove specific semaphore
ipcrm -s <semid>
# Remove specific message queue
ipcrm -q <msqid>
# Remove specific shared memory
ipcrm -m <shmid>
# Remove all IPC objects for user
ipcrm -a- System Calls - IPC system calls (semget, msgget, shmget, etc.)
- Kernel - How kernel implements IPC
- Userspace Programs - Using IPC in programs
Key takeaway: IPC is about communication and synchronization. Choose the mechanism that fits your problem: semaphores for locking, message queues for commands, shared memory for large data!
-
MSG_EXCEPT- Receive any except type msgtyp
Returns:
- Number of bytes in mtext on success
- -1 on error
struct msgbuf msg;
ssize_t n = msgrcv(msqid, &msg, 256, 0, 0);
if (n < 0) {
perror("msgrcv");
exit(1);
}
printf("Received from type %ld: %s\n", msg.mtype, msg.mtext);int result = msgctl(int msqid, int cmd, struct msqid_ds *buf);Commands:
-
IPC_STAT- Get queue statistics -
IPC_SET- Set queue permissions -
IPC_RMID- Remove queue
Structure:
struct msqid_ds {
struct ipc_perm msg_perm;
time_t msg_stime; // Last send time
time_t msg_rtime; // Last receive time
time_t msg_ctime; // Last change time
unsigned long msg_cbytes; // Current bytes in queue
unsigned long msg_qnum; // Current messages in queue
unsigned long msg_qbytes; // Max bytes allowed
pid_t msg_lspid; // PID of last sender
pid_t msg_lrpid; // PID of last receiver
};struct msqid_ds stats;
if (msgctl(msqid, IPC_STAT, &stats) < 0) {
perror("msgctl");
exit(1);
}
printf("Messages in queue: %lu\n", stats.msg_qnum);
printf("Bytes in queue: %lu\n", stats.msg_cbytes);Shared memory provides fast inter-process communication by mapping the same physical memory into multiple processes' address spaces.
Key Files:
-
kernel/inc/sys/shm.h- Shared memory definitions -
kernel/src/ipc/shm.c- Shared memory implementation
#include <sys/shm.h>
int shmid = shmget(key_t key, size_t size, int shmflg);Parameters:
-
key: IPC key -
size: Size in bytes (rounded up to page size) -
shmflg: Flags
Example:
int shmid = shmget(5678, 4096, IPC_CREAT | 0666);void *addr = shmat(int shmid, const void *shmaddr, int shmflg);Parameters:
-
shmid: Shared memory ID -
shmaddr: Preferred address (usually NULL - kernel chooses) -
shmflg: Flags-
SHM_RDONLY- Read-only attachment
-
Returns:
- Address of attached segment on success
- (void *)-1 on error
Example:
char *shm_addr = shmat(shmid, NULL, 0);
if (shm_addr == (void *)-1) {
perror("shmat");
exit(1);
}
// Use like normal memory
strcpy(shm_addr, "Shared data");int result = shmdt(const void *shmaddr);Parameters:
-
shmaddr: Address returned by shmat
Returns:
- 0 on success
- -1 on error
Example:
if (shmdt(shm_addr) < 0) {
perror("shmdt");
exit(1);
}int result = shmctl(int shmid, int cmd, struct shmid_ds *buf);Commands:
-
IPC_STAT- Get segment information -
IPC_SET- Set segment permissions -
IPC_RMID- Remove segment (after last detach) -
SHM_LOCK- Lock in memory (prevent swapping) -
SHM_UNLOCK- Unlock from memory
Structure:
struct shmid_ds {
struct ipc_perm shm_perm;
size_t shm_segsz; // Segment size in bytes
time_t shm_atime; // Last attach time
time_t shm_dtime; // Last detach time
time_t shm_ctime; // Last change time
pid_t shm_cpid; // Creator PID
pid_t shm_lpid; // Last operation PID
unsigned long shm_nattch; // Number attached
};Example:
struct shmid_ds stats;
if (shmctl(shmid, IPC_STAT, &stats) < 0) {
perror("shmctl");
exit(1);
}
printf("Segment size: %zu bytes\n", stats.shm_segsz);
printf("Processes attached: %lu\n", stats.shm_nattach);
// Remove when done
shmctl(shmid, IPC_RMID, NULL);#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <sys/msg.h>
#include <unistd.h>
struct msgbuf {
long mtype;
char mtext[256];
};
// Producer
int producer(int msqid) {
struct msgbuf msg;
for (int i = 0; i < 5; i++) {
msg.mtype = 1;
sprintf(msg.mtext, "Message %d from producer", i);
msgsnd(msqid, &msg, strlen(msg.mtext) + 1, 0);
sleep(1);
}
return 0;
}
// Consumer
int consumer(int msqid) {
struct msgbuf msg;
for (int i = 0; i < 5; i++) {
msgrcv(msqid, &msg, 256, 1, 0);
printf("Received: %s\n", msg.mtext);
}
return 0;
}
int main() {
int msqid = msgget(IPC_PRIVATE, IPC_CREAT | 0666);
if (fork() == 0) {
producer(msqid);
} else {
consumer(msqid);
msgctl(msqid, IPC_RMID, NULL);
}
return 0;
}Goal: Use semaphores to synchronize a producer and consumer process.
Program - userspace/bin/semaphore_test.c:
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <sys/wait.h>
union semun {
int val;
struct semid_ds *buf;
ushort *array;
};
int main() {
printf("=== Semaphore Producer-Consumer ===\n\n");
// Create semaphore set: sem[0] = buffer empty, sem[1] = buffer full
int semid = semget(IPC_PRIVATE, 2, IPC_CREAT | 0666);
union semun arg;
// Initialize: buffer empty (sem[0] = 1), not full (sem[1] = 0)
arg.val = 1;
semctl(semid, 0, SETVAL, arg);
arg.val = 0;
semctl(semid, 1, SETVAL, arg);
pid_t pid = fork();
if (pid == 0) {
// Producer: produce 5 items
for (int i = 1; i <= 5; i++) {
// Wait for buffer to be empty
struct sembuf ops = {0, -1, 0};
semop(semid, &ops, 1);
printf("Producer: Produced item %d\n", i);
// Signal buffer is full
ops.sem_num = 1;
ops.sem_op = 1;
semop(semid, &ops, 1);
sleep(1);
}
} else if (pid > 0) {
// Consumer: consume 5 items
for (int i = 1; i <= 5; i++) {
// Wait for buffer to be full
struct sembuf ops = {1, -1, 0};
semop(semid, &ops, 1);
printf("Consumer: Consumed item %d\n", i);
// Signal buffer is empty
ops.sem_num = 0;
ops.sem_op = 1;
semop(semid, &ops, 1);
sleep(1);
}
wait(NULL);
semctl(semid, 0, IPC_RMID, 0);
}
return 0;
}Build and Run:
# Add to userspace/bin/CMakeLists.txt
# Run in MentOS
/bin/semaphore_testObserve: Semaphores ensure producer waits for consumer and vice versa!
Goal: Use shared memory to efficiently share large data between processes.
Program - userspace/bin/shm_array.c:
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/wait.h>
#define ARRAY_SIZE 100
int main() {
printf("=== Shared Memory Array ===\n\n");
// Create shared memory for array
int shmid = shmget(IPC_PRIVATE, ARRAY_SIZE * sizeof(int),
IPC_CREAT | 0666);
// Attach to process
int *shared_array = (int *) shmat(shmid, NULL, 0);
pid_t pid = fork();
if (pid == 0) {
// Child: fill array
printf("Child: Filling shared array...\n");
for (int i = 0; i < ARRAY_SIZE; i++) {
shared_array[i] = i * 10;
}
printf("Child: Filled %d elements\n", ARRAY_SIZE);
sleep(2); // Let parent read
// Detach
shmdt((void *) shared_array);
} else if (pid > 0) {
// Parent: read array
sleep(1); // Wait for child to fill
printf("Parent: Reading shared array...\n");
printf("First 10 elements: ");
for (int i = 0; i < 10; i++) {
printf("%d ", shared_array[i]);
}
printf("\n");
printf("Last 10 elements: ");
for (int i = ARRAY_SIZE - 10; i < ARRAY_SIZE; i++) {
printf("%d ", shared_array[i]);
}
printf("\n");
wait(NULL);
// Detach and remove
shmdt((void *) shared_array);
shmctl(shmid, IPC_RMID, NULL);
}
return 0;
}Key Points:
- Shared memory is very fast (direct memory access)
- But requires careful synchronization
- Combine with semaphores for safety!
Goal: Use message queues to distribute work to multiple workers.
Program - userspace/bin/job_queue.c:
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/wait.h>
#define MAX_TEXT 256
struct msg_buffer {
long mtype;
char mtext[MAX_TEXT];
} msg;
int main() {
printf("=== Message Queue Job Distribution ===\n\n");
// Create message queue
int msqid = msgget(IPC_PRIVATE, IPC_CREAT | 0666);
// Fork 3 worker processes
for (int w = 0; w < 3; w++) {
if (fork() == 0) {
// Worker: receive and process jobs
printf("Worker %d (PID %d): Ready\n", w, getpid());
for (int j = 0; j < 2; j++) {
// Receive job (any type)
msgrcv(msqid, &msg, MAX_TEXT, 0, 0);
printf("Worker %d: Processing job: %s\n", w, msg.mtext);
sleep(1); // Simulate work
}
exit(0);
}
}
// Parent: send jobs
sleep(1); // Let workers initialize
printf("Parent: Sending 6 jobs...\n");
for (int j = 1; j <= 6; j++) {
msg.mtype = 1;
snprintf(msg.mtext, MAX_TEXT, "Job #%d", j);
msgsnd(msqid, &msg, strlen(msg.mtext) + 1, 0);
printf("Parent: Sent job %d\n", j);
sleep(1);
}
// Wait for workers
for (int w = 0; w < 3; w++) {
wait(NULL);
}
// Clean up
msgctl(msqid, IPC_RMID, NULL);
printf("Done!\n");
return 0;
}Observe:
- Jobs are distributed fairly among workers
- Use message types to route different types of messages
- Scalable: add more workers without changing queue code
Goal: See what IPC resources are created and managed.
Steps:
-
Create any of the above programs running in background:
/bin/semaphore_test & -
In another terminal, check resources:
/bin/ipcs # List all IPC /bin/ipcs -s # Show semaphores /bin/ipcs -q # Show message queues /bin/ipcs -m # Show shared memory
-
Remove specific resource:
/bin/ipcrm -s <SEMID> /bin/ipcrm -q <MSQID> /bin/ipcrm -m <SHMID>
Goal: Use IPC primitives to build a thread-safe shared counter.
Program - userspace/bin/counter.c:
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <sys/shm.h>
#include <sys/wait.h>
union semun {
int val;
struct semid_ds *buf;
ushort *array;
};
int main() {
printf("=== Thread-Safe Counter ===\n\n");
// Shared memory: counter
int shmid = shmget(IPC_PRIVATE, sizeof(int), IPC_CREAT | 0666);
int *counter = (int *) shmat(shmid, NULL, 0);
*counter = 0;
// Semaphore: mutex for counter
int semid = semget(IPC_PRIVATE, 1, IPC_CREAT | 0666);
union semun arg;
arg.val = 1;
semctl(semid, 0, SETVAL, arg);
printf("Initial counter: %d\n", *counter);
// Fork 5 processes to increment counter
for (int p = 0; p < 5; p++) {
if (fork() == 0) {
for (int i = 0; i < 10; i++) {
// Lock
struct sembuf ops = {0, -1, 0};
semop(semid, &ops, 1);
// Increment
(*counter)++;
// Unlock
ops.sem_op = 1;
semop(semid, &ops, 1);
}
exit(0);
}
}
// Wait for all processes
for (int p = 0; p < 5; p++) {
wait(NULL);
}
printf("Final counter: %d (expected: 50)\n", *counter);
// Cleanup
shmdt((void *) counter);
shmctl(shmid, IPC_RMID, NULL);
semctl(semid, 0, IPC_RMID, 0);
return 0;
}Challenge: Without semaphores, counter would have race conditions. With semaphores, it's correct!
Advanced Goal: Build a publish-subscribe system
Requirements:
- Multiple publishers send messages
- Multiple subscribers receive messages by type
- Broker distributes messages efficiently
Hints:
- Use message types for topics
- Track subscriptions
- May need multiple queues or type filtering
MentOS provides userspace utilities for managing IPC:
ipcs - List all IPC resources
ipcs # Show semaphores, queues, shared memory
ipcs -s # Show semaphores only
ipcs -q # Show message queues only
ipcs -m # Show shared memory onlyipcrm - Remove IPC resources
ipcrm -s SEMID # Remove semaphore set
ipcrm -q MSQID # Remove message queue
ipcrm -m SHMID # Remove shared memory- System Calls - Syscall details
- Kernel - IPC implementation in kernel
- System Calls - How to add new syscalls (see "Creating a New System Call" section)
- POSIX Specifications - Standard behavior