diff --git a/suro-core/src/main/java/com/netflix/suro/sink/remotefile/RemotePrefixFormatter.java b/suro-core/src/main/java/com/netflix/suro/sink/remotefile/RemotePrefixFormatter.java index c88126c2..eec26af4 100644 --- a/suro-core/src/main/java/com/netflix/suro/sink/remotefile/RemotePrefixFormatter.java +++ b/suro-core/src/main/java/com/netflix/suro/sink/remotefile/RemotePrefixFormatter.java @@ -18,6 +18,8 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo; +import java.io.File; + /** * When uploading files to the remote file system such as S3 or HDFS, the file * path should be specified. This interface is describing which file path would @@ -27,5 +29,5 @@ */ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") public interface RemotePrefixFormatter { - public String get(); + public String get(File file); } diff --git a/suro-localfile/src/main/java/com/netflix/suro/sink/localfile/FileNameFormatter.java b/suro-localfile/src/main/java/com/netflix/suro/sink/localfile/FileNameFormatter.java index 74ce2c7f..cdfec6dc 100644 --- a/suro-localfile/src/main/java/com/netflix/suro/sink/localfile/FileNameFormatter.java +++ b/suro-localfile/src/main/java/com/netflix/suro/sink/localfile/FileNameFormatter.java @@ -20,6 +20,7 @@ import org.joda.time.format.DateTimeFormat; import org.joda.time.format.DateTimeFormatter; +import java.io.File; import java.net.InetAddress; import java.net.UnknownHostException; import java.rmi.server.UID; @@ -57,4 +58,9 @@ public static String get(String dir) { .append(new UID().toString()); return sb.toString().replaceAll("[-:]", ""); } + + public static DateTime getFileCreateTime(File file){ + String fileName = file.getName(); + return fmt.parseDateTime(fileName.substring(0,"P20141006T213521".length())); + } } diff --git a/suro-s3/src/main/java/com/netflix/suro/sink/remotefile/RemoteFileSink.java b/suro-s3/src/main/java/com/netflix/suro/sink/remotefile/RemoteFileSink.java index e80ed7a6..6573ff58 100644 --- a/suro-s3/src/main/java/com/netflix/suro/sink/remotefile/RemoteFileSink.java +++ b/suro-s3/src/main/java/com/netflix/suro/sink/remotefile/RemoteFileSink.java @@ -212,7 +212,7 @@ public void run() { } private String makeUploadPath(File file) { - return prefixFormatter.get() + file.getName(); + return prefixFormatter.get(file) + file.getName(); } @Monitor(name = "uploadedFileSize", type = DataSourceType.COUNTER) diff --git a/suro-s3/src/main/java/com/netflix/suro/sink/remotefile/formatter/DateRegionStackFormatter.java b/suro-s3/src/main/java/com/netflix/suro/sink/remotefile/formatter/DateRegionStackFormatter.java index 49668b84..39f91e60 100644 --- a/suro-s3/src/main/java/com/netflix/suro/sink/remotefile/formatter/DateRegionStackFormatter.java +++ b/suro-s3/src/main/java/com/netflix/suro/sink/remotefile/formatter/DateRegionStackFormatter.java @@ -24,6 +24,8 @@ import org.joda.time.format.DateTimeFormat; import org.joda.time.format.DateTimeFormatter; +import java.io.File; + /** * It would be useful to append region and stack information to the file path * when we upload files to AWS S3. region and stack can be injected through @@ -49,7 +51,7 @@ public DateRegionStackFormatter( } @Override - public String get() { + public String get(File file) { StringBuilder sb = new StringBuilder(); sb.append(format.print(new DateTime())).append('/') .append(region).append('/') diff --git a/suro-s3/src/main/java/com/netflix/suro/sink/remotefile/formatter/DynamicRemotePrefixFormatter.java b/suro-s3/src/main/java/com/netflix/suro/sink/remotefile/formatter/DynamicRemotePrefixFormatter.java index 1b7a8651..a9a45c0a 100644 --- a/suro-s3/src/main/java/com/netflix/suro/sink/remotefile/formatter/DynamicRemotePrefixFormatter.java +++ b/suro-s3/src/main/java/com/netflix/suro/sink/remotefile/formatter/DynamicRemotePrefixFormatter.java @@ -5,6 +5,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.netflix.suro.sink.remotefile.RemotePrefixFormatter; +import java.io.File; import java.util.ArrayList; import java.util.List; @@ -22,7 +23,7 @@ public DynamicRemotePrefixFormatter(@JsonProperty("format") String formatString) } @Override - public String get() { + public String get(File file) { StringBuilder sb = new StringBuilder(); for (PrefixFormatter formatter : formatterList) { diff --git a/suro-s3/src/main/java/com/netflix/suro/sink/remotefile/formatter/FileDatePrefixFormatter.java b/suro-s3/src/main/java/com/netflix/suro/sink/remotefile/formatter/FileDatePrefixFormatter.java new file mode 100644 index 00000000..b8fd6842 --- /dev/null +++ b/suro-s3/src/main/java/com/netflix/suro/sink/remotefile/formatter/FileDatePrefixFormatter.java @@ -0,0 +1,42 @@ +package com.netflix.suro.sink.remotefile.formatter; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.netflix.suro.sink.localfile.FileNameFormatter; +import com.netflix.suro.sink.remotefile.RemotePrefixFormatter; +import org.joda.time.DateTime; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; + +import java.io.File; + +/** + * Created by liuzhenchuan@foxmail.com on 10/6/14. + */ +public class FileDatePrefixFormatter implements RemotePrefixFormatter{ + public static final String TYPE = "FileDate"; + private final DateTimeFormatter formatter; + //lastModify or dateCreated + private String dateType; + + public FileDatePrefixFormatter(@JsonProperty("format") String formatString, + @JsonProperty("dateType") String dateType){ + formatter = DateTimeFormat.forPattern(formatString); + this.dateType = dateType; + + } + + @Override + public String get(File file) { + String prefix = formatter.print(getDateCreated(file));//use dateCreated as default. + if("lastModify".equalsIgnoreCase(dateType)){ + prefix = formatter.print(file.lastModified()); + } + if(!prefix.endsWith("/")) prefix += "/"; + return prefix; + } + + private DateTime getDateCreated(File file){ + return FileNameFormatter.getFileCreateTime(file); + } + +} diff --git a/suro-s3/src/test/java/com/netflix/suro/sink/remotefile/SuroSinkPlugin.java b/suro-s3/src/test/java/com/netflix/suro/sink/remotefile/SuroSinkPlugin.java index 5b8139fe..177f6472 100644 --- a/suro-s3/src/test/java/com/netflix/suro/sink/remotefile/SuroSinkPlugin.java +++ b/suro-s3/src/test/java/com/netflix/suro/sink/remotefile/SuroSinkPlugin.java @@ -7,6 +7,7 @@ import com.netflix.suro.sink.notice.QueueNotice; import com.netflix.suro.sink.remotefile.formatter.DateRegionStackFormatter; import com.netflix.suro.sink.remotefile.formatter.DynamicRemotePrefixFormatter; +import com.netflix.suro.sink.remotefile.formatter.FileDatePrefixFormatter; public class SuroSinkPlugin extends SuroPlugin { @Override @@ -17,6 +18,7 @@ protected void configure() { this.addSinkType(HdfsFileSink.TYPE, HdfsFileSink.class); this.addRemotePrefixFormatterType(DateRegionStackFormatter.TYPE, DateRegionStackFormatter.class); this.addRemotePrefixFormatterType(DynamicRemotePrefixFormatter.TYPE, DynamicRemotePrefixFormatter.class); + this.addRemotePrefixFormatterType(FileDatePrefixFormatter.TYPE,FileDatePrefixFormatter.class); this.addSinkType(SuroSink.TYPE, SuroSink.class); diff --git a/suro-s3/src/test/java/com/netflix/suro/sink/remotefile/TestPrefixFormatter.java b/suro-s3/src/test/java/com/netflix/suro/sink/remotefile/TestPrefixFormatter.java index 5a61aaea..abcc729f 100644 --- a/suro-s3/src/test/java/com/netflix/suro/sink/remotefile/TestPrefixFormatter.java +++ b/suro-s3/src/test/java/com/netflix/suro/sink/remotefile/TestPrefixFormatter.java @@ -55,7 +55,7 @@ public void testDynamicStatic() throws IOException { ObjectMapper mapper = injector.getInstance(ObjectMapper.class); RemotePrefixFormatter formatter = mapper.readValue(spec, new TypeReference(){}); - assertEquals(formatter.get(), "prefix/"); + assertEquals(formatter.get(null), "prefix/"); } @Test @@ -69,7 +69,7 @@ public void testDynamicDate() throws IOException { RemotePrefixFormatter formatter = mapper.readValue(spec, new TypeReference(){}); DateTimeFormatter format = DateTimeFormat.forPattern("YYYYMMDD"); - assertEquals(formatter.get(), format.print(new DateTime()) + "/"); + assertEquals(formatter.get(null), format.print(new DateTime()) + "/"); } @Test @@ -84,7 +84,7 @@ public void testDynamicProperty() throws IOException { ObjectMapper mapper = injector.getInstance(ObjectMapper.class); RemotePrefixFormatter formatter = mapper.readValue(spec, new TypeReference(){}); - assertEquals(formatter.get(), "prop1/"); + assertEquals(formatter.get(null), "prop1/"); } @Test @@ -101,7 +101,7 @@ public void testDynamicCombination() throws IOException { ObjectMapper mapper = injector.getInstance(ObjectMapper.class); RemotePrefixFormatter formatter = mapper.readValue(spec, new TypeReference(){}); - assertEquals(formatter.get(), "routing_key/" + format.print(new DateTime()) + "/propvalue1/"); + assertEquals(formatter.get(null), "routing_key/" + format.print(new DateTime()) + "/propvalue1/"); } @@ -116,7 +116,7 @@ public void testInjectedDateRegionStack() throws IOException { RemotePrefixFormatter formatter = mapper.readValue(spec, new TypeReference() {}); DateTimeFormatter format = DateTimeFormat.forPattern("YYYYMMDD"); String answer = String.format("%s/eu-west-1/gps/", format.print(new DateTime())); - assertEquals(formatter.get(), answer); + assertEquals(formatter.get(null), answer); } @Test @@ -132,6 +132,6 @@ public void testDateRegionStack() throws IOException { RemotePrefixFormatter formatter = mapper.readValue(spec, new TypeReference() {}); DateTimeFormatter format = DateTimeFormat.forPattern("YYYYMMDD"); String answer = String.format("%s/us-east-1/normal/", format.print(new DateTime())); - assertEquals(formatter.get(), answer); + assertEquals(formatter.get(null), answer); } } diff --git a/suro-server/src/main/java/com/netflix/suro/sink/ServerSinkPlugin.java b/suro-server/src/main/java/com/netflix/suro/sink/ServerSinkPlugin.java index bdba5348..08bd75e0 100644 --- a/suro-server/src/main/java/com/netflix/suro/sink/ServerSinkPlugin.java +++ b/suro-server/src/main/java/com/netflix/suro/sink/ServerSinkPlugin.java @@ -15,6 +15,7 @@ import com.netflix.suro.sink.remotefile.S3FileSink; import com.netflix.suro.sink.remotefile.formatter.DateRegionStackFormatter; import com.netflix.suro.sink.remotefile.formatter.DynamicRemotePrefixFormatter; +import com.netflix.suro.sink.remotefile.formatter.FileDatePrefixFormatter; /** * @@ -35,6 +36,7 @@ protected void configure() { this.addSinkType(HdfsFileSink.TYPE, HdfsFileSink.class); this.addRemotePrefixFormatterType(DateRegionStackFormatter.TYPE, DateRegionStackFormatter.class); this.addRemotePrefixFormatterType(DynamicRemotePrefixFormatter.TYPE, DynamicRemotePrefixFormatter.class); + this.addRemotePrefixFormatterType(FileDatePrefixFormatter.TYPE,FileDatePrefixFormatter.class); this.addSinkType(SuroSink.TYPE, SuroSink.class);