Skip to content

Commit

Permalink
#570 fix for branch 3.3.1
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaopeng-he committed Jan 11, 2019
1 parent 19ee9a9 commit 96bda95
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@

import java.io.Serializable;

/**
* @author chembo.huang
*/
public class JobConfig implements Serializable {

private static final long serialVersionUID = 7366583369937964951L;
Expand Down Expand Up @@ -38,7 +35,7 @@ public class JobConfig implements Serializable {
private Boolean useSerial;
private Boolean failover;
private String jobMode; // 系统作业等
private String customContext;
private String customContext; // 仅仅用于动态更新cron时携带的上下文数据
/**
* @deprecated replaced by downStream
*/
Expand All @@ -63,6 +60,7 @@ public void setDefaultValues() {
cron = getDefaultIfNull(cron, "");
pausePeriodDate = getDefaultIfNull(pausePeriodDate, "");
pausePeriodTime = getDefaultIfNull(pausePeriodTime, "");
shardingItemParameters = getDefaultIfNull(shardingItemParameters, "");
jobParameter = getDefaultIfNull(jobParameter, "");
processCountIntervalSeconds = getDefaultIfNull(processCountIntervalSeconds, 300);
description = getDefaultIfNull(description, "");
Expand All @@ -84,6 +82,7 @@ public void setDefaultValues() {
useSerial = getDefaultIfNull(useSerial, Boolean.FALSE);
failover = getDefaultIfNull(failover, !localMode); // 已经设置localMode
jobMode = getDefaultIfNull(jobMode, "");
customContext = getDefaultIfNull(customContext, "{}");
dependencies = getDefaultIfNull(dependencies, "");
groups = getDefaultIfNull(groups, "");
rerun = getDefaultIfNull(rerun, Boolean.FALSE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public interface CuratorRepository {

Expand All @@ -27,21 +26,21 @@ public interface CuratorRepository {

interface CuratorFrameworkOp {

boolean checkExists(String znode);
boolean checkExists(String node);

String getData(String znode);
String getData(String node);

List<String> getChildren(String znode);
List<String> getChildren(String node);

void create(String znode);
void create(String node);

void create(final String znode, Object value);
void create(final String node, Object value);

void update(String znode, Object value);
void update(String node, Object value);

void delete(String znode);
void delete(String node);

void deleteRecursive(String znode);
void deleteRecursive(String node);

void fillJobNodeIfNotExist(String node, Object value);

Expand All @@ -57,16 +56,13 @@ interface CuratorFrameworkOp {

interface CuratorTransactionOp {

CuratorTransactionOp replace(String znode, Object value) throws Exception;
CuratorTransactionOp replace(String node, Object value) throws Exception;

CuratorTransactionOp replaceIfChanged(String znode, Object value) throws Exception;
CuratorTransactionOp replaceIfChanged(String node, Object value) throws Exception;

CuratorTransactionOp replaceIfChanged(String znode, Object value, AtomicInteger changedCount)
throws Exception;
CuratorTransactionOp create(String node, Object value) throws Exception;

CuratorTransactionOp create(String znode, Object value) throws Exception;

CuratorTransactionOp delete(String znode) throws Exception;
CuratorTransactionOp delete(String node) throws Exception;

Collection<CuratorTransactionResult> commit() throws Exception;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

@Repository
public class CuratorRepositoryImpl implements CuratorRepository {
Expand Down Expand Up @@ -109,9 +108,9 @@ public CuratorFrameworkOpImpl(CuratorFramework curatorFramework) {
}

@Override
public boolean checkExists(final String znode) {
public boolean checkExists(final String node) {
try {
return null != curatorFramework.checkExists().forPath(znode);
return null != curatorFramework.checkExists().forPath(node);
// CHECKSTYLE:OFF
} catch (final Exception ex) {
// CHECKSTYLE:ON
Expand All @@ -120,15 +119,15 @@ public boolean checkExists(final String znode) {
}

@Override
public String getData(final String znode) {
public String getData(final String node) {
try {
if (checkExists(znode)) {
byte[] getZnodeData = curatorFramework.getData().forPath(znode);
if (getZnodeData == null) {// executor的分片可能存在全部飘走的情况,sharding节点有可能获取到的是null,需要对null做判断,否则new
if (checkExists(node)) {
byte[] nodeData = curatorFramework.getData().forPath(node);
if (nodeData == null) {// executor的分片可能存在全部飘走的情况,sharding节点有可能获取到的是null,需要对null做判断,否则new
// String时会报空指针异常
return null;
}
return new String(getZnodeData, Charset.forName("UTF-8"));
return new String(nodeData, Charset.forName("UTF-8"));
} else {
return null;
}
Expand All @@ -142,9 +141,9 @@ public String getData(final String znode) {
}

@Override
public List<String> getChildren(final String znode) {
public List<String> getChildren(final String node) {
try {
return curatorFramework.getChildren().forPath(znode);
return curatorFramework.getChildren().forPath(node);
// CHECKSTYLE:OFF
} catch (final NoNodeException ignore) {
return null;
Expand All @@ -156,15 +155,19 @@ public List<String> getChildren(final String znode) {
}

@Override
public void create(final String znode) {
create(znode, "");
public void create(final String node) {
create(node, "");
}

@Override
public void create(final String znode, Object data) {
public void create(final String node, Object value) {
if (value == null) {
log.info("node value is null, won't create, node: {}", node);
return;
}
try {
curatorFramework.create().creatingParentsIfNeeded()
.forPath(znode, data.toString().getBytes(Charset.forName("UTF-8")));
.forPath(node, value.toString().getBytes(Charset.forName("UTF-8")));
} catch (final NodeExistsException ignore) {
// CHECKSTYLE:OFF
} catch (final Exception ex) {
Expand All @@ -173,13 +176,18 @@ public void create(final String znode, Object data) {
}
}

public void update(final String znode, final Object value) {
@Override
public void update(final String node, final Object value) {
if (value == null) {
log.info("node value is null, won't update, node: {}", node);
return;
}
try {
if (this.checkExists(znode)) {
curatorFramework.inTransaction().check().forPath(znode).and().setData()
.forPath(znode, value.toString().getBytes(Charset.forName("UTF-8"))).and().commit();
if (this.checkExists(node)) {
curatorFramework.inTransaction().check().forPath(node).and().setData()
.forPath(node, value.toString().getBytes(Charset.forName("UTF-8"))).and().commit();
} else {
this.create(znode, value);
this.create(node, value);
}
} catch (final NoNodeException ignore) {
// CHECKSTYLE:OFF
Expand All @@ -190,10 +198,10 @@ public void update(final String znode, final Object value) {
}

@Override
public void delete(final String znode) {
public void delete(final String node) {
try {
if (null != curatorFramework.checkExists().forPath(znode)) {
curatorFramework.delete().forPath(znode);
if (null != curatorFramework.checkExists().forPath(node)) {
curatorFramework.delete().forPath(node);
}
} catch (final NoNodeException ignore) {
// CHECKSTYLE:OFF
Expand All @@ -204,10 +212,10 @@ public void delete(final String znode) {
}

@Override
public void deleteRecursive(final String znode) {
public void deleteRecursive(final String node) {
try {
if (null != curatorFramework.checkExists().forPath(znode)) {
CuratorUtils.deletingChildrenIfNeeded(curatorFramework, znode);
if (null != curatorFramework.checkExists().forPath(node)) {
CuratorUtils.deletingChildrenIfNeeded(curatorFramework, node);
}
} catch (final NoNodeException ignore) {
// CHECKSTYLE:OFF
Expand All @@ -225,8 +233,8 @@ public void deleteRecursive(final String znode) {
*/
@Override
public void fillJobNodeIfNotExist(final String node, final Object value) {
if (null == value) {
log.info("job node value is null, node:{}", node);
if (value == null) {
log.info("node value is null, won't fillJobNodeIfNotExist, node: {}", node);
return;
}
if (!checkExists(node)) {
Expand Down Expand Up @@ -311,22 +319,18 @@ public CuratorTransactionOpImpl(CuratorFramework curatorClient) {
}
}

private boolean checkExists(String znode) throws Exception {
return curatorClient.checkExists().forPath(znode) != null;
private boolean checkExists(String node) throws Exception {
return curatorClient.checkExists().forPath(node) != null;
}

private CuratorTransactionOpImpl create(String znode, byte[] data) throws Exception {
private CuratorTransactionOpImpl create(String node, byte[] data) throws Exception {
curatorTransactionFinal = curatorTransactionFinal.create().withMode(CreateMode.PERSISTENT)
.forPath(znode, data).and();
.forPath(node, data).and();
return this;
}

private byte[] getData(String znode) throws Exception {
return curatorClient.getData().forPath(znode);
}

private byte[] toData(Object value) {
return (value == null ? "" : value.toString()).getBytes(Charset.forName("UTF-8"));
private byte[] getData(String node) throws Exception {
return curatorClient.getData().forPath(node);
}

private boolean bytesEquals(byte[] a, byte[] b) {
Expand All @@ -345,44 +349,50 @@ private boolean bytesEquals(byte[] a, byte[] b) {
}

@Override
public CuratorTransactionOpImpl replace(String znode, Object value) throws Exception {
byte[] data = toData(value);
curatorTransactionFinal = curatorTransactionFinal.setData().forPath(znode, data).and();
public CuratorTransactionOpImpl replace(String node, Object value) throws Exception {
if (value == null) {
log.info("node value is null, won't replace, node: {}", node);
return this;
}
byte[] data = value.toString().getBytes(Charset.forName("UTF-8"));
curatorTransactionFinal = curatorTransactionFinal.setData().forPath(node, data).and();
return this;
}

public CuratorTransactionOpImpl replaceIfChanged(String znode, Object value) throws Exception {
return replaceIfChanged(znode, value, new AtomicInteger(0));
}

public CuratorTransactionOpImpl replaceIfChanged(String znode, Object value, AtomicInteger changedCount)
throws Exception {
byte[] newData = toData(value);
if (this.checkExists(znode)) {
byte[] oldData = this.getData(znode);
@Override
public CuratorTransactionOpImpl replaceIfChanged(String node, Object value) throws Exception {
if (value == null) {
log.info("node value is null, won't replaceIfChanged, node: {}", node);
return this;
}
byte[] newData = value.toString().getBytes(Charset.forName("UTF-8"));
if (this.checkExists(node)) {
byte[] oldData = this.getData(node);
if (!bytesEquals(newData, oldData)) {
curatorTransactionFinal = curatorTransactionFinal.check().forPath(znode).and().setData()
.forPath(znode, newData).and();
changedCount.incrementAndGet();
curatorTransactionFinal = curatorTransactionFinal.check().forPath(node).and().setData()
.forPath(node, newData).and();
}
} else {
this.create(znode, newData);
changedCount.incrementAndGet();
this.create(node, newData);
}
return this;
}

@Override
public CuratorTransactionOp create(String znode, Object value) throws Exception {
byte[] data = toData(value);
public CuratorTransactionOp create(String node, Object value) throws Exception {
if (value == null) {
log.info("node value is null, won't create, node: {}", node);
return this;
}
byte[] data = value.toString().getBytes(Charset.forName("UTF-8"));
curatorTransactionFinal = curatorTransactionFinal.create().withMode(CreateMode.PERSISTENT)
.forPath(znode, data).and();
.forPath(node, data).and();
return this;
}

@Override
public CuratorTransactionOp delete(String znode) throws Exception {
curatorTransactionFinal = curatorTransactionFinal.delete().forPath(znode).and();
public CuratorTransactionOp delete(String node) throws Exception {
curatorTransactionFinal = curatorTransactionFinal.delete().forPath(node).and();
return this;
}

Expand Down

0 comments on commit 96bda95

Please sign in to comment.