Skip to content

Commit 3402fb9

Browse files
authored
Telemetry pipeline improvements (iteration 1) (#3133)
* telemetry pipeline improvements * addressed maddie comments * undo pylint warn fix * manually computing total seconds * addressing comments * fix the comment * fix UT * fix error * new comments (cherry picked from commit 55ea21f) * remove comment * paramter update and comment * merge conflict
1 parent d79966d commit 3402fb9

16 files changed

+304
-142
lines changed

azurelinuxagent/common/event.py

Lines changed: 46 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -213,13 +213,9 @@ def parse_json_event(data_str):
213213

214214
def parse_event(data_str):
215215
try:
216-
try:
217-
return parse_json_event(data_str)
218-
except ValueError:
219-
return parse_xml_event(data_str)
220-
except Exception as e:
221-
raise EventError("Error parsing event: {0}".format(ustr(e)))
222-
216+
return parse_json_event(data_str)
217+
except ValueError:
218+
return parse_xml_event(data_str)
223219

224220
def parse_xml_param(param_node):
225221
name = getattrib(param_node, "Name")
@@ -344,11 +340,15 @@ def update_unicode_error(self, unicode_err):
344340
def update_op_error(self, op_err):
345341
self.__op_error_count = self._update_errors_and_get_count(self.__op_error_count, self.__op_errors, op_err)
346342

343+
def get_error_count(self):
344+
return self.__op_error_count + self.__unicode_error_count
345+
347346

348347
class EventLogger(object):
349348
def __init__(self):
350349
self.event_dir = None
351350
self.periodic_events = {}
351+
self.protocol = None
352352

353353
#
354354
# All events should have these parameters.
@@ -494,7 +494,10 @@ def add_periodic(self, delta, name, op=WALAEventOperation.Unknown, is_success=Tr
494494
self.periodic_events[h] = datetime.now()
495495

496496
def add_event(self, name, op=WALAEventOperation.Unknown, is_success=True, duration=0, version=str(CURRENT_VERSION),
497-
message="", log_event=True):
497+
message="", log_event=True, flush=False):
498+
"""
499+
:param flush: Flush the event immediately to the wire server
500+
"""
498501

499502
if (not is_success) and log_event:
500503
_log_event(name, op, message, duration, is_success=is_success)
@@ -508,12 +511,7 @@ def add_event(self, name, op=WALAEventOperation.Unknown, is_success=True, durati
508511
event.parameters.append(TelemetryEventParam(GuestAgentExtensionEventsSchema.Duration, int(duration)))
509512
self.add_common_event_parameters(event, datetime.utcnow())
510513

511-
data = get_properties(event)
512-
513-
try:
514-
self.save_event(json.dumps(data))
515-
except EventError as e:
516-
logger.periodic_error(logger.EVERY_FIFTEEN_MINUTES, "[PERIODIC] {0}".format(ustr(e)))
514+
self.report_or_save_event(event, flush)
517515

518516
def add_log_event(self, level, message):
519517
event = TelemetryEvent(TELEMETRY_LOG_EVENT_ID, TELEMETRY_LOG_PROVIDER_ID)
@@ -524,11 +522,7 @@ def add_log_event(self, level, message):
524522
event.parameters.append(TelemetryEventParam(GuestAgentGenericLogsSchema.Context3, ''))
525523
self.add_common_event_parameters(event, datetime.utcnow())
526524

527-
data = get_properties(event)
528-
try:
529-
self.save_event(json.dumps(data))
530-
except EventError:
531-
pass
525+
self.report_or_save_event(event)
532526

533527
def add_metric(self, category, counter, instance, value, log_event=False):
534528
"""
@@ -551,11 +545,25 @@ def add_metric(self, category, counter, instance, value, log_event=False):
551545
event.parameters.append(TelemetryEventParam(GuestAgentPerfCounterEventsSchema.Value, float(value)))
552546
self.add_common_event_parameters(event, datetime.utcnow())
553547

554-
data = get_properties(event)
555-
try:
556-
self.save_event(json.dumps(data))
557-
except EventError as e:
558-
logger.periodic_error(logger.EVERY_FIFTEEN_MINUTES, "[PERIODIC] {0}".format(ustr(e)))
548+
self.report_or_save_event(event)
549+
550+
def report_or_save_event(self, event, flush=False):
551+
"""
552+
Flush the event to wireserver if flush is set to true, else
553+
save it to disk if we fail to send or are not required to flush immediately.
554+
TODO: pick up as many events as possible and send them in one go.
555+
"""
556+
report_success = False
557+
if flush and self.protocol is not None:
558+
report_success = self.protocol.report_event([event], flush)
559+
560+
if not report_success:
561+
try:
562+
data = get_properties(event)
563+
self.save_event(json.dumps(data))
564+
except EventError as e:
565+
logger.periodic_error(logger.EVERY_FIFTEEN_MINUTES, "[PERIODIC] {0}".format(ustr(e)))
566+
559567

560568
@staticmethod
561569
def _clean_up_message(message):
@@ -636,13 +644,16 @@ def elapsed_milliseconds(utc_start):
636644
(d.microseconds / 1000.0))
637645

638646

639-
def report_event(op, is_success=True, message='', log_event=True):
647+
def report_event(op, is_success=True, message='', log_event=True, flush=False):
648+
"""
649+
:param flush: if true, flush the event immediately to the wire server
650+
"""
640651
add_event(AGENT_NAME,
641652
version=str(CURRENT_VERSION),
642653
is_success=is_success,
643654
message=message,
644655
op=op,
645-
log_event=log_event)
656+
log_event=log_event, flush=flush)
646657

647658

648659
def report_periodic(delta, op, is_success=True, message=''):
@@ -675,12 +686,17 @@ def report_metric(category, counter, instance, value, log_event=False, reporter=
675686
"{0}/{1} [{2}] = {3}".format(category, counter, instance, value))
676687

677688

678-
def initialize_event_logger_vminfo_common_parameters(protocol, reporter=__event_logger__):
689+
def initialize_event_logger_vminfo_common_parameters_and_protocol(protocol, reporter=__event_logger__):
690+
# Initialize protocol for event logger to directly send events to wireserver
691+
reporter.protocol = protocol
679692
reporter.initialize_vminfo_common_parameters(protocol)
680693

681694

682695
def add_event(name=AGENT_NAME, op=WALAEventOperation.Unknown, is_success=True, duration=0, version=str(CURRENT_VERSION),
683-
message="", log_event=True, reporter=__event_logger__):
696+
message="", log_event=True, flush=False, reporter=__event_logger__):
697+
"""
698+
:param flush: if true, flush the event immediately to the wire server
699+
"""
684700
if reporter.event_dir is None:
685701
logger.warn("Cannot add event -- Event reporter is not initialized.")
686702
_log_event(name, op, message, duration, is_success=is_success)
@@ -690,7 +706,7 @@ def add_event(name=AGENT_NAME, op=WALAEventOperation.Unknown, is_success=True, d
690706
mark_event_status(name, version, op, is_success)
691707
reporter.add_event(name, op=op, is_success=is_success, duration=duration, version=str(version),
692708
message=message,
693-
log_event=log_event)
709+
log_event=log_event, flush=flush)
694710

695711

696712
def info(op, fmt, *args):
@@ -723,7 +739,7 @@ def add_log_event(level, message, forced=False, reporter=__event_logger__):
723739
:param message: Message
724740
:param forced: Force write the event even if send_logs_to_telemetry() is disabled
725741
(NOTE: Remove this flag once send_logs_to_telemetry() is enabled for all events)
726-
:param reporter:
742+
:param reporter: The EventLogger instance to which log events should be sent
727743
:return:
728744
"""
729745
if reporter.event_dir is None:

azurelinuxagent/common/protocol/wire.py

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -44,14 +44,16 @@
4444
from azurelinuxagent.common.telemetryevent import GuestAgentExtensionEventsSchema
4545
from azurelinuxagent.common.utils import fileutil, restutil
4646
from azurelinuxagent.common.utils.cryptutil import CryptUtil
47+
from azurelinuxagent.common.utils.restutil import TELEMETRY_THROTTLE_DELAY_IN_SECONDS, \
48+
TELEMETRY_FLUSH_THROTTLE_DELAY_IN_SECONDS, TELEMETRY_DATA
4749
from azurelinuxagent.common.utils.textutil import parse_doc, findall, find, \
4850
findtext, gettext, remove_bom, get_bytes_from_pem, parse_json
4951
from azurelinuxagent.common.version import AGENT_NAME, CURRENT_VERSION
5052

5153
VERSION_INFO_URI = "http://{0}/?comp=versions"
5254
HEALTH_REPORT_URI = "http://{0}/machine?comp=health"
5355
ROLE_PROP_URI = "http://{0}/machine?comp=roleProperties"
54-
TELEMETRY_URI = "http://{0}/machine?comp=telemetrydata"
56+
TELEMETRY_URI = "http://{0}/machine?comp={1}"
5557

5658
PROTOCOL_VERSION = "2012-11-30"
5759
ENDPOINT_FINE_NAME = "WireServer"
@@ -144,8 +146,8 @@ def report_vm_status(self, vm_status):
144146
self.client.status_blob.set_vm_status(vm_status)
145147
self.client.upload_status_blob()
146148

147-
def report_event(self, events_iterator):
148-
self.client.report_event(events_iterator)
149+
def report_event(self, events_iterator, flush=False):
150+
return self.client.report_event(events_iterator, flush)
149151

150152
def upload_logs(self, logs):
151153
self.client.upload_logs(logs)
@@ -1047,8 +1049,8 @@ def report_health(self, status, substatus, description):
10471049
u",{0}: {1}").format(resp.status,
10481050
resp.read()))
10491051

