From 5afe48421143e4cb51bc4fb166d717e058503c9c Mon Sep 17 00:00:00 2001 From: Philip de Nier Date: Thu, 14 Mar 2024 13:53:05 +0000 Subject: [PATCH 1/2] Add Timestamp conversion to float and unix seconds float sem-ver: feature --- mediatimestamp/immutable/timestamp.py | 14 +++++++++++ tests/test_timestamp.py | 36 +++++++++++++++++++++++++++ 2 files changed, 50 insertions(+) diff --git a/mediatimestamp/immutable/timestamp.py b/mediatimestamp/immutable/timestamp.py index 237ba65..10928e4 100644 --- a/mediatimestamp/immutable/timestamp.py +++ b/mediatimestamp/immutable/timestamp.py @@ -405,6 +405,11 @@ def to_tai_sec_nsec(self) -> str: def to_tai_sec_frac(self, fixed_size: bool = False) -> str: return self.to_sec_frac(fixed_size=fixed_size) + def to_float(self) -> float: + """ Convert to a floating point number of seconds + """ + return self._value / Timestamp.MAX_NANOSEC + def to_datetime(self) -> datetime: sec, nsec, leap = self.to_unix() microsecond = int(round(nsec/1000)) @@ -437,6 +442,15 @@ def to_utc(self) -> Tuple[int, int, bool]: """ return self.to_unix() + def to_unix_float(self) -> float: + """ Convert to unix seconds since the epoch as a floating point number + """ + if self._value < 0: + return self.to_float() + else: + (sec, ns, _) = self.to_unix() + return sec + ns / Timestamp.MAX_NANOSEC + def to_iso8601_utc(self) -> str: """ Get printed representation in ISO8601 format (UTC) YYYY-MM-DDThh:mm:ss.s diff --git a/tests/test_timestamp.py b/tests/test_timestamp.py index 57c0356..aa3cb9b 100644 --- a/tests/test_timestamp.py +++ b/tests/test_timestamp.py @@ -277,6 +277,22 @@ def test_from_float(self): self.assertEqual(r, case[1], msg="Timestamp.from_float{!r} == {!r}, expected {!r}".format(case[0], r, case[1])) + def test_to_float(self): + """This tests that timestamps can be created from a float.""" + cases = [ + ((float(1.0)), Timestamp(1, 0)), + ((float(1_000_000_000)), Timestamp(1_000_000_000, 0)), + ((float(2.76)), Timestamp(2, 760_000_000)), + ((float(-3.14)), Timestamp(3, 140_000_000, -1)), + ((float(0.02)), Timestamp(0, 20_000_000)) + ] + + for case in cases: + with self.subTest(case=case): + r = case[1].to_float() + self.assertEqual(r, case[1], + msg="{!r},to_float() == {!r}, expected {!r}".format(case[1], r, case[0])) + def test_set_value(self): """This tests that timestamps cannot have their value set.""" tests_ts = [ @@ -879,3 +895,23 @@ def test_get_leap_seconds(self): for t in tests: self.assertEqual(t[0].get_leap_seconds(), t[1]) + + def test_to_unix(self): + tests = [ + (Timestamp(63072008, 999999999), (63072008, 999999999, False)), # 0 leap seconds + (Timestamp(63072009, 0), (63071999, 0, True)), # 10 leap seconds at leap + (Timestamp(1512491629, 0), (1512491592, 0, False)), # 37 leap seconds + ] + + for t in tests: + self.assertEqual(t[0].to_unix(), t[1]) + + def test_to_unix_float(self): + tests = [ + (Timestamp(63072008, 999999999), 63072008 + 999999999 / 1000000000), + (Timestamp(63072009, 0), 63071999), + (Timestamp(1000, 0, -1), -1000) + ] + + for t in tests: + self.assertEqual(t[0].to_unix_float(), t[1]) From 572c455b78136882b7be0ce5fb268970496c0cfa Mon Sep 17 00:00:00 2001 From: Philip de Nier Date: Thu, 14 Mar 2024 14:50:11 +0000 Subject: [PATCH 2/2] Support negative timestamp in to unix and iso8691 conversions Removed the to_utc method which was deprecated in favour of to_unix. CHanged to_unix to return the sign as well. sem-ver: api-break --- mediatimestamp/__main__.py | 2 +- mediatimestamp/immutable/timestamp.py | 109 ++++++++++++++++---------- tests/test_timestamp.py | 53 ++++++++++++- 3 files changed, 118 insertions(+), 46 deletions(-) diff --git a/mediatimestamp/__main__.py b/mediatimestamp/__main__.py index 8a018ec..9af2044 100644 --- a/mediatimestamp/__main__.py +++ b/mediatimestamp/__main__.py @@ -26,7 +26,7 @@ print("ips-tai-nsec {}".format(ts.to_tai_sec_nsec())) print("ips-tai-frac {}".format(ts.to_tai_sec_frac())) print("utc {}".format(ts.to_iso8601_utc())) - print("utc-secs {}".format(ts.to_utc()[0])) + print("utc-secs {}".format(ts.to_unix()[0])) print("smpte time label {}".format(ts.to_smpte_timelabel(50, 1))) sys.exit(0) diff --git a/mediatimestamp/immutable/timestamp.py b/mediatimestamp/immutable/timestamp.py index 10928e4..7b56949 100644 --- a/mediatimestamp/immutable/timestamp.py +++ b/mediatimestamp/immutable/timestamp.py @@ -148,7 +148,12 @@ def __mediatimerange__(self) -> "TimeRange": @classmethod def get_time(cls) -> "Timestamp": unix_time = time.time() - return cls.from_unix(int(unix_time), int(unix_time*cls.MAX_NANOSEC) - int(unix_time)*cls.MAX_NANOSEC) + abs_unix_time = abs(unix_time) + unix_sec = int(abs_unix_time) + unix_ns = int(abs_unix_time*cls.MAX_NANOSEC) - int(abs_unix_time)*cls.MAX_NANOSEC + unix_sign = 1 if unix_time >= 0 else -1 + + return cls.from_unix(unix_sec, unix_ns, unix_sign=unix_sign) @classmethod @deprecated(version="4.0.0", @@ -238,10 +243,18 @@ def from_float(cls, toff_float: float) -> "Timestamp": def from_datetime(cls, dt: datetime) -> "Timestamp": minTs = datetime.fromtimestamp(0, tz.gettz('UTC')) utcdt = dt.astimezone(tz.gettz('UTC')) - seconds = int((utcdt - minTs).total_seconds()) + seconds = abs(int((utcdt - minTs).total_seconds())) nanoseconds = utcdt.microsecond * 1000 + if utcdt < minTs: + sign = -1 + if nanoseconds > 0: + # The microseconds was for a positive date-time. In a negative + # unix time it needs to be flipped. + nanoseconds = cls.MAX_NANOSEC - nanoseconds + else: + sign = 1 - return cls.from_unix(seconds, nanoseconds, False) + return cls.from_unix(unix_sec=seconds, unix_ns=nanoseconds, unix_sign=sign, is_leap=False) @classmethod def from_iso8601_utc(cls, iso8601utc: str) -> "Timestamp": @@ -250,7 +263,18 @@ def from_iso8601_utc(cls, iso8601utc: str) -> "Timestamp": year, month, day, hour, minute, second, ns = _parse_iso8601(iso8601utc[:-1]) gmtuple = (year, month, day, hour, minute, second - (second == 60)) secs_since_epoch = calendar.timegm(gmtuple) - return cls.from_unix(secs_since_epoch, ns, (second == 60)) + if secs_since_epoch < 0: + sign = -1 + secs_since_epoch = abs(secs_since_epoch) + if ns > 0: + # The ns parsed from the timestamp was for a positive ISO 8601 date-time. In a negative + # unix time it needs to be flipped. + ns = cls.MAX_NANOSEC - ns + secs_since_epoch -= 1 + else: + sign = 1 + + return cls.from_unix(unix_sec=secs_since_epoch, unix_ns=ns, unix_sign=sign, is_leap=(second == 60)) @classmethod def from_smpte_timelabel(cls, timelabel: str) -> "Timestamp": @@ -319,19 +343,16 @@ def from_count(cls, count: int, rate_num: RationalTypes, rate_den: RationalTypes return cls(ns=ns, sign=sign) @classmethod - def from_unix(cls, unix_sec: int, unix_ns: int, is_leap: bool = False) -> "Timestamp": + def from_unix(cls, unix_sec: int, unix_ns: int, unix_sign: int = 1, is_leap: bool = False) -> "Timestamp": leap_sec = 0 - for tbl_sec, tbl_tai_sec_minus_1 in UTC_LEAP: - if unix_sec >= tbl_sec: - leap_sec = (tbl_tai_sec_minus_1 + 1) - tbl_sec - break - return cls(sec=unix_sec+leap_sec+is_leap, ns=unix_ns) - - @classmethod - def from_utc(cls, utc_sec: int, utc_ns: int, is_leap: bool = False) -> "Timestamp": - """ Wrapper of from_unix for back-compatibility. - """ - return cls.from_unix(utc_sec, utc_ns, is_leap) + if unix_sign >= 0: + for tbl_sec, tbl_tai_sec_minus_1 in UTC_LEAP: + if unix_sec + is_leap >= tbl_sec: + leap_sec = (tbl_tai_sec_minus_1 + 1) - tbl_sec + break + else: + is_leap = False + return cls(sec=unix_sec+leap_sec, ns=unix_ns, sign=unix_sign) def is_null(self) -> bool: return self._value == 0 @@ -411,55 +432,61 @@ def to_float(self) -> float: return self._value / Timestamp.MAX_NANOSEC def to_datetime(self) -> datetime: - sec, nsec, leap = self.to_unix() + sec, nsec, sign, leap = self.to_unix() microsecond = int(round(nsec/1000)) if microsecond > 999999: sec += 1 microsecond = 0 - dt = datetime.fromtimestamp(sec, tz.gettz('UTC')) + if sign < 0 and microsecond > 0: + # The microseconds is for a negative unix time. In a positive date-time + # it needs to be flipped. + microsecond = 1000000 - microsecond + sec += 1 + dt = datetime.fromtimestamp(sign * sec, tz.gettz('UTC')) dt = dt.replace(microsecond=microsecond) return dt - def to_unix(self) -> Tuple[int, int, bool]: + def to_unix(self) -> Tuple[int, int, int, bool]: """ Convert to unix seconds. Returns a tuple of (seconds, nanoseconds, is_leap), where `is_leap` is `True` when the input time corresponds exactly to a UTC leap second. Note that this deliberately returns a tuple, to try and avoid confusion. """ - leap_sec = 0 - is_leap = False - for unix_sec, tai_sec_minus_1 in UTC_LEAP: - if self.sec >= tai_sec_minus_1: - leap_sec = (tai_sec_minus_1 + 1) - unix_sec - is_leap = self.sec == tai_sec_minus_1 - break - - return (self.sec - leap_sec, self.ns, is_leap) + if self._value < 0: + return (self.sec, self.ns, self.sign, False) + else: + leap_sec = 0 + is_leap = False + for unix_sec, tai_sec_minus_1 in UTC_LEAP: + if self.sec >= tai_sec_minus_1: + leap_sec = (tai_sec_minus_1 + 1) - unix_sec + is_leap = self.sec == tai_sec_minus_1 + break - def to_utc(self) -> Tuple[int, int, bool]: - """ Wrapper of to_unix for back-compatibility. - """ - return self.to_unix() + return (self.sec - leap_sec, self.ns, self.sign, is_leap) def to_unix_float(self) -> float: """ Convert to unix seconds since the epoch as a floating point number """ - if self._value < 0: - return self.to_float() - else: - (sec, ns, _) = self.to_unix() - return sec + ns / Timestamp.MAX_NANOSEC + (sec, ns, sign, _) = self.to_unix() + return sign * (sec + ns / Timestamp.MAX_NANOSEC) def to_iso8601_utc(self) -> str: """ Get printed representation in ISO8601 format (UTC) YYYY-MM-DDThh:mm:ss.s where `s` is fractional seconds at nanosecond precision (always 9-chars wide) """ - unix_s, unix_ns, is_leap = self.to_unix() - utc_bd = time.gmtime(unix_s) - frac_sec = self._get_fractional_seconds(fixed_size=True) + unix_s, unix_ns, unix_sign, is_leap = self.to_unix() + if unix_sign < 0 and unix_ns > 0: + # The nanoseconds is for a negative unix time. In a positive ISO 8601 date-time + # it needs to be flipped. + unix_ns = Timestamp.MAX_NANOSEC - unix_ns + unix_s += 1 + utc_bd = time.gmtime(unix_sign * unix_s) + frac_sec = Timestamp(ns=unix_ns)._get_fractional_seconds(fixed_size=True) leap_sec = int(is_leap) + return '%04d-%02d-%02dT%02d:%02d:%02d.%sZ' % (utc_bd.tm_year, utc_bd.tm_mon, utc_bd.tm_mday, @@ -481,7 +508,7 @@ def to_smpte_timelabel(self, count_on_or_after_second = Timestamp(tai_seconds, 0).to_count(rate, rounding=self.ROUND_UP) count_within_second = count - count_on_or_after_second - unix_sec, unix_ns, is_leap = normalised_ts.to_unix() + unix_sec, unix_ns, unix_sign, is_leap = normalised_ts.to_unix() leap_sec = int(is_leap) if utc_offset is None: diff --git a/tests/test_timestamp.py b/tests/test_timestamp.py index aa3cb9b..ca6dd5e 100644 --- a/tests/test_timestamp.py +++ b/tests/test_timestamp.py @@ -746,6 +746,12 @@ def test_convert_iso_utc(self): """This tests that conversion to and from ISO date format UTC time works as expected.""" tests = [ + (Timestamp(62135596800, 0, -1), "0001-01-01T00:00:00.000000000Z"), + (Timestamp(1, 0, -1), "1969-12-31T23:59:59.000000000Z"), + (Timestamp(0, 999999999, -1), "1969-12-31T23:59:59.000000001Z"), + (Timestamp(0, 1, -1), "1969-12-31T23:59:59.999999999Z"), + (Timestamp(0, 0, 1), "1970-01-01T00:00:00.000000000Z"), + (Timestamp(1424177663, 102003), "2015-02-17T12:53:48.000102003Z"), # the leap second is 23:59:60 @@ -767,7 +773,8 @@ def test_convert_iso_utc(self): (Timestamp(1341100835, 100000000), "2012-07-01T00:00:00.100000000Z"), (Timestamp(1341100835, 999999999), "2012-07-01T00:00:00.999999999Z"), - (Timestamp(283996818, 0), "1979-01-01T00:00:00.000000000Z") # 1979 + (Timestamp(283996818, 0), "1979-01-01T00:00:00.000000000Z"), # 1979 + (Timestamp(253402300836, 999999999), "9999-12-31T23:59:59.999999999Z") ] for t in tests: @@ -843,9 +850,16 @@ def test_from_datetime(self): """Conversion from python's datetime object.""" tests = [ + (datetime(1, 1, 1, 0, 0, 0, 0, tz.gettz('UTC')), Timestamp(62135596800, 0, -1)), + (datetime(1969, 12, 31, 23, 59, 59, 0, tz.gettz('UTC')), Timestamp(1, 0, -1)), + (datetime(1969, 12, 31, 23, 59, 59, 1, tz.gettz('UTC')), Timestamp(0, 999999000, -1)), + (datetime(1969, 12, 31, 23, 59, 59, 999999, tz.gettz('UTC')), Timestamp(0, 1000, -1)), (datetime(1970, 1, 1, 0, 0, 0, 0, tz.gettz('UTC')), Timestamp(0, 0)), (datetime(1983, 3, 29, 15, 45, 0, 0, tz.gettz('UTC')), Timestamp(417800721, 0)), (datetime(2017, 12, 5, 16, 33, 12, 196, tz.gettz('UTC')), Timestamp(1512491629, 196000)), + (datetime(2514, 1, 1, 0, 0, 0, 0, tz.gettz('UTC')), Timestamp(17166988837, 0, 1)), + # Stopping around here because high datetime values have a floating point error. + # See https://stackoverflow.com/a/75582241. ] for t in tests: @@ -855,10 +869,18 @@ def test_to_datetime(self): """Conversion to python's datetime object.""" tests = [ + (datetime(1, 1, 1, 0, 0, 0, 0, tz.gettz('UTC')), Timestamp(62135596800, 0, -1)), + (datetime(1969, 12, 31, 23, 59, 59, 0, tz.gettz('UTC')), Timestamp(1, 0, -1)), + (datetime(1969, 12, 31, 23, 59, 59, 1, tz.gettz('UTC')), Timestamp(0, 999999000, -1)), + (datetime(1969, 12, 31, 23, 59, 59, 999999, tz.gettz('UTC')), Timestamp(0, 1000, -1)), (datetime(1970, 1, 1, 0, 0, 0, 0, tz.gettz('UTC')), Timestamp(0, 0)), + (datetime(1970, 1, 1, 0, 0, 0, 1, tz.gettz('UTC')), Timestamp(0, 1000)), (datetime(1983, 3, 29, 15, 45, 0, 0, tz.gettz('UTC')), Timestamp(417800721, 0)), (datetime(2017, 12, 5, 16, 33, 12, 196, tz.gettz('UTC')), Timestamp(1512491629, 196000)), (datetime(2017, 12, 5, 16, 33, 13, 0, tz.gettz('UTC')), Timestamp(1512491629, 999999999)), + (datetime(2514, 1, 1, 0, 0, 0, 0, tz.gettz('UTC')), Timestamp(17166988837, 0, 1)), + # Stopping around here because high datetime values have a floating point error. + # See https://stackoverflow.com/a/75582241 ] for t in tests: @@ -896,11 +918,34 @@ def test_get_leap_seconds(self): for t in tests: self.assertEqual(t[0].get_leap_seconds(), t[1]) + def test_from_unix(self): + tests = [ + ((Timestamp.MAX_SECONDS - 1, Timestamp.MAX_NANOSEC - 1, -1, False), # 0 leap seconds + Timestamp(Timestamp.MAX_SECONDS - 1, Timestamp.MAX_NANOSEC - 1, -1)), # 0 leap seconds + ((1000, 0, -1, False), Timestamp(1000, 0, -1)), # 0 leap seconds + ((63071999, 999999999, 1, False), Timestamp(63071999, 999999999)), # 0 leap seconds + ((63071999, 0, 1, True), Timestamp(63072009, 0)), # 10 leap seconds at leap + ((63072000, 0, 1, False), Timestamp(63072010, 0)), # 10 leap seconds + ((63072008, 999999999, 1, False), Timestamp(63072018, 999999999)), # 10 leap seconds + ((1512491592, 0, 1, False), Timestamp(1512491629, 0)), # 37 leap seconds + ((Timestamp.MAX_SECONDS - 1 - 37, Timestamp.MAX_NANOSEC - 1, 1, False), + Timestamp(Timestamp.MAX_SECONDS - 1, Timestamp.MAX_NANOSEC - 1)), # 37 leap seconds + ] + + for t in tests: + self.assertEqual(Timestamp.from_unix(*t[0]), t[1]) + def test_to_unix(self): tests = [ - (Timestamp(63072008, 999999999), (63072008, 999999999, False)), # 0 leap seconds - (Timestamp(63072009, 0), (63071999, 0, True)), # 10 leap seconds at leap - (Timestamp(1512491629, 0), (1512491592, 0, False)), # 37 leap seconds + (Timestamp(Timestamp.MAX_SECONDS - 1, Timestamp.MAX_NANOSEC - 1, -1), # 0 leap seconds + (Timestamp.MAX_SECONDS - 1, Timestamp.MAX_NANOSEC - 1, -1, False)), # 0 leap seconds + (Timestamp(1000, 0, -1), (1000, 0, -1, False)), # 0 leap seconds + (Timestamp(63072008, 999999999), (63072008, 999999999, 1, False)), # 0 leap seconds + (Timestamp(63072009, 0), (63071999, 0, 1, True)), # 10 leap seconds at leap + (Timestamp(63072010, 0), (63072000, 0, 1, False)), # 10 leap seconds + (Timestamp(1512491629, 0), (1512491592, 0, 1, False)), # 37 leap seconds + (Timestamp(Timestamp.MAX_SECONDS - 1, Timestamp.MAX_NANOSEC - 1), + (Timestamp.MAX_SECONDS - 1 - 37, Timestamp.MAX_NANOSEC - 1, 1, False)), # 37 leap seconds ] for t in tests: