Coverage for gws-app/gws/lib/datetimex/__init__.py: 79%
287 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-16 23:09 +0200
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-16 23:09 +0200
1"""Date and time utilities.
3These utilities are wrappers around the `datetime` module,
4some functions also use `pendulum` (https://pendulum.eustace.io/),
5however all functions here return strictly stock ``datetime.datetime`` objects.
7``date`` objects are silently promoted to ``datetime`` with time set to midnight UTC.
8``time`` objects are silently promoted to ``datetime`` in the local timezone with the today's date.
10This module always returns timezone-aware objects.
12When constructing an object (e.g. from a string), the default time zone should be passed
13as a zoneinfo string (like ``Europe/Berlin``). An empty zoneinfo string (default) means the local time zone.
14Alias names like ``CEST`` are not supported.
16Naive datetime arguments are assumed to be in the local time zone.
18When running in a docker container, there are several ways to set up the local time zone:
20- by setting the config variable ``server.timeZone`` (see `gws.config.parser`)
21- by setting the ``TZ`` environment variable
22- mounting a host zone info file to ``/etc/localtime``
23"""
25from typing import Optional
27import datetime as dt
28import contextlib
29import os
30import re
31import zoneinfo
33import pendulum
34import pendulum.helpers
35import pendulum.parsing
36import pendulum.parsing.exceptions
38import gws
39import gws.lib.osx
42class Error(gws.Error):
43 pass
46UTC = zoneinfo.ZoneInfo('UTC')
48_ZI_CACHE = {
49 'utc': UTC,
50 'UTC': UTC,
51 'Etc/UTC': UTC,
52}
54_ZI_ALL = set(zoneinfo.available_timezones())
57# Time zones
60def set_local_time_zone(tz: str):
61 """Set the local time zone for the system.
63 Args:
64 tz: Time zone string (e.g. 'Europe/Berlin')
65 """
66 new_zi = time_zone(tz)
67 cur_zi = _zone_info_from_localtime()
69 gws.log.debug(f'set_local_time_zone: cur={cur_zi} new={new_zi}')
71 if new_zi == cur_zi:
72 return
73 _set_localtime_from_zone_info(new_zi)
75 gws.log.debug(f'set_local_time_zone: cur={_zone_info_from_localtime()}')
78def time_zone(tz: str = '') -> zoneinfo.ZoneInfo:
79 """Get a ZoneInfo object for the specified time zone.
81 Args:
82 tz: Time zone string (e.g. 'Europe/Berlin'). Empty string returns local time zone.
83 """
85 if tz in _ZI_CACHE:
86 return _ZI_CACHE[tz]
88 if not tz:
89 _ZI_CACHE[''] = _zone_info_from_localtime()
90 return _ZI_CACHE['']
92 return _zone_info_from_string(tz)
95def _set_localtime_from_zone_info(zi):
96 if os.getuid() != 0:
97 raise Error('cannot set timezone, must be root')
98 gws.lib.osx.run(['ln', '-fs', f'/usr/share/zoneinfo/{zi}', '/etc/localtime'])
101def _zone_info_from_localtime():
102 a = '/etc/localtime'
104 try:
105 p = os.readlink(a)
106 except FileNotFoundError:
107 gws.log.warning(f'time zone: {a!r} not found, assuming UTC')
108 return UTC
110 m = re.search(r'zoneinfo/(.+)$', p)
111 if not m:
112 gws.log.warning(f'time zone: {a!r}={p!r} invalid, assuming UTC')
113 return UTC
115 try:
116 return zoneinfo.ZoneInfo(m.group(1))
117 except zoneinfo.ZoneInfoNotFoundError:
118 gws.log.warning(f'time zone: {a!r}={p!r} not found, assuming UTC')
119 return UTC
122def _zone_info_from_string(tz):
123 if tz not in _ZI_ALL:
124 raise Error(f'invalid time zone {tz!r}')
125 try:
126 return zoneinfo.ZoneInfo(tz)
127 except zoneinfo.ZoneInfoNotFoundError as exc:
128 raise Error(f'invalid time zone {tz!r}') from exc
131def _zone_info_from_tzinfo(tzinfo: dt.tzinfo):
132 if type(tzinfo) is zoneinfo.ZoneInfo:
133 return tzinfo
134 s = str(tzinfo)
135 if s == '+0:0':
136 return UTC
137 try:
138 return _zone_info_from_string(s)
139 except Error:
140 pass
143# init from the env variable right now
145if 'TZ' in os.environ:
146 _set_localtime_from_zone_info(_zone_info_from_string(os.environ['TZ']))
149# Constructors
152def new(year, month, day, hour=0, minute=0, second=0, microsecond=0, fold=0, tz: str = '') -> dt.datetime:
153 """Create a new datetime object with the specified components.
155 Args:
156 year: Year component
157 month: Month component
158 day: Day component
159 hour: Hour component (default 0)
160 minute: Minute component (default 0)
161 second: Second component (default 0)
162 microsecond: Microsecond component (default 0)
163 fold: Fold component for ambiguous times (default 0)
164 tz: Time zone string (default empty for local time zone)
165 """
167 return dt.datetime(year, month, day, hour, minute, second, microsecond, fold=fold, tzinfo=time_zone(tz))
170def now(tz: str = '') -> dt.datetime:
171 """Get the current date and time.
173 Args:
174 tz: Time zone string (default empty for local time zone)
175 """
177 return _now(time_zone(tz))
180def now_utc() -> dt.datetime:
181 """Get the current date and time in UTC."""
183 return _now(UTC)
186# for testing
188_MOCK_NOW = None
191@contextlib.contextmanager
192def mock_now(d):
193 global _MOCK_NOW
194 _MOCK_NOW = d
195 yield
196 _MOCK_NOW = None
199def _now(tzinfo):
200 return _MOCK_NOW or dt.datetime.now(tz=tzinfo)
203def today(tz: str = '') -> dt.datetime:
204 """Get today's date at midnight.
206 Args:
207 tz: Time zone string (default empty for local time zone)
208 """
210 return now(tz).replace(hour=0, minute=0, second=0, microsecond=0)
213def today_utc() -> dt.datetime:
214 """Get today's date at midnight in UTC."""
216 return now_utc().replace(hour=0, minute=0, second=0, microsecond=0)
219def parse(s: str | dt.datetime | dt.date | None, tz: str = '') -> Optional[dt.datetime]:
220 """Parse a string, datetime, or date into a datetime object.
222 Args:
223 s: Input to parse (string, datetime, date, or None)
224 tz: Default time zone string for timezone-naive inputs
225 """
227 if not s:
228 return None
230 if isinstance(s, dt.datetime):
231 return _ensure_tzinfo(s, tz)
233 if isinstance(s, dt.date):
234 return new(s.year, s.month, s.day, tz=tz)
236 try:
237 return from_string(str(s), tz)
238 except Error:
239 pass
242def parse_time(s: str | dt.time | None, tz: str = '') -> Optional[dt.datetime]:
243 """Parse a string or time into a datetime object.
245 Args:
246 s: Input to parse (string, time, or None)
247 tz: Default time zone string for timezone-naive inputs
248 """
250 if not s:
251 return
253 if isinstance(s, dt.time):
254 return _datetime(_ensure_tzinfo(s, tz))
256 try:
257 return from_iso_time_string(str(s), tz)
258 except Error:
259 pass
262def from_string(s: str, tz: str = '') -> dt.datetime:
263 """Parse a datetime string using flexible parsing.
265 Args:
266 s: Date/time string to parse
267 tz: Default time zone string for timezone-naive inputs
268 """
270 return _pend_parse_datetime(s.strip(), tz, iso_only=False)
273def from_iso_string(s: str, tz: str = '') -> dt.datetime:
274 """Parse an ISO 8601 datetime string.
276 Args:
277 s: ISO 8601 date/time string to parse
278 tz: Default time zone string for timezone-naive inputs
279 """
281 return _pend_parse_datetime(s.strip(), tz, iso_only=True)
284def from_iso_time_string(s: str, tz: str = '') -> dt.datetime:
285 """Parse an ISO 8601 time string.
287 Args:
288 s: ISO 8601 time string to parse
289 tz: Default time zone string for timezone-naive inputs
290 """
292 return _pend_parse_time(s.strip(), tz, iso_only=True)
295def from_timestamp(n: float, tz: str = '') -> dt.datetime:
296 """Create a datetime from a Unix timestamp.
298 Args:
299 n: Unix timestamp (seconds since epoch)
300 tz: Time zone string (default empty for local time zone)
301 """
303 return dt.datetime.fromtimestamp(n, tz=time_zone(tz))
306# Formatters
309def to_iso_string(d: Optional[dt.date] = None, with_tz='+', sep='T') -> str:
310 """Convert a date or time to an ISO string.
312 Args:
313 d: Date or time to convert. If None, the current date and time is used.
314 with_tz: If set, append the time zone information. Can be "Z" (for UTC), ":" (for ISO 8601 format) or "+" (for +hhmm format).
315 sep: Separator between date and time. Default is "T".
316 """
318 d = _datetime(d)
319 s = d.strftime(f'%Y-%m-%d{sep}%H:%M:%S')
320 if not with_tz:
321 return s
322 tz = d.strftime('%z')
323 if with_tz == 'Z' and tz == '+0000':
324 return s + 'Z'
325 if with_tz == ':' and len(tz) == 5:
326 return s + tz[:3] + ':' + tz[3:]
327 return s + tz
330def to_iso_date_string(d: Optional[dt.date] = None) -> str:
331 """Convert a date to an ISO date string (YYYY-MM-DD format).
333 Args:
334 d: Date to convert (default current date/time)
335 """
337 return _datetime(d).strftime('%Y-%m-%d')
340def to_basic_string(d: Optional[dt.date] = None) -> str:
341 """Convert a date to a basic string format (YYYYMMDDHHMMSS).
343 Args:
344 d: Date to convert (default current date/time)
345 """
347 return _datetime(d).strftime('%Y%m%d%H%M%S')
350def to_iso_time_string(d: Optional[dt.date] = None, with_tz='+') -> str:
351 """Convert a date to an ISO time string.
353 Args:
354 d: Date to convert (default current date/time)
355 with_tz: Include timezone information ('+' for +hhmm, 'Z' for UTC as Z, False for no timezone)
356 """
358 fmt = '%H:%M:%S'
359 if with_tz:
360 fmt += '%z'
361 s = _datetime(d).strftime(fmt)
362 if with_tz == 'Z' and s.endswith('+0000'):
363 s = s[:-5] + 'Z'
364 return s
367def to_string(fmt: str, d: Optional[dt.date] = None) -> str:
368 """Convert a date to a string using a custom format.
370 Args:
371 fmt: strftime format string
372 d: Date to convert (default current date/time)
373 """
375 return _datetime(d).strftime(fmt)
378def time_to_iso_string(d: Optional[dt.date | dt.time] = None, with_tz='+') -> str:
379 """Convert a date or time to an ISO time string.
381 Args:
382 d: Date or time to convert (default returns 00:00:00)
383 with_tz: Include timezone information (unused in current implementation)
384 """
386 if isinstance(d, (dt.datetime, dt.time)):
387 return f'{d.hour:02d}:{d.minute:02d}:{d.second:02d}'
388 return f'00:00:00'
391# Converters
394def to_timestamp(d: Optional[dt.date] = None) -> int:
395 """Convert a date to a Unix timestamp.
397 Args:
398 d: Date to convert (default current date/time)
399 """
401 return int(_datetime(d).timestamp())
404def to_millis(d: Optional[dt.date] = None) -> int:
405 """Convert a date to milliseconds since Unix epoch.
407 Args:
408 d: Date to convert (default current date/time)
409 """
411 return int(_datetime(d).timestamp() * 1000)
414def to_utc(d: Optional[dt.date] = None) -> dt.datetime:
415 """Convert a date to UTC timezone.
417 Args:
418 d: Date to convert (default current date/time)
419 """
421 return _datetime(d).astimezone(time_zone('UTC'))
424def to_local(d: Optional[dt.date] = None) -> dt.datetime:
425 """Convert a date to local timezone.
427 Args:
428 d: Date to convert (default current date/time)
429 """
431 return _datetime(d).astimezone(time_zone(''))
434def to_time_zone(tz: str, d: Optional[dt.date] = None) -> dt.datetime:
435 """Convert a date to a specific timezone.
437 Args:
438 tz: Target timezone string
439 d: Date to convert (default current date/time)
440 """
442 return _datetime(d).astimezone(time_zone(tz))
445# Predicates
448def is_date(x) -> bool:
449 """Check if an object is a date.
451 Args:
452 x: Object to check
453 """
455 return isinstance(x, dt.date)
458def is_datetime(x) -> bool:
459 """Check if an object is a datetime.
461 Args:
462 x: Object to check
463 """
465 return isinstance(x, dt.datetime)
468def is_utc(d: dt.datetime) -> bool:
469 """Check if a datetime is in UTC timezone.
471 Args:
472 d: Datetime to check
473 """
475 return _zone_info_from_tzinfo(gws.u.require(_datetime(d).tzinfo)) == UTC
478def is_local(d: dt.datetime) -> bool:
479 """Check if a datetime is in the local timezone.
481 Args:
482 d: Datetime to check
483 """
485 return _zone_info_from_tzinfo(gws.u.require(_datetime(d).tzinfo)) == time_zone('')
488# Arithmetic
491def add(d: Optional[dt.date] = None, years=0, months=0, days=0, weeks=0, hours=0, minutes=0, seconds=0, microseconds=0) -> dt.datetime:
492 """Add time components to a datetime.
494 Args:
495 d: Base datetime (default current date/time)
496 years: Years to add
497 months: Months to add
498 days: Days to add
499 weeks: Weeks to add
500 hours: Hours to add
501 minutes: Minutes to add
502 seconds: Seconds to add
503 microseconds: Microseconds to add
504 """
506 return pendulum.helpers.add_duration(
507 _datetime(d),
508 years=years,
509 months=months,
510 days=days,
511 weeks=weeks,
512 hours=hours,
513 minutes=minutes,
514 seconds=seconds,
515 microseconds=microseconds,
516 )
519class Diff:
520 """Difference between two dates."""
522 years: int
523 months: int
524 weeks: int
525 days: int
526 hours: int
527 minutes: int
528 seconds: int
529 microseconds: int
531 def __repr__(self):
532 return repr(vars(self))
535def difference(d1: dt.date, d2: Optional[dt.date] = None) -> Diff:
536 """Compute the difference between two dates.
538 Args:
539 d1: The first date.
540 d2: The second date. If None, the current date and time is used.
541 """
543 iv = pendulum.Interval(_datetime(d1), _datetime(d2), absolute=False)
544 df = Diff()
546 df.years = iv.years
547 df.months = iv.months
548 df.weeks = iv.weeks
549 df.days = iv.remaining_days
550 df.hours = iv.hours
551 df.minutes = iv.minutes
552 df.seconds = iv.remaining_seconds
553 df.microseconds = iv.microseconds
555 return df
558def total_difference(d1: dt.date, d2: Optional[dt.date] = None) -> Diff:
559 """Compute the total difference between two dates in each unit.
561 Args:
562 d1: First date
563 d2: Second date (default current date/time)
564 """
566 iv = pendulum.Interval(_datetime(d1), _datetime(d2), absolute=False)
567 df = Diff()
569 df.years = iv.in_years()
570 df.months = iv.in_months()
571 df.weeks = iv.in_weeks()
572 df.days = iv.in_days()
573 df.hours = iv.in_hours()
574 df.minutes = iv.in_minutes()
575 df.seconds = iv.in_seconds()
576 df.microseconds = df.seconds * 1_000_000
578 return df
581# Wrappers for useful pendulum utilities
583# fmt:off
585def start_of_second(d: Optional[dt.date] = None) -> dt.datetime: return _unpend(_pend(d).start_of('second'))
586def start_of_minute(d: Optional[dt.date] = None) -> dt.datetime: return _unpend(_pend(d).start_of('minute'))
587def start_of_hour (d: Optional[dt.date] = None) -> dt.datetime: return _unpend(_pend(d).start_of('hour'))
588def start_of_day (d: Optional[dt.date] = None) -> dt.datetime: return _unpend(_pend(d).start_of('day'))
589def start_of_week (d: Optional[dt.date] = None) -> dt.datetime: return _unpend(_pend(d).start_of('week'))
590def start_of_month (d: Optional[dt.date] = None) -> dt.datetime: return _unpend(_pend(d).start_of('month'))
591def start_of_year (d: Optional[dt.date] = None) -> dt.datetime: return _unpend(_pend(d).start_of('year'))
594def end_of_second(d: Optional[dt.date] = None) -> dt.datetime: return _unpend(_pend(d).end_of('second'))
595def end_of_minute(d: Optional[dt.date] = None) -> dt.datetime: return _unpend(_pend(d).end_of('minute'))
596def end_of_hour (d: Optional[dt.date] = None) -> dt.datetime: return _unpend(_pend(d).end_of('hour'))
597def end_of_day (d: Optional[dt.date] = None) -> dt.datetime: return _unpend(_pend(d).end_of('day'))
598def end_of_week (d: Optional[dt.date] = None) -> dt.datetime: return _unpend(_pend(d).end_of('week'))
599def end_of_month (d: Optional[dt.date] = None) -> dt.datetime: return _unpend(_pend(d).end_of('month'))
600def end_of_year (d: Optional[dt.date] = None) -> dt.datetime: return _unpend(_pend(d).end_of('year'))
603def day_of_week (d: Optional[dt.date] = None) -> int: return _pend(d).day_of_week
604def day_of_year (d: Optional[dt.date] = None) -> int: return _pend(d).day_of_year
605def week_of_month (d: Optional[dt.date] = None) -> int: return _pend(d).week_of_month
606def week_of_year (d: Optional[dt.date] = None) -> int: return _pend(d).week_of_year
607def days_in_month (d: Optional[dt.date] = None) -> int: return _pend(d).days_in_month
610# fmt:on
612_WD = {
613 0: pendulum.WeekDay.MONDAY,
614 1: pendulum.WeekDay.TUESDAY,
615 2: pendulum.WeekDay.WEDNESDAY,
616 3: pendulum.WeekDay.THURSDAY,
617 4: pendulum.WeekDay.FRIDAY,
618 5: pendulum.WeekDay.SATURDAY,
619 6: pendulum.WeekDay.SUNDAY,
620 'monday': pendulum.WeekDay.MONDAY,
621 'tuesday': pendulum.WeekDay.TUESDAY,
622 'wednesday': pendulum.WeekDay.WEDNESDAY,
623 'thursday': pendulum.WeekDay.THURSDAY,
624 'friday': pendulum.WeekDay.FRIDAY,
625 'saturday': pendulum.WeekDay.SATURDAY,
626 'sunday': pendulum.WeekDay.SUNDAY,
627}
630def next(day: int | str, d: Optional[dt.date] = None, keep_time=False) -> dt.datetime:
631 """Get the next occurrence of a specific weekday.
633 Args:
634 day: Day of week (0-6 for Monday-Sunday or weekday name string)
635 d: Starting date (default current date/time)
636 keep_time: Whether to keep the time component
637 """
639 return _unpend(_pend(d).next(_WD[day], keep_time))
642def prev(day: int | str, d: Optional[dt.date] = None, keep_time=False) -> dt.datetime:
643 """Get the previous occurrence of a specific weekday.
645 Args:
646 day: Day of week (0-6 for Monday-Sunday or weekday name string)
647 d: Starting date (default current date/time)
648 keep_time: Whether to keep the time component
649 """
651 return _unpend(_pend(d).previous(_WD[day], keep_time))
654# Duration
656_DURATION_UNITS = {
657 'w': 3600 * 24 * 7,
658 'd': 3600 * 24,
659 'h': 3600,
660 'm': 60,
661 's': 1,
662}
665def parse_duration(s: str) -> int:
666 """Convert duration string to seconds.
668 Args:
669 s: Duration string (e.g. '1w2d3h4m5s') or integer seconds
670 """
672 if isinstance(s, int):
673 return s
675 p = None
676 r = 0
678 for n, v in re.findall(r'(\d+)|(\D+)', str(s).strip()):
679 if n:
680 p = int(n)
681 continue
682 v = v.strip()
683 if p is None or v not in _DURATION_UNITS:
684 raise Error('invalid duration', s)
685 r += p * _DURATION_UNITS[v]
686 p = None
688 if p:
689 r += p
691 return r
694##
696# conversions
699def _datetime(d: dt.date | dt.time | None) -> dt.datetime:
700 # ensure a valid datetime object
702 if d is None:
703 return now()
705 if isinstance(d, dt.datetime):
706 # if a value is a naive datetime, assume the local tz
707 # see https://www.postgresql.org/docs/current/datatype-datetime.html#DATATYPE-DATETIME-INPUT-TIME-STAMPS:
708 # > Conversions between timestamp without time zone and timestamp with time zone normally assume
709 # > that the timestamp without time zone value should be taken or given as timezone local time.
710 return _ensure_tzinfo(d, tz='')
712 if isinstance(d, dt.date):
713 # promote date to midnight UTC
714 return dt.datetime(d.year, d.month, d.day, tzinfo=UTC)
716 if isinstance(d, dt.time):
717 # promote time to today's time
718 n = _now(d.tzinfo)
719 return dt.datetime(n.year, n.month, n.day, d.hour, d.minute, d.second, d.microsecond, d.tzinfo, fold=d.fold)
721 raise Error(f'invalid datetime value {d!r}')
724def _ensure_tzinfo(d, tz: str):
725 # attach tzinfo if not set
727 if not d.tzinfo:
728 return d.replace(tzinfo=time_zone(tz))
730 # try to convert 'their' tzinfo (might be an unnamed dt.timezone or pendulum.FixedTimezone) to zoneinfo
731 zi = _zone_info_from_tzinfo(d.tzinfo)
732 if zi:
733 return d.replace(tzinfo=zi)
735 # failing that, keep existing tzinfo
736 return d
739# pendulum.DateTime <-> python datetime
742def _pend(d: dt.date | None) -> pendulum.DateTime:
743 return pendulum.instance(_datetime(d))
746def _unpend(p: pendulum.DateTime) -> dt.datetime:
747 return dt.datetime(
748 p.year,
749 p.month,
750 p.day,
751 p.hour,
752 p.minute,
753 p.second,
754 p.microsecond,
755 tzinfo=p.tzinfo,
756 fold=p.fold,
757 )
760# NB using private APIs
763def _pend_parse_datetime(s, tz, iso_only):
764 try:
765 if iso_only:
766 d = pendulum.parsing.parse_iso8601(s)
767 else:
768 # do not normalize
769 d = pendulum.parsing._parse(s)
770 except (ValueError, pendulum.parsing.exceptions.ParserError) as exc:
771 raise Error(f'invalid date {s!r}') from exc
773 if isinstance(d, dt.datetime):
774 return _ensure_tzinfo(d, tz)
775 if isinstance(d, dt.date):
776 return new(d.year, d.month, d.day, tz=tz)
778 # times and durations not accepted
779 raise Error(f'invalid date {s!r}')
782def _pend_parse_time(s, tz, iso_only):
783 try:
784 if iso_only:
785 d = pendulum.parsing.parse_iso8601(s)
786 else:
787 # do not normalize
788 d = pendulum.parsing._parse(s)
789 except (ValueError, pendulum.parsing.exceptions.ParserError) as exc:
790 raise Error(f'invalid time {s!r}') from exc
792 if isinstance(d, dt.time):
793 return _datetime(_ensure_tzinfo(d, tz))
795 # dates and durations not accepted
796 raise Error(f'invalid time {s!r}')