forked from binshuohu/akka-persistence-transaction
-
Notifications
You must be signed in to change notification settings - Fork 0
/
TransactionManagerActor.scala
141 lines (105 loc) · 5.09 KB
/
TransactionManagerActor.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
package sample.persistence.transaction
import akka.actor.{ActorLogging, Props}
import akka.persistence.AtLeastOnceDelivery.AtLeastOnceDeliverySnapshot
import akka.persistence.{AtLeastOnceDelivery, PersistentActor, SnapshotOffer}
import sample.persistence.account.AccountActor
import sample.persistence.domain.Account
case class TransferMoney(transactionId: Long, from: Account, to: Account, amount: Long)
sealed trait Event
case class TransactionInitiated(transactionId: Long, from: Account, to: Account, amount: Long) extends Event
case class MoneyFrozen(deliveryId: Long) extends Event
case class FreezingMoneyFailed(deliveryId: Long, reason: String) extends Event
case class MoneyAdded(deliveryId: Long) extends Event
case class AddingMoneyFailed(deliveryId: Long, reason: String) extends Event
case class TransactionFinished(deliveryId: Long) extends Event
case class MoneyUnfrozen(deliveryId: Long) extends Event
object TransactionManagerActor {
def props(): Props = Props(classOf[TransactionManagerActor])
}
class TransactionManagerActor
extends PersistentActor with AtLeastOnceDelivery with ActorLogging {
override val persistenceId: String = self.path.name
case class TrxnMgrState(transactionId: Long = -1L,
from: Account = "",
to: Account = "",
amount: Long = 0L,
failureReason: String = "",
deliverySnapshot: AtLeastOnceDeliverySnapshot = getDeliverySnapshot) {
def updated(event: Event): TrxnMgrState = event match {
case TransactionInitiated(trxdId, from, to, amount) =>
TrxnMgrState(trxdId, from, to, amount, "", getDeliverySnapshot)
case FreezingMoneyFailed(_, reason) => copy(failureReason = reason, deliverySnapshot = getDeliverySnapshot)
case AddingMoneyFailed(_, reason) => copy(failureReason = reason, deliverySnapshot = getDeliverySnapshot)
case _ => copy(deliverySnapshot = getDeliverySnapshot)
}
}
var state = TrxnMgrState()
def updateState(event: Event): Unit = {
def fromActor = context.actorSelection(s"/user/${state.from}")
def toActor = context.actorSelection(s"/user/${state.to}")
state = state.updated(event)
event match {
case TransactionInitiated(id, from, to, amount) =>
deliver(fromActor) { deliveryId =>
AccountActor.FreezeMoney(deliveryId, id, to, amount)
}
case MoneyFrozen(deliveryId) =>
confirmDelivery(deliveryId)
deliver(toActor) { deliveryId =>
AccountActor.AddMoney(deliveryId, state.transactionId, state.from, state.amount)
}
case FreezingMoneyFailed(deliveryId, reason) =>
confirmDelivery(deliveryId)
case MoneyAdded(deliveryId) =>
confirmDelivery(deliveryId)
deliver(fromActor) { deliveryId =>
AccountActor.FinishTransaction(deliveryId, state.transactionId)
}
case AddingMoneyFailed(deliveryId, reason) =>
confirmDelivery(deliveryId)
deliver(fromActor) { deliveryId =>
AccountActor.UnfreezeMoney(deliveryId, state.transactionId)
}
case MoneyUnfrozen(deliveryId) =>
confirmDelivery(deliveryId)
case TransactionFinished(deliveryId) =>
confirmDelivery(deliveryId)
}
}
val receiveRecover: Receive = {
case evt: Event =>
log.info(s"replay event: $evt")
updateState(evt)
case SnapshotOffer(_, snapshot: TrxnMgrState) =>
state = snapshot
setDeliverySnapshot(state.deliverySnapshot)
}
override def receiveCommand: Receive = {
case TransferMoney(id, from, to, amount) =>
persist(TransactionInitiated(id, from, to, amount))(updateState)
case AccountActor.ConfirmMoneyFrozenFail(deliveryId, reason) =>
persist(FreezingMoneyFailed(deliveryId, reason)) { e =>
updateState(e)
log.info(s"unable to finish transaction ${state.transactionId}, reason: ${state.failureReason}")
context.system.eventStream.publish(s"unable to finish transaction ${state.transactionId}, reason: ${state.failureReason}")
}
case AccountActor.ConfirmMoneyFrozenSucc(deliveryId) =>
persist(MoneyFrozen(deliveryId))(updateState)
case AccountActor.ConfirmMoneyAddedFail(deliveryId, reason) =>
persist(AddingMoneyFailed(deliveryId, reason))(updateState)
case AccountActor.ConfirmMoneyAddedSucc(deliveryId) =>
persist(MoneyAdded(deliveryId))(updateState)
case AccountActor.ConfirmTransactionFinished(deliveryId) =>
persist(TransactionFinished(deliveryId)) { e=>
updateState(e)
log.info(s"transaction ${state.transactionId} finished successfully")
context.system.eventStream.publish(s"transaction ${state.transactionId} finished successfully")
}
case AccountActor.ConfirmMoneyUnfrozen(deliveryId) =>
persist(MoneyUnfrozen(deliveryId)) { e =>
updateState(e)
log.info(s"unable to finish transaction ${state.transactionId}, reason: ${state.failureReason}")
context.system.eventStream.publish(s"unable to finish transaction ${state.transactionId}, reason: ${state.failureReason}")
}
}
}