支持DDD领域驱动架构的落地实现
springboot-ddd-framework 提供了一些在中小型微服务集群内落地DDD架构所依赖的通用组件和模式规范,无额外外部服务依赖,集群化,分片化,支持自动故障转移,无单点问题。
初衷是能快速构建出基于DDD架构风格的微服务实现,除必要的仓储依赖外,集群内部提供了分布式架构需要的基础功能,服务只保留除必要的输入输出,依赖集最小化。
组件包括
1. 通用DDD领域概念类(ddd-core)
2. 领域事件发布订阅(ddd-akka-event-publisher)
3. 分布式任务调度(ddd-akka-distributedJob)
4. 过程控制器(TODO)
5. CQRS模式(TODO)
6. RPC
1. 事件发布同【业务事务】同步提交或撤回,事件发布流程简单高效,避免了消息队列分布式事务的性能问题
2. 支持事件分区消费,事件过滤,允许无限扩展事件消费者并行处理事件
3. 支持事件有序消费,组消费
4. 支持消费背压
5. 节点下线自动转移【事件发布broker】,【分区消费端】至其余在线节点上,无单点调度/消费故障
6. 注解标注方法模式,开发配置简单, 目前支持mysql/mongodb仓储
1. master/worker模式,采用分片模式选择任意在线节点自动转移调度职责,无单点故障
2. 无外部存储强依赖,调度配置/轮次信息以CRDT数据结构分布在集群内部, 依赖简单,但可选自定义存储实现
3. 支持MAP(分治),SHARDING(数据分片),BROADCAST(广播),STANDALONE(单节点) 几种常用的分布式任务调度模式
4. 支持springCron表达式/secondDelay两种调度规则,类似springSchedule注解开发模式
5. 节点下线自动转移【调度器】,【未完成任务】至其余在线节点上
1. 发布简单,注解标注方法即可作为远程服务发布
2. 协议分发简单,mavenPlugin会自动生成服务方法的定义文件(protobuf协议),依赖方引入协议文件后,会自动生成【远程服务接口类及相关参数对象类】
3. 远程服务实时上下线监控,本地请求支持重试,熔断,降级
4. 支持请求背压,允许基于服务方提供的上限请求阀值控制请求频次
<ddd.version>0.0.6-RELEASE</ddd.version>
<dependency>
<groupId>com.zero.ddd</groupId>
<artifactId>ddd-akka-event-publisher-relational-database</artifactId>
<version>${ddd.version}</version>
</dependency>
<dependency>
<groupId>com.zero.ddd</groupId>
<artifactId>ddd-akka-distributedJob</artifactId>
<version>${ddd.version}</version>
</dependency>
<repositories>
<repository>
<id>release-mvn-repo</id>
<url>https://raw.githubusercontent.com/rezar1/maven-rpeo/release/</url>
</repository>
</repositories>
示例项目: ddd-akka-event-publisher-demo
// 发布代码片段
private void reCompareFirstInivterInfo() {
InviterInfo compareFirstInviterInfo =
this.compareFirstInviterInfo();
if (this.inviterInfo == null
|| !this.inviterInfo.equals(compareFirstInviterInfo)) {
this.inviterInfo = compareFirstInviterInfo;
DomainEventPublisher.publish(
new ParticipantsInviterInfoChanged(
this.participantsId,
this.companyId,
this.taskId,
this.wxUnionId,
this.wxMemberId,
this.inviterInfo));
}
}
// 消费代码片段
@EventSynchronizer(
partition = 30,
clientConcurrency = 10)
@ShardingKeyExpression(
el ="#event[taskId]")
public void updateParticipantsBelongChain(
ParticipantsInviterInfoChanged participantsInviterInfoChanged,
@ShardingKeyExpression(
el ="#event[taskId]",
filterEl = "#event[wxUnionId] != null && #event[wxMemberId] != null")
ParticipantsMatchedVisitorIdChanged participantsMatchedVisitorIdChanged) {
String traceId = UUID.randomUUID().toString();
ParticipantsId participantsId = null;
if (participantsInviterInfoChanged != null) {
log.info(
"[{}] participantsInviterInfoChanged:{}-{}-{}-{}-{}-{}",
traceId,
participantsInviterInfoChanged.getCompanyId(),
participantsInviterInfoChanged.getTaskId(),
participantsInviterInfoChanged.getParticipantsId(),
participantsInviterInfoChanged.getWxUnionId(),
participantsInviterInfoChanged.getWxMemberId(),
participantsInviterInfoChanged.getInviterInfo());
participantsId = participantsInviterInfoChanged.getParticipantsId();
} else if (participantsMatchedVisitorIdChanged != null) {
log.info(
"[{}] participantsMatchedVisitorIdChanged:{}-{}-{}-{}-{}",
traceId,
participantsMatchedVisitorIdChanged.getCompanyId(),
participantsMatchedVisitorIdChanged.getTaskId(),
participantsMatchedVisitorIdChanged.getParticipantsId(),
participantsMatchedVisitorIdChanged.getWxUnionId(),
participantsMatchedVisitorIdChanged.getWxMemberId());
participantsId = participantsMatchedVisitorIdChanged.getParticipantsId();
}
}
// 数据分片
@DistributedJob(
jobShowName = "分片生成profile图片的phash",
concurrency = 40,
jobParams = "0=run&1=run&2=run&3=run&4=run&5=run&6=run&7=run&8=run&9=run&10=run&11=run",
jobExecuteType = JobExecuteType.SHARDING,
jobScheduled = @JobScheduled(cron = "0/5 * * * * ?", initialDelay = 10000),
jobScheduleTimeout =
@JobScheduleTimeout(
timeout = 3,
timeoutUnit = TimeUnit.MINUTES))
public TaskResult initProfileImageHash(
JobTaskContext context) {
ShardingRequest shardingRequest = context.shardingRequest();
this.app.shardingQueryNeedInitImageHashProfile(
shardingRequest.getShardingNum(),
shardingRequest.getShardingId().intValue())
.forEach(profileVo -> {
this.app.tryCreateProfileImageHash(profileVo);
});
return TaskResult.succ();
}
// 任务分治
@DistributedJob(
// 日志显示可读的任务名称
jobShowName = "打印所有的公司员工",
// 任务调度的类型
jobExecuteType = JobExecuteType.MAP,
// 单实例并行的任务执行者数量
concurrency = 5,
// spring crontab
jobScheduled = @JobScheduled(cron = "0/20 * * * * ?", initialDelay = 30000),
// 单次调度超时时间
jobScheduleTimeout =
@JobScheduleTimeout(
timeout = 10,
timeoutUnit = TimeUnit.MINUTES,
timeoutListener = JustLogJobScheudleTimeoutListener.class))
public TaskResult logCompanyUsers(
JobTaskContext context) {
if (context.isRootTask()) {
// 初始根任务
return this.rootMapSubTask();
} else if (context.isTaskOfType(SUB_TASK_1)) {
// 二级实际执行的任务
return this.executeSubTaskLogic(context);
} else {} // others
return TaskResult.succ();
}