Skip to content

Commit

Permalink
Merge branch 'release/0.9.0'
Browse files Browse the repository at this point in the history
  • Loading branch information
ianoc committed Mar 20, 2014
2 parents efbb43c + ff96d5e commit 295bdbd
Show file tree
Hide file tree
Showing 65 changed files with 3,695 additions and 119 deletions.
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@ before_script:
services:
- redis-server
- memcache
- mongodb
script: umask 0022 && sbt ++$TRAVIS_SCALA_VERSION test
23 changes: 23 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,28 @@
# Storehaus #

### Version 0.9.0 ###
* Reporting store algebra: https://github.com/twitter/storehaus/pull/176
* Bumping finagle to a more recent version, changes that were required: https://github.com/twitter/storehaus/pull/223
* Bump Algebird to version 0.5.0: https://github.com/twitter/storehaus/pull/221
* Add stores for read-through and write-through caching: https://github.com/twitter/storehaus/pull/220
* fix bug in onFailure enriched mergeable store: https://github.com/twitter/storehaus/pull/218
* Fixes an issue that Future.collect is N^2 on scala Lists: https://github.com/twitter/storehaus/pull/219
* Adds GetBatchingReadableStore: https://github.com/twitter/storehaus/pull/215
* Elastic Search Store: https://github.com/twitter/storehaus/pull/205
* Issue #72: Added mongodb store.: https://github.com/twitter/storehaus/pull/199
* Add out of retries exception to retrying store: https://github.com/twitter/storehaus/pull/210
* IterableStore: https://github.com/twitter/storehaus/pull/191
* add onFailure to EnrichedMergeableStore: https://github.com/twitter/storehaus/pull/200
* clean up htable after finishing get and put operations.: https://github.com/twitter/storehaus/pull/207
* Adds a mutable TTL cache: https://github.com/twitter/storehaus/pull/196
* add MergeableStore.fromStoreNoMulti that does single get then put: https://github.com/twitter/storehaus/pull/201
* my little proxy: https://github.com/twitter/storehaus/pull/202
* Add immutable LIRS Cache implementation: https://github.com/twitter/storehaus/pull/155
* Adds the CalendarTimeStrategy: https://github.com/twitter/storehaus/pull/195
* Adds the ability to add an Optional component onto any strategy: https://github.com/twitter/storehaus/pull/198
* Just adds some whitespace: https://github.com/twitter/storehaus/pull/197
* Kafka Sink for SummingBird: https://github.com/twitter/storehaus/pull/192

### Version 0.8.0 ###
* add BatchedStore for writes: https://github.com/twitter/storehaus/pull/175
* MySQL batched multiPut: https://github.com/twitter/storehaus/pull/173
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ Discussion occurs primarily on the [Storehaus mailing list](https://groups.googl

## Maven

Storehaus modules are available on maven central. The current groupid and version for all modules is, respectively, `"com.twitter"` and `0.6.0`.
Storehaus modules are available on maven central. The current groupid and version for all modules is, respectively, `"com.twitter"` and `0.9.0`.

Current published artifacts are

Expand Down
61 changes: 52 additions & 9 deletions project/Build.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2013 Twitter inc.
* Copyright 2014 Twitter inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -55,15 +55,16 @@ object StorehausBuild extends Build {
val sharedSettings = extraSettings ++ ciSettings ++ Seq(
organization := "com.twitter",
scalaVersion := "2.9.3",
version := "0.8.0",
version := "0.9.0",
crossScalaVersions := Seq("2.9.3", "2.10.0"),
javacOptions ++= Seq("-source", "1.6", "-target", "1.6"),
javacOptions in doc := Seq("-source", "1.6"),
libraryDependencies <+= scalaVersion(specs2Import(_)),
resolvers ++= Seq(
Opts.resolver.sonatypeSnapshots,
Opts.resolver.sonatypeReleases,
"Twitter Maven" at "http://maven.twttr.com"
"Twitter Maven" at "http://maven.twttr.com",
"Conjars Repository" at "http://conjars.org/repo"
),
parallelExecution in Test := true,
scalacOptions ++= Seq(Opts.compile.unchecked, Opts.compile.deprecation),
Expand Down Expand Up @@ -113,11 +114,12 @@ object StorehausBuild extends Build {
def youngestForwardCompatible(subProj: String) =
Some(subProj)
.filterNot(unreleasedModules.contains(_))
.map { s => "com.twitter" % ("storehaus-" + s + "_2.9.3") % "0.8.0" }
.map { s => "com.twitter" % ("storehaus-" + s + "_2.9.3") % "0.9.0" }

val algebirdVersion = "0.3.1"
val bijectionVersion = "0.6.0"
val utilVersion = "6.3.7"
val algebirdVersion = "0.5.0"
val bijectionVersion = "0.6.2"
val utilVersion = "6.11.0"
val scaldingVersion = "0.9.0rc15"

lazy val storehaus = Project(
id = "storehaus",
Expand All @@ -136,6 +138,9 @@ object StorehausBuild extends Build {
storehausRedis,
storehausHBase,
storehausDynamoDB,
storehausKafka,
storehausMongoDB,
storehausElastic,
storehausTesting
)

Expand All @@ -147,7 +152,10 @@ object StorehausBuild extends Build {
).dependsOn(storehausTesting % "test->test")
}

lazy val storehausCache = module("cache")
lazy val storehausCache = module("cache").settings(
libraryDependencies += "com.twitter" %% "algebird-core" % algebirdVersion,
libraryDependencies += withCross("com.twitter" %% "util-core" % utilVersion)
)

lazy val storehausCore = module("core").settings(
libraryDependencies ++= Seq(
Expand All @@ -160,7 +168,8 @@ object StorehausBuild extends Build {
lazy val storehausAlgebra = module("algebra").settings(
libraryDependencies += "com.twitter" %% "algebird-core" % algebirdVersion,
libraryDependencies += "com.twitter" %% "algebird-util" % algebirdVersion,
libraryDependencies += "com.twitter" %% "algebird-bijection" % algebirdVersion
libraryDependencies += "com.twitter" %% "algebird-bijection" % algebirdVersion,
libraryDependencies += "com.twitter" %% "scalding-date" % scaldingVersion
).dependsOn(storehausCore % "test->test;compile->compile")

lazy val storehausMemcache = module("memcache").settings(
Expand Down Expand Up @@ -212,6 +221,40 @@ object StorehausBuild extends Build {
parallelExecution in Test := false
).dependsOn(storehausAlgebra % "test->test;compile->compile")

lazy val storehausKafka = module("kafka").settings(
libraryDependencies ++= Seq (
"com.twitter" %% "bijection-core" % bijectionVersion,
"com.twitter" %% "bijection-avro" % bijectionVersion,
"com.twitter"%"kafka_2.9.2"%"0.7.0" excludeAll(
ExclusionRule("com.sun.jdmk","jmxtools"),
ExclusionRule( "com.sun.jmx","jmxri"),
ExclusionRule( "javax.jms","jms")
)
),
// we don't want various tests clobbering each others keys
parallelExecution in Test := false
).dependsOn(storehausAlgebra % "test->test;compile->compile")

lazy val storehausMongoDB= module("mongodb").settings(
libraryDependencies ++= Seq(
"com.twitter" %% "bijection-core" % bijectionVersion,
"org.mongodb" %% "casbah" % "2.6.4"
),
parallelExecution in Test := false
).dependsOn(storehausAlgebra % "test->test;compile->compile")

lazy val storehausElastic = module("elasticsearch").settings(
libraryDependencies ++= Seq (
"org.elasticsearch" % "elasticsearch" % "0.90.9",
"org.json4s" %% "json4s-native" % "3.2.6",
"com.google.code.findbugs" % "jsr305" % "1.3.+",
"com.twitter" %% "bijection-json4s" % bijectionVersion
),
// we don't want various tests clobbering each others keys
parallelExecution in Test := false
).dependsOn(storehausAlgebra % "test->test;compile->compile")


val storehausTesting = Project(
id = "storehaus-testing",
base = file("storehaus-testing"),
Expand Down
2 changes: 1 addition & 1 deletion project/Finagle.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ package storehaus
* dependency */
object Finagle {
import sbt._
val LatestVersion = "6.5.1"
val LatestVersion = "6.12.2"
def module(name: String, version: String = LatestVersion) =
StorehausBuild.withCross("com.twitter" %% "finagle-%s".format(name) % version)
}
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
/*
* Copyright 2013 Twitter Inc.
* Copyright 2014 Twitter inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.twitter.storehaus.algebra
Expand Down Expand Up @@ -49,4 +49,12 @@ class EnrichedMergeableStore[K, V](store: MergeableStore[K, V]) {

def convert[K1, V1](fn: K1 => K)(implicit bij: ImplicitBijection[V1, V]): MergeableStore[K1, V1] =
MergeableStore.convert(store)(fn)

def onMergeFailure(rescueException: Throwable => Unit): MergeableStore[K, V] =
new MergeableStoreProxy[K, V] {
override def self = store
override def merge(kv: (K, V)) = store.merge(kv).onFailure(rescueException)
override def multiMerge[K1 <: K](kvs: Map[K1, V]) =
store.multiMerge(kvs).map { case (key, value) => (key, value.onFailure(rescueException)) }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright 2013 Twitter Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.twitter.storehaus.algebra

import com.twitter.algebird.Semigroup
import com.twitter.storehaus.Proxied
import com.twitter.util.{ Future, Time }

/** Proxy for Mergeables. Methods not overrided in extensions will be forwared to Proxied
* self member */
trait MergeableProxy[K, V] extends Mergeable[K, V] with Proxied[Mergeable[K,V]] {
override def semigroup: Semigroup[V] = self.semigroup
override def merge(kv: (K, V)): Future[Option[V]] = self.merge(kv)
override def multiMerge[K1 <: K](kvs: Map[K1, V]): Map[K1, Future[Option[V]]] = self.multiMerge(kvs)
override def close(time: Time) = self.close(time)
}

/** Proxy for MergeableStoress. Methods not overrided in extensions will be forwared to Proxied
* self member */
trait MergeableStoreProxy[K, V] extends MergeableStore[K, V] with Proxied[MergeableStore[K, V]] {
override def semigroup: Semigroup[V] = self.semigroup
override def merge(kv: (K, V)): Future[Option[V]] = self.merge(kv)
override def multiMerge[K1 <: K](kvs: Map[K1, V]): Map[K1, Future[Option[V]]] = self.multiMerge(kvs)
override def put(kv: (K, Option[V])): Future[Unit] = self.put(kv)
override def multiPut[K1 <: K](kvs: Map[K1, Option[V]]): Map[K1, Future[Unit]] = self.multiPut(kvs)
override def get(k: K): Future[Option[V]] = self.get(k)
override def multiGet[K1 <: K](ks: Set[K1]): Map[K1, Future[Option[V]]] = self.multiGet(ks)
override def close(time: Time) = self.close(time)
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,14 @@ object MergeableStore {
val keySet = kvs.keySet
val collected: Future[Map[K, Future[Option[V]]]] =
collect {
store.multiGet(keySet).view.map {
store.multiGet(keySet).iterator.map {
case (k, futureOptV) =>
futureOptV.map { init =>
val incV = kvs(k)
val resV = init.map(Semigroup.plus(_, incV)).orElse(Some(incV))
k -> (init, resV)
}
}.toSeq
}.toIndexedSeq
}.map { pairs: Seq[(K, (Option[V], Option[V]))] =>
val pairMap = pairs.toMap
store.multiPut(pairMap.mapValues(_._2))
Expand All @@ -71,6 +71,14 @@ object MergeableStore {
fc: FutureCollector[(K, Option[V])]): MergeableStore[K,V] =
new MergeableStoreViaGetPut[K, V](store, fc)

/** Create a mergeable by implementing merge with single get followed by put for each key. Also forces multiGet and
* multiPut to use the store's default implementation of a single get and put.
* The merge is only safe if each key is owned by a single thread. Useful in certain cases where multiGets and
* multiPuts may result in higher error rates or lower throughput.
*/
def fromStoreNoMulti[K,V](store: Store[K,V])(implicit sg: Semigroup[V]): MergeableStore[K,V] =
new MergeableStoreViaSingleGetPut[K, V](store)

/** Create a mergeable by implementing merge with get followed by put.
* Only safe if each key is owned by a single thread.
* This deletes zeros on put, but returns zero on empty (never returns None).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,12 @@ import com.twitter.util.Future
* writer thread per key. Otherwise you need to do some locking or compare-and-swap
* in the store
*/
class MergeableStoreViaGetPut[-K, V: Semigroup](store: Store[K, V], fc: FutureCollector[(K, Option[V])] = FutureCollector.default[(K, Option[V])])
extends MergeableStore[K, V] {

class MergeableStoreViaSingleGetPut[-K, V: Semigroup](store: Store[K, V]) extends MergeableStore[K, V] {
override def semigroup: Semigroup[V] = implicitly[Semigroup[V]]

override def get(k: K) = store.get(k)
override def multiGet[K1 <: K](ks: Set[K1]) = store.multiGet(ks)
override def put(kv: (K, Option[V])) = store.put(kv)
override def multiPut[K1 <: K](kvs: Map[K1, Option[V]]) = store.multiPut(kvs)

/**
* sets to .plus(get(kv._1).get.getOrElse(monoid.zero), kv._2)
Expand All @@ -44,6 +42,13 @@ class MergeableStoreViaGetPut[-K, V: Semigroup](store: Store[K, V], fc: FutureCo
newVOpt = vOpt.map(Semigroup.plus(_, kv._2)).orElse(Some(kv._2))
finalUnit <- put((kv._1, newVOpt))
} yield vOpt
}

class MergeableStoreViaGetPut[-K, V: Semigroup](store: Store[K, V], fc: FutureCollector[(K, Option[V])] = FutureCollector.default[(K, Option[V])])
extends MergeableStoreViaSingleGetPut[K, V](store) {

override def multiGet[K1 <: K](ks: Set[K1]) = store.multiGet(ks)
override def multiPut[K1 <: K](kvs: Map[K1, Option[V]]) = store.multiPut(kvs)

override def multiMerge[K1 <: K](kvs: Map[K1, V]): Map[K1, Future[Option[V]]] = {
implicit val collector = fc
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ object MutableCacheAlgebra {

def mutableFromTTL[K, V](ttlCache: TTLCache[K, V]): MutableCache[K, V] =
ttlCache.toMutable()
.inject(new TTLInjection[K, Long, V](ttlCache.ttl)(ttlCache.clock))
.inject(new TTLInjection[K, Long, V](ttlCache.ttl.inMilliseconds)(ttlCache.clock))
}

class AlgebraicMutableCache[K, V](cache: MutableCache[K, V]) {
Expand Down
Loading

0 comments on commit 295bdbd

Please sign in to comment.