diff --git a/README.md b/README.md index 4346e16..db4af63 100644 --- a/README.md +++ b/README.md @@ -1,9 +1,25 @@ -# PLUS Middlebox using FD.io/VPP +# VPP-based passive latency measurement middlebox -## IETF hackathon code -Checkout branch *quic_IETF_hackathon* +This VPP plugin adds support for passive latency measurements in FD.io. +The current implementation will estimate the RTT of: -## Installation +- QUIC flows using the latency spin signal (and other techniques) described + in our IMC'18 paper [Three Bits Suffice](https://nsg.ee.ethz.ch/fileadmin/user_upload/spinbit.pdf). + The following [fork of minq](https://github.com/pietdevaere/minq) adds the latency spin signal to QUIC traffic + such that it is detectable by the VPP plugin. +- TCP flows using the latency spin signal and/or TCP timestamps. + We provide [patches](https://github.com/mami-project/three-bits-suffice/tree/master/tcp/kernel_patches) + to add latency spin signal support to the Linux kernel. +- [PLUS](https://nsg.ee.ethz.ch/fileadmin/user_upload/CNSM_2017.pdf) flows using the PSN and PSE header fields. + For example [puic-go](https://github.com/mami-project/puic-go) can be used to add PLUS support to quic-go. + +## Installation + +You can either use Vagrant to set up everything automatically +or compile the plugin in an existing VPP installation. +The plugin is tested with the stable FD.io version 17.10. + +### Using Vagrant If not already available, install *Vagrant* and *VirtualBox* on your machine. Go to the Vagrant directory and execute: ``` @@ -14,7 +30,7 @@ To start Vagrant and connect via ssh (root access without password). Part of the Vagrant setup adapted from the [vpp-mb](https://github.com/mami-project/vpp-mb) project. -## Additional Vagrant commands +### Additional Vagrant commands Rsync the vpp-plus directory once more (e.g. useful after git pull): ``` cd vagrant @@ -32,6 +48,18 @@ vagrant up vagrant ssh ``` +### Compiling the plugin +To compile the plugin manually or adapt changes inside Vagrant, use: +``` +cd latency-plugin +sudo autoreconf -fis +sudo ./configure +make +sudo make install +``` + +Restart VPP, e.g. `sudo service vpp restart` + ## Important VPP commands Start VPP: `sudo service vpp start` @@ -41,89 +69,138 @@ You can either access the VPP shell with `sudo vppctl` and then interactively ex Use `sudo vppctl help` for a list of supported commands. -### Important general commands +### General commands List of interfaces: `sudo vppctl show interface` (you can also shorten the commands, e.g. `sudo vppctl sh int`) Show the VPP graph: `sudo vppctl show vlib graph` -Add packet trace for (50 packets) `sudo vppctl trace add af-packet-input 50` +Add a packet trace storing 50 packets `sudo vppctl trace add af-packet-input 50` -Display the trace: `sudo vppctl show trace` +Display the captured packets in the trace: `sudo vppctl show trace` -Execute multiple commands from a file: `sudo vppctl exec ` +Execute multiple VPP commands from a file (one command per line): `sudo vppctl exec ` -### PLUS specific commands -Add an interface to the PLUS plugin: `sudo vppctl plus ` +### Latency plugin specific commands +To get an overview, use: `sudo vppctl latency help` -Remove an interface: `sudo vppctl plus disable` +Add an interface to the plugin: `sudo vppctl latency interface ` -List all active PLUS flows: `sudo vppctl plus stat` +Remove an interface: `sudo vppctl latency interface disable` -## Connect Vagrant VM to host machine and run go plus-echo test -This setup assumes you use VirtualBox as provider for the Vagrant VM! +List all currently active flows with latency estimations: `sudo vppctl latency stats` -On your local machine in VirtualBox: Go to `Virtualbox --> Preferences...`. In the "Network tab" add *two* "Host-only Networks". Change the configuration: -``` -Network 1: IPv4 Address: 192.168.100.1, IPv4 Network Mask: 255.255.255.0 -Network 2: IPv4 Address: 192.168.101.1, IPv4 Network Mask: 255.255.255.0 -``` -Now start the Vagrant VM and execute the following commands *inside the VM*: -``` -sudo service vpp start -sudo vppctl ex /home/vagrant/plus-mb/scripts/external_vpp_interface.conf # Add IPs to the interfaces inside VPP -sudo vppctl plus GigabitEthernet0/8/0 # Add interfaces to the plus plugin -sudo vppctl plus GigabitEthernet0/9/0 -``` -VPP should now be ready. Back on the *local machine*: -``` -sudo route add 192.168.100.1/32 gw 192.168.101.2 # Add static routes for go client and server -sudo route add 192.168.101.1/32 gw 192.168.100.2 -# if not available, install golan-1.9: sudo apt-get install golang-1.9-go -go get github.com/FMNSSun/plus-echo -cd go/src/github.com/FMNSSun/plus-echo -go build client.go -go build server.go -./server -local-addr=192.168.101.1:4000 -./client -local-addr=192.168.100.1:3000 -remote-addr=192.168.101.1:4000 # in a different terminal -``` -The go PLUS client should send a PLUS packet to the server and get a reply back. - -In the Vagrant vm you should see two observed PlUS packets using e.g. `sudo vppctl plus stat` +Set the IPv4 address the plugin is listening to `sudo vppctl latency mb_ip ` -**Important:** To add a packet trace, use `sudo vppctl trace add dpdk-input 50`. +Add a UDP port number that indicates QUIC traffic `sudo vppctl latency quic_port `. +Can be repeated with different ports. -## Simple example -Go to the *scripts* directory and make `ns_setup.sh` executable (`chmod +x ns_setup.sh`) +Add NAT-like functionalities `sudo vppctl latency nat `. This is useful if you +want to deploy the middlebox such that it can make on-path measurements taking traffic in +both directions into account. Can be repeated with different pairs of ports and IPs. +See next section for more information. -Execute `ns_setup.sh` to generate virtual namespaces veth pairs (`sudo ./ns_setup.sh`) - -Start VPP: `sudo service vpp start` +## On-path latency measurements +To be able to perform on-path measurements and observing traffic from the client +to the server **and** the reverse traffic, we added NAT-like functionalities to the +latency plugin. -Execute the file `vpp_interface.conf` to connect the virtual namespaces to VPP: +As an example, assume the VPP middlebox has the IP 1.2.3.4 (defined with `sudo vppctl latency mb_ip 1.2.3.4`). +Now we would like to be able to forward traffic towards the server 5.6.7.8 through the middlebox. +For that, we arbitrarily associate port 8888 with the dst IP 5.6.7.8 and add that to the plugin with +`sudo vppctl latency 5.6.7.8 8888`. Any client can now send traffic to 5.6.7.8 (over the middlebox) +by sending traffic towards the IP of the middlebox (1.2.3.4) with dst port 8888. +Whenever the plugin receives traffic with dst port 8888, it will: -`sudo vppctl exec /home/vagrant/plus-mb/scripts/vpp_interface.conf` +1. save the observed src IP and replace it with its own IP (1.2.3.4) +1. replace the dst IP with the IP of the corresponding server (5.6.7.8) +1. send the traffic towards the new destination (src and dst ports are not changed) -(Use `sudo vppctl sh int` to confirm that the new interfaces are visible: *host-vpp1* and *host-vpp2*) +Once it receives traffic back from the server, it reverses the process and sends it to the original client. -Add the two interfaces to the PLUS plugin, such that it analyzes traffic coming from these interfaces: +Following a list of sample commands to configure VPP and the plugin to implement the previous example. +We assume that the server running VPP is inside a network with IP space 1.2.3.0/24 and the gateway +towards the Internet has the IP 1.2.3.1. The VPP server has one interface (called `GigabitEthernet3/0/0`). +The actual interface name depends one the used implementation/hardware and can be found with `sudo vppctl sh int`. -`sudo vppctl plus host-vpp1` and `sudo vppctl plus host-vpp2` - -Use the (very basic) Python scripts to generate PLUS traffic. The sender and receiver are each executed in a separate namespace (connected via VPP). +``` + set int state GigabitEthernet3/0/0 up + set int ip address GigabitEthernet3/0/0 1.2.3.4/24 + ip route add 0.0.0.0/0 via 1.2.3.1 GigabitEthernet3/0/0 + latency interface GigabitEthernet3/0/0 + latency mb_ip 1.2.3.4 + latency nat 5.6.7.8 8888 + latency quic_port 8888 +``` -Receiver: `sudo ip netns exec vpp2 python receiver.py` +The last command declares traffic towards/from port 8888 as QUIC traffic. +All these commands can be saved in a file (e.g. `setup.conf`) and executed +with `sudo vppctl exec setup.conf`. -Sender: `sudo ip netns exec vpp1 python sender.py` +### Connect Vagrant to host machine -Use `sudo vppctl plus stat` to see the generated flow or use the packet trace commands from above. +If you use the Vagrant installation and want to connect the VM to the host machine, +use "Host-only Networks" (assuming VirtualBox as provider for the Vagrant VM). +On your local machine in VirtualBox: Go to `Virtualbox --> Preferences...`. +In the "Network tab" add two "Host-only Networks" and change the configuration: +``` +Network 1: IPv4 Address: 192.168.100.1, IPv4 Network Mask: 255.255.255.0 +Network 2: IPv4 Address: 192.168.101.1, IPv4 Network Mask: 255.255.255.0 +``` -## Ongoing work -* Performance improvements (implementation of double loop) -* Full multi-thread support -* More test cases -* Support for moving endpoints (e.g. src IP change) -* Support for IP headers with options -* IPv6 support +Restart the Vagrant VM and VPP should see the interfaces when using `sudo vppctl sh int` +as `GigabitEthernet0/8/0` and `GigabitEthernet0/9/0`. Add the corresponding IPs: +``` +set int state GigabitEthernet0/8/0 up +set int ip address GigabitEthernet0/8/0 192.168.100.2/24 +set int state GigabitEthernet0/9/0 up +set int ip address GigabitEthernet0/9/0 192.168.101.2/24 +``` -## Known limitations -* Currently only support for 2048 concurrent flows (should be more than enough for initial tests) +## Measurement results + +The VPP plugin writes latency measurement results to the `/tmp` folder using different +files for QUIC, TCP and PLUS traffic (`/tmp/latency_{plus,tcp,quic}_printf.out`). +The data is saved as CSV files. All latency estimations are in seconds. + +### QUIC latency measurements +Header of the CSV file: `time,pn,host,spin_data,spin_new,pn_spin_data,pn_spin_new,vec_data,vec_new,heur_data,heur_new` +- `time`: time since start of VPP in seconds +- `pn`: packet number of observed QUIC packet +- `host`: server or client direction +- `spin_data`: latency estimation taking only the latency spin bit into account +- `spin_new`: does the `spin_data` contain a new estimation (0 or 1) +- `pn_spin_data`: latency estimation based on the spin bit only but rejecting reordered packets based on the packet number +- `pn_spin_new`: does the `pn_spin_new` contain a new estimation (0 or 1) +- `vec_data`: latency estimation based on the full spin signal (spin bit and VEC) +- `vec_new`: does the `vec_data` contain a new estimation (0 or 1) +- `heur_data`: latency estimation based on the spin bit only but rejecting RTT samples based on a heuristic +- `heur_new`: does the `heur_data` contain a new estimation (0 or 1) + +More information can be found in our [IMC paper](https://nsg.ee.ethz.ch/fileadmin/user_upload/spinbit.pdf). + +### TCP latency measurements +Header of the CSV file: `time,host,seq_num,vec_data,vec_new,single_ts_rtt_data,single_ts_rtt_new,all_ts_rtt_data,all_ts_rtt_new,vec_ne_zero_data,vec_ne_zero_new` +- `time`: time since start of VPP in seconds +- `host`: server or client direction +- `seq_num`: sequence number of observed TCP packet +- `vec_data`: latency estimation based on the full spin signal (spin bit and VEC) +- `vec_new`: does the `vec_data` contain a new estimation (0 or 1) +- `single_ts_rtt_data`: latency estimation based on one timestamp per RTT +- `single_ts_rtt_new`: does the `single_ts_rtt_data` contain a new estimation (0 or 1) +- `all_ts_rtt_data`: latency estimation based on every available timestamp value +- `all_ts_rtt_new`: does the `all_ts_rtt_data` contain a new estimation (0 or 1) +- `vec_ne_zero_data`: latency estimation based on the full spin signal (spin bit and VEC) taking every non-zero VEC value into account +- `vec_ne_zero_new`: does the `vec_ne_zero_data` contain a new estimation (0 or 1) + +### PLUS latency measurements +Header of the CSV file: `time,host,#pkt,psn,pse,cat,psn_pse_data,psn_pse_new` +- `time`: time since start of VPP in seconds +- `host`: server of client directions +- `#pkt`: number of packet in PLUS flow +- `psn`: Packet Serial Number of the observed PLUS packet +- `pse`: Packet Serial Echo of the observed PLUS packet +- `cat`: Connection and Association Token of the observed PLUS packet +- `psn_pse_data`: latency estimation based on the PSN and PSE +- `psn_pse_new`: does the `psn_pse_data` contain a new estimation (0 or 1) + +More information can be found in our [PLUS paper](https://nsg.ee.ethz.ch/fileadmin/user_upload/CNSM_2017.pdf). diff --git a/plus-plugin/Makefile.am b/latency-plugin/Makefile.am similarity index 98% rename from plus-plugin/Makefile.am rename to latency-plugin/Makefile.am index 673b2ef..93d4c55 100644 --- a/plus-plugin/Makefile.am +++ b/latency-plugin/Makefile.am @@ -29,7 +29,7 @@ ACLOCAL_AMFLAGS = -I m4 vppapitestpluginsdir = ${libdir}/vpp_api_test_plugins vpppluginsdir = ${libdir}/vpp_plugins -include plus.am +include latency.am %.api.h: %.api mkdir -p `dirname $@` ; \ diff --git a/plus-plugin/configure.ac b/latency-plugin/configure.ac similarity index 81% rename from plus-plugin/configure.ac rename to latency-plugin/configure.ac index 204da2f..983ef25 100644 --- a/plus-plugin/configure.ac +++ b/latency-plugin/configure.ac @@ -1,4 +1,4 @@ -AC_INIT(vpp_plugins, 1.0) +AC_INIT(latency_plugins, 1.0) LT_INIT AM_INIT_AUTOMAKE AM_SILENT_RULES([yes]) diff --git a/plus-plugin/plus.am b/latency-plugin/latency.am similarity index 61% rename from plus-plugin/plus.am rename to latency-plugin/latency.am index b15effa..24e53be 100644 --- a/plus-plugin/plus.am +++ b/latency-plugin/latency.am @@ -11,21 +11,21 @@ # See the License for the specific language governing permissions and # limitations under the License. -vppapitestplugins_LTLIBRARIES += plus_test_plugin.la -vppplugins_LTLIBRARIES += plus_plugin.la +vppapitestplugins_LTLIBRARIES += latency_test_plugin.la +vppplugins_LTLIBRARIES += latency_plugin.la -plus_plugin_la_SOURCES = \ - plus/plus.c \ - plus/node.c \ - plus/plus_plugin.api.h +latency_plugin_la_SOURCES = \ + latency/latency.c \ + latency/node.c \ + latency/latency_plugin.api.h -API_FILES += plus/plus.api +API_FILES += latency/latency.api nobase_apiinclude_HEADERS += \ - plus/plus_all_api_h.h \ - plus/plus_msg_enum.h \ - plus/plus.api.h + latency/latency_all_api_h.h \ + latency/latency_msg_enum.h \ + latency/latency.api.h -plus_test_plugin_la_SOURCES = plus/plus_test.c plus/plus_plugin.api.h +latency_test_plugin_la_SOURCES = latency/latency_test.c latency/latency_plugin.api.h # vi:syntax=automake diff --git a/plus-plugin/plus/plus.api b/latency-plugin/latency/latency.api similarity index 95% rename from plus-plugin/plus/plus.api rename to latency-plugin/latency/latency.api index baa7fb4..34e448d 100644 --- a/plus-plugin/plus/plus.api +++ b/latency-plugin/latency/latency.api @@ -16,7 +16,7 @@ /* Define a simple binary API to control the feature */ -autoreply define plus_enable_disable { +autoreply define latency_enable_disable { /* Client identifier, set from api_main.my_client_index */ u32 client_index; diff --git a/latency-plugin/latency/latency.c b/latency-plugin/latency/latency.c new file mode 100644 index 0000000..cc3fa1f --- /dev/null +++ b/latency-plugin/latency/latency.c @@ -0,0 +1,1302 @@ +/* + * Copyright (c) 2015 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * @file + * @brief Latency plugin, plugin API / trace / CLI handling. + */ + +#include +#include +#include + +#include +#include +#include + +/* define message IDs */ +#include + +/* define message structures */ +#define vl_typedefs +#include +#undef vl_typedefs + +/* define generated endian-swappers */ +#define vl_endianfun +#include +#undef vl_endianfun + +/* instantiate all the print functions we know about */ +#define vl_print(handle, ...) vlib_cli_output (handle, __VA_ARGS__) +#define vl_printfun +#include +#undef vl_printfun + +/* Get the API version number */ +#define vl_api_version(n,v) static u32 api_version=(v); +#include +#undef vl_api_version + +#define REPLY_MSG_ID_BASE pm->msg_id_base +#include + +/* List of message types that this plugin understands */ +#define foreach_latency_plugin_api_msg \ +_(LATENCY_ENABLE_DISABLE, latency_enable_disable) + +/* *INDENT-OFF* */ +VLIB_PLUGIN_REGISTER () = { + .version = LATENCY_PLUGIN_BUILD_VER, + .description = "LATENCY middlebox VPP Plugin", +}; +/* *INDENT-ON* */ + +/** + * @brief Enable/disable the plugin. + * + * Action function shared between message handler and debug CLI. + */ +int latency_enable_disable (latency_main_t * pm, u32 sw_if_index, + int enable_disable) +{ + vnet_sw_interface_t * sw; + int rv = 0; + + /* Utterly wrong? */ + if (pool_is_free_index (pm->vnet_main->interface_main.sw_interfaces, + sw_if_index)) + return VNET_API_ERROR_INVALID_SW_IF_INDEX; + + /* Not a physical port? */ + sw = vnet_get_sw_interface (pm->vnet_main, sw_if_index); + if (sw->type != VNET_SW_INTERFACE_TYPE_HARDWARE) + return VNET_API_ERROR_INVALID_SW_IF_INDEX; + + + vnet_feature_enable_disable ("ip4-unicast", "latency", + sw_if_index, enable_disable, 0, 0); + return rv; +} + +static clib_error_t * +latency_enable_disable_command_fn (vlib_main_t * vm, + unformat_input_t * input, + vlib_cli_command_t * cmd) +{ + latency_main_t * pm = &latency_main; + u32 sw_if_index = ~0; + int enable_disable = 1; + + int rv; + + while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT) { + if (unformat (input, "disable")) + enable_disable = 0; + else if (unformat (input, "%U", unformat_vnet_sw_interface, + pm->vnet_main, &sw_if_index)) + ; + else + break; + } + + if (sw_if_index == ~0) + return clib_error_return (0, "Please specify an interface..."); + + rv = latency_enable_disable (pm, sw_if_index, enable_disable); + + switch(rv) { + case 0: + break; + + case VNET_API_ERROR_INVALID_SW_IF_INDEX: + return clib_error_return + (0, "Invalid interface, only works on physical ports"); + break; + + case VNET_API_ERROR_UNIMPLEMENTED: + return clib_error_return (0, "Device driver doesn't support redirection"); + break; + + default: + return clib_error_return (0, "latency_enable_disable returned %d", + rv); + } + return 0; +} + +/** + * @brief format function (print each active flow) + */ +u8 * format_sessions(u8 *s, va_list *args) { + latency_main_t * pm = &latency_main; + + s = format(s, "Total flows: %u, total active flows: %u\n", + pm->total_flows, pm->active_flows); + latency_session_t * session; + + s = format(s, "=======================================================\n"); + + /* Iterate through all pool entries */ + pool_foreach (session, pm->session_pool, ({ + switch (session->p_type) { + case P_TCP: + s = format(s, "TCP: observed packets: %u\n", session->pkt_count); + s = format(s, "VEC (client, server): %.*lfs %.*lfs\n", + STAT_PRECISION, session->tcp->status_spin_observer.rtt_client, + STAT_PRECISION, session->tcp->status_spin_observer.rtt_server); + s = format(s, "TS single (client, server): %.*lfs %.*lfs\n", + STAT_PRECISION, session->tcp->ts_one_RTT_observer.rtt_client, + STAT_PRECISION, session->tcp->ts_one_RTT_observer.rtt_server); + s = format(s, "TS all (client, server): %.*lfs %.*lfs\n", + STAT_PRECISION, session->tcp->ts_all_RTT_observer.rtt_client, + STAT_PRECISION, session->tcp->ts_all_RTT_observer.rtt_server); + break; + + case P_QUIC: + s = format(s, "QUIC: observed packets: %u\n", session->pkt_count); + s = format(s, "Spin basic (client, server): %.*lfs %.*lfs\n", + STAT_PRECISION, session->quic->basic_spin_observer.rtt_client, + STAT_PRECISION, session->quic->basic_spin_observer.rtt_server); + s = format(s, "Spin pn (client, server): %.*lfs %.*lfs\n", + STAT_PRECISION, session->quic->pn_spin_observer.rtt_client, + STAT_PRECISION, session->quic->pn_spin_observer.rtt_server); + s = format(s, "VEC (client, server): %.*lfs %.*lfs\n", + STAT_PRECISION, session->quic->status_spin_observer.rtt_client, + STAT_PRECISION, session->quic->status_spin_observer.rtt_server); + s = format(s, "Spin heur (client, server): %.*lfs %.*lfs\n", + STAT_PRECISION, session->quic->dyna_heur_spin_observer.rtt_client[session->quic->dyna_heur_spin_observer.index_client], + STAT_PRECISION, session->quic->dyna_heur_spin_observer.rtt_server[session->quic->dyna_heur_spin_observer.index_server]); + break; + + case P_PLUS: + s = format(s, "PLUS: observed packets: %u\n", session->pkt_count); + s = format(s, "PSN/PSE (client, server): %.*lfs %.*lfs\n", + STAT_PRECISION, session->plus->plus_single_observer.rtt_src, + STAT_PRECISION, session->plus->plus_single_observer.rtt_dst); + break; + + default: + s = format(s, "Unknown protocol type - error!"); + break; + } + s = format(s, "=======================================================\n"); + })); + return s; +} + +static clib_error_t * latency_show_stats_fn(vlib_main_t * vm, + unformat_input_t * input, vlib_cli_command_t * cmd) { + vl_print(vm, "%U", format_sessions); + return 0; +} + +static clib_error_t * latency_show_version_fn(vlib_main_t * vm, + unformat_input_t * input, vlib_cli_command_t * cmd) { + vl_print(vm, "V 0.1, support for TCP, QUIC and PLUS"); + return 0; +} + +static clib_error_t * latency_add_port_fn(vlib_main_t * vm, + unformat_input_t * input, vlib_cli_command_t * cmd) { + latency_main_t * pm = &latency_main; + u32 quic_port = 0; + + if (!unformat (input, "%d", &quic_port)) { + return clib_error_return (0, "Please specify a correct port."); + } + if (quic_port >= 65536) { + return clib_error_return (0, "Please specify a correct port."); + } + + hash_set(pm->hash_quic_ports, clib_host_to_net_u16(quic_port), 1); + + return 0; +} + +static clib_error_t * latency_add_nat_fn(vlib_main_t * vm, + unformat_input_t * input, vlib_cli_command_t * cmd) { + latency_main_t * pm = &latency_main; + u32 port, ip[4]; + ip4_address_t ip4; + + if (!unformat(input, "%d.%d.%d.%d %d", &ip[0], &ip[1], &ip[2], &ip[3], &port)) { + return clib_error_return (0, "Please enter a correct IP and port, e.g.: latency nat 1.2.3.4 555"); + } + if (ip[0] >= 256 || ip[1] >= 256 || ip[2] >= 256 || ip[3] >= 256) { + return clib_error_return (0, "Please enter a correct IP and port, e.g.: latency nat 1.2.3.4 555"); + } + if (port >= 65536) { + return clib_error_return (0, "Please enter a correct IP and port, e.g.: latency nat 1.2.3.4 555"); + } + + ip4.as_u8[3] = ip[0]; + ip4.as_u8[2] = ip[1]; + ip4.as_u8[1] = ip[2]; + ip4.as_u8[0] = ip[3]; + + hash_set(pm->hash_server_ports_to_ips, + clib_host_to_net_u16(port), + clib_host_to_net_u32(ip4.as_u32)); + + return 0; +} + +static clib_error_t * latency_add_ip_fn(vlib_main_t * vm, + unformat_input_t * input, vlib_cli_command_t * cmd) { + latency_main_t * pm = &latency_main; + u32 ip[4]; + ip4_address_t ip4; + + if (!unformat(input, "%d.%d.%d.%d", &ip[0], &ip[1], &ip[2], &ip[3])) { + return clib_error_return (0, "Please enter a correct IP, e.g.: latency ip 10.0.0.1"); + } + if (ip[0] >= 256 || ip[1] >= 256 || ip[2] >= 256 || ip[3] >= 256) { + return clib_error_return (0, "Please enter a correct IP, e.g.: latency ip 10.0.0.1"); + } + + ip4.as_u8[3] = ip[0]; + ip4.as_u8[2] = ip[1]; + ip4.as_u8[1] = ip[2]; + ip4.as_u8[0] = ip[3]; + + pm->mb_ip = clib_host_to_net_u32(ip4.as_u32); + + return 0; +} + +/** + * @brief CLI command to enable/disable the latency plugin. + */ +VLIB_CLI_COMMAND (sr_content_command, static) = { + .path = "latency interface", + .short_help = + "Add an interface to the latency plugin: latency interface [disable]", + .function = latency_enable_disable_command_fn, +}; + +/** + * @brief CLI command to show all active flows + */ +VLIB_CLI_COMMAND (sr_content_command_stats, static) = { + .path = "latency stats", + .short_help = "Show latency information for all tracked flows: latency stats", + .function = latency_show_stats_fn, +}; + +/** + * @brief CLI command to show version + */ +VLIB_CLI_COMMAND (sr_content_command_version, static) = { + .path = "latency version", + .short_help = "Show latency plugin version information: latency version", + .function = latency_show_version_fn, +}; + +/** + * @brief CLI command to add QUIC dst port + */ +VLIB_CLI_COMMAND (sr_content_command_port, static) = { + .path = "latency quic_port", + .short_help = "Add a port to identify QUIC traffic (can be repeated): latency quic_port ", + .function = latency_add_port_fn, +}; + +/** + * @brief CLI command to add IP port "NAT" entry + */ +VLIB_CLI_COMMAND (sr_content_command_nat, static) = { + .path = "latency nat", + .short_help = "Add middlebox NAT functionality (can be repeated): latency nat ", + .function = latency_add_nat_fn, +}; + +/** + * @brief CLI command to add MB IP + */ +VLIB_CLI_COMMAND (sr_content_command_ip, static) = { + .path = "latency mb_ip", + .short_help = "Set IP of the VPP middlebox: latency mb_ip ", + .function = latency_add_ip_fn, +}; + +/** + * @brief LATENCY API message handler. + */ +static void vl_api_latency_enable_disable_t_handler + (vl_api_latency_enable_disable_t * mp) { + vl_api_latency_enable_disable_reply_t * rmp; + latency_main_t * pm = &latency_main; + int rv; + + rv = latency_enable_disable (pm, ntohl(mp->sw_if_index), + (int) (mp->enable_disable)); + + REPLY_MACRO(VL_API_LATENCY_ENABLE_DISABLE_REPLY); +} + +/** + * @brief Set up the API message handling tables. + */ +static clib_error_t * +latency_plugin_api_hookup (vlib_main_t *vm) { + latency_main_t * pm = &latency_main; +#define _(N,n) \ + vl_msg_api_set_handlers((VL_API_##N + pm->msg_id_base), \ + #n, \ + vl_api_##n##_t_handler, \ + vl_noop_handler, \ + vl_api_##n##_t_endian, \ + vl_api_##n##_t_print, \ + sizeof(vl_api_##n##_t), 1); + foreach_latency_plugin_api_msg; +#undef _ + + return 0; +} + +#define vl_msg_name_crc_list +#include +#undef vl_msg_name_crc_list + +static void +setup_message_id_table (latency_main_t * pm, api_main_t *am) { +#define _(id,n,crc) \ + vl_msg_api_add_msg_name_crc (am, #n "_" #crc, id + pm->msg_id_base); + foreach_vl_msg_name_crc_latency; +#undef _ +} + +/** + * @brief create the hash key + */ +void make_key(latency_key_t * kv, u32 src_ip, u32 dst_ip, + u16 src_p, u16 dst_p, u8 protocol) { + if (src_ip == 0) { + src_ip = latency_main.mb_ip; + } + kv->s_x_d_ip = src_ip ^ dst_ip; + kv->s_x_d_port = src_p ^ dst_p; + kv->protocol = protocol; +} + +void make_plus_key(latency_key_t * kv, u32 src_ip, u32 dst_ip, + u16 src_p, u16 dst_p, u8 protocol, u64 cat) { + make_key(kv, src_ip, dst_ip, src_p, dst_p, protocol); + kv->as_u64 = kv->as_u64 ^ cat; +} + +/** + * @brief get session pointer if corresponding key is known + */ +latency_session_t * get_session_from_key(latency_key_t * kv_in) { + BVT(clib_bihash_kv) kv, kv_return; + latency_main_t *pm = &latency_main; + BVT(clib_bihash) *bi_table; + bi_table = &pm->latency_table; + kv.key = kv_in->as_u64; + int rv = BV(clib_bihash_search) (bi_table, &kv, &kv_return); + if (rv != 0) { + /* Key does not exist */ + return 0; + } else { + return get_latency_session(kv_return.value); + } +} + +bool ip_nat_translation(ip4_header_t *ip0, u32 init_src_ip, u32 new_dst_ip) { + if (ip0->src_address.as_u32 == init_src_ip) { + ip0->src_address.as_u32 = latency_main.mb_ip; + ip0->dst_address.as_u32 = new_dst_ip; + return true; + } + if (ip0->src_address.as_u32 == new_dst_ip) { + ip0->src_address.as_u32 = latency_main.mb_ip; + ip0->dst_address.as_u32 = init_src_ip; + return true; + } + return false; +} + +/* Update all RTT estimations for QUIC packets */ +void update_quic_rtt_estimate(vlib_main_t * vm, quic_observer_t * session, + f64 now, u16 src_port, u16 init_src_port, u8 measurement, + u32 packet_number, u32 pkt_count) { + + bool spin = measurement & ONE_BIT_SPIN; + u8 status_bits = (measurement & STATUS_MASK) >> STATUS_SHIFT; + bool basic = basic_latency_estimate(vm, &(session->basic_spin_observer), + now, src_port, init_src_port, spin); + + // TODO: will fail if packet number is 0 + bool pn = false; + if (packet_number) { + pn = pn_latency_estimate(vm, &(session->pn_spin_observer), + now, src_port, init_src_port, spin, packet_number); + } + /* VEC estimator */ + bool status = status_estimate(vm, &(session->status_spin_observer), + now, src_port, init_src_port, spin, status_bits); + bool dyna = heuristic_estimate(vm, &(session->dyna_heur_spin_observer), + now, src_port, init_src_port, spin); + + /* Now it is time to print the rtt estimates to a file */ + /* If this is the first time we run, print CSV file header */ + if (pkt_count == 1){ + latency_printf(0, "%s,%s,%s", "time", "pn", "host"); + latency_printf(0, ",%s,%s", "spin_data", "spin_new"); + latency_printf(0, ",%s,%s", "pn_spin_data", "pn_spin_new"); + latency_printf(0, ",%s,%s", "vec_data", "vec_new"); + latency_printf(0, ",%s,%s", "heur_data", "heur_new"); + latency_printf(0, "\n"); + } + + /* If at least one update */ + if (basic || pn || status || dyna) { + /* Now print the actual data */ + if (src_port == init_src_port) { + latency_printf(0, "%.*lf,%u,%s", TIME_PRECISION, now, + packet_number, "server"); + latency_printf(0, ",%.*lf", RTT_PRECISION, session->basic_spin_observer.rtt_server); + latency_printf(0, ",%d", session->basic_spin_observer.new_server); + latency_printf(0, ",%.*lf", RTT_PRECISION, session->pn_spin_observer.rtt_server); + latency_printf(0, ",%d", session->pn_spin_observer.new_server); + latency_printf(0, ",%.*lf", RTT_PRECISION, session->status_spin_observer.rtt_server); + latency_printf(0, ",%d", session->status_spin_observer.new_server); + latency_printf(0, ",%.*lf", RTT_PRECISION, + session->dyna_heur_spin_observer.rtt_server[session->dyna_heur_spin_observer.index_server]); + latency_printf(0, ",%d", session->dyna_heur_spin_observer.new_server); + latency_printf(1, "\n"); + + session->basic_spin_observer.new_server = false; + session->pn_spin_observer.new_server = false; + session->status_spin_observer.new_server = false; + session->dyna_heur_spin_observer.new_server = false; + + } else { + latency_printf(0, "%.*lf,%u,%s", TIME_PRECISION, now, + packet_number, "client"); + latency_printf(0, ",%.*lf", RTT_PRECISION, session->basic_spin_observer.rtt_client); + latency_printf(0, ",%d", session->basic_spin_observer.new_client); + latency_printf(0, ",%.*lf", RTT_PRECISION, session->pn_spin_observer.rtt_client); + latency_printf(0, ",%d", session->pn_spin_observer.new_client); + latency_printf(0, ",%.*lf", RTT_PRECISION, session->status_spin_observer.rtt_client); + latency_printf(0, ",%d", session->status_spin_observer.new_client); + latency_printf(0, ",%.*lf", RTT_PRECISION, + session->dyna_heur_spin_observer.rtt_client[session->dyna_heur_spin_observer.index_client]); + latency_printf(0, ",%d", session->dyna_heur_spin_observer.new_client); + latency_printf(1, "\n"); + + session->basic_spin_observer.new_client = false; + session->pn_spin_observer.new_client = false; + session->status_spin_observer.new_client = false; + session->dyna_heur_spin_observer.new_client = false; + } + } +} + +/** + * BASIC latency estimator + */ +bool basic_latency_estimate(vlib_main_t * vm, basic_spin_observer_t *observer, + f64 now, u16 src_port, u16 init_src_port, bool spin) { + /* if this is a packet from the SERVER */ + if (src_port != init_src_port) { + if (observer->spin_server != spin) { + observer->spin_server = spin; + observer->rtt_server = now - observer->time_last_spin_server; + observer->new_server = true; + observer->time_last_spin_server = now; + return true; + } + /* if this is a packet from the CLIENT */ + } else { + if (observer->spin_client != spin) { + observer->spin_client = spin; + observer->rtt_client = now - observer->time_last_spin_client; + observer->new_client = true; + observer->time_last_spin_client = now; + return true; + } + } + return false; +} + +/* + * (PN) observer + */ +//TODO this does not handle PN wrap around yet +bool pn_latency_estimate(vlib_main_t * vm, pn_spin_observer_t *observer, + f64 now, u16 src_port, u16 init_src_port, bool spin, u32 packet_number) { + /* if this is a packet from the SERVER */ + if (src_port != init_src_port) { + /* check if arrived in order and has different spin */ + if (packet_number > observer->pn_server && observer->spin_server != spin) { + observer->spin_server = spin; + observer->pn_server = packet_number; + observer->rtt_server = now - observer->time_last_spin_server; + observer->new_server = true; + observer->time_last_spin_server = now; + return true; + } + /* if this is a packet from the CLIENT */ + } else { + /* check if arrived in order and has different spin */ + if (packet_number > observer->pn_client && observer->spin_client != spin) { + observer->spin_client = spin; + observer->pn_client = packet_number; + observer->rtt_client = now - observer->time_last_spin_client; + observer->new_client = true; + observer->time_last_spin_client = now; + return true; + } + } + return false; +} + +/* + * VEC observer + */ +bool status_estimate(vlib_main_t * vm, status_spin_observer_t *observer, + f64 now, u16 src_port, u16 init_src_port, bool spin, u8 status) { + bool update = false; + /* if this is a packet from the SERVER */ + if (src_port != init_src_port) { + /* check if arrived in order and has different spin */ + if (observer->spin_server != spin) { + observer->spin_server = spin; + /* only report and store RTT if it was valid over the entire round trip */ + if (status == STATUS_VALID){ + observer->rtt_server = now - observer->time_last_spin_server; + observer->new_server = true; + update = true; + } + } + if (status != STATUS_INVALID) observer->time_last_spin_server = now; + + /* if this is a packet from the CLIENT */ + } else { + /* check if arrived in order and has different spin */ + if (observer->spin_client != spin) { + observer->spin_client = spin; + /* only report and store RTT if it was valid over the entire round trip */ + if (status == STATUS_VALID){ + observer->rtt_client = now - observer->time_last_spin_client; + observer->new_client = true; + update = true; + } + } + if (status != STATUS_INVALID) observer->time_last_spin_client = now; + } + return update; +} + +/* + * VEC ne zero estimate + */ +bool vec_ne_zero_estimate(vlib_main_t * vm, status_spin_observer_t *observer, + f64 now, u16 src_port, u16 init_src_port, bool spin, u8 status) { + bool update = false; + /* if this is a packet from the SERVER */ + if (src_port != init_src_port) { + /* check if arrived in order and has different spin */ + if (observer->spin_server != spin) { + observer->spin_server = spin; + /* only report and store RTT if it was valid over the entire round trip */ + if (status != STATUS_INVALID){ + observer->rtt_server = now - observer->time_last_spin_server; + observer->new_server = true; + update = true; + } + } + if (status != STATUS_INVALID) observer->time_last_spin_server = now; + + /* if this is a packet from the CLIENT */ + } else { + /* check if arrived in order and has different spin */ + if (observer->spin_client != spin) { + observer->spin_client = spin; + /* only report and store RTT if it was valid over the entire round trip */ + if (status != STATUS_INVALID){ + observer->rtt_client = now - observer->time_last_spin_client; + observer->new_client = true; + update = true; + } + } + if (status != STATUS_INVALID) observer->time_last_spin_client = now; + } + return update; +} + +/* + * Dynamic heuristic observer + */ +bool heuristic_estimate(vlib_main_t * vm, dyna_heur_spin_observer_t *observer, + f64 now, u16 src_port, u16 init_src_port, bool spin) { + bool update = false; + /* if this is a packet from the SERVER */ + if (src_port != init_src_port) { + if (observer->spin_server != spin) { + observer->spin_server = spin; + f64 rtt_candidate = now - observer->time_last_spin_server; + + /* calculate the acceptance threshold */ + f64 acceptance_threshold = observer->rtt_server[0]; + for(int i = 1; i < DYNA_HEUR_HISTORY_SIZE; i++){ + if (observer->rtt_server[i] < acceptance_threshold){ + acceptance_threshold = observer->rtt_server[i]; + } + } + acceptance_threshold *= DYNA_HEUR_THRESHOLD; + + if (rtt_candidate > acceptance_threshold || + observer->rejected_server >= DYNA_HEUR_MAX_REJECT){ + observer->rejected_server = 0; + observer->index_server = + (observer->index_server + 1) % DYNA_HEUR_HISTORY_SIZE; + observer->rtt_server[observer->index_server] = rtt_candidate; + observer->new_server = true; + update = true; + /* The assumption is that a packet has been held back long enough to arrive + * after the valid spin edge, therefore, we completely ignore this false spin edge + * and do not report the time at which we saw this packet */ + observer->time_last_spin_server = now; + + /* if the rtt_candidate is rejected */ + } else { + observer->rejected_server++; + } + } + + /* if this is a packet from the CLIENT */ + } else { + if (observer->spin_client != spin){ + observer->spin_client = spin; + f64 rtt_candidate = now - observer->time_last_spin_client; + + /* calculate the acceptance threshold */ + f64 acceptance_threshold = observer->rtt_client[0]; + for(int i = 1; i < DYNA_HEUR_HISTORY_SIZE; i++){ + if (observer->rtt_client[i] < acceptance_threshold){ + acceptance_threshold = observer->rtt_client[i]; + } + } + acceptance_threshold *= DYNA_HEUR_THRESHOLD; + + if (rtt_candidate > acceptance_threshold || + observer->rejected_client >= DYNA_HEUR_MAX_REJECT){ + observer->rejected_client = 0; + observer->index_client = + (observer->index_client + 1) % DYNA_HEUR_HISTORY_SIZE; + observer->rtt_client[observer->index_client] = rtt_candidate; + observer->new_client = true; + update = true; + /* see comment for packets from server */ + observer->time_last_spin_client = now; + } else { + observer->rejected_client++; + } + } + } + return update; +} + +/* Update all RTT estimations for TCP packets */ +void update_tcp_rtt_estimate(vlib_main_t * vm, tcp_observer_t * session, + f64 now, u16 src_port, u16 init_src_port, u8 measurement, + u32 tsval, u32 tsecr, u32 pkt_count, u32 seq_num) { + + bool spin = measurement & TCP_SPIN; + u8 status_bits = (measurement & TCP_VEC_MASK) >> TCP_VEC_SHIFT; + bool status = status_estimate(vm, &(session->status_spin_observer), + now, src_port, init_src_port, spin, status_bits); + bool vec_status = vec_ne_zero_estimate(vm, &(session->vec_ne_zero), + now, src_port, init_src_port, spin, status_bits); + bool single = ts_single_estimate(vm, &(session->ts_one_RTT_observer), + now, src_port, init_src_port, tsval, tsecr); + bool all = ts_all_estimate(vm, &(session->ts_all_RTT_observer), + now, src_port, init_src_port, tsval, tsecr); + + if (pkt_count == 1){ + tcp_printf(0, "%s,%s,%s", "time", "host", "seq_num"); + tcp_printf(0, ",%s,%s", "vec_data", "vec_new"); + tcp_printf(0, ",%s,%s", "single_ts_rtt_data", "single_ts_rtt_new"); + tcp_printf(0, ",%s,%s", "all_ts_rtt_data", "all_ts_rtt_new"); + tcp_printf(0, ",%s,%s", "vec_ne_zero_data", "vec_ne_zero_new"); + tcp_printf(0, "\n"); + } + + /* If we have at least one update */ + if (status || single || all || vec_status) { + /* Now print the actual data */ + if (src_port != init_src_port) { + tcp_printf(0, "%.*lf,%s,%u", TIME_PRECISION, now, "server", seq_num); + tcp_printf(0, ",%.*lf", RTT_PRECISION, session->status_spin_observer.rtt_server); + tcp_printf(0, ",%d", session->status_spin_observer.new_server); + tcp_printf(0, ",%.*lf", RTT_PRECISION, session->ts_one_RTT_observer.rtt_server); + tcp_printf(0, ",%d", session->ts_one_RTT_observer.new_server); + tcp_printf(0, ",%.*lf", RTT_PRECISION, session->ts_all_RTT_observer.rtt_server); + tcp_printf(0, ",%d", session->ts_all_RTT_observer.new_server); + tcp_printf(0, ",%.*lf", RTT_PRECISION, session->vec_ne_zero.rtt_server); + tcp_printf(0, ",%d", session->vec_ne_zero.new_server); + + tcp_printf(1, "\n"); + + session->status_spin_observer.new_server = false; + session->vec_ne_zero.new_server = false; + session->ts_one_RTT_observer.new_server = false; + session->ts_all_RTT_observer.new_server = false; + + } else { + tcp_printf(0, "%.*lf,%s,%u", TIME_PRECISION, now, "client", seq_num); + tcp_printf(0, ",%.*lf", RTT_PRECISION, session->status_spin_observer.rtt_client); + tcp_printf(0, ",%d", session->status_spin_observer.new_client); + tcp_printf(0, ",%.*lf", RTT_PRECISION, session->ts_one_RTT_observer.rtt_client); + tcp_printf(0, ",%d", session->ts_one_RTT_observer.new_client); + tcp_printf(0, ",%.*lf", RTT_PRECISION, session->ts_all_RTT_observer.rtt_client); + tcp_printf(0, ",%d", session->ts_all_RTT_observer.new_client); + tcp_printf(0, ",%.*lf", RTT_PRECISION, session->vec_ne_zero.rtt_client); + tcp_printf(0, ",%d", session->vec_ne_zero.new_client); + + tcp_printf(1, "\n"); + + session->status_spin_observer.new_client = false; + session->vec_ne_zero.new_client = false; + session->ts_one_RTT_observer.new_client = false; + session->ts_all_RTT_observer.new_client = false; + + } + } +} + +/* One RTT estimation per RTT */ +bool ts_single_estimate(vlib_main_t * vm, + timestamp_observer_single_RTT_t * observer, + f64 now, u16 src_port, u16 init_src_port, u32 tsval, u32 tsecr) { + bool update = false; + if (src_port == init_src_port) { + if (!observer->ts_init_client) { + observer->ts_init_client = tsval; + observer->time_init_client = now; + } else { + if (tsecr && observer->ts_ack_client && + tsecr >= observer->ts_ack_client) { + observer->rtt_client = now - observer->time_init_client; + observer->ts_init_client = tsval; + observer->ts_ack_client = 0; + observer->time_init_client = now; + observer->new_client = true; + update = true; + } + } + if (tsecr && !observer->ts_ack_server && + tsecr >= observer->ts_init_server) { + observer->ts_ack_server = tsval; + } + } + else { + if (!observer->ts_init_server) { + observer->ts_init_server = tsval; + observer->time_init_server = now; + } + else { + if (tsecr && observer->ts_ack_server && + tsecr >= observer->ts_ack_server) { + observer->rtt_server = now - observer->time_init_server; + observer->ts_init_server = tsval; + observer->ts_ack_server = 0; + observer->time_init_server = now; + observer->new_server = true; + update = true; + } + } + if (tsecr && !observer->ts_ack_client && + tsecr >= observer->ts_init_client) { + observer->ts_ack_client = tsval; + } + } + return update; +} + +/* RTT estimation for every possible timestamp value */ +bool ts_all_estimate(vlib_main_t * vm, timestamp_observer_all_RTT_t * observer, + f64 now, u16 src_port, u16 init_src_port, u32 tsval, u32 tsecr) { + bool update = false; + + // TODO state will explode in case of large reordering + if (src_port == init_src_port) { + uword* init_t = hash_get(observer->hash_init_client, tsval); + uword* ack_t = hash_get(observer->hash_ack_client, tsecr); + uword* init_t_server = hash_get(observer->hash_init_server, tsecr); + if (!init_t) { + time_test_t* temp; + vec_alloc(temp, 1); + memset(temp, 0, sizeof (time_test_t)); + temp->time = now; + hash_set(observer->hash_init_client, tsval, temp); + } + if (tsecr && ack_t) { + time_test_t* ack_time = (time_test_t *) ack_t[0]; + observer->rtt_client = now - ack_time->time; + vec_free(ack_time); + hash_unset(observer->hash_ack_client, tsecr); + observer->new_client = true; + update = true; + } + if (tsecr && init_t_server) { + hash_set(observer->hash_ack_server, tsval, (time_test_t*) init_t_server[0]); + hash_unset(observer->hash_init_server, tsecr); + } + } + else { + uword* init_t = hash_get(observer->hash_init_server, tsval); + uword* ack_t = hash_get(observer->hash_ack_server, tsecr); + uword* init_t_client = hash_get(observer->hash_init_client, tsecr); + if (!init_t) { + time_test_t* temp; + vec_alloc(temp, 1); + memset(temp, 0, sizeof (time_test_t)); + temp->time = now; + hash_set(observer->hash_init_server, tsval, temp); + } + if (tsecr && ack_t) { + time_test_t* ack_time = (time_test_t *) ack_t[0]; + observer->rtt_server = now - ack_time->time; + vec_free(ack_time); + hash_unset(observer->hash_ack_server, tsecr); + observer->new_server = true; + update = true; + } + if (tsecr && init_t_client) { + hash_set(observer->hash_ack_client, tsval, (time_test_t*) init_t_client[0]); + hash_unset(observer->hash_init_client, tsecr); + } + } + // TODO: check all hash sizes and clear if too big + // perhaps also based on amount of observed packets, e.g. every 5000 packets + // E.g. if sum(4 hashes) > 500 -> clear all hashes, + // set both times to 0 and update to true + return update; +} + +void update_plus_rtt_estimate(vlib_main_t * vm, plus_observer_t * session, + f64 now, u16 src_port, u16 init_src_port, u32 psn, + u32 pse, u64 cat, u32 pkt_count) { + + bool new_rtt = psn_single_estimate(vm, &(session->plus_single_observer), + src_port, init_src_port, psn, pse, now); + + if (pkt_count == 1){ + /* TODO: add CAT */ + plus_printf(0, "%s,%s,%s,%s,%s,%s", "time", "host", "#pkt", "psn", "pse", "cat"); + plus_printf(0, ",%s,%s", "psn_pse_data", "psn_pse_new"); + plus_printf(0, "\n"); + } + + /* If we have at least one update */ + if (new_rtt) { + /* Now print the actual data */ + if (src_port != init_src_port) { + plus_printf(0, "%.*lf,%s,%u,%u,%u,%llu", TIME_PRECISION, now, "server", pkt_count, psn, pse, cat); + + plus_printf(0, ",%.*lf", RTT_PRECISION, session->plus_single_observer.rtt_dst); + plus_printf(0, ",%d", session->plus_single_observer.new_server); + + plus_printf(1, "\n"); + + session->plus_single_observer.new_server = false; + } else { + plus_printf(0, "%.*lf,%s,%u,%u,%u,%llu", TIME_PRECISION, now, "client", pkt_count, psn, pse, cat); + + plus_printf(0, ",%.*lf", RTT_PRECISION, session->plus_single_observer.rtt_src); + plus_printf(0, ",%d", session->plus_single_observer.new_client); + + plus_printf(1, "\n"); + + session->plus_single_observer.new_client = false; + } + } +} + +bool psn_single_estimate(vlib_main_t * vm, plus_single_observer_t * session, + u16 src_port, u16 init_src_port, u32 psn, u32 pse, f64 now) { + /* Decide direction */ + if (src_port == init_src_port) { + /* Is the RTT estimation for the last packet completed? */ + if (session->time_src == 0) { + session->psn_src = psn; + session->time_src = now; + } + if (session->time_dst && comes_after_u32(pse, session->psn_dst)) { + session->rtt_src = now - session->time_dst; + session->time_dst = 0; + session->new_client = true; + return true; + } + } else { + if (session->time_dst == 0) { + session->psn_dst = psn; + session->time_dst = now; + } + if (session->time_src && comes_after_u32(pse, session->psn_src)) { + session->rtt_dst = now - session->time_src; + session->time_src = 0; + session->new_server = true; + return true; + } + } + return false; +} + +/** + * @brief update the state of the session with the given key + */ +void update_state(latency_key_t * kv_in, uword new_state) +{ + BVT(clib_bihash_kv) kv; + latency_main_t *pm = &latency_main; + BVT(clib_bihash) *bi_table; + bi_table = &pm->latency_table; + kv.key = kv_in->as_u64; + kv.value = new_state; + BV(clib_bihash_add_del) (bi_table, &kv, 1 /* is_add */); +} + +/** + * @brief create a new session for a new flow + */ +u32 create_session(sup_protocols_t p_type) { + latency_session_t * session; + latency_main_t * pm = &latency_main; + pm->active_flows ++; + pm->total_flows ++; + pool_get (pm->session_pool, session); + memset(session, 0, sizeof (*session)); + /* Correct session index */ + session->index = session - pm->session_pool; + session->state = 0; + + switch (p_type) { + case P_TCP: + session->p_type = P_TCP; + vec_alloc(session->tcp, 1); + memset(session->tcp, 0, sizeof (tcp_observer_t)); + session->tcp->status_spin_observer.spin_client = SPIN_NOT_KNOWN; + session->tcp->status_spin_observer.spin_server = SPIN_NOT_KNOWN; + session->tcp->vec_ne_zero.spin_client = SPIN_NOT_KNOWN; + session->tcp->vec_ne_zero.spin_server = SPIN_NOT_KNOWN; + session->tcp->ts_all_RTT_observer.hash_init_client = + hash_create(0, sizeof(time_test_t*)); + session->tcp->ts_all_RTT_observer.hash_init_server = + hash_create(0, sizeof(time_test_t*)); + session->tcp->ts_all_RTT_observer.hash_ack_client = + hash_create(0, sizeof(time_test_t*)); + session->tcp->ts_all_RTT_observer.hash_ack_server = + hash_create(0, sizeof(time_test_t*)); + break; + + case P_QUIC: + session->p_type = P_QUIC; + vec_alloc(session->quic, 1); + memset(session->quic, 0, sizeof (quic_observer_t)); + session->quic->basic_spin_observer.spin_client = SPIN_NOT_KNOWN; + session->quic->basic_spin_observer.spin_server = SPIN_NOT_KNOWN; + session->quic->pn_spin_observer.spin_client = SPIN_NOT_KNOWN; + session->quic->pn_spin_observer.spin_server = SPIN_NOT_KNOWN; + session->quic->status_spin_observer.spin_client = SPIN_NOT_KNOWN; + session->quic->status_spin_observer.spin_server = SPIN_NOT_KNOWN; + session->quic->dyna_heur_spin_observer.spin_client = SPIN_NOT_KNOWN; + session->quic->dyna_heur_spin_observer.spin_server = SPIN_NOT_KNOWN; + break; + + case P_PLUS: + session->p_type = P_PLUS; + vec_alloc(session->plus, 1); + memset(session->plus, 0, sizeof (plus_observer_t)); + break; + + case P_UNKNOWN: + default: + session->p_type = P_UNKNOWN; + break; + } + + return session->index; +} + +/** + * @brief clean session after timeout + */ +void clean_session(u32 index) +{ + latency_main_t * pm = &latency_main; + latency_session_t * session = get_latency_session(index); + + /* If main loop (in node.c) is executed sparsely, it can happen that + * the timer wheel triggers multiple times for the same session. + * We remove/clean the session only the first time. */ + if (session == 0) { + return; + } + pm->active_flows --; + + switch (session->p_type) { + case P_TCP: + // TODO: fix potential memory leak + // Iterate over all remaining key, value pairs and free value pointers + hash_free(session->tcp->ts_all_RTT_observer.hash_init_client); + hash_free(session->tcp->ts_all_RTT_observer.hash_init_server); + hash_free(session->tcp->ts_all_RTT_observer.hash_ack_client); + hash_free(session->tcp->ts_all_RTT_observer.hash_ack_server); + vec_free(session->tcp); + break; + + case P_QUIC: + vec_free(session->quic); + break; + + case P_PLUS: + vec_free(session->plus); + break; + + default: + break; + } + + BVT(clib_bihash_kv) kv; + BVT(clib_bihash) * bi_table; + bi_table = &pm->latency_table; + + /* Clear hash and pool entry + * First for the key in reverse direction */ + kv.key = session->key_reverse; + BV(clib_bihash_add_del) (bi_table, &kv, 0 /* is_add */); + kv.key = session->key; + BV(clib_bihash_add_del) (bi_table, &kv, 0 /* is_add */); + pool_put (pm->session_pool, session); +} + +/** + * @brief callback function for expired timer + */ +static void timer_expired_callback(u32 * expired_timers) { + int i; + u32 index, timer_id; + + /* Iterate over all expired timers */ + for (i = 0; i < vec_len(expired_timers); i = i+1) { + /* Extract index and timer wheel id */ + index = expired_timers[i] & 0x7FFFFFFF; + timer_id = expired_timers[i] >> 31; + + /* Only use timer with ID 0 at the moment */ + ASSERT (timer_id == 0); + + clean_session(index); + } +} + +/** + * @brief parse TCP headers (from tcp_input.c) + */ +int +tcp_options_parse_mod (tcp_header_t * th, u32 * tsval, u32 * tsecr) { + const u8 *data; + u8 opt_len, opts_len, kind; + + opts_len = (tcp_doff (th) << 2) - sizeof (tcp_header_t); + data = (const u8 *) (th + 1); + + for (; opts_len > 0; opts_len -= opt_len, data += opt_len) { + kind = data[0]; + + /* Get options length */ + if (kind == TCP_OPTION_EOL) + break; + else if (kind == TCP_OPTION_NOOP) { + opt_len = 1; + continue; + } + else { + /* broken options */ + if (opts_len < 2) + return -1; + + opt_len = data[1]; + + /* weird option length */ + if (opt_len < 2 || opt_len > opts_len) + return -1; + } + + /* Parse options */ + switch (kind) { + case TCP_OPTION_MSS: + break; + case TCP_OPTION_WINDOW_SCALE: + break; + case TCP_OPTION_TIMESTAMP: + if (opt_len == TCP_OPTION_LEN_TIMESTAMP) { + *tsval = clib_net_to_host_u32 (*(u32 *) (data + 2)); + *tsecr = clib_net_to_host_u32 (*(u32 *) (data + 6)); + } + break; + case TCP_OPTION_SACK_PERMITTED: + break; + case TCP_OPTION_SACK_BLOCK: + /* If too short or not correctly formatted, break */ + if (opt_len < 10 || ((opt_len - 2) % TCP_OPTION_LEN_SACK_BLOCK)) + break; + break; + default: + continue; + } + } + return 0; +} + +/* Output to CLI / stdout, this is a modified copy of `vlib_cli_output` */ +void latency_printf (int flush, char *fmt, ...) { + va_list va; + u8 *s; + + static FILE *output_file_spin = NULL; + + va_start (va, fmt); + s = va_format (0, fmt, &va); + va_end (va); + + if (output_file_spin == NULL){ + output_file_spin = fopen("/tmp/latency_quic_printf.out", "w"); + } + fprintf(output_file_spin, "%s", s); + + if (flush){ + fflush(output_file_spin); + } + + vec_free (s); +} + +/* Output to CLI / stdout, this is a modified copy of `vlib_cli_output` */ +void tcp_printf (int flush, char *fmt, ...) { + va_list va; + u8 *s; + + static FILE *output_file_tcp = NULL; + + va_start (va, fmt); + s = va_format (0, fmt, &va); + va_end (va); + + if (output_file_tcp == NULL){ + output_file_tcp = fopen("/tmp/latency_tcp_printf.out", "w"); + } + fprintf(output_file_tcp, "%s", s); + + if (flush){ + fflush(output_file_tcp); + } + + vec_free (s); +} + +/* Output to CLI / stdout, this is a modified copy of `vlib_cli_output` */ +void plus_printf (int flush, char *fmt, ...) { + va_list va; + u8 *s; + + static FILE *output_file_plus = NULL; + + va_start (va, fmt); + s = va_format (0, fmt, &va); + va_end (va); + + if (output_file_plus == NULL){ + output_file_plus = fopen("/tmp/latency_plus_printf.out", "w"); + } + fprintf(output_file_plus, "%s", s); + + if (flush){ + fflush(output_file_plus); + } + + vec_free (s); +} + +/** + * @brief Initialize the latency plugin. + */ +static clib_error_t * latency_init (vlib_main_t * vm) +{ + + // TODO: set mb_IP to good default value!!! + + latency_main_t * pm = &latency_main; + clib_error_t * error = 0; + u8 * name; + + pm->vnet_main = vnet_get_main (); + name = format (0, "latency_%08x%c", api_version, 0); + + /* Ask for a correctly-sized block of API message decode slots */ + pm->msg_id_base = vl_msg_api_get_msg_ids + ((char *) name, VL_MSG_FIRST_AVAILABLE); + + error = latency_plugin_api_hookup (vm); + + /* Add our API messages to the global name_crc hash table */ + setup_message_id_table (pm, &api_main); + + /* Create hashes */ + pm->hash_quic_ports = hash_create(0, sizeof(u16)); + // add ports with swapped bytes! + hash_set(pm->hash_quic_ports, 20753, 1); // 4433 + // additional ports + + pm->hash_server_ports_to_ips = hash_create(0, sizeof(u32)); + + /* Init bihash */ + BV (clib_bihash_init) (&pm->latency_table, "latency", 2048, 512<<20); + + /* Timer wheel has 2048 slots, so we predefine pool with + * 2048 entries as well */ + pool_init_fixed(pm->session_pool, 2048); + + /* Init timer wheel with 100ms resolution */ + tw_timer_wheel_init_2t_1w_2048sl (&pm->tw, + timer_expired_callback, 100e-3, ~0); + pm->tw.last_run_time = vlib_time_now (vm); + + /* Set counters to zero*/ + pm->total_flows = 0; + pm->active_flows = 0; + + vec_free(name); + + return error; +} + +VLIB_INIT_FUNCTION (latency_init); + +/** + * @brief Hook the LATENCY plugin into the VPP graph hierarchy. + */ +VNET_FEATURE_INIT (latency, static) = +{ + /* It runs in the device-input arc before the ip4-lookup node */ + .arc_name = "ip4-unicast", + .node_name = "latency", + .runs_before = VNET_FEATURES ("ip4-lookup"), +}; diff --git a/latency-plugin/latency/latency.h b/latency-plugin/latency/latency.h new file mode 100644 index 0000000..5f2c439 --- /dev/null +++ b/latency-plugin/latency/latency.h @@ -0,0 +1,457 @@ +/* + * Copyright (c) 2015 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +/* High-level overview: + * + * Used data structures: + * - A bihash_8_8 (bounded-index extensible hash) - 8 byte key and 8 byte value. + * - A pool is used to save the state for each LATENCY flow (fixed sized struct) + * - A timer wheel (2t_1w_2048sl = 2 timers per object, 1 wheel, 2048 slots) + * + * The key in the hash table consist of (XOR is used to match both directions): + * "5 tuple": + * - XOR of src and dst IP + * - XOR of src and dst port + * - protocol + * + * - for PLUS packets we also take the CAT value into the hash key + * + * The value corresponding to a key (in the hash table) is the pool index + * for the state of the matching LATENCY flow. + * + * Besides the actual "state" of the flow we also save e.g. counters, RTT + * estimates, ... + */ + +#ifndef __included_latency_h__ +#define __included_latency_h__ + +/* Quic handshake states for handshake RTT measurement */ +#define LATENCY_HANDSHAKE_IDLE 0 +#define LATENCY_HANDSHAKE_CLIENT_INITIAL 1 +#define LATENCY_HANDSHAKE_SERVER_CLEARTEXT 2 +#define LATENCY_HANDSHAKE_CLIENT_CLEARTEXT 3 + +/* Quic header types */ +#define LATENCY_PACKET_LONG_VERSION_NEGOTIATION 0x81 +#define LATENCY_PACKET_LONG_CLIENT_INITIAL 0x82 +#define LATENCY_PACKET_LONG_SERVER_STATELESS_RETRY 0x83 +#define LATENCY_PACKET_LONG_SERVER_CLEARTEXT 0x84 +#define LATENCY_PACKET_LONG_CLIENT_CLEARTEXT 0x85 +#define LATENCY_PACKET_LONG_0_RTT_PROTECTED 0x86 +#define LATENCY_PACKET_LONG_1_RTT_PROTECTED_PHASE_1 0x87 +#define LATENCY_PACKET_LONG_1_RTT_PROTECTED_PHASE_2 0x88 +#define LATENCY_PACKET_SHORT_1_OCTET 0x01 +#define LATENCY_PACKET_SHORT_2_OCTET 0x02 +#define LATENCY_PACKET_SHORT_4_OCTET 0x03 + +#define LATENCY_PACKET_SHORT_MASK 0b10011111 +#define LATENCY_PACKET_LONG_MASK 0b11111111 + +#include +#include +#include + +#include +#include +#include + +/* We use the bihash_8_8 hash function*/ +/* 8 byte key and 8 byte value */ +#include + +#include + +/* Timer wheel (2 timers, 1 wheel, 2048 slots) */ +#include + +/* Defines all the LATENCY states */ +#define foreach_latency_state \ +_(ACTIVE, "default state for TCP and QUIC") \ +_(P_ZERO, "PLUS: no flow") \ +_(P_UNIFLOW, "PLUS: flow in one direction") \ +_(P_ASSOCIATING, "PLUS: also flow in reverse direction") \ +_(P_ASSOCIATED, "PLUS: flow confirmed") \ +_(P_STOPWAIT, "PLSU: stop signal in one direction") \ +_(P_STOPPING, "PLSU: stop signal also in other direction") \ +_(ERROR, "error state for all flows") + +typedef enum { +#define _(sym,str) LATENCY_STATE_##sym, + foreach_latency_state +#undef _ +} latency_state_t; + +#define foreach_protocol \ +_(TCP, "TCP flow") \ +_(QUIC, "QUIC flow") \ +_(PLUS, "PLUS flow") \ +_(UNKNOWN, "UNKNOWN flow") + +typedef enum { +#define _(sym,str) P_##sym, + foreach_protocol +#undef _ +} sup_protocols_t; + +/* For output */ +#define TIME_PRECISION 8 +#define RTT_PRECISION 4 +#define STAT_PRECISION 8 + +#define SPIN_NOT_KNOWN 255 + +#define TWO_BIT_SPIN 0xc0 +#define ONE_BIT_SPIN 0x40 +#define VALID_BIT 0x20 +#define BLOCKING_BIT 0x10 +#define TWO_BIT_SPIN_OFFSET 6 +#define VALID_EDGE_BIT 0x01 +#define STATUS_MASK 0x0c +#define STATUS_SHIFT 2 +#define TCP_SPIN 0x01 +#define TCP_VEC_MASK 0x06 +#define TCP_VEC_SHIFT 1 + +/* Endian correction + * Could also be solved by converting input first */ +#if CLIB_ARCH_IS_LITTLE_ENDIAN + #define MAGIC_MASK 0xf0ffffff + #define MAGIC 0xf07f00d8 + #define STOP 0x02000000 + #define EXTENDED 0x01000000 +#else + #define MAGIC_MASK 0xfffffff0 + #define MAGIC 0xd8007ff0 + #define STOP 0x00000002 + #define EXTENDED 0x00000001 +#endif + +#define MAX_PSN 4294967296 +#define MAX_SKIP 100 + +/* To save current time in hashes */ +typedef struct { + f64 time; +} time_test_t; + +/* Structs for the different spin observers */ +typedef struct { + u8 spin_client; + u8 spin_server; + f64 time_last_spin_client; + f64 time_last_spin_server; + f64 rtt_client; + f64 rtt_server; + bool new_client; + bool new_server; +} basic_spin_observer_t; + +typedef struct { + u8 spin_client; + u8 spin_server; + f64 time_last_spin_client; + f64 time_last_spin_server; + f64 rtt_client; + f64 rtt_server; + u32 pn_client; + u32 pn_server; + bool new_client; + bool new_server; +} pn_spin_observer_t; + +#define STATUS_INVALID 0b00 +#define STATUS_HANDSHAKE_1 0b01 +#define STATUS_HANDSHAKE_2 0b10 +#define STATUS_VALID 0b11 +typedef struct { + u8 spin_client; + u8 spin_server; + f64 time_last_spin_client; + f64 time_last_spin_server; + f64 rtt_client; + f64 rtt_server; + bool new_client; + bool new_server; +} status_spin_observer_t; + +#define DYNA_HEUR_THRESHOLD 0.1 +#define DYNA_HEUR_HISTORY_SIZE 10 +#define DYNA_HEUR_MAX_REJECT 5 +typedef struct { + u8 spin_client; + u8 spin_server; + f64 time_last_spin_client; + f64 time_last_spin_server; + f64 rtt_client[DYNA_HEUR_HISTORY_SIZE]; + f64 rtt_server[DYNA_HEUR_HISTORY_SIZE]; + u8 index_client; + u8 index_server; + u8 rejected_client; + u8 rejected_server; + bool new_client; + bool new_server; +} dyna_heur_spin_observer_t; + +/* main QUIC observer struct */ +typedef struct { + u64 id; + + /* Data structures for the various spin bit observers */ + basic_spin_observer_t basic_spin_observer; + pn_spin_observer_t pn_spin_observer; + status_spin_observer_t status_spin_observer; + dyna_heur_spin_observer_t dyna_heur_spin_observer; +} quic_observer_t; + +/* structs for the different TCP TS observers */ +typedef struct { + f64 time_init_client; + f64 time_init_server; + f64 rtt_client; + f64 rtt_server; + u32 ts_init_client; + u32 ts_init_server; + u32 ts_ack_client; + u32 ts_ack_server; + bool new_client; + bool new_server; +} timestamp_observer_single_RTT_t; + +typedef struct { + uword *hash_init_client; + uword *hash_init_server; + uword *hash_ack_client; + uword *hash_ack_server; + f64 rtt_client; + f64 rtt_server; + bool new_client; + bool new_server; +} timestamp_observer_all_RTT_t; + +/* main TCP observer struct */ +typedef struct { + /* Data structures for the latency and timestamp observer */ + status_spin_observer_t status_spin_observer; + status_spin_observer_t vec_ne_zero; + timestamp_observer_single_RTT_t ts_one_RTT_observer; + timestamp_observer_all_RTT_t ts_all_RTT_observer; +} tcp_observer_t; + +/* struct for PLUS PSE/PSN observer */ +typedef struct { + u32 psn_src; + f64 time_src; + f64 rtt_src; + u32 psn_dst; + f64 time_dst; + f64 rtt_dst; + bool new_server; + bool new_client; +} plus_single_observer_t; + +/* main PLUS observer struct */ +typedef struct { + u8 state; + /* PSN which moved state to ASSOCIATING */ + u32 psn_associating; + /* PSN which moved state to STOPWAIT */ + u32 psn_stopwait; + u64 cat; + + plus_single_observer_t plus_single_observer; +} plus_observer_t; + +/* State for each observed LATENCY session */ +typedef struct { + latency_state_t state; + + sup_protocols_t p_type; + + /* Pool index (saved in hash table) */ + u32 index; + u32 timer; + u64 key; + u64 key_reverse; + + u32 init_src_ip; + u16 init_src_port; + u32 new_dst_ip; + + /* Number of observed packets */ + u32 pkt_count; + + /* QUIC and TCP observers + * only required values are allocated */ + quic_observer_t * quic; + tcp_observer_t * tcp; + plus_observer_t * plus; +} latency_session_t; + +/* Main latency struct */ +typedef struct { + /* API message ID base */ + u16 msg_id_base; + + /* single MB IP */ + u32 mb_ip; + + /* convenience */ + vnet_main_t * vnet_main; + + /* Hash table */ + BVT (clib_bihash) latency_table; + + /* Session pool */ + latency_session_t * session_pool; + + /* Contains all ports that indicated QUIC traffic */ + uword *hash_quic_ports; + + /* To translate dst port to required dst IP */ + uword *hash_server_ports_to_ips; + + /* Counter values*/ + u32 total_flows; + u32 active_flows; + u32 active_tcp; + u32 active_quic; + + /* Timer wheel*/ + tw_timer_wheel_2t_1w_2048sl_t tw; +} latency_main_t; + +/* Hash key struct */ +typedef CLIB_PACKED (struct { + union { + struct { + /* IP and port XOR */ + u32 s_x_d_ip; + u16 s_x_d_port; + u16 protocol; + }; + u64 as_u64; + }; +}) latency_key_t; + +latency_main_t latency_main; + +extern vlib_node_registration_t latency_node; + +u64 get_state(latency_key_t * kv_in); +void update_state(latency_key_t * kv_in, uword new_state); +void make_key(latency_key_t * kv, u32 src_ip, u32 dst_ip, + u16 src_p, u16 dst_p, u8 protocol); +void make_plus_key(latency_key_t * kv, u32 src_ip, u32 dst_ip, + u16 src_p, u16 dst_p, u8 protocol, u64 cat); +latency_session_t * get_session_from_key(latency_key_t * kv_in); +u32 create_session(sup_protocols_t p_type); + +void update_quic_rtt_estimate(vlib_main_t * vm, quic_observer_t * session, + f64 now, u16 src_port, u16 init_src_port, u8 measurement, + u32 packet_number, u32 pkt_count); +bool basic_latency_estimate(vlib_main_t * vm, basic_spin_observer_t *observer, + f64 now, u16 src_port, u16 init_src_port, bool spin); +bool pn_latency_estimate(vlib_main_t * vm, pn_spin_observer_t *observer, + f64 now, u16 src_port, u16 init_src_port, bool spin, u32 packet_number); +bool status_estimate(vlib_main_t * vm, status_spin_observer_t *observer, + f64 now, u16 src_port, u16 init_src_port, bool spin, u8 status); +bool vec_ne_zero_estimate(vlib_main_t * vm, status_spin_observer_t *observer, + f64 now, u16 src_port, u16 init_src_port, bool spin, u8 status); +bool heuristic_estimate(vlib_main_t * vm, dyna_heur_spin_observer_t *observer, + f64 now, u16 src_port, u16 init_src_port, bool spin); +void update_tcp_rtt_estimate(vlib_main_t * vm, tcp_observer_t * session, + f64 now, u16 src_port, u16 init_src_port, u8 measurement, u32 tsval, + u32 tsecr, u32 pkt_count, u32 seq_num); +bool ts_single_estimate(vlib_main_t * vm, + timestamp_observer_single_RTT_t * observer, + f64 now, u16 src_port, u16 init_src_port, u32 tsval, u32 tsecr); +bool ts_all_estimate(vlib_main_t * vm, timestamp_observer_all_RTT_t * observer, + f64 now, u16 src_port, u16 init_src_port, u32 tsval, u32 tsecr); +int tcp_options_parse_mod (tcp_header_t * th, u32 * tsval, u32 * tsecr); +void update_plus_rtt_estimate(vlib_main_t * vm, plus_observer_t * session, + f64 now, u16 src_port, u16 init_src_port, u32 psn, + u32 pse, u64 cat, u32 pkt_count); +bool psn_single_estimate(vlib_main_t * vm, plus_single_observer_t * session, + u16 src_port, u16 init_src_port, u32 psn, u32 pse, f64 now); +bool ip_nat_translation(ip4_header_t *ip0, u32 init_src_ip, u32 new_dst_ip); + +void clean_session(u32 index); +void latency_printf (int flush, char *fmt, ...); +void tcp_printf (int flush, char *fmt, ...); +void plus_printf (int flush, char *fmt, ...); + +/** + * @brief get latency session for index + */ +always_inline latency_session_t * get_latency_session(u32 index) { + if (pool_is_free_index (latency_main.session_pool, index)) + return 0; + return pool_elt_at_index (latency_main.session_pool, index); +} + +/** + * @brief start a timer in the timer wheel + */ +always_inline void start_timer(latency_session_t * session, u64 interval) { + session->timer = tw_timer_start_2t_1w_2048sl (&latency_main.tw, + session->index, 0, interval); +} + +/** + * @brief update the timer + */ +always_inline void update_timer(latency_session_t * session, u64 interval) { + if(session->timer != ~0) { + tw_timer_stop_2t_1w_2048sl (&latency_main.tw, session->timer); + } + session->timer = tw_timer_start_2t_1w_2048sl (&latency_main.tw, + session->index, 0, interval); +} + +always_inline bool is_quic(u16 src_port, u16 dst_port) { + return hash_get(latency_main.hash_quic_ports, src_port) + || hash_get(latency_main.hash_quic_ports, dst_port); +} + +always_inline void get_new_dst(u32 *new_dst_ip, u16 src_port) { + uword* temp_ip = hash_get(latency_main.hash_server_ports_to_ips, src_port); + if (temp_ip) { + *new_dst_ip = *((u32 *) temp_ip); + return; + } + *new_dst_ip = 0; + return; +} + +always_inline bool comes_after_u32(u32 now, u32 old) { + i64 ret = (now - old) % MAX_PSN; + if (ret < 0) { + ret += MAX_PSN; + } + return ret < MAX_SKIP; +} + +/** + * @brief expire timers + */ +always_inline void expire_timers(f64 now) { + tw_timer_expire_timers_2t_1w_2048sl (&latency_main.tw, now); +} + +#define LATENCY_PLUGIN_BUILD_VER "0.1" + +#endif /* __included_latency_h__ */ diff --git a/plus-plugin/plus/plus_all_api_h.h b/latency-plugin/latency/latency_all_api_h.h similarity index 95% rename from plus-plugin/plus/plus_all_api_h.h rename to latency-plugin/latency/latency_all_api_h.h index 9798de9..f9f43c7 100644 --- a/plus-plugin/plus/plus_all_api_h.h +++ b/latency-plugin/latency/latency_all_api_h.h @@ -13,4 +13,4 @@ * limitations under the License. */ /* Include the generated file, see BUILT_SOURCES in Makefile.am */ -#include +#include diff --git a/plus-plugin/plus/plus_msg_enum.h b/latency-plugin/latency/latency_msg_enum.h similarity index 84% rename from plus-plugin/plus/plus_msg_enum.h rename to latency-plugin/latency/latency_msg_enum.h index bf57bbf..8b6c75e 100644 --- a/plus-plugin/plus/plus_msg_enum.h +++ b/latency-plugin/latency/latency_msg_enum.h @@ -12,17 +12,17 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -#ifndef included_plus_msg_enum_h -#define included_plus_msg_enum_h +#ifndef included_latency_msg_enum_h +#define included_latency_msg_enum_h #include #define vl_msg_id(n,h) n, typedef enum { -#include +#include /* We'll want to know how many messages IDs we need... */ VL_MSG_FIRST_AVAILABLE, } vl_msg_id_t; #undef vl_msg_id -#endif /* included_plus_msg_enum_h */ +#endif /* included_latency_msg_enum_h */ diff --git a/plus-plugin/plus/plus_test.c b/latency-plugin/latency/latency_test.c similarity index 83% rename from plus-plugin/plus/plus_test.c rename to latency-plugin/latency/latency_test.c index c0e0216..e771f53 100644 --- a/plus-plugin/plus/plus_test.c +++ b/latency-plugin/latency/latency_test.c @@ -14,7 +14,7 @@ */ /* *------------------------------------------------------------------ - * plus_test.c - test harness plugin + * latency_test.c - test harness plugin *------------------------------------------------------------------ */ @@ -24,34 +24,34 @@ #include #include -#define __plugin_msg_base plus_test_main.msg_id_base +#define __plugin_msg_base latency_test_main.msg_id_base #include uword unformat_sw_if_index (unformat_input_t * input, va_list * args); /* Declare message IDs */ -#include +#include /* define message structures */ #define vl_typedefs -#include +#include #undef vl_typedefs /* declare message handlers for each api */ #define vl_endianfun /* define message structures */ -#include +#include #undef vl_endianfun /* instantiate all the print functions we know about */ #define vl_print(handle, ...) #define vl_printfun -#include +#include #undef vl_printfun /* Get the API version number. */ #define vl_api_version(n,v) static u32 api_version=(v); -#include +#include #undef vl_api_version @@ -59,18 +59,18 @@ typedef struct { /* API message ID base */ u16 msg_id_base; vat_main_t *vat_main; -} plus_test_main_t; +} latency_test_main_t; -plus_test_main_t plus_test_main; +latency_test_main_t latency_test_main; #define foreach_standard_reply_retval_handler \ -_(plus_enable_disable_reply) +_(latency_enable_disable_reply) #define _(n) \ static void vl_api_##n##_t_handler \ (vl_api_##n##_t * mp) \ { \ - vat_main_t * vam = plus_test_main.vat_main; \ + vat_main_t * vam = latency_test_main.vat_main; \ i32 retval = ntohl(mp->retval); \ if (vam->async_mode) { \ vam->async_errors += (retval < 0); \ @@ -87,15 +87,15 @@ foreach_standard_reply_retval_handler; * we just generated */ #define foreach_vpe_api_reply_msg \ -_(PLUS_ENABLE_DISABLE_REPLY, plus_enable_disable_reply) +_(LATENCY_ENABLE_DISABLE_REPLY, latency_enable_disable_reply) -static int api_plus_enable_disable (vat_main_t * vam) +static int api_latency_enable_disable (vat_main_t * vam) { unformat_input_t * i = vam->input; int enable_disable = 1; u32 sw_if_index = ~0; - vl_api_plus_enable_disable_t * mp; + vl_api_latency_enable_disable_t * mp; int ret; /* Parse args required to build the message */ @@ -116,7 +116,7 @@ static int api_plus_enable_disable (vat_main_t * vam) } /* Construct the API message */ - M(PLUS_ENABLE_DISABLE, mp); + M(LATENCY_ENABLE_DISABLE, mp); mp->sw_if_index = ntohl (sw_if_index); mp->enable_disable = enable_disable; @@ -133,11 +133,11 @@ static int api_plus_enable_disable (vat_main_t * vam) * and that the data plane plugin processes */ #define foreach_vpe_api_msg \ -_(plus_enable_disable, " [disable]") +_(latency_enable_disable, " [disable]") -static void plus_api_hookup (vat_main_t *vam) +static void latency_api_hookup (vat_main_t *vam) { - plus_test_main_t * pm = &plus_test_main; + latency_test_main_t * pm = &latency_test_main; /* Hook up handlers for replies from the data plane plug-in */ #define _(N,n) \ vl_msg_api_set_handlers((VL_API_##N + pm->msg_id_base), \ @@ -163,16 +163,16 @@ static void plus_api_hookup (vat_main_t *vam) clib_error_t * vat_plugin_register (vat_main_t *vam) { - plus_test_main_t * pm = &plus_test_main; + latency_test_main_t * pm = &latency_test_main; u8 * name; pm->vat_main = vam; - name = format (0, "plus_%08x%c", api_version, 0); + name = format (0, "latency_%08x%c", api_version, 0); pm->msg_id_base = vl_client_get_first_plugin_msg_id ((char *) name); if (pm->msg_id_base != (u16) ~0) - plus_api_hookup (vam); + latency_api_hookup (vam); vec_free(name); diff --git a/latency-plugin/latency/node.c b/latency-plugin/latency/node.c new file mode 100644 index 0000000..8b67d7e --- /dev/null +++ b/latency-plugin/latency/node.c @@ -0,0 +1,599 @@ +/* + * Copyright (c) 2015 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include +#include + +/* Register the latency node */ +vlib_node_registration_t latency_node; + +/* Used to display LATENCY packets in the packet trace */ +typedef struct { + u16 src_port; + u16 dst_port; + u32 new_src_ip; + u32 new_dst_ip; + u16 type; + u32 pkt_count; +} latency_trace_t; + +/* packet trace format function */ +static u8 * format_latency_trace (u8 * s, va_list * args) { + /* Ignore two first arguments */ + CLIB_UNUSED (vlib_main_t * vm) = va_arg (*args, vlib_main_t *); + CLIB_UNUSED (vlib_node_t * node) = va_arg (*args, vlib_node_t *); + + latency_trace_t * t = va_arg (*args, latency_trace_t *); + + const char * typeNames[] = {"TCP", "QUIC", "PLUS"}; + + /* show LATENCY packet */ + s = format (s, "LATENCY packet: type: %s\n", typeNames[t->type]); + s = format (s, " src port: %u, dst port: %u\n", t->src_port, t->dst_port); + s = format (s, " (new) src ip: %u, (new) dst ip: %u\n", t->new_src_ip, t->new_dst_ip); + s = format (s, " pkt number in flow: %u\n", t->pkt_count); + + return s; +} + +/* Current implementation does not drop any packets */ +#define foreach_latency_error \ +_(TEMP, "Currently not used") + +typedef enum { +#define _(sym,str) LATENCY_ERROR_##sym, + foreach_latency_error +#undef _ + LATENCY_N_ERROR, +} latency_error_t; + + +static char * latency_error_strings[] = { +#define _(sym,string) string, + foreach_latency_error +#undef _ +}; + +/* Protocols */ +#define UDP_PROTOCOL 17 +#define TCP_PROTOCOL 6 + +/* Header sizes in bytes */ +#define SIZE_IP4 20 +#define SIZE_UDP 8 +#define SIZE_TCP 20 +#define SIZE_QUIC_MIN 3 +#define SIZE_PLUS 20 +#define SIZE_PLUS_EXT_HELLO 3 + +/* QUIC bits */ +#define IS_LONG 0x80 +#define HAS_ID 0x40 +#define KEY_FLAG 0x20 +#define LATENCY_TYPE 0x1F +#define SIZE_TYPE 1 + +/* Only true for current pinq implementation (IETF draft 05) + https://github.com/pietdevaere/minq */ +#define P_NUMBER_8 0x01 +#define P_NUMBER_16 0x02 +#define P_NUMBER_32 0x03 + +#define SIZE_NUMBER_8 1 +#define SIZE_NUMBER_16 2 +#define SIZE_NUMBER_32 4 + +#define SIZE_ID 8 +#define SIZE_VERSION 4 +#define SIZE_LATENCY_SPIN 1 + +/* For reserved bits + * spin in data_offset_and_reserved 00001110 */ +#define TCP_LATENCY_MASK 0x0E +#define TCP_LATENCY_SHIFT 1 + +/* Timeout values (in 100ms) */ +#define TIMEOUT 300 + +/* PLUS timeouts */ +#define TO_IDLE 100 +#define TO_ASSOCIATED 30 +#define TO_STOP 20 + +/* We run before IP4_lookup node */ +typedef enum { + IP4_LOOKUP, + LATENCY_N_NEXT, +} latency_next_t; + +/** + * @brief Main loop function + * */ +static uword +latency_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node, + vlib_frame_t * frame) { + + u32 n_left_from, * from, * to_next; + latency_next_t next_index; + + from = vlib_frame_vector_args (frame); + n_left_from = frame->n_vectors; + next_index = node->cached_next_index; + + while (n_left_from > 0) { + + u32 n_left_to_next; + + vlib_get_next_frame (vm, node, next_index, + to_next, n_left_to_next); + + /* Currently, only single loop implemented + * TODO: implement double loop */ + while (n_left_from > 0 && n_left_to_next > 0) { + + /* Advance timer wheel */ + expire_timers(vlib_time_now (vm)); + + u32 bi0; + vlib_buffer_t * b0; + u32 next0 = 0; + + /* speculatively enqueue b0 to the current next frame */ + bi0 = from[0]; + to_next[0] = bi0; + from += 1; + to_next += 1; + n_left_from -= 1; + n_left_to_next -= 1; + + b0 = vlib_get_buffer (vm, bi0); + + /* Keeps track of all the buffer movement */ + u8 total_advance = 0; + bool make_measurement = true; + bool is_udp = true; + + /* Contains TCP, QUIC or PLUS session */ + latency_session_t * session = NULL; + + udp_header_t * udp0 = NULL; + tcp_header_t * tcp0 = NULL; + + if (PREDICT_TRUE(b0->current_length >= SIZE_IP4)) { + + /* Get IP4 header */ + // TODO: add support for IP options and IPv6 headers + ip4_header_t *ip0 = vlib_buffer_get_current(b0); + vlib_buffer_advance (b0, SIZE_IP4); + total_advance += SIZE_IP4; + + /* Ignore IPv6 packets */ + if (PREDICT_FALSE((ip0->ip_version_and_header_length & 0xF0) == 0x60)) { + goto skip_packet; + } + + if (ip0->protocol == UDP_PROTOCOL && b0->current_length >= SIZE_UDP) { + /* Get UDP header */ + udp0 = vlib_buffer_get_current(b0); + vlib_buffer_advance (b0, SIZE_UDP); + total_advance += SIZE_UDP; + is_udp = true; + + /* QUIC "detection", see if either endpoint is on the QUIC_PORT */ + if (is_quic(udp0->src_port, udp0->dst_port) && + b0->current_length >= SIZE_QUIC_MIN) { + + /* Get QUIC header */ + u64 connection_id; + u32 packet_number, CLIB_UNUSED(latency_version); + u8 *type = vlib_buffer_get_current(b0); + + /* LONG HEADER */ + /* We expect most packets to have the short header */ + if (PREDICT_FALSE(*type & IS_LONG)) { + vlib_buffer_advance(b0, SIZE_TYPE); + total_advance += SIZE_TYPE; + + /* Get connection ID */ + u64 *temp_id = vlib_buffer_get_current(b0); + connection_id = clib_net_to_host_u64(*temp_id); + vlib_buffer_advance(b0, SIZE_ID); + total_advance += SIZE_ID; + + /* Get packet number PN */ + u32* temp_pn = vlib_buffer_get_current(b0); + packet_number = clib_net_to_host_u32(*temp_pn); + vlib_buffer_advance(b0, SIZE_NUMBER_32); + total_advance += SIZE_NUMBER_32; + + /* Get version */ + u32 *temp_version = vlib_buffer_get_current(b0); + latency_version = clib_net_to_host_u32(*temp_version); + vlib_buffer_advance(b0, SIZE_VERSION); + total_advance += SIZE_VERSION; + + /* SHORT HEADER */ + } else { + vlib_buffer_advance (b0, SIZE_TYPE); + total_advance += SIZE_TYPE; + + /* No latency version in the short header */ + latency_version = 0; + + /* Get connection ID */ + connection_id = 0; + + /* Only true for current pinq implementation (IETF draft 05) + * For newest IETF draft (08) HAS_ID meaning is reversed */ + if (*type & HAS_ID && b0->current_length >= SIZE_ID) { + u64 *temp_id = vlib_buffer_get_current(b0); + connection_id = clib_net_to_host_u64(*temp_id); + + vlib_buffer_advance (b0, SIZE_ID); + total_advance += SIZE_ID; + } + + /* Get the packet number */ + switch (*type & LATENCY_TYPE) { + case P_NUMBER_8: + if (PREDICT_TRUE(b0->current_length >= SIZE_NUMBER_8)) { + u8 *temp_8 = vlib_buffer_get_current(b0); + packet_number = *temp_8; + vlib_buffer_advance (b0, SIZE_NUMBER_8); + total_advance += SIZE_NUMBER_8; + } else { + goto skip_packet; + } + break; + + case P_NUMBER_16: + if (PREDICT_TRUE(b0->current_length >= SIZE_NUMBER_16)) { + u16 *temp_16 = vlib_buffer_get_current(b0); + packet_number = clib_net_to_host_u16(*temp_16); + vlib_buffer_advance (b0, SIZE_NUMBER_16); + total_advance += SIZE_NUMBER_16; + } else { + goto skip_packet; + } + break; + + case P_NUMBER_32: + if (PREDICT_TRUE(b0->current_length >= SIZE_NUMBER_32)) { + u32 *temp_32 = vlib_buffer_get_current(b0); + packet_number = clib_net_to_host_u32(*temp_32); + vlib_buffer_advance (b0, SIZE_NUMBER_32); + total_advance += SIZE_NUMBER_32; + } else { + goto skip_packet; + } + break; + + default: + goto skip_packet; + } + } + + u8 measurement; + if (PREDICT_TRUE(b0->current_length >= SIZE_LATENCY_SPIN)) { + u8 *temp_m = vlib_buffer_get_current(b0); + measurement = *temp_m; + } else { + goto skip_packet; + } + + latency_key_t kv; + + make_key(&kv, ip0->src_address.as_u32, ip0->dst_address.as_u32, udp0->src_port, + udp0->dst_port, ip0->protocol); + + /* Try to get a session for the key */ + session = get_session_from_key(&kv); + + /* Only for the first packet of a flow we do not have a matching session */ + if (PREDICT_FALSE(!session)) { + + /* Only consider flows for known dst (dst port) */ + u32 new_dst_ip; + get_new_dst(&new_dst_ip, udp0->dst_port); + if (!new_dst_ip) { + goto skip_packet; + } + + /* Create new session */ + u32 index = create_session(P_QUIC); + session = get_latency_session(index); + + /* Save key for reverse lookup */ + session->key = kv.as_u64; + + /* Initialize values */ + session->quic->id = connection_id; + session->init_src_port = udp0->src_port; + session->init_src_ip = ip0->src_address.as_u32; + session->new_dst_ip = new_dst_ip; + + update_state(&kv, session->index); + + /* Packets in reverse direction will get same session + * Necessary because we rewrite the IPs */ + make_key(&kv, 0, new_dst_ip, udp0->src_port, + udp0->dst_port, ip0->protocol); + update_state(&kv, session->index); + + session->key_reverse = kv.as_u64; + + session->pkt_count = 1; + + start_timer(session, TIMEOUT); + } + + /* Do latency RTT estimation */ + update_quic_rtt_estimate(vm, session->quic, vlib_time_now (vm), + udp0->src_port, session->init_src_port, measurement, + packet_number, session->pkt_count); + + /* PLUS packet */ + } else { + if (b0->current_length >= SIZE_PLUS) { + plus_header_t *plus0 = vlib_buffer_get_current(b0); + vlib_buffer_advance (b0, SIZE_PLUS); + total_advance += SIZE_PLUS; + if (PREDICT_TRUE((plus0->magic_and_flags & MAGIC_MASK) == MAGIC)) { + latency_key_t kv; + make_plus_key(&kv, ip0->src_address.as_u32, ip0->dst_address.as_u32, + udp0->src_port, udp0->dst_port, ip0->protocol, + plus0->CAT); + + session = get_session_from_key(&kv); + + if (PREDICT_FALSE(!session)) { + + /* Only consider flows for known dst (dst port) */ + u32 new_dst_ip; + get_new_dst(&new_dst_ip, udp0->dst_port); + if (!new_dst_ip) { + goto skip_packet; + } + + /* Create new session */ + u32 index = create_session(P_PLUS); + session = get_latency_session(index); + + /* Save key for reverse lookup */ + session->key = kv.as_u64; + session->plus->cat = plus0->CAT; + + /* Initialize values */ + session->init_src_port = udp0->src_port; + session->init_src_ip = ip0->src_address.as_u32; + session->new_dst_ip = new_dst_ip; + update_state(&kv, session->index); + + /* Packets in reverse direction will get same session + * Necessary because we rewrite the IPs */ + make_plus_key(&kv, 0, new_dst_ip, udp0->src_port, + udp0->dst_port, ip0->protocol, + plus0->CAT); + + update_state(&kv, session->index); + + session->key_reverse = kv.as_u64; + + session->pkt_count = 1; + + start_timer(session, TIMEOUT); + } + + /* Do PLUS PSN PSE RTT estimation */ + update_plus_rtt_estimate(vm, session->plus, vlib_time_now (vm), + udp0->src_port, session->init_src_port, + clib_net_to_host_u32(plus0->PSN), + clib_net_to_host_u32(plus0->PSE), + clib_net_to_host_u64(plus0->CAT), + session->pkt_count); + + /* Handle extended header */ + plus_ext_hop_c_h_t *plus_ext_hop_c0; + + /* Enough space for extended header */ + if ((plus0->magic_and_flags & EXTENDED) && b0->current_length + >= SIZE_PLUS + SIZE_PLUS_EXT_HELLO) { + plus_ext_hop_c0 = vlib_buffer_get_current(b0); + + u8 ii = plus_ext_hop_c0->PCF_len_and_II & 0x03; + /* "Hop count" header */ + if (plus_ext_hop_c0->PCF_type == 1 && ii == 0) { + plus_ext_hop_c0->PCF_hop_c += 1; + } + } + + } + } + } + } else { + /* TCP spin and TS */ + if (ip0->protocol == TCP_PROTOCOL && b0->current_length >= SIZE_TCP) { + + /* Get TCP header */ + tcp0 = vlib_buffer_get_current(b0); + vlib_buffer_advance (b0, SIZE_TCP); + total_advance += SIZE_TCP; + is_udp = false; + + /* For timestamp values */ + u32 tsval = 0; + u32 tsecr = 0; + + int parse_ret = tcp_options_parse_mod(tcp0, &tsval, &tsecr); + + if (parse_ret) { + goto skip_packet; + } + + /* Ignore SYN ACK packets, no VEC */ + if (PREDICT_FALSE(tcp_syn(tcp0) && tcp_ack(tcp0))) { + make_measurement = false; + } + + /* VEC data from reserved space */ + u8 measurement = (tcp0->data_offset_and_reserved & TCP_LATENCY_MASK) + >> TCP_LATENCY_SHIFT; + + latency_key_t kv; + make_key(&kv, ip0->src_address.as_u32, ip0->dst_address.as_u32, + tcp0->src_port, tcp0->dst_port, ip0->protocol); + + session = get_session_from_key(&kv); + + /* Only first packet in a flow should not have a session */ + if (PREDICT_FALSE(!session)) { + + /* Only consider flows for known dst (dst port) */ + u32 new_dst_ip; + get_new_dst(&new_dst_ip, tcp0->dst_port); + if (!new_dst_ip) { + goto skip_packet; + } + + /* Create new session */ + u32 index = create_session(P_TCP); + session = get_latency_session(index); + + /* Save key for reverse lookup */ + session->key = kv.as_u64; + + /* Initialize values */ + session->init_src_port = tcp0->src_port; + session->init_src_ip = ip0->src_address.as_u32; + session->new_dst_ip = new_dst_ip; + update_state(&kv, session->index); + + /* Packets in reverse direction will get same session + * Necessary because we rewrite the IPs */ + make_key(&kv, 0, new_dst_ip, tcp0->src_port, + tcp0->dst_port, ip0->protocol); + update_state(&kv, session->index); + + session->key_reverse = kv.as_u64; + + session->pkt_count = 1; + + start_timer(session, TIMEOUT); + } + + /* Do timestamp and latency RTT estimation */ + if (PREDICT_TRUE(make_measurement)) { + update_tcp_rtt_estimate(vm, session->tcp, vlib_time_now (vm), + tcp0->src_port, session->init_src_port, measurement, + tsval, tsecr, session->pkt_count, + clib_net_to_host_u32(tcp0->seq_number)); + } + } + } + + if (!session) { + goto skip_packet; + } + + /* Keep track of packets for each flow */ + session->pkt_count ++; + + /* NAT-like IP translation */ + if (!ip_nat_translation(ip0, session->init_src_ip, session->new_dst_ip)) { + goto skip_packet; + } + + /* Update UDP and IP checksum */ + if (is_udp) { + udp0->checksum = 0; + udp0->checksum = ip4_tcp_udp_compute_checksum (vm, b0, ip0); + } else { + tcp0->checksum = 0; + tcp0->checksum = ip4_tcp_udp_compute_checksum (vm, b0, ip0); + } + ip0->checksum = ip4_header_checksum (ip0); + + /* Currently only ACTIVE and ERROR state + * The timer is just used to free memory if flow is no longer observed + * PLUS states not implemented at the moment */ + switch ((latency_state_t) session->state) { + case LATENCY_STATE_ACTIVE: + update_timer(session, TIMEOUT); + break; + + case LATENCY_STATE_ERROR: + break; + + default: + break; + } + + /* If packet trace is active */ + if (PREDICT_FALSE((node->flags & VLIB_NODE_FLAG_TRACE) + && (b0->flags & VLIB_BUFFER_IS_TRACED))) { + + latency_trace_t *t = vlib_add_trace (vm, node, b0, sizeof (*t)); + if (is_udp) { + t->src_port = clib_net_to_host_u16(udp0->src_port); + t->dst_port = clib_net_to_host_u16(udp0->dst_port); + } else { + t->src_port = clib_net_to_host_u16(tcp0->src_port); + t->dst_port = clib_net_to_host_u16(tcp0->dst_port); + } + t->new_src_ip = clib_net_to_host_u32(ip0->src_address.as_u32); + t->new_dst_ip = clib_net_to_host_u32(ip0->dst_address.as_u32); + t->type = session->p_type; + t->pkt_count = session->pkt_count; + } + + /* Move buffer pointer back such that next node gets expected position */ +skip_packet: + vlib_buffer_advance (b0, -total_advance); + } + + /* verify speculative enqueue, maybe switch current next frame */ + vlib_validate_buffer_enqueue_x1 (vm, node, next_index, to_next, + n_left_to_next, bi0, next0); + } + + vlib_put_next_frame (vm, node, next_index, n_left_to_next); + } + + return frame->n_vectors; +} + + +VLIB_REGISTER_NODE (latency_node) = { + .function = latency_node_fn, + .name = "latency", + .vector_size = sizeof (u32), + .format_trace = format_latency_trace, + .type = VLIB_NODE_TYPE_INTERNAL, + + .n_errors = ARRAY_LEN(latency_error_strings), + .error_strings = latency_error_strings, + + .n_next_nodes = LATENCY_N_NEXT, + + /* Next node is the ip4-lookup node */ + .next_nodes = { + [IP4_LOOKUP] = "ip4-lookup", + }, +}; diff --git a/plus-plugin/plus/plus_packet.h b/latency-plugin/latency/plus_packet.h similarity index 100% rename from plus-plugin/plus/plus_packet.h rename to latency-plugin/latency/plus_packet.h diff --git a/plus-plugin/plus/node.c b/plus-plugin/plus/node.c deleted file mode 100644 index 9c43b50..0000000 --- a/plus-plugin/plus/node.c +++ /dev/null @@ -1,342 +0,0 @@ -/* - * Copyright (c) 2015 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include -#include -#include -#include -#include -#include -#include - -/* Register the plus node */ -vlib_node_registration_t plus_node; - -/* Used to display PLUS packets in the packet trace */ -typedef struct { - u64 cat; - u32 psn; - u32 pse; - u8 state; - u32 stop; - u32 extended; - u8 pcf_type; - u8 pcf_len; - u8 pcf_ii; - u8 pcf_value; -} plus_trace_t; - -/* packet trace format function */ -static u8 * format_plus_trace (u8 * s, va_list * args) -{ - /* Ignore two first arguments */ - CLIB_UNUSED (vlib_main_t * vm) = va_arg (*args, vlib_main_t *); - CLIB_UNUSED (vlib_node_t * node) = va_arg (*args, vlib_node_t *); - - plus_trace_t * t = va_arg (*args, plus_trace_t *); - - /* Show PLUS packet */ - s = format (s, "PLUS packet: CAT: %lu, PSN: %u, PSE: %u\n", t->cat, t->psn, t->pse); - const char * stateNames[] = {"ZERO", "UNIFLOW", "ASSOCIATING", "ASSOCIATED", - "STOPWAIT", "STOPPING", "ERROR"}; - s = format (s, " Current state: %s, stop bit: %u, extended bit: %u\n", - stateNames[t->state], t->stop ? 1 : 0, t->extended ? 1 : 0); - if (t->pcf_type) - s = format (s, " PCF type: %u, PCF len: %u, PCF II: %u, PCF hop count value: %u", - t->pcf_type, t->pcf_len, t->pcf_ii, t->pcf_value); - return s; -} - -/* Current implementation does not drop any packets */ -#define foreach_plus_error \ -_(TEMP, "Currently not used") - -typedef enum { -#define _(sym,str) PLUS_ERROR_##sym, - foreach_plus_error -#undef _ - PLUS_N_ERROR, -} plus_error_t; - - -static char * plus_error_strings[] = { -#define _(sym,string) string, - foreach_plus_error -#undef _ -}; - -/* Header sizes in bytes */ -#define SIZE_IP4 20 -#define SIZE_UDP 8 -#define SIZE_PLUS 20 -#define SIZE_PLUS_EXT_HELLO 3 - -/* Timeout values (in 100ms) */ -#define TO_IDLE 100 -#define TO_ASSOCIATED 30 -#define TO_STOP 20 - -/* We run before ip4-lookup node */ -typedef enum { - IP4_LOOKUP, - PLUS_N_NEXT, -} plus_next_t; - -/** - * @brief Main loop function - * */ -static uword -plus_node_fn (vlib_main_t * vm, - vlib_node_runtime_t * node, - vlib_frame_t * frame) { - u32 n_left_from, * from, * to_next; - plus_next_t next_index; - - from = vlib_frame_vector_args (frame); - n_left_from = frame->n_vectors; - next_index = node->cached_next_index; - - while (n_left_from > 0) { - - u32 n_left_to_next; - - vlib_get_next_frame (vm, node, next_index, - to_next, n_left_to_next); - - /* Single loop */ - while (n_left_from > 0 && n_left_to_next > 0) { - /* Advance timer wheel */ - expire_timers(vlib_time_now (vm)); - - u32 bi0; - vlib_buffer_t * b0; - u32 next0 = 0; - - /* speculatively enqueue b0 to the current next frame */ - bi0 = from[0]; - to_next[0] = bi0; - from += 1; - to_next += 1; - n_left_from -= 1; - n_left_to_next -= 1; - - b0 = vlib_get_buffer (vm, bi0); - - /* Keeps track of all the buffer movement */ - u8 total_advance = 0; - /* If we have an extended header */ - bool ext_hop_c = false; - - /* Currently, most packets should be PLUS packets */ - if (PREDICT_TRUE(b0->current_length >= SIZE_IP4 + SIZE_UDP + SIZE_PLUS)) { - /* Get IP4 header */ - ip4_header_t *ip0 = vlib_buffer_get_current(b0); - - vlib_buffer_advance (b0, SIZE_IP4); - total_advance += SIZE_IP4; - - /* Get UDP header */ - udp_header_t *udp0 = vlib_buffer_get_current(b0); - - vlib_buffer_advance (b0, SIZE_UDP); - total_advance += SIZE_UDP; - - /* Get PLUS header */ - plus_header_t *plus0 = vlib_buffer_get_current(b0); - - /* Most packets should have valid magic number. - Masks and so on defined in plus.h */ - if (PREDICT_TRUE((plus0->magic_and_flags & MAGIC_MASK) == MAGIC)) { - /* Stores the corresponding key */ - plus_key_t kv; - make_key(&kv, &ip0->src_address, &ip0->dst_address, udp0->src_port, - udp0->dst_port, ip0->protocol , plus0->CAT); - - /* Try to get a session for the key */ - plus_session_t * session = get_session_from_key(&kv); - - /* Only for the first packet of a flow we do not have a matching session */ - if (PREDICT_FALSE(!session)) { - /* Create new session */ - u32 index = create_session(plus0->CAT); - session = get_plus_session(index); - - /* Save key for reverse lookup */ - session->key[0] = kv.as_u64[0]; - session->key[1] = kv.as_u64[1]; - - /* Initialize values */ - session->psn_src = plus0->PSN; - session->psn_dst = 0; - session->src = ip0->src_address.as_u32; - update_state(&kv, session->index); - session->pkt_count = 0; - session->time_src = 0; - session->time_dst = 0; - } - - /* Keep track of packets for each flow */ - session->pkt_count ++; - - u32 src_ip = ip0->src_address.as_u32; - u32 psn = clib_net_to_host_u32(plus0->PSN); - u32 pse = clib_net_to_host_u32(plus0->PSE); - - /* RTT estimation update */ - update_rtt_estimate(session, vlib_time_now (vm), src_ip, psn, pse); - - /* State update - * Delayed/reordered packets do currently reset the timers. - * */ - switch ((plus_state_t) session->state) { - case PLUS_STATE_ZERO: - session->state = PLUS_STATE_UNIFLOW; - start_timer(session, TO_IDLE); - /* Save direction for future state transitions */ - session->src_ip_dir = src_ip; - break; - - case PLUS_STATE_UNIFLOW: - update_timer(session, TO_IDLE); - - /* Packet observation in other direction */ - if (session->src_ip_dir != src_ip) - { - session->state = PLUS_STATE_ASSOCIATING; - session->psn_associating = psn; - } - break; - - case PLUS_STATE_ASSOCIATING: - update_timer(session, TO_IDLE); - - /* Confirmation */ - if (session->src_ip_dir == src_ip && comes_after_u32( - pse, session->psn_associating)) { - session->state = PLUS_STATE_ASSOCIATED; - } - break; - - case PLUS_STATE_ASSOCIATED: - update_timer(session, TO_ASSOCIATED); - - /* Unlikely that the flow ends */ - if (PREDICT_FALSE(plus0->magic_and_flags & STOP)) { - session->state = PLUS_STATE_STOPWAIT; - session->src_ip_dir = src_ip; - session->psn_stopwait = psn; - } - break; - - case PLUS_STATE_STOPWAIT: - update_timer(session, TO_ASSOCIATED); - - /* Stop bit in other direction and matching PSE value */ - if (plus0->magic_and_flags & STOP && session->src_ip_dir != src_ip - && session->psn_stopwait == pse) { - /* Timer is not reset */ - update_timer(session, TO_STOP); - session->state = PLUS_STATE_STOPPING; - } - break; - - case PLUS_STATE_STOPPING: - break; - - default: - break; - } - - /* Handle extended header */ - plus_ext_hop_c_h_t *plus_ext_hop_c0; - - /* Enough space for extended header */ - if ((plus0->magic_and_flags & EXTENDED) && b0->current_length - >= SIZE_PLUS + SIZE_PLUS_EXT_HELLO) { - vlib_buffer_advance (b0, SIZE_PLUS); - total_advance += SIZE_PLUS; - plus_ext_hop_c0 = vlib_buffer_get_current(b0); - - u8 ii = plus_ext_hop_c0->PCF_len_and_II & 0x03; - /* "Hop count" header */ - if (plus_ext_hop_c0->PCF_type == 1 && ii == 0) { - plus_ext_hop_c0->PCF_hop_c += 1; - ext_hop_c = true; - } - } - - /* If packet trace is active */ - if (PREDICT_FALSE((node->flags & VLIB_NODE_FLAG_TRACE) - && (b0->flags & VLIB_BUFFER_IS_TRACED))) { - /* Set correct trace value */ - plus_trace_t *t = vlib_add_trace (vm, node, b0, sizeof (*t)); - t->cat = clib_net_to_host_u64(plus0->CAT); - t->psn = psn; - t->pse = pse; - t->stop = plus0->magic_and_flags & STOP; - t->extended = plus0->magic_and_flags & EXTENDED; - t->state = session->state; - if (ext_hop_c) { - t->pcf_type = plus_ext_hop_c0->PCF_type; - t->pcf_len = (plus_ext_hop_c0->PCF_len_and_II & 0xFC) >> 2; - t->pcf_ii = plus_ext_hop_c0->PCF_len_and_II & 0x03; - t->pcf_value = plus_ext_hop_c0->PCF_hop_c; - } else { - t->pcf_type = 0; - } - } - } - - /* Move buffer pointer back such that ip4-lookup get expected position */ - vlib_buffer_advance (b0, -total_advance); - - /* Update UDP checksum if extended header */ - if (ext_hop_c) { - /* To make sure that old checksum is not used in computation of new one */ - udp0->checksum = 0; - udp0->checksum = ip4_tcp_udp_compute_checksum (vm, b0, ip0); - if (udp0->checksum == 0) - udp0->checksum = 0xffff; - } - } - /* verify speculative enqueue, maybe switch current next frame */ - vlib_validate_buffer_enqueue_x1 (vm, node, next_index, - to_next, n_left_to_next, - bi0, next0); - } - - vlib_put_next_frame (vm, node, next_index, n_left_to_next); - } - - return frame->n_vectors; -} - -VLIB_REGISTER_NODE (plus_node) = { - .function = plus_node_fn, - .name = "plus", - .vector_size = sizeof (u32), - .format_trace = format_plus_trace, - .type = VLIB_NODE_TYPE_INTERNAL, - - .n_errors = ARRAY_LEN(plus_error_strings), - .error_strings = plus_error_strings, - - .n_next_nodes = PLUS_N_NEXT, - - /* Next node is the ip4-lookup node */ - .next_nodes = { - [IP4_LOOKUP] = "ip4-lookup", - }, -}; diff --git a/plus-plugin/plus/plus.c b/plus-plugin/plus/plus.c deleted file mode 100644 index 13fa05a..0000000 --- a/plus-plugin/plus/plus.c +++ /dev/null @@ -1,431 +0,0 @@ -/* - * Copyright (c) 2015 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * @file - * @brief Plus plugin, plugin API / trace / CLI handling. - */ - -#include -#include -#include - -#include -#include -#include - -/* define message IDs */ -#include - -/* define message structures */ -#define vl_typedefs -#include -#undef vl_typedefs - -/* define generated endian-swappers */ -#define vl_endianfun -#include -#undef vl_endianfun - -/* instantiate all the print functions we know about */ -#define vl_print(handle, ...) vlib_cli_output (handle, __VA_ARGS__) -#define vl_printfun -#include -#undef vl_printfun - -/* Get the API version number */ -#define vl_api_version(n,v) static u32 api_version=(v); -#include -#undef vl_api_version - -#define REPLY_MSG_ID_BASE pm->msg_id_base -#include - -/* List of message types that this plugin understands */ -#define foreach_plus_plugin_api_msg \ -_(PLUS_ENABLE_DISABLE, plus_enable_disable) - -/* *INDENT-OFF* */ -VLIB_PLUGIN_REGISTER () = { - .version = PLUS_PLUGIN_BUILD_VER, - .description = "PLUS middlebox VPP Plugin", -}; -/* *INDENT-ON* */ - -/** - * @brief Enable/disable the plugin. - * - * Action function shared between message handler and debug CLI. - */ - -int plus_enable_disable (plus_main_t * pm, u32 sw_if_index, - int enable_disable) -{ - vnet_sw_interface_t * sw; - int rv = 0; - - /* Utterly wrong? */ - if (pool_is_free_index (pm->vnet_main->interface_main.sw_interfaces, - sw_if_index)) - return VNET_API_ERROR_INVALID_SW_IF_INDEX; - - /* Not a physical port? */ - sw = vnet_get_sw_interface (pm->vnet_main, sw_if_index); - if (sw->type != VNET_SW_INTERFACE_TYPE_HARDWARE) - return VNET_API_ERROR_INVALID_SW_IF_INDEX; - - vnet_feature_enable_disable ("ip4-unicast", "plus", - sw_if_index, enable_disable, 0, 0); - return rv; -} - -static clib_error_t * -plus_enable_disable_command_fn (vlib_main_t * vm, - unformat_input_t * input, - vlib_cli_command_t * cmd) -{ - plus_main_t * pm = &plus_main; - u32 sw_if_index = ~0; - int enable_disable = 1; - - int rv; - - while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT) { - if (unformat (input, "disable")) - enable_disable = 0; - else if (unformat (input, "%U", unformat_vnet_sw_interface, - pm->vnet_main, &sw_if_index)) - ; - else - break; - } - - if (sw_if_index == ~0) - return clib_error_return (0, "Please specify an interface..."); - - rv = plus_enable_disable (pm, sw_if_index, enable_disable); - - switch(rv) { - case 0: - break; - - case VNET_API_ERROR_INVALID_SW_IF_INDEX: - return clib_error_return - (0, "Invalid interface, only works on physical ports"); - break; - - case VNET_API_ERROR_UNIMPLEMENTED: - return clib_error_return (0, "Device driver doesn't support redirection"); - break; - - default: - return clib_error_return (0, "plus_enable_disable returned %d", - rv); - } - return 0; -} - -/** - * @brief format function (print each active flow) - */ -u8 * format_sessions(u8 *s, va_list *args) { - plus_main_t * pm = &plus_main; - const char * stateNames[] = {"ZERO", "UNIFLOW", "ASSOCIATING", "ASSOCIATED", - "STOPWAIT", "STOPPING", "ERROR"}; - s = format(s, "Total flows: %u, total active flows: %u\n", - pm->total_flows, pm->active_flows); - plus_session_t * session; - s = format(s, "=======================================================\n"); - /* Iterate through all pool entries */ - pool_foreach (session, pm->session_pool, ({ - s = format(s, "Flow CAT: %lu, observed packets: %u\n", - clib_net_to_host_u64(session->cat), session->pkt_count); - f64 rtt_estimation = session->rtt_src + session->rtt_dst; - s = format(s, "Current state: %s, estimated RTT: %.*lfs\n", - stateNames[session->state], rtt_estimation, 9); - s = format(s, "=======================================================\n"); - })); - return s; -} - -static clib_error_t * plus_show_stats_fn(vlib_main_t * vm, - unformat_input_t * input, vlib_cli_command_t * cmd) -{ - vl_print(vm, "%U", format_sessions); - return 0; -} - -/** - * @brief CLI command to enable/disable the plus plugin. - */ -VLIB_CLI_COMMAND (sr_content_command, static) = { - .path = "plus", - .short_help = - "plus [disable]", - .function = plus_enable_disable_command_fn, -}; - -/** - * @brief CLI command to show all active flows - */ -VLIB_CLI_COMMAND (sr_content_command_stats, static) = { - .path = "plus stats", - .short_help = "Show PLUS middlebox stats", - .function = plus_show_stats_fn, -}; - -/** - * @brief PLUS API message handler. - */ -static void vl_api_plus_enable_disable_t_handler -(vl_api_plus_enable_disable_t * mp) -{ - vl_api_plus_enable_disable_reply_t * rmp; - plus_main_t * pm = &plus_main; - int rv; - - rv = plus_enable_disable (pm, ntohl(mp->sw_if_index), - (int) (mp->enable_disable)); - - REPLY_MACRO(VL_API_PLUS_ENABLE_DISABLE_REPLY); -} - -/** - * @brief Set up the API message handling tables. - */ -static clib_error_t * -plus_plugin_api_hookup (vlib_main_t *vm) -{ - plus_main_t * pm = &plus_main; -#define _(N,n) \ - vl_msg_api_set_handlers((VL_API_##N + pm->msg_id_base), \ - #n, \ - vl_api_##n##_t_handler, \ - vl_noop_handler, \ - vl_api_##n##_t_endian, \ - vl_api_##n##_t_print, \ - sizeof(vl_api_##n##_t), 1); - foreach_plus_plugin_api_msg; -#undef _ - - return 0; -} - -#define vl_msg_name_crc_list -#include -#undef vl_msg_name_crc_list - -static void -setup_message_id_table (plus_main_t * pm, api_main_t *am) -{ -#define _(id,n,crc) \ - vl_msg_api_add_msg_name_crc (am, #n "_" #crc, id + pm->msg_id_base); - foreach_vl_msg_name_crc_plus; -#undef _ -} - -/** - * @brief create the hash key - */ -void make_key(plus_key_t * kv, ip4_address_t * src_ip, ip4_address_t * dst_ip, - u16 src_p, u16 dst_p, u8 protocol, u64 cat) -{ - kv->s_x_d_ip = src_ip->as_u32 ^ dst_ip->as_u32; - kv->s_x_d_port = src_p ^ dst_p; - kv->protocol = protocol; - kv->cat = cat; -} - -/** - * @brief get session pointer if corresponding key is known - */ -plus_session_t * get_session_from_key(plus_key_t * kv_in) -{ - BVT(clib_bihash_kv) kv, kv_return; - plus_main_t *pm = &plus_main; - BVT(clib_bihash) *bi_table; - bi_table = &pm->plus_table; - kv.key[0] = kv_in->as_u64[0]; - kv.key[1] = kv_in->as_u64[1]; - int rv = BV(clib_bihash_search) (bi_table, &kv, &kv_return); - if (rv != 0) { - /* Key does not exist */ - return 0; - } else { - return get_plus_session(kv_return.value); - } -} - -/** - * @brief update RTT estimations. - * TODO: Simplify using arrays - */ -void update_rtt_estimate(plus_session_t * session, f64 now, u32 src_address, - u32 psn, u32 pse) { - /* Decide direction */ - if (src_address == session->src) { - /* Is the RTT estimation for the last packet completed? */ - if (session->time_src == 0) { - session->psn_src = psn; - session->time_src = now; - } - if (comes_after_u32(pse, session->psn_dst)) { - session->rtt_src = now - session->time_dst; - session->time_dst = 0; - } - } else { - if (session->time_dst == 0) { - session->psn_dst = psn; - session->time_dst = now; - } - if (comes_after_u32(pse, session->psn_src)) { - session->rtt_dst = now - session->time_src; - session->time_src = 0; - } - } -} - -/** - * @brief update the state of the session with the given key - */ -void update_state(plus_key_t * kv_in, uword new_state) -{ - BVT(clib_bihash_kv) kv; - plus_main_t *pm = &plus_main; - BVT(clib_bihash) *bi_table; - bi_table = &pm->plus_table; - kv.key[0] = kv_in->as_u64[0]; - kv.key[1] = kv_in->as_u64[1]; - kv.value = new_state; - BV(clib_bihash_add_del) (bi_table, &kv, 1 /* is_add */); -} - -/** - * @brief create a new session for a new flow - */ -u32 create_session(u64 cat) { - plus_session_t * session; - plus_main_t * pm = &plus_main; - pm->active_flows ++; - pm->total_flows ++; - pool_get (pm->session_pool, session); - memset (session, 0, sizeof (*session)); - /* Correct session index */ - session->index = session - pm->session_pool; - session->state = 0; - session->cat = cat; - return session->index; -} - -/** - * @brief clean session after timeout - */ -void clean_session(u32 index) -{ - plus_main_t * pm = &plus_main; - plus_session_t * session = get_plus_session(index); - - /* If main loop (in node.c) is executed sparsely, it can happen that - * the timer wheel triggers multiple times for the same session. - * We remove/clean the session only the first time. */ - if (session == 0) { - return; - } - pm->active_flows --; - - BVT(clib_bihash_kv) kv; - BVT(clib_bihash) * bi_table; - bi_table = &pm->plus_table; - kv.key[0] = session->key[0]; - kv.key[1] = session->key[1]; - - /* clear hash and pool entry */ - BV(clib_bihash_add_del) (bi_table, &kv, 0 /* is_add */); - pool_put (pm->session_pool, session); -} - -/** - * @brief callback function for expired timer - */ -static void timer_expired_callback(u32 * expired_timers) -{ - int i; - u32 index, timer_id; - - /* Iterate over all expired timers */ - for (i = 0; i < vec_len(expired_timers); i = i+1) - { - /* Extract index and timer wheel id */ - index = expired_timers[i] & 0x7FFFFFFF; - timer_id = expired_timers[i] >> 31; - - /* Only use timer with ID 0 at the moment */ - ASSERT (timer_id == 0); - - clean_session(index); - } -} - -/** - * @brief Initialize the plus plugin. - */ -static clib_error_t * plus_init (vlib_main_t * vm) -{ - plus_main_t * pm = &plus_main; - clib_error_t * error = 0; - u8 * name; - - pm->vnet_main = vnet_get_main (); - name = format (0, "plus_%08x%c", api_version, 0); - - /* Ask for a correctly-sized block of API message decode slots */ - pm->msg_id_base = vl_msg_api_get_msg_ids - ((char *) name, VL_MSG_FIRST_AVAILABLE); - - error = plus_plugin_api_hookup (vm); - - /* Add our API messages to the global name_crc hash table */ - setup_message_id_table (pm, &api_main); - - /* Init bihash */ - BV (clib_bihash_init) (&pm->plus_table, "plus", 2048, 512<<20); - - /* Timer wheel has 2048 slots, so we predefine pool with 2048 entries as well */ - pool_init_fixed(pm->session_pool, 2048); - - /* Init timer wheel with 100ms resolution */ - tw_timer_wheel_init_2t_1w_2048sl (&pm->tw, timer_expired_callback, 100e-3, ~0); - pm->tw.last_run_time = vlib_time_now (vm); - - /* Set counters to zero*/ - pm->total_flows = 0; - pm->active_flows = 0; - - vec_free(name); - - return error; -} - -VLIB_INIT_FUNCTION (plus_init); - -/** - * @brief Hook the PLUS plugin into the VPP graph hierarchy. - */ -VNET_FEATURE_INIT (plus, static) = -{ - /* It runs in the ip4-unicast arc before the ip4-lookup */ - .arc_name = "ip4-unicast", - .node_name = "plus", - .runs_before = VNET_FEATURES ("ip4-lookup"), -}; diff --git a/plus-plugin/plus/plus.h b/plus-plugin/plus/plus.h deleted file mode 100644 index 275c9bb..0000000 --- a/plus-plugin/plus/plus.h +++ /dev/null @@ -1,232 +0,0 @@ -/* - * Copyright (c) 2015 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -/* High-level overview: - * - * Used data structures: - * - A bihash_16_8 (bounded-index extensible hash) - 16 byte key and 8 byte value. - * - A pool is used to save the state for each PLUS flow (fixed sized struct) - * - A timer wheel (2t_1w_2048sl = 2 timers per object, 1 wheel, 2048 slots) - * - * The key in the hash table consist of (XOR is used to match both directions): - * "5 tuple": - * - XOR of src and dst IP - * - XOR of src and dst port - * - protocol - * CAT - * - * The value corresponding to a key (in the hash table) is the pool index - * for the state of the matching PLUS flow. - * - * Besides the actual "state" of the flow we also save e.g. counters, RTT - * estimates, ... - * - * The timer wheel is used to implement the various timeout values in the - * PLUS state machine. If a flow times out, all the state is deleted. - * The timer wheel advances every time the main loop (in node.c) is executed. - * Therefore, if we only observe a few PLUS packets, it can happen that some - * flows are still displayed as "active", even though they are already timed out. - * They will be deleted as soon as the main loop is executed again. - * - * Currently, only one extended header is detected - a hop count of PLUS-aware MBs. - * PCF type = 1, PCF len = 1, PCF II = 0 (not protected), PCF Value = hop count - * The implementation will increase the PCF value by one. - */ - -#ifndef __included_plus_h__ -#define __included_plus_h__ - -#include -#include -#include - -#include -#include -#include - -/* We use the bihash_16_8 hash function*/ -/* 16 byte key and 8 byte value */ -#include - -#include - -/* Timer wheel (2 timers, 1 wheel, 2048 slots) */ -#include - -/* Defines all the PLUS states */ -#define foreach_plus_state \ -_(ZERO, "no flow") \ -_(UNIFLOW, "flow in one direction") \ -_(ASSOCIATING, "also flow in reverse direction") \ -_(ASSOCIATED, "flow confirmed") \ -_(STOPWAIT, "stop signal in one direction") \ -_(STOPPING, "stop signal also in other direction") \ -_(ERROR, "error state") - -typedef enum { -#define _(sym,str) PLUS_STATE_##sym, - foreach_plus_state -#undef _ -} plus_state_t; - -/* Endian correction */ -#if CLIB_ARCH_IS_LITTLE_ENDIAN - #define MAGIC_MASK 0xf0ffffff - #define MAGIC 0xf07f00d8 - #define STOP 0x02000000 - #define EXTENDED 0x01000000 -#else - #define MAGIC_MASK 0xfffffff0 - #define MAGIC 0xd8007ff0 - #define STOP 0x00000002 - #define EXTENDED 0x00000001 -#endif - -/* Max values for advancement checks */ -#define MAX_PSN 4294967296 -#define MAX_SKIP 100 - -/* State for each observed PLUS session */ -typedef struct -{ - u8 state; - /* PSN which moved state to ASSOCIATING */ - u32 psn_associating; - /* PSN which moved state to STOPWAIT */ - u32 psn_stopwait; - u32 src_ip_dir; - u64 cat; - /* Pool index (saved in hash table) */ - u32 index; - u32 timer; - u64 key[2]; - u32 src; - /* For RTT estimations */ - u32 psn_src; - f64 time_src; - f64 rtt_src; - u32 psn_dst; - f64 time_dst; - f64 rtt_dst; - /* Number of observed packets */ - u32 pkt_count; -} plus_session_t; - -/* Main plus struct */ -typedef struct { - /* API message ID base */ - u16 msg_id_base; - - /* convenience */ - vnet_main_t * vnet_main; - - /* Hash table */ - BVT (clib_bihash) plus_table; - - /* Session pool */ - plus_session_t * session_pool; - - /* Counter values*/ - u32 total_flows; - u32 active_flows; - - /* Timer wheel*/ - tw_timer_wheel_2t_1w_2048sl_t tw; -} plus_main_t; - -/* Hash key struct */ -typedef CLIB_PACKED (struct { - union - { - struct - { - /* IP and port XOR */ - u32 s_x_d_ip; - u16 s_x_d_port; - /* Protocol (8 -> 16 bit for better alignment) */ - u16 protocol; - u64 cat; - }; - u64 as_u64[2]; - }; -}) plus_key_t; - -plus_main_t plus_main; - -extern vlib_node_registration_t plus_node; - -u64 get_state(plus_key_t * kv_in); -void update_state(plus_key_t * kv_in, uword new_state); -void make_key(plus_key_t * kv, ip4_address_t * src_ip, ip4_address_t * dst_ip, - u16 src_p, u16 dst_p, u8 protocol, u64 cat); -plus_session_t * get_session_from_key(plus_key_t * kv_in); -u32 create_session(u64 cat); -void update_rtt_estimate(plus_session_t * session, f64 now, u32 src_address, - u32 psn, u32 pse); -void clean_session(u32 index); - -/** - * @brief get plus session for index - */ -always_inline plus_session_t * get_plus_session(u32 index) -{ - if (pool_is_free_index (plus_main.session_pool, index)) - return 0; - return pool_elt_at_index (plus_main.session_pool, index); -} - -/** - * @brief start a timer in the timer wheel - */ -always_inline void start_timer(plus_session_t * session, u64 interval) { - session->timer = tw_timer_start_2t_1w_2048sl (&plus_main.tw, - session->index, 0, interval); -} - -/** - * @brief update the timer - */ -always_inline void update_timer(plus_session_t * session, u64 interval) { - if(session->timer != ~0) { - tw_timer_stop_2t_1w_2048sl (&plus_main.tw, session->timer); - } - session->timer = tw_timer_start_2t_1w_2048sl (&plus_main.tw, - session->index, 0, interval); -} - -/** - * @brief expire timers - */ -always_inline void expire_timers(f64 now) { - tw_timer_expire_timers_2t_1w_2048sl (&plus_main.tw, now); -} - -/** - * @brief check if a sequence number comes logically after another one. - * Supports sequence number overflow. - * Distance must be smaller than MAX_SKIP. - */ -always_inline bool comes_after_u32(u32 now, u32 old) { - i64 ret = (now - old) % MAX_PSN; - if (ret < 0) { - ret += MAX_PSN; - } - return ret < MAX_SKIP; -} - -#define PLUS_PLUGIN_BUILD_VER "0.2" - -#endif /* __included_plus_h__ */ diff --git a/scripts/external_vpp_interface.conf b/scripts/external_vpp_interface.conf deleted file mode 100644 index 2569e3e..0000000 --- a/scripts/external_vpp_interface.conf +++ /dev/null @@ -1,7 +0,0 @@ -set int state GigabitEthernet0/8/0 down -set int state GigabitEthernet0/8/0 up -set int ip address GigabitEthernet0/8/0 192.168.100.2/24 -set int state GigabitEthernet0/9/0 down -set int state GigabitEthernet0/9/0 up -set int ip address GigabitEthernet0/9/0 192.168.101.2/24 - diff --git a/scripts/ns_setup.sh b/scripts/ns_setup.sh deleted file mode 100644 index 0a96937..0000000 --- a/scripts/ns_setup.sh +++ /dev/null @@ -1,41 +0,0 @@ -#!/bin/bash - -if [ $USER != "root" ] ; then - echo "Restarting script with sudo..." - sudo $0 ${*} - exit -fi - -# delete previous incarnations if they exist -ip link del dev veth_vpp1 -ip link del dev veth_vpp2 -ip netns del vpp1 -ip netns del vpp2 - -#create namespaces -ip netns add vpp1 -ip netns add vpp2 - -# create and configure 1st veth pair -ip link add name veth_vpp1 type veth peer name vpp1 -ip link set dev vpp1 up -ip link set dev veth_vpp1 up netns vpp1 - -ip netns exec vpp1 \ - bash -c " - ip link set dev lo up - ip addr add 172.16.1.2/24 dev veth_vpp1 - ip route add 172.16.2.0/24 via 172.16.1.1 -" - -# create and configure 2st veth pair -ip link add name veth_vpp2 type veth peer name vpp2 -ip link set dev vpp2 up -ip link set dev veth_vpp2 up netns vpp2 - -ip netns exec vpp2 \ - bash -c " - ip link set dev lo up - ip addr add 172.16.2.2/24 dev veth_vpp2 - ip route add 172.16.1.0/24 via 172.16.2.1 -" diff --git a/scripts/receiver.py b/scripts/receiver.py deleted file mode 100644 index c9b59f9..0000000 --- a/scripts/receiver.py +++ /dev/null @@ -1,100 +0,0 @@ -import logging -logging.getLogger("scapy.runtime").setLevel(logging.ERROR) - -from scapy.layers.inet import * -from scapy.all import * - -FORMAT = "\x1b[1;32;40m[PLUS_RECEIVER:%(lineno)3s - %(funcName)15s()] %(message)s\x1b[0m" - -logging.basicConfig(format=FORMAT) -log = logging.getLogger('sender') -log.setLevel(logging.DEBUG) - -class PLUS(Packet): - - name = 'PLUS' - fields_desc = [XBitField("magic", 0xd8007ff, 28), - BitField("LoLa", 0, 1), - BitField("RoI", 0, 1), - BitField("stop", 0, 1), - BitField("extended", 0, 1), - BitField("CAT", 0, 64), - BitField("PSN", 0, 32), - BitField("PSE", 0, 32), - ConditionalField(BitField("PCF_Type", 0, 8), lambda pkt:pkt.extended == 1), - ConditionalField(BitField("PCF_Len", 0, 6), lambda pkt:pkt.extended == 1), - ConditionalField(BitField("PCF_II", 0, 2), lambda pkt:pkt.extended == 1), - ConditionalField(BitField("PCF_Value", 0, 8), lambda pkt:pkt.extended == 1)] - -bind_layers(UDP, PLUS) -split_layers(UDP, DNS) - -class PLUSReceiver(Automaton): - - def parse_args(self, sender, receiver, s_port, r_port, **kargs): - Automaton.parse_args(self, **kargs) - self.sender = sender - self.receiver = receiver - self.s_port = s_port - self.r_port = r_port - self.CAT = 0 - self.PSN = random.randint(0, 2**32-1) - self.PSE = 0 - self.counter = 0 - - def send_pkt(self, PSE_to_send=0, is_stop=0, is_L=0, is_R=0, opt=0, opt_type=0, opt_len=0, II=0, opt_value='0'): - pkt = IP(src=self.receiver, dst=self.sender, ttl=30) / UDP(sport=self.s_port, dport=self.r_port) / PLUS( - CAT=self.CAT, PSN=self.PSN, PSE=PSE_to_send, stop=is_stop, LoLa=is_L, RoI=is_R, extended=opt, - PCF_Type=opt_type, PCF_Len=opt_len, PCF_II=II, PCF_Value=(opt_len - len(opt_value)) * '0' + opt_value) - - send(pkt, verbose=0) - - log.debug('Send pkt [CAT: {0}, PSN: {1}, PSE: {2}, LoLa: {5}, RoI {6}, S: {3}, Ext: {4}]'.format( - self.CAT, self.PSN, PSE_to_send, is_stop, opt, is_L, is_R)) - if opt: - log.debug(' Extended: [PCF Type: {0} PCF Len: {1} PCF II: {2} PCF Value {3}]'.format( - opt_type, opt_len, II, opt_value)) - self.PSN = (self.PSN + 1) % 2**32 - - def master_filter(self, pkt): - return (IP in pkt and pkt[IP].src == self.sender and pkt[IP].dst == self.receiver and - PLUS in pkt and ICMP not in pkt) - - @ATMT.state(initial=1) - def S_start(self): - pass - - @ATMT.receive_condition(S_start) - def start_in_main(self, pkt): - plus_in = pkt.getlayer(PLUS) - log.debug('Received packet PSN: {0}'.format(plus_in.PSN)) - if plus_in.extended: - log.debug(' Extended: [PCF Type: {0} PCF Len: {1} PCF II: {2} PCF Value {3}]'.format( - plus_in.PCF_Type, plus_in.PCF_Len, plus_in.PCF_II, plus_in.PCF_Value)) - - self.PSE = pkt.getlayer(PLUS).PSN - self.CAT = pkt.getlayer(PLUS).CAT - - if pkt.getlayer(PLUS).stop == 1: - log.debug('Send stop signal') - self.send_pkt(PSE_to_send=self.PSE, is_stop=1) - else: - self.send_pkt(PSE_to_send=self.PSE) - - raise self.S_start() - - @ATMT.state(final=1) - def S_end(self): - pass - -if __name__ == "__main__": - - sender_IP = "172.16.1.2" - receiver_IP = "172.16.2.2" - s_port = 4000 - r_port = 3000 - - test = PLUSReceiver(sender_IP, receiver_IP, s_port, r_port) - log.debug('receiver start') - - test.run() diff --git a/scripts/sender.py b/scripts/sender.py deleted file mode 100644 index ee26094..0000000 --- a/scripts/sender.py +++ /dev/null @@ -1,105 +0,0 @@ -import time -import logging - -logging.getLogger("scapy.runtime").setLevel(logging.ERROR) - -from scapy.layers.inet import * -from scapy.all import * - -FORMAT = "\x1b[1;32;40m[PLUS_SENDER:%(lineno)3s - %(funcName)15s()] %(message)s\x1b[0m" - -logging.basicConfig(format=FORMAT) -log = logging.getLogger('sender') -log.setLevel(logging.DEBUG) - -class PLUS(Packet): - - name = 'PLUS' - fields_desc = [XBitField("magic", 0xd8007ff, 28), - BitField("LoLa", 0, 1), - BitField("RoI", 0, 1), - BitField("stop", 0, 1), - BitField("extended", 0, 1), - BitField("CAT", 0, 64), - BitField("PSN", 0, 32), - BitField("PSE", 0, 32), - ConditionalField(BitField("PCF_Type", 0, 8), lambda pkt:pkt.extended == 1), - ConditionalField(BitField("PCF_Len", 0, 6), lambda pkt:pkt.extended == 1), - ConditionalField(BitField("PCF_II", 0, 2), lambda pkt:pkt.extended == 1), - ConditionalField(BitField("PCF_Value", 0, 8), lambda pkt:pkt.extended == 1)] - -bind_layers(UDP, PLUS) -split_layers(UDP, DNS) - -class PLUSSender(Automaton): - - def parse_args(self, sender, receiver, s_port, r_port, **kargs): - Automaton.parse_args(self, **kargs) - self.sender = sender - self.receiver = receiver - self.s_port = s_port - self.r_port = r_port - self.CAT = random.randint(0, 2**64-1) - self.PSN = random.randint(0, 2**32-1) - self.PSE = 0 - self.counter = 0 - self.test_counter = 0 - - def send_pkt(self, PSE_to_send=0, is_stop=0, is_L=0, is_R=0, opt=0, - opt_type=0, opt_len=0, II=0, opt_value=0): - pkt = IP(src=self.sender, dst=self.receiver, ttl=30) / UDP( - sport=self.s_port, dport=self.r_port) / PLUS(CAT=self.CAT, - PSN=self.PSN, PSE=PSE_to_send, stop=is_stop, LoLa=is_L, RoI=is_R, - extended=opt, PCF_Type=opt_type, PCF_Len=opt_len, PCF_II=II, - PCF_Value=opt_value%2**8) - - send(pkt, verbose=0) - log.debug('Send pkt [CAT: {0}, PSN: {1}, PSE: {2}, LoLa: {5}, RoI {6}, S: {3}, Ext: {4}]'.format( - self.CAT, self.PSN, PSE_to_send, is_stop, opt, is_L, is_R)) - if opt: - log.debug(' Extended: [PCF Type: {0} PCF Len: {1} PCF II: {2} PCF Value: {3}]'.format( - opt_type, opt_len, II, opt_value%2**8)) - self.PSN = (self.PSN + 1) % 2**32 - self.test_counter += 1 - - def master_filter(self, pkt): - return (IP in pkt and pkt[IP].src == self.receiver and pkt[IP].dst == self.sender and - PLUS in pkt and ICMP not in pkt) - - @ATMT.state(initial=1) - def S_start(self): - - if self.test_counter == 10: - self.send_pkt(PSE_to_send=self.PSE, is_stop=1, opt=1, opt_type=1, opt_len=1, - II=0, opt_value=self.test_counter) - raise self.S_end() - else: - self.send_pkt(PSE_to_send=self.PSE, opt=1, opt_type=1, opt_len=1, II=0, - opt_value=self.test_counter) - time.sleep(0.3) - - @ATMT.timeout(S_start, 1) - def timeout_start(self): - raise self.S_start() - - @ATMT.state(final=1) - def S_end(self): - pass - - @ATMT.receive_condition(S_start) - def pkt_in_init(self, pkt): - self.PSE = pkt.getlayer(PLUS).PSN - log.debug('Got answer from receiver, PSE: {0}'.format(self.PSE)) - raise self.S_start() - -if __name__ == "__main__": - - sender_IP = "172.16.1.2" - receiver_IP = "172.16.2.2" - s_port = 3000 - r_port = 4000 - - test = PLUSSender(sender_IP, receiver_IP, s_port, r_port) - log.debug('sender start') - - test.run() diff --git a/scripts/vpp_interface.conf b/scripts/vpp_interface.conf deleted file mode 100644 index ae34ab9..0000000 --- a/scripts/vpp_interface.conf +++ /dev/null @@ -1,10 +0,0 @@ -create host-interface name vpp1 -create host-interface name vpp2 - -set int state host-vpp1 up -set int state host-vpp2 up -set int ip address host-vpp1 172.16.1.1/24 -set int ip address host-vpp2 172.16.2.1/24 - -ip probe-neighbor 172.16.1.2 host-vpp1 -ip probe-neighbor 172.16.2.2 host-vpp2 diff --git a/vagrant/Vagrantfile b/vagrant/Vagrantfile index 658066d..ee29292 100644 --- a/vagrant/Vagrantfile +++ b/vagrant/Vagrantfile @@ -24,9 +24,9 @@ Vagrant.configure(2) do |config| vb.memory = "4096" vb.cpus = "2" - # rsync the vpp-mb directory if provision hasn't happened yet + # rsync the vpp-latency-mb directory if provision hasn't happened yet unless File.exist? (".vagrant/machines/default/virtualbox/action_provision") - config.vm.synced_folder "../", "/home/vagrant/plus-mb", type: "rsync", + config.vm.synced_folder "../", "/home/vagrant/vpp-latency-mb", type: "rsync", rsync__auto: false, rsync__exclude: [ "vagrant*" ] end diff --git a/vagrant/build.sh b/vagrant/build.sh index eda1b59..8c95bf4 100644 --- a/vagrant/build.sh +++ b/vagrant/build.sh @@ -5,7 +5,7 @@ echo "deb [trusted=yes] https://nexus.fd.io/content/repositories/fd.io.stable.17 apt-get update apt-get install -y git vpp vpp-lib vpp-dev vpp-dbg vpp-dpdk-dev vpp-dpdk-dkms vpp-plugins vpp-api-java vpp-api-lua vpp-api-python autoconf libtool traceroute python-scapy -# Compile/Install MMB plugin +# Compile/Install latency plugin service vpp stop -(cd /home/vagrant/plus-mb/plus-plugin; autoreconf -fis; ./configure; make; make install) +(cd /home/vagrant/vpp-latency-mb/latency-plugin; autoreconf -fis; ./configure; make; make install)