Coverage for gws-app/gws/plugin/alkis/data/indexer.py: 0%

671 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-16 22:59 +0200

1from typing import Optional, Iterable 

2 

3import re 

4from typing import Generic, TypeVar 

5 

6import shapely 

7import shapely.strtree 

8import shapely.wkb 

9 

10import gws 

11import gws.lib.osx 

12import gws.lib.datetimex as dtx 

13from gws.lib.cli import ProgressIndicator 

14import gws.plugin.postgres.provider 

15 

16from . import types as dt 

17from . import index 

18from . import norbit6 

19 

20from .geo_info_dok import gid6 as gid 

21 

22 

def run(ix: index.Object, data_schema: str, with_force=False, with_cache=False):
    """Build the ALKIS index from the source tables in ``data_schema``.

    Args:
        ix: The index object to fill.
        data_schema: Database schema passed to the norbit6 reader.
        with_force: Drop the existing index first and rebuild it.
        with_cache: Passed on to the runner as its cache switch.

    Without ``with_force``, nothing happens if the index status is already complete.
    """

    # the status is only consulted when no rebuild is forced
    if not with_force and ix.status().complete:
        return
    if with_force:
        ix.drop()

    reader = norbit6.Object(ix.db, schema=data_schema)
    _Runner(ix, reader, with_cache).run()

32 

33 

34## 

35 

T = TypeVar("T")


class _ObjectDict(Generic[T]):
    """Collection of objects of one class, keyed by their ``uid``."""

    def __init__(self, cls):
        self.d = {}
        self.cls = cls

    def add(self, uid, recs) -> T:
        """Create an object of the stored class from ``uid`` and ``recs`` and register it.

        The object is flagged historic only if all of its records are.
        It is stored under its own ``uid`` attribute and replaces any object
        already stored under that uid.
        """
        obj = self.cls(uid=uid, recs=recs)
        obj.isHistoric = all(rec.isHistoric for rec in recs)
        self.d[obj.uid] = obj
        return obj

    def get(self, uid, default=None) -> Optional[T]:
        """Return the object stored under ``uid`` or ``default``."""
        return self.d.get(uid, default)

    def get_many(self, uids) -> list[T]:
        """Return the stored objects for ``uids``.

        Unknown uids are skipped and repeated uids yield one object;
        the result follows the first occurrence of each uid.
        """
        found = {}

        for uid in uids:
            if uid in found:
                continue
            obj = self.d.get(uid)
            if obj:
                found[uid] = obj

        return [*found.values()]

    def get_from_ptr(self, obj: dt.Entity, attr):
        """Resolve the references held in attribute ``attr`` of each record of ``obj``.

        The attribute value is fetched with ``_pop``; a list contributes all
        its items, a string contributes itself, anything else is ignored.
        """
        uids = []

        for rec in obj.recs:
            ref = _pop(rec, attr)
            if isinstance(ref, list):
                uids += ref
            elif isinstance(ref, str):
                uids.append(ref)

        return self.get_many(uids)

    def __iter__(self) -> Iterable[T]:
        for obj in self.d.values():
            yield obj

    def __len__(self):
        return len(self.d)

81 

82 

class _ObjectMap:
    """All object collections and lookup tables held by one indexer."""

    def __init__(self):
        new = _ObjectDict

        # one uid-keyed collection per entity class
        self.Anschrift: _ObjectDict[dt.Anschrift] = new(dt.Anschrift)
        self.Buchungsblatt: _ObjectDict[dt.Buchungsblatt] = new(dt.Buchungsblatt)
        self.Buchungsstelle: _ObjectDict[dt.Buchungsstelle] = new(dt.Buchungsstelle)
        self.Flurstueck: _ObjectDict[dt.Flurstueck] = new(dt.Flurstueck)
        self.Gebaeude: _ObjectDict[dt.Gebaeude] = new(dt.Gebaeude)
        self.Lage: _ObjectDict[dt.Lage] = new(dt.Lage)
        self.Namensnummer: _ObjectDict[dt.Namensnummer] = new(dt.Namensnummer)
        self.Part: _ObjectDict[dt.Part] = new(dt.Part)
        self.Person: _ObjectDict[dt.Person] = new(dt.Person)

        # plain lookup tables, filled by the indexers
        self.placeAll: dict = dict()
        self.placeIdx: dict = dict()
        self.catalog: dict = dict()

99 

100 

class _Indexer:
    """Base class of the indexing steps.

    A step fills its object map ``om`` in ``collect`` (or takes it from the
    cache) and stores its results in index tables in ``write``.
    """

    # file name of the step's cache below the runner's cache directory;
    # a step with an empty key is never cached
    CACHE_KEY: str = ''

    def __init__(self, runner: '_Runner'):
        self.rr = runner
        self.ix: index.Object = runner.ix
        self.om = _ObjectMap()

    def load_or_collect(self):
        """Take the object map from the cache; otherwise collect it and store it in the cache."""
        if self.load_cache():
            return
        self.collect()
        self.store_cache()

    def _cache_path(self):
        # location of this step's cache file
        return f'{self.rr.cacheDir}/{self.CACHE_KEY}'

    def load_cache(self):
        """Replace ``om`` with the cached object map.

        Returns:
            True if a cached map was loaded. False if caching is off, the step
            has no cache key, the cache file does not exist or it unserializes
            to a falsy value.
        """
        if not (self.rr.withCache and self.CACHE_KEY):
            return False

        cache_path = self._cache_path()
        if not gws.u.is_file(cache_path):
            return False

        cached = gws.u.unserialize_from_path(cache_path)
        if not cached:
            return False

        gws.log.info(f'ALKIS: use cache {self.CACHE_KEY!r}')
        self.om = cached
        return True

    def store_cache(self):
        """Serialize ``om`` to the cache file, if caching is on and the step has a cache key."""
        if not (self.rr.withCache and self.CACHE_KEY):
            return

        gws.u.serialize_to_path(self.om, self._cache_path())
        gws.log.info(f'ALKIS: store cache {self.CACHE_KEY!r}')

    def collect(self):
        """Fill ``om``; implemented by the concrete steps."""

    def write_table(self, table_id, values):
        """Create the index table ``table_id`` from ``values`` unless the index already has it."""
        if self.ix.has_table(table_id):
            return

        with ProgressIndicator(f'ALKIS: write {table_id!r}', len(values)) as progress:
            self.ix.create_table(table_id, values, progress)

    def write(self):
        """Write the collected objects; implemented by the concrete steps."""

145 

146 

