Coverage for gws-app / gws / lib / datetimex / __init__.py: 76%
299 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-03 10:12 +0100
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-03 10:12 +0100
1"""Date and time utilities.
3These utilities are wrappers around the `datetime` module,
4some functions also use `pendulum` (https://pendulum.eustace.io/),
5however all functions here return strictly stock ``datetime.datetime`` objects.
7``date`` objects are silently promoted to ``datetime`` with time set to midnight UTC.
8``time`` objects are silently promoted to ``datetime`` in the local timezone with today's date.
10This module always returns timezone-aware objects.
12When constructing an object (e.g. from a string), the default time zone should be passed
13as a zoneinfo string (like ``Europe/Berlin``). An empty zoneinfo string (default) means the local time zone.
14Alias names like ``CEST`` are not supported.
16Naive datetime arguments are assumed to be in the local time zone.
18When running in a docker container, there are several ways to set up the local time zone:
20- by setting the config variable ``server.timeZone`` (see `gws.config.parser`)
21- by setting the ``TZ`` environment variable
22- mounting a host zone info file to ``/etc/localtime``
23"""
25from typing import Optional
27import datetime as dt
28import contextlib
29import os
30import re
31import zoneinfo
33import pendulum
34import pendulum.helpers
35import pendulum.parsing
36import pendulum.parsing.exceptions
38import gws
39import gws.lib.osx
class Error(gws.Error):
    """Exception type raised by the date and time utilities in this module."""
# The UTC zone object shared by the whole module.
UTC = zoneinfo.ZoneInfo('UTC')

# Zone lookup table, pre-seeded so that every listed spelling of UTC
# resolves to the same object.
_ZI_CACHE = dict.fromkeys(('utc', 'UTC', 'Etc/UTC'), UTC)

# Names of all zones reported by `zoneinfo.available_timezones`.
_ZI_ALL = set(zoneinfo.available_timezones())
57# Time zones
def set_local_time_zone(tz: str):
    """Set the local time zone for the system.

    Args:
        tz: Time zone string (e.g. 'Europe/Berlin')

    Raises:
        Error: If ``tz`` is not a known zone name, or if the zone has to be
            changed and the process is not running as root.
    """

    new_zi = time_zone(tz)
    cur_zi = _zone_info_from_localtime()

    # time_zone('') memoizes the local zone under the '' key; forget it so
    # that the next lookup re-reads the (possibly changed) system setting
    _ZI_CACHE.pop('', None)

    gws.log.debug(f'set_local_time_zone: cur={cur_zi} new={new_zi}')

    if new_zi == cur_zi:
        return
    _set_localtime_from_zone_info(new_zi)

    gws.log.debug(f'set_local_time_zone: cur={_zone_info_from_localtime()}')
def time_zone(tz: str = '') -> zoneinfo.ZoneInfo:
    """Resolve a time zone name to a ZoneInfo object.

    Args:
        tz: Time zone string (e.g. 'Europe/Berlin'). An empty string means the local time zone.

    Returns:
        The cached zone if ``tz`` was seen before, the local zone
        (which is then cached) for an empty ``tz``, otherwise a zone looked up by name.
    """

    try:
        return _ZI_CACHE[tz]
    except KeyError:
        pass

    if tz:
        return _zone_info_from_string(tz)

    local_zi = _zone_info_from_localtime()
    _ZI_CACHE[''] = local_zi
    return local_zi
def _set_localtime_from_zone_info(zi):
    """Point ``/etc/localtime`` at the zone file of ``zi`` (root only).

    Raises:
        Error: If the current user id is not 0.
    """

    is_root = os.getuid() == 0
    if not is_root:
        raise Error('cannot set timezone, must be root')

    # (re)create the symlink, replacing an existing one
    zone_file = f'/usr/share/zoneinfo/{zi}'
    gws.lib.osx.run(['ln', '-fs', zone_file, '/etc/localtime'])
