Skip to content

Commit

Permalink
allow fastrouter subscriptions to use hostnames
Browse files Browse the repository at this point in the history
  • Loading branch information
terencehonles committed Oct 10, 2024
1 parent b738f55 commit 8055cda
Showing 1 changed file with 55 additions and 21 deletions.
76 changes: 55 additions & 21 deletions core/subscription.c
Original file line number Diff line number Diff line change
Expand Up @@ -602,7 +602,12 @@ static void send_subscription(int sfd, char *host, char *message, uint16_t messa
memset(&udp_addr, 0, sizeof(struct sockaddr_in));
udp_addr.sin_family = AF_INET;
udp_addr.sin_port = htons(atoi(udp_port + 1));
udp_addr.sin_addr.s_addr = inet_addr(host);
char *resolved = uwsgi_resolve_ip(host);
if (!resolved) {
uwsgi_error("send_subscription()/socket()");
return;
}
udp_addr.sin_addr.s_addr = inet_addr(resolved);
ret = sendto(fd, message, message_size, 0, (struct sockaddr *) &udp_addr, sizeof(udp_addr));
udp_port[0] = ':';
}
Expand Down Expand Up @@ -652,15 +657,15 @@ static int uwsgi_subscription_ub_fix(struct uwsgi_buffer *ub, uint8_t modifier1,
return 0;
}