1050-
def send_encoded_event(self, provider_id, event_str, encoding='utf8'):
1051-
uri = TELEMETRY_URI.format(self.get_endpoint())
1052+
def _send_encoded_event(self, provider_id, event_str, flush, encoding='utf8'):
1053+
uri = TELEMETRY_URI.format(self.get_endpoint(), TELEMETRY_DATA)
10521054
data_format_header = ustr('<?xml version="1.0"?><TelemetryData version="1.0"><Provider id="{0}">').format(
10531055
provider_id).encode(encoding)
10541056
data_format_footer = ustr('</Provider></TelemetryData>').encode(encoding)
@@ -1059,7 +1061,12 @@ def send_encoded_event(self, provider_id, event_str, encoding='utf8'):
10591061
header = self.get_header_for_xml_content()
10601062
# NOTE: The call to wireserver requests utf-8 encoding in the headers, but the body should not
10611063
# be encoded: some nodes in the telemetry pipeline do not support utf-8 encoding.
1062-
resp = self.call_wireserver(restutil.http_post, uri, data, header)
1064+
1065+
# If this is an important event being flushed, use a shorter throttle delay on throttling errors (to avoid a long delay in completing this operation)
1066+
if flush:
1067+
resp = self.call_wireserver(restutil.http_post, uri, data, header, max_retry=3, throttle_delay=TELEMETRY_FLUSH_THROTTLE_DELAY_IN_SECONDS)
1068+
else:
1069+
resp = self.call_wireserver(restutil.http_post, uri, data, header, max_retry=3, throttle_delay=TELEMETRY_THROTTLE_DELAY_IN_SECONDS)
10631070
except HttpError as e:
10641071
raise ProtocolError("Failed to send events:{0}".format(e))
10651072

