Skip to content

coregx/stream

Folders and files

NameName
Last commit message
Last commit date

Latest commit

Β 

History

2 Commits
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 

Repository files navigation

🌊 stream - Real-time Communications for Go 1.25+

Server-Sent Events and WebSocket implementations - Zero external dependencies, RFC-compliant, production-ready

Go Reference Go Report Card Tests codecov License: MIT Release


⚑ Quick Start

SSE (Server-Sent Events)

package main

import (
    "net/http"
    "time"
    "github.com/coregx/stream/sse"
)

func main() {
    http.HandleFunc("/events", func(w http.ResponseWriter, r *http.Request) {
        conn, _ := sse.Upgrade(w, r)
        defer conn.Close()

        ticker := time.NewTicker(1 * time.Second)
        defer ticker.Stop()

        for {
            select {
            case t := <-ticker.C:
                event := sse.NewEvent(t.Format(time.RFC3339)).WithType("time")
                conn.Send(event)
            case <-conn.Done():
                return
            }
        }
    })

    http.ListenAndServe(":8080", nil)
}

WebSocket

package main

import (
    "log"
    "net/http"
    "github.com/coregx/stream/websocket"
)

func main() {
    http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
        conn, _ := websocket.Upgrade(w, r, nil)
        defer conn.Close()

        for {
            msgType, data, err := conn.Read()
            if err != nil {
                break
            }
            conn.Write(msgType, data)
        }
    })

    log.Fatal(http.ListenAndServe(":8080", nil))
}

Broadcasting with Hub:

hub := websocket.NewHub()
go hub.Run()
defer hub.Close()

http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
    conn, _ := websocket.Upgrade(w, r, nil)
    hub.Register(conn)
    defer hub.Unregister(conn)

    for {
        _, data, _ := conn.Read()
        hub.Broadcast(data)
    }
})

🌟 Why stream?

Zero Dependencies, Maximum Control

// Pure stdlib - no external dependencies in production
import "github.com/coregx/stream/sse"
import "github.com/coregx/stream/websocket"

stream is built with zero external dependencies for production code. No vendor lock-in, no dependency hell, just pure Go stdlib implementations of SSE and WebSocket protocols.

RFC-Compliant Protocols

Fully standards-compliant implementations ensure compatibility with all browsers and clients.

Broadcasting Made Simple

// Hub pattern for efficient broadcasting
hub := websocket.NewHub()
hub.BroadcastText("Hello, everyone!")
hub.BroadcastJSON(Message{Type: "update", Data: "..."})

Built-in Hub pattern for efficient message broadcasting to multiple clients with minimal allocations.


πŸ“¦ Installation

go get github.com/coregx/stream

Requirements: Go 1.25+ (uses encoding/json/v2 and modern generics)


πŸš€ Features

SSE (Server-Sent Events)

  • βœ… RFC Compliant - text/event-stream standard
  • βœ… Event Types - Named events (message, update, custom)
  • βœ… Event IDs - Client reconnection with Last-Event-ID
  • βœ… Retry Control - Configurable reconnection delays
  • βœ… Automatic Flushing - Real-time event delivery
  • βœ… Graceful Shutdown - Clean connection closure
  • βœ… 92.3% Test Coverage - 215 tests, comprehensive validation

WebSocket

  • βœ… RFC 6455 Compliant - Full WebSocket protocol
  • βœ… Text & Binary - Both message types supported
  • βœ… Control Frames - Ping/Pong, Close handshake
  • βœ… Broadcasting Hub - Efficient multi-client messaging
  • βœ… Connection Management - Auto cleanup, timeouts
  • βœ… Frame Masking - Client-to-server masking (RFC requirement)
  • βœ… 84.3% Test Coverage - 99 tests, production-ready

