Coverage for gws-app / gws / plugin / alkis / cli.py: 0%
95 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-03 10:12 +0100
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-03 10:12 +0100
1"""Command-line ALKIS commands."""
3from typing import Optional, cast
5import gws
6import gws.config
7import gws.base.action
8import gws.lib.jsonx
9import gws.lib.osx
10from gws.lib.cli import ProgressIndicator
12from .data import types as dt
13from .data import exporter, index, indexer
14from . import action
16gws.ext.new.cli('alkis')
19class CreateIndexParams(gws.CliParams):
20 projectUid: Optional[str]
21 """Project uid."""
22 force: bool = False
23 """Force indexing."""
24 cache: bool = False
25 """Use object cache."""
28class StatusParams(gws.CliParams):
29 projectUid: Optional[str]
30 """Project uid."""
33class DumpParams(gws.CliParams):
34 projectUid: Optional[str]
35 """Project uid."""
36 fs: str
37 """Flurstueck UIDs"""
38 path: str
39 """Path to save the dump."""
42# @TODO options to filter data and configure groups
45class ExportParams(gws.CliParams):
46 projectUid: Optional[str]
47 """Project uid."""
48 exporterUid: Optional[str]
49 """Export uid."""
50 path: str
51 """Path to save the export."""
54class Object(gws.Node):
55 act: action.Object
57 def _prepare(self, project_uid):
58 root = gws.config.load()
59 self.act = cast(action.Object, gws.base.action.get_action_for_cli(root, 'alkis', project_uid))
60 if not self.act:
61 exit(1)
63 @gws.ext.command.cli('alkisIndex')
64 def do_index(self, p: CreateIndexParams):
65 """Create the ALKIS index."""
67 self._prepare(p.projectUid)
69 s = self.act.ix.status()
70 if s.complete and not p.force:
71 gws.log.info(f'ALKIS index ok')
72 return
74 gws.log.info(f'indexing db={self.act.db.uid} dataSchema={self.act.dataSchema} indexSchema={self.act.indexSchema}')
75 indexer.run(self.act.ix, self.act.dataSchema, with_force=p.force, with_cache=p.cache)
77 @gws.ext.command.cli('alkisStatus')
78 def do_status(self, p: StatusParams):
79 """Display the status of the ALKIS index."""
81 self._prepare(p.projectUid)
83 s = self.act.ix.status()
84 if s.complete:
85 gws.log.info(f'ALKIS index ok')
86 return
87 if s.missing:
88 gws.log.error(f'ALKIS index not found')
89 return
91 gws.log.warning(f'ALKIS index incomplete: basic={s.basic} eigentuemer={s.eigentuemer} buchung={s.buchung}')
93 @gws.ext.command.cli('alkisExport')
94 def do_export(self, p: ExportParams):
95 """Export ALKIS data."""
97 self._prepare(p.projectUid)
99 s = self.act.ix.status()
100 if s.missing:
101 gws.log.error(f'ALKIS index missing')
102 exit(1)
104 sys_user = self.act.root.app.authMgr.systemUser
105 exp = self.act.get_exporter(p.exporterUid, sys_user)
106 if not exp:
107 gws.log.error(f'ALKIS exporter not found')
108 exit(3)
110 qo = dt.FlurstueckQueryOptions(
111 withEigentuemer=True,
112 withBuchung=True,
113 withHistorySearch=False,
114 withHistoryDisplay=False,
115 displayThemes=[
116 'lage',
117 'gebaeude',
118 'nutzung',
119 'festlegung',
120 'bewertung',
121 'buchung',
122 'eigentuemer',
123 ],
124 pageSize=1000,
125 )
127 total = self.act.ix.count_all(qo)
128 fs = self.act.ix.iter_all(qo)
130 with ProgressIndicator('export', total) as progress:
131 exp.run(exporter.Args(fsList=fs, user=sys_user, progress=progress, path=p.path))
133 @gws.ext.command.cli('alkisKeys')
134 def do_keys(self, p: gws.CliParams):
135 """Print ALKIS export keys."""
137 d = index.all_flat_keys()
138 for key, typ in d.items():
139 print(f'{typ.__name__:7} {key}')
141 @gws.ext.command.cli('alkisDump')
142 def do_dump(self, p: DumpParams):
143 """Dump internal representations of ALKIS objects."""
145 self._prepare(p.projectUid)
147 s = self.act.ix.status()
148 if s.missing:
149 gws.log.error(f'ALKIS index missing')
150 exit(1)
152 qo = dt.FlurstueckQueryOptions(
153 withEigentuemer=True,
154 withBuchung=True,
155 withHistorySearch=False,
156 withHistoryDisplay=True,
157 displayThemes=[
158 'lage',
159 'gebaeude',
160 'nutzung',
161 'festlegung',
162 'bewertung',
163 'buchung',
164 'eigentuemer',
165 ],
166 )
168 fs = self.act.ix.load_flurstueck(gws.u.to_list(p.fs), qo)
169 js = [index.serialize(f, encode_enum_pairs=False) for f in fs]
170 gws.u.write_file(p.path, gws.lib.jsonx.to_pretty_string(js))