Skip to content

Commit

Permalink
Allow multiple IBV UDP source ports
Browse files Browse the repository at this point in the history
This commit instantiates multiple IBV rules for different
UDP source ports. This effectively increases the size of the
IBV receive buffers, providing performance increases.

This commit was tested in the lab with 360 antennas (2 x 33.75 Gb/s throughput;
2 x 34.1 Gb/s including the app header, but not the UDP/IP/Eth headers).

Over 15 hours, it lost 102 packets (all in one burst from one of the
two parallel pipelines). This was tested with the GPU and output threads
active, using a 17.18-second integration in the BDA configuration.

Performance could possibly be improved further by using more than 8 source
ports, but this is not possible with the current 360-antenna lab emulator,
which only uses 8 SNAP boards.
  • Loading branch information
Jack H committed Nov 26, 2019
1 parent 2d2fa57 commit 4eef974
Show file tree
Hide file tree
Showing 3 changed files with 158 additions and 134 deletions.
285 changes: 152 additions & 133 deletions src/hera_ibv_thread.c
Original file line number Diff line number Diff line change
Expand Up @@ -598,35 +598,44 @@ static void *run(hashpipe_thread_args_t * args)
st_p = &st; // allow global (this source file) access to the status buffer

// IB Verbs structures
struct hashpipe_ibv_context hibv_ctx = {0};
struct hashpipe_ibv_context * hibv_ctx;
struct hashpipe_ibv_recv_pkt * hibv_rpkt;
struct hashpipe_ibv_recv_pkt * curr_rpkt;
char ifname[IFNAMSIZ];
int bindport;
int i;

hashpipe_status_lock_safe(&st);
hgets(st.buf, "BINDHOST", IFNAMSIZ, ifname);
hgeti4(st.buf, "BINDPORT", &bindport);
hashpipe_status_unlock_safe(&st);

strncpy(hibv_ctx.interface_name, ifname, IFNAMSIZ);
hibv_ctx.interface_name[IFNAMSIZ-1] = '\0'; // Ensure NUL termination
hibv_ctx.send_pkt_num = 1;
hibv_ctx.recv_pkt_num = 8192;
hibv_ctx.pkt_size_max = 5000;
hibv_ctx.max_flows = 2;

fprintf(stdout, "Initializing IBV socket\n");
if(hashpipe_ibv_init(&hibv_ctx)) {
hashpipe_error(__FUNCTION__, "Failed to initialize IBV");
hibv_ctx = (struct hashpipe_ibv_context *)calloc(N_SRC_PORTS, sizeof(struct hashpipe_ibv_context));
if (!hibv_ctx) {
hashpipe_error(__FUNCTION__, "Failed to allocate IBV context(s)");
}
for (i=0; i<N_SRC_PORTS; i++) {
strncpy(hibv_ctx[i].interface_name, ifname, IFNAMSIZ);
hibv_ctx[i].interface_name[IFNAMSIZ-1] = '\0'; // Ensure NUL termination
hibv_ctx[i].send_pkt_num = 1;
hibv_ctx[i].recv_pkt_num = 8192;
hibv_ctx[i].pkt_size_max = 5000;
hibv_ctx[i].max_flows = N_SRC_PORTS;
if(hashpipe_ibv_init(&(hibv_ctx[i]))) {
hashpipe_error(__FUNCTION__, "Failed to initialize IBV (%d)", i);
}
}

printf("max_qp_wr=%u\n", hibv_ctx.dev_attr.max_qp_wr);
printf("max_qp_wr=%u\n", hibv_ctx[0].dev_attr.max_qp_wr);

// Subscribe to RX flows
if(hashpipe_ibv_flow(&hibv_ctx, 0, IBV_FLOW_SPEC_UDP,
hibv_ctx.mac, NULL, 0, 0, 0, 0, 0, bindport)) {
hashpipe_error(__FUNCTION__, "Failed to configure IBV flow rule");
for (i=0; i<N_SRC_PORTS; i++) {
fprintf(stdout, "Configuring IBV for port %d\n", bindport+i);
if(hashpipe_ibv_flow(&(hibv_ctx[i]), 0, IBV_FLOW_SPEC_UDP,
hibv_ctx[i].mac, NULL, 0, 0, 0, 0, bindport+i, bindport)) {
hashpipe_error(__FUNCTION__, "Failed to configure IBV flow rule %d", i);
}
}


Expand Down Expand Up @@ -656,6 +665,7 @@ static void *run(hashpipe_thread_args_t * args)
}
hashpipe_status_unlock_safe(&st);
}
fprintf(stdout, "Holdoff released\n");