def _zone_info_from_localtime():
    """Determine the local zone from the ``/etc/localtime`` symlink.

    Returns:
        The ZoneInfo named by the link target (the part after ``zoneinfo/``).
        If the link is missing, unreadable, not a link at all, or does not
        name a known zone, a warning is logged and UTC is returned.
    """

    a = '/etc/localtime'

    try:
        p = os.readlink(a)
    except FileNotFoundError:
        gws.log.warning(f'time zone: {a!r} not found, assuming UTC')
        return UTC
    except OSError as exc:
        # readlink fails with a generic OSError when the path exists
        # but is not a symbolic link (e.g. a regular file)
        gws.log.warning(f'time zone: {a!r} is not a readable symlink ({exc}), assuming UTC')
        return UTC

    m = re.search(r'zoneinfo/(.+)$', p)
    if not m:
        gws.log.warning(f'time zone: {a!r}={p!r} invalid, assuming UTC')
        return UTC

    try:
        return zoneinfo.ZoneInfo(m.group(1))
    except zoneinfo.ZoneInfoNotFoundError:
        gws.log.warning(f'time zone: {a!r}={p!r} not found, assuming UTC')
        return UTC
def _zone_info_from_string(tz):
    """Look up a ZoneInfo by name.

    Raises:
        Error: If ``tz`` is not among the available zone names or cannot be loaded.
    """

    is_known = tz in _ZI_ALL
    if not is_known:
        raise Error(f'invalid time zone {tz!r}')

    try:
        zi = zoneinfo.ZoneInfo(tz)
    except zoneinfo.ZoneInfoNotFoundError as exc:
        raise Error(f'invalid time zone {tz!r}') from exc
    else:
        return zi
def _zone_info_from_tzinfo(tzinfo: dt.tzinfo):
    """Try to express an arbitrary ``tzinfo`` as a stock ZoneInfo.

    Returns:
        ``tzinfo`` itself if it is exactly a ZoneInfo, UTC if it prints as ``+0:0``,
        the zone named by ``str(tzinfo)`` if that is a valid name, otherwise None.
    """

    # NOTE: exact type match, so ZoneInfo subclasses fall through to the name lookup
    if type(tzinfo) is zoneinfo.ZoneInfo:
        return tzinfo

    name = str(tzinfo)
    if name == '+0:0':
        return UTC

    try:
        zi = _zone_info_from_string(name)
    except Error:
        return None
    return zi
# Init from the env variable right now (i.e. when this module is imported).
# If TZ is set, it must be a valid zone name and the process must be root,
# otherwise the calls below raise `Error`.

if 'TZ' in os.environ:
    _set_localtime_from_zone_info(_zone_info_from_string(os.environ['TZ']))
149# Constructors
def new(year, month, day, hour=0, minute=0, second=0, microsecond=0, fold=0, tz: str = '') -> dt.datetime:
    """Build a timezone-aware datetime from its components.

    Args:
        year: Year component
        month: Month component
        day: Day component
        hour: Hour component (default 0)
        minute: Minute component (default 0)
        second: Second component (default 0)
        microsecond: Microsecond component (default 0)
        fold: Fold component for ambiguous times (default 0)
        tz: Time zone string (default empty for local time zone)

    Returns:
        A datetime whose tzinfo is the zone resolved from ``tz``.
    """

    zi = time_zone(tz)
    return dt.datetime(year, month, day, hour, minute, second, microsecond, tzinfo=zi, fold=fold)
def now(tz: str = '') -> dt.datetime:
    """Return the current date and time in the given zone.

    Args:
        tz: Time zone string (default empty for local time zone)
    """

    zi = time_zone(tz)
    return _now(zi)
def now_utc() -> dt.datetime:
    """Return the current date and time in UTC."""

    current = _now(UTC)
    return current
# for testing

# When set, `_now` returns this value instead of the real clock.
_MOCK_NOW = None


@contextlib.contextmanager
def mock_now(d):
    """Context manager that makes the module's clock return ``d``.

    The previously installed value is restored on exit, also when the
    body of the ``with`` statement raises.

    Args:
        d: The value to report as the current date and time.
    """

    global _MOCK_NOW
    previous = _MOCK_NOW
    _MOCK_NOW = d
    try:
        yield
    finally:
        # always undo the mock, otherwise a failing test would leak it
        _MOCK_NOW = previous
def _now(tzinfo):
    """Return the mocked "now" if one is set, else the real current time in ``tzinfo``."""

    if _MOCK_NOW:
        # NOTE: the mocked value is returned as is, ``tzinfo`` is not applied to it
        return _MOCK_NOW
    return dt.datetime.now(tz=tzinfo)