class _PlaceIndexer(_Indexer):
    """Index places (administrative and territorial units).

    References: https://de.wikipedia.org/wiki/Amtlicher_Gemeindeschl%C3%BCssel
    """

    CACHE_KEY = 'obj_place'

    # placeholders returned by the ``get_*`` lookups for places that are not indexed
    empty1 = dt.EnumPair(code='0', text='')
    empty2 = dt.EnumPair(code='00', text='')

    # per place kind: builds the place code from the attributes of a key object;
    # a missing Regierungsbezirk is encoded as '0'
    CODES = {
        'land': lambda k: k.land,
        'regierungsbezirk': lambda k: k.land + (k.regierungsbezirk or '0'),
        'kreis': lambda k: k.land + (k.regierungsbezirk or '0') + k.kreis,
        'gemeinde': lambda k: k.land + (k.regierungsbezirk or '0') + k.kreis + k.gemeinde,
        'gemarkung': lambda k: k.land + k.gemarkungsnummer,
        'buchungsblattbezirk': lambda k: k.land + k.bezirk,
        'dienststelle': lambda k: k.land + k.stelle,
    }

    def code(self, kind, o):
        """Return the code of the key object ``o`` for the place ``kind``."""
        build = self.CODES[kind]
        return build(o)

    def get(self, kind, o):
        """Return the indexed code/name pair for ``o`` as a place of ``kind``, or None."""
        uid = kind + self.code(kind, o)
        return self.om.placeIdx.get(uid)

    def is_empty(self, p: dt.EnumPair):
        """Tell whether ``p`` carries one of the placeholder codes."""
        return p.code in ('0', '00')

    def add(self, kind, ax, key_obj, **kwargs):
        """Register the source object ``ax`` as a place of ``kind``.

        Args:
            kind: Place kind, a key of ``CODES``.
            ax: Source object; skipped if its ``endet`` is set.
            key_obj: Key object the place code is built from.
            **kwargs: Initial attributes of the place (its parent places).

        Returns:
            The code/name pair of the new place, or None if ``ax`` was skipped.
        """
        if ax.endet:
            return None

        code = self.code(kind, key_obj)
        pair = dt.EnumPair(code, ax.bezeichnung)

        place = dt.Place(**kwargs)
        place.uid = kind + code
        place.kind = kind
        # the place stores its own pair under the attribute named like its kind
        setattr(place, kind, pair)

        self.om.placeAll[place.uid] = place
        self.om.placeIdx[place.uid] = pair

        return pair

    def collect(self):
        """Read all place tables, from the largest unit down, and register their entries."""
        self.om.placeAll = {}
        self.om.placeIdx = {}

        for entry in self.rr.read_flat(gid.AX_Bundesland):
            self.add('land', entry, entry.schluessel)

        for entry in self.rr.read_flat(gid.AX_Regierungsbezirk):
            key = entry.schluessel
            self.add('regierungsbezirk', entry, key, land=self.get_land(key))

        for entry in self.rr.read_flat(gid.AX_KreisRegion):
            key = entry.schluessel
            self.add(
                'kreis', entry, key,
                land=self.get_land(key),
                regierungsbezirk=self.get_regierungsbezirk(key),
            )

        for entry in self.rr.read_flat(gid.AX_Gemeinde):
            key = entry.gemeindekennzeichen
            self.add(
                'gemeinde', entry, key,
                land=self.get_land(key),
                regierungsbezirk=self.get_regierungsbezirk(key),
                kreis=self.get_kreis(key),
            )

        # @TODO map Gemarkung to Gemeinde (see https://de.wikipedia.org/wiki/Liste_der_Gemarkungen_in_Nordrhein-Westfalen etc)

        for entry in self.rr.read_flat(gid.AX_Gemarkung):
            key = entry.schluessel
            # Gemarkungen excluded in the index configuration are not registered
            if str(key.gemarkungsnummer) in self.ix.excludeGemarkung:
                continue
            self.add('gemarkung', entry, key, land=self.get_land(key))

        # the remaining kinds are keyed by 'schluessel' and only refer to their Land
        for kind, cls in (
            ('buchungsblattbezirk', gid.AX_Buchungsblattbezirk),
            ('dienststelle', gid.AX_Dienststelle),
        ):
            for entry in self.rr.read_flat(cls):
                key = entry.schluessel
                self.add(kind, entry, key, land=self.get_land(key))

    def write(self):
        """Write one row per registered place to the place table."""
        rows = [
            dict(uid=place.uid, data=index.serialize(place))
            for place in self.om.placeAll.values()
        ]
        self.write_table(index.TABLE_PLACE, rows)

    # Lookups by key object: each returns the indexed pair or a placeholder.

    def get_land(self, o):
        pair = self.get('land', o)
        return pair if pair else self.empty2

    def get_regierungsbezirk(self, o):
        pair = self.get('regierungsbezirk', o)
        return pair if pair else self.empty1

    def get_kreis(self, o):
        pair = self.get('kreis', o)
        return pair if pair else self.empty2

    def get_gemeinde(self, o):
        pair = self.get('gemeinde', o)
        return pair if pair else self.empty1

    def get_gemarkung(self, o):
        pair = self.get('gemarkung', o)
        return pair if pair else self.empty1

    def get_buchungsblattbezirk(self, o):
        pair = self.get('buchungsblattbezirk', o)
        return pair if pair else self.empty1

    def get_dienststelle(self, o):
        pair = self.get('dienststelle', o)
        return pair if pair else self.empty1

261 

262 

class _LageIndexer(_Indexer):
    """Collect Lage (location/address) objects together with the Gebaeude that point to them."""

    CACHE_KEY = 'obj_lage'

    def collect(self):
        """Run the collection steps in their required order."""
        self._collect_catalog()
        self._collect_lage()
        self._assign_coordinates()
        self._collect_gebaeude()
        self._link_gebaeude()

    def _collect_catalog(self):
        # street catalog: lage key -> name
        catalog = self.om.catalog
        for ax in self.rr.read_flat(gid.AX_LagebezeichnungKatalogeintrag):
            catalog[self.lage_key(ax.schluessel)] = ax.bezeichnung

    def _collect_lage(self):
        # Lage objects with and without house number, in this order
        for cls in (gid.AX_LagebezeichnungMitHausnummer, gid.AX_LagebezeichnungOhneHausnummer):
            for uid, axs in self.rr.read_grouped(cls):
                recs = [
                    _from_ax(
                        dt.LageRecord,
                        ax,
                        strasse=self.strasse(ax),
                        hausnummer=index.normalize_hausnummer(ax.hausnummer),
                    )
                    for ax in axs
                ]
                self.om.Lage.add(uid, recs)

    def _assign_coordinates(self):
        # use the PTO (art=HNR) geometry for lage coordinates
        # PTO.dientZurDarstellungVon -> lage.uid
        for pto in self.rr.read_flat(gid.AP_PTO):
            if pto.lebenszeitintervall.endet is not None:
                continue

            art = _pop(pto, 'art') or ''
            if art.upper() != 'HNR':
                continue

            lage_uids = _pop(pto, 'dientZurDarstellungVon')
            if not (lage_uids and isinstance(lage_uids, list)):
                continue

            geom = _geom_of(pto)
            if not geom:
                continue

            x = geom.centroid.x
            y = geom.centroid.y
            for la in self.om.Lage.get_many(lage_uids):
                la.x, la.y = x, y

    def _collect_gebaeude(self):
        # read related Gebaeude records
        atts = _meta_attributes(gid.METADATA['AX_Gebaeude'])

        for uid, axs in self.rr.read_grouped(gid.AX_Gebaeude):
            recs = [
                _from_ax(
                    dt.GebaeudeRecord,
                    ax,
                    name=', '.join(ax.name) if ax.name else None,
                    amtlicheFlaeche=ax.grundflaeche or 0,
                    props=self.rr.props_from(ax, atts),
                    _zeigtAuf=ax.zeigtAuf,
                )
                for ax in axs
            ]
            self.om.Gebaeude.add(uid, recs)

        # keep only the area of each record's geometry ...
        for ge in self.om.Gebaeude:
            for rec in ge.recs:
                geom = _geom_of(rec)
                rec.geomFlaeche = round(geom.area, 2) if geom else 0

        # ... and omit Gebaeude geometries for now
        for ge in self.om.Gebaeude:
            for rec in ge.recs:
                _pop(rec, 'geom')

    def _link_gebaeude(self):
        for la in self.om.Lage:
            la.gebaeudeList = []

        # AX_Gebaeude.zeigtAuf -> AX_LagebezeichnungMitHausnummer
        for ge in self.om.Gebaeude:
            for la in self.om.Lage.get_from_ptr(ge, '_zeigtAuf'):
                la.gebaeudeList.append(ge)

    def strasse(self, ax):
        """Return the street name of ``ax``.

        A string ``lagebezeichnung`` is the name itself; otherwise it is taken
        as a catalog key, and an unknown key yields an empty string.
        """
        ref = ax.lagebezeichnung
        if isinstance(ref, str):
            return ref
        return self.om.catalog.get(self.lage_key(ref), '')

    def lage_key(self, r):
        """Build the catalog key of ``r`` from its place codes and its ``lage`` code."""
        return _comma([r.land, r.regierungsbezirk, r.kreis, r.gemeinde, r.lage])

    def write(self):
        """Write one row per Lage object to the Lage table."""
        rows = [
            dict(uid=la.uid, rc=len(la.recs), data=index.serialize(la))
            for la in self.om.Lage
        ]
        self.write_table(index.TABLE_LAGE, rows)

366 

367 

class _BuchungIndexer(_Indexer):
    """Collect Buchungsblatt objects with their Buchungsstellen, Namensnummern, persons and addresses."""

    CACHE_KEY = 'obj_buchungsblatt'

    # Buchungsblattkennzeichen -> Buchungsblatt, looked up by other indexers.
    # Created per instance: a class-level dict would be shared between all
    # indexers (and runs) in a process and keep stale entries.
    buchungsblattkennzeichenMap: dict[str, dt.Buchungsblatt]

    def __init__(self, runner: '_Runner'):
        super().__init__(runner)
        self.buchungsblattkennzeichenMap = {}

    def load_cache(self):
        """Load the cached object map and rebuild the Kennzeichen map from it.

        The Kennzeichen map is not part of the object map, so without the
        rebuild it would stay empty whenever ``collect`` is skipped.
        """
        if not super().load_cache():
            return False
        self._rebuild_kennzeichen_map()
        return True

    def _rebuild_kennzeichen_map(self):
        # a later Buchungsblatt with the same Kennzeichen replaces an earlier one
        self.buchungsblattkennzeichenMap = {
            bb.buchungsblattkennzeichen: bb
            for bb in self.om.Buchungsblatt
        }

    def collect(self):
        """Read the Buchung related tables and resolve the references between the objects."""

        for uid, axs in self.rr.read_grouped(gid.AX_Anschrift):
            self.om.Anschrift.add(uid, [
                _from_ax(
                    dt.AnschriftRecord,
                    ax,
                    ort=ax.ort_AmtlichesOrtsnamensverzeichnis or ax.ort_Post,
                    plz=ax.postleitzahlPostzustellung,
                    telefon=ax.telefon[0] if ax.telefon else None,
                )
                for ax in axs
            ])

        for uid, axs in self.rr.read_grouped(gid.AX_Person):
            self.om.Person.add(uid, [
                _from_ax(
                    dt.PersonRecord,
                    ax,
                    anrede=ax.anrede.text if ax.anrede else None,
                    _hat=ax.hat,
                )
                for ax in axs
            ])

        # AX_Person.hat -> [AX_Anschrift]
        for pe in self.om.Person:
            pe.anschriftList = self.om.Anschrift.get_from_ptr(pe, '_hat')

        for uid, axs in self.rr.read_grouped(gid.AX_Namensnummer):
            self.om.Namensnummer.add(uid, [
                _from_ax(
                    dt.NamensnummerRecord,
                    ax,
                    anteil=_anteil(ax),
                    _benennt=ax.benennt,
                    _istBestandteilVon=ax.istBestandteilVon,
                )
                for ax in axs
            ])

        # AX_Namensnummer.benennt -> AX_Person
        for nn in self.om.Namensnummer:
            nn.laufendeNummer = nn.recs[-1].laufendeNummerNachDIN1421
            nn.personList = self.om.Person.get_from_ptr(nn, '_benennt')

        for uid, axs in self.rr.read_grouped(gid.AX_Buchungsstelle):
            self.om.Buchungsstelle.add(uid, [
                _from_ax(
                    dt.BuchungsstelleRecord,
                    ax,
                    anteil=_anteil(ax),
                    _an=ax.an,
                    _zu=ax.zu,
                    _istBestandteilVon=ax.istBestandteilVon,
                )
                for ax in axs
            ])

        for bs in self.om.Buchungsstelle:
            bs.laufendeNummer = bs.recs[-1].laufendeNummer
            bs.fsUids = []
            bs.flurstueckskennzeichenList = []

        for uid, axs in self.rr.read_grouped(gid.AX_Buchungsblatt):
            self.om.Buchungsblatt.add(uid, [
                _from_ax(
                    dt.BuchungsblattRecord,
                    ax,
                    buchungsblattbezirk=self.rr.place.get_buchungsblattbezirk(ax.buchungsblattbezirk),
                )
                for ax in axs
            ])

        for bb in self.om.Buchungsblatt:
            bb.buchungsstelleList = []
            bb.namensnummerList = []
            # the most recent record provides the Kennzeichen
            bb.buchungsblattkennzeichen = bb.recs[-1].buchungsblattkennzeichen

        self._rebuild_kennzeichen_map()

        # AX_Buchungsstelle.istBestandteilVon -> AX_Buchungsblatt
        for bs in self.om.Buchungsstelle:
            bb_list = self.om.Buchungsblatt.get_from_ptr(bs, '_istBestandteilVon')
            bs.buchungsblattUids = [bb.uid for bb in bb_list]
            bs.buchungsblattkennzeichenList = [bb.buchungsblattkennzeichen for bb in bb_list]
            for bb in bb_list:
                bb.buchungsstelleList.append(bs)

        # AX_Namensnummer.istBestandteilVon -> AX_Buchungsblatt
        for nn in self.om.Namensnummer:
            bb_list = self.om.Buchungsblatt.get_from_ptr(nn, '_istBestandteilVon')
            nn.buchungsblattUids = [bb.uid for bb in bb_list]
            nn.buchungsblattkennzeichenList = [bb.buchungsblattkennzeichen for bb in bb_list]
            for bb in bb_list:
                bb.namensnummerList.append(nn)

        for bb in self.om.Buchungsblatt:
            bb.buchungsstelleList.sort(key=_sortkey_buchungsstelle)
            bb.namensnummerList.sort(key=_sortkey_namensnummer)

        # AX_Buchungsstelle.an -> [AX_Buchungsstelle]
        # AX_Buchungsstelle.zu -> [AX_Buchungsstelle]
        # see Erläuterungen zu ALKIS Version 6, page 116-119

        for bs in self.om.Buchungsstelle:
            bs.childUids = []
            bs.parentUids = []
            bs.parentkennzeichenList = []

        for bs in self.om.Buchungsstelle:
            # Parent uids are kept in reading order (``get_many`` drops the
            # duplicates), so that the resulting uid lists do not depend on
            # set iteration order.
            parent_uids = []
            parent_knz = set()

            for r in bs.recs:
                for attr in ('_an', '_zu'):
                    ref = _pop(r, attr)
                    if isinstance(ref, str):
                        # a single uid, not a sequence of characters
                        parent_uids.append(ref)
                    elif ref:
                        parent_uids.extend(ref)

            for parent_bs in self.om.Buchungsstelle.get_many(parent_uids):
                parent_bs.childUids.append(bs.uid)
                bs.parentUids.append(parent_bs.uid)
                for bb_knz in parent_bs.buchungsblattkennzeichenList:
                    parent_knz.add(bb_knz + '.' + parent_bs.laufendeNummer)

            bs.parentkennzeichenList = sorted(parent_knz)

    def write(self):
        """Write one row per Buchungsblatt to the Buchungsblatt table."""
        values = [
            dict(
                uid=bb.uid,
                rc=len(bb.recs),
                data=index.serialize(bb),
            )
            for bb in self.om.Buchungsblatt
        ]
        self.write_table(index.TABLE_BUCHUNGSBLATT, values)

508 

509 

class _PartIndexer(_Indexer):
    """Intersect "part" objects (geometry-bearing classes selected via ``dt.Part.KIND``) with Flurstuecke.

    ``om.Part`` holds the source objects as read. ``parts`` holds the results:
    one ``dt.Part`` per (source object, intersected Flurstueck) pair.
    """

    CACHE_KEY = 'obj_part'

    # intersections with an area below this value are ignored
    MIN_PART_AREA = 1

    # All lists are created per instance: class-level lists would be shared
    # between all indexers in a process and grow with every run.
    parts: list[dt.Part]
    fs_list: list
    fs_geom: list

    stree: shapely.strtree.STRtree

    def __init__(self, runner: '_Runner'):
        super().__init__(runner)
        self.parts = []
        self.fs_list = []
        self.fs_geom = []

    def load_cache(self):
        """Load the cached object map together with the computed parts.

        ``collect`` stores the computed parts on the object map. A cached map
        without them cannot be written, so it is discarded and False is
        returned, which makes the caller collect again.
        """
        if not super().load_cache():
            return False

        parts = getattr(self.om, 'parts', None)
        if parts is None:
            self.om = _ObjectMap()
            return False

        self.parts = parts
        return True

    def collect(self):
        """Read all part classes and compute their intersections with the Flurstuecke."""
        self.parts = []
        self.fs_list = []
        self.fs_geom = []

        for kind in dt.Part.KIND:
            self.collect_kind(kind)

        # parallel lists: the spatial index yields positions into both
        for fs in self.rr.fsdata.om.Flurstueck:
            self.fs_list.append(fs)
            # NB take only the most recent fs geometry into account
            self.fs_geom.append(_geom_of(fs.recs[-1]))

        self.stree = shapely.strtree.STRtree(self.fs_geom)

        with ProgressIndicator('ALKIS: parts', len(self.om.Part)) as progress:
            for pa in self.om.Part:
                self.compute_intersections(pa)
                progress.update(1)

        self.parts.sort(key=_sortkey_part)

        for pa in self.parts:
            pa.isHistoric = all(r.isHistoric for r in pa.recs)

        # keep the results on the object map, so that they are cached with it
        # NOTE(review): assumes the cache serializer keeps this extra attribute;
        # if it does not, 'load_cache' falls back to collecting again
        self.om.parts = self.parts

    def collect_kind(self, kind):
        """Read all geometry-bearing object classes whose metadata key matches the key of ``kind``."""
        _, key = dt.Part.KIND[kind]
        key_re = re.compile(key + r'/\w+/')

        classes = [
            getattr(gid, meta['name'])
            for meta in gid.METADATA.values()
            if (
                meta['kind'] == 'object'
                and meta['geom']
                and key_re.search(meta['key'])
            )
        ]

        for cls in classes:
            self.collect_class(kind, cls)

    def collect_class(self, kind, cls):
        """Read the objects of ``cls`` into ``om.Part`` and tag them with ``kind`` and the class uid/title."""
        meta = gid.METADATA[cls.__name__]
        atts = _meta_attributes(meta)

        for uid, axs in self.rr.read_grouped(cls):
            pa = self.om.Part.add(uid, [
                _from_ax(
                    dt.PartRecord,
                    ax,
                    props=self.rr.props_from(ax, atts),
                )
                for ax in axs
            ])
            pa.kind = kind
            pa.name = dt.EnumPair(meta['uid'], meta['title'])

    def compute_intersections(self, pa: dt.Part):
        """Intersect each record of ``pa`` with the Flurstuecke and append the resulting parts to ``parts``.

        One result part is created per intersected Flurstueck; it gets one
        record per intersecting source record. Geometry and areas of the part
        are those of the last intersecting record.
        """
        parts_map = {}

        for r in pa.recs:
            geom = _geom_of(r)
            if not geom:
                continue

            for i in self.stree.query(geom):
                part_geom = shapely.intersection(self.fs_geom[i], geom)
                part_area = round(part_geom.area, 2)
                if part_area < self.MIN_PART_AREA:
                    continue

                fs = self.fs_list[i]

                # create the result part on first use only
                part = parts_map.get(fs.uid)
                if part is None:
                    part = parts_map[fs.uid] = dt.Part(
                        uid=pa.uid,
                        recs=[],
                        kind=pa.kind,
                        name=pa.name,
                        fs=fs.uid,
                    )

                # computed area corrected with respect to FS's "amtlicheFlaeche"
                fs_rec = fs.recs[-1]
                part_area_corrected = round(
                    fs_rec.amtlicheFlaeche * (part_area / fs_rec.geomFlaeche),
                    2)

                part.recs.append(dt.PartRecord(
                    uid=r.uid,
                    beginnt=r.beginnt,
                    endet=r.endet,
                    anlass=r.anlass,
                    props=r.props,
                    geomFlaeche=part_area,
                    amtlicheFlaeche=part_area_corrected,
                    isHistoric=r.endet is not None,
                ))

                part.geom = shapely.wkb.dumps(part_geom, srid=self.ix.crs.srid, hex=True)
                part.geomFlaeche = part_area
                part.amtlicheFlaeche = part_area_corrected

        self.parts.extend(parts_map.values())

    def write(self):
        """Write one numbered row per computed part to the part table."""
        values = []

        for n, pa in enumerate(self.parts, 1):
            # the geometry goes into its own column and is left out of the serialized data
            geom = _pop(pa, 'geom')
            data = index.serialize(pa)
            pa.geom = geom
            values.append(dict(
                n=n,
                fs=pa.fs,
                uid=pa.uid,
                beginnt=pa.recs[-1].beginnt,
                endet=pa.recs[-1].endet,
                kind=pa.kind,
                name=pa.name.text,
                parthistoric=pa.isHistoric,
                data=data,
                geom=geom,
            ))

        self.write_table(index.TABLE_PART, values)

641 

642 

class _FsDataIndexer(_Indexer):
    """Collect Flurstueck objects, attach their Lage, Gebaeude and Buchung data and write the Flurstueck table."""

    CACHE_KEY = 'obj_flurstueck'

    def __init__(self, runner: '_Runner'):
        super().__init__(runner)
        # outcome counters, one increment per source record passed to 'record'
        self.counts = dict(ok=0, no_geometry=0, excluded=0)

    def collect(self):
        """Read current and historic Flurstuecke and resolve their relations."""
        flurstuecke = self.om.Flurstueck

        for uid, axs in self.rr.read_grouped(gid.AX_Flurstueck):
            recs = gws.u.compact(self.record(ax) for ax in axs)
            if recs:
                flurstuecke.add(uid, recs)

        for uid, axs in self.rr.read_grouped(gid.AX_HistorischesFlurstueck):
            recs = gws.u.compact(self.record(ax) for ax in axs)
            if recs:
                # For a historic FS, 'beginnt' is basically when the history begins
                # (see comments for AX_HistorischesFlurstueck in gid6).
                # we set F.endet=F.beginnt to designate this one as 'historic'
                for rec in recs:
                    rec.endet = rec.beginnt
                    rec.isHistoric = True
                flurstuecke.add(uid, recs)

        # object level attributes come from the most recent record
        for fs in flurstuecke:
            last = fs.recs[-1]
            fs.flurstueckskennzeichen = last.flurstueckskennzeichen
            fs.fsnummer = index.make_fsnummer(last)
            fs.x = last.x
            fs.y = last.y
            self.process_lage(fs)
            self.process_gebaeude(fs)
            self.process_buchung(fs)

        # check the 'nachfolgerFlurstueckskennzeichen' array
        # and mark each referenced FS as a "vorgaenger" FS
        # It is a M:N relation, therefore 'vorgaengerFlurstueckskennzeichen' is also an array

        by_knz = {fs.flurstueckskennzeichen: fs for fs in flurstuecke}

        for fs in flurstuecke:
            for knz in fs.recs[-1].nachfolgerFlurstueckskennzeichen or []:
                successor = by_knz.get(knz)
                if not successor:
                    gws.log.warning(f'ALKIS: nachfolgerFlurstueck missing {fs.flurstueckskennzeichen!r}->{knz!r}')
                    continue
                target = successor.recs[-1]
                if not target.vorgaengerFlurstueckskennzeichen:
                    target.vorgaengerFlurstueckskennzeichen = []
                target.vorgaengerFlurstueckskennzeichen.append(fs.flurstueckskennzeichen)

    def record(self, ax):
        """Convert the source object ``ax`` to a Flurstueck record.

        Returns:
            The record, or None if its Gemarkung or Gemeinde is not indexed or
            if it has no geometry. ``counts`` is updated in each case.
        """
        places = self.rr.place

        r: dt.FlurstueckRecord = _from_ax(
            dt.FlurstueckRecord,
            ax,
            amtlicheFlaeche=ax.amtlicheFlaeche or 0,
            flurnummer=_str(ax.flurnummer),
            zaehler=_str(ax.flurstuecksnummer.zaehler),
            nenner=_str(ax.flurstuecksnummer.nenner),
            zustaendigeStelle=[places.get_dienststelle(p) for p in (ax.zustaendigeStelle or [])],
            # raw references, resolved and removed by the process_* methods
            _weistAuf=ax.weistAuf,
            _zeigtAuf=ax.zeigtAuf,
            _istGebucht=ax.istGebucht,
            _buchung=ax.buchung,
        )

        # basic data

        zugehoerigkeit = ax.gemeindezugehoerigkeit
        r.gemarkung = places.get_gemarkung(ax.gemarkung)
        r.gemeinde = places.get_gemeinde(zugehoerigkeit)
        r.regierungsbezirk = places.get_regierungsbezirk(zugehoerigkeit)
        r.kreis = places.get_kreis(zugehoerigkeit)
        r.land = places.get_land(zugehoerigkeit)

        if places.is_empty(r.gemarkung) or places.is_empty(r.gemeinde):
            # exclude Flurstücke that refer to Gemeinde/Gemarkung
            # which do not exist in the reference AX tables
            self.counts['excluded'] += 1
            return None

        # geometry

        geom = _geom_of(r)
        if not geom:
            self.counts['no_geometry'] += 1
            return None

        r.geomFlaeche = round(geom.area, 2)
        centroid = geom.centroid
        r.x = round(centroid.x, 2)
        r.y = round(centroid.y, 2)

        self.counts['ok'] += 1
        return r

    def process_lage(self, fs: dt.Flurstueck):
        """Set ``fs.lageList`` from the Lage references of its records."""
        lage = self.rr.lage.om.Lage

        # AX_Flurstueck.weistAuf -> AX_LagebezeichnungMitHausnummer
        # AX_Flurstueck.zeigtAuf -> AX_LagebezeichnungOhneHausnummer
        with_number = lage.get_from_ptr(fs, '_weistAuf')
        without_number = lage.get_from_ptr(fs, '_zeigtAuf')
        fs.lageList = [*with_number, *without_number]

    def process_gebaeude(self, fs: dt.Flurstueck):
        """Set the sorted Gebaeude list of ``fs`` and the area sums of its Gebaeude that have not ended."""
        # each Gebaeude once, even if reachable through several Lage objects
        by_uid = {ge.uid: ge for la in fs.lageList for ge in la.gebaeudeList}
        fs.gebaeudeList = sorted(by_uid.values(), key=_sortkey_gebaeude)

        # last records of the Gebaeude that have not ended
        current = [ge.recs[-1] for ge in fs.gebaeudeList if not ge.recs[-1].endet]
        fs.gebaeudeAmtlicheFlaeche = sum(rec.amtlicheFlaeche for rec in current)
        fs.gebaeudeGeomFlaeche = sum(rec.geomFlaeche for rec in current)

    def process_buchung(self, fs: dt.Flurstueck):
        """Set ``fs.buchungList``: one Buchung per Buchungsblatt, holding references to the related Buchungsstellen.

        Returns:
            The Flurstueck ``fs``.
        """
        historic_by_uid = {}
        seen_uids = set()
        buchung_by_bb = {}

        # for each Flurstück record, we collect all related Buchungsstellen (with respect to parent-child relations)
        # then group Buchungsstellen by their Buchungsblatt
        # and create Buchung objects for a FS

        for r in fs.recs:
            hist_buchung = _pop(r, '_buchung')
            bs_list = (
                self.historic_buchungsstelle_list(r, hist_buchung)
                if hist_buchung
                else self.buchungsstelle_list(r)
            )

            for bs in bs_list:
                # a Buchungsstelle referred to by an expired Flurstück might not be expired itself,
                # so we have to track its state separately by wrapping it in a BuchungsstelleReference
                # a BuchungsstelleReference is historic if its Flurstück is
                historic_by_uid[bs.uid] = r.isHistoric

                if bs.uid in seen_uids:
                    continue
                seen_uids.add(bs.uid)

                # populate Flurstück references in a Buchungsstelle
                if fs.uid not in bs.fsUids:
                    bs.fsUids.append(fs.uid)
                    bs.flurstueckskennzeichenList.append(fs.flurstueckskennzeichen)

                # create Buchung records by grouping Buchungsstellen
                for bb_uid in bs.buchungsblattUids:
                    buchung = buchung_by_bb.setdefault(bb_uid, dt.Buchung(recs=[], buchungsblattUid=bb_uid))
                    buchung.recs.append(dt.BuchungsstelleReference(buchungsstelle=bs))

        fs.buchungList = list(buchung_by_bb.values())

        for buchung in fs.buchungList:
            for ref in buchung.recs:
                ref.isHistoric = historic_by_uid[ref.buchungsstelle.uid]
            buchung.isHistoric = all(ref.isHistoric for ref in buchung.recs)

        return fs

    def historic_buchungsstelle_list(self, r: dt.FlurstueckRecord, hist_buchung):
        """Create stand-in Buchungsstellen for the 'buchung' entries of a historic Flurstueck record.

        Entries whose Buchungsblattkennzeichen is not known to the Buchung indexer are skipped.
        """
        # an AX_HistorischesFlurstueck with a special 'buchung' reference

        kennzeichen_map = self.rr.buchung.buchungsblattkennzeichenMap
        bs_list = []

        for bu in hist_buchung:
            bb = kennzeichen_map.get(bu.buchungsblattkennzeichen)
            if not bb:
                continue

            nummer = bu.laufendeNummerDerBuchungsstelle

            # create a fake historic Buchungsstelle
            bs_list.append(dt.Buchungsstelle(
                uid=bb.uid + '_' + nummer,
                recs=[
                    dt.BuchungsstelleRecord(
                        endet=r.endet,
                        laufendeNummer=nummer,
                        isHistoric=True,
                    )
                ],
                buchungsblattUids=[bb.uid],
                buchungsblattkennzeichenList=[bb.buchungsblattkennzeichen],
                parentUids=[],
                childUids=[],
                fsUids=[],
                parentkennzeichenList=[],
                flurstueckskennzeichenList=[],
                laufendeNummer=nummer,
                isHistoric=True,
            ))

        return bs_list

    def buchungsstelle_list(self, r: dt.FlurstueckRecord):
        """Return the Buchungsstelle the record is booked on, preceded by its ancestors and followed by its descendants.

        Returns an empty list if the record refers to no known Buchungsstelle.
        """
        # AX_Flurstueck.istGebucht -> AX_Buchungsstelle

        stellen = self.rr.buchung.om.Buchungsstelle

        this_bs = stellen.get(_pop(r, '_istGebucht'))
        if not this_bs:
            return []

        # A Flurstück points to a Buchungsstelle (F.istGebucht -> B).
        # A Buchungsstelle can have parent (B.an -> parent.uid) and child (child.an -> B.uid) Buchungsstellen
        # (these references are populated in _BuchungIndexer above).
        # Our task here, given F.istGebucht -> B, collect B's parents and children
        # These are Buchungsstellen that directly or indirectly mention the current Flurstück.

        # breadth-first walk up: the list doubles as the queue
        upward: list[dt.Buchungsstelle] = [this_bs]
        pos = 0
        while pos < len(upward):
            bs = upward[pos]
            pos += 1
            for uid in bs.parentUids:
                upward.append(stellen.get(uid))

        # breadth-first walk down, starting with this_bs itself
        downward: list[dt.Buchungsstelle] = [this_bs]
        pos = 0
        while pos < len(downward):
            bs = downward[pos]
            pos += 1
            # sort related (child) Buchungsstellen by their BB-Kennzeichen
            children = stellen.get_many(bs.childUids)
            children.sort(key=_sortkey_buchungsstelle_by_bblatt)
            downward.extend(children)

        # ancestors in reverse visiting order and without this_bs (the first one visited)
        return upward[:0:-1] + downward

    def write(self):
        """Write one row per Flurstueck to the Flurstueck table."""
        values = []

        for fs in self.om.Flurstueck:
            # serialize without geometries, then put them back
            geoms = [_pop(rec, 'geom') for rec in fs.recs]
            data = index.serialize(fs)
            for rec, geom in zip(fs.recs, geoms):
                rec.geom = geom

            values.append(dict(
                uid=fs.uid,
                rc=len(fs.recs),
                fshistoric=fs.isHistoric,
                data=data,
                # the table row carries the most recent geometry
                geom=geoms[-1],
            ))

        self.write_table(index.TABLE_FLURSTUECK, values)

901 

902 

class _FsIndexIndexer(_Indexer):
    """Build the flat search index tables from the collected Flurstueck objects."""

    # table id -> rows for that table.
    # Created per instance: a class-level dict would be shared between all
    # indexers in a process, so rows of an earlier run would be written again.
    entries: dict[str, list[dict]]

    def __init__(self, runner: '_Runner'):
        super().__init__(runner)
        self.entries = {
            index.TABLE_INDEXFLURSTUECK: [],
            index.TABLE_INDEXLAGE: [],
            index.TABLE_INDEXBUCHUNGSBLATT: [],
            index.TABLE_INDEXPERSON: [],
            index.TABLE_INDEXGEOM: [],
        }

    def collect(self):
        """Create index rows for every record of every Flurstueck of the fsdata indexer."""
        flurstuecke = self.rr.fsdata.om.Flurstueck

        with ProgressIndicator('ALKIS: creating indexes', len(flurstuecke)) as progress:
            for fs in flurstuecke:
                for r in fs.recs:
                    self.add(fs, r)
                progress.update(1)

    def add(self, fs: dt.Flurstueck, r: dt.FlurstueckRecord):
        """Append the index rows for the record ``r`` of the Flurstueck ``fs`` to ``entries``."""

        # columns shared by all index tables
        base = dict(
            fs=r.uid,
            fshistoric=r.isHistoric,
        )

        # place columns: name, search key of the name, code
        places = dict(
            land=r.land.text,
            land_t=index.text_key(r.land.text),
            landcode=r.land.code,

            regierungsbezirk=r.regierungsbezirk.text,
            regierungsbezirk_t=index.text_key(r.regierungsbezirk.text),
            regierungsbezirkcode=r.regierungsbezirk.code,

            kreis=r.kreis.text,
            kreis_t=index.text_key(r.kreis.text),
            kreiscode=r.kreis.code,

            gemeinde=r.gemeinde.text,
            gemeinde_t=index.text_key(r.gemeinde.text),
            gemeindecode=r.gemeinde.code,

            gemarkung=r.gemarkung.text,
            gemarkung_t=index.text_key(r.gemarkung.text),
            gemarkungcode=r.gemarkung.code,
        )

        self.entries[index.TABLE_INDEXFLURSTUECK].append(dict(
            **base,
            **places,

            amtlicheflaeche=r.amtlicheFlaeche,
            geomflaeche=r.geomFlaeche,

            flurnummer=r.flurnummer,
            zaehler=r.zaehler,
            nenner=r.nenner,
            flurstuecksfolge=r.flurstuecksfolge,
            flurstueckskennzeichen=r.flurstueckskennzeichen,

            x=r.x,
            y=r.y,
        ))

        self.entries[index.TABLE_INDEXGEOM].append(dict(
            **base,
            geomflaeche=r.geomFlaeche,
            x=r.x,
            y=r.y,
            geom=r.geom,
        ))

        for la in fs.lageList:
            for la_r in la.recs:
                self.entries[index.TABLE_INDEXLAGE].append(dict(
                    **base,
                    **places,
                    lageuid=la_r.uid,
                    lagehistoric=la_r.isHistoric,
                    strasse=la_r.strasse,
                    strasse_t=index.strasse_key(la_r.strasse),
                    hausnummer=la_r.hausnummer,
                    # fall back to the Flurstueck coordinates if the Lage has none
                    x=la.x or r.x,
                    y=la.y or r.y,
                ))

        for bu in fs.buchungList:
            bb = self.rr.buchung.om.Buchungsblatt.get(bu.buchungsblattUid)

            for bb_r in bb.recs:
                self.entries[index.TABLE_INDEXBUCHUNGSBLATT].append(dict(
                    **base,
                    buchungsblattuid=bb_r.uid,
                    buchungsblattkennzeichen=bb_r.buchungsblattkennzeichen,
                    buchungsblatthistoric=bu.isHistoric,
                ))

            # each person once per Buchungsblatt
            pe_uids = set()

            for nn in bb.namensnummerList:
                for pe in nn.personList:
                    if pe.uid in pe_uids:
                        continue
                    pe_uids.add(pe.uid)
                    for pe_r in pe.recs:
                        self.entries[index.TABLE_INDEXPERSON].append(dict(
                            **base,
                            personuid=pe_r.uid,
                            personhistoric=pe_r.isHistoric,
                            name=pe_r.nachnameOderFirma,
                            name_t=index.text_key(pe_r.nachnameOderFirma),
                            vorname=pe_r.vorname,
                            vorname_t=index.text_key(pe_r.vorname),
                        ))

    def write(self):
        """Number the rows of each index table and write the tables that do not exist yet."""
        for table_id, values in self.entries.items():
            if not self.ix.has_table(table_id):
                for n, v in enumerate(values, 1):
                    v['n'] = n
                self.write_table(table_id, values)