#ifdef DEBUG_SEMS
fprintf(stderr, "s/tid %lu/NET/' <<.\n", pthread_self());
Expand Down Expand Up @@ -703,9 +713,11 @@ static void *run(hashpipe_thread_args_t * args)
// Drop all packets to date
fprintf(stdout, "Dropping existing packets\n");
int dropcnt = 0;
while((hibv_rpkt = hashpipe_ibv_recv_pkts(&hibv_ctx, 8192))) {
hashpipe_ibv_release_pkts(&hibv_ctx, hibv_rpkt);
dropcnt += 1;
for (i=0; i<N_SRC_PORTS; i++) {
while((hibv_rpkt = hashpipe_ibv_recv_pkts(&hibv_ctx[i], 8192))) {
hashpipe_ibv_release_pkts(&hibv_ctx[i], hibv_rpkt);
dropcnt += 1;
}
}
fprintf(stdout, "Dropped waiting packets: %d\n", dropcnt);

Expand Down Expand Up @@ -750,124 +762,131 @@ static void *run(hashpipe_thread_args_t * args)
#ifndef TIMING_TEST
/* Read packet */
clock_gettime(CLOCK_MONOTONIC, &recv_start);
do {
clock_gettime(CLOCK_MONOTONIC, &start);
//p.packet_size = recv(up.sock, p.data, HASHPIPE_MAX_PACKET_SIZE, 0);
hibv_rpkt = hashpipe_ibv_recv_pkts(&hibv_ctx, 1);
clock_gettime(CLOCK_MONOTONIC, &recv_stop);
} while (!hibv_rpkt && run_threads());

if(!run_threads()) break;
for (i=0; i<N_SRC_PORTS; i++) {
clock_gettime(CLOCK_MONOTONIC, &start);
// Try to receive packets with 1ms timeout.
// Note that trying to receive from ports with no traffic _will_
// degrade performance. Waiting 1ms corresponds to ~80 packets
// For 4kB packets, 40Gb/s traffic.
hibv_rpkt = hashpipe_ibv_recv_pkts(&hibv_ctx[i], 1);
clock_gettime(CLOCK_MONOTONIC, &recv_stop);
// If no packets, move on to the next port
if (!hibv_rpkt && run_threads()) {
continue;
}

if(!run_threads()) break;

#endif
wait_ns = ELAPSED_NS(recv_start, start);
recv_ns = ELAPSED_NS(start, recv_stop);
elapsed_wait_ns += wait_ns;
elapsed_recv_ns += recv_ns;
min_wait_ns = MIN(wait_ns, min_wait_ns);
min_recv_ns = MIN(recv_ns, min_recv_ns);
max_wait_ns = MAX(wait_ns, max_wait_ns);
max_recv_ns = MAX(recv_ns, max_recv_ns);

burst_packet_count = 0;
for (curr_rpkt = hibv_rpkt; curr_rpkt; curr_rpkt = (struct hashpipe_ibv_recv_pkt *)curr_rpkt->wr.next) {
clock_gettime(CLOCK_MONOTONIC, &proc_start);
packet_count++;
burst_packet_count++;
// Make sure received packet size matches expected packet size. Allow
// for optional 8 byte CRC in received packet. Zlib's crc32 function
// is too slow to use in realtime, so CRCs cannot be checked on the
// fly. If data errors are suspected, a separate CRC checking utility
// should be used to read the packets from the network and verify CRCs.
int packet_size = curr_rpkt->length;
if (expected_packet_size != packet_size-8 && expected_packet_size != packet_size) {
// Log warning and ignore wrongly sized packet
#ifdef DEBUG_NET
hashpipe_warn(__FUNCTION__, "Invalid pkt size (%d)", packet_size);
#endif
continue;
}
// Copy packet into any blocks where it belongs.
const uint64_t mcnt = process_packet((paper_input_databuf_t *)db, (unsigned char *)curr_rpkt->wr.sg_list->addr);

clock_gettime(CLOCK_MONOTONIC, &stop);
proc_ns = ELAPSED_NS(proc_start, stop);
elapsed_proc_ns += proc_ns;
// Update min max values
min_proc_ns = MIN(proc_ns, min_proc_ns);
max_proc_ns = MAX(proc_ns, max_proc_ns);

if(mcnt != -1) {
// Update status
ns_per_wait = (float)elapsed_wait_ns / packet_count;
ns_per_recv = (float)elapsed_recv_ns / packet_count;
ns_per_proc = (float)elapsed_proc_ns / packet_count;
//fprintf(stdout, "ns_per_recv: %f, total_ns: %lu, packet count: %lu\n", ns_per_recv, elapsed_recv_ns, packet_count);

hashpipe_status_lock_busywait_safe(&st);

hputu8(st.buf, "NETMCNT", mcnt);
// Gbps = bits_per_packet / ns_per_packet
// (N_BYTES_PER_PACKET excludes header, so +8 for the header)
hputr4(st.buf, "NETGBPS", 8*(N_BYTES_PER_PACKET+8)/(ns_per_recv+ns_per_proc));
hputr4(st.buf, "NETWATNS", ns_per_wait);
hputr4(st.buf, "NETRECNS", ns_per_recv);
hputr4(st.buf, "NETPRCNS", ns_per_proc);

// Get and put min and max values. The "get-then-put" allows the
// user to reset the min max values in the status buffer.
hgeti8(st.buf, "NETWATMN", (long *)&status_ns);
status_ns = MIN(min_wait_ns, status_ns);
hputi8(st.buf, "NETWATMN", status_ns);

hgeti8(st.buf, "NETRECMN", (long *)&status_ns);
status_ns = MIN(min_recv_ns, status_ns);
hputi8(st.buf, "NETRECMN", status_ns);

hgeti8(st.buf, "NETPRCMN", (long *)&status_ns);
status_ns = MIN(min_proc_ns, status_ns);
hputi8(st.buf, "NETPRCMN", status_ns);

hgeti8(st.buf, "NETWATMX", (long *)&status_ns);
status_ns = MAX(max_wait_ns, status_ns);
hputi8(st.buf, "NETWATMX", status_ns);

hgeti8(st.buf, "NETRECMX", (long *)&status_ns);
status_ns = MAX(max_recv_ns, status_ns);
hputi8(st.buf, "NETRECMX", status_ns);

hgeti8(st.buf, "NETPRCMX", (long *)&status_ns);
status_ns = MAX(max_proc_ns, status_ns);
hputi8(st.buf, "NETPRCMX", status_ns);

hputu8(st.buf, "NETPKTS", pktsock_pkts);
hputu8(st.buf, "NETDROPS", pktsock_drops);

hgetu8(st.buf, "NETPKTTL", (long unsigned int*)&pktsock_pkts_total);
hgetu8(st.buf, "NETDRPTL", (long unsigned int*)&pktsock_drops_total);
hputu8(st.buf, "NETPKTTL", pktsock_pkts_total + pktsock_pkts);
hputu8(st.buf, "NETDRPTL", pktsock_drops_total + pktsock_drops);

hashpipe_status_unlock_safe(&st);

// Start new average
elapsed_wait_ns = 0;
elapsed_recv_ns = 0;
elapsed_proc_ns = 0;
packet_count = 0;
}
}
hashpipe_status_lock_safe(&st);
hputu8(st.buf, "BURSTPKT", burst_packet_count);
hashpipe_status_unlock_safe(&st);

// Warn if it looks like overflows are a danger
if (burst_packet_count*2 > hibv_ctx.recv_pkt_num) {
fprintf(stderr, "WARNING: got %lu packets in last burst\n", burst_packet_count);
}
// Release packets
if (hashpipe_ibv_release_pkts(&hibv_ctx, hibv_rpkt)) {
hashpipe_error(__FUNCTION__, "error releasing packets");
wait_ns = ELAPSED_NS(recv_start, start);
recv_ns = ELAPSED_NS(start, recv_stop);
elapsed_wait_ns += wait_ns;
elapsed_recv_ns += recv_ns;
min_wait_ns = MIN(wait_ns, min_wait_ns);
min_recv_ns = MIN(recv_ns, min_recv_ns);
max_wait_ns = MAX(wait_ns, max_wait_ns);
max_recv_ns = MAX(recv_ns, max_recv_ns);

burst_packet_count = 0;
for (curr_rpkt = hibv_rpkt; curr_rpkt; curr_rpkt = (struct hashpipe_ibv_recv_pkt *)curr_rpkt->wr.next) {
clock_gettime(CLOCK_MONOTONIC, &proc_start);
packet_count++;
burst_packet_count++;
// Make sure received packet size matches expected packet size. Allow
// for optional 8 byte CRC in received packet. Zlib's crc32 function
// is too slow to use in realtime, so CRCs cannot be checked on the
// fly. If data errors are suspected, a separate CRC checking utility
// should be used to read the packets from the network and verify CRCs.
int packet_size = curr_rpkt->length;
if (expected_packet_size != packet_size-8 && expected_packet_size != packet_size) {
// Log warning and ignore wrongly sized packet
#ifdef DEBUG_NET
hashpipe_warn(__FUNCTION__, "Invalid pkt size (%d)", packet_size);
#endif
continue;
}
// Copy packet into any blocks where it belongs.
const uint64_t mcnt = process_packet((paper_input_databuf_t *)db, (unsigned char *)curr_rpkt->wr.sg_list->addr);

clock_gettime(CLOCK_MONOTONIC, &stop);
proc_ns = ELAPSED_NS(proc_start, stop);
elapsed_proc_ns += proc_ns;
// Update min max values
min_proc_ns = MIN(proc_ns, min_proc_ns);
max_proc_ns = MAX(proc_ns, max_proc_ns);

if(mcnt != -1) {
// Update status
ns_per_wait = (float)elapsed_wait_ns / packet_count;
ns_per_recv = (float)elapsed_recv_ns / packet_count;
ns_per_proc = (float)elapsed_proc_ns / packet_count;
//fprintf(stdout, "ns_per_recv: %f, total_ns: %lu, packet count: %lu\n", ns_per_recv, elapsed_recv_ns, packet_count);

hashpipe_status_lock_busywait_safe(&st);

hputu8(st.buf, "NETMCNT", mcnt);
// Gbps = bits_per_packet / ns_per_packet
// (N_BYTES_PER_PACKET excludes header, so +8 for the header)
hputr4(st.buf, "NETGBPS", 8*(N_BYTES_PER_PACKET+8)/(ns_per_recv+ns_per_proc));
hputr4(st.buf, "NETWATNS", ns_per_wait);
hputr4(st.buf, "NETRECNS", ns_per_recv);
hputr4(st.buf, "NETPRCNS", ns_per_proc);

// Get and put min and max values. The "get-then-put" allows the
// user to reset the min max values in the status buffer.
hgeti8(st.buf, "NETWATMN", (long *)&status_ns);
status_ns = MIN(min_wait_ns, status_ns);
hputi8(st.buf, "NETWATMN", status_ns);

hgeti8(st.buf, "NETRECMN", (long *)&status_ns);
status_ns = MIN(min_recv_ns, status_ns);
hputi8(st.buf, "NETRECMN", status_ns);

hgeti8(st.buf, "NETPRCMN", (long *)&status_ns);
status_ns = MIN(min_proc_ns, status_ns);
hputi8(st.buf, "NETPRCMN", status_ns);

hgeti8(st.buf, "NETWATMX", (long *)&status_ns);
status_ns = MAX(max_wait_ns, status_ns);
hputi8(st.buf, "NETWATMX", status_ns);

hgeti8(st.buf, "NETRECMX", (long *)&status_ns);
status_ns = MAX(max_recv_ns, status_ns);
hputi8(st.buf, "NETRECMX", status_ns);

hgeti8(st.buf, "NETPRCMX", (long *)&status_ns);
status_ns = MAX(max_proc_ns, status_ns);
hputi8(st.buf, "NETPRCMX", status_ns);

hputu8(st.buf, "NETPKTS", pktsock_pkts);
hputu8(st.buf, "NETDROPS", pktsock_drops);

hgetu8(st.buf, "NETPKTTL", (long unsigned int*)&pktsock_pkts_total);
hgetu8(st.buf, "NETDRPTL", (long unsigned int*)&pktsock_drops_total);
hputu8(st.buf, "NETPKTTL", pktsock_pkts_total + pktsock_pkts);
hputu8(st.buf, "NETDRPTL", pktsock_drops_total + pktsock_drops);

hashpipe_status_unlock_safe(&st);

// Start new average
elapsed_wait_ns = 0;
elapsed_recv_ns = 0;
elapsed_proc_ns = 0;
packet_count = 0;
}
}
hashpipe_status_lock_safe(&st);
hputu8(st.buf, "BURSTPKT", burst_packet_count);
hashpipe_status_unlock_safe(&st);

// Warn if it looks like overflows are a danger
if (burst_packet_count*2 > hibv_ctx[i].recv_pkt_num) {
fprintf(stderr, "WARNING: got %lu packets in last burst\n", burst_packet_count);
}
// Release packets
if (hashpipe_ibv_release_pkts(&hibv_ctx[i], hibv_rpkt)) {
hashpipe_error(__FUNCTION__, "error releasing packets");
}
}
#if defined TIMING_TEST || defined NET_TIMING_TEST

Expand Down
5 changes: 5 additions & 0 deletions src/paper_databuf.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@
//#define N_FENGINES 192 # isn't this always the same as the number of antennas?
#define N_CHAN_PER_F N_CHAN_TOTAL

// Number of F-engine source ports used.
// Using more ports means more IBV receive buffers,
// which potentially increases performance
#define N_SRC_PORTS 8

// Number of separate X-engines which deal with
// alternate time chunks
#define TIME_DEMUX 2
Expand Down
2 changes: 1 addition & 1 deletion src/scripts/xtor_up.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ def run_on_hosts(hosts, cmd, user=None, wait=True):
key = 'hashpipe://%s/%d/set' % (host, i)
r.publish(key, 'TIMEIDX=%d' % (hn//nhosts_per_timeslice))

time.sleep(2)
time.sleep(10)

# Let the network threads begin processing
for hn, host in enumerate(hosts):
Expand Down

0 comments on commit 4eef974

Please sign in to comment.