DSL Overview

This page describes the high-level DSL concepts of Revolute.

Tables

Tables are objects that extend the Table class and define one or more columns.

object AtoZ extends Table[(String, Int)](name = "A to Z") {
  def letter = column[String]("letter")
  def number = column[String]("number").as[Int]
  def * = letter ~ number
}

The table's type parameter represents the product of its columns' types.

The * method (aka "product" method) returns a Projection of the canonical columns of the table, using the ~ operator to concatenate the columns.

  def * = letter ~ number

A table can define several projections, e.g., for columns frequently used together.

def address = street ~ city ~ state ~ zipcode ~ country
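For example, a hypothetical Addresses table (illustrative only; not part of Revolute) could define its canonical projection alongside the narrower address projection:

object Addresses extends Table[(Int, String, String, String, String, String)]("Addresses") {
  def id      = column[Int]("id")
  def street  = column[String]("street")
  def city    = column[String]("city")
  def state   = column[String]("state")
  def zipcode = column[String]("zipcode")
  def country = column[String]("country")

  def * = id ~ street ~ city ~ state ~ zipcode ~ country
  def address = street ~ city ~ state ~ zipcode ~ country
}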

Columns may have a different underlying storage type than the one presented to the user. The number column above has a String storage representation, while its logical presentation is an Int. TypeMappers are used to convert between the two representations and can be applied implicitly or explicitly.
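As a purely illustrative sketch, such a mapping could be expressed as a pair of conversion functions between the storage and presentation types (the actual TypeMapper API in Revolute may differ):

// Illustrative only: assumes a TypeMapper shaped as two conversion
// functions; the real Revolute trait may look different.
implicit object StringIntMapper extends TypeMapper[String, Int] {
  def map(stored: String): Int      = stored.toInt       // storage -> presentation
  def comap(presented: Int): String = presented.toString // presentation -> storage
}

// With such a mapper in scope, .as[Int] converts the column's type:
def number = column[String]("number").as[Int]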

Queries

Queries are typically expressed using for-comprehensions,

for { t <- SomeTable } yield t.*

which is semantically equivalent to a typical SQL query: select t.* from SomeTable t.

You can select individual fields by creating an explicit projection using ~,

for (az <- AtoZ) yield az.letter ~ az.number

which would result in,

+-------------+-------------+
|   letter    |   number    |
+-------------+-------------+
|    a        |      1      |
|    b        |      2      |
|    c        |      3      |
|   ...       |     ...     |
+-------------+-------------+

Filtering

Rows can be filtered using if (a standard for-comprehension guard),

for {
  az <- AtoZ if (az.letter === "a")
} yield az.letter ~ az.number

+-------------+-------------+
|   letter    |   number    |
+-------------+-------------+
|    a        |      1      |
+-------------+-------------+

or using the where operator,

for {
  az <- AtoZ where (_.letter === "a")
} yield az.letter ~ az.number

When several conditions are used within a query, the conditions are implicitly AND'ed together, e.g.,

for {
  _ <- AtoZ where (_.number > 5)
  _ <- AtoZ where (_.number < 7)
} yield AtoZ

is equivalent to,

for {
  _ <- AtoZ where { az => az.number > 5 && az.number < 7 }
} yield AtoZ

Revolute provides a number of built-in operators that work on common types. For a list of operators, see Operators.
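For instance, several of these operators can be combined within a single filter (using only constructs shown elsewhere on this page):

for {
  az <- AtoZ where { az => az.number > 1 && az.letter === "b" }
} yield az.letter ~ az.number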

Mapping / Transformation

Data transformations are also straightforward. For example, it's possible to concatenate strings using the built-in concatenation operator:

for {
  az <- AtoZ
  concat <- az.letter ++ az.number.as[String]
} yield concat

+---------------+
|  (anonymous)  |
+---------------+
|     a1        |
|     b2        |
|     c3        |
|     ...       |
+---------------+

Or you can use regular Scala functions (f: T1 => T2) to map values,

val query = for {
  az <- AtoZ
  vowel <- az.letter mapValue { letter => if ("aeiouy" contains letter) "yes" else "no" } as "vowel"
} yield az.letter ~ vowel

+-------------+-------------+
|   letter    |    vowel    |
+-------------+-------------+
|    a        |     yes     |
|    b        |     no      |
|    c        |     no      |
|   ...       |     ...     |
+-------------+-------------+

The function can be a Scala closure (anonymous function), a method, a named function, and so on, as long as its entire closure is Serializable. Here is a query equivalent to the one above,

def isVowel(letter: String) = if ("aeiouy" contains letter) "yes" else "no"

val query = for {
  az <- AtoZ
  vowel <- (az.letter mapValue isVowel) as "vowel"
} yield az.letter ~ vowel
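One practical consequence of the serializability requirement: a closure that captures a non-serializable object cannot be shipped for remote execution. A sketch of the pitfall (Resolver is a hypothetical, non-Serializable class):

class Resolver { def resolve(letter: String): String = letter.toUpperCase }

val resolver = new Resolver

// This closure captures `resolver`; because Resolver is not Serializable,
// serializing the query for cluster execution would fail at runtime.
val query = for {
  az <- AtoZ
  upper <- (az.letter mapValue { letter => resolver.resolve(letter) }) as "upper"
} yield az.letter ~ upper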

Partial Mapping (Mapping + Filtering)

It's possible to express mapping and filtering as a single operation in several ways.

The first is using Scala partial functions,

val query = for {
  az <- AtoZ
  vowel <- az.letter mapPartial { case l if ("aeiouy" contains l) => "yes" }
} yield az.letter ~ vowel

+-------------+-------------+
|   letter    |    vowel    |
+-------------+-------------+
|    a        |     yes     |
|    e        |     yes     |
|    i        |     yes     |
|   ...       |     ...     |
+-------------+-------------+

Another is using mapOption with a function that returns an Option,

def vowel(letter: String): Option[String] =
  if ("aeiouy" contains letter) Some("yes") else None

val query = for {
  az <- AtoZ
  vowel <- az.letter mapOption vowel
} yield az.letter ~ vowel

(same result: the Nones are eliminated and the Somes are unwrapped)

Or by filtering out null values using mapNonNull,

def vowel(letter: String) = if ("aeiouy" contains letter) "yes" else null

val query = for {
  az <- AtoZ
  vowel <- az.letter mapNonNull vowel
} yield az.letter ~ vowel

Mapping to Multiple Values

It's possible to map input values to zero or more output values using mapMany.

def dividers(n: Int): Seq[Int] =
  for {
    x <- 1 to n if (n % x == 0)
  } yield x

val query = for {
  az <- AtoZ
  divider <- az.number mapMany dividers
} yield az.letter ~ az.number ~ divider

+-------------+-------------+-------------+
|   letter    |    number   |   divider   |
+-------------+-------------+-------------+
|    a        |      1      |     1       |
|    b        |      2      |     1       |
|    b        |      2      |     2       |
|    c        |      3      |     1       |
|    c        |      3      |     3       |
|    d        |      4      |     1       |
|    d        |      4      |     2       |
|    d        |      4      |     4       |
|   ...       |     ...     |    ...      |
+-------------+-------------+-------------+

Joins

Assuming the following Words table,

object Words extends Table[(String, String)]("Words") {
  def letter = column[String]("letter")
  def word = column[String]("word")
  def * = letter ~ word
}

with content,

+-------------+-------------+
|   letter    |   word      |
+-------------+-------------+
|   a         |   apple     |
|   b         |   beer      |
|   c         |   crab      |
|   ...       |   ...       |
+-------------+-------------+

The AtoZ and Words tables can be joined together implicitly using an if,

for {
  az <- AtoZ
  words <- Words if az.letter is words.letter
} yield az.letter ~ az.number ~ words.word

or explicitly using innerJoin,

for {
  Join(az, words) <- (AtoZ innerJoin Words) on (_.letter is _.letter)
} yield az.letter ~ az.number ~ words.word

+-------------+-------------+-------------+
|   letter    |    number   |    word     |
+-------------+-------------+-------------+
|    a        |      1      |    apple    |
|    b        |      2      |    beer     |
|    c        |      3      |    crab     |
|   ...       |     ...     |    ...      |
+-------------+-------------+-------------+

Group-By and Aggregates

Grouping data is performed using the Query.groupBy operator, passing it a projection of columns,

for {
  log <- UserLogs
  _   <- Query.groupBy(log.date ~ log.userId)
} yield log.date ~ log.userId ~ Query.count(log.pageUrl)

As you can see, the Query.count aggregator is used to calculate the number of page views per day and user.

Note that while grouping is declared in the query, it is never explicitly referenced or statically checked. (This may change in the future.)

It's also possible to count distinct values using Query.countDistinct,

for {
  log <- UserLogs
  _   <- Query.groupBy(log.date ~ log.userId)
} yield log.date ~ log.userId ~ Query.countDistinct(log.pageUrl)

Aggregators can be easily defined by implementing this trait,

trait Aggregator[-Input, +Output] {
  def initialize: Unit
  def aggregate(values: Input): Unit
  def complete: Output
  def sorting: Seq[SortOrder]
}

and used in a query as,

type Url = String
type Timestamp = Long

object mostPopularPage extends Aggregator[(Url, Timestamp), Url] {
  ...
}

for {
  log <- UserLogs
  _   <- Query.groupBy(log.date ~ log.userId)
} yield log.date ~ log.userId ~ mostPopularPage(log.pageUrl ~ log.timestamp)

Note that aggregators may require a particular sort order to perform their function properly.
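For illustration, here is one way mostPopularPage might be implemented (a sketch against the Aggregator trait above, not actual Revolute code):

object mostPopularPage extends Aggregator[(Url, Timestamp), Url] {
  private var counts = Map.empty[Url, Int]

  // Reset state at the start of each group
  def initialize: Unit = { counts = Map.empty }

  // Tally each URL seen within the group
  def aggregate(values: (Url, Timestamp)): Unit = {
    val (url, _) = values
    counts += url -> (counts.getOrElse(url, 0) + 1)
  }

  // Emit the most frequently seen URL
  def complete: Url = counts.maxBy(_._2)._1

  // This particular aggregator does not require any input ordering
  def sorting: Seq[SortOrder] = Seq.empty
}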

Sorting

Values related to a grouping (the leftover fields when Query.groupBy is used) may be explicitly sorted for reporting or aggregation purposes using Query.sortBy:

for {
  log <- UserLogs
  _   <- Query.groupBy(log.date ~ log.userId)
  _   <- Query.sortBy(log.timestamp ~ log.pageUrl)
} yield log.date ~ log.userId ~ Query.countDistinct(log.pageUrl)

The asc and desc qualifiers may be used to sort in ascending or descending order, respectively. If not specified, the sort order is ascending.
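For example, to sort by descending timestamp and then ascending page URL (a sketch using the same postfix style as users desc in the Limiting section below):

_ <- Query.sortBy((log.timestamp desc) ~ (log.pageUrl asc))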

Sorting is similar to grouping in that it's specified in the query but never explicitly referenced or statically checked.

Note 1: Hadoop automatically sorts the group-by fields using their natural sort order before the reduce phase.

Note 2: The sort order explicitly defined using Query.sortBy must not conflict with any aggregator-defined sort order.

Limiting

Limiting the number of output records is achieved using Query.limit, e.g.,

for {
  log   <- UserLogs
  _     <- Query.groupBy(log.date)
  users <- Query.count(log.userId)
  _     <- Query.sortBy(users desc)
  _     <- Query.limit(100)
} yield log.date ~ users

Inserts

Inserting data into a table is done by using insert and passing it a Query and a destination Table.

insert {
  for {
    az <- AtoZ
    words <- Words if az.letter is words.letter
  } yield az.letter ~ az.number ~ words.word
} into SomeTable

Multiple-Table Insert

It's also possible to reuse the same query to insert into different tables.

val myQuery = for {
  az <- AtoZ
  words <- Words if az.letter is words.letter
} yield az.letter ~ az.number ~ words.word

insert (myQuery) into SomeTable

// insert into another table using additional conditions
insert {
  for {
    (letter, number, word) <- myQuery
    if letter in Set("a", "b", "c")
  } yield letter ~ word
} into OtherTable

Flow

To actually run queries, they must be included inside a flow block.

flow(context) {

  // statements + queries go here
  insert {
    ...
  } into SomeTable

}

The flow block delimits the scope of execution (which maps to a fully-formed Flow in Cascading). Separating execution from definition makes it possible to define queries and actions -- pre-canned or parametric -- and reuse them in different contexts.
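For example, a parametric query can be defined once and reused across flows (lettersAbove is a hypothetical helper shown only to illustrate the pattern):

def lettersAbove(threshold: Int) = for {
  az <- AtoZ where (_.number > threshold)
} yield az.letter ~ az.number

flow(context) {
  insert { lettersAbove(5) } into SomeTable
}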

Context and Data Storage

A context defines the bindings from tables to their actual (physical) representations.

val context = new FlowContext
context.tableBindings += (AtoZ  -> new FileTap(new TextDelimited(AtoZ.*,  "\t"), "path/to/a-z.txt"))
context.tableBindings += (Words -> new FileTap(new TextDelimited(Words.*, "\t"), "path/to/words.txt"))

Each Table used in the flow must be associated with a corresponding Cascading Tap. Taps may be sources (read-only), sinks (write-only), or both, depending on your application.

There are taps for local files, HDFS, Amazon S3 files, relational databases, NoSQL databases, etc.

It's common practice for tables to define their standard source/sink representations, e.g.,

object MyTable {

  ...

  def defaultScheme = new TextDelimited(*, "\t")

  def asSink(implicit stage: Stage) = this -> (stage match {
    case Local => new FileTap(defaultScheme, "target/test/resources/my-table.txt", SinkMode.REPLACE)
    case QA    => new Hfs(defaultScheme,     "s3://qa-bucket/path/to/mytable/",    SinkMode.UPDATE)
    case Prod  => new Hfs(defaultScheme,     "s3://prod-bucket/path/to/mytable/",  SinkMode.KEEP)
  })
}

which means creating a context typically looks like,

val context = new FlowContext
implicit val stage = ...
context.tableBindings ++= SubProject.sources  // read-only
context.tableBindings +=  AtoZ.asSink         // write-only
context.tableBindings +=  Words.asSink        // write-only