Skip to content

Latest commit

 

History

History
77 lines (69 loc) · 2.83 KB

7.Mailbox源码分析.md

File metadata and controls

77 lines (69 loc) · 2.83 KB

看了《4.创建ActorRef的内部流程》一节,我们已经知道:在初始化ActorCell时会为ActorCell创建Mailbox。Mailbox只有一个类参数MessageQueue,而MessageQueue的实现类实例是由MailboxType的create方法创建的。

/**
 * MessageQueue is one of the core components of an Akka Mailbox.
 * When normal messages are sent to Actors, they are enqueued in the MessageQueue
 * (and are subsequently dequeued from it).
 * A MessageQueue must support at least N-producer / 1-consumer thread-safety.
 */
trait MessageQueue {
  /**
   * Try to enqueue the message to this queue, or throw an exception.
   */
  def enqueue(receiver: ActorRef, handle: Envelope): Unit // NOTE: receiver is used only in two places, but cannot be removed

  /**
   * Try to dequeue the next message from this queue, return null failing that.
   */
  def dequeue(): Envelope

  /**
   * Returns the number of messages currently held by this MessageQueue.
   * Do not use this method to check whether the MessageQueue holds any messages;
   * use `hasMessages` for that.
   */
  def numberOfMessages: Int

  /**
   * Checks whether this MessageQueue is non-empty.
   */
  def hasMessages: Boolean

  /**
   * Called when the mailbox this queue belongs to is disposed of. Normally it
   * is expected to transfer all remaining messages into the dead letter queue
   * which is passed in. The owner of this MessageQueue is passed in if
   * available (e.g. for creating DeadLetters()), "/deadletters" otherwise.
   */
  def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit
}

/**
 * A [[MessageQueue]] backed by a `java.util.Queue`.
 *
 * Implementors supply the backing `queue`; the size and emptiness queries and
 * the clean-up logic are derived from it here.
 */
trait QueueBasedMessageQueue extends MessageQueue with MultipleConsumerSemantics {
  /** The backing queue that holds the pending envelopes. */
  def queue: Queue[Envelope]

  /** Number of envelopes currently in the backing queue (`queue.size`). */
  def numberOfMessages: Int = queue.size

  /** `true` when the backing queue is not empty. */
  def hasMessages: Boolean = !queue.isEmpty

  /**
   * Drains this queue into `deadLetters`.
   *
   * Every envelope still obtainable via `dequeue()` is enqueued into
   * `deadLetters` with `owner` as the receiver; draining stops at the first
   * `null` returned by `dequeue()`.
   *
   * @param owner       passed as the receiver of each forwarded envelope
   * @param deadLetters queue that receives the remaining envelopes
   */
  def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit = {
    if (hasMessages) {
      // `dequeue()` is side-effecting and declared with an empty parameter list, so it is
      // called with explicit parentheses (auto-application is deprecated in Scala 2.13 and
      // rejected by Scala 3).
      var envelope = dequeue()
      while (envelope ne null) {
        deadLetters.enqueue(owner, envelope)
        envelope = dequeue()
      }
    }
  }
}

/**
 * Enqueue/dequeue implementation on top of the backing `queue` of a
 * [[QueueBasedMessageQueue]], mixed with `UnboundedMessageQueueSemantics`.
 */
trait UnboundedQueueBasedMessageQueue extends QueueBasedMessageQueue with UnboundedMessageQueueSemantics {
  /** Adds `handle` to the backing queue; `receiver` is not used here. */
  def enqueue(receiver: ActorRef, handle: Envelope): Unit = {
    queue.add(handle)
  }

  /** Polls the backing queue and returns whatever `Queue.poll` yields. */
  def dequeue(): Envelope = {
    val next = queue.poll()
    next
  }
}

/**
 * UnboundedMailbox is the unbounded MailboxType that Akka actors use by default.
 * The queue it produces is backed by a `ConcurrentLinkedQueue`.
 */
final case class UnboundedMailbox() extends MailboxType with ProducesMessageQueue[UnboundedMailbox.MessageQueue] {

  /** Auxiliary constructor: both arguments are ignored and it delegates to the no-arg constructor. */
  def this(settings: ActorSystem.Settings, config: Config) = this()

  /** Returns a fresh `UnboundedMailbox.MessageQueue`; `owner` and `system` are not consulted. */
  final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = {
    val freshQueue = new UnboundedMailbox.MessageQueue
    freshQueue
  }
}
object UnboundedMailbox {
  /**
   * Message queue type for `UnboundedMailbox`: a `ConcurrentLinkedQueue[Envelope]`
   * that also serves as its own backing `queue`.
   */
  class MessageQueue extends ConcurrentLinkedQueue[Envelope] with UnboundedQueueBasedMessageQueue {
    // The instance itself is the java.util.Queue that the queue-based traits operate on.
    final def queue: Queue[Envelope] = this
  }
}