Skip to content

Commit

Permalink
Merge pull request #86 from bbc/philipn-float-timestamps
Browse files Browse the repository at this point in the history
Add to_float timestamp methods and further support to negative timestamps
  • Loading branch information
philipnbbc authored Mar 21, 2024
2 parents b6e3d24 + 572c455 commit 009c761
Show file tree
Hide file tree
Showing 3 changed files with 160 additions and 38 deletions.
2 changes: 1 addition & 1 deletion mediatimestamp/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
print("ips-tai-nsec {}".format(ts.to_tai_sec_nsec()))
print("ips-tai-frac {}".format(ts.to_tai_sec_frac()))
print("utc {}".format(ts.to_iso8601_utc()))
print("utc-secs {}".format(ts.to_utc()[0]))
print("utc-secs {}".format(ts.to_unix()[0]))
print("smpte time label {}".format(ts.to_smpte_timelabel(50, 1)))
sys.exit(0)

Expand Down
113 changes: 77 additions & 36 deletions mediatimestamp/immutable/timestamp.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,12 @@ def __mediatimerange__(self) -> "TimeRange":
@classmethod
def get_time(cls) -> "Timestamp":
unix_time = time.time()
return cls.from_unix(int(unix_time), int(unix_time*cls.MAX_NANOSEC) - int(unix_time)*cls.MAX_NANOSEC)
abs_unix_time = abs(unix_time)
unix_sec = int(abs_unix_time)
unix_ns = int(abs_unix_time*cls.MAX_NANOSEC) - int(abs_unix_time)*cls.MAX_NANOSEC
unix_sign = 1 if unix_time >= 0 else -1

return cls.from_unix(unix_sec, unix_ns, unix_sign=unix_sign)