def today(tz: str = '') -> dt.datetime:
    """Return the current date in the given zone with the time set to 00:00:00.

    Args:
        tz: Time zone string (default empty for local time zone)
    """

    midnight = dict(hour=0, minute=0, second=0, microsecond=0)
    return now(tz).replace(**midnight)
def today_utc() -> dt.datetime:
    """Return the current UTC date with the time set to 00:00:00."""

    midnight = dict(hour=0, minute=0, second=0, microsecond=0)
    return now_utc().replace(**midnight)
def parse(s: str | dt.datetime | dt.date | None, tz: str = '') -> Optional[dt.datetime]:
    """Convert a string, datetime or date to a datetime, returning None on failure.

    Args:
        s: Input to parse (string, datetime, date, or None)
        tz: Default time zone string for timezone-naive inputs

    Returns:
        The resulting datetime, or None if ``s`` is empty or cannot be parsed.
    """

    if not s:
        return None

    # datetime must be tested first, it is a subclass of date
    if isinstance(s, dt.datetime):
        return _ensure_tzinfo(s, tz)

    if isinstance(s, dt.date):
        return new(s.year, s.month, s.day, tz=tz)

    text = str(s)
    try:
        return from_string(text, tz)
    except Error:
        return None
def parse_time(s: str | dt.time | None, tz: str = '') -> Optional[dt.datetime]:
    """Convert a string or a time to a datetime, returning None on failure.

    Args:
        s: Input to parse (string, time, or None)
        tz: Default time zone string for timezone-naive inputs

    Returns:
        The resulting datetime, or None if ``s`` is empty or cannot be parsed.
    """

    if not s:
        return None

    if isinstance(s, dt.time):
        aware_time = _ensure_tzinfo(s, tz)
        return _datetime(aware_time)

    text = str(s)
    try:
        return from_iso_time_string(text, tz)
    except Error:
        return None
def from_string(s: str, tz: str = '') -> dt.datetime:
    """Parse a date/time string with the flexible (non ISO-only) parser.

    Args:
        s: Date/time string to parse
        tz: Default time zone string for timezone-naive inputs

    Raises:
        Error: If the string is not a valid date or datetime.
    """

    text = s.strip()
    return _pend_parse_datetime(text, tz, iso_only=False)
def from_iso_string(s: str, tz: str = '') -> dt.datetime:
    """Parse a date/time string, accepting ISO 8601 only.

    Args:
        s: ISO 8601 date/time string to parse
        tz: Default time zone string for timezone-naive inputs

    Raises:
        Error: If the string is not a valid date or datetime.
    """

    text = s.strip()
    return _pend_parse_datetime(text, tz, iso_only=True)
def from_iso_time_string(s: str, tz: str = '') -> dt.datetime:
    """Parse a time string, accepting ISO 8601 only.

    Args:
        s: ISO 8601 time string to parse
        tz: Default time zone string for timezone-naive inputs

    Raises:
        Error: If the string is not a valid time.
    """

    text = s.strip()
    return _pend_parse_time(text, tz, iso_only=True)
def from_timestamp(n: float, tz: str = '') -> dt.datetime:
    """Convert a Unix timestamp to a datetime in the given zone.

    Args:
        n: Unix timestamp (seconds since epoch)
        tz: Time zone string (default empty for local time zone)
    """

    zi = time_zone(tz)
    return dt.datetime.fromtimestamp(n, tz=zi)
306# Formatters
def to_iso_string(d: Optional[dt.date] = None, with_tz='+', sep='T') -> str:
    """Format a date or time as ``YYYY-MM-DD<sep>HH:MM:SS`` plus an optional UTC offset.

    Args:
        d: Date or time to convert. If None, the current date and time is used.
        with_tz: If set, append the time zone information. Can be "Z" (for UTC), ":" (for ISO 8601 format) or "+" (for +hhmm format).
        sep: Separator between date and time. Default is "T".
    """

    moment = _datetime(d)
    text = moment.strftime(f'%Y-%m-%d{sep}%H:%M:%S')
    if not with_tz:
        return text

    offset = moment.strftime('%z')

    if with_tz == 'Z' and offset == '+0000':
        suffix = 'Z'
    elif with_tz == ':' and len(offset) == 5:
        # +hhmm => +hh:mm
        suffix = offset[:3] + ':' + offset[3:]
    else:
        # every other case falls back to the plain strftime offset
        suffix = offset

    return text + suffix