1022 

1023 

1024class _Runner: 

def __init__(self, ix: index.Object, reader: dt.Reader, with_cache=False):
    """Set up the runner and its indexing steps.

    Args:
        ix: The index to fill.
        reader: Reader for the source data.
        with_cache: Whether intermediate results are cached below the cache directory.
    """
    self.ix: index.Object = ix
    self.reader: dt.Reader = reader

    self.withCache = with_cache
    self.cacheDir = f'{gws.c.CACHE_DIR}/alkis'
    if with_cache:
        gws.u.ensure_dir(self.cacheDir)

    # the steps take the index from the runner, so they are created after 'ix' is set
    self.place = _PlaceIndexer(self)
    self.lage = _LageIndexer(self)
    self.buchung = _BuchungIndexer(self)
    self.part = _PartIndexer(self)
    self.fsdata = _FsDataIndexer(self)
    self.fsindex = _FsIndexIndexer(self)

    # baseline for 'memory_info'
    self.initMemory = gws.lib.osx.process_rss_size()

1042 

def run(self):
    """Collect all data, then write all index tables, inside one database connection."""
    with self.ix.db.connect(), ProgressIndicator('ALKIS: indexing'):
        # collection phase; memory usage is reported after each step

        self.place.load_or_collect()
        self.memory_info()

        self.buchung.load_or_collect()
        self.memory_info()

        self.lage.load_or_collect()
        self.memory_info()

        self.fsdata.load_or_collect()
        gws.log.info(f'ALKIS: fs counts: {self.fsdata.counts}')
        self.memory_info()

        self.part.load_or_collect()
        self.memory_info()

        # the search index is always collected, it has no cache of its own
        self.fsindex.collect()
        self.memory_info()

        # writing phase
        for step in (self.place, self.buchung, self.lage, self.fsdata, self.part, self.fsindex):
            step.write()

1071 

1072 def memory_info(self): 

1073 v = gws.lib.osx.process_rss_size() - self.initMemory 

1074 if v > 0: 

1075 gws.log.info(f'ALKIS: memory used: {v:.2f} MB', stacklevel=2) 

1076 

1077 def read_flat(self, cls): 

1078 cpath = self.cacheDir + '/flat_' + cls.__name__ 

1079 if self.withCache and gws.u.is_file(cpath): 

1080 return gws.u.unserialize_from_path(cpath) 

1081 

1082 rs = self._read_flat(cls) 