static struct uwsgi_buffer *uwsgi_subscription_ub(char *key, size_t keysize, uint8_t modifier1, uint8_t modifier2, uint8_t cmd, char *socket_name, char *sign, char *sni_key, char *sni_crt, char *sni_ca) {
static struct uwsgi_buffer *uwsgi_subscription_ub(char *key, size_t keysize, uint8_t modifier1, uint8_t modifier2, uint8_t cmd, char *address, char *sign, char *sni_key, char *sni_crt, char *sni_ca) {
struct uwsgi_buffer *ub = uwsgi_buffer_new(4096);

// make space for uwsgi header
ub->pos = 4;

if (uwsgi_buffer_append_keyval(ub, "key", 3, key, keysize))
goto end;
if (uwsgi_buffer_append_keyval(ub, "address", 7, socket_name, strlen(socket_name)))
if (uwsgi_buffer_append_keyval(ub, "address", 7, address, strlen(address)))
goto end;

if (uwsgi.subscribe_with_modifier1) {
Expand Down Expand Up @@ -716,16 +721,16 @@ static struct uwsgi_buffer *uwsgi_subscription_ub(char *key, size_t keysize, uin
return NULL;
}

void uwsgi_send_subscription_from_fd(int fd, char *udp_address, char *key, size_t keysize, uint8_t modifier1, uint8_t modifier2, uint8_t cmd, char *socket_name, char *sign, char *sni_key, char *sni_crt, char *sni_ca) {
void uwsgi_send_subscription_from_fd(int fd, char *udp_address, char *key, size_t keysize, uint8_t modifier1, uint8_t modifier2, uint8_t cmd, char *address, char *sign, char *sni_key, char *sni_crt, char *sni_ca) {

if (socket_name == NULL && !uwsgi.sockets)
if (address == NULL && !uwsgi.sockets)
return;

if (!socket_name) {
socket_name = uwsgi.sockets->name;
if (!address) {
address = uwsgi.sockets->name;
}

struct uwsgi_buffer *ub = uwsgi_subscription_ub(key, keysize, modifier1, modifier2, cmd, socket_name, sign, sni_key, sni_crt, sni_ca);
struct uwsgi_buffer *ub = uwsgi_subscription_ub(key, keysize, modifier1, modifier2, cmd, address, sign, sni_key, sni_crt, sni_ca);

if (!ub)
return;
Expand All @@ -735,8 +740,8 @@ void uwsgi_send_subscription_from_fd(int fd, char *udp_address, char *key, size_
}


void uwsgi_send_subscription(char *udp_address, char *key, size_t keysize, uint8_t modifier1, uint8_t modifier2, uint8_t cmd, char *socket_name, char *sign, char *sni_key, char *sni_crt, char *sni_ca) {
uwsgi_send_subscription_from_fd(-1, udp_address, key, keysize, modifier1, modifier2, cmd, socket_name, sign, sni_key, sni_crt, sni_ca);
void uwsgi_send_subscription(char *udp_address, char *key, size_t keysize, uint8_t modifier1, uint8_t modifier2, uint8_t cmd, char *address, char *sign, char *sni_key, char *sni_crt, char *sni_ca) {
uwsgi_send_subscription_from_fd(-1, udp_address, key, keysize, modifier1, modifier2, cmd, address, sign, sni_key, sni_crt, sni_ca);
}

#ifdef UWSGI_SSL
Expand Down Expand Up @@ -837,27 +842,43 @@ void uwsgi_subscribe(char *subscription, uint8_t cmd) {
int keysize = 0;
char *modifier1 = NULL;
int modifier1_len = 0;
char *socket_name = NULL;
char *address = NULL;
char *address_resolved = NULL;
char *udp_address = subscription;
char *udp_port = NULL;
char *subscription_key = NULL;
char *sign = NULL;

// check for explicit socket_name
// Check for an explicit address. This may be a reference to a shared
// socket i.e. "=0=REST", a unix socket "/path/to/socket.sock=REST",
// or a udp socket "server-address:port=REST"
char *equal = strchr(subscription, '=');
if (equal) {
socket_name = subscription;
if (socket_name[0] == '=') {
equal = strchr(socket_name + 1, '=');
address = subscription;
if (address[0] == '=') {
equal = strchr(address + 1, '=');
if (!equal)
return;
*equal = '\0';
struct uwsgi_socket *us = uwsgi_get_shared_socket_by_num(atoi(socket_name + 1));
struct uwsgi_socket *us = uwsgi_get_shared_socket_by_num(atoi(address + 1));
if (!us)
return;
socket_name = us->name;
address = us->name;
}
*equal = '\0';

char *address_port = strchr(address, ':');
if (address_port) {
*address_port = '\0';
char *resolved = uwsgi_resolve_ip(address);
*address_port = ':';

if (!resolved)
return;

address_resolved = uwsgi_concat2n(resolved, strlen(resolved), address_port, equal - address_port);
}

udp_address = equal + 1;
}

Expand Down Expand Up @@ -900,7 +921,7 @@ void uwsgi_subscribe(char *subscription, uint8_t cmd) {
modifier1_len = strlen(modifier1);
keysize = strlen(key);
}
uwsgi_send_subscription(udp_address, key, keysize, uwsgi_str_num(modifier1, modifier1_len), 0, cmd, socket_name, sign, NULL, NULL, NULL);
uwsgi_send_subscription(udp_address, key, keysize, uwsgi_str_num(modifier1, modifier1_len), 0, cmd, address_resolved ? address_resolved : address, sign, NULL, NULL, NULL);
modifier1 = NULL;
modifier1_len = 0;
}
Expand All @@ -918,7 +939,7 @@ void uwsgi_subscribe(char *subscription, uint8_t cmd) {
modifier1_len = strlen(modifier1);
keysize = strlen(key);
}
uwsgi_send_subscription(udp_address, key, keysize, uwsgi_str_num(modifier1, modifier1_len), 0, cmd, socket_name, sign, NULL, NULL, NULL);
uwsgi_send_subscription(udp_address, key, keysize, uwsgi_str_num(modifier1, modifier1_len), 0, cmd, address_resolved ? address_resolved : address, sign, NULL, NULL, NULL);
modifier1 = NULL;
modifier1_len = 0;
lines[i] = '\n';
Expand Down Expand Up @@ -947,7 +968,7 @@ void uwsgi_subscribe(char *subscription, uint8_t cmd) {
modifier1_len = strlen(modifier1);
}

uwsgi_send_subscription(udp_address, subscription_key + 1, strlen(subscription_key + 1), uwsgi_str_num(modifier1, modifier1_len), 0, cmd, socket_name, sign, NULL, NULL, NULL);
uwsgi_send_subscription(udp_address, subscription_key + 1, strlen(subscription_key + 1), uwsgi_str_num(modifier1, modifier1_len), 0, cmd, address_resolved ? address_resolved : address, sign, NULL, NULL, NULL);
if (modifier1)
modifier1[-1] = ',';
if (sign)
Expand All @@ -958,6 +979,7 @@ void uwsgi_subscribe(char *subscription, uint8_t cmd) {
if (equal)
*equal = '=';
free(udp_address);
free(address_resolved);

}

Expand All @@ -967,6 +989,7 @@ void uwsgi_subscribe2(char *arg, uint8_t cmd) {
char *s2_key = NULL;
char *s2_socket = NULL;
char *s2_addr = NULL;
char *s2_addr_resolved = NULL;
char *s2_weight = NULL;
char *s2_sign = NULL;
char *s2_modifier1 = NULL;
Expand Down Expand Up @@ -1051,14 +1074,23 @@ void uwsgi_subscribe2(char *arg, uint8_t cmd) {
s2_addr = uwsgi_str(uwsgi.sockets->name);
}

char *port = strchr(s2_addr, ':');
if (port) {
*port = '\0';
char *resolved = uwsgi_resolve_ip(s2_addr);
if (!resolved) goto end;
*port = ':';
s2_addr_resolved = uwsgi_concat2n(resolved, strlen(resolved), port, strlen(port));
}

ub = uwsgi_buffer_new(uwsgi.page_size);
if (!ub) goto end;
// leave space for the header
ub->pos = 4;

if (uwsgi_buffer_append_keyval(ub, "key", 3, s2_key, strlen(s2_key)))
goto end;
if (uwsgi_buffer_append_keyval(ub, "address", 7, s2_addr, strlen(s2_addr)))
if (uwsgi_buffer_append_keyval(ub, "address", 7, s2_addr_resolved ? s2_addr_resolved : s2_addr, strlen(s2_addr_resolved ? s2_addr_resolved : s2_addr)))
goto end;
if (uwsgi_buffer_append_keynum(ub, "modifier1", 9, modifier1))
goto end;
Expand Down Expand Up @@ -1140,6 +1172,8 @@ void uwsgi_subscribe2(char *arg, uint8_t cmd) {
free(s2_socket);
if (s2_addr)
free(s2_addr);
if (s2_addr_resolved)
free(s2_addr_resolved);
if (s2_weight)
free(s2_weight);
if (s2_modifier1)
Expand Down

0 comments on commit 8055cda

Please sign in to comment.