Skip to content

Commit

Permalink
replaced isAvailable call
Browse files Browse the repository at this point in the history
  • Loading branch information
patduin committed Oct 3, 2024
1 parent 720a4f5 commit 971b749
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 8 deletions.
7 changes: 6 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
## [3.13.4] - 2024-10-03
### Fix
- Removed call to primary and made it only when Kerberos (SASL) is enabled.
- Fixed Handler creation for SASL.

## [3.13.3] - 2024-10-02
### Added
- Metric for monitoring open transports. `<prefix>.open_transports_gauge`
- Metric for monitoring open transports. `<prefix>.com_hotels_bdp_waggledance_open_transports_gauge`
### Changed
- Removed RetryingHMSHandler. Retries are done in the client there should be no need to wrap everything in retry logic again.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,6 @@ private void add(AbstractMetaStore metaStore) {

if (metaStore.getFederationType() == PRIMARY) {
primaryDatabaseMapping = databaseMapping;
if (!metaStoreMapping.isAvailable()) {
throw new WaggleDanceException(
String.format("Primary metastore is unavailable %s", metaStore.getRemoteMetaStoreUris())
);
}
}

mappingsByPrefix.put(metaStoreMapping.getDatabasePrefix(), databaseMapping);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IHMSHandler;
import org.apache.hadoop.hive.metastore.TSetIpAddressProcessor;
import org.apache.thrift.TException;
import org.apache.thrift.TProcessor;
import org.apache.thrift.TProcessorFactory;
import org.apache.thrift.transport.TSocket;
Expand Down Expand Up @@ -59,7 +60,14 @@ public TProcessor getProcessor(TTransport transport) {

boolean useSASL = hiveConf.getBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL);
if (useSASL) {
IHMSHandler handler = TokenWrappingHMSHandler.newProxyInstance(baseHandler, useSASL);
try {
baseHandler.getStatus();
} catch (TException e) {
throw new RuntimeException("Error creating TProcessor. Could not get status.", e);
}
IHMSHandler tokenHandler = TokenWrappingHMSHandler.newProxyInstance(baseHandler, useSASL);
IHMSHandler handler = ExceptionWrappingHMSHandler.newProxyInstance(tokenHandler);
transportMonitor.monitor(transport, baseHandler);
return new TSetIpAddressProcessor<>(handler);
} else {
IHMSHandler handler = ExceptionWrappingHMSHandler.newProxyInstance(baseHandler);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,13 @@ public class TSetIpAddressProcessorFactoryTest {
private @Mock TTransportMonitor transportMonitor;
private @Mock TTransport transport;

private final HiveConf hiveConf = new HiveConf();
private HiveConf hiveConf;
private TSetIpAddressProcessorFactory factory;

@Before
public void init() {
when(federatedHMSHandlerFactory.create()).thenReturn(federatedHMSHandler);
hiveConf = new HiveConf();
factory = new TSetIpAddressProcessorFactory(hiveConf, federatedHMSHandlerFactory, transportMonitor);
}

Expand All @@ -67,4 +68,16 @@ public void connectionIsMonitored() throws Exception {
assertThat(transportCaptor.getValue(), is(transport));
assertThat(handlerCaptor.getValue(), is(instanceOf(FederatedHMSHandler.class)));
}

@Test
public void connectionIsMonitoredSasl() throws Exception {
hiveConf.setBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL, Boolean.TRUE);
factory.getProcessor(transport);

ArgumentCaptor<TTransport> transportCaptor = ArgumentCaptor.forClass(TTransport.class);
ArgumentCaptor<Closeable> handlerCaptor = ArgumentCaptor.forClass(Closeable.class);
verify(transportMonitor).monitor(transportCaptor.capture(), handlerCaptor.capture());
assertThat(transportCaptor.getValue(), is(transport));
assertThat(handlerCaptor.getValue(), is(instanceOf(FederatedHMSHandler.class)));
}
}

0 comments on commit 971b749

Please sign in to comment.