@@ -1068,14 +1075,14 @@ def send_encoded_event(self, provider_id, event_str, encoding='utf8'):
10681075
raise ProtocolError(
10691076
"Failed to send events:{0}".format(resp.status))
10701077

1071-
def report_event(self, events_iterator):
1078+
def report_event(self, events_iterator, flush=False):
10721079
buf = {}
10731080
debug_info = CollectOrReportEventDebugInfo(operation=CollectOrReportEventDebugInfo.OP_REPORT)
10741081
events_per_provider = defaultdict(int)
10751082

1076-
def _send_event(provider_id, debug_info):
1083+
def _send_event(provider_id, debug_info, flush):
10771084
try:
1078-
self.send_encoded_event(provider_id, buf[provider_id])
1085+
self._send_encoded_event(provider_id, buf[provider_id], flush)
10791086
except UnicodeError as uni_error:
10801087
debug_info.update_unicode_error(uni_error)
10811088
except Exception as error:
@@ -1102,7 +1109,7 @@ def _send_event(provider_id, debug_info):
11021109
# If buffer is full, send out the events in buffer and reset buffer
11031110
if len(buf[event.providerId] + event_str) >= MAX_EVENT_BUFFER_SIZE:
11041111
logger.verbose("No of events this request = {0}".format(events_per_provider[event.providerId]))
1105-
_send_event(event.providerId, debug_info)
1112+
_send_event(event.providerId, debug_info, flush)
11061113
buf[event.providerId] = b""
11071114
events_per_provider[event.providerId] = 0
11081115

@@ -1117,10 +1124,12 @@ def _send_event(provider_id, debug_info):
11171124
for provider_id in list(buf.keys()):
11181125
if buf[provider_id]:
11191126
logger.verbose("No of events this request = {0}".format(events_per_provider[provider_id]))
1120-
_send_event(provider_id, debug_info)
1127+
_send_event(provider_id, debug_info, flush)
11211128

11221129
debug_info.report_debug_info()
11231130