def to_iso_date_string(d: Optional[dt.date] = None) -> str:
    """Format the date part of a value as ``YYYY-MM-DD``.

    Args:
        d: Date to convert (default current date/time)
    """

    moment = _datetime(d)
    return moment.strftime('%Y-%m-%d')
def to_basic_string(d: Optional[dt.date] = None, with_ms=False) -> str:
    """Format a date as a compact digit string (YYYYMMDDHHMMSS).

    Args:
        d: Date to convert (default current date/time)
        with_ms: If set, append the milliseconds as three more digits.
    """

    moment = _datetime(d)
    millis = f'{moment.microsecond // 1000:03d}' if with_ms else ''
    return moment.strftime('%Y%m%d%H%M%S') + millis
def to_iso_time_string(d: Optional[dt.date] = None, with_tz='+') -> str:
    """Format the time part of a value as ``HH:MM:SS`` plus an optional UTC offset.

    Args:
        d: Date to convert (default current date/time)
        with_tz: Include timezone information ('+' for +hhmm, 'Z' for UTC as Z, False for no timezone)
    """

    fmt = '%H:%M:%S%z' if with_tz else '%H:%M:%S'
    text = _datetime(d).strftime(fmt)

    if with_tz == 'Z' and text.endswith('+0000'):
        # zero offset is written as "Z"
        return text[:-5] + 'Z'
    return text
def to_string(fmt: str, d: Optional[dt.date] = None) -> str:
    """Format a date with a custom ``strftime`` format.

    Args:
        fmt: strftime format string
        d: Date to convert (default current date/time)
    """

    moment = _datetime(d)
    return moment.strftime(fmt)
382def time_to_iso_string(d: Optional[dt.date | dt.time] = None, with_tz='+') -> str:
383 """Convert a date or time to an ISO time string.
385 Args:
386 d: Date or time to convert (default returns 00:00:00)
387 with_tz: Include timezone information (unused in current implementation)
388 """
390 if isinstance(d, (dt.datetime, dt.time)):
391 return f'{d.hour:02d}:{d.minute:02d}:{d.second:02d}'
392 return f'00:00:00'
395# Converters
def to_timestamp(d: Optional[dt.date] = None) -> int:
    """Convert a date to a Unix timestamp in whole seconds (fraction truncated by ``int``).

    Args:
        d: Date to convert (default current date/time)
    """

    seconds = _datetime(d).timestamp()
    return int(seconds)
def to_millis(d: Optional[dt.date] = None) -> int:
    """Convert a date to whole milliseconds since the Unix epoch.

    Args:
        d: Date to convert (default current date/time)
    """

    seconds = _datetime(d).timestamp()
    return int(seconds * 1000)
def to_utc(d: Optional[dt.date] = None) -> dt.datetime:
    """Return the same instant expressed in UTC.

    Args:
        d: Date to convert (default current date/time)
    """

    moment = _datetime(d)
    return moment.astimezone(time_zone('UTC'))
def to_local(d: Optional[dt.date] = None) -> dt.datetime:
    """Return the same instant expressed in the local time zone.

    Args:
        d: Date to convert (default current date/time)
    """

    moment = _datetime(d)
    return moment.astimezone(time_zone(''))
def to_time_zone(tz: str, d: Optional[dt.date] = None) -> dt.datetime:
    """Return the same instant expressed in the given time zone.

    Args:
        tz: Target timezone string
        d: Date to convert (default current date/time)
    """

    moment = _datetime(d)
    return moment.astimezone(time_zone(tz))
449# Predicates
def is_date(x) -> bool:
    """Tell whether an object is a ``date`` instance.

    Since ``datetime`` is a subclass of ``date``, datetimes qualify as well.

    Args:
        x: Object to check
    """

    matches = isinstance(x, dt.date)
    return matches
