Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

import com.google.common.base.Preconditions;

public class EventCoordinator {
public class EventCoordinator implements EventPublisher {

private static final Logger log = LoggerFactory.getLogger(EventCoordinator.class);

Expand Down Expand Up @@ -108,26 +108,31 @@ public KeyExtent getExtent() {
}
}

@Override
public void event(String msg, Object... args) {
log.info(String.format(msg, args));
publish(new Event());
}

@Override
public void event(Ample.DataLevel level, String msg, Object... args) {
log.info(String.format(msg, args));
publish(new Event(EventScope.DATA_LEVEL, level));
}

@Override
public void event(TableId tableId, String msg, Object... args) {
log.info(String.format(msg, args));
publish(new Event(EventScope.TABLE, tableId));
}

@Override
public void event(KeyExtent extent, String msg, Object... args) {
log.debug(String.format(msg, args));
publish(new Event(EventScope.TABLE_RANGE, extent));
}

@Override
public void event(Collection<KeyExtent> extents, String msg, Object... args) {
if (!extents.isEmpty()) {
log.debug(String.format(msg, args));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.manager;

import java.util.Collection;

import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.metadata.schema.Ample;

public interface EventPublisher {
void event(String msg, Object... args);

void event(Ample.DataLevel level, String msg, Object... args);

void event(TableId tableId, String msg, Object... args);

void event(KeyExtent extent, String msg, Object... args);

void event(Collection<KeyExtent> extents, String msg, Object... args);
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,6 @@
import org.apache.accumulo.core.client.admin.CompactionConfig;
import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.core.client.admin.servers.ServerId.Type;
import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType;
import org.apache.accumulo.core.clientImpl.thrift.ThriftTableOperationException;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.conf.SiteConfiguration;
Expand Down Expand Up @@ -100,6 +97,7 @@
import org.apache.accumulo.core.metadata.SystemTables;
import org.apache.accumulo.core.metadata.TServerInstance;
import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
import org.apache.accumulo.core.metrics.MetricsInfo;
import org.apache.accumulo.core.metrics.MetricsProducer;
import org.apache.accumulo.core.trace.TraceUtil;
Expand All @@ -116,6 +114,7 @@
import org.apache.accumulo.manager.recovery.RecoveryManager;
import org.apache.accumulo.manager.split.Splitter;
import org.apache.accumulo.manager.state.TableCounts;
import org.apache.accumulo.manager.tableOps.FateEnv;
import org.apache.accumulo.manager.tableOps.TraceRepo;
import org.apache.accumulo.manager.upgrade.UpgradeCoordinator;
import org.apache.accumulo.manager.upgrade.UpgradeCoordinator.UpgradeStatus;
Expand Down Expand Up @@ -161,7 +160,7 @@
* <p>
* The manager will also coordinate log recoveries and reports general status.
*/
public class Manager extends AbstractServer implements LiveTServerSet.Listener {
public class Manager extends AbstractServer implements LiveTServerSet.Listener, FateEnv {

static final Logger log = LoggerFactory.getLogger(Manager.class);

Expand Down Expand Up @@ -205,7 +204,7 @@ public class Manager extends AbstractServer implements LiveTServerSet.Listener {
// should already have been set; ConcurrentHashMap will guarantee that all threads will see
// the initialized fate references after the latch is ready
private final CountDownLatch fateReadyLatch = new CountDownLatch(1);
private final AtomicReference<Map<FateInstanceType,Fate<Manager>>> fateRefs =
private final AtomicReference<Map<FateInstanceType,Fate<FateEnv>>> fateRefs =
new AtomicReference<>();

static class TServerStatus {
Expand Down Expand Up @@ -303,7 +302,7 @@ public boolean stillManager() {
*
* @return the Fate object, only after the fate components are running and ready
*/
public Fate<Manager> fate(FateInstanceType type) {
public Fate<FateEnv> fate(FateInstanceType type) {
try {
// block up to 30 seconds until it's ready; if it's still not ready, introduce some logging
if (!fateReadyLatch.await(30, SECONDS)) {
Expand Down Expand Up @@ -465,19 +464,12 @@ int displayUnassigned() {
return result;
}

public void mustBeOnline(final TableId tableId) throws ThriftTableOperationException {
ServerContext context = getContext();
context.clearTableListCache();
if (context.getTableState(tableId) != TableState.ONLINE) {
throw new ThriftTableOperationException(tableId.canonical(), null, TableOperation.MERGE,
TableOperationExceptionType.OFFLINE, "table is not online");
}
}

@Override
public TableManager getTableManager() {
return getContext().getTableManager();
}

@Override
public ThreadPoolExecutor getTabletRefreshThreadPool() {
return tabletRefreshThreadPool;
}
Expand Down Expand Up @@ -559,6 +551,7 @@ ManagerGoalState getManagerGoalState() {

private Splitter splitter;

@Override
public Splitter getSplitter() {
return splitter;
}
Expand All @@ -571,6 +564,11 @@ public CompactionCoordinator getCompactionCoordinator() {
return compactionCoordinator;
}

@Override
public void recordCompactionCompletion(ExternalCompactionId ecid) {
getCompactionCoordinator().recordCompletion(ecid);
}

public void hostOndemand(List<KeyExtent> extents) {
extents.forEach(e -> Preconditions.checkArgument(DataLevel.of(e.tableId()) == DataLevel.USER));

Expand Down Expand Up @@ -1263,9 +1261,9 @@ public void mainWait() throws InterruptedException {
Thread.sleep(500);
}

protected Fate<Manager> initializeFateInstance(ServerContext context, FateStore<Manager> store) {
protected Fate<FateEnv> initializeFateInstance(ServerContext context, FateStore<FateEnv> store) {

final Fate<Manager> fateInstance = new Fate<>(this, store, true, TraceRepo::toLogString,
final Fate<FateEnv> fateInstance = new Fate<>(this, store, true, TraceRepo::toLogString,
getConfiguration(), context.getScheduledExecutor());

var fateCleaner = new FateCleaner<>(store, Duration.ofHours(8), this::getSteadyTime);
Expand Down Expand Up @@ -1367,7 +1365,8 @@ private long remaining(long deadline) {
return Math.max(1, deadline - System.currentTimeMillis());
}

public ServiceLock getManagerLock() {
@Override
public ServiceLock getServiceLock() {
return managerLock;
}

Expand Down Expand Up @@ -1410,7 +1409,7 @@ private ServiceLockData getManagerLock(final ServiceLockPath zManagerLoc)
sleepUninterruptibly(TIME_TO_WAIT_BETWEEN_LOCK_CHECKS, MILLISECONDS);
}

this.getContext().setServiceLock(getManagerLock());
this.getContext().setServiceLock(getServiceLock());
return sld;
}

Expand Down Expand Up @@ -1504,6 +1503,7 @@ public Set<TableId> onlineTables() {
return result;
}

@Override
public Set<TServerInstance> onlineTabletServers() {
return tserverSet.getSnapshot().getTservers();
}
Expand All @@ -1525,6 +1525,12 @@ public EventCoordinator getEventCoordinator() {
return nextEvent;
}

@Override
public EventPublisher getEventPublisher() {
return nextEvent;
}

@Override
public VolumeManager getVolumeManager() {
return getContext().getVolumeManager();
}
Expand Down Expand Up @@ -1587,10 +1593,12 @@ public Set<TServerInstance> shutdownServers() {
}
}

@Override
public void updateBulkImportStatus(String directory, BulkImportState state) {
bulkImportStatus.updateBulkImportStatus(Collections.singletonList(directory), state);
}

@Override
public void removeBulkImportStatus(String directory) {
bulkImportStatus.removeBulkImportStatus(Collections.singletonList(directory));
}
Expand All @@ -1600,6 +1608,7 @@ public void removeBulkImportStatus(String directory) {
* monotonic clock, which will be approximately consistent between different managers or different
* runs of the same manager. SteadyTime supports both nanoseconds and milliseconds.
*/
@Override
public SteadyTime getSteadyTime() {
return timeKeeper.getTime();
}
Expand All @@ -1614,7 +1623,7 @@ public void registerMetrics(MeterRegistry registry) {
compactionCoordinator.registerMetrics(registry);
}

private Map<FateInstanceType,Fate<Manager>> getFateRefs() {
private Map<FateInstanceType,Fate<FateEnv>> getFateRefs() {
var fateRefs = this.fateRefs.get();
Preconditions.checkState(fateRefs != null, "Unexpected null fate references map");
return fateRefs;
Expand All @@ -1630,6 +1639,7 @@ public ServiceLock getLock() {
*
* @return {@link ExecutorService}
*/
@Override
public ExecutorService getRenamePool() {
return this.renamePool;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.FateInstanceType;
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.core.manager.state.tables.TableState;
import org.apache.accumulo.core.manager.thrift.ManagerClientService;
import org.apache.accumulo.core.manager.thrift.ManagerGoalState;
import org.apache.accumulo.core.manager.thrift.ManagerMonitorInfo;
Expand All @@ -78,6 +79,7 @@
import org.apache.accumulo.core.securityImpl.thrift.TDelegationToken;
import org.apache.accumulo.core.securityImpl.thrift.TDelegationTokenConfig;
import org.apache.accumulo.core.util.ByteBufferUtil;
import org.apache.accumulo.manager.tableOps.FateEnv;
import org.apache.accumulo.manager.tableOps.TraceRepo;
import org.apache.accumulo.manager.tserverOps.ShutdownTServer;
import org.apache.accumulo.server.ServerContext;
Expand Down Expand Up @@ -326,7 +328,7 @@ public void shutdownTabletServer(TInfo info, TCredentials c, String tabletServer
}
}

Fate<Manager> fate = manager.fate(FateInstanceType.META);
Fate<FateEnv> fate = manager.fate(FateInstanceType.META);
FateId fateId = fate.startTransaction();

String msg = "Shutdown tserver " + tabletServer;
Expand Down Expand Up @@ -354,7 +356,7 @@ public void tabletServerStopping(TInfo tinfo, TCredentials credentials, String t
if (manager.shutdownTServer(tserver)) {
// If there is an exception seeding the fate tx this should cause the RPC to fail which should
// cause the tserver to halt. Because of that not making an attempt to handle failure here.
Fate<Manager> fate = manager.fate(FateInstanceType.META);
Fate<FateEnv> fate = manager.fate(FateInstanceType.META);
var tid = fate.startTransaction();
String msg = "Shutdown tserver " + tabletServer;

Expand Down Expand Up @@ -714,6 +716,15 @@ public TDelegationToken getDelegationToken(TInfo tinfo, TCredentials credentials
}
}

public static void mustBeOnline(ServerContext context, final TableId tableId)
throws ThriftTableOperationException {
context.clearTableListCache();
if (context.getTableState(tableId) != TableState.ONLINE) {
throw new ThriftTableOperationException(tableId.canonical(), null, TableOperation.MERGE,
TableOperationExceptionType.OFFLINE, "table is not online");
}
}

@Override
public void requestTabletHosting(TInfo tinfo, TCredentials credentials, String tableIdStr,
List<TKeyExtent> extents) throws ThriftSecurityException, ThriftTableOperationException {
Expand All @@ -725,7 +736,7 @@ public void requestTabletHosting(TInfo tinfo, TCredentials credentials, String t
SecurityErrorCode.PERMISSION_DENIED);
}

manager.mustBeOnline(tableId);
mustBeOnline(manager.getContext(), tableId);

manager.hostOndemand(Lists.transform(extents, KeyExtent::fromThrift));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@
import org.apache.accumulo.manager.compaction.queue.CompactionJobPriorityQueue;
import org.apache.accumulo.manager.compaction.queue.CompactionJobQueues;
import org.apache.accumulo.manager.compaction.queue.ResolvedCompactionJob;
import org.apache.accumulo.manager.tableOps.FateEnv;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.ServiceEnvironmentImpl;
import org.apache.accumulo.server.compaction.CompactionConfigStorage;
Expand Down Expand Up @@ -270,7 +271,7 @@ static FailureCounts incrementSuccess(Object key, FailureCounts counts) {
private final ServerContext ctx;
private final AuditedSecurityOperation security;
private final CompactionJobQueues jobQueues;
private final AtomicReference<Map<FateInstanceType,Fate<Manager>>> fateInstances;
private final AtomicReference<Map<FateInstanceType,Fate<FateEnv>>> fateInstances;
// Exposed for tests
protected final CountDownLatch shutdown = new CountDownLatch(1);

Expand All @@ -290,7 +291,7 @@ static FailureCounts incrementSuccess(Object key, FailureCounts counts) {
private final Set<String> activeCompactorReservationRequest = ConcurrentHashMap.newKeySet();

public CompactionCoordinator(Manager manager,
AtomicReference<Map<FateInstanceType,Fate<Manager>>> fateInstances) {
AtomicReference<Map<FateInstanceType,Fate<FateEnv>>> fateInstances) {
this.ctx = manager.getContext();
this.security = ctx.getSecurityOperation();
this.manager = Objects.requireNonNull(manager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
import org.apache.accumulo.core.metadata.schema.filters.HasExternalCompactionsFilter;
import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.accumulo.manager.Manager;
import org.apache.accumulo.manager.tableOps.FateEnv;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.util.FindCompactionTmpFiles;
import org.apache.accumulo.server.util.FindCompactionTmpFiles.DeleteStats;
Expand All @@ -62,11 +62,11 @@ public class DeadCompactionDetector {
private final ScheduledThreadPoolExecutor schedExecutor;
private final ConcurrentHashMap<ExternalCompactionId,Long> deadCompactions;
private final Set<TableId> tablesWithUnreferencedTmpFiles = new HashSet<>();
private final AtomicReference<Map<FateInstanceType,Fate<Manager>>> fateInstances;
private final AtomicReference<Map<FateInstanceType,Fate<FateEnv>>> fateInstances;

public DeadCompactionDetector(ServerContext context, CompactionCoordinator coordinator,
ScheduledThreadPoolExecutor stpe,
AtomicReference<Map<FateInstanceType,Fate<Manager>>> fateInstances) {
AtomicReference<Map<FateInstanceType,Fate<FateEnv>>> fateInstances) {
this.context = context;
this.coordinator = coordinator;
this.schedExecutor = stpe;
Expand Down
Loading