Skip to content

Commit 884aa1c

Browse files
author
Vincent Royer
committed
Enhance multicloud support
1 parent e87b6c2 commit 884aa1c

File tree

7 files changed

+54
-21
lines changed

7 files changed

+54
-21
lines changed

core/src/main/java/org/apache/cassandra/service/ElassandraDaemon.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -439,14 +439,14 @@ public Settings nodeSettings(Settings settings) {
439439
.put("network.bind_host", DatabaseDescriptor.getRpcAddress().getHostAddress())
440440
.put("network.publish_host", FBUtilities.getBroadcastRpcAddress().getHostAddress())
441441
.put("transport.bind_host", FBUtilities.getLocalAddress().getHostAddress())
442-
.put("transport.publish_host", FBUtilities.getBroadcastAddress().getHostAddress())
442+
.put("transport.publish_host", Boolean.getBoolean("es.use_internal_address") ? FBUtilities.getLocalAddress().getHostAddress() : FBUtilities.getBroadcastAddress().getHostAddress())
443443
.put("path.data",getElasticsearchDataDir())
444444
.put(settings)
445445
// not overloadable settings.
446-
.put("discovery.type","cassandra")
447-
.put("node.data",true)
448-
.put("node.master",true)
449-
.put("node.name", CassandraDiscovery.buildNodeName(DatabaseDescriptor.getListenAddress()))
446+
.put("discovery.type", "cassandra")
447+
.put("node.data", true)
448+
.put("node.master", true)
449+
.put("node.name", CassandraDiscovery.buildNodeName(DatabaseDescriptor.getBroadcastAddress()))
450450
.put("node.attr.dc", DatabaseDescriptor.getLocalDataCenter())
451451
.put("node.attr.rack", DatabaseDescriptor.getEndpointSnitch().getRack(FBUtilities.getBroadcastAddress()))
452452
.put("cluster.name", ClusterService.getElasticsearchClusterName(env.settings()))

core/src/main/java/org/elassandra/cluster/routing/AbstractSearchStrategy.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -106,8 +106,14 @@ public Router(final Index index, final String ksName, BiFunction<Index, UUID, Sh
106106
this.strategy = Keyspace.open(ksName).getReplicationStrategy();
107107
this.metadata = StorageService.instance.getTokenMetadata().cloneOnlyTokenMap();
108108
for(DiscoveryNode node : clusterState.nodes()) {
109-
for(Token token : this.metadata.getTokens(node.getInetAddress()))
110-
this.tokenToNodes.put(token, node);
109+
InetAddress endpoint = node.getNameAsInetAddress();
110+
if (endpoint == null) {
111+
endpoint = this.metadata.getEndpointForHostId(node.uuid());
112+
}
113+
if (endpoint != null && this.metadata.isMember(endpoint)) {
114+
for(Token token : this.metadata.getTokens(endpoint))
115+
this.tokenToNodes.put(token, node);
116+
}
111117
}
112118
} else {
113119
this.strategy = null;
@@ -128,7 +134,7 @@ public Router(final Index index, final String ksName, BiFunction<Index, UUID, Sh
128134

129135
// greenshard = available node -> token range bitset,
130136
boolean orphanRange = true;
131-
for(InetAddress endpoint : (this.metadata == null) ? Collections.singletonList(localNode.getInetAddress()) : this.strategy.calculateNaturalEndpoints(token, this.metadata)) {
137+
for(InetAddress endpoint : (this.metadata == null) ? Collections.singletonList(localNode.getNameAsInetAddress()) : this.strategy.calculateNaturalEndpoints(token, this.metadata)) {
132138
UUID uuid = StorageService.instance.getHostId(endpoint);
133139
DiscoveryNode node = (uuid == null) ? clusterState.nodes().findByInetAddress(endpoint) : clusterState.nodes().get(uuid.toString());
134140
if (node != null && node.status() == DiscoveryNode.DiscoveryNodeStatus.ALIVE) {

core/src/main/java/org/elassandra/discovery/CassandraDiscovery.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -151,8 +151,9 @@ protected void doStart() {
151151

152152
// initialize cluster from cassandra system.peers
153153
// WARNING: system.peers may be incomplete because commitlogs not yet applied
154-
for (UntypedResultSet.Row row : executeInternal("SELECT peer, data_center, rack, rpc_address, host_id from system." + SystemKeyspace.PEERS)) {
154+
for (UntypedResultSet.Row row : executeInternal("SELECT peer, data_center, rack, preferred_ip, rpc_address, host_id from system." + SystemKeyspace.PEERS)) {
155155
InetAddress peer = row.getInetAddress("peer");
156+
InetAddress preferred_ip = row.has("preferred_ip") ? row.getInetAddress("preferred_ip") : null;
156157
InetAddress rpc_address = row.has("rpc_address") ? row.getInetAddress("rpc_address") : null;
157158
String datacenter = row.has("data_center") ? row.getString("data_center") : null;
158159
String host_id = row.has("host_id") ? row.getUUID("host_id").toString() : null;
@@ -162,7 +163,11 @@ protected void doStart() {
162163
if (row.has("rack"))
163164
attrs.put("rack", row.getString("rack"));
164165

165-
DiscoveryNode dn = new DiscoveryNode(buildNodeName(peer), host_id, new InetSocketTransportAddress(rpc_address, publishPort()), attrs, CASSANDRA_ROLES, Version.CURRENT);
166+
DiscoveryNode dn = new DiscoveryNode(buildNodeName(peer), host_id,
167+
new InetSocketTransportAddress(Boolean.getBoolean("es.use_internal_address") && preferred_ip != null ? preferred_ip : rpc_address, publishPort()),
168+
attrs,
169+
CASSANDRA_ROLES,
170+
Version.CURRENT);
166171
EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(peer);
167172
if (state == null) {
168173
dn.status( DiscoveryNodeStatus.UNKNOWN );
@@ -295,8 +300,9 @@ public void updateClusterGroupsFromGossiper() {
295300

296301
InetAddress internal_address = com.google.common.net.InetAddresses.forString(state.getApplicationState(ApplicationState.INTERNAL_IP).value);
297302
InetAddress rpc_address = com.google.common.net.InetAddresses.forString(state.getApplicationState(ApplicationState.RPC_ADDRESS).value);
303+
298304
dn = new DiscoveryNode(buildNodeName(internal_address), hostId.toString(),
299-
new InetSocketTransportAddress(rpc_address, publishPort()), attrs, CASSANDRA_ROLES, Version.CURRENT);
305+
new InetSocketTransportAddress(Boolean.getBoolean("es.use_internal_address") ? internal_address : rpc_address, publishPort()), attrs, CASSANDRA_ROLES, Version.CURRENT);
300306
dn.status(status);
301307

302308
if (!localAddress.equals(endpoint)) {
@@ -518,7 +524,8 @@ private boolean isMember(InetAddress endpoint) {
518524
* ES RPC adress can be different from the cassandra broadcast address.
519525
*/
520526
private boolean isNormal(DiscoveryNode node) {
521-
EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(node.getInetAddress());
527+
// endpoint address = C* broadcast address = Elasticsearch node name (transport may be bound to C* internal or C* RPC broadcast)
528+
EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(InetAddresses.forString(node.getName()));
522529
if (state == null) {
523530
logger.warn("Node endpoint address=[{}] name=[{}] state not found", node.getInetAddress(), node.getName());
524531
return false;

core/src/main/java/org/elasticsearch/cluster/node/DiscoveryNode.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,10 @@
2020
package org.elasticsearch.cluster.node;
2121

2222
import org.elasticsearch.Version;
23-
import org.elasticsearch.common.UUIDs;
2423
import org.elasticsearch.common.io.stream.StreamInput;
2524
import org.elasticsearch.common.io.stream.StreamOutput;
2625
import org.elasticsearch.common.io.stream.Writeable;
26+
import org.elasticsearch.common.network.InetAddresses;
2727
import org.elasticsearch.common.settings.Settings;
2828
import org.elasticsearch.common.transport.InetSocketTransportAddress;
2929
import org.elasticsearch.common.transport.LocalTransportAddress;
@@ -80,6 +80,7 @@ public static boolean isIngestNode(Settings settings) {
8080
private final String hostName;
8181
private final String hostAddress;
8282
private final TransportAddress address;
83+
private transient final InetAddress nodeNameAddress;
8384
private final Map<String, String> attributes;
8485
private final Version version;
8586
private final Set<Role> roles;
@@ -139,6 +140,10 @@ public UUID uuid() {
139140
return this.nodeUuid;
140141
}
141142

143+
public InetAddress getNameAsInetAddress() {
144+
return this.nodeNameAddress;
145+
}
146+
142147
/**
143148
* The inet listen address of the node.
144149
*/
@@ -229,11 +234,17 @@ public DiscoveryNode(String nodeName, String nodeId, TransportAddress address,
229234
*/
230235
public DiscoveryNode(String nodeName, String nodeId, String ephemeralId, String hostName, String hostAddress,
231236
TransportAddress address, Map<String, String> attributes, Set<Role> roles, Version version) {
237+
InetAddress nodeAddr = null;
232238
if (nodeName != null) {
233239
this.nodeName = nodeName.intern();
240+
try {
241+
nodeAddr = InetAddresses.forString(nodeName);
242+
} catch (Exception e) {
243+
}
234244
} else {
235245
this.nodeName = "";
236246
}
247+
this.nodeNameAddress = nodeAddr;
237248
this.nodeId = nodeId.intern();
238249
try {
239250
this.nodeUuid = UUID.fromString(nodeId);
@@ -293,6 +304,12 @@ public static Set<Role> getRolesFromSettings(Settings settings) {
293304
*/
294305
public DiscoveryNode(StreamInput in) throws IOException {
295306
this.nodeName = in.readString().intern();
307+
InetAddress nodeNameAddress = null;
308+
try {
309+
nodeNameAddress = InetAddresses.forString(this.nodeName);
310+
} catch (Exception e) {
311+
}
312+
this.nodeNameAddress = nodeNameAddress;
296313
this.nodeId = in.readString().intern();
297314
this.ephemeralId = in.readString().intern();
298315
this.hostName = in.readString().intern();

core/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,7 @@ public DiscoveryNode findByAddress(TransportAddress address) {
269269
public DiscoveryNode findByInetAddress(InetAddress address) {
270270
for (ObjectCursor<DiscoveryNode> cursor : nodes.values()) {
271271
DiscoveryNode node = cursor.value;
272-
if (node.getInetAddress().equals(address)) {
272+
if (node.getInetAddress().equals(address) || node.getName().equals(address.getHostAddress())) {
273273
return node;
274274
}
275275
}

core/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -377,7 +377,7 @@ public Builder(Index index, ClusterService clusterService, ClusterState targetSt
377377
} catch (NullPointerException | java.lang.AssertionError e) {
378378
// thrown by cassandra when the keyspace is not yet create locally.
379379
// We must wait for a gossip schema change to update the routing Table.
380-
Loggers.getLogger(getClass().getName()).warn("Keyspace not available: {}", e, this.index);
380+
Loggers.getLogger(getClass().getName()).warn("Keyspace not available for index ["+this.index+"]", e);
381381
}
382382
}
383383

@@ -398,9 +398,9 @@ public Builder(Index index, ClusterService clusterService, ClusterState targetSt
398398
} catch (NullPointerException | java.lang.AssertionError e) {
399399
// thrown by cassandra when the keyspace is not yet create locally.
400400
// We must wait for a gossip schema change to update the routing Table.
401-
Loggers.getLogger(getClass()).warn("Keyspace {} not available", e, this.index);
401+
Loggers.getLogger(getClass()).warn("Keyspace not available for index ["+this.index+"]", e);
402402
} catch (Exception e1) {
403-
Loggers.getLogger(getClass()).warn("Failde to compute route for {}", e1, this.index);
403+
Loggers.getLogger(getClass()).warn("Failed to compute route for index ["+this.index+"]", e1);
404404
}
405405
}
406406

docs/elassandra/source/configuration.rst

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,12 +46,15 @@ Elasticsearch configuration rely on cassandra configuration file **conf/cassandr
4646

4747
Node role (master, primary, data) is automatically set by elassandra, standard configuration should only set **cluster_name**, **rpc_address** in the ``conf/cassandra.yaml``.
4848

49-
By default, Elasticsearch HTTP is bound to Cassandra RPC addresses, while Elasticsearch transport protocol is bound to Cassandra internal addresses.
50-
You can overload these default settings by defining Elasticsearch network settings in conf/elasticsearch.yaml (in order to bind Elasticsearch transport on
51-
a public interface if you want to use the Elasticsearch transport client from your application).
49+
By default, Elasticsearch HTTP is bound to Cassandra RPC address ``rpc_address``, while Elasticsearch transport protocol is bound to Cassandra internal address ``listen_address``.
50+
You can overload these default settings by defining Elasticsearch network settings in conf/elasticsearch.yaml (in order to bind Elasticsearch transport on another interface).
51+
52+
By default, Elasticsearch transport publish adress is the Cassandra broadcast adress. However, in some network configurations (including multi-cloud deployment), the Cassandra broadcast adress is a public address managed by a firewall, and
53+
it would invlove a network overhead for elasticsearch inter-node communication. In such case, you can set the system property ``es.use_internal_address=true`` to use the Cassandra ``listen_address`` as the elasticsearch transport published address.
54+
5255

5356
.. CAUTION::
54-
If you use the `GossipPropertyFile <https://docs.datastax.com/en/cassandra/2.0/cassandra/architecture/architectureSnitchGossipPF_c.html>`_ Snitch to configure your cassandra datacenter and rack properties in **conf/cassandra-rackdc.properties**, keep
57+
If you use the `GossipingPropertyFile <https://docs.datastax.com/en/cassandra/2.0/cassandra/architecture/architectureSnitchGossipPF_c.html>`_ Snitch to configure your cassandra datacenter and rack properties in **conf/cassandra-rackdc.properties**, keep
5558
in mind this snitch falls back to the PropertyFileSnitch when gossip is not enabled. So, when re-starting the first node, dead nodes can appear in the default DC and rack configured in **conf/cassandra-topology.properties**. This also
5659
breaks the replica placement strategy and the computation of the Elasticsearch routing tables. So it is strongly recommended to set the same default rack and datacenter in both the **conf/cassandra-topology.properties** and **conf/cassandra-rackdc.properties**.
5760

0 commit comments

Comments
 (0)