Skip to content

Commit

Permalink
Merge pull request #38 from tkrs/add-debug-log
Browse files Browse the repository at this point in the history
Add debug log
  • Loading branch information
Takeru Sato committed Dec 6, 2016
2 parents 77e80b9 + 80fe1f5 commit 8108e58
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 103 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ lazy val publishSettings = Seq(
<url>https://github.com/tkrs</url>
</developer>
</developers>,
pgpPassphrase := sys.env.get("PGP_PASSPHRASE").map(_.getBytes.map(_.toChar))
pgpPassphrase := sys.env.get("PGP_PASSPHRASE").map(_.toCharArray)
) ++ credentialSettings

lazy val credentialSettings = Seq(
Expand Down
77 changes: 65 additions & 12 deletions examples/src/main/scala/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,64 @@ object Main extends App {

case class CCC(
i: Int,
ttt: String,
uuu: String,
aaa: String,
bbb: String,
ccc: String,
ddd: Int,
eee: Map[String, String],
ffff: Seq[Double],
ggg: Int,
hhh: String,
iii: Int,
jjj: String,
kkk: String,
lll: String,
mmm: String,
nnn: String,
ooo: String,
ppp: String,
qqq: Int,
rrr: Int,
sss: Int,
mmm: Map[String, String],
ggg: Seq[Double]
ttt: Int,
uuu: Int,
vvv: Int,
www: Int,
xxx: Int,
yyy: Int,
zzz: Int
)

val rnd0 = new Random()

val ccc: CCC = CCC(
0,
rnd0.nextString(1000),
rnd0.nextString(10),
rnd0.nextString(100),
rnd0.nextInt(Int.MaxValue),
Map("name" -> "fluflu"),
Seq(1.2, Double.MaxValue, Double.MinValue),
rnd0.nextInt(Int.MaxValue),
rnd0.nextString(30),
rnd0.nextInt(Int.MaxValue),
rnd0.nextString(30),
rnd0.nextString(30),
rnd0.nextString(30),
rnd0.nextString(30),
rnd0.nextString(30),
rnd0.nextString(30),
rnd0.nextString(30),
rnd0.nextInt(Int.MaxValue),
rnd0.nextInt(Int.MaxValue),
rnd0.nextInt(Int.MaxValue),
rnd0.nextInt(Int.MaxValue),
rnd0.nextInt(Int.MaxValue),
rnd0.nextInt(Int.MaxValue),
rnd0.nextInt(Int.MaxValue),
rnd0.nextInt(Int.MaxValue),
rnd0.nextInt(Int.MaxValue),
rnd0.nextInt(Int.MaxValue)
)

implicit val clock: Clock = Clock.systemUTC()
Expand All @@ -37,8 +90,6 @@ object Main extends App {
val rewriteBackoff: Backoff =
ExponentialBackoff(Duration.ofNanos(500), Duration.ofSeconds(5), rnd)

val ccc: CCC = CCC(0, "foo", "", Int.MaxValue, Map("name" -> "fluflu"), Seq(1.2, Double.MaxValue, Double.MinValue))

val messenger = fluflu.DefaultMessenger(
host = args(0),
port = args(1).toInt,
Expand All @@ -47,7 +98,7 @@ object Main extends App {
reconnectionBackoff = reconnectionBackoff,
rewriteBackoff = rewriteBackoff
)
val async: Async = Async(
val asyncQueue: Async = Async(
messenger = messenger,
initialBufferSize = 1024,
initialDelay = 500,
Expand All @@ -57,20 +108,22 @@ object Main extends App {
terminationDelayTimeUnit = TimeUnit.SECONDS
)
val push: Event[CCC] => Future[Unit] = { a =>
async.push(a).fold(Future.failed, Future.successful)
asyncQueue.push(a).fold(Future.failed, Future.successful)
}

val idx = new AtomicInteger(0)
val xs: Vector[Event[CCC]] =
Iterator.from(1).map(x => Event("example", "ccc", ccc.copy(i = x))).take(5000).toVector
Iterator.from(1).map(x => Event("example", "ccc", ccc.copy(i = idx.getAndIncrement()))).take(5000).toVector

val i = new AtomicInteger(1)
val wokers = new AtomicInteger(1)

val scheduler: ScheduledExecutorService = Executors.newScheduledThreadPool(1)
val worker = new Runnable {
override def run(): Unit = {
val start = System.nanoTime()
val r = Await.result(xs traverse push attempt, Inf)
val elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)
println(("worker", i.getAndIncrement(), r.isRight, elapsed))
println(("worker", wokers.getAndIncrement(), r.isRight, elapsed))
}
}

Expand All @@ -82,5 +135,5 @@ object Main extends App {
scheduler.awaitTermination(args(2).toLong, TimeUnit.SECONDS)
scheduler.shutdownNow()
pool.shutdown()
async.close()
asyncQueue.close()
}
180 changes: 90 additions & 90 deletions msgpack/src/main/scala/fluflu/msgpack/MessagePacker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,20 @@ package fluflu
package msgpack

import java.lang.Double.doubleToLongBits
import java.nio.{ ByteBuffer, CharBuffer }
import java.nio.CharBuffer
import java.nio.charset.{ CharsetEncoder, StandardCharsets }

import cats.syntax.either._
import io.circe.{ Encoder, Json }

import scala.collection.mutable
import scala.util.{ Either => \/ }

object MessagePacker {

val encoder: CharsetEncoder = StandardCharsets.UTF_8.newEncoder()

final val `0xc0`: Byte = 0xc0.toByte
final val `0xc3`: Byte = 0xc3.toByte
final val `0xc2`: Byte = 0xc2.toByte

Expand All @@ -39,59 +41,59 @@ object MessagePacker {

def apply() = new MessagePacker()

def formatArrayFamilyHeader(size: Int, builder: ByteBuffer): Unit = {
def formatArrayFamilyHeader(size: Int, builder: mutable.ArrayBuilder[Byte]): Unit = {
if (size < 16)
builder.put((0x90 | size).toByte)
builder += (0x90 | size).toByte
else if (size < 65536) {
builder.put(`0xdc`)
builder.put((size >>> 8).toByte)
builder.put((size >>> 0).toByte)
builder += `0xdc`
builder += (size >>> 8).toByte
builder += (size >>> 0).toByte
} else {
builder.put(`0xdd`)
builder.put((size >>> 24).toByte)
builder.put((size >>> 16).toByte)
builder.put((size >>> 8).toByte)
builder.put((size >>> 0).toByte)
builder += `0xdd`
builder += (size >>> 24).toByte
builder += (size >>> 16).toByte
builder += (size >>> 8).toByte
builder += (size >>> 0).toByte
}
}

def formatMapFamilyHeader(sz: Int, builder: ByteBuffer): Unit = {
def formatMapFamilyHeader(sz: Int, builder: mutable.ArrayBuilder[Byte]): Unit = {
if (sz < 16)
builder.put((0x80 | sz).toByte)
builder += (0x80 | sz).toByte
else if (sz < 65536) {
builder.put(`0xde`)
builder.put((sz >>> 8).toByte)
builder.put((sz >>> 0).toByte)
builder += `0xde`
builder += (sz >>> 8).toByte
builder += (sz >>> 0).toByte
} else {
builder.put(`0xdf`)
builder.put((sz >>> 24).toByte)
builder.put((sz >>> 16).toByte)
builder.put((sz >>> 8).toByte)
builder.put((sz >>> 0).toByte)
builder += `0xdf`
builder += (sz >>> 24).toByte
builder += (sz >>> 16).toByte
builder += (sz >>> 8).toByte
builder += (sz >>> 0).toByte
}
}

def formatStrFamilyHeader(sz: Int, builder: ByteBuffer): Unit =
def formatStrFamilyHeader(sz: Int, builder: mutable.ArrayBuilder[Byte]): Unit =
if (sz < 32)
builder.put((0xa0 | sz).toByte)
builder += (0xa0 | sz).toByte
else if (sz < 65536) {
builder.put(`0xda`)
builder.put((sz >>> 8).toByte)
builder.put((sz >>> 0).toByte)
builder += `0xda`
builder += (sz >>> 8).toByte
builder += (sz >>> 0).toByte
} else {
builder.put(`0xdb`)
builder.put((sz >>> 24).toByte)
builder.put((sz >>> 16).toByte)
builder.put((sz >>> 8).toByte)
builder.put((sz >>> 0).toByte)
builder += `0xdb`
builder += (sz >>> 24).toByte
builder += (sz >>> 16).toByte
builder += (sz >>> 8).toByte
builder += (sz >>> 0).toByte
}

val formatNil: Byte = 0xc0.toByte
def formatNil(builder: mutable.ArrayBuilder[Byte]): Unit = builder += `0xc0`

def formatBoolFamily(v: Boolean, builder: ByteBuffer): Unit =
builder.put(if (v) `0xc3` else `0xc2`)
def formatBoolFamily(v: Boolean, builder: mutable.ArrayBuilder[Byte]): Unit =
builder += (if (v) `0xc3` else `0xc2`)

def formatIntFamily(l: Long, builder: ByteBuffer): Unit =
def formatIntFamily(l: Long, builder: mutable.ArrayBuilder[Byte]): Unit =
if (4294967296L <= l) formatLong(`0xcf`, l, builder)
else if (65536L <= l) formatInt(`0xce`, l.toInt, builder)
else if (256L <= l) formatShort(`0xcd`, l.toInt, builder)
Expand All @@ -103,67 +105,70 @@ object MessagePacker {
else if (l >= Int.MinValue.toLong) formatInt(`0xd2`, l.toInt, builder)
else formatLong(`0xd3`, l, builder)

def formatIntFamily(t: Byte, v: BigInt, builder: ByteBuffer): Unit = {
builder.put(t)
builder.put((v >> 56).toByte)
builder.put((v >> 48).toByte)
builder.put((v >> 40).toByte)
builder.put((v >> 32).toByte)
builder.put((v >> 24).toByte)
builder.put((v >> 16).toByte)
builder.put((v >> 8).toByte)
builder.put((v >> 0).toByte)
def formatIntFamily(t: Byte, v: BigInt, builder: mutable.ArrayBuilder[Byte]): Unit = {
builder += t
builder += (v >> 56).toByte
builder += (v >> 48).toByte
builder += (v >> 40).toByte
builder += (v >> 32).toByte
builder += (v >> 24).toByte
builder += (v >> 16).toByte
builder += (v >> 8).toByte
builder += (v >> 0).toByte
}

def formatFloatFamily(v: Double, builder: ByteBuffer): Unit = {
def formatFloatFamily(v: Double, builder: mutable.ArrayBuilder[Byte]): Unit = {
val x = doubleToLongBits(v)
builder.put(`0xcb`)
builder.put((x >>> 56).toByte)
builder.put((x >>> 48).toByte)
builder.put((x >>> 40).toByte)
builder.put((x >>> 32).toByte)
builder.put((x >>> 24).toByte)
builder.put((x >>> 16).toByte)
builder.put((x >>> 8).toByte)
builder.put((x >>> 0).toByte)
builder += `0xcb`
builder += (x >>> 56).toByte
builder += (x >>> 48).toByte
builder += (x >>> 40).toByte
builder += (x >>> 32).toByte
builder += (x >>> 24).toByte
builder += (x >>> 16).toByte
builder += (x >>> 8).toByte
builder += (x >>> 0).toByte
}

def formatStrFamily(v: String, builder: ByteBuffer): Unit = {
def formatStrFamily(v: String, builder: mutable.ArrayBuilder[Byte]): Unit = {
val cb = CharBuffer.wrap(v)
val buf = encoder.encode(cb)
formatStrFamilyHeader(strSize(cb), builder)
builder.put(buf)
val arr = Array.ofDim[Byte](buf.remaining())
buf.get(arr)
builder ++= arr
buf.clear()
cb.clear()
}

def formatByte(v: Byte, builder: ByteBuffer): Unit = builder.put(v)
def formatByte(v: Byte, builder: mutable.ArrayBuilder[Byte]): Unit = builder += v

def formatByte(t: Byte, v: Byte, builder: ByteBuffer): Unit = {
builder.put(t)
builder.put(v)
def formatByte(t: Byte, v: Byte, builder: mutable.ArrayBuilder[Byte]): Unit = {
builder += t
builder += v
}
def formatShort(t: Byte, v: Int, builder: ByteBuffer): Unit = {
builder.put(t)
builder.put((v >>> 8).toByte)
builder.put((v >>> 0).toByte)
def formatShort(t: Byte, v: Int, builder: mutable.ArrayBuilder[Byte]): Unit = {
builder += t
builder += (v >>> 8).toByte
builder += (v >>> 0).toByte
}
def formatInt(t: Byte, v: Int, builder: ByteBuffer): Unit = {
builder.put(t)
builder.put((v >>> 24).toByte)
builder.put((v >>> 16).toByte)
builder.put((v >>> 8).toByte)
builder.put((v >>> 0).toByte)
def formatInt(t: Byte, v: Int, builder: mutable.ArrayBuilder[Byte]): Unit = {
builder += t
builder += (v >>> 24).toByte
builder += (v >>> 16).toByte
builder += (v >>> 8).toByte
builder += (v >>> 0).toByte
}
def formatLong(t: Byte, v: Long, builder: ByteBuffer): Unit = {
builder.put(t)
builder.put((v >>> 56).toByte)
builder.put((v >>> 48).toByte)
builder.put((v >>> 40).toByte)
builder.put((v >>> 32).toByte)
builder.put((v >>> 24).toByte)
builder.put((v >>> 16).toByte)
builder.put((v >>> 8).toByte)
builder.put((v >>> 0).toByte)
def formatLong(t: Byte, v: Long, builder: mutable.ArrayBuilder[Byte]): Unit = {
builder += t
builder += (v >>> 56).toByte
builder += (v >>> 48).toByte
builder += (v >>> 40).toByte
builder += (v >>> 32).toByte
builder += (v >>> 24).toByte
builder += (v >>> 16).toByte
builder += (v >>> 8).toByte
builder += (v >>> 0).toByte
}

def strSize(cb: CharBuffer): Int =
Expand All @@ -184,23 +189,18 @@ final class MessagePacker {
def encode[A](a: A)(implicit A: Encoder[A]): Throwable \/ Array[Byte] = pack(A(a))

def pack(doc: Json): Throwable \/ Array[Byte] = {
val acc: ByteBuffer = ByteBuffer.allocate(1048576)
val acc: mutable.ArrayBuilder[Byte] = mutable.ArrayBuilder.make[Byte]
\/.catchNonFatal {
go(doc, acc)
val i = acc.position()
val arr = Array.ofDim[Byte](i)
acc.flip()
acc.get(arr)
acc.clear()
arr
acc.result
}
}

def double(x: BigDecimal): Boolean = x.scale != 0

def go(json: Json, acc: ByteBuffer): Unit =
def go(json: Json, acc: mutable.ArrayBuilder[Byte]): Unit =
if (json.isNull)
acc.put(formatNil)
formatNil(acc)
else if (json.isBoolean) json.asBoolean match {
case None => ()
case Some(x) => formatBoolFamily(x, acc)
Expand Down
1 change: 1 addition & 0 deletions queue/src/main/scala/fluflu/queue/Async.scala
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ final case class Async(
scheduler.shutdown()
scheduler.awaitTermination(terminationDelay, terminationDelayTimeUnit)
if (!scheduler.isTerminated) scheduler.shutdownNow()
if (!letterQueue.isEmpty) logger.debug(s"message queue has remaining: ${letterQueue.size()}")
command.run()
messenger.close()
}
Expand Down

0 comments on commit 8108e58

Please sign in to comment.