-
Notifications
You must be signed in to change notification settings - Fork 83
/
Copy pathsink_impl.cc
147 lines (132 loc) · 5.02 KB
/
sink_impl.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
#include "source/sink/sink_impl.h"
#include <array>
#include <filesystem>
#include <fstream>
#include "external/envoy/source/common/common/logger.h"
#include "external/envoy/source/common/common/random_generator.h"
#include "fmt/ostream.h"
// NOLINT(namespace-nighthawk)
namespace fmt {

// std::filesystem::path has no native fmt formatter; route formatting of paths through the
// stream insertion operator (operator<<) that the standard library provides for them.
template <>
struct formatter<::std::filesystem::path> : public ostream_formatter {};

} // namespace fmt
namespace Nighthawk {
namespace {
// Verifies that `s` has the same layout as a uuid produced by Envoy's random generator: the same
// length, a '-' wherever the reference uuid has one, and a hexadecimal digit everywhere else.
//
// @param s the candidate string.
// @return absl::OkStatus() when `s` matches that layout, otherwise an InvalidArgument status
//         describing which expectation was not met.
absl::Status verifyCanBeUsedAsDirectoryName(absl::string_view s) {
  // A freshly generated uuid is only used as a template for the expected length and the
  // positions of the '-' separators; its actual digits are irrelevant.
  Envoy::Random::RandomGeneratorImpl random;
  const std::string reference_value = random.uuid();
  constexpr absl::string_view err_template = "'{}' is not a guid: {}";
  if (s.size() != reference_value.size()) {
    return absl::InvalidArgumentError(fmt::format(err_template, s, "bad string length."));
  }
  for (size_t i = 0; i < s.size(); i++) {
    if (reference_value[i] == '-') {
      if (s[i] != '-') {
        return absl::InvalidArgumentError(
            fmt::format(err_template, s, "expectations around '-' positions not met."));
      }
      continue;
    }
    // std::isxdigit has undefined behavior when handed a negative value other than EOF, which is
    // what a plain (signed) char holding a non-ASCII byte converts to. Go through unsigned char.
    if (!std::isxdigit(static_cast<unsigned char>(s[i]))) {
      return absl::InvalidArgumentError(
          fmt::format(err_template, s, "unexpected character encountered."));
    }
  }
  return absl::OkStatus();
}
// Validates a key (execution id) before it is used by a sink.
//
// @param key the key to validate.
// @param validate_as_directory_name when true, the key must additionally pass
//        verifyCanBeUsedAsDirectoryName().
// @return absl::OkStatus() when the key is acceptable; an InvalidArgument status when the key is
//         empty, or whatever verifyCanBeUsedAsDirectoryName() reports when that check is enabled.
absl::Status validateKey(absl::string_view key, bool validate_as_directory_name) {
  if (key.empty()) {
    return absl::InvalidArgumentError("empty key is not allowed.");
  }
  if (validate_as_directory_name) {
    return verifyCanBeUsedAsDirectoryName(key);
  }
  return absl::OkStatus();
}
} // namespace
// Serializes `response` into a new file below "/tmp/nh/<execution_id>/".
//
// The response is first written to a uniquely named temp file directly under "/tmp/", and only
// once that file has been completely written and closed is it renamed into the target directory.
// That way consumers of LoadExecutionResult never observe a partially written file.
//
// @param response the execution response to persist; its execution_id must pass
//        validateKey(..., true).
// @return absl::OkStatus() on success, the validation status when the execution id is rejected,
//         or an Internal status when creating the directory, writing the temp file, or renaming
//         it fails.
absl::Status
FileSinkImpl::StoreExecutionResultPiece(const nighthawk::client::ExecutionResponse& response) {
  const std::string& execution_id = response.execution_id();
  absl::Status status = validateKey(execution_id, true);
  if (!status.ok()) {
    return status;
  }
  const std::string target_directory = "/tmp/nh/" + execution_id + "/";
  std::error_code error_code;
  std::filesystem::create_directories(target_directory, error_code);
  // Note error_code will not be set if the directory already existed.
  if (error_code) {
    return absl::InternalError(error_code.message());
  }

  Envoy::Random::RandomGeneratorImpl random;
  const std::string file_name = "nighthawk_" + random.uuid();
  const std::string tmp_path = "/tmp/" + file_name;
  const std::string new_name = target_directory + file_name;

  // Best-effort cleanup so failed attempts do not leave stray temp files behind. A failure to
  // remove is deliberately ignored: the original error is the one worth reporting.
  const auto discard_tmp_file = [&tmp_path]() {
    std::error_code ignored;
    std::filesystem::remove(tmp_path, ignored);
  };

  {
    std::ofstream ofs(tmp_path, std::ios_base::out | std::ios_base::binary);
    const bool serialized = ofs.is_open() && response.SerializeToOstream(&ofs);
    // Close explicitly: the destructor would silently swallow flush/close errors, and a
    // truncated file must never be moved into the results directory.
    ofs.close();
    if (!serialized || ofs.fail()) {
      discard_tmp_file();
      return absl::InternalError("Failure writing to temp file");
    }
  }

  std::filesystem::rename(tmp_path, new_name, error_code);
  if (error_code) {
    discard_tmp_file();
    return absl::InternalError(error_code.message());
  }
  ENVOY_LOG_MISC(trace, "Stored '{}'.", new_name);
  return absl::OkStatus();
}
// Loads every ExecutionResponse stored below "/tmp/nh/<execution_id>/".
//
// @param execution_id the execution id to look up; must pass validateKey(..., true).
// @return the parsed responses, one per directory entry (possibly empty when the directory holds
//         no entries); the validation status when the execution id is rejected; a NotFound status
//         when the directory cannot be opened; or an Internal status when an entry fails to
//         parse or advancing through the directory fails.
absl::StatusOr<std::vector<nighthawk::client::ExecutionResponse>>
FileSinkImpl::LoadExecutionResult(absl::string_view execution_id) const {
  absl::Status status = validateKey(execution_id, true);
  if (!status.ok()) {
    return status;
  }
  const std::filesystem::path filesystem_directory_path("/tmp/nh/" + std::string(execution_id) +
                                                        "/");
  std::vector<nighthawk::client::ExecutionResponse> responses;
  std::error_code error_code;
  std::filesystem::directory_iterator it(filesystem_directory_path, error_code);
  if (error_code) {
    return absl::NotFoundError(error_code.message());
  }
  const std::filesystem::directory_iterator end;
  while (it != end) {
    std::ifstream ifs(it->path(), std::ios_base::binary);
    // Parse straight into the vector element to avoid copying the message afterwards. On
    // failure the whole vector is discarded by the early return.
    nighthawk::client::ExecutionResponse& response = responses.emplace_back();
    if (!response.ParseFromIstream(&ifs)) {
      return absl::InternalError(
          fmt::format("Failed to parse ExecutionResponse '{}'.", it->path()));
    }
    ENVOY_LOG_MISC(trace, "Loaded '{}'.", it->path());
    // Advance with the non-throwing overload: a range-based for would use operator++, which
    // reports errors by throwing std::filesystem::filesystem_error instead of via error_code.
    it.increment(error_code);
    if (error_code) {
      return absl::InternalError(error_code.message());
    }
  }
  return responses;
}
// Appends `response` to the in-memory list of results kept for its execution id.
//
// @param response the execution response to store; its execution_id must be non-empty.
// @return absl::OkStatus() on success, or the status from validateKey() when the execution id is
//         rejected.
absl::Status
InMemorySinkImpl::StoreExecutionResultPiece(const nighthawk::client::ExecutionResponse& response) {
  const std::string& key = response.execution_id();
  absl::Status key_status = validateKey(key, false);
  if (!key_status.ok()) {
    return key_status;
  }
  // operator[] creates an empty vector the first time an execution id is seen, and otherwise
  // yields the existing one.
  map_[key].push_back(response);
  return absl::OkStatus();
}
// Returns a copy of all responses stored in memory for `execution_id`.
//
// @param execution_id the execution id to look up; must be non-empty.
// @return the stored responses; the status from validateKey() when the execution id is rejected;
//         or a NotFound status when nothing was stored for this execution id.
absl::StatusOr<std::vector<nighthawk::client::ExecutionResponse>>
InMemorySinkImpl::LoadExecutionResult(absl::string_view execution_id) const {
  absl::Status key_status = validateKey(execution_id, false);
  if (!key_status.ok()) {
    return key_status;
  }
  const auto found = map_.find(execution_id);
  if (found == map_.end()) {
    return absl::NotFoundError(
        fmt::format("No results found for execution-id: '{}'", execution_id));
  }
  return found->second;
}
} // namespace Nighthawk