Skip to content

Commit 6d3edf9

Browse files
authored
Merge pull request #374 from singnet/senna-361-1
[#361] Add optional filtering of QueryAnswers which can be considered duplicates
2 parents afb4f64 + 3b14532 commit 6d3edf9

16 files changed

+531
-28
lines changed

src/link_creation_agent/link_creation_agent.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,8 +104,8 @@ void LinkCreationAgent::run() {
104104
shared_ptr<PatternMatchingQueryProxy> LinkCreationAgent::query(vector<string>& query_tokens,
105105
string context,
106106
bool update_attention_broker) {
107-
shared_ptr<PatternMatchingQueryProxy> proxy =
108-
make_shared<PatternMatchingQueryProxy>(query_tokens, context, update_attention_broker);
107+
shared_ptr<PatternMatchingQueryProxy> proxy = make_shared<PatternMatchingQueryProxy>(
108+
query_tokens, context, false, update_attention_broker, false);
109109
service_bus->issue_bus_command(proxy);
110110
return proxy;
111111
}

src/main/query_client_main.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ int main(int argc, char* argv[]) {
6565

6666
shared_ptr<ServiceBus> service_bus = ServiceBusSingleton::get_instance();
6767
shared_ptr<PatternMatchingQueryProxy> proxy =
68-
make_shared<PatternMatchingQueryProxy>(query, "", update_attention_broker);
68+
make_shared<PatternMatchingQueryProxy>(query, "", true, update_attention_broker, false);
6969
service_bus->issue_bus_command(proxy);
7070
shared_ptr<QueryAnswer> query_answer;
7171
int count = 0;
@@ -79,6 +79,7 @@ int main(int argc, char* argv[]) {
7979
}
8080
}
8181
}
82+
cout << count << " answers" << endl;
8283
if (count == 0) {
8384
cout << "No match for query" << endl;
8485
}

src/main/word_query_main.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ void run(const string& context, const string& word_tag) {
127127
"\"" + word_tag + "\""};
128128

129129
shared_ptr<PatternMatchingQueryProxy> proxy =
130-
make_shared<PatternMatchingQueryProxy>(query_word, context, true);
130+
make_shared<PatternMatchingQueryProxy>(query_word, context, false, true, false);
131131
service_bus->issue_bus_command(proxy);
132132

133133
shared_ptr<QueryAnswer> query_answer;
@@ -171,7 +171,7 @@ void run(const string& context, const string& word_tag) {
171171
}
172172

173173
shared_ptr<PatternMatchingQueryProxy> proxy2 =
174-
make_shared<PatternMatchingQueryProxy>(query_word, context, true, true);
174+
make_shared<PatternMatchingQueryProxy>(query_word, context, true, true, false);
175175
service_bus->issue_bus_command(proxy2);
176176
while (!proxy2->finished()) {
177177
Utils::sleep();

src/query_engine/PatternMatchingQueryProcessor.cc

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#include "ServiceBus.h"
99
#include "Sink.h"
1010
#include "Terminal.h"
11+
#include "UniqueAssignmentFilter.h"
1112

1213
#define LOG_LEVEL INFO_LEVEL
1314
#include "Logger.h"
@@ -137,10 +138,13 @@ void PatternMatchingQueryProcessor::thread_process_one_query(
137138
if (proxy->args.size() < 2) {
138139
Utils::error("Syntax error in query command. Missing implicit parameters.");
139140
}
140-
proxy->set_context(proxy->args[0]);
141-
proxy->set_attention_update_flag(proxy->args[1] == "1");
142-
proxy->set_count_flag(proxy->args[2] == "1");
143-
proxy->query_tokens.insert(proxy->query_tokens.begin(), proxy->args.begin() + 3, proxy->args.end());
141+
unsigned int skip_arg = 0;
142+
proxy->set_context(proxy->args[skip_arg++]);
143+
proxy->set_unique_assignment_flag(proxy->args[skip_arg++] == "1");
144+
proxy->set_attention_update_flag(proxy->args[skip_arg++] == "1");
145+
proxy->set_count_flag(proxy->args[skip_arg++] == "1");
146+
proxy->query_tokens.insert(
147+
proxy->query_tokens.begin(), proxy->args.begin() + skip_arg, proxy->args.end());
144148
LOG_DEBUG("Setting up query tree");
145149
shared_ptr<QueryElement> root_query_element = setup_query_tree(proxy);
146150
set<string> joint_answer; // used to stimulate attention broker
@@ -212,8 +216,14 @@ shared_ptr<QueryElement> PatternMatchingQueryProcessor::setup_query_tree(
212216
element_stack.push(build_link_template(proxy, cursor, element_stack));
213217
} else if (proxy->query_tokens[cursor] == "AND") {
214218
element_stack.push(build_and(proxy, cursor, element_stack));
219+
if (proxy->get_unique_assignment_flag()) {
220+
element_stack.push(build_unique_assignment_filter(proxy, cursor, element_stack));
221+
}
215222
} else if (proxy->query_tokens[cursor] == "OR") {
216223
element_stack.push(build_or(proxy, cursor, element_stack));
224+
if (proxy->get_unique_assignment_flag()) {
225+
element_stack.push(build_unique_assignment_filter(proxy, cursor, element_stack));
226+
}
217227
} else {
218228
Utils::error("Invalid token " + proxy->query_tokens[cursor] +
219229
" in PATTERN_MATCHING_QUERY message");
@@ -630,3 +640,18 @@ shared_ptr<QueryElement> PatternMatchingQueryProcessor::build_link(
630640
}
631641
return NULL; // Just to avoid warnings. This is not actually reachable.
632642
}
643+
644+
shared_ptr<QueryElement> PatternMatchingQueryProcessor::build_unique_assignment_filter(
645+
shared_ptr<PatternMatchingQueryProxy> proxy,
646+
unsigned int cursor,
647+
stack<shared_ptr<QueryElement>>& element_stack) {
648+
if (element_stack.size() < 1) {
649+
Utils::error(
650+
"PATTERN_MATCHING_QUERY message: parse error in tokens - too few arguments for "
651+
"UniqueAssignmentFilter");
652+
}
653+
654+
shared_ptr<QueryElement> input = element_stack.top();
655+
element_stack.pop();
656+
return make_shared<UniqueAssignmentFilter>(input);
657+
}

src/query_engine/PatternMatchingQueryProcessor.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,11 @@ class PatternMatchingQueryProcessor : public BusCommandProcessor {
6868
unsigned int cursor,
6969
stack<shared_ptr<QueryElement>>& element_stack);
7070

71+
shared_ptr<QueryElement> build_unique_assignment_filter(
72+
shared_ptr<PatternMatchingQueryProxy> proxy,
73+
unsigned int cursor,
74+
stack<shared_ptr<QueryElement>>& element_stack);
75+
7176
vector<thread*> query_threads;
7277
mutex query_threads_mutex;
7378
shared_ptr<PatternMatchingQueryProxy> proxy;

src/query_engine/PatternMatchingQueryProxy.cc

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ PatternMatchingQueryProxy::PatternMatchingQueryProxy() {
2323

2424
PatternMatchingQueryProxy::PatternMatchingQueryProxy(const vector<string>& tokens,
2525
const string& context,
26+
bool unique_assignment,
2627
bool update_attention_broker,
2728
bool count_only)
2829
: BusCommandProxy() {
@@ -31,13 +32,18 @@ PatternMatchingQueryProxy::PatternMatchingQueryProxy(const vector<string>& token
3132
init();
3233
this->command = ServiceBus::PATTERN_MATCHING_QUERY;
3334
this->count_flag = count_only;
34-
this->args = {context, to_string(update_attention_broker), to_string(count_flag)};
35+
this->unique_assignment_flag = unique_assignment;
36+
this->args = {context,
37+
to_string(unique_assignment),
38+
to_string(update_attention_broker),
39+
to_string(count_flag)};
3540
this->args.insert(this->args.end(), tokens.begin(), tokens.end());
3641
}
3742

3843
void PatternMatchingQueryProxy::init() {
3944
this->answer_flow_finished = false;
4045
this->abort_flag = false;
46+
this->unique_assignment_flag = false;
4147
this->update_attention_broker = false;
4248
this->answer_count = 0;
4349
this->count_flag = false;
@@ -123,6 +129,16 @@ void PatternMatchingQueryProxy::set_count_flag(bool flag) {
123129
this->count_flag = flag;
124130
}
125131

132+
bool PatternMatchingQueryProxy::get_unique_assignment_flag() {
133+
lock_guard<mutex> semaphore(this->api_mutex);
134+
return this->unique_assignment_flag;
135+
}
136+
137+
void PatternMatchingQueryProxy::set_unique_assignment_flag(bool flag) {
138+
lock_guard<mutex> semaphore(this->api_mutex);
139+
this->unique_assignment_flag = flag;
140+
}
141+
126142
// ---------------------------------------------------------------------------------------------
127143
// Virtual superclass API from_remote_peer() and the piggyback methods called by it
128144

src/query_engine/PatternMatchingQueryProxy.h

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,16 +44,19 @@ class PatternMatchingQueryProxy : public BusCommandProxy {
4444
*
4545
* @param tokens Query tokens.
4646
* @param context AttentionBroker context
47+
* @param unique_assignment When true, operators (e.g. And, Or) don't output more than one
48+
* QueryAnswer with the same variable assignment.
4749
* @param update_attention_broker Flag to trigger AttentionBroker update based on this query
4850
* results
4951
* @param count_only Flag to indicate that this query is supposed to count the results and not
5052
* actually provide the query answers (i.e. no QueryAnswer is sent from the command executor and
5153
* the caller of the query).
5254
*/
5355
PatternMatchingQueryProxy(const vector<string>& tokens,
54-
const string& context = "",
55-
bool update_attention_broker = false,
56-
bool count_only = false);
56+
const string& context,
57+
bool unique_assignment,
58+
bool update_attention_broker,
59+
bool count_only);
5760

5861
/**
5962
* Destructor.
@@ -162,6 +165,24 @@ class PatternMatchingQueryProxy : public BusCommandProxy {
162165
*/
163166
void set_count_flag(bool flag);
164167

168+
/**
169+
* Getter for unique_assignment_flag
170+
*
171+
* unique_assignment_flag prevents duplicated variable assignment in Operators' output
172+
*
173+
* @return unique_assignment_flag
174+
*/
175+
bool get_unique_assignment_flag();
176+
177+
/**
178+
* Setter for unique_assignment_flag
179+
*
180+
* unique_assignment_flag prevents duplicated variable assignment in Operators' output
181+
*
182+
* @param flag Flag
183+
*/
184+
void set_unique_assignment_flag(bool flag);
185+
165186
/**
166187
* Getter for query_tokens
167188
*
@@ -221,6 +242,7 @@ class PatternMatchingQueryProxy : public BusCommandProxy {
221242
string context;
222243
bool update_attention_broker;
223244
bool count_flag;
245+
bool unique_assignment_flag;
224246

225247
void init();
226248
};

src/query_engine/QueryAnswer.cc

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,27 @@ string Assignment::to_string() {
9494
return answer;
9595
}
9696

97+
bool Assignment::operator==(const Assignment& other) const {
98+
if (this->size != other.size) {
99+
return false;
100+
}
101+
unsigned int n = this->size;
102+
for (unsigned int i = 0; i < n; i++) {
103+
unsigned int j = 0;
104+
while ((j != n) && strncmp(this->labels[i], other.labels[j], MAX_VARIABLE_NAME_SIZE)) {
105+
j++;
106+
}
107+
if (j == n) {
108+
// There's a variable in "this" which doesn't exist in "other"
109+
return false;
110+
} else if (strncmp(this->values[i], other.values[j], HANDLE_HASH_SIZE)) {
111+
// There same variable have different values in "this" and "other"
112+
return false;
113+
}
114+
}
115+
return true;
116+
}
117+
97118
// -------------------------------------------------------------------------------------------------
98119
// QueryAnswer
99120

src/query_engine/QueryAnswer.h

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ namespace query_engine {
2626
*/
2727
class Assignment {
2828
friend class QueryAnswer;
29+
friend struct std::hash<Assignment>;
2930

3031
public:
3132
/**
@@ -115,6 +116,12 @@ class Assignment {
115116
*/
116117
string to_string();
117118

119+
/**
120+
* Two Assignments are equal iff they have the exact set of variable lables with
121+
* the exact same assigned values.
122+
*/
123+
bool operator==(const Assignment& other) const;
124+
118125
private:
119126
const char* labels[MAX_NUMBER_OF_VARIABLES_IN_QUERY];
120127
const char* values[MAX_NUMBER_OF_VARIABLES_IN_QUERY];
@@ -260,3 +267,21 @@ class QueryAnswer {
260267
};
261268

262269
} // namespace query_engine
270+
271+
template <>
272+
struct std::hash<query_engine::Assignment> {
273+
std::size_t operator()(const query_engine::Assignment& k) const {
274+
if (k.size == 0) {
275+
return 0;
276+
}
277+
278+
std::size_t hash_value = 1;
279+
for (unsigned int i = 0; i < k.size; i++) {
280+
hash_value = hash_value ^ ((std::hash<string>()(string(k.labels[i])) ^
281+
(std::hash<string>()(string(k.values[i])) << 1)) >>
282+
1);
283+
}
284+
285+
return hash_value;
286+
}
287+
};

src/query_engine/query_element/BUILD

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ cc_library(
1313
":query_element",
1414
":link_template",
1515
":source",
16+
":unique_assignment_filter",
1617
],
1718
)
1819

@@ -103,6 +104,15 @@ cc_library(
103104
)
104105

105106

107+
cc_library(
108+
name = "unique_assignment_filter",
109+
srcs = ["UniqueAssignmentFilter.cc"],
110+
hdrs = ["UniqueAssignmentFilter.h"],
111+
deps = [
112+
":query_element",
113+
],
114+
)
115+
106116
cc_library(
107117
name = "source",
108118
srcs = ["Source.cc"],
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
#include "UniqueAssignmentFilter.h"
2+
3+
#define LOG_LEVEL INFO_LEVEL
4+
#include "Logger.h"
5+
6+
using namespace std;
7+
using namespace query_element;
8+
9+
// -------------------------------------------------------------------------------------------------
10+
// Constructors, destructors and initialization
11+
12+
UniqueAssignmentFilter::UniqueAssignmentFilter(const shared_ptr<QueryElement>& input)
13+
: Operator<1>({input}) {
14+
initialize(input);
15+
}
16+
17+
UniqueAssignmentFilter::~UniqueAssignmentFilter() { graceful_shutdown(); }
18+
19+
void UniqueAssignmentFilter::initialize(const shared_ptr<QueryElement>& input) {
20+
this->id = "UniqueAssignmentFilter(" + input->id + ")";
21+
LOG_INFO(this->id);
22+
}
23+
24+
// -------------------------------------------------------------------------------------------------
25+
// QueryElement API
26+
27+
void UniqueAssignmentFilter::setup_buffers() {
28+
Operator<1>::setup_buffers();
29+
this->operator_thread = new thread(&UniqueAssignmentFilter::thread_filter, this);
30+
}
31+
32+
void UniqueAssignmentFilter::graceful_shutdown() {
33+
Operator<1>::graceful_shutdown();
34+
if (this->operator_thread != NULL) {
35+
this->operator_thread->join();
36+
delete this->operator_thread;
37+
this->operator_thread = NULL;
38+
}
39+
}
40+
41+
// -------------------------------------------------------------------------------------------------
42+
// Private methods
43+
44+
void UniqueAssignmentFilter::thread_filter() {
45+
unordered_set<Assignment> already_used;
46+
47+
while (true) {
48+
if (this->input_buffer[0]->is_query_answers_finished() &&
49+
this->input_buffer[0]->is_query_answers_empty()) {
50+
this->output_buffer->query_answers_finished();
51+
break;
52+
}
53+
QueryAnswer* answer = dynamic_cast<QueryAnswer*>(this->input_buffer[0]->pop_query_answer());
54+
if (answer != NULL) {
55+
if (already_used.find(answer->assignment) == already_used.end()) {
56+
// New assignment. Let the QueryAnswer pass.
57+
already_used.insert(answer->assignment);
58+
this->output_buffer->add_query_answer(answer);
59+
} else {
60+
// Assignment has already been processed. Delete duplicate QueryAnswer.
61+
delete answer;
62+
}
63+
} else {
64+
Utils::sleep();
65+
}
66+
}
67+
}

0 commit comments

Comments
 (0)