forked from VestigeJ/dataminer
-
Notifications
You must be signed in to change notification settings - Fork 0
/
worker.cpp
118 lines (98 loc) · 3.13 KB
/
worker.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
#include <stdlib.h>
#include <cerrno>
#include <climits>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/thread/thread.hpp>
#include <boost/chrono.hpp>
#include <boost/filesystem.hpp>
#include "httpreader.h"
#include "agenthandler.h"
#include "worker.h"
// Builds a worker with a default poll interval of 60 (run() treats the
// interval as seconds) and a poll counter starting at zero.
Worker::Worker()
    : m_interval(60),
      m_poll_count(0)
{
}
/**
 * Configures the worker for one agent and validates the supplied values.
 *
 * @param settings        Settings store; queried with `uri` for the saved next
 *                        sequence and kept for later updates. Must not be null.
 * @param outputLocation  Location passed to the handler when output is written;
 *                        it must already exist.
 * @param uri             Agent URI; handed to the reader's parseUri().
 * @param interval        Poll interval as a decimal string. It must be a
 *                        positive number that fits in an int; trailing
 *                        whitespace is tolerated, any other trailing text is not.
 * @return true when every check passes and the reader accepts the URI,
 *         false otherwise (a message is written to std::cerr).
 *
 * On an invalid interval m_interval keeps its previous value, so the worker
 * never ends up with a zero or negative sleep interval.
 */
bool Worker::setup(Settings *settings, string outputLocation, string uri, string interval)
{
    // Guard the dereference further down instead of crashing on a null pointer.
    if (settings == NULL)
    {
        std::cerr << "settings is not available" << endl;
        return false;
    }

    m_outputLocation = outputLocation;
    m_uri = uri;
    m_settings = settings;

    // strtol, unlike atoi, reports overflow through errno and tells us where
    // parsing stopped, so garbage such as "10abc" or a huge number is rejected.
    const char *intervalText = interval.c_str();
    char *parseEnd = NULL;
    errno = 0;
    const long parsedInterval = strtol(intervalText, &parseEnd, 10);

    bool intervalValid = (parseEnd != intervalText)
                         && (errno != ERANGE)
                         && (parsedInterval > 0)
                         && (parsedInterval <= INT_MAX);
    if (intervalValid)
    {
        // Accept trailing whitespace (e.g. a stray "\r" from a config file).
        while (*parseEnd == ' ' || *parseEnd == '\t' || *parseEnd == '\r' || *parseEnd == '\n')
            ++parseEnd;
        intervalValid = (*parseEnd == '\0');
    }

    if (!intervalValid)
    {
        std::cerr << "interval [" << interval << "] is invalid" << endl;
        return false;
    }
    m_interval = static_cast<int>(parsedInterval);

    m_next_sequence = settings->get(uri);

    std::cout << "----------------" << endl;
    std::cout << "Output Location: " << outputLocation << endl;
    std::cout << "Agent Uri: " << uri << endl;
    std::cout << "Poll Interval: " << interval << endl;
    std::cout << "Next Sequence #: " << m_next_sequence << endl;

    // Use the error_code overload so a file-system error (for example a
    // permission problem) is reported as a failed setup rather than thrown.
    boost::system::error_code existsError;
    if (!boost::filesystem::exists(outputLocation, existsError))
    {
        std::cerr <<
            "Location " << outputLocation << " does not exist!\n";
        return false;
    }

    return m_reader.parseUri(uri);
}
void Worker::run()
{
while (true)
{
poll();
boost::this_thread::sleep_for( boost::chrono::seconds(m_interval) );
}
}
void Worker::poll()
{
m_poll_count++;
// get the meta info if not ready
set<string> &numericFields = m_handler.getNumericFields();
if (numericFields.size() == 0)
{
m_reader.setQuery("/probe");
string probeXml = m_reader.read();
if (probeXml.length() == 0)
{
std::cerr << "No data!" << endl;
return;
}
m_handler.setProbeInfo(probeXml);
m_reader.close();
}
if (m_next_sequence.length() == 0)
m_reader.setQuery("/current");
else
m_reader.setQuery("/sample?count=10000&from="+m_next_sequence);
string xmlData = m_reader.read();
if (xmlData.length() == 0)
{
std::cerr << "No data!" << endl;
return;
}
try {
m_handler.process(xmlData);
}
catch (exception & e)
{
std::cerr << e.what() << endl;
return;
}
// don't output if data has not changed
string sequence = m_handler.getJSON_data("MTConnectStreams.Header.<xmlattr>.nextSequence");
if (sequence.compare(m_next_sequence) == 0)
{
std::cout << "========== { " << m_uri << " - "<< m_poll_count << ", [SKIPPED] next sequence = " << m_next_sequence << " } ==========" << std::endl;
return;
}
m_next_sequence = sequence;
m_settings->set(m_uri, m_next_sequence);
if (sequence.length() == 0)
{
// last next_sequence may be invalid, reset to using "current" to fetch the latest data
std::cout << "========== { " << m_uri << " - "<< m_poll_count << ", [SKIPPED] reset to fetch current data } ==========" << std::endl;
return;
}
m_handler.outputJSON(&m_itemManager, m_outputLocation);
std::cout << "========== { " << m_uri << " - " << m_poll_count << ", next sequence = " << m_next_sequence << " } ==========" << std::endl;
}