@classmethod
@deprecated(version="4.0.0",
Expand Down Expand Up @@ -238,10 +243,18 @@ def from_float(cls, toff_float: float) -> "Timestamp":
def from_datetime(cls, dt: datetime) -> "Timestamp":
minTs = datetime.fromtimestamp(0, tz.gettz('UTC'))
utcdt = dt.astimezone(tz.gettz('UTC'))
seconds = int((utcdt - minTs).total_seconds())
seconds = abs(int((utcdt - minTs).total_seconds()))
nanoseconds = utcdt.microsecond * 1000
if utcdt < minTs:
sign = -1
if nanoseconds > 0:
# The microseconds was for a positive date-time. In a negative
# unix time it needs to be flipped.
nanoseconds = cls.MAX_NANOSEC - nanoseconds
else:
sign = 1

return cls.from_unix(seconds, nanoseconds, False)
return cls.from_unix(unix_sec=seconds, unix_ns=nanoseconds, unix_sign=sign, is_leap=False)

@classmethod
def from_iso8601_utc(cls, iso8601utc: str) -> "Timestamp":
Expand All @@ -250,7 +263,18 @@ def from_iso8601_utc(cls, iso8601utc: str) -> "Timestamp":
year, month, day, hour, minute, second, ns = _parse_iso8601(iso8601utc[:-1])
gmtuple = (year, month, day, hour, minute, second - (second == 60))
secs_since_epoch = calendar.timegm(gmtuple)
return cls.from_unix(secs_since_epoch, ns, (second == 60))
if secs_since_epoch < 0:
sign = -1
secs_since_epoch = abs(secs_since_epoch)
if ns > 0:
# The ns parsed from the timestamp was for a positive ISO 8601 date-time. In a negative
# unix time it needs to be flipped.
ns = cls.MAX_NANOSEC - ns
secs_since_epoch -= 1
else:
sign = 1

return cls.from_unix(unix_sec=secs_since_epoch, unix_ns=ns, unix_sign=sign, is_leap=(second == 60))

@classmethod
def from_smpte_timelabel(cls, timelabel: str) -> "Timestamp":
Expand Down Expand Up @@ -319,19 +343,16 @@ def from_count(cls, count: int, rate_num: RationalTypes, rate_den: RationalTypes
return cls(ns=ns, sign=sign)

@classmethod
def from_unix(cls, unix_sec: int, unix_ns: int, is_leap: bool = False) -> "Timestamp":
def from_unix(cls, unix_sec: int, unix_ns: int, unix_sign: int = 1, is_leap: bool = False) -> "Timestamp":
leap_sec = 0
for tbl_sec, tbl_tai_sec_minus_1 in UTC_LEAP:
if unix_sec >= tbl_sec:
leap_sec = (tbl_tai_sec_minus_1 + 1) - tbl_sec
break
return cls(sec=unix_sec+leap_sec+is_leap, ns=unix_ns)

@classmethod
def from_utc(cls, utc_sec: int, utc_ns: int, is_leap: bool = False) -> "Timestamp":
""" Wrapper of from_unix for back-compatibility.
"""
return cls.from_unix(utc_sec, utc_ns, is_leap)
if unix_sign >= 0:
for tbl_sec, tbl_tai_sec_minus_1 in UTC_LEAP:
if unix_sec + is_leap >= tbl_sec:
leap_sec = (tbl_tai_sec_minus_1 + 1) - tbl_sec
break
else:
is_leap = False
return cls(sec=unix_sec+leap_sec, ns=unix_ns, sign=unix_sign)

def is_null(self) -> bool:
return self._value == 0
Expand Down Expand Up @@ -405,47 +426,67 @@ def to_tai_sec_nsec(self) -> str:
def to_tai_sec_frac(self, fixed_size: bool = False) -> str:
return self.to_sec_frac(fixed_size=fixed_size)

def to_float(self) -> float:
""" Convert to a floating point number of seconds
"""
return self._value / Timestamp.MAX_NANOSEC

def to_datetime(self) -> datetime:
sec, nsec, leap = self.to_unix()
sec, nsec, sign, leap = self.to_unix()
microsecond = int(round(nsec/1000))
if microsecond > 999999:
sec += 1
microsecond = 0
dt = datetime.fromtimestamp(sec, tz.gettz('UTC'))
if sign < 0 and microsecond > 0:
# The microseconds is for a negative unix time. In a positive date-time
# it needs to be flipped.
microsecond = 1000000 - microsecond
sec += 1
dt = datetime.fromtimestamp(sign * sec, tz.gettz('UTC'))
dt = dt.replace(microsecond=microsecond)

return dt

def to_unix(self) -> Tuple[int, int, bool]:
def to_unix(self) -> Tuple[int, int, int, bool]:
""" Convert to unix seconds.
Returns a tuple of (seconds, nanoseconds, is_leap), where `is_leap` is
`True` when the input time corresponds exactly to a UTC leap second.
Note that this deliberately returns a tuple, to try and avoid confusion.
"""
leap_sec = 0
is_leap = False
for unix_sec, tai_sec_minus_1 in UTC_LEAP:
if self.sec >= tai_sec_minus_1:
leap_sec = (tai_sec_minus_1 + 1) - unix_sec
is_leap = self.sec == tai_sec_minus_1
break

return (self.sec - leap_sec, self.ns, is_leap)

def to_utc(self) -> Tuple[int, int, bool]:
""" Wrapper of to_unix for back-compatibility.
if self._value < 0:
return (self.sec, self.ns, self.sign, False)
else:
leap_sec = 0
is_leap = False
for unix_sec, tai_sec_minus_1 in UTC_LEAP:
if self.sec >= tai_sec_minus_1:
leap_sec = (tai_sec_minus_1 + 1) - unix_sec
is_leap = self.sec == tai_sec_minus_1
break

return (self.sec - leap_sec, self.ns, self.sign, is_leap)

def to_unix_float(self) -> float:
""" Convert to unix seconds since the epoch as a floating point number
"""
return self.to_unix()
(sec, ns, sign, _) = self.to_unix()
return sign * (sec + ns / Timestamp.MAX_NANOSEC)

def to_iso8601_utc(self) -> str:
""" Get printed representation in ISO8601 format (UTC)
YYYY-MM-DDThh:mm:ss.s
where `s` is fractional seconds at nanosecond precision (always 9-chars wide)
"""
unix_s, unix_ns, is_leap = self.to_unix()
utc_bd = time.gmtime(unix_s)
frac_sec = self._get_fractional_seconds(fixed_size=True)
unix_s, unix_ns, unix_sign, is_leap = self.to_unix()
if unix_sign < 0 and unix_ns > 0:
# The nanoseconds is for a negative unix time. In a positive ISO 8601 date-time
# it needs to be flipped.
unix_ns = Timestamp.MAX_NANOSEC - unix_ns
unix_s += 1
utc_bd = time.gmtime(unix_sign * unix_s)
frac_sec = Timestamp(ns=unix_ns)._get_fractional_seconds(fixed_size=True)
leap_sec = int(is_leap)

return '%04d-%02d-%02dT%02d:%02d:%02d.%sZ' % (utc_bd.tm_year,
utc_bd.tm_mon,
utc_bd.tm_mday,
Expand All @@ -467,7 +508,7 @@ def to_smpte_timelabel(self,
count_on_or_after_second = Timestamp(tai_seconds, 0).to_count(rate, rounding=self.ROUND_UP)
count_within_second = count - count_on_or_after_second

unix_sec, unix_ns, is_leap = normalised_ts.to_unix()
unix_sec, unix_ns, unix_sign, is_leap = normalised_ts.to_unix()
leap_sec = int(is_leap)

if utc_offset is None:
Expand Down
83 changes: 82 additions & 1 deletion tests/test_timestamp.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,22 @@ def test_from_float(self):
self.assertEqual(r, case[1],
msg="Timestamp.from_float{!r} == {!r}, expected {!r}".format(case[0], r, case[1]))

def test_to_float(self):
"""This tests that timestamps can be created from a float."""
cases = [
((float(1.0)), Timestamp(1, 0)),
((float(1_000_000_000)), Timestamp(1_000_000_000, 0)),
((float(2.76)), Timestamp(2, 760_000_000)),
((float(-3.14)), Timestamp(3, 140_000_000, -1)),
((float(0.02)), Timestamp(0, 20_000_000))
]

for case in cases:
with self.subTest(case=case):
r = case[1].to_float()
self.assertEqual(r, case[1],
msg="{!r},to_float() == {!r}, expected {!r}".format(case[1], r, case[0]))

def test_set_value(self):
"""This tests that timestamps cannot have their value set."""
tests_ts = [
Expand Down Expand Up @@ -730,6 +746,12 @@ def test_convert_iso_utc(self):
"""This tests that conversion to and from ISO date format UTC time works as expected."""

tests = [
(Timestamp(62135596800, 0, -1), "0001-01-01T00:00:00.000000000Z"),
(Timestamp(1, 0, -1), "1969-12-31T23:59:59.000000000Z"),
(Timestamp(0, 999999999, -1), "1969-12-31T23:59:59.000000001Z"),
(Timestamp(0, 1, -1), "1969-12-31T23:59:59.999999999Z"),
(Timestamp(0, 0, 1), "1970-01-01T00:00:00.000000000Z"),

(Timestamp(1424177663, 102003), "2015-02-17T12:53:48.000102003Z"),

# the leap second is 23:59:60
Expand All @@ -751,7 +773,8 @@ def test_convert_iso_utc(self):
(Timestamp(1341100835, 100000000), "2012-07-01T00:00:00.100000000Z"),
(Timestamp(1341100835, 999999999), "2012-07-01T00:00:00.999999999Z"),

(Timestamp(283996818, 0), "1979-01-01T00:00:00.000000000Z") # 1979
(Timestamp(283996818, 0), "1979-01-01T00:00:00.000000000Z"), # 1979
(Timestamp(253402300836, 999999999), "9999-12-31T23:59:59.999999999Z")
]

for t in tests:
Expand Down Expand Up @@ -827,9 +850,16 @@ def test_from_datetime(self):
"""Conversion from python's datetime object."""

tests = [
(datetime(1, 1, 1, 0, 0, 0, 0, tz.gettz('UTC')), Timestamp(62135596800, 0, -1)),
(datetime(1969, 12, 31, 23, 59, 59, 0, tz.gettz('UTC')), Timestamp(1, 0, -1)),
(datetime(1969, 12, 31, 23, 59, 59, 1, tz.gettz('UTC')), Timestamp(0, 999999000, -1)),
(datetime(1969, 12, 31, 23, 59, 59, 999999, tz.gettz('UTC')), Timestamp(0, 1000, -1)),
(datetime(1970, 1, 1, 0, 0, 0, 0, tz.gettz('UTC')), Timestamp(0, 0)),
(datetime(1983, 3, 29, 15, 45, 0, 0, tz.gettz('UTC')), Timestamp(417800721, 0)),
(datetime(2017, 12, 5, 16, 33, 12, 196, tz.gettz('UTC')), Timestamp(1512491629, 196000)),
(datetime(2514, 1, 1, 0, 0, 0, 0, tz.gettz('UTC')), Timestamp(17166988837, 0, 1)),
# Stopping around here because high datetime values have a floating point error.
# See https://stackoverflow.com/a/75582241.
]

for t in tests:
Expand All @@ -839,10 +869,18 @@ def test_to_datetime(self):
"""Conversion to python's datetime object."""

tests = [
(datetime(1, 1, 1, 0, 0, 0, 0, tz.gettz('UTC')), Timestamp(62135596800, 0, -1)),
(datetime(1969, 12, 31, 23, 59, 59, 0, tz.gettz('UTC')), Timestamp(1, 0, -1)),
(datetime(1969, 12, 31, 23, 59, 59, 1, tz.gettz('UTC')), Timestamp(0, 999999000, -1)),
(datetime(1969, 12, 31, 23, 59, 59, 999999, tz.gettz('UTC')), Timestamp(0, 1000, -1)),
(datetime(1970, 1, 1, 0, 0, 0, 0, tz.gettz('UTC')), Timestamp(0, 0)),
(datetime(1970, 1, 1, 0, 0, 0, 1, tz.gettz('UTC')), Timestamp(0, 1000)),
(datetime(1983, 3, 29, 15, 45, 0, 0, tz.gettz('UTC')), Timestamp(417800721, 0)),
(datetime(2017, 12, 5, 16, 33, 12, 196, tz.gettz('UTC')), Timestamp(1512491629, 196000)),
(datetime(2017, 12, 5, 16, 33, 13, 0, tz.gettz('UTC')), Timestamp(1512491629, 999999999)),
(datetime(2514, 1, 1, 0, 0, 0, 0, tz.gettz('UTC')), Timestamp(17166988837, 0, 1)),
# Stopping around here because high datetime values have a floating point error.
# See https://stackoverflow.com/a/75582241
]

for t in tests:
Expand Down Expand Up @@ -879,3 +917,46 @@ def test_get_leap_seconds(self):

for t in tests:
self.assertEqual(t[0].get_leap_seconds(), t[1])

def test_from_unix(self):
tests = [
((Timestamp.MAX_SECONDS - 1, Timestamp.MAX_NANOSEC - 1, -1, False), # 0 leap seconds
Timestamp(Timestamp.MAX_SECONDS - 1, Timestamp.MAX_NANOSEC - 1, -1)), # 0 leap seconds
((1000, 0, -1, False), Timestamp(1000, 0, -1)), # 0 leap seconds
((63071999, 999999999, 1, False), Timestamp(63071999, 999999999)), # 0 leap seconds
((63071999, 0, 1, True), Timestamp(63072009, 0)), # 10 leap seconds at leap
((63072000, 0, 1, False), Timestamp(63072010, 0)), # 10 leap seconds
((63072008, 999999999, 1, False), Timestamp(63072018, 999999999)), # 10 leap seconds
((1512491592, 0, 1, False), Timestamp(1512491629, 0)), # 37 leap seconds
((Timestamp.MAX_SECONDS - 1 - 37, Timestamp.MAX_NANOSEC - 1, 1, False),
Timestamp(Timestamp.MAX_SECONDS - 1, Timestamp.MAX_NANOSEC - 1)), # 37 leap seconds
]

for t in tests:
self.assertEqual(Timestamp.from_unix(*t[0]), t[1])

def test_to_unix(self):
tests = [
(Timestamp(Timestamp.MAX_SECONDS - 1, Timestamp.MAX_NANOSEC - 1, -1), # 0 leap seconds
(Timestamp.MAX_SECONDS - 1, Timestamp.MAX_NANOSEC - 1, -1, False)), # 0 leap seconds
(Timestamp(1000, 0, -1), (1000, 0, -1, False)), # 0 leap seconds
(Timestamp(63072008, 999999999), (63072008, 999999999, 1, False)), # 0 leap seconds
(Timestamp(63072009, 0), (63071999, 0, 1, True)), # 10 leap seconds at leap
(Timestamp(63072010, 0), (63072000, 0, 1, False)), # 10 leap seconds
(Timestamp(1512491629, 0), (1512491592, 0, 1, False)), # 37 leap seconds
(Timestamp(Timestamp.MAX_SECONDS - 1, Timestamp.MAX_NANOSEC - 1),
(Timestamp.MAX_SECONDS - 1 - 37, Timestamp.MAX_NANOSEC - 1, 1, False)), # 37 leap seconds
]

for t in tests:
self.assertEqual(t[0].to_unix(), t[1])

def test_to_unix_float(self):
tests = [
(Timestamp(63072008, 999999999), 63072008 + 999999999 / 1000000000),
(Timestamp(63072009, 0), 63071999),
(Timestamp(1000, 0, -1), -1000)
]

for t in tests:
self.assertEqual(t[0].to_unix_float(), t[1])

0 comments on commit 009c761

Please sign in to comment.