def is_datetime(x) -> bool:
    """Tell whether an object is a ``datetime`` instance.

    Args:
        x: Object to check
    """

    matches = isinstance(x, dt.datetime)
    return matches
def is_utc(d: dt.datetime) -> bool:
    """Tell whether the time zone of a datetime resolves to UTC.

    Args:
        d: Datetime to check
    """

    tzinfo = gws.u.require(_datetime(d).tzinfo)
    return _zone_info_from_tzinfo(tzinfo) == UTC
def is_local(d: dt.datetime) -> bool:
    """Tell whether the time zone of a datetime resolves to the local zone.

    Args:
        d: Datetime to check
    """

    tzinfo = gws.u.require(_datetime(d).tzinfo)
    return _zone_info_from_tzinfo(tzinfo) == time_zone('')
492# Arithmetic
def add(d: Optional[dt.date] = None, years=0, months=0, days=0, weeks=0, hours=0, minutes=0, seconds=0, microseconds=0) -> dt.datetime:
    """Shift a datetime by the given amounts, delegating to pendulum's ``add_duration``.

    Args:
        d: Base datetime (default current date/time)
        years: Years to add
        months: Months to add
        days: Days to add
        weeks: Weeks to add
        hours: Hours to add
        minutes: Minutes to add
        seconds: Seconds to add
        microseconds: Microseconds to add
    """

    base = _datetime(d)
    amounts = dict(
        years=years,
        months=months,
        days=days,
        weeks=weeks,
        hours=hours,
        minutes=minutes,
        seconds=seconds,
        microseconds=microseconds,
    )
    return pendulum.helpers.add_duration(base, **amounts)
class Diff:
    """Difference between two dates, one attribute per time unit.

    The attributes are only declared here; they are assigned
    by the functions that create a ``Diff``.
    """

    years: int
    months: int
    weeks: int
    days: int
    hours: int
    minutes: int
    seconds: int
    microseconds: int

    def __repr__(self):
        # show whatever attributes have been assigned so far
        return repr(self.__dict__)
def difference(d1: dt.date, d2: Optional[dt.date] = None) -> Diff:
    """Compute the difference between two dates, broken down into units.

    The values are taken from the unit properties of a signed
    (non-absolute) pendulum ``Interval`` from ``d1`` to ``d2``.

    Args:
        d1: The first date.
        d2: The second date. If None, the current date and time is used.
    """

    iv = pendulum.Interval(_datetime(d1), _datetime(d2), absolute=False)

    parts = {
        'years': iv.years,
        'months': iv.months,
        'weeks': iv.weeks,
        'days': iv.remaining_days,
        'hours': iv.hours,
        'minutes': iv.minutes,
        'seconds': iv.remaining_seconds,
        'microseconds': iv.microseconds,
    }

    result = Diff()
    for unit, value in parts.items():
        setattr(result, unit, value)
    return result
def total_difference(d1: dt.date, d2: Optional[dt.date] = None) -> Diff:
    """Compute the difference between two dates, expressed in full in each unit.

    The values come from the ``in_...()`` methods of a signed (non-absolute)
    pendulum ``Interval`` from ``d1`` to ``d2``. ``microseconds`` is derived
    from the ``seconds`` value multiplied by one million.

    Args:
        d1: First date
        d2: Second date (default current date/time)
    """

    iv = pendulum.Interval(_datetime(d1), _datetime(d2), absolute=False)

    parts = {
        'years': iv.in_years(),
        'months': iv.in_months(),
        'weeks': iv.in_weeks(),
        'days': iv.in_days(),
        'hours': iv.in_hours(),
        'minutes': iv.in_minutes(),
        'seconds': iv.in_seconds(),
    }
    parts['microseconds'] = parts['seconds'] * 1_000_000

    result = Diff()
    for unit, value in parts.items():
        setattr(result, unit, value)
    return result
585# Wrappers for useful pendulum utilities
587# fmt:off
def start_of_second(d: Optional[dt.date] = None) -> dt.datetime:
    """Apply pendulum's ``start_of('second')`` to ``d`` (default: now)."""
    moment = _pend(d)
    return _unpend(moment.start_of('second'))


def start_of_minute(d: Optional[dt.date] = None) -> dt.datetime:
    """Apply pendulum's ``start_of('minute')`` to ``d`` (default: now)."""
    moment = _pend(d)
    return _unpend(moment.start_of('minute'))


