Coverage for gws-app/gws/plugin/alkis/cli.py: 0%
93 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-16 22:59 +0200
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-16 22:59 +0200
1"""Command-line ALKIS commands."""
3from typing import Optional, cast
5import gws
6import gws.config
7import gws.base.action
8import gws.lib.jsonx
9import gws.lib.osx
10from gws.lib.cli import ProgressIndicator
12from .data import types as dt
13from .data import exporter, index, indexer
14from . import action
# NOTE(review): appears to declare the 'alkis' CLI extension for the gws
# extension machinery -- confirm against the gws.ext documentation.
gws.ext.new.cli('alkis')
class CreateIndexParams(gws.CliParams):
    """Parameters accepted by the ``alkisIndex`` command (``Object.do_index``)."""

    # Handed to ``gws.base.action.get_action_for_cli`` to locate the ALKIS action.
    projectUid: Optional[str]
    """Project uid."""
    # When set, indexing runs even if the index status reports ``complete``;
    # also forwarded to ``indexer.run`` as ``with_force``.
    force: bool = False
    """Force indexing."""
    # Forwarded to ``indexer.run`` as ``with_cache``.
    cache: bool = False
    """Use object cache."""
class StatusParams(gws.CliParams):
    """Parameters accepted by the ``alkisStatus`` command (``Object.do_status``)."""

    # Handed to ``gws.base.action.get_action_for_cli`` to locate the ALKIS action.
    projectUid: Optional[str]
    """Project uid."""
class DumpParams(gws.CliParams):
    """Parameters accepted by the ``alkisDump`` command (``Object.do_dump``)."""

    # Handed to ``gws.base.action.get_action_for_cli`` to locate the ALKIS action.
    projectUid: Optional[str]
    """Project uid."""
    # Converted with ``gws.u.to_list`` before the index lookup
    # (presumably a delimited list of uids -- verify the accepted separators).
    fs: str
    """Flurstueck UIDs"""
    # Target file; the dump is written there as pretty-printed JSON.
    path: str
    """Path to save the dump."""
42# @TODO options to filter data and configure groups
class ExportParams(gws.CliParams):
    """Parameters accepted by the ``alkisExport`` command (``Object.do_export``)."""

    # Handed to ``gws.base.action.get_action_for_cli`` to locate the ALKIS action.
    projectUid: Optional[str]
    """Project uid."""
    # Passed to the action's ``get_exporter`` to select the exporter to run.
    exporterUid: Optional[str]
    """Export uid."""
    # Forwarded to the exporter as ``exporter.Args.path``.
    path: str
    """Path to save the export."""
