Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use monotonic clock for time measuring (backport #982) #986

Merged
merged 1 commit into from
Dec 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import threading
from datetime import datetime
import time

from rosbridge_library.capability import Capability

Expand Down Expand Up @@ -84,13 +84,13 @@ def __init__(self, protocol):
# 4.b) pass the reconstructed message string to protocol.incoming() # protocol.incoming is checking message fields by itself, so no need to do this before passing the reconstructed message to protocol
# 4.c) remove the fragment list to free up memory
def defragment(self, message):
now = datetime.now()
now = time.monotonic()

if self.received_fragments is not None:
for id in self.received_fragments.keys():
time_diff = now - self.received_fragments[id]["timestamp_last_append"]
if (
time_diff.total_seconds() > self.fragment_timeout
time_diff > self.fragment_timeout
and not self.received_fragments[id]["is_reconstructing"]
):
log_msg = ["fragment list ", str(id), " timed out.."]
Expand Down Expand Up @@ -188,15 +188,15 @@ def defragment(self, message):
log_msg = "".join(log_msg)
self.protocol.log("debug", log_msg)

duration = datetime.now() - now
duration = time.monotonic() - now

# Pass the reconstructed message to rosbridge
self.protocol.incoming(reconstructed_msg)
log_msg = ["reconstructed message (ID:" + str(msg_id) + ") from "]
log_msg.extend([str(msg_total), " fragments. "])
# cannot access msg.data if message is a service_response or else!
# log_msg += "[message length: " + str(len(str(json.loads(reconstructed_msg)["msg"]["data"]))) +"]"
log_msg.extend(["[duration: ", str(duration.total_seconds()), " s]"])
log_msg.extend(["[duration: ", str(duration), " s]"])
log_msg = "".join(log_msg)
self.protocol.log("info", log_msg)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@
# POSSIBILITY OF SUCH DAMAGE.

import sys
import time
import traceback
from collections import deque
from threading import Condition, Thread
from time import time

""" Sits between incoming messages from a subscription, and the outgoing
publish method. Provides throttling / buffering capabilities.
Expand Down Expand Up @@ -66,10 +66,10 @@ def set_queue_length(self, queue_length):
return self.transition()

def time_remaining(self):
return max((self.last_publish + self.throttle_rate) - time(), 0)
return max((self.last_publish + self.throttle_rate) - time.monotonic(), 0)

def handle_message(self, msg):
self.last_publish = time()
self.last_publish = time.monotonic()
self.publish(msg)

def transition(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,9 @@ def cb(msg):
handler.publish = cb

self.assertTrue(handler.time_remaining() == 0)
t1 = time.time()
t1 = time.monotonic()
handler.handle_message(msg)
t2 = time.time()
t2 = time.monotonic()

self.assertEqual(received["msg"], msg)
self.assertLessEqual(t1, handler.last_publish)
Expand Down