Skip to content

Commit

Permalink
remove some flags, add rate flag, semaphore in main loop, ip -> addre…
Browse files Browse the repository at this point in the history
…ss, add port to ip
  • Loading branch information
Kyagara committed Nov 8, 2024
1 parent d080608 commit 2d03494
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 69 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ go install github.com/Kyagara/hagelslag-go@latest

- Improve logging.

- At high rates, DB connection errors out.
- Keep the port as the document ID?

- The amount of workers/tasks running might not be good, find a better default based on some values.
- At high rates, DB connection errors out.

- Add a maximum read size to http scanner, currently it reads until EOF.

Expand Down
49 changes: 18 additions & 31 deletions hagelslag.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"io"
"net"
"os"
"runtime"
"strings"
"sync"
"sync/atomic"
Expand All @@ -25,13 +24,11 @@ var (
)

type Hagelslag struct {
Scanner Scanner
StartingIP string
URI string
OnlyConnect bool
NumWorkers int
MaxTasks int
TasksPerThread int
Scanner Scanner
StartingIP string
URI string
OnlyConnect bool
Rate int
}

type Scanner interface {
Expand All @@ -52,17 +49,14 @@ func NewHagelslag() (Hagelslag, error) {
scannerName := flag.String("scanner", "http", "Scanner to use (default: http)")
uri := flag.String("uri", "mongodb://localhost:27017", "MongoDB URI (default: mongodb://localhost:27017)")
onlyConnect := flag.Bool("only-connect", false, "Only connect to IPs, skipping scan/save (default: false)")
numWorkers := flag.Int("workers", runtime.NumCPU(), "Number of workers to use (default: number of threads)")
tasksPerThread := flag.Int("tasks-per-thread", 512, "Tasks per thread (default: 512)")
rate := flag.Int("rate", 1000, "Limit of connections (default: 1000)")
flag.Parse()

h := Hagelslag{
StartingIP: *ip,
URI: *uri,
OnlyConnect: *onlyConnect,
NumWorkers: *numWorkers,
MaxTasks: *tasksPerThread * 2 * *numWorkers,
TasksPerThread: *tasksPerThread,
StartingIP: *ip,
URI: *uri,
OnlyConnect: *onlyConnect,
Rate: *rate,
}

scanner := strings.ToLower(*scannerName)
Expand All @@ -81,7 +75,7 @@ func NewHagelslag() (Hagelslag, error) {
return h, nil
}

func (h Hagelslag) worker(ips <-chan string, wg *sync.WaitGroup) {
func (h Hagelslag) worker(addresses chan string, semaphore chan struct{}, wg *sync.WaitGroup) {
defer wg.Done()

options := options.Client().
Expand All @@ -99,7 +93,6 @@ func (h Hagelslag) worker(ips <-chan string, wg *sync.WaitGroup) {
}

name := h.Scanner.Name()
port := h.Scanner.Port()
network := h.Scanner.Network()

dialer := net.Dialer{
Expand All @@ -109,13 +102,8 @@ func (h Hagelslag) worker(ips <-chan string, wg *sync.WaitGroup) {

collection := client.Database("hagelslag").Collection(name)

// Responsible for controlling how many tasks can be processed by a worker
semaphore := make(chan struct{}, h.TasksPerThread)

for ip := range ips {
// Get a slot to work on a task
semaphore <- struct{}{}
go h.spawn(semaphore, ip, port, network, dialer, collection)
for address := range addresses {
go h.spawn(semaphore, address, network, dialer, collection)
}

err = client.Disconnect(context.TODO())
Expand All @@ -124,12 +112,11 @@ func (h Hagelslag) worker(ips <-chan string, wg *sync.WaitGroup) {
}
}

func (h Hagelslag) spawn(semaphore <-chan struct{}, ip string, port string, network string, dialer net.Dialer, collection *mongo.Collection) {
func (h Hagelslag) spawn(semaphore chan struct{}, address string, network string, dialer net.Dialer, collection *mongo.Collection) {
// Release the slot when done
defer func() { <-semaphore }()

// Connect
address := net.JoinHostPort(ip, port)
conn, err := dialer.Dial(network, address)
if err != nil {
// Don't log timeouts
Expand All @@ -153,7 +140,7 @@ func (h Hagelslag) spawn(semaphore <-chan struct{}, ip string, port string, netw
return
}

response, latency, err := h.Scanner.Scan(ip, conn)
response, latency, err := h.Scanner.Scan(address, conn)
if len(response) == 0 && err == nil {
// No response, or wrong response (not wanted, can be discarded)
return
Expand All @@ -169,17 +156,17 @@ func (h Hagelslag) spawn(semaphore <-chan struct{}, ip string, port string, netw
return
}

os.Stderr.WriteString("\nERROR SCAN " + ip + ": " + err.Error())
os.Stderr.WriteString("\nERROR SCAN " + address + ": " + err.Error())
return
}

err = h.Scanner.Save(ip, latency, response, collection)
err = h.Scanner.Save(address, latency, response, collection)
if err != nil {
if shuttingDown {
return
}

os.Stderr.WriteString("\nERROR SAVE " + ip + ": " + err.Error())
os.Stderr.WriteString("\nERROR SAVE " + address + ": " + err.Error())
return
}

Expand Down
40 changes: 36 additions & 4 deletions ip.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
"strings"
)

func ipFromUint32(address uint32) string {
var ip [15]byte
func ipFromUint32(address uint32, port uint16) string {
var ip [21]byte
i := 0

// Helper function to write a 3-digit segment into the buffer
Expand Down Expand Up @@ -41,6 +41,38 @@ func ipFromUint32(address uint32) string {
i++

appendSegment(byte(address))

ip[i] = ':'
i++

start := i
if port >= 10000 {
ip[i] = '0' + byte(port/10000)
i++
port %= 10000
}

if port >= 1000 || i > start {
ip[i] = '0' + byte(port/1000)
i++
port %= 1000
}

if port >= 100 || i > start {
ip[i] = '0' + byte(port/100)
i++
port %= 100
}

if port >= 10 || i > start {
ip[i] = '0' + byte(port/10)
i++
port %= 10
}

ip[i] = '0' + byte(port)
i++

return string(ip[:i])
}

Expand Down Expand Up @@ -74,8 +106,8 @@ func parseIP(ip string) (uint32, error) {
return 0, fmt.Errorf("invalid segment '%s' in IP '%s'", octets[3], ip)
}

address := (uint32(segA) << 24) | (uint32(segB) << 16) | (uint32(segC) << 8) | uint32(segD)
return address, nil
parsed := (uint32(segA) << 24) | (uint32(segB) << 16) | (uint32(segC) << 8) | uint32(segD)
return parsed, nil
}

// Check if the IP is in any reserved range, skips to the next available range if it is.
Expand Down
5 changes: 3 additions & 2 deletions ip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,16 @@ func BenchmarkIPToString(b *testing.B) {

for s := 0; s < b.N; s++ {
i := uint32(0)
g := 10000000
port := uint16(25565)
g := 1000000

for {
if g == 0 {
break
}
g--

ip := ipFromUint32(i)
ip := ipFromUint32(i, port)
if ip == "" {
b.Fail()
}
Expand Down
73 changes: 43 additions & 30 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ import (
"fmt"
"os"
"os/signal"
"runtime"
"strconv"
"strings"
"sync"
"sync/atomic"
"syscall"
Expand Down Expand Up @@ -36,71 +39,81 @@ func main() {
os.Exit(1)
}

address, err := parseIP(hagelslag.StartingIP)
if err != nil {
fmt.Printf("Error parsing starting IP: %s\n", err)
os.Exit(1)
}
format := "\r\033[KRate: %d | Success: %d | At: %s"

writer := bufio.NewWriter(os.Stderr)
defer writer.Flush()

// IP/s
ipRate := int64(0)

format := "\r\033[KSuccess: %d | Rate: %d | At: %s"

ticker := time.NewTicker(1 * time.Second)
// Channel that will print status every second
status := time.NewTicker(1 * time.Second).C

ips := make(chan string, hagelslag.MaxTasks)
signals := make(chan os.Signal, 1)
signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)

semaphore := make(chan struct{}, hagelslag.Rate)
addresses := make(chan string)

var wg sync.WaitGroup
for range hagelslag.NumWorkers {
for range runtime.NumCPU() {
wg.Add(1)
go hagelslag.worker(ips, &wg)
go hagelslag.worker(addresses, semaphore, &wg)
}

ip, err := parseIP(hagelslag.StartingIP)
if err != nil {
fmt.Printf("Error parsing starting IP: %s\n", err)
os.Exit(1)
}

portInt, err := strconv.Atoi(hagelslag.Scanner.Port())
if err != nil {
fmt.Printf("Error parsing port: %s\n", err)
os.Exit(1)
}

port := uint16(portInt)

// Main loop
for {
select {
case <-ticker.C:
ipsPerSecond := atomic.LoadInt64(&ipRate)
case <-status:
success := atomic.LoadInt64(&successCount)
fmt.Fprintf(writer, format, success, ipsPerSecond, ipFromUint32(address))
fmt.Fprintf(writer, format, hagelslag.Rate, success, ipFromUint32(ip, port))
writer.Flush()
atomic.StoreInt64(&ipRate, 0)

case <-signals:
fmt.Printf("\nShutting down...\n")
shuttingDown = true
close(ips)
close(addresses)
wg.Wait()
ip := ipFromUint32(address)
if ip == "255.0.0.0" {
address := ipFromUint32(ip, port)
if strings.HasPrefix(address, "255.0.0.0") {
fmt.Println("Done.")
} else {
fmt.Printf("Last IP: %s\n", ip)
fmt.Printf("Last IP: %s\n", address)
}

return

default:
if address >= 0xFF000000 {
if ip >= 0xFF000000 {
signals <- syscall.SIGTERM
continue
}

ip := ipFromUint32(address)

if isReserved(&address) {
os.Stderr.WriteString("\nReserved range reached, skipping to next available range.\nAt: " + ip)
if isReserved(&ip) {
os.Stderr.WriteString("\nReserved range reached, skipping to next available range.\n")
}

ips <- ip
atomic.AddInt64(&ipRate, 1)
address := ipFromUint32(ip, port)

// Get a slot to work on a task
semaphore <- struct{}{}

// Send the address to workers
addresses <- address

address++
ip++
}
}
}
Expand Down

0 comments on commit 2d03494

Please sign in to comment.