Common Features

  • πŸš€ Zero Dependencies - Pure stdlib implementation
  • 🎯 Type-Safe - Modern Go 1.25+ with generics
  • ⚑ High Performance - <100 ΞΌs broadcasts, minimal allocations
  • πŸ§ͺ Well-Tested - 314 tests total, 84.3% coverage
  • 🏒 Production Ready - Used in coregx ecosystem
  • πŸ“š Comprehensive Docs - Guides, examples, API reference

πŸ“š Documentation


🎯 Use Cases

Real-time Dashboards (SSE)

// Push live metrics to dashboard
conn.Send(sse.NewEvent(metricsJSON).WithType("metrics"))

Perfect for server-to-client updates: live metrics, notifications, stock prices, or any real-time data stream.

Chat Applications (WebSocket)

// Bidirectional messaging
hub.BroadcastText(fmt.Sprintf("%s: %s", username, message))

Full-duplex communication for chat, collaborative editing, multiplayer games.

Live Notifications (SSE)

// Server pushes notifications
event := sse.NewEvent(notification).WithType("alert").WithRetry(3000)
conn.Send(event)

Lightweight server push for notifications without WebSocket overhead.

IoT & Sensors (WebSocket)

// Binary data streaming
conn.Write(websocket.BinaryMessage, sensorData)

Efficient binary data transfer for IoT devices, sensors, telemetry.


πŸ“Š Benchmarks

SSE Performance

BenchmarkSSE_Send-8              50000    23.4 ΞΌs/op     0 allocs/op
BenchmarkSSE_Broadcast-8         30000    47.2 ΞΌs/op     1 allocs/op
BenchmarkSSE_E2E_Latency-8       20000    68.5 ΞΌs/op     2 allocs/op

WebSocket Performance

BenchmarkWS_Echo-8               100000   15.3 ΞΌs/op     0 allocs/op
BenchmarkWS_Broadcast-8          50000    32.1 ΞΌs/op     1 allocs/op
BenchmarkWS_Hub_1000clients-8    10000    156 ΞΌs/op      3 allocs/op

High throughput: >20,000 messages/sec per connection, minimal allocations, sub-100ΞΌs latency.


πŸ”§ Advanced Usage

SSE with Custom Headers

conn, err := sse.Upgrade(w, r)
if err != nil {
    http.Error(w, err.Error(), http.StatusBadRequest)
    return
}

// Set custom retry interval
event := sse.NewEvent("data").WithRetry(5000) // 5 seconds

// Named event types
event = sse.NewEvent("update").WithType("user-joined")

// Event IDs for reconnection
event = sse.NewEvent("data").WithID("msg-123")

WebSocket with Configuration

opts := &websocket.UpgradeOptions{
    ReadBufferSize:  4096,
    WriteBufferSize: 4096,
    CheckOrigin: func(r *http.Request) bool {
        return r.Header.Get("Origin") == "https://example.com"
    },
}

conn, err := websocket.Upgrade(w, r, opts)

Graceful Shutdown

// SSE
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

select {
case <-ctx.Done():
    conn.Close()
case <-conn.Done():
    // Client disconnected
}

// WebSocket Hub
hub.Close() // Gracefully closes all connections

🀝 Sister Projects

Part of the coregx ecosystem - production-ready Go libraries:

  • fursy - HTTP Router with generics, OpenAPI, RFC 9457
  • relica - Database Query Builder (coming soon)
  • stream - Real-time Communications (this library)

πŸ“Š Status

Metric Value
Version v0.1.0 (Production Ready)
Test Coverage 84.3% (SSE: 92.3%, WebSocket: 84.3%)
Tests 314 total (215 SSE, 99 WebSocket)
Test Lines 9,245 lines
Benchmarks 23 (E2E latency, throughput, load tests)
Dependencies 0 (production)
Go Version 1.25+

πŸ“„ License

MIT License - see LICENSE file for details.


πŸ™ Acknowledgments

Special Thanks

Professor Ancha Baranova - This project would not have been possible without her invaluable help and support. Her assistance was crucial in making all coregx projects a reality.


Built with ❀️ for the Go community by coregx