def start_of_hour(d: Optional[dt.date] = None) -> dt.datetime:
    """Apply pendulum's ``start_of('hour')`` to ``d`` (default: now)."""
    moment = _pend(d)
    return _unpend(moment.start_of('hour'))


def start_of_day(d: Optional[dt.date] = None) -> dt.datetime:
    """Apply pendulum's ``start_of('day')`` to ``d`` (default: now)."""
    moment = _pend(d)
    return _unpend(moment.start_of('day'))


def start_of_week(d: Optional[dt.date] = None) -> dt.datetime:
    """Apply pendulum's ``start_of('week')`` to ``d`` (default: now)."""
    moment = _pend(d)
    return _unpend(moment.start_of('week'))


def start_of_month(d: Optional[dt.date] = None) -> dt.datetime:
    """Apply pendulum's ``start_of('month')`` to ``d`` (default: now)."""
    moment = _pend(d)
    return _unpend(moment.start_of('month'))


def start_of_year(d: Optional[dt.date] = None) -> dt.datetime:
    """Apply pendulum's ``start_of('year')`` to ``d`` (default: now)."""
    moment = _pend(d)
    return _unpend(moment.start_of('year'))
def end_of_second(d: Optional[dt.date] = None) -> dt.datetime:
    """Apply pendulum's ``end_of('second')`` to ``d`` (default: now)."""
    moment = _pend(d)
    return _unpend(moment.end_of('second'))


def end_of_minute(d: Optional[dt.date] = None) -> dt.datetime:
    """Apply pendulum's ``end_of('minute')`` to ``d`` (default: now)."""
    moment = _pend(d)
    return _unpend(moment.end_of('minute'))


def end_of_hour(d: Optional[dt.date] = None) -> dt.datetime:
    """Apply pendulum's ``end_of('hour')`` to ``d`` (default: now)."""
    moment = _pend(d)
    return _unpend(moment.end_of('hour'))


def end_of_day(d: Optional[dt.date] = None) -> dt.datetime:
    """Apply pendulum's ``end_of('day')`` to ``d`` (default: now)."""
    moment = _pend(d)
    return _unpend(moment.end_of('day'))


def end_of_week(d: Optional[dt.date] = None) -> dt.datetime:
    """Apply pendulum's ``end_of('week')`` to ``d`` (default: now)."""
    moment = _pend(d)
    return _unpend(moment.end_of('week'))


def end_of_month(d: Optional[dt.date] = None) -> dt.datetime:
    """Apply pendulum's ``end_of('month')`` to ``d`` (default: now)."""
    moment = _pend(d)
    return _unpend(moment.end_of('month'))


def end_of_year(d: Optional[dt.date] = None) -> dt.datetime:
    """Apply pendulum's ``end_of('year')`` to ``d`` (default: now)."""
    moment = _pend(d)
    return _unpend(moment.end_of('year'))
def day_of_week(d: Optional[dt.date] = None) -> int:
    """Return pendulum's ``day_of_week`` for ``d`` (default: now)."""
    moment = _pend(d)
    return moment.day_of_week


def day_of_year(d: Optional[dt.date] = None) -> int:
    """Return pendulum's ``day_of_year`` for ``d`` (default: now)."""
    moment = _pend(d)
    return moment.day_of_year


def week_of_month(d: Optional[dt.date] = None) -> int:
    """Return pendulum's ``week_of_month`` for ``d`` (default: now)."""
    moment = _pend(d)
    return moment.week_of_month


def week_of_year(d: Optional[dt.date] = None) -> int:
    """Return pendulum's ``week_of_year`` for ``d`` (default: now)."""
    moment = _pend(d)
    return moment.week_of_year


def days_in_month(d: Optional[dt.date] = None) -> int:
    """Return pendulum's ``days_in_month`` for ``d`` (default: now)."""
    moment = _pend(d)
    return moment.days_in_month
