package gobpfld
import (
"errors"
"fmt"
"io"
"os"
"strings"
"sync"
"syscall"
"time"
"unsafe"
bpfSyscall "github.com/dylandreimerink/gobpfld/internal/syscall"
"github.com/dylandreimerink/gobpfld/kernelsupport"
"golang.org/x/sys/unix"
)
// A FrameReader can read whole or partial ethernet frames. Every time ReadFrame is called, p will be filled with up
// to len(p) bytes from a single frame. These bytes include both the header and body of the ethernet frame.
// If p is too small to fit the whole frame, the remaining bytes of the frame are discarded. The next call to
// ReadFrame will start at the next frame.
//
// n will be set to the number of bytes read from the frame. err is non-nil if any error occurred during the
// process. If n is 0 and err is nil, nothing was read for an expected reason such as a timeout or a signal interrupt.
type FrameReader interface {
ReadFrame(p []byte) (n int, err error)
}
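// The snippet below is a minimal usage sketch (not part of this file's API surface):
// it drains frames from any FrameReader. The reader "rdr" and the "handlePacket"
// function are hypothetical stand-ins.
//
//	buf := make([]byte, 2048)
//	for {
//		n, err := rdr.ReadFrame(buf)
//		if err != nil {
//			log.Fatal(err)
//		}
//		if n == 0 {
//			continue // timeout or signal interrupt, nothing was read
//		}
//		handlePacket(buf[:n])
//	}
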
type FrameWriter interface {
WriteFrame(p []byte) (n int, err error)
}
type FrameLeaser interface {
ReadLease() (*XSKLease, error)
WriteLease() (*XSKLease, error)
}
var (
_ FrameReader = (*XSKMultiSocket)(nil)
_ FrameWriter = (*XSKMultiSocket)(nil)
_ FrameLeaser = (*XSKMultiSocket)(nil)
_ io.Closer = (*XSKMultiSocket)(nil)
)
// XSKMultiSocket is a collection of XSKSockets. The multi socket balances reads and writes between all XSKSockets.
// This is useful for multi queue netdevices since an XSKSocket can only read or write from one rx/tx queue pair
// at a time. A multi socket allows you to bundle all of these sockets so you get one socket for the whole netdevice.
//
// An alternative use for the multi socket is to add sockets from multiple netdevices.
//
// TODO look into using epoll for multi sockets. Using poll for single sockets still makes sense since there is always
// 1 fd, but for multi sockets we can have many more. For high-end NICs with ~40 rx/tx queues (Mellanox for example)
// it makes sense to start using epoll since it is supposed to scale better. Should make it configurable when adding
// support, in case FreeBSD or another unix-like OS adds XSK support, since epoll is non-POSIX.
//
// TODO dynamic socket adding/removing. Should not be too hard, the main edge case to solve is dealing with
// pending/blocking syscalls for read/write. But presumably epoll can allow us to dynamically add/remove
// fds without interrupting the reads/writes. Otherwise adding/removing sockets will have to request both the
// rmu and wmu.
type XSKMultiSocket struct {
sockets []*XSKSocket
rmu sync.Mutex
wmu sync.Mutex
readIter int
writeIter int
readTimeout int
writeTimeout int
}
func NewXSKMultiSocket(xskSockets ...*XSKSocket) (*XSKMultiSocket, error) {
if len(xskSockets) == 0 {
return nil, fmt.Errorf("need at least one socket")
}
for _, sock := range xskSockets {
if sock == nil {
return nil, fmt.Errorf("socket value can't be nil")
}
}
return &XSKMultiSocket{
sockets: xskSockets,
}, nil
}
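// A hedged construction sketch: one XSKSocket per hardware queue of a
// hypothetical 4-queue netdev, bundled into a single multi socket. "ifIndex"
// is a placeholder for a real interface index.
//
//	var socks []*XSKSocket
//	for q := 0; q < 4; q++ {
//		sock, err := NewXSKSocket(XSKSettings{
//			NetDevIfIndex: ifIndex,
//			QueueID:       q,
//		})
//		if err != nil {
//			log.Fatal(err)
//		}
//		socks = append(socks, sock)
//	}
//	xms, err := NewXSKMultiSocket(socks...)
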
// SetWriteTimeout sets the timeout for WriteFrame and XSKLease.Write calls.
// If ms == 0 (default), we will never block/wait and error if we can't write at once.
// If ms == -1, we will block forever until we can write.
// If ms > 0, we will wait for ms milliseconds for an opportunity to write or error afterwards.
func (xms *XSKMultiSocket) SetWriteTimeout(ms int) error {
if ms < -1 {
return fmt.Errorf("timeout must be -1, 0, or positive amount of miliseconds")
}
xms.writeTimeout = ms
return nil
}
// SetReadTimeout sets the timeout for ReadFrame and ReadLease calls.
// If ms == 0 (default), we will never block/wait and return no data if there isn't any ready.
// If ms == -1, we will block forever until we can read.
// If ms > 0, we will wait for ms milliseconds for an opportunity to read or return no data.
func (xms *XSKMultiSocket) SetReadTimeout(ms int) error {
if ms < -1 {
return fmt.Errorf("timeout must be -1, 0, or positive amount of miliseconds")
}
xms.readTimeout = ms
return nil
}
func (xms *XSKMultiSocket) ReadFrame(p []byte) (n int, err error) {
xms.rmu.Lock()
defer xms.rmu.Unlock()
var (
desc *descriptor
sock *XSKSocket
)
pollFds := make([]unix.PollFd, len(xms.sockets))
// Save the current iter value; we need it when resolving the poll results
curItr := xms.readIter
// Check every socket in case there is a frame ready
for i := 0; i < len(xms.sockets); i++ {
// Use readIter, which keeps its value between calls to ReadFrame; this ensures that we attempt to read
// from all sockets equally
sock = xms.sockets[xms.readIter]
desc = sock.rx.Dequeue()
xms.readIter++
if xms.readIter >= len(xms.sockets) {
xms.readIter = 0
}
if desc != nil {
break
}
pollFds[i] = unix.PollFd{
// Poll the socket we just checked, not the socket readIter now points at
Fd: int32(sock.fd),
Events: unix.POLLIN,
}
}
// If none of the sockets have frames ready at the moment, we poll to wait
if desc == nil {
n, err := unix.Poll(pollFds, xms.readTimeout)
if err != nil {
// Sometimes a poll is interrupted by a signal, which is not a real error,
// so let's treat it like a timeout
if err == syscall.EINTR {
return 0, nil
}
return 0, fmt.Errorf("poll: %w", err)
}
// If n == 0, the timeout was reached
if n == 0 {
return 0, nil
}
// now that there is at least one socket with a frame, we need to find it
for i := 0; i < len(xms.sockets); i++ {
if pollFds[i].Revents&unix.POLLIN > 0 {
sock = xms.sockets[curItr]
desc = sock.rx.Dequeue()
if desc != nil {
break
}
}
curItr++
if curItr >= len(xms.sockets) {
curItr = 0
}
}
// If poll returned n>0 but dequeueing still failed
if desc == nil {
return 0, nil
}
}
// Use the named return n instead of shadowing the len builtin
n = copy(p, sock.umem[desc.addr:desc.addr+uint64(desc.len)])
err = sock.fill.Enqueue(addrToFrameStart(desc.addr, sock.settings.FrameSize))
if err != nil {
return n, fmt.Errorf("fill enqueue: %w", err)
}
err = sock.wakeupFill()
if err != nil {
return n, err
}
return n, nil
}
func (xms *XSKMultiSocket) WriteFrame(p []byte) (n int, err error) {
xms.wmu.Lock()
defer xms.wmu.Unlock()
pollFds := make([]unix.PollFd, len(xms.sockets))
// Register every socket so we can poll them for write readiness
for i := 0; i < len(xms.sockets); i++ {
pollFds[i] = unix.PollFd{
Fd: int32(xms.sockets[xms.writeIter].fd),
Events: unix.POLLOUT,
}
xms.writeIter++
if xms.writeIter >= len(xms.sockets) {
xms.writeIter = 0
}
}
n, err = unix.Poll(pollFds, xms.writeTimeout)
if err != nil {
return 0, fmt.Errorf("poll: %w", err)
}
if n == 0 {
return 0, fmt.Errorf("timeout")
}
var (
addr uint64
sock *XSKSocket
)
for i := 0; i < len(xms.sockets); i++ {
sock = xms.sockets[xms.writeIter]
xms.writeIter++
if xms.writeIter >= len(xms.sockets) {
xms.writeIter = 0
}
if pollFds[i].Revents&unix.POLLOUT > 0 {
addr = <-sock.txAddrs
break
}
}
// Use the named return n instead of shadowing the len builtin
n = copy(sock.umem[addr:addr+uint64(len(p))], p)
err = sock.enqueueTx(descriptor{
addr: addr,
len: uint32(n),
})
if err != nil {
sock.txAddrs <- addr
return 0, err
}
err = sock.wakeupTx()
if err != nil {
return 0, err
}
return n, nil
}
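// A hedged usage sketch: transmitting one prepared ethernet frame through the
// multi socket. "frame" is a hypothetical []byte holding a complete frame.
//
//	if err := xms.SetWriteTimeout(10); err != nil { // wait up to 10ms per write
//		return err
//	}
//	n, err := xms.WriteFrame(frame)
//	if err != nil {
//		return err
//	}
//	log.Printf("wrote %d of %d bytes", n, len(frame))
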
func (xms *XSKMultiSocket) Close() error {
for _, sock := range xms.sockets {
err := sock.Close()
if err != nil {
return err
}
}
return nil
}
// WriteLease creates an XSKLease which points to a piece of preallocated memory. This memory can be used to
// build packets for writing. Unlike leases obtained from ReadLease, write leases have no Headroom.
// The Data slice of the lease is the full length of the usable frame; this length should not be exceeded.
// Any memory held by the lease can't be reused until it has been released or written.
//
// This function blocks until a frame for transmission is available and is not subject to the write timeout.
func (xms *XSKMultiSocket) WriteLease() (lease *XSKLease, err error) {
// Guard writeIter against concurrent WriteFrame/WriteLease calls
xms.wmu.Lock()
sock := xms.sockets[xms.writeIter]
xms.writeIter++
if xms.writeIter >= len(xms.sockets) {
xms.writeIter = 0
}
// Unlock before the potentially blocking read from txAddrs
xms.wmu.Unlock()
addr := <-sock.txAddrs
return &XSKLease{
Headroom: 0,
Data: sock.umem[addr : addr+uint64(sock.settings.FrameSize)],
dataAddr: addr,
sock: sock,
fromTx: true,
}, nil
}
// ReadLease reads a frame from the socket and returns its memory in an XSKLease. After reading the contents of the
// frame it can be released or written, either of which allows the memory to be reused. Calling Write on the lease
// will cause the contents of Data to be written back to the network interface. The contents of Data can be modified
// before calling Write, thus allowing a program to implement zero-copy/zero-allocation encapsulation or
// request/response protocols.
func (xms *XSKMultiSocket) ReadLease() (lease *XSKLease, err error) {
xms.rmu.Lock()
defer xms.rmu.Unlock()
var (
desc *descriptor
sock *XSKSocket
)
pollFds := make([]unix.PollFd, len(xms.sockets))
// Save the current iter value; we need it when resolving the poll results
curItr := xms.readIter
// Check every socket in case there is a frame ready
for i := 0; i < len(xms.sockets); i++ {
// Use readIter, which keeps its value between calls to Read; this ensures that we attempt to read
// from all sockets equally
sock = xms.sockets[xms.readIter]
desc = sock.rx.Dequeue()
xms.readIter++
if xms.readIter >= len(xms.sockets) {
xms.readIter = 0
}
if desc != nil {
break
}
pollFds[i] = unix.PollFd{
// Poll the socket we just checked, not the socket readIter now points at
Fd: int32(sock.fd),
Events: unix.POLLIN,
}
}
// If none of the sockets have frames ready at the moment, we poll to wait
if desc == nil {
n, err := unix.Poll(pollFds, xms.readTimeout)
if err != nil {
// Sometimes a poll is interrupted by a signal, which is not a real error,
// so let's treat it like a timeout
if err == syscall.EINTR {
return nil, nil
}
return nil, fmt.Errorf("poll: %w", err)
}
// If n == 0, the timeout was reached
if n == 0 {
return nil, nil
}
// now that there is at least one socket with a frame, we need to find it
for i := 0; i < len(xms.sockets); i++ {
if pollFds[i].Revents&unix.POLLIN > 0 {
sock = xms.sockets[curItr]
desc = sock.rx.Dequeue()
if desc != nil {
break
}
}
curItr++
if curItr >= len(xms.sockets) {
curItr = 0
}
}
// If poll returned n>0 but dequeueing still failed
if desc == nil {
return nil, nil
}
}
return &XSKLease{
Headroom: sock.settings.Headroom,
Data: sock.umem[desc.addr-uint64(sock.settings.Headroom) : desc.addr+uint64(desc.len)],
dataAddr: desc.addr,
sock: sock,
}, nil
}
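// A hedged zero-copy sketch: read a frame, mutate it in place, and transmit it
// again without copying. Swapping the MAC addresses stands in for real packet logic.
//
//	lease, err := xms.ReadLease()
//	if err != nil || lease == nil {
//		return err // error, timeout, or signal interrupt
//	}
//	pkt := lease.Data[lease.Headroom:] // skip the headroom to reach the frame
//	for i := 0; i < 6; i++ {           // swap destination and source MAC
//		pkt[i], pkt[i+6] = pkt[i+6], pkt[i]
//	}
//	if err := lease.Write(); err != nil {
//		return err
//	}
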
// XSKLease is used to "lease" a piece of buffer memory from the socket and return it after the user
// is done using it. This allows us to implement true zero-copy packet access.
// After an XSKLease is released or written, the underlying array of Data will be repurposed; to avoid strange bugs
// users must not use Data or sub-slices of Data after the lease has been released.
type XSKLease struct {
Data []byte
// The number of bytes prefixed at the start which don't contain frame data.
// This headroom can be used to add an extra header (encapsulation) without having to
// copy or move the existing packet data.
Headroom int
// dataAddr is the memory address at the start of the headroom.
dataAddr uint64
sock *XSKSocket
// If true the frame address originates from the txAddrs chan
fromTx bool
}
// Release releases the leased memory so the kernel can fill it with new data.
func (xl *XSKLease) Release() error {
// Remove reference to Data since it is invalid from now
xl.Data = nil
frameAddr := addrToFrameStart(xl.dataAddr, xl.sock.settings.FrameSize)
// If this is a tx lease, we can just return the unused address to the txAddrs buffer
if xl.fromTx {
xl.sock.txAddrs <- frameAddr
} else {
// else, this lease was a rx lease in which case it must be returned to the fill ring
xl.sock.fmu.Lock()
defer xl.sock.fmu.Unlock()
// Enqueue the address of the frame on the fill queue so it can be reused
err := xl.sock.fill.Enqueue(frameAddr)
if err != nil {
return fmt.Errorf("enqueue fill: %w", err)
}
err = xl.sock.wakeupFill()
if err != nil {
return err
}
}
return nil
}
// Write writes a lease to the network interface. The packet length is len(Data) - Headroom, so make sure
// to resize Data to the size of the data to be transmitted.
// The headroom should always be included (never reslice the start of the slice); 'Headroom' indicates
// at which byte the packet data starts.
// After Write has been called the lease is released, and the Data slice or its sub-slices should not
// be used anymore.
func (xl *XSKLease) Write() error {
xl.sock.wmu.Lock()
defer xl.sock.wmu.Unlock()
if len(xl.Data) > xl.sock.settings.FrameSize {
return fmt.Errorf("lease has been expanded beyond framesize, can't transmit")
}
err := xl.sock.enqueueTx(descriptor{
// When enqueueing, we don't want to send the headroom bytes
addr: xl.dataAddr + uint64(xl.Headroom),
// Data should contain headroom + packet; since we will not be sending the headroom
// we need to subtract the amount of headroom from the length of Data to get the correct packet length
len: uint32(len(xl.Data) - xl.Headroom),
})
if err != nil {
return fmt.Errorf("tx enqueue: %w", err)
}
err = xl.sock.wakeupTx()
if err != nil {
return err
}
// If the lease was from the fill->rx lifecycle
if !xl.fromTx {
// Since a frame from the fill->rx lifecycle was used to transmit, we now take a frame from
// the tx->completion lifecycle and insert it into the fill ring so we end up with the same
// number of frames available for both cycles. If we don't do this the fill->rx cycle will run
// out of frames.
// At rest the completion queue is at max capacity, so we first dequeue one frame to make
// room for the frame we just enqueued on tx, in case the kernel can transmit
// faster than we can dequeue.
addr := <-xl.sock.txAddrs
err := xl.sock.fill.Enqueue(addr)
if err != nil {
return fmt.Errorf("fill enqueue: %w", err)
}
err = xl.sock.wakeupFill()
if err != nil {
return err
}
}
// Set data to nil to indicate that it is no longer valid to use
xl.Data = nil
return nil
}
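// A hedged packet-building sketch for a write lease: Data starts out at the full
// frame size and must be resliced down to the packet length before calling Write.
// "buildPacket" is a hypothetical function returning encoded frame bytes.
//
//	lease, err := sock.WriteLease()
//	if err != nil {
//		return err
//	}
//	n := copy(lease.Data, buildPacket())
//	lease.Data = lease.Data[:n] // write leases have no headroom
//	return lease.Write()
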
// The addresses we get back from the rx ring have offsets due to headroom, both user configured
// and default headroom added by the network driver. This function rounds an address
// down to the start of its frame in umem so the frame address can be re-enqueued.
// https://www.spinics.net/lists/xdp-newbies/msg01479.html
func addrToFrameStart(addr uint64, frameSize int) uint64 {
return (addr / uint64(frameSize)) * uint64(frameSize)
}
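// For example, with a frame size of 2048 an rx address of 6400 lies in the frame
// starting at (6400/2048)*2048 = 6144, i.e. 256 bytes of driver/user headroom.
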
// xskDescRing is a ring buffer containing descriptors used for the rx and tx rings
type xskDescRing struct {
xskRing
}
func (dr *xskDescRing) Dequeue() *descriptor {
producer := (*uint32)(dr.producer)
consumer := (*uint32)(dr.consumer)
if (*producer - *consumer) == 0 {
return nil
}
// The linux kernel uses the wraparound of an integer to reset the consumer and
// producer. And since ring buffer sizes are always a power of 2 we can just mask away
// all bits which fall outside of this size to get an offset which always lies
// between 0 and dr.elemCount
off := *consumer & (dr.elemCount - 1)
desc := (*descriptor)(unsafe.Pointer(uintptr(dr.ring) + uintptr(off)*descSize))
*consumer++
return desc
}
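// For example, with elemCount = 8 a consumer counter of 13 masks to offset
// 13 & 7 = 5; when the uint32 counter eventually wraps around, the masked
// offset keeps cycling through 0..7 without any explicit reset.
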
func (dr *xskDescRing) Enqueue(desc descriptor) error {
producer := (*uint32)(dr.producer)
consumer := (*uint32)(dr.consumer)
// If the distance between producer and consumer has reached elemCount-1, the buffer is full
if (*producer - *consumer) == dr.elemCount-1 {
return errBufferFull
}
// The linux kernel uses the wraparound of an integer to reset the consumer and
// producer. And since ring buffer sizes are always a power of 2 we can just mask away
// all bits which fall outside of this size to get an offset which always lies
// between 0 and dr.elemCount
off := *producer & (dr.elemCount - 1)
// Write the descriptor to the current producer pos
*(*descriptor)(unsafe.Pointer(uintptr(dr.ring) + uintptr(off)*descSize)) = desc
*producer++
return nil
}
// xskAddrRing is a ring buffer containing addresses (uint64) used for the fill and completion rings
type xskAddrRing struct {
xskRing
}
const addrSize = unsafe.Sizeof(uint64(0))
func (ar *xskAddrRing) Dequeue() *uint64 {
producer := (*uint32)(ar.producer)
consumer := (*uint32)(ar.consumer)
if (*producer - *consumer) == 0 {
return nil
}
// The linux kernel uses the wraparound of an integer to reset the consumer and
// producer. And since ring buffer sizes are always a power of 2 we can just mask away
// all bits which fall outside of this size to get an offset which always lies
// between 0 and ar.elemCount
off := *consumer & (ar.elemCount - 1)
addr := (*uint64)(unsafe.Pointer(uintptr(ar.ring) + uintptr(off)*addrSize))
*consumer++
return addr
}
var errBufferFull = errors.New("ring buffer is full")
func (ar *xskAddrRing) Enqueue(addr uint64) error {
producer := (*uint32)(ar.producer)
consumer := (*uint32)(ar.consumer)
// If the distance between producer and consumer has reached elemCount-1, the buffer is full
if (*producer - *consumer) == ar.elemCount-1 {
return errBufferFull
}
// The linux kernel uses the wraparound of an integer to reset the consumer and
// producer. And since ring buffer sizes are always a power of 2 we can just mask away
// all bits which fall outside of this size to get an offset which always lies
// between 0 and ar.elemCount
off := *producer & (ar.elemCount - 1)
// Write the address to the current producer pos
*(*uint64)(unsafe.Pointer(uintptr(ar.ring) + uintptr(off)*addrSize)) = addr
*producer++
return nil
}
type xskRing struct {
// Hold a reference to the mmap so we can munmap it later
mmap []byte
elemCount uint32
// Pointer to the producer counter, owned by the producing side; it counts the elements added to the ring
producer unsafe.Pointer
// Pointer to the consumer counter, owned by the consuming side; it counts the elements consumed from the ring
consumer unsafe.Pointer
// A pointer to the start of the ring buffer
ring unsafe.Pointer
flags unsafe.Pointer
}
func (xr *xskRing) Close() error {
if xr.mmap != nil {
// Clear mmap after unmapping so a second Close is a no-op
err := syscall.Munmap(xr.mmap)
xr.mmap = nil
return err
}
return nil
}
func newXskRing(mmap []byte, off ringOffset, elemCount uint32) xskRing {
return xskRing{
mmap: mmap,
consumer: unsafe.Pointer(&mmap[off.consumer]),
producer: unsafe.Pointer(&mmap[off.producer]),
ring: unsafe.Pointer(&mmap[off.desc]),
flags: unsafe.Pointer(&mmap[off.flags]),
elemCount: elemCount,
}
}
// https://elixir.bootlin.com/linux/latest/source/include/uapi/linux/if_xdp.h
// struct xdp_umem_reg {
// __u64 addr; /* Start of packet data area */
// __u64 len; /* Length of packet data area */
// __u32 chunk_size;
// __u32 headroom;
// __u32 flags;
// };
type umemReg struct {
addr uint64
len uint64
chunkSize uint32
headroom uint32
flags uint32 //nolint:structcheck // unused reserved for future use
}
// struct xdp_ring_offset {
// __u64 producer;
// __u64 consumer;
// __u64 desc;
// __u64 flags;
// };
type ringOffset struct {
producer uint64
consumer uint64
desc uint64
flags uint64
}
type ringOffsetNoFlags struct {
producer uint64
consumer uint64
desc uint64
}
// struct xdp_mmap_offsets {
// struct xdp_ring_offset rx;
// struct xdp_ring_offset tx;
// struct xdp_ring_offset fr; /* Fill */
// struct xdp_ring_offset cr; /* Completion */
// };
type mmapOffsets struct {
rx ringOffset
tx ringOffset
fr ringOffset
cr ringOffset
}
// struct xdp_desc {
// __u64 addr;
// __u32 len;
// __u32 options;
// };
type descriptor struct {
addr uint64
len uint32
// options is reserved and not used, setting it to anything other than 0 is invalid in 5.12.2
// https://elixir.bootlin.com/linux/v5.12.2/source/net/xdp/xsk_queue.h#L141
options uint32 //nolint:structcheck // not used but reserved for future use (also for descSize)
}
var descSize = unsafe.Sizeof(descriptor{})
// struct sockaddr_xdp {
// __u16 sxdp_family;
// __u16 sxdp_flags;
// __u32 sxdp_ifindex;
// __u32 sxdp_queue_id;
// __u32 sxdp_shared_umem_fd;
// };
type xdpSockAddr struct {
sxdpFamily uint16
sxdpFlags uint16
sxdpIfIndex uint32
sxdpQueueID uint32
sxdpSharedUmemFD uint32
}
type XSKSettings struct {
// Size of the umem frames/packet buffers (2048 or 4096)
FrameSize int
// Number of frames/packets which can be used, must be a power of 2
FrameCount int
// The index of the network device on which XSK will be used
NetDevIfIndex int
// The id of the Queue on which this XSK will be used
QueueID int
// How much unused space should be left at the start of each buffer.
// This can be used to, for example, encapsulate a packet without having to move or copy memory
Headroom int
// Is Tx disabled for this socket?
DisableTx bool
// Is Rx disabled for this socket?
DisableRx bool
// If true, XDP_USE_NEED_WAKEUP is not used. Need wakeup should be left enabled
// unless there is a reason it doesn't work (like on older kernels)
DisableNeedWakeup bool
// If true, zero copy mode is forced. By default zero copy mode is attempted and if not available
// in the driver will automatically fallback to copy mode.
ForceZeroCopy bool
// If true, copy mode is always used and zero copy mode never attempted.
ForceCopy bool
// The minimum time between two checks of the completion queue. A lower value allows for more transmitted
// packets per second at the cost of higher CPU usage, even when not transmitting.
// By default this value is 10ms, which seems like a sane value; it means that there is a theoretical max TX rate of
// (1000/10) * (tx ring size) which is 100 * 2048 = 204,800 packets per second when DisableRx = false
// or 100 * 4096 = 409,600 when DisableRx = true at the default FrameCount of 4096.
// Setting this value to 0 will cause one goroutine to busy poll (use 100% CPU) per socket.
CQConsumeInterval *time.Duration
}
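// A hedged configuration sketch: a socket bound to queue 0 of a netdev, with the
// defaults spelled out explicitly. "ifIndex" is a placeholder for a real interface index.
//
//	settings := XSKSettings{
//		FrameSize:     2048,    // must be 2048 or 4096
//		FrameCount:    4096,    // must be a power of 2
//		NetDevIfIndex: ifIndex,
//		QueueID:       0,
//	}
//	sock, err := NewXSKSocket(settings)
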
// Same defaults as libbpf https://elixir.bootlin.com/linux/latest/source/tools/lib/bpf/xsk.h#L192
const (
defaultFrameCount = 4096
defaultFrameSize = 4096
)
var (
_ FrameReader = (*XSKSocket)(nil)
_ FrameWriter = (*XSKSocket)(nil)
_ FrameLeaser = (*XSKSocket)(nil)
_ io.Closer = (*XSKSocket)(nil)
)
// An XSKSocket can bind to one queue on one netdev
type XSKSocket struct {
fd int
// memory region where frames are exchanged with the kernel
umem []byte
settings XSKSettings
// Buffered channel containing addresses of frames which can be used
// for transmission
txAddrs chan uint64
completionTicker *time.Ticker
rmu sync.Mutex
wmu sync.Mutex
fmu sync.Mutex
rx xskDescRing
tx xskDescRing
fill xskAddrRing
completion xskAddrRing
readTimeout int
writeTimeout int
}
func NewXSKSocket(settings XSKSettings) (_ *XSKSocket, err error) {
if !kernelsupport.CurrentFeatures.Map.Has(kernelsupport.KFeatMapAFXDP) {
return nil, fmt.Errorf("XSK/AF_XDP is not supported by the current kernel version")
}
if settings.FrameCount == 0 {
settings.FrameCount = defaultFrameCount
}
if settings.FrameSize == 0 {
settings.FrameSize = defaultFrameSize
}
if !isPowerOfTwo(settings.FrameCount) {
return nil, fmt.Errorf("frame count must be a power of 2")
}
if settings.FrameSize != 2048 && settings.FrameSize != 4096 {
// TODO allow frame sizes which are not aligned to 2k but enable
// XDP_UMEM_UNALIGNED_CHUNK_FLAG when this happens
return nil, fmt.Errorf("frame size must be 2048 or 4096")
}
if settings.DisableTx && settings.DisableRx {
return nil, fmt.Errorf("tx and rx can't both be disabled")
}
if settings.ForceCopy && settings.ForceZeroCopy {
return nil, fmt.Errorf("can't force both zero-copy and copy mode")
}
umemSize := settings.FrameSize * settings.FrameCount
xskSock := &XSKSocket{
umem: make([]byte, umemSize),
settings: settings,
}
xskSock.fd, err = syscall.Socket(unix.AF_XDP, syscall.SOCK_RAW, 0)
if err != nil {
return nil, fmt.Errorf("syscall socket: %w", err)
}
// If we return with an error, close the socket so we don't leak resources
defer func() {
if err != nil {
xskSock.Close()
}
}()
reg := umemReg{
addr: uint64(uintptr(unsafe.Pointer(&xskSock.umem[0]))),
len: uint64(len(xskSock.umem)),
chunkSize: uint32(settings.FrameSize),
headroom: uint32(settings.Headroom),
// TODO flags
}
// Register the umem
err = bpfSyscall.Setsockopt(
xskSock.fd,
unix.SOL_XDP,
unix.XDP_UMEM_REG,
unsafe.Pointer(&reg),
unsafe.Sizeof(reg),
)
if err != nil {
return nil, fmt.Errorf("set sockopt UMEM_REG: %w", err)
}
// Assume both are enabled
rxCount := settings.FrameCount / 2
txCount := rxCount
// If tx is disabled
if settings.DisableTx {
txCount = 0
rxCount = settings.FrameCount
} else if settings.DisableRx {
txCount = settings.FrameCount
rxCount = 0
}
// Tell the kernel how large the fill ring should be
err = bpfSyscall.Setsockopt(
xskSock.fd,
unix.SOL_XDP,
unix.XDP_UMEM_FILL_RING,
unsafe.Pointer(&rxCount),
unsafe.Sizeof(rxCount),
)
if err != nil {
return nil, fmt.Errorf("set sockopt XDP_UMEM_FILL_RING: %w", err)
}
// Tell the kernel how large the completion ring should be
err = bpfSyscall.Setsockopt(
xskSock.fd,
unix.SOL_XDP,
unix.XDP_UMEM_COMPLETION_RING,
unsafe.Pointer(&txCount),
unsafe.Sizeof(txCount),
)
if err != nil {
return nil, fmt.Errorf("set sockopt XDP_UMEM_COMPLETION_RING: %w", err)
}
offsets, err := getMMapOffsets(xskSock.fd)
if err != nil {
return nil, fmt.Errorf("get mmap offsets: %w", err)
}
mmap, err := syscall.Mmap(
xskSock.fd,
unix.XDP_UMEM_PGOFF_FILL_RING,
int(offsets.fr.desc)+rxCount*int(unsafe.Sizeof(uint64(0))),
unix.PROT_READ|unix.PROT_WRITE,
unix.MAP_SHARED|unix.MAP_POPULATE,
)
if err != nil {
return nil, fmt.Errorf("mmap fill ring: %w", err)
}
xskSock.fill = xskAddrRing{
xskRing: newXskRing(mmap, offsets.fr, uint32(rxCount)),
}
mmap, err = syscall.Mmap(
xskSock.fd,
unix.XDP_UMEM_PGOFF_COMPLETION_RING,
int(offsets.cr.desc)+txCount*int(unsafe.Sizeof(uint64(0))),
unix.PROT_READ|unix.PROT_WRITE,
unix.MAP_SHARED|unix.MAP_POPULATE,
)
if err != nil {
return nil, fmt.Errorf("mmap completion ring: %w", err)
}
xskSock.completion = xskAddrRing{
xskRing: newXskRing(mmap, offsets.cr, uint32(txCount)),
}
xskSock.txAddrs = make(chan uint64, txCount+1)
txOffset := rxCount * settings.FrameSize
// Fill the txAddrs channel with available addresses to use during transmission
for i := 0; i < txCount; i++ {
xskSock.txAddrs <- uint64(txOffset + i*settings.FrameSize)
}
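// With the defaults (FrameCount 4096, FrameSize 4096, rx and tx both enabled) this
// splits umem in half: frames 0-2047 cycle through fill->rx while frames 2048-4095
// (starting at byte offset 2048*4096) cycle through tx->completion.
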
// TODO allow for completion worker pooling (having one worker check multiple sockets)
// this would allow a user to dedicate 1 or 2 CPU cores to busy polling all sockets of a
// particular netdev or even the whole host.
interval := 10 * time.Millisecond
if settings.CQConsumeInterval != nil {
interval = *settings.CQConsumeInterval
}
xskSock.completionTicker = time.NewTicker(interval)
go xskSock.completionWorker()
// Tell the kernel how large the rx ring should be
err = bpfSyscall.Setsockopt(
xskSock.fd,
unix.SOL_XDP,
unix.XDP_RX_RING,
unsafe.Pointer(&rxCount),
unsafe.Sizeof(rxCount),
)
if err != nil {
return nil, fmt.Errorf("set sockopt XDP_RX_RING: %w", err)
}
// Tell the kernel how large the tx ring should be
err = bpfSyscall.Setsockopt(
xskSock.fd,
unix.SOL_XDP,
unix.XDP_TX_RING,
unsafe.Pointer(&txCount),
unsafe.Sizeof(txCount),
)
if err != nil {
return nil, fmt.Errorf("set sockopt XDP_TX_RING: %w", err)
}
mmap, err = syscall.Mmap(
xskSock.fd,
unix.XDP_PGOFF_RX_RING,
int(offsets.rx.desc)+rxCount*int(unsafe.Sizeof(descriptor{})),
unix.PROT_READ|unix.PROT_WRITE,
unix.MAP_SHARED|unix.MAP_POPULATE,
)
if err != nil {
return nil, fmt.Errorf("mmap rx ring: %w", err)
}
xskSock.rx = xskDescRing{
xskRing: newXskRing(mmap, offsets.rx, uint32(rxCount)),
}
mmap, err = syscall.Mmap(
xskSock.fd,
unix.XDP_PGOFF_TX_RING,
int(offsets.tx.desc)+txCount*int(unsafe.Sizeof(descriptor{})),
unix.PROT_READ|unix.PROT_WRITE,
unix.MAP_SHARED|unix.MAP_POPULATE,
)
if err != nil {
return nil, fmt.Errorf("mmap tx ring: %w", err)
}
xskSock.tx = xskDescRing{
xskRing: newXskRing(mmap, offsets.tx, uint32(txCount)),
}
var flags uint16