Skip to content

Commit 7e2e3be

Browse files
authored
Merge pull request #67 from harbby/dev
Dev
2 parents 4f0d9ae + 2d1999a commit 7e2e3be

File tree

35 files changed

+812
-607
lines changed

35 files changed

+812
-607
lines changed

build.gradle

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ allprojects {
2222
}
2323

2424
ext.deps = [
25-
flink : '1.7.1',
25+
flink : '1.7.2',
2626
jetty : "9.4.6.v20170531", //8.1.17.v20150415 "9.4.6.v20170531"
2727
hadoop : "2.7.4",
2828
hbase : '1.1.2',
@@ -31,7 +31,7 @@ allprojects {
3131
joda_time: '2.9.3',
3232
log4j12 : '1.7.21',
3333
guice : '4.2.1',
34-
gadtry : '1.3.3',
34+
gadtry : '1.4.0-rc2',
3535
guava : '25.1-jre',
3636
jackson : '2.9.5',
3737
jersey : '2.27'
@@ -120,4 +120,15 @@ subprojects {
120120
artifacts {
121121
archives sourcesJar, javadocJar
122122
}
123+
124+
javadoc {
125+
options {
126+
encoding "UTF-8"
127+
charSet 'UTF-8'
128+
author true
129+
version true
130+
links "https://harbby.github.io/project/sylph/en/docs/intro/"
131+
title "sylph"
132+
}
133+
}
123134
}

sylph-connectors/sylph-elasticsearch5/src/main/java/ideal/sylph/plugins/elasticsearch5/Elasticsearch5Sink.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
import java.util.Map;
3636
import java.util.concurrent.atomic.AtomicInteger;
3737

38-
import static com.github.harbby.gadtry.base.Checks.checkState;
38+
import static com.github.harbby.gadtry.base.MoreObjects.checkState;
3939

4040
@Name("elasticsearch5")
4141
@Description("this is elasticsearch5 sink plugin")

sylph-connectors/sylph-elasticsearch6/src/main/java/ideal/sylph/plugins/elasticsearch6/Elasticsearch6Sink.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
import java.util.Map;
3636
import java.util.concurrent.atomic.AtomicInteger;
3737

38-
import static com.github.harbby.gadtry.base.Checks.checkState;
38+
import static com.github.harbby.gadtry.base.MoreObjects.checkState;
3939

4040
@Name("elasticsearch6")
4141
@Description("this is elasticsearch6 sink plugin")

sylph-connectors/sylph-hdfs/src/main/java/ideal/sylph/plugins/hdfs/parquet/ApacheParquet.java

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,6 @@
4242
import java.util.Map;
4343
import java.util.Queue;
4444
import java.util.concurrent.ConcurrentLinkedQueue;
45-
import java.util.concurrent.locks.Lock;
46-
import java.util.concurrent.locks.ReadWriteLock;
47-
import java.util.concurrent.locks.ReentrantReadWriteLock;
4845

4946
import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
5047
import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED;
@@ -61,9 +58,6 @@ public class ApacheParquet
6158
private final MessageType schema;
6259
private final String outputPath;
6360

64-
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
65-
private final Lock lock = rwLock.writeLock();
66-
6761
private long createTime = System.currentTimeMillis();
6862
private long lastTime = createTime;
6963

@@ -204,14 +198,8 @@ private void writeGroup(Group group)
204198
if (group == null) {
205199
return;
206200
}
207-
try {
208-
lock.lock(); //加锁
209-
lastTime = System.currentTimeMillis();
210-
writer.write(group);
211-
}
212-
finally {
213-
lock.unlock(); //解锁
214-
}
201+
lastTime = System.currentTimeMillis();
202+
writer.write(group);
215203
}
216204

217205
/**
@@ -222,7 +210,6 @@ public void close()
222210
throws IOException
223211
{
224212
try {
225-
lock.lock();
226213
writer.close();
227214
//1,修改文件名称
228215
FileSystem hdfs = FileSystem.get(java.net.URI.create(outputPath), new Configuration());
@@ -235,9 +222,6 @@ public void close()
235222
FileSystem hdfs = FileSystem.get(java.net.URI.create(outputPath), new Configuration());
236223
hdfs.rename(new Path(outputPath), new Path(outputPath + ".err"));
237224
}
238-
finally {
239-
lock.unlock();
240-
}
241225
}
242226

243227
/*

sylph-controller/src/main/java/ideal/sylph/controller/AuthAspect.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,12 @@ public void register(Binder binder)
4242
String action = proxy.getInfo().getName();
4343
logger.info("[auth] session:{}, action: {}, args: {}", id, action, proxy.getArgs());
4444
Object value = proxy.proceed();
45-
return value;
45+
switch (proxy.getInfo().getName()) {
46+
case "getAllJobs":
47+
return value; //按照权限进行过滤
48+
default:
49+
return value;
50+
}
4651
});
4752
}
4853
}

sylph-controller/src/main/java/ideal/sylph/controller/action/PluginManagerResource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public List<String> getETLActuators()
6969
@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
7070
public Map getAllPlugins(@QueryParam("actuator") String actuator)
7171
{
72-
checkArgument(!Strings.isNullOrEmpty(actuator), "actuator not setting");
72+
checkArgument(!Strings.isNullOrEmpty(actuator), "actuator [" + actuator + "] not setting");
7373
return sylphContext.getPlugins(actuator).stream().map(pluginInfo -> {
7474
Map config = pluginInfo.getPluginConfig().stream()
7575
.collect(Collectors.toMap(

sylph-controller/src/main/java/ideal/sylph/controller/action/StreamSqlResource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@
4747
import java.util.Properties;
4848
import java.util.stream.Collectors;
4949

50-
import static com.github.harbby.gadtry.base.Checks.checkState;
50+
import static com.github.harbby.gadtry.base.MoreObjects.checkState;
5151
import static com.github.harbby.gadtry.base.Strings.isNotBlank;
5252
import static ideal.sylph.spi.exception.StandardErrorCode.ILLEGAL_OPERATION;
5353
import static java.nio.charset.StandardCharsets.UTF_8;

sylph-controller/src/main/java/ideal/sylph/controller/selvet/WebAppProxyServlet.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@
5454
import java.util.List;
5555
import java.util.Set;
5656

57-
import static com.github.harbby.gadtry.base.Checks.checkState;
57+
import static com.github.harbby.gadtry.base.MoreObjects.checkState;
5858
import static com.google.common.base.Preconditions.checkArgument;
5959
import static ideal.sylph.spi.exception.StandardErrorCode.JOB_CONFIG_ERROR;
6060
import static java.util.Objects.requireNonNull;
@@ -170,10 +170,10 @@ protected void doGet1(HttpServletRequest req, HttpServletResponse resp)
170170
String[] parts = pathInfo.split("/", 3);
171171
checkArgument(parts.length >= 2, remoteUser + " gave an invalid proxy path " + pathInfo);
172172
//parts[0] is empty because path info always starts with a /
173-
String runId = requireNonNull(parts[1], "runId not setting");
173+
String jobId = requireNonNull(parts[1], "runId not setting");
174174
String rest = parts.length > 2 ? parts[2] : "";
175175

176-
URI trackingUri = new URI(getJobUrl(runId));
176+
URI trackingUri = new URI(getJobUrl(jobId));
177177

178178
// Append the user-provided path and query parameter to the original
179179
// tracking url.
@@ -195,9 +195,7 @@ protected void doGet1(HttpServletRequest req, HttpServletResponse resp)
195195
public String getJobUrl(String id)
196196
{
197197
JobContainer container = sylphContext.getJobContainer(id)
198-
.orElseGet(() -> sylphContext.getJobContainerWithRunId(id).orElseThrow(() ->
199-
new SylphException(JOB_CONFIG_ERROR, "job " + id + " not Online"))
200-
);
198+
.orElseThrow(() -> new SylphException(JOB_CONFIG_ERROR, "job " + id + " not Online"));
201199
Job.Status status = container.getStatus();
202200
checkState(status == Job.Status.RUNNING, "job " + id + " Status " + status + ",but not RUNNING");
203201

sylph-main/src/main/java/ideal/sylph/main/service/JobManager.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,12 +70,12 @@ public JobManager(JobStore jobStore, RunnerManager runnerManger, MetadataManager
7070
containers.forEach((jobId, container) -> {
7171
Job.Status status = container.getStatus();
7272
if (status == STOP) {
73+
logger.warn("Job {}[{}] Status is {}, Start Submit", jobId,
74+
container.getRunId(), status);
75+
container.setStatus(STARTING);
7376
Future future = jobStartPool.submit(() -> {
7477
try {
7578
Thread.currentThread().setName("job_submit_" + jobId);
76-
logger.warn("Job {}[{}] Status is {}, Soon to start", jobId,
77-
container.getRunId(), status);
78-
container.setStatus(STARTING);
7979
Optional<String> runId = container.run();
8080
container.setStatus(RUNNING);
8181
runId.ifPresent(result -> metadataManager.addMetadata(jobId, result));

sylph-main/src/main/java/ideal/sylph/main/service/PipelinePluginLoader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@
5454
import java.util.Set;
5555
import java.util.stream.Collectors;
5656

57-
import static com.github.harbby.gadtry.base.Checks.checkState;
57+
import static com.github.harbby.gadtry.base.MoreObjects.checkState;
5858
import static com.github.harbby.gadtry.base.Throwables.throwsException;
5959
import static ideal.sylph.spi.exception.StandardErrorCode.LOAD_MODULE_ERROR;
6060
import static java.util.Objects.requireNonNull;

sylph-main/src/main/java/ideal/sylph/main/service/RunnerManager.java

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,10 @@
5151
import java.util.HashMap;
5252
import java.util.List;
5353
import java.util.Map;
54+
import java.util.Set;
5455
import java.util.stream.Collectors;
5556

56-
import static com.github.harbby.gadtry.base.Throwables.throwsException;
57+
import static com.github.harbby.gadtry.base.Throwables.noCatch;
5758
import static com.google.common.base.Preconditions.checkArgument;
5859
import static com.google.common.base.Preconditions.checkState;
5960
import static java.util.Objects.requireNonNull;
@@ -121,15 +122,10 @@ private void createRunner(final Runner runner)
121122
logger.info("Runner: {} starts loading {}", runner.getClass().getName(), PipelinePlugin.class.getName());
122123

123124
checkArgument(runner.getContainerFactory() != null, runner.getClass() + " getContainerFactory() return null");
124-
final ContainerFactory factory;
125-
try {
126-
factory = runner.getContainerFactory().newInstance();
127-
}
128-
catch (InstantiationException | IllegalAccessException e) {
129-
throw throwsException(e);
130-
}
131125

132-
runner.create(runnerContext).forEach(jobActuatorHandle -> {
126+
Set<JobActuatorHandle> jobActuators = runner.create(runnerContext);
127+
final ContainerFactory factory = noCatch(() -> runner.getContainerFactory().newInstance());
128+
jobActuators.forEach(jobActuatorHandle -> {
133129
JobActuator jobActuator = new JobActuatorImpl(jobActuatorHandle, factory);
134130
String name = jobActuator.getInfo().getName();
135131
checkState(!jobActuatorMap.containsKey(name), String.format("Multiple entries with same key: %s=%s and %s=%s", name, jobActuatorMap.get(name), name, jobActuator));
@@ -146,12 +142,12 @@ JobContainer createJobContainer(@Nonnull Job job, String jobInfo)
146142
String jobType = requireNonNull(job.getActuatorName(), "job Actuator Name is null " + job.getId());
147143
JobActuator jobActuator = jobActuatorMap.get(jobType);
148144
checkArgument(jobActuator != null, jobType + " not exists");
149-
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(jobActuator.getHandleClassLoader())) {
145+
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(job.getJobClassLoader())) {
150146
switch (config.getRunMode().toLowerCase()) {
151147
case "yarn":
152-
return jobActuator.getFactory().getYarnContainer(job, jobInfo);
148+
return jobActuator.getFactory().createYarnContainer(job, jobInfo);
153149
case "local":
154-
return jobActuator.getFactory().getLocalContainer(job, jobInfo);
150+
return jobActuator.getFactory().createLocalContainer(job, jobInfo);
155151
default:
156152
throw new IllegalArgumentException("this job.runtime.mode " + config.getRunMode() + " have't support!");
157153
}
@@ -238,6 +234,12 @@ public Collection<URL> getDepends()
238234
return dependFiles;
239235
}
240236

237+
@Override
238+
public ClassLoader getJobClassLoader()
239+
{
240+
return jobClassLoader;
241+
}
242+
241243
@NotNull
242244
@Override
243245
public String getActuatorName()

0 commit comments

Comments
 (0)