614# fmt:on
# Lookup table from a weekday given by the caller to a pendulum weekday.
# A weekday can be given as a number (0 = Monday ... 6 = Sunday)
# or as a lowercase English name. Used by `next` and `prev`.
_WD = {
    0: pendulum.WeekDay.MONDAY,
    1: pendulum.WeekDay.TUESDAY,
    2: pendulum.WeekDay.WEDNESDAY,
    3: pendulum.WeekDay.THURSDAY,
    4: pendulum.WeekDay.FRIDAY,
    5: pendulum.WeekDay.SATURDAY,
    6: pendulum.WeekDay.SUNDAY,
    'monday': pendulum.WeekDay.MONDAY,
    'tuesday': pendulum.WeekDay.TUESDAY,
    'wednesday': pendulum.WeekDay.WEDNESDAY,
    'thursday': pendulum.WeekDay.THURSDAY,
    'friday': pendulum.WeekDay.FRIDAY,
    'saturday': pendulum.WeekDay.SATURDAY,
    'sunday': pendulum.WeekDay.SUNDAY,
}
def next(day: int | str, d: Optional[dt.date] = None, keep_time=False) -> dt.datetime:
    """Find the next occurrence of a weekday after a date.

    Args:
        day: Day of week (0-6 for Monday-Sunday or weekday name string)
        d: Starting date (default current date/time)
        keep_time: Whether to keep the time component

    Raises:
        KeyError: If ``day`` is not a known weekday number or name.
    """

    start = _pend(d)
    weekday = _WD[day]
    return _unpend(start.next(weekday, keep_time))
def prev(day: int | str, d: Optional[dt.date] = None, keep_time=False) -> dt.datetime:
    """Find the previous occurrence of a weekday before a date.

    Args:
        day: Day of week (0-6 for Monday-Sunday or weekday name string)
        d: Starting date (default current date/time)
        keep_time: Whether to keep the time component

    Raises:
        KeyError: If ``day`` is not a known weekday number or name.
    """

    start = _pend(d)
    weekday = _WD[day]
    return _unpend(start.previous(weekday, keep_time))
658# Duration
# Duration unit suffixes and their length in seconds, largest unit first.
# `format_duration` relies on this order.
_DURATION_UNITS = {
    'w': 7 * 24 * 60 * 60,
    'd': 24 * 60 * 60,
    'h': 60 * 60,
    'm': 60,
    's': 1,
}
def parse_duration(s: str) -> int:
    """Convert a duration string to a number of seconds.

    Args:
        s: Duration string (e.g. '1w2d3h4m5s') or integer seconds

    Returns:
        The total number of seconds. An integer argument is returned unchanged,
        and a trailing number without a unit is counted as seconds.

    Raises:
        Error: If a unit is unknown or is not preceded by a number.
    """

    if isinstance(s, int):
        return s

    total = 0
    pending = None  # the last number seen that has no unit yet

    # the string is split into alternating digit and non-digit runs
    for digits, other in re.findall(r'(\d+)|(\D+)', str(s).strip()):
        if digits:
            pending = int(digits)
            continue

        unit = other.strip()
        if pending is None or unit not in _DURATION_UNITS:
            raise Error('invalid duration', s)

        total += pending * _DURATION_UNITS[unit]
        pending = None

    if pending:
        total += pending

    return total
def format_duration(s: int) -> str:
    """Format a number of seconds as a duration string like ``1w 2d 3h``.

    Units with a zero count are omitted; if nothing remains, ``0s`` is returned.

    Args:
        s: Duration in seconds
    """

    parts = []
    remaining = s

    for unit, size in _DURATION_UNITS.items():
        count = remaining // size
        if not count:
            continue
        parts.append(f'{count}{unit}')
        remaining -= count * size

    return ' '.join(parts) or '0s'
