Coverage for gws-app/gws/lib/watcher/__init__.py: 100%

67 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-16 23:09 +0200

1"""File system watcher. 

2 

3Monitor file system events such as file creation, deletion, modification, and movement. 

4The module allows directories or specific files to be monitored, supports pattern matching for files, 

5and provides a mechanism to trigger custom notification callbacks when events occur. 

6""" 

7 

8from typing import Callable, TypeAlias 

9import os 

10import re 

11 

12import watchdog.events 

13import watchdog.observers 

14 

15import gws 

16 

# Event types that are passed on to the watcher; ``_Handler.on_any_event``
# ignores events of any other type.
_WATCH_EVENTS = {
    watchdog.events.EVENT_TYPE_CREATED,
    watchdog.events.EVENT_TYPE_MODIFIED,
    watchdog.events.EVENT_TYPE_MOVED,
    watchdog.events.EVENT_TYPE_DELETED,
}

# NOTE(review): not referenced anywhere in the visible part of this module --
# confirm whether it is still needed.
_EVENTS = []

25 

26 

class _DirEntry:
    """Settings for one watched directory."""

    def __init__(self, dirname, pattern, recursive):
        # dirname:   path of the watched directory
        # pattern:   regular expression that ``Watcher.path_matches`` searches
        #            for in the base name of a file inside the directory
        # recursive: whether files in subdirectories are considered as well
        self.dirname, self.pattern, self.recursive = dirname, pattern, recursive

32 

33 

# Signature of the notification callback: ``notify(event_type, path)``.
_NotifyFn: TypeAlias = Callable[[str, str], None]


def new(notify: _NotifyFn):
    """Build a watcher that reports file system events to ``notify``.

    Args:
        notify: Callback invoked for every matching event with two arguments,
            the event type and the path of the affected file.
            It runs on a different thread than the one that created the watcher.

    Returns:
        A new ``Watcher``; it does not observe anything until it is started.
    """
    watcher = Watcher(notify)
    return watcher

46 

47 

48class Watcher: 

49 observer: watchdog.observers.Observer 

50 

51 def __init__(self, notify: _NotifyFn): 

52 self.notify = notify 

53 self.dirEntries = {} 

54 self.filePaths = set() 

55 self.excludePatterns = [] 

56 

57 def add_directory(self, dirname: str | os.PathLike, file_pattern: str = '', recursive: bool = False): 

58 d = str(dirname) 

59 self.dirEntries[d] = _DirEntry(d, file_pattern or '.', recursive) 

60 

61 def add_file(self, filename: str | os.PathLike): 

62 self.filePaths.add(str(filename)) 

63 

64 def exclude(self, path_pattern: str): 

65 self.excludePatterns.append(path_pattern) 

66 

67 def start(self): 

68 self.observer = watchdog.observers.Observer() 

69 

70 h = _Handler(self) 

71 

72 for de in self.dirEntries.values(): 

73 gws.log.debug(f'watcher: watching {de.dirname!r}') 

74 self.observer.schedule(h, de.dirname, recursive=de.recursive) 

75 

76 for f in self.filePaths: 

77 gws.log.debug(f'watcher: watching {f!r}') 

78 self.observer.schedule(h, os.path.dirname(f), recursive=False) 

79 

80 self.observer.start() 

81 gws.log.debug(f'watcher: started with {self.observer.__class__.__name__}') 

82 

83 def stop(self): 

84 if self.observer is not None: 

85 self.observer.stop() 

86 self.observer.join() 

87 gws.log.debug(f'watcher: stopped') 

88 

89 def register(self, ev: watchdog.events.FileSystemEvent): 

90 if self.path_matches(ev.src_path): 

91 gws.log.debug(f'watcher: {ev.event_type} {ev.src_path}') 

92 self.notify(ev.event_type, ev.src_path) 

93 

94 def path_matches(self, path): 

95 if any(re.search(ex, path) for ex in self.excludePatterns): 

96 return False 

97 if path in self.filePaths: 

98 return True 

99 d, f = os.path.split(path) 

100 for de in self.dirEntries.values(): 

101 if (d == de.dirname or (de.recursive and d.startswith(de.dirname + '/'))) and re.search(de.pattern, f): 

102 return True 

103 return False 

104 

105 

class _Handler(watchdog.events.FileSystemEventHandler):
    """Event handler that hands the relevant events over to a ``Watcher``."""

    def __init__(self, obj: Watcher):
        super().__init__()
        # The watcher whose ``register`` method receives the events.
        self.obj = obj

    def on_any_event(self, ev):
        """Forward ``ev`` to the watcher if its type is listed in ``_WATCH_EVENTS``."""
        if ev.event_type not in _WATCH_EVENTS:
            return
        self.obj.register(ev)