Coverage for gws-app/gws/plugin/alkis/cli.py: 0%

93 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-16 23:09 +0200

1"""Command-line ALKIS commands.""" 

2 

3from typing import Optional, cast 

4 

5import gws 

6import gws.config 

7import gws.base.action 

8import gws.lib.jsonx 

9import gws.lib.osx 

10from gws.lib.cli import ProgressIndicator 

11 

12from .data import types as dt 

13from .data import exporter, index, indexer 

14from . import action 

15 

16gws.ext.new.cli('alkis') 

17 

18 

19class CreateIndexParams(gws.CliParams): 

20 projectUid: Optional[str] 

21 """Project uid.""" 

22 force: bool = False 

23 """Force indexing.""" 

24 cache: bool = False 

25 """Use object cache.""" 

26 

27 

28class StatusParams(gws.CliParams): 

29 projectUid: Optional[str] 

30 """Project uid.""" 

31 

32 

33class DumpParams(gws.CliParams): 

34 projectUid: Optional[str] 

35 """Project uid.""" 

36 fs: str 

37 """Flurstueck UIDs""" 

38 path: str 

39 """Path to save the dump.""" 

40 

41 

42# @TODO options to filter data and configure groups 

43 

44 

45class ExportParams(gws.CliParams): 

46 projectUid: Optional[str] 

47 """Project uid.""" 

48 exporterUid: Optional[str] 

49 """Export uid.""" 

50 path: str 

51 """Path to save the export.""" 

52 

53 

54class Object(gws.Node): 

55 act: action.Object 

56 ixStatus: dt.IndexStatus 

57 

58 def _prepare(self, project_uid): 

59 root = gws.config.load() 

60 self.act = cast(action.Object, gws.base.action.get_action_for_cli(root, 'alkis', project_uid)) 

61 if not self.act: 

62 exit(1) 

63 

64 self.ixStatus = self.act.ix.status() 

65 

66 @gws.ext.command.cli('alkisIndex') 

67 def do_index(self, p: CreateIndexParams): 

68 """Create the ALKIS index.""" 

69 

70 self._prepare(p.projectUid) 

71 if self.ixStatus.complete and not p.force: 

72 gws.log.info(f'ALKIS index ok') 

73 return 

74 

75 gws.log.info(f'indexing db={self.act.db.uid} dataSchema={self.act.dataSchema} indexSchema={self.act.indexSchema}') 

76 indexer.run(self.act.ix, self.act.dataSchema, with_force=p.force, with_cache=p.cache) 

77 

78 @gws.ext.command.cli('alkisStatus') 

79 def do_status(self, p: StatusParams): 

80 """Display the status of the ALKIS index.""" 

81 

82 self._prepare(p.projectUid) 

83 

84 if self.ixStatus.complete: 

85 gws.log.info(f'ALKIS index ok') 

86 return 

87 

88 if self.ixStatus.missing: 

89 gws.log.error(f'ALKIS index not found') 

90 return 

91 

92 gws.log.warning( 

93 f'ALKIS index incomplete: basic={self.ixStatus.basic} eigentuemer={self.ixStatus.eigentuemer} buchung={self.ixStatus.buchung}' 

94 ) 

95 

96 @gws.ext.command.cli('alkisExport') 

97 def do_export(self, p: ExportParams): 

98 """Export ALKIS data.""" 

99 

100 self._prepare(p.projectUid) 

101 

102 if self.ixStatus.missing: 

103 gws.log.error(f'ALKIS index missing') 

104 exit(1) 

105 

106 sys_user = self.act.root.app.authMgr.systemUser 

107 exp = self.act.get_exporter(p.exporterUid, sys_user) 

108 if not exp: 

109 gws.log.error(f'ALKIS exporter not found') 

110 exit(3) 

111 

112 qo = dt.FlurstueckQueryOptions( 

113 withEigentuemer=True, 

114 withBuchung=True, 

115 withHistorySearch=False, 

116 withHistoryDisplay=False, 

117 displayThemes=[ 

118 'lage', 

119 'gebaeude', 

120 'nutzung', 

121 'festlegung', 

122 'bewertung', 

123 'buchung', 

124 'eigentuemer', 

125 ], 

126 pageSize=1000, 

127 ) 

128 

129 total = self.act.ix.count_all(qo) 

130 fs = self.act.ix.iter_all(qo) 

131 

132 with ProgressIndicator('export', total) as progress: 

133 exp.run( 

134 exporter.Args( 

135 fsList=fs, 

136 user=sys_user, 

137 progress=progress, 

138 path=p.path 

139 ) 

140 ) 

141 

142 @gws.ext.command.cli('alkisKeys') 

143 def do_keys(self, p: gws.CliParams): 

144 """Print ALKIS export keys.""" 

145 

146 d = index.all_flat_keys() 

147 for key, typ in d.items(): 

148 print(f'{typ.__name__:7} {key}') 

149 

150 @gws.ext.command.cli('alkisDump') 

151 def do_dump(self, p: DumpParams): 

152 """Dump internal representations of ALKIS objects.""" 

153 

154 self._prepare(p.projectUid) 

155 

156 if self.ixStatus.missing: 

157 gws.log.error(f'ALKIS index missing') 

158 exit(1) 

159 

160 qo = dt.FlurstueckQueryOptions( 

161 withEigentuemer=True, 

162 withBuchung=True, 

163 withHistorySearch=False, 

164 withHistoryDisplay=True, 

165 displayThemes=[ 

166 'lage', 

167 'gebaeude', 

168 'nutzung', 

169 'festlegung', 

170 'bewertung', 

171 'buchung', 

172 'eigentuemer', 

173 ], 

174 ) 

175 

176 fs = self.act.ix.load_flurstueck(gws.u.to_list(p.fs), qo) 

177 js = [index.serialize(f, encode_enum_pairs=False) for f in fs] 

178 gws.u.write_file(p.path, gws.lib.jsonx.to_pretty_string(js))