1083 if self.withCache: 

1084 gws.u.serialize_to_path(rs, cpath) 

1085 

1086 return rs 

1087 

1088 def _read_flat(self, cls): 

1089 cnt = self.reader.count(cls) 

1090 if cnt <= 0: 

1091 gws.log.warning(f'ALKIS: read {cls.__name__}: empty table') 

1092 return [] 

1093 

1094 rs = [] 

1095 with ProgressIndicator(f'ALKIS: read {cls.__name__}', cnt) as progress: 

1096 for ax in self.reader.read_all(cls): 

1097 rs.append(ax) 

1098 progress.update(1) 

1099 return rs 

1100 

1101 def read_grouped(self, cls): 

1102 cpath = self.cacheDir + '/grouped_' + cls.__name__ 

1103 if self.withCache and gws.u.is_file(cpath): 

1104 return gws.u.unserialize_from_path(cpath) 

1105 

1106 rs = self._read_grouped(cls) 

1107 if self.withCache: 

1108 gws.u.serialize_to_path(rs, cpath) 

1109 

1110 return rs 

1111 

1112 def _read_grouped(self, cls): 

1113 cnt = self.reader.count(cls) 

1114 if cnt <= 0: 

1115 gws.log.warning(f'ALKIS: read {cls.__name__}: empty table') 

1116 return [] 

1117 

1118 groups = {} 

1119 with ProgressIndicator(f'ALKIS: read {cls.__name__}', cnt) as progress: 

1120 for ax in self.reader.read_all(cls): 

1121 groups.setdefault(ax.identifikator, []).append(ax) 

1122 progress.update(1) 

1123 for g in groups.values(): 

1124 g.sort(key=_sortkey_lebenszeitintervall) 

1125 

1126 return list(groups.items()) 

1127 

1128 def props_from(self, ax, atts): 

1129 d = {} 

1130 

1131 for a in atts: 

1132 v = getattr(ax, a['name'], None) 

1133 if isinstance(v, gid.Object): 

1134 # @TODO handle properties which are objects 

1135 continue 

1136 if dtx.is_date(v): 

1137 v = dtx.to_string('%d.%m.%Y', v) 

1138 if not gws.u.is_empty(v): 

1139 d[a['name']] = v 

1140 

1141 return dt.Object(**d) 

1142 

1143 

1144def _from_ax(cls, ax, **kwargs): 

1145 d = {} 

1146 

1147 if ax: 

1148 for k in cls.__annotations__: 

1149 v = getattr(ax, k, None) 

1150 if v: 

1151 d[k] = v 

1152 

1153 d['uid'] = ax.identifikator 

1154 d['beginnt'] = ax.lebenszeitintervall.beginnt 

1155 d['endet'] = ax.lebenszeitintervall.endet 

1156 d['isHistoric'] = d['endet'] is not None 

1157 if ax.anlass and ax.anlass[0].code != '000000': 

1158 d['anlass'] = ax.anlass[0] 

1159 if ax.geom: 

1160 d['geom'] = ax.geom 

1161 

1162 d.update(kwargs) 

1163 return cls(**d) 

1164 

1165 

1166def _anteil(ax): 

1167 try: 

1168 z = float(ax.anteil.zaehler) 

1169 z = str(int(z) if z.is_integer() else z) 

1170 n = float(ax.anteil.nenner) 

1171 n = str(int(n) if n.is_integer() else n) 

1172 return z + '/' + n 

1173 except (AttributeError, ValueError, TypeError): 

1174 pass 

1175 

1176 

def _meta_attributes(meta):
    """Return the attribute descriptors of ``meta`` that are listed in ``dt.PROPS``.

    The descriptors come from ``meta['attributes']``, are filtered by their
    ``'name'`` and returned as a new list ordered by their ``'title'``.
    """
    selected = [attr for attr in meta['attributes'] if attr['name'] in dt.PROPS]
    selected.sort(key=lambda attr: attr['title'])
    return selected

1182 

1183 

def _geom_of(o):
    """Decode the hex-encoded WKB geometry stored in ``o.geom``.

    Returns:
        The shapely geometry, or ``None`` (after logging a warning) when
        ``o`` has no geometry.
    """
    wkb_hex = o.geom
    if wkb_hex:
        return shapely.wkb.loads(wkb_hex, hex=True)

    gws.log.warning(f'{o.__class__.__name__}:{o.uid}: no geometry')
    return None

1189 

1190 

1191def _pop(obj, attr): 

1192 v = getattr(obj, attr, None) 

1193 try: 

1194 delattr(obj, attr) 

1195 except AttributeError: 

1196 pass 

1197 return v 

1198 

1199 

1200def _sortkey_beginnt(o): 

1201 return o.beginnt 

1202 

1203 

1204def _sortkey_lebenszeitintervall(o): 

1205 return o.lebenszeitintervall.beginnt 

1206 

1207 

def _sortkey_namensnummer(nn: dt.Namensnummer):
    """Sort key: natural order of the running number of the last record, then its start."""
    latest = nn.recs[-1]
    return _natkey(latest.laufendeNummerNachDIN1421), latest.beginnt

1210 

1211 

def _sortkey_buchungsstelle(bs: dt.Buchungsstelle):
    """Sort key: natural order of the running number of the last record, then its start."""
    latest = bs.recs[-1]
    return _natkey(latest.laufendeNummer), latest.beginnt

1214 

1215 

def _sortkey_buchungsstelle_by_bblatt(bs: dt.Buchungsstelle):
    """Sort key: first entry of ``buchungsblattkennzeichenList``, then the start of the last record."""
    first_kennzeichen = bs.buchungsblattkennzeichenList[0]
    latest = bs.recs[-1]
    return first_kennzeichen, latest.beginnt

1218 

1219 

def _sortkey_part(pa: dt.Part):
    """Sort key: the text of the part name, then area in descending order."""
    label = pa.name.text
    # negated so that, within one name, larger areas sort first
    return label, -pa.geomFlaeche

1222 

1223 

def _sortkey_gebaeude(ge: dt.Gebaeude):
    """Sort key built from the last record of ``ge``.

    Orders by the record's start (``beginnt``) first and, for equal starts,
    by area from big to small.
    """
    latest = ge.recs[-1]
    # negated area: bigger buildings come first among equal starts
    return latest.beginnt, -latest.geomFlaeche

1227 

1228 

1229def _natkey(v): 

1230 if not v: 

1231 return [] 

1232 return [ 

1233 '{:080d}'.format(int(digits)) if digits else chars.lower() 

1234 for digits, chars in re.findall(r'(\d+)|(\D+)', v.strip()) 

1235 ] 

1236 

1237 

1238def _comma(a): 

1239 return ','.join(str(s) if s is not None else '' for s in a) 

1240 

1241 

1242def _str(x): 

1243 return None if x is None else str(x)