Skip to content
This repository has been archived by the owner on Jun 24, 2024. It is now read-only.

Commit

Permalink
Handle authentication on client-side (#14)
Browse files Browse the repository at this point in the history
  • Loading branch information
pvcnt committed Mar 30, 2018
1 parent 603b079 commit ca4ebcb
Show file tree
Hide file tree
Showing 8 changed files with 46 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ final class WebhookAuthStrategy(webhook: Webhook[ReviewResponse], cacheTtl: Dura

object WebhookAuthStrategy {

case class ReviewRequest(clientId: String)
case class ReviewRequest(accessToken: String)

case class ReviewResponse(authenticated: Boolean, user: Option[UserInfo])

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,36 +26,30 @@ import fr.cnrs.liris.accio.api.{Utils, Values}
import scala.collection.JavaConverters._

/**
* Options when creating CSV reports.
* Create CSV reports from results of previous runs.
*
* @param separator Separator to use in CSV files.
* @param split Whether to split the reports per combination of workflow parameters.
* @param aggregate Whether to aggregate artifact values across multiple runs into a single value.
* @param append Whether to allow appending data to existing files if they already exist.
*/
case class CsvReportOptions(separator: String, split: Boolean, aggregate: Boolean, append: Boolean)

/**
* Create CSV reports from results of previous runs.
*/
class CsvReportCreator {
final class CsvReportWriter(separator: String, split: Boolean, aggregate: Boolean, append: Boolean) {
/**
* Write the values of artifacts in CSV format inside the given directory.
*
* @param artifacts Artifacts to create a report for.
* @param workDir Directory where to write CSV reports (doesn't have to exist).
* @param opts Report options.
*/
def write(artifacts: ArtifactList, workDir: Path, opts: CsvReportOptions): Unit = {
if (opts.split) {
def write(artifacts: ArtifactList, workDir: Path): Unit = {
if (split) {
// When splitting, one sub-directory is created per combination of workflow parameters.
artifacts.split.foreach { list =>
val filename = Utils.label(list.params).replace(" ", ",")
doWrite(list, workDir.resolve(filename), opts)
doWrite(list, workDir.resolve(filename))
}
} else {
// When not splitting, we write the report directly inside the working directory.
doWrite(artifacts, workDir, opts)
doWrite(artifacts, workDir)
}
}

Expand All @@ -64,47 +58,46 @@ class CsvReportCreator {
*
* @param metrics Metrics to create a report for.
* @param workDir Directory where to write CSV reports (doesn't have to exist).
* @param opts Report options.
*/
def write(metrics: MetricList, workDir: Path, opts: CsvReportOptions): Unit = {
if (opts.split) {
def write(metrics: MetricList, workDir: Path): Unit = {
if (split) {
// When splitting, one sub-directory is created per combination of workflow parameters.
metrics.split.foreach { list =>
val filename = Utils.label(list.params).replace(" ", ",")
doWrite(list, workDir.resolve(filename), opts)
doWrite(list, workDir.resolve(filename))
}
} else {
// When not splitting, we write report directory inside the working directory.
doWrite(metrics, workDir, opts)
doWrite(metrics, workDir)
}
}

private def doWrite(list: ArtifactList, workDir: Path, opts: CsvReportOptions): Unit = {
private def doWrite(list: ArtifactList, workDir: Path): Unit = {
Files.createDirectories(workDir)
list.groups.foreach { group =>
val header = asHeader(group.kind)
val artifacts = if (opts.aggregate) Seq(group.aggregated) else group.toSeq
val artifacts = if (aggregate) Seq(group.aggregated) else group.toSeq
val rows = artifacts.flatMap(artifact => asString(artifact.value))
val lines = (Seq(header) ++ rows).map(_.mkString(opts.separator))
val lines = (Seq(header) ++ rows).map(_.mkString(separator))
val filename = group.name.replace("/", "-")
doWrite(lines, workDir, filename, opts)
doWrite(lines, workDir, filename)
}
}

private def doWrite(list: MetricList, workDir: Path, opts: CsvReportOptions): Unit = {
private def doWrite(list: MetricList, workDir: Path): Unit = {
Files.createDirectories(workDir)
list.groups.groupBy(_.nodeName).foreach { case (nodeName, groups) =>
val header = Seq("metric_name", "value")
val rows = groups.flatMap { group =>
if (opts.aggregate) Seq(group.aggregated) else group.toSeq
}.map(metric => Seq(metric.name, metric.value.toString))
val lines = (Seq(header) ++ rows).map(_.mkString(opts.separator))
doWrite(lines, workDir, nodeName, opts)
val rows = groups
.flatMap(group => if (aggregate) Seq(group.aggregated) else group.toSeq)
.map(metric => Seq(metric.name, metric.value.toString))
val lines = (Seq(header) ++ rows).map(_.mkString(separator))
doWrite(lines, workDir, nodeName)
}
}

private def doWrite(lines: Seq[String], workDir: Path, filename: String, opts: CsvReportOptions): Unit = {
if (opts.append) {
private def doWrite(lines: Seq[String], workDir: Path, filename: String): Unit = {
if (append) {
val file = workDir.resolve(s"$filename.csv")
Files.write(file, lines.asJava, StandardOpenOption.CREATE, StandardOpenOption.APPEND, StandardOpenOption.WRITE)
} else {
Expand Down Expand Up @@ -148,4 +141,4 @@ class CsvReportCreator {
case AtomicType.Duration => Seq(Seq(Values.decodeDuration(value).getMillis.toString))
case _ => Seq(Seq(Values.decode(value).toString))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ private[commands] trait ClientCommand {
this: Command =>

private[this] val clusterFlag = flag[String]("cluster", "Name of the cluster to use")
private[this] val clientProvider = new ClusterClientProvider(ConfigParser.default)
private[this] val clientProvider = new ClientFactory(ConfigParser.default)

protected final def client: AgentService$FinagleClient = {
clusterFlag.get.map(clientProvider.apply).getOrElse(clientProvider.default)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ package fr.cnrs.liris.accio.tools.cli.commands
import java.nio.file.Paths

import com.twitter.finagle.Thrift
import com.twitter.finagle.thrift.RichClientParam
import com.twitter.finagle.thrift.{ClientId, RichClientParam}
import fr.cnrs.liris.accio.agent.{AgentService, AgentService$FinagleClient}
import fr.cnrs.liris.accio.tools.cli.config.{Cluster, ClusterConfig, ConfigParser}
import fr.cnrs.liris.common.util.FileUtils
Expand All @@ -37,7 +37,7 @@ import scala.collection.mutable
*
* @param parser Cluster configuration parser.
*/
final class ClusterClientProvider(parser: ConfigParser) {
final class ClientFactory(parser: ConfigParser) {
private[this] val clients = mutable.Map.empty[String, AgentService$FinagleClient]
private[this] lazy val config = parseConfig

Expand Down Expand Up @@ -79,8 +79,9 @@ final class ClusterClientProvider(parser: ConfigParser) {
private def getOrCreate(config: Cluster) = {
clients.getOrElseUpdate(config.name, {
val params = RichClientParam()
val service = Thrift.newService(config.server)
new AgentService.FinagledClient(service, params)
var builder = Thrift.client
config.accessToken.foreach(accessToken => builder = builder.withClientId(ClientId(accessToken)))
new AgentService.FinagledClient(builder.newService(config.server), params)
})
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ final class ExportCommand extends Command with ClientCommand {

override def execute(residue: Seq[String], env: CommandEnvironment): ExitCode = {
if (residue.isEmpty) {
env.reporter.outErr.printOutLn("<error>[ERROR]</error> You must specify at least one run as argument.")
env.reporter.handle(Event.error("You must specify at least one run as argument."))
ExitCode.CommandLineError
} else {
val workDir = getWorkDir
Expand All @@ -56,14 +56,13 @@ final class ExportCommand extends Command with ClientCommand {

val artifacts = getArtifacts(runs)
val metrics = getMetrics(runs)
val reportCreator = new CsvReportCreator
val reportCreatorOpts = CsvReportOptions(
val writer = new CsvReportWriter(
separator = separatorFlag(),
split = splitFlag(),
aggregate = aggregateFlag(),
append = appendFlag())
reportCreator.write(artifacts, workDir, reportCreatorOpts)
reportCreator.write(metrics, workDir, reportCreatorOpts)
writer.write(artifacts, workDir)
writer.write(metrics, workDir)
ExitCode.Success
}
}
Expand Down
9 changes: 5 additions & 4 deletions accio/java/fr/cnrs/liris/accio/tools/cli/config/Cluster.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@
package fr.cnrs.liris.accio.tools.cli.config

/**
* Configuration of a single Accio cluster. It specifies how the client should contact it.
* Configuration of a single Accio cluster.
*
* @param name Cluster name.
* @param server Cluster address (as a Finagle name).
* @param name Name of the cluster (used by the client to reference it).
* @param server Address to the agent server (specified as a Finagle name).
* @param accessToken An access token used to authenticate against the server.
*/
case class Cluster(name: String, server: String)
case class Cluster(name: String, server: String, accessToken: Option[String] = None)
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,13 @@

package fr.cnrs.liris.accio.tools.cli.config

import com.twitter.inject.domain.WrappedValue

/**
* Configuration specifying how to contact multiple Accio clusters.
*
* @param clusters Configuration of individual clusters.
* @throws IllegalArgumentException If clusters configuration is invalid.
*/
case class ClusterConfig(clusters: Seq[Cluster]) extends WrappedValue[Seq[Cluster]] {
case class ClusterConfig(clusters: Seq[Cluster]) {
{
// Validate that cluster definitions are valid.
require(clusters.nonEmpty, "You must define at least one cluster")
Expand Down Expand Up @@ -71,4 +69,8 @@ case class ClusterConfig(clusters: Seq[Cluster]) extends WrappedValue[Seq[Cluste
}
ClusterConfig(mergedClusters)
}
}

object ClusterConfig {
  /**
   * Fallback configuration: a single cluster named "default" whose agent server is
   * expected at `localhost:9999`. Presumably used when no configuration file provides
   * any cluster definition — TODO(review): confirm against the call site.
   */
  def default = ClusterConfig(Seq(Cluster("default", "localhost:9999")))
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ final class ConfigParser(mapper: FinatraObjectMapper) {
def parse(path: Path): ClusterConfig = {
val fis = new FileInputStream(path.toFile)
val config = try {
// It did not go well when trying to deserialize directly as ClusterConfig, despite it being a WrappedValue.
// So we fall back to deserializing clusters directly.
// It did not go well when trying to deserialize directly as ClusterConfig, even if it is
// defined as a WrappedValue. So we fall back to deserializing clusters directly.
ClusterConfig(mapper.parse[Seq[Cluster]](fis))
} catch {
case e@(_: IOException | _: JsonParseException | _: JsonMappingException) =>
Expand Down

0 comments on commit ca4ebcb

Please sign in to comment.