class Object(gws.Node):
    """Command-line entry points for the ALKIS plugin.

    Every public ``do_*`` method is exposed as a CLI command through the
    ``gws.ext.command.cli`` decorator. Commands that work on the index call
    ``_prepare`` first, which resolves the ALKIS action and caches its
    index status on the instance.

    Failures terminate the process by raising ``SystemExit`` directly instead
    of calling the ``exit()`` helper: that helper is injected by the ``site``
    module for interactive use and is not guaranteed to be available.
    """

    # ALKIS action resolved by ``_prepare``.
    act: action.Object
    # Result of ``act.ix.status()`` captured by ``_prepare``.
    ixStatus: dt.IndexStatus

    def _prepare(self, project_uid):
        """Load the configuration and resolve the ALKIS action.

        Sets ``self.act`` and ``self.ixStatus``.

        Args:
            project_uid: Project uid passed on to ``get_action_for_cli``; may be ``None``.

        Raises:
            SystemExit: With status 1 if no ALKIS action could be resolved.
        """

        root = gws.config.load()
        self.act = cast(action.Object, gws.base.action.get_action_for_cli(root, 'alkis', project_uid))
        if not self.act:
            raise SystemExit(1)

        self.ixStatus = self.act.ix.status()

    @gws.ext.command.cli('alkisIndex')
    def do_index(self, p: CreateIndexParams):
        """Create the ALKIS index."""

        self._prepare(p.projectUid)

        # A complete index is left alone unless the caller explicitly forces a rebuild.
        if self.ixStatus.complete and not p.force:
            gws.log.info('ALKIS index ok')
            return

        gws.log.info(f'indexing db={self.act.db.uid} dataSchema={self.act.dataSchema} indexSchema={self.act.indexSchema}')
        indexer.run(self.act.ix, self.act.dataSchema, with_force=p.force, with_cache=p.cache)

    @gws.ext.command.cli('alkisStatus')
    def do_status(self, p: StatusParams):
        """Display the status of the ALKIS index."""

        self._prepare(p.projectUid)

        if self.ixStatus.complete:
            gws.log.info('ALKIS index ok')
            return

        if self.ixStatus.missing:
            gws.log.error('ALKIS index not found')
            return

        # Neither complete nor missing: report which parts are present.
        gws.log.warning(
            f'ALKIS index incomplete: basic={self.ixStatus.basic} eigentuemer={self.ixStatus.eigentuemer} buchung={self.ixStatus.buchung}'
        )

    @gws.ext.command.cli('alkisExport')
    def do_export(self, p: ExportParams):
        """Export ALKIS data.

        Terminates with status 1 if the index is missing and with status 3
        if the requested exporter cannot be found.
        """

        self._prepare(p.projectUid)

        if self.ixStatus.missing:
            gws.log.error('ALKIS index missing')
            raise SystemExit(1)

        # The export runs on behalf of the system user.
        sys_user = self.act.root.app.authMgr.systemUser
        exp = self.act.get_exporter(p.exporterUid, sys_user)
        if not exp:
            gws.log.error('ALKIS exporter not found')
            raise SystemExit(3)

        qo = dt.FlurstueckQueryOptions(
            withEigentuemer=True,
            withBuchung=True,
            withHistorySearch=False,
            withHistoryDisplay=False,
            displayThemes=[
                'lage',
                'gebaeude',
                'nutzung',
                'festlegung',
                'bewertung',
                'buchung',
                'eigentuemer',
            ],
            pageSize=1000,
        )

        # The total is computed up front so the progress indicator knows its upper bound.
        total = self.act.ix.count_all(qo)
        fs = self.act.ix.iter_all(qo)

        with ProgressIndicator('export', total) as progress:
            exp.run(
                exporter.Args(
                    fsList=fs,
                    user=sys_user,
                    progress=progress,
                    path=p.path,
                )
            )

    @gws.ext.command.cli('alkisKeys')
    def do_keys(self, p: gws.CliParams):
        """Print ALKIS export keys."""

        # One line per key: the type name padded to 7 characters, then the key.
        d = index.all_flat_keys()
        for key, typ in d.items():
            print(f'{typ.__name__:7} {key}')

    @gws.ext.command.cli('alkisDump')
    def do_dump(self, p: DumpParams):
        """Dump internal representations of ALKIS objects.

        Terminates with status 1 if the index is missing.
        """

        self._prepare(p.projectUid)

        if self.ixStatus.missing:
            gws.log.error('ALKIS index missing')
            raise SystemExit(1)

        # Unlike the export, the dump enables history display and sets no page size.
        qo = dt.FlurstueckQueryOptions(
            withEigentuemer=True,
            withBuchung=True,
            withHistorySearch=False,
            withHistoryDisplay=True,
            displayThemes=[
                'lage',
                'gebaeude',
                'nutzung',
                'festlegung',
                'bewertung',
                'buchung',
                'eigentuemer',
            ],
        )

        fs = self.act.ix.load_flurstueck(gws.u.to_list(p.fs), qo)
        js = [index.serialize(f, encode_enum_pairs=False) for f in fs]
        gws.u.write_file(p.path, gws.lib.jsonx.to_pretty_string(js))
178 gws.u.write_file(p.path, gws.lib.jsonx.to_pretty_string(js))