715##
717# conversions
def _datetime(d: dt.date | dt.time | None) -> dt.datetime:
    """Turn a date, time, datetime or None into a timezone-aware datetime.

    Args:
        d: The value to convert. None means the current local date and time.

    Returns:
        A datetime. Naive datetimes and naive times are placed in the local
        time zone, plain dates become midnight UTC, times get today's date.

    Raises:
        Error: If ``d`` is none of the supported types.
    """

    if d is None:
        return now()

    if isinstance(d, dt.datetime):
        # if a value is a naive datetime, assume the local tz
        # see https://www.postgresql.org/docs/current/datatype-datetime.html#DATATYPE-DATETIME-INPUT-TIME-STAMPS:
        # > Conversions between timestamp without time zone and timestamp with time zone normally assume
        # > that the timestamp without time zone value should be taken or given as timezone local time.
        return _ensure_tzinfo(d, tz='')

    if isinstance(d, dt.date):
        # promote date to midnight UTC
        return dt.datetime(d.year, d.month, d.day, tzinfo=UTC)

    if isinstance(d, dt.time):
        # promote time to today's time;
        # a naive time is taken in the local zone, so that the result is never naive
        tzinfo = d.tzinfo if d.tzinfo is not None else time_zone('')
        n = _now(tzinfo)
        return dt.datetime(n.year, n.month, n.day, d.hour, d.minute, d.second, d.microsecond, tzinfo, fold=d.fold)

    raise Error(f'invalid datetime value {d!r}')
def _ensure_tzinfo(d, tz: str):
    """Return ``d`` with a time zone attached, preferring stock ZoneInfo objects.

    Args:
        d: A value with a ``tzinfo`` attribute and a ``replace`` method.
        tz: Time zone string to use if ``d`` has no time zone.
    """

    # no tzinfo at all: attach the requested one
    if not d.tzinfo:
        return d.replace(tzinfo=time_zone(tz))

    # 'their' tzinfo might be an unnamed dt.timezone or pendulum.FixedTimezone,
    # swap it for a zoneinfo if possible, otherwise leave the value alone
    zi = _zone_info_from_tzinfo(d.tzinfo)
    return d.replace(tzinfo=zi) if zi else d
760# pendulum.DateTime <-> python datetime
def _pend(d: dt.date | None) -> pendulum.DateTime:
    """Convert a date-like value (or None for now) to a pendulum DateTime."""

    moment = _datetime(d)
    return pendulum.instance(moment)
def _unpend(p: pendulum.DateTime) -> dt.datetime:
    """Copy the fields of a pendulum DateTime into a stock ``datetime``."""

    fields = dict(
        year=p.year,
        month=p.month,
        day=p.day,
        hour=p.hour,
        minute=p.minute,
        second=p.second,
        microsecond=p.microsecond,
        tzinfo=p.tzinfo,
        fold=p.fold,
    )
    return dt.datetime(**fields)
781# NB using private APIs
def _pend_parse_datetime(s, tz, iso_only):
    """Parse a date or datetime string with pendulum's parsers.

    Args:
        s: The string to parse.
        tz: Time zone string for results that have no time zone.
        iso_only: If set, use the ISO 8601 parser, otherwise the general one.

    Raises:
        Error: If parsing fails or the string denotes something other than a date or datetime.
    """

    # NB `_parse` is a private pendulum API, used because it does not normalize
    parser = pendulum.parsing.parse_iso8601 if iso_only else pendulum.parsing._parse

    try:
        parsed = parser(s)
    except (ValueError, pendulum.parsing.exceptions.ParserError) as exc:
        raise Error(f'invalid date {s!r}') from exc

    # datetime must be tested first, it is a subclass of date
    if isinstance(parsed, dt.datetime):
        return _ensure_tzinfo(parsed, tz)
    if isinstance(parsed, dt.date):
        return new(parsed.year, parsed.month, parsed.day, tz=tz)

    # times and durations not accepted
    raise Error(f'invalid date {s!r}')
def _pend_parse_time(s, tz, iso_only):
    """Parse a time string with pendulum's parsers and promote it to a datetime.

    Args:
        s: The string to parse.
        tz: Time zone string for results that have no time zone.
        iso_only: If set, use the ISO 8601 parser, otherwise the general one.

    Raises:
        Error: If parsing fails or the string denotes something other than a time.
    """

    # NB `_parse` is a private pendulum API, used because it does not normalize
    parser = pendulum.parsing.parse_iso8601 if iso_only else pendulum.parsing._parse

    try:
        parsed = parser(s)
    except (ValueError, pendulum.parsing.exceptions.ParserError) as exc:
        raise Error(f'invalid time {s!r}') from exc

    # dates and durations not accepted
    if not isinstance(parsed, dt.time):
        raise Error(f'invalid time {s!r}')

    return _datetime(_ensure_tzinfo(parsed, tz))