1131+
return debug_info.get_error_count() == 0
1132+
11241133
def report_status_event(self, message, is_success):
11251134
report_event(op=WALAEventOperation.ReportStatus,
11261135
is_success=is_success,

azurelinuxagent/common/telemetryevent.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,4 +111,4 @@ def get_version(self):
111111
for param in self.parameters:
112112
if param.name == GuestAgentExtensionEventsSchema.Version:
113113
return param.value
114-
return None
114+
return None

azurelinuxagent/common/utils/restutil.py

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@
3939

4040
THROTTLE_RETRIES = 25
4141
THROTTLE_DELAY_IN_SECONDS = 1
42+
# Use a longer delay between attempts when throttled, since the telemetrydata endpoint has a limit of 15 calls per 15 secs.
43+
TELEMETRY_THROTTLE_DELAY_IN_SECONDS = 8
44+
# Use a shorter throttle delay when flushing important telemetry events
45+
TELEMETRY_FLUSH_THROTTLE_DELAY_IN_SECONDS = 2
4246

4347
REDACTED_TEXT = "<SAS_SIGNATURE>"
4448
SAS_TOKEN_RETRIEVAL_REGEX = re.compile(r'^(https?://[a-zA-Z0-9.].*sig=)([a-zA-Z0-9%-]*)(.*)$')
@@ -109,6 +113,7 @@
109113
KNOWN_WIRESERVER_IP = '168.63.129.16'
110114
HOST_PLUGIN_PORT = 32526
111115

116+
TELEMETRY_DATA = "telemetrydata"
112117

113118
class IOErrorCounter(object):
114119
_lock = threading.RLock()
@@ -163,6 +168,10 @@ def _is_retry_exception(e):
163168
def _is_throttle_status(status):
164169
return status in THROTTLE_CODES
165170

171+
def _is_telemetry_req(url):
172+
if TELEMETRY_DATA in url:
173+
return True
174+
return False
166175

167176
def _parse_url(url):
168177
"""
@@ -364,6 +373,7 @@ def http_request(method,
364373
max_retry=None,
365374
retry_codes=None,
366375
retry_delay=DELAY_IN_SECONDS,
376+
throttle_delay=THROTTLE_DELAY_IN_SECONDS,
367377
redact_data=False,
368378
return_raw_response=False):
369379
"""
@@ -427,10 +437,10 @@ def http_request(method,
427437
# (with a safe, minimum number of retry attempts)
428438
# -- Otherwise, compute a delay that is the product of the next
429439
# item in the Fibonacci series and the initial delay value
430-
delay = THROTTLE_DELAY_IN_SECONDS \
431-
if was_throttled \
432-
else _compute_delay(retry_attempt=attempt,
433-
delay=retry_delay)
440+
if was_throttled:
441+
delay = throttle_delay
442+
else:
443+
delay = _compute_delay(retry_attempt=attempt, delay=retry_delay)
434444

435445
logger.verbose("[HTTP Retry] "
436446
"Attempt {0} of {1} will delay {2} seconds: {3}",
@@ -468,7 +478,10 @@ def http_request(method,
468478
# retry attempts
469479
if _is_throttle_status(resp.status):
470480
was_throttled = True
471-
max_retry = max(max_retry, THROTTLE_RETRIES)
481+
# Today, THROTTLE_RETRIES is set to a large number (25) for retries, as opposed to backing off and attempting fewer retries.
482+
# However, for telemetry calls (due to the throttle limit of 15 calls per 15 seconds), we use the max_retry set by the caller for overall retry attempts instead of THROTTLE_RETRIES.
483+
if not _is_telemetry_req(url):
484+
max_retry = max(max_retry, THROTTLE_RETRIES)
472485
continue
473486

474487
# If we got a 410 (resource gone) for any reason, raise an exception. The caller will handle it by
@@ -563,6 +576,7 @@ def http_post(url,
563576
max_retry=None,
564577
retry_codes=None,
565578
retry_delay=DELAY_IN_SECONDS,
579+
throttle_delay=THROTTLE_DELAY_IN_SECONDS,
566580
timeout=10):
567581

568582
if max_retry is None:
@@ -575,7 +589,8 @@ def http_post(url,
575589
use_proxy=use_proxy,
576590
max_retry=max_retry,
577591
retry_codes=retry_codes,
578-
retry_delay=retry_delay)
592+
retry_delay=retry_delay,
593+
throttle_delay=throttle_delay)
579594

580595

581596
def http_put(url,

azurelinuxagent/daemon/main.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import azurelinuxagent.common.logger as logger
2626
import azurelinuxagent.common.utils.fileutil as fileutil
2727

28-
from azurelinuxagent.common.event import add_event, WALAEventOperation, initialize_event_logger_vminfo_common_parameters
28+
from azurelinuxagent.common.event import add_event, WALAEventOperation, initialize_event_logger_vminfo_common_parameters_and_protocol
2929
from azurelinuxagent.common.future import ustr
3030
from azurelinuxagent.common.osutil import get_osutil
3131
from azurelinuxagent.common.protocol.goal_state import GoalState, GoalStateProperties
@@ -119,7 +119,7 @@ def initialize_environment(self):
119119

120120
def _initialize_telemetry(self):
121121
protocol = self.protocol_util.get_protocol()
122-
initialize_event_logger_vminfo_common_parameters(protocol)
122+
initialize_event_logger_vminfo_common_parameters_and_protocol(protocol)
123123

124124
def daemon(self, child_args=None):
125125
logger.info("Run daemon")

0 commit comments

Comments
 (0)