Coverage for gws-app / gws / plugin / alkis / data / indexer.py: 0%

673 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-03 10:12 +0100

1from typing import Optional, Iterable 

2 

3import re 

4from typing import Generic, TypeVar 

5 

6import shapely 

7import shapely.strtree 

8import shapely.wkb 

9 

10import gws 

11import gws.lib.osx 

12import gws.lib.datetimex as dtx 

13from gws.lib.cli import ProgressIndicator 

14import gws.plugin.postgres.provider 

15 

16from . import types as dt 

17from . import index 

18from . import norbit6 

19 

20from .geo_info_dok import gid6 as gid 

21 

22 

def run(ix: index.Object, data_schema: str, with_force=False, with_cache=False):
    """Build the ALKIS index ``ix`` from the source data in ``data_schema``.

    The source data is read with the ``norbit6`` reader.

    Args:
        ix: The target index; its schema must already exist.
        data_schema: Database schema that holds the source ALKIS data.
        with_force: Drop the existing index and rebuild it, even if it is complete.
        with_cache: Use the on-disk cache for intermediate data (see ``_Runner``).

    Raises:
        gws.Error: If the index schema does not exist.
    """
    if not ix.has_schema():
        raise gws.Error(f'ALKIS: schema {ix.schema!r} does not exist')

    if not with_force:
        # an index that is already complete is left untouched
        if ix.status().complete:
            return
    else:
        ix.drop()

    reader = norbit6.Object(ix.db, schema=data_schema)
    _Runner(ix, reader, with_cache).run()

35 

36 

37## 

38 

# entity type stored in an _ObjectDict
T = TypeVar('T')

40 

41 

class _ObjectDict(Generic[T]):
    """Registry of entities of a single type, keyed by their uid.

    ``cls`` is the entity class; new entities are created as ``cls(uid=..., recs=...)``.
    """

    def __init__(self, cls):
        self.d = {}
        self.cls = cls

    def add(self, uid, recs) -> T:
        """Create an entity from its records, register it under its uid and return it.

        The entity is marked historic only when all of its records are historic.
        """
        entity = self.cls(uid=uid, recs=recs)
        entity.isHistoric = all(rec.isHistoric for rec in recs)
        self.d[entity.uid] = entity
        return entity

    def get(self, uid, default=None) -> Optional[T]:
        """Return the entity with the given uid, or ``default`` if there is none."""
        return self.d.get(uid, default)

    def get_many(self, uids) -> list[T]:
        """Return the known entities for ``uids``.

        Duplicate uids are collapsed, first-seen order is kept and unknown uids are skipped.
        """
        candidates = (self.d.get(uid) for uid in dict.fromkeys(uids))
        return [entity for entity in candidates if entity]

    def get_from_ptr(self, obj: dt.Entity, attr):
        """Resolve the reference attribute ``attr`` stored on the records of ``obj``.

        The value is taken from every record with ``_pop`` (presumably removing it
        from the record). A list value is treated as a list of uids and a string as
        a single uid; any other value is ignored.
        """
        uids = []
        for rec in obj.recs:
            ref = _pop(rec, attr)
            if isinstance(ref, list):
                uids.extend(ref)
            elif isinstance(ref, str):
                uids.append(ref)
        return self.get_many(uids)

    def __iter__(self) -> Iterable[T]:
        yield from self.d.values()

    def __len__(self):
        return len(self.d)

84 

85 

class _ObjectMap:
    """Container for the objects collected by one indexer.

    This is the object that ``_Indexer`` stores in and restores from the cache.
    """

    def __init__(self):
        registry = _ObjectDict

        self.Anschrift: _ObjectDict[dt.Anschrift] = registry(dt.Anschrift)
        self.Buchungsblatt: _ObjectDict[dt.Buchungsblatt] = registry(dt.Buchungsblatt)
        self.Buchungsstelle: _ObjectDict[dt.Buchungsstelle] = registry(dt.Buchungsstelle)
        self.Flurstueck: _ObjectDict[dt.Flurstueck] = registry(dt.Flurstueck)
        self.Gebaeude: _ObjectDict[dt.Gebaeude] = registry(dt.Gebaeude)
        self.Lage: _ObjectDict[dt.Lage] = registry(dt.Lage)
        self.Namensnummer: _ObjectDict[dt.Namensnummer] = registry(dt.Namensnummer)
        self.Part: _ObjectDict[dt.Part] = registry(dt.Part)
        self.Person: _ObjectDict[dt.Person] = registry(dt.Person)

        # place uid -> dt.Place, and place uid -> its EnumPair (filled by _PlaceIndexer)
        self.placeAll: dict = dict()
        self.placeIdx: dict = dict()
        # Lagebezeichnung catalog key -> bezeichnung (filled by _LageIndexer)
        self.catalog: dict = dict()

102 

103 

class _Indexer:
    """Base class of the indexing steps driven by ``_Runner``.

    A subclass fills ``self.om`` in ``collect`` and writes its tables in ``write``.
    When the runner has caching enabled and ``CACHE_KEY`` is set, the collected
    ``_ObjectMap`` is serialized to ``<cacheDir>/<CACHE_KEY>`` and reused later.
    """

    CACHE_KEY: str = ''

    def __init__(self, runner: '_Runner'):
        self.rr = runner
        self.ix: index.Object = runner.ix
        self.om = _ObjectMap()

    def load_or_collect(self):
        """Restore ``self.om`` from the cache; otherwise collect it and cache the result."""
        if self.load_cache():
            return
        self.collect()
        self.store_cache()

    def _cache_enabled(self):
        return bool(self.rr.withCache and self.CACHE_KEY)

    def _cache_path(self):
        return self.rr.cacheDir + '/' + self.CACHE_KEY

    def load_cache(self):
        """Replace ``self.om`` with the cached object map.

        Returns True if a non-empty cached map was loaded, and False otherwise.
        """
        if not self._cache_enabled():
            return False

        path = self._cache_path()
        if not gws.u.is_file(path):
            return False

        cached = gws.u.unserialize_from_path(path)
        if not cached:
            return False

        gws.log.info(f'ALKIS: use cache {self.CACHE_KEY!r}')
        self.om = cached
        return True

    def store_cache(self):
        """Serialize ``self.om`` to the cache, if caching is enabled for this indexer."""
        if not self._cache_enabled():
            return
        gws.u.serialize_to_path(self.om, self._cache_path())
        gws.log.info(f'ALKIS: store cache {self.CACHE_KEY!r}')

    def collect(self):
        pass

    def write_table(self, table_id, values):
        """Create the index table ``table_id`` from ``values``, unless the table already exists."""
        if not self.ix.has_table(table_id):
            with ProgressIndicator(f'ALKIS: write {table_id!r}', len(values)) as progress:
                self.ix.create_table(table_id, values, progress)

    def write(self):
        pass

148 

149 

class _PlaceIndexer(_Indexer):
    """Index places: administrative units (Administration- und Verwaltungseinheiten).

    Each place is registered under the uid ``kind + code``; ``CODES`` defines how
    the code of each kind is built from a key object.

    References: https://de.wikipedia.org/wiki/Amtlicher_Gemeindeschl%C3%BCssel
    """

    CACHE_KEY = 'obj_place'

    # placeholders returned for unknown places; is_empty() recognizes their codes
    empty1 = dt.EnumPair(code='0', text='')
    empty2 = dt.EnumPair(code='00', text='')

    def add(self, kind, ax, key_obj, **kwargs):
        """Register a place of the given kind built from ``ax``.

        ``kwargs`` (parent places) are passed to ``dt.Place``. Returns the place's
        EnumPair, or None if ``ax.endet`` is set and the place is skipped.
        """
        if ax.endet:
            return None

        code = self.code(kind, key_obj)
        pair = dt.EnumPair(code, ax.bezeichnung)

        place = dt.Place(**kwargs)
        place.uid = kind + code
        place.kind = kind
        setattr(place, kind, pair)

        self.om.placeAll[place.uid] = place
        self.om.placeIdx[place.uid] = pair
        return pair

    def collect(self):
        self.om.placeAll = {}
        self.om.placeIdx = {}

        # (source class, place kind, attribute holding the key object, parent place kinds).
        # The order matters: parents must be registered before their children look them up.
        # @TODO map Gemarkung to Gemeinde (see https://de.wikipedia.org/wiki/Liste_der_Gemarkungen_in_Nordrhein-Westfalen etc)
        steps = (
            (gid.AX_Bundesland, 'land', 'schluessel', ()),
            (gid.AX_Regierungsbezirk, 'regierungsbezirk', 'schluessel', ('land',)),
            (gid.AX_KreisRegion, 'kreis', 'schluessel', ('land', 'regierungsbezirk')),
            (gid.AX_Gemeinde, 'gemeinde', 'gemeindekennzeichen', ('land', 'regierungsbezirk', 'kreis')),
            (gid.AX_Gemarkung, 'gemarkung', 'schluessel', ('land',)),
            (gid.AX_Buchungsblattbezirk, 'buchungsblattbezirk', 'schluessel', ('land',)),
            (gid.AX_Dienststelle, 'dienststelle', 'schluessel', ('land',)),
        )

        for cls, kind, key_attr, parent_kinds in steps:
            for ax in self.rr.read_flat(cls):
                if kind == 'gemarkung' and str(ax.schluessel.gemarkungsnummer) in self.ix.excludeGemarkung:
                    continue
                key_obj = getattr(ax, key_attr)
                parents = {pk: getattr(self, 'get_' + pk)(key_obj) for pk in parent_kinds}
                self.add(kind, ax, key_obj, **parents)

    def write(self):
        rows = [
            dict(uid=place.uid, data=index.serialize(place))
            for place in self.om.placeAll.values()
        ]
        self.write_table(index.TABLE_PLACE, rows)

    # lookups: each returns the registered EnumPair or a placeholder

    def get_land(self, o):
        return self.get('land', o) or self.empty2

    def get_regierungsbezirk(self, o):
        return self.get('regierungsbezirk', o) or self.empty1

    def get_kreis(self, o):
        return self.get('kreis', o) or self.empty2

    def get_gemeinde(self, o):
        return self.get('gemeinde', o) or self.empty1

    def get_gemarkung(self, o):
        return self.get('gemarkung', o) or self.empty1

    def get_buchungsblattbezirk(self, o):
        return self.get('buchungsblattbezirk', o) or self.empty1

    def get_dienststelle(self, o):
        return self.get('dienststelle', o) or self.empty1

    def get(self, kind, o):
        """Return the registered EnumPair of the place of ``kind`` keyed by ``o``, or None."""
        uid = kind + self.code(kind, o)
        return self.om.placeIdx.get(uid)

    def is_empty(self, p: dt.EnumPair):
        """True if ``p`` carries one of the placeholder codes ('0' or '00')."""
        return p.code in ('0', '00')

    # kind -> function building the place code from a key object
    CODES = {
        'land': lambda o: o.land,
        'regierungsbezirk': lambda o: o.land + (o.regierungsbezirk or '0'),
        'kreis': lambda o: o.land + (o.regierungsbezirk or '0') + o.kreis,
        'gemeinde': lambda o: o.land + (o.regierungsbezirk or '0') + o.kreis + o.gemeinde,
        'gemarkung': lambda o: o.land + o.gemarkungsnummer,
        'buchungsblattbezirk': lambda o: o.land + o.bezirk,
        'dienststelle': lambda o: o.land + o.stelle,
    }

    def code(self, kind, o):
        make_code = self.CODES[kind]
        return make_code(o)

264 

265 

class _LageIndexer(_Indexer):
    """Index Lagebezeichnungen (with and without Hausnummer) and the Gebäude that point to them."""

    CACHE_KEY = 'obj_lage'

    def collect(self):
        self._collect_catalog()
        self._collect_lage()
        self._apply_pto_coordinates()
        self._collect_gebaeude()
        self._link_gebaeude()

    def _collect_catalog(self):
        """Fill the street catalog: lage key -> bezeichnung."""
        for entry in self.rr.read_flat(gid.AX_LagebezeichnungKatalogeintrag):
            self.om.catalog[self.lage_key(entry.schluessel)] = entry.bezeichnung

    def _collect_lage(self):
        """Register Lage objects from both Lagebezeichnung classes."""
        lage_classes = (gid.AX_LagebezeichnungMitHausnummer, gid.AX_LagebezeichnungOhneHausnummer)
        for lage_cls in lage_classes:
            for uid, axs in self.rr.read_grouped(lage_cls):
                records = [
                    _from_ax(
                        dt.LageRecord,
                        ax,
                        strasse=self.strasse(ax),
                        hausnummer=index.normalize_hausnummer(ax.hausnummer),
                    )
                    for ax in axs
                ]
                self.om.Lage.add(uid, records)

    def _apply_pto_coordinates(self):
        """Use the centroid of the current PTO (art=HNR) geometry as the Lage coordinates.

        PTO.dientZurDarstellungVon -> lage.uid
        """
        for pto in self.rr.read_flat(gid.AP_PTO):
            if pto.lebenszeitintervall.endet is not None:
                continue
            if (_pop(pto, 'art') or '').upper() != 'HNR':
                continue

            uids = _pop(pto, 'dientZurDarstellungVon')
            if not (uids and isinstance(uids, list)):
                continue

            geom = _geom_of(pto)
            if not geom:
                continue

            centroid = geom.centroid
            for la in self.om.Lage.get_many(uids):
                la.x, la.y = centroid.x, centroid.y

    def _collect_gebaeude(self):
        """Register related Gebäude and compute their geometry area."""
        atts = _meta_attributes(gid.METADATA['AX_Gebaeude'])

        for uid, axs in self.rr.read_grouped(gid.AX_Gebaeude):
            records = [
                _from_ax(
                    dt.GebaeudeRecord,
                    ax,
                    name=', '.join(ax.name) if ax.name else None,
                    amtlicheFlaeche=ax.grundflaeche or 0,
                    props=self.rr.props_from(ax, atts),
                    _zeigtAuf=ax.zeigtAuf,
                )
                for ax in axs
            ]
            self.om.Gebaeude.add(uid, records)

        for ge in self.om.Gebaeude:
            for rec in ge.recs:
                geom = _geom_of(rec)
                rec.geomFlaeche = round(geom.area, 2) if geom else 0
                # omit Gebaeude geometries for now
                _pop(rec, 'geom')

    def _link_gebaeude(self):
        """Attach each Gebäude to the Lage objects it points to.

        AX_Gebaeude.zeigtAuf -> AX_LagebezeichnungMitHausnummer
        """
        for la in self.om.Lage:
            la.gebaeudeList = []

        for ge in self.om.Gebaeude:
            for la in self.om.Lage.get_from_ptr(ge, '_zeigtAuf'):
                la.gebaeudeList.append(ge)

    def strasse(self, ax):
        """Return the street name: either given literally, or looked up in the catalog ('' if unknown)."""
        bezeichnung = ax.lagebezeichnung
        if isinstance(bezeichnung, str):
            return bezeichnung
        return self.om.catalog.get(self.lage_key(bezeichnung), '')

    def lage_key(self, r):
        """Build the catalog key from land, regierungsbezirk, kreis, gemeinde and lage."""
        parts = [r.land, r.regierungsbezirk, r.kreis, r.gemeinde, r.lage]
        return _comma(parts)

    def write(self):
        rows = [
            dict(uid=la.uid, rc=len(la.recs), data=index.serialize(la))
            for la in self.om.Lage
        ]
        self.write_table(index.TABLE_LAGE, rows)

369 

370 

class _BuchungIndexer(_Indexer):
    """Index Buchungsblätter with their Buchungsstellen, Namensnummern, Personen and Anschriften."""

    CACHE_KEY = 'obj_buchungsblatt'

    # Buchungsblattkennzeichen -> Buchungsblatt; read by _FsDataIndexer for historic Flurstücke.
    # It is not part of the cached ObjectMap, see load_or_collect().
    buchungsblattkennzeichenMap: dict[str, dt.Buchungsblatt]

    def __init__(self, runner: '_Runner'):
        super().__init__(runner)
        # Per-instance map. As a class-level dict it was shared between all instances,
        # so entries from a previous indexing run leaked into the next one.
        self.buchungsblattkennzeichenMap = {}

    def load_or_collect(self):
        super().load_or_collect()
        # If the ObjectMap came from the cache, collect() did not run and the map would
        # stay empty; historic Flurstücke would then silently lose their Buchungen.
        self.build_buchungsblattkennzeichen_map()

    def build_buchungsblattkennzeichen_map(self):
        """(Re)build ``buchungsblattkennzeichenMap`` from ``om.Buchungsblatt`` (later entries win)."""
        self.buchungsblattkennzeichenMap = {
            bb.buchungsblattkennzeichen: bb
            for bb in self.om.Buchungsblatt
        }

    @staticmethod
    def _ptr_uids(v):
        """Normalize a reference value: None -> [], a single uid -> [uid], otherwise unchanged."""
        if v is None:
            return []
        if isinstance(v, str):
            return [v]
        return v

    def collect(self):
        for uid, axs in self.rr.read_grouped(gid.AX_Anschrift):
            self.om.Anschrift.add(uid, [
                _from_ax(
                    dt.AnschriftRecord,
                    ax,
                    ort=ax.ort_AmtlichesOrtsnamensverzeichnis or ax.ort_Post,
                    plz=ax.postleitzahlPostzustellung,
                    telefon=ax.telefon[0] if ax.telefon else None
                )
                for ax in axs
            ])

        for uid, axs in self.rr.read_grouped(gid.AX_Person):
            self.om.Person.add(uid, [
                _from_ax(
                    dt.PersonRecord,
                    ax,
                    anrede=ax.anrede.text if ax.anrede else None,
                    _hat=ax.hat,
                )
                for ax in axs
            ])

        # AX_Person.hat -> [AX_Anschrift]
        for pe in self.om.Person:
            pe.anschriftList = self.om.Anschrift.get_from_ptr(pe, '_hat')

        for uid, axs in self.rr.read_grouped(gid.AX_Namensnummer):
            self.om.Namensnummer.add(uid, [
                _from_ax(
                    dt.NamensnummerRecord,
                    ax,
                    anteil=_anteil(ax),
                    _benennt=ax.benennt,
                    _istBestandteilVon=ax.istBestandteilVon
                )
                for ax in axs
            ])

        # AX_Namensnummer.benennt -> AX_Person
        for nn in self.om.Namensnummer:
            nn.laufendeNummer = nn.recs[-1].laufendeNummerNachDIN1421
            nn.personList = self.om.Person.get_from_ptr(nn, '_benennt')

        for uid, axs in self.rr.read_grouped(gid.AX_Buchungsstelle):
            self.om.Buchungsstelle.add(uid, [
                _from_ax(
                    dt.BuchungsstelleRecord,
                    ax,
                    anteil=_anteil(ax),
                    _an=ax.an,
                    _zu=ax.zu,
                    _istBestandteilVon=ax.istBestandteilVon,
                )
                for ax in axs
            ])

        for bs in self.om.Buchungsstelle:
            bs.laufendeNummer = bs.recs[-1].laufendeNummer
            # populated later by _FsDataIndexer.process_buchung
            bs.fsUids = []
            bs.flurstueckskennzeichenList = []

        for uid, axs in self.rr.read_grouped(gid.AX_Buchungsblatt):
            self.om.Buchungsblatt.add(uid, [
                _from_ax(
                    dt.BuchungsblattRecord,
                    ax,
                    buchungsblattbezirk=self.rr.place.get_buchungsblattbezirk(ax.buchungsblattbezirk),
                )
                for ax in axs
            ])

        for bb in self.om.Buchungsblatt:
            bb.buchungsstelleList = []
            bb.namensnummerList = []
            bb.buchungsblattkennzeichen = bb.recs[-1].buchungsblattkennzeichen
            self.buchungsblattkennzeichenMap[bb.buchungsblattkennzeichen] = bb

        # AX_Buchungsstelle.istBestandteilVon -> AX_Buchungsblatt
        for bs in self.om.Buchungsstelle:
            bb_list = self.om.Buchungsblatt.get_from_ptr(bs, '_istBestandteilVon')
            bs.buchungsblattUids = [bb.uid for bb in bb_list]
            bs.buchungsblattkennzeichenList = [bb.buchungsblattkennzeichen for bb in bb_list]
            for bb in bb_list:
                bb.buchungsstelleList.append(bs)

        # AX_Namensnummer.istBestandteilVon -> AX_Buchungsblatt
        for nn in self.om.Namensnummer:
            bb_list = self.om.Buchungsblatt.get_from_ptr(nn, '_istBestandteilVon')
            nn.buchungsblattUids = [bb.uid for bb in bb_list]
            nn.buchungsblattkennzeichenList = [bb.buchungsblattkennzeichen for bb in bb_list]
            for bb in bb_list:
                bb.namensnummerList.append(nn)

        for bb in self.om.Buchungsblatt:
            bb.buchungsstelleList.sort(key=_sortkey_buchungsstelle)
            bb.namensnummerList.sort(key=_sortkey_namensnummer)

        # AX_Buchungsstelle.an -> [AX_Buchungsstelle]
        # AX_Buchungsstelle.zu -> [AX_Buchungsstelle]
        # see Erläuterungen zu ALKIS Version 6, page 116-119

        for bs in self.om.Buchungsstelle:
            bs.childUids = []
            bs.parentUids = []
            bs.parentkennzeichenList = []

        for bs in self.om.Buchungsstelle:
            parent_uids = set()
            parent_knz = set()

            for r in bs.recs:
                # a missing an/zu reference must not crash set.update()
                parent_uids.update(self._ptr_uids(_pop(r, '_an')))
                parent_uids.update(self._ptr_uids(_pop(r, '_zu')))

            for parent_bs in self.om.Buchungsstelle.get_many(parent_uids):
                parent_bs.childUids.append(bs.uid)
                bs.parentUids.append(parent_bs.uid)
                for bb_knz in parent_bs.buchungsblattkennzeichenList:
                    parent_knz.add(bb_knz + '.' + parent_bs.laufendeNummer)

            bs.parentkennzeichenList = sorted(parent_knz)

    def write(self):
        values = []

        for bb in self.om.Buchungsblatt:
            values.append(dict(
                uid=bb.uid,
                rc=len(bb.recs),
                data=index.serialize(bb),
            ))

        self.write_table(index.TABLE_BUCHUNGSBLATT, values)

511 

512 

class _PartIndexer(_Indexer):
    """Index Flurstück parts.

    For every kind in ``dt.Part.KIND``, the matching geometric ALKIS object classes are
    read. Each object is then intersected with the current Flurstück geometries, and
    every intersection of at least ``MIN_PART_AREA`` becomes one ``dt.Part``.
    """

    CACHE_KEY = 'obj_part'
    MIN_PART_AREA = 1

    # computed parts, one per (object, Flurstück) pair, sorted by _sortkey_part
    parts: list[dt.Part]

    # Flurstücke and their (most recent) geometries, index-aligned with self.stree
    fs_list: list
    fs_geom: list

    stree: shapely.strtree.STRtree

    def __init__(self, runner: '_Runner'):
        super().__init__(runner)
        # Per-instance containers. As class attributes they were shared by all
        # instances, so a second run in the same process kept appending to them.
        self.parts = []
        self.fs_list = []
        self.fs_geom = []

    def load_or_collect(self):
        """Collect (or restore from the cache) the source objects and compute the parts.

        The cache holds ``self.om`` only, not the computed ``self.parts``. When the
        objects come from the cache, the intersections are computed here; before
        this, ``write`` produced an empty parts table in that case.
        """
        if self.load_cache():
            self.compute_parts()
            return
        self.collect()
        self.store_cache()

    def collect(self):
        for kind in dt.Part.KIND:
            self.collect_kind(kind)
        self.compute_parts()

    def compute_parts(self):
        """Intersect all collected objects with the Flurstück geometries and rebuild ``self.parts``."""
        self.parts = []
        self.fs_list = []
        self.fs_geom = []

        for fs in self.rr.fsdata.om.Flurstueck:
            self.fs_list.append(fs)
            # NB take only the most recent fs geometry into account
            self.fs_geom.append(_geom_of(fs.recs[-1]))

        self.stree = shapely.strtree.STRtree(self.fs_geom)

        with ProgressIndicator(f'ALKIS: parts', len(self.om.Part)) as progress:
            for pa in self.om.Part:
                self.compute_intersections(pa)
                progress.update(1)

        self.parts.sort(key=_sortkey_part)

        for pa in self.parts:
            pa.isHistoric = all(r.isHistoric for r in pa.recs)

    def collect_kind(self, kind):
        """Collect all geometric object classes whose metadata key matches the kind's key."""
        _, key = dt.Part.KIND[kind]
        classes = [
            getattr(gid, meta['name'])
            for meta in gid.METADATA.values()
            if (
                meta['kind'] == 'object'
                and meta['geom']
                and re.search(key + r'/\w+/', meta['key'])
            )
        ]

        for cls in classes:
            self.collect_class(kind, cls)

    def collect_class(self, kind, cls):
        meta = gid.METADATA[cls.__name__]
        atts = _meta_attributes(meta)

        for uid, axs in self.rr.read_grouped(cls):
            pa = self.om.Part.add(uid, [
                _from_ax(
                    dt.PartRecord,
                    ax,
                    props=self.rr.props_from(ax, atts),
                )
                for ax in axs
            ])
            pa.kind = kind
            pa.name = dt.EnumPair(meta['uid'], meta['title'])

    def compute_intersections(self, pa: dt.Part):
        """Append one Part per Flurstück that ``pa`` intersects to ``self.parts``."""
        parts_map = {}

        for r in pa.recs:
            geom = _geom_of(r)
            if not geom:
                continue

            for i in self.stree.query(geom):
                part_geom = shapely.intersection(self.fs_geom[i], geom)
                part_area = round(part_geom.area, 2)
                if part_area < self.MIN_PART_AREA:
                    continue

                fs = self.fs_list[i]

                part = parts_map.setdefault(fs.uid, dt.Part(
                    uid=pa.uid,
                    recs=[],
                    kind=pa.kind,
                    name=pa.name,
                    fs=fs.uid,
                ))

                # computed area corrected with respect to FS's "amtlicheFlaeche"
                part_area_corrected = round(
                    fs.recs[-1].amtlicheFlaeche * (part_area / fs.recs[-1].geomFlaeche),
                    2)

                part.recs.append(dt.PartRecord(
                    uid=r.uid,
                    beginnt=r.beginnt,
                    endet=r.endet,
                    anlass=r.anlass,
                    props=r.props,
                    geomFlaeche=part_area,
                    amtlicheFlaeche=part_area_corrected,
                    isHistoric=r.endet is not None,
                ))

                part.geom = shapely.wkb.dumps(part_geom, srid=self.ix.crs.srid, hex=True)
                part.geomFlaeche = part_area
                part.amtlicheFlaeche = part_area_corrected

        self.parts.extend(parts_map.values())

    def write(self):
        values = []

        for n, pa in enumerate(self.parts, 1):
            # serialize without the geometry, which goes to its own column
            geom = _pop(pa, 'geom')
            data = index.serialize(pa)
            pa.geom = geom
            values.append(dict(
                n=n,
                fs=pa.fs,
                uid=pa.uid,
                beginnt=pa.recs[-1].beginnt,
                endet=pa.recs[-1].endet,
                kind=pa.kind,
                name=pa.name.text,
                parthistoric=pa.isHistoric,
                data=data,
                geom=geom,
            ))

        self.write_table(index.TABLE_PART, values)

644 

645 

class _FsDataIndexer(_Indexer):
    """Index Flurstücke (current and historic) with their Lage, Gebäude and Buchung data."""

    CACHE_KEY = 'obj_flurstueck'

    def __init__(self, runner: '_Runner'):
        super().__init__(runner)
        # statistics of record(): accepted records and the reasons for rejected ones
        self.counts = {
            'ok': 0,
            'no_geometry': 0,
            'excluded': 0,
        }

    def collect(self):
        for uid, axs in self.rr.read_grouped(gid.AX_Flurstueck):
            recs = gws.u.compact(self.record(ax) for ax in axs)
            if recs:
                self.om.Flurstueck.add(uid, recs)

        for uid, axs in self.rr.read_grouped(gid.AX_HistorischesFlurstueck):
            recs = gws.u.compact(self.record(ax) for ax in axs)
            if not recs:
                continue
            # For a historic FS, 'beginnt' is basically when the history beginnt
            # (see comments for AX_HistorischesFlurstueck in gid6).
            # we set F.endet=F.beginnt to designate this one as 'historic'
            for r in recs:
                r.endet = r.beginnt
                r.isHistoric = True
            self.om.Flurstueck.add(uid, recs)

        for fs in self.om.Flurstueck:
            fs.flurstueckskennzeichen = fs.recs[-1].flurstueckskennzeichen
            fs.fsnummer = index.make_fsnummer(fs.recs[-1])
            fs.x = fs.recs[-1].x
            fs.y = fs.recs[-1].y
            self.process_lage(fs)
            self.process_gebaeude(fs)
            self.process_buchung(fs)

        # check the 'nachfolgerFlurstueckskennzeichen' array
        # and mark each referenced FS as a "vorgaenger" FS
        # It is a M:N relation, therefore 'vorgaengerFlurstueckskennzeichen' is also an array

        knz_to_fs = {
            fs.flurstueckskennzeichen: fs
            for fs in self.om.Flurstueck
        }
        for fs in self.om.Flurstueck:
            nfs = fs.recs[-1].nachfolgerFlurstueckskennzeichen
            if not nfs:
                continue
            for nf_knz in nfs:
                nf_fs = knz_to_fs.get(nf_knz)
                if not nf_fs:
                    gws.log.warning(f'ALKIS: nachfolgerFlurstueck missing {fs.flurstueckskennzeichen!r}->{nf_knz!r}')
                    continue
                if not nf_fs.recs[-1].vorgaengerFlurstueckskennzeichen:
                    nf_fs.recs[-1].vorgaengerFlurstueckskennzeichen = []
                nf_fs.recs[-1].vorgaengerFlurstueckskennzeichen.append(fs.flurstueckskennzeichen)

    def record(self, ax):
        """Build a FlurstueckRecord from ``ax``.

        Returns None (and counts the reason) if the record refers to an unknown
        Gemarkung/Gemeinde or has no geometry.
        """
        r: dt.FlurstueckRecord = _from_ax(
            dt.FlurstueckRecord,
            ax,
            amtlicheFlaeche=ax.amtlicheFlaeche or 0,
            flurnummer=_str(ax.flurnummer),
            zaehler=_str(ax.flurstuecksnummer.zaehler),
            nenner=_str(ax.flurstuecksnummer.nenner),
            zustaendigeStelle=[self.rr.place.get_dienststelle(p) for p in (ax.zustaendigeStelle or [])],

            _weistAuf=ax.weistAuf,
            _zeigtAuf=ax.zeigtAuf,
            _istGebucht=ax.istGebucht,
            _buchung=ax.buchung,
        )

        # basic data

        r.gemarkung = self.rr.place.get_gemarkung(ax.gemarkung)
        r.gemeinde = self.rr.place.get_gemeinde(ax.gemeindezugehoerigkeit)
        r.regierungsbezirk = self.rr.place.get_regierungsbezirk(ax.gemeindezugehoerigkeit)
        r.kreis = self.rr.place.get_kreis(ax.gemeindezugehoerigkeit)
        r.land = self.rr.place.get_land(ax.gemeindezugehoerigkeit)

        if self.rr.place.is_empty(r.gemarkung) or self.rr.place.is_empty(r.gemeinde):
            # exclude Flurstücke that refer to Gemeinde/Gemarkung
            # which do not exist in the reference AX tables
            self.counts['excluded'] += 1
            return None

        # geometry

        geom = _geom_of(r)
        if not geom:
            self.counts['no_geometry'] += 1
            return None

        r.geomFlaeche = round(geom.area, 2)
        r.x = round(geom.centroid.x, 2)
        r.y = round(geom.centroid.y, 2)

        self.counts['ok'] += 1
        return r

    def process_lage(self, fs: dt.Flurstueck):
        fs.lageList = []

        # AX_Flurstueck.weistAuf -> AX_LagebezeichnungMitHausnummer
        # AX_Flurstueck.zeigtAuf -> AX_LagebezeichnungOhneHausnummer
        fs.lageList.extend(self.rr.lage.om.Lage.get_from_ptr(fs, '_weistAuf'))
        fs.lageList.extend(self.rr.lage.om.Lage.get_from_ptr(fs, '_zeigtAuf'))

    def process_gebaeude(self, fs: dt.Flurstueck):
        # collect the distinct Gebäude of all Lage objects of this Flurstück
        ge_map = {}

        for la in fs.lageList:
            for ge in la.gebaeudeList:
                ge_map[ge.uid] = ge

        fs.gebaeudeList = list(ge_map.values())
        fs.gebaeudeList.sort(key=_sortkey_gebaeude)

        # area sums consider only Gebäude whose latest record has not ended
        fs.gebaeudeAmtlicheFlaeche = sum(ge.recs[-1].amtlicheFlaeche for ge in fs.gebaeudeList if not ge.recs[-1].endet)
        fs.gebaeudeGeomFlaeche = sum(ge.recs[-1].geomFlaeche for ge in fs.gebaeudeList if not ge.recs[-1].endet)

    def process_buchung(self, fs: dt.Flurstueck):
        bs_historic_map = {}
        bs_seen = set()
        buchung_map = {}

        # for each Flurstück record, we collect all related Buchungsstellen (with respect to parent-child relations)
        # then group Buchungsstellen by their Buchungsblatt
        # and create Buchung objects for a FS

        for r in fs.recs:
            hist_buchung = _pop(r, '_buchung')
            if hist_buchung:
                bs_list = self.historic_buchungsstelle_list(r, hist_buchung)
            else:
                bs_list = self.buchungsstelle_list(r)

            for bs in bs_list:
                # a Buchungsstelle referred to by an expired Flurstück might not be expired itself,
                # so we have to track its state separately by wrapping it in a BuchungsstelleReference
                # a BuchungsstelleReference is historic if its Flurstück is
                bs_historic_map[bs.uid] = r.isHistoric

                if bs.uid in bs_seen:
                    continue
                bs_seen.add(bs.uid)

                # populate Flurstück references in a Buchungsstelle
                if fs.uid not in bs.fsUids:
                    bs.fsUids.append(fs.uid)
                    bs.flurstueckskennzeichenList.append(fs.flurstueckskennzeichen)

                # create Buchung records by grouping Buchungsstellen
                for bb_uid in bs.buchungsblattUids:
                    bu = buchung_map.setdefault(bb_uid, dt.Buchung(recs=[], buchungsblattUid=bb_uid))
                    bu.recs.append(dt.BuchungsstelleReference(buchungsstelle=bs))

        fs.buchungList = list(buchung_map.values())

        for bu in fs.buchungList:
            for ref in bu.recs:
                ref.isHistoric = bs_historic_map[ref.buchungsstelle.uid]
            bu.isHistoric = all(ref.isHistoric for ref in bu.recs)

        return fs

    def historic_buchungsstelle_list(self, r: dt.FlurstueckRecord, hist_buchung):
        # an AX_HistorischesFlurstueck with a special 'buchung' reference

        bs_list = []

        for bu in hist_buchung:
            bb = self.rr.buchung.buchungsblattkennzeichenMap.get(bu.buchungsblattkennzeichen)
            if not bb:
                continue
            # create a fake historic Buchungstelle
            bs_list.append(dt.Buchungsstelle(
                uid=bb.uid + '_' + bu.laufendeNummerDerBuchungsstelle,
                recs=[
                    dt.BuchungsstelleRecord(
                        endet=r.endet,
                        laufendeNummer=bu.laufendeNummerDerBuchungsstelle,
                        isHistoric=True,
                    )
                ],
                buchungsblattUids=[bb.uid],
                buchungsblattkennzeichenList=[bb.buchungsblattkennzeichen],
                parentUids=[],
                childUids=[],
                fsUids=[],
                parentkennzeichenList=[],
                flurstueckskennzeichenList=[],
                laufendeNummer=bu.laufendeNummerDerBuchungsstelle,
                isHistoric=True,
            ))

        return bs_list

    def buchungsstelle_list(self, r: dt.FlurstueckRecord):
        """Return the Buchungsstellen that directly or indirectly book the Flurstück record ``r``.

        The list holds the ancestors of the booked Buchungsstelle (in reversed
        breadth-first order), then the Buchungsstelle itself and its descendants
        (in breadth-first order).
        """
        # AX_Flurstueck.istGebucht -> AX_Buchungsstelle

        bs_index = self.rr.buchung.om.Buchungsstelle
        this_bs = bs_index.get(_pop(r, '_istGebucht'))
        if not this_bs:
            return []

        # A Flurstück points to a Buchungsstelle (F.istGebucht -> B).
        # A Buchungsstelle can have parent (B.an -> parent.uid) and child (child.an -> B.uid) Buchungsstellen
        # (these references are populated in _BuchungIndexer above).
        # Our task here, given F.istGebucht -> B, collect B's parents and children
        # These are Buchungsstellen that directly or indirectly mention the current Flurstück.

        def parents_of(bs):
            # unresolved parent uids are skipped instead of crashing the walk
            return [p for p in (bs_index.get(uid) for uid in bs.parentUids) if p]

        def children_of(bs):
            # sort related (child) Buchungsstellen by their BB-Kennzeichen
            children = bs_index.get_many(bs.childUids)
            children.sort(key=_sortkey_buchungsstelle_by_bblatt)
            return children

        ancestors = self._walk_buchungsstellen(this_bs, parents_of)
        descendants = self._walk_buchungsstellen(this_bs, children_of)

        # reversed ancestors without this_bs (the first visited), then this_bs and its descendants
        return ancestors[:0:-1] + descendants

    def _walk_buchungsstellen(self, start: dt.Buchungsstelle, neighbours) -> list[dt.Buchungsstelle]:
        """Walk breadth-first from ``start`` and return the Buchungsstellen in visiting order.

        ``neighbours(bs)`` returns the next Buchungsstellen to visit. A Buchungsstelle
        reachable through several paths is visited once per path. A neighbour that
        is already on the current path is skipped, so that cyclic an/zu references
        in the source data cannot make the walk loop forever.
        """
        visited = []
        queue = [(start, frozenset([start.uid]))]
        pos = 0

        # an index pointer instead of list.pop(0), which is O(n)
        while pos < len(queue):
            bs, path = queue[pos]
            pos += 1
            visited.append(bs)
            for nb in neighbours(bs):
                if nb.uid in path:
                    continue
                queue.append((nb, path | {nb.uid}))

        return visited

    def write(self):
        values = []

        for fs in self.om.Flurstueck:
            # serialize without record geometries; the latest one goes to its own column
            geoms = [_pop(r, 'geom') for r in fs.recs]
            data = index.serialize(fs)
            for r, g in zip(fs.recs, geoms):
                r.geom = g

            values.append(dict(
                uid=fs.uid,
                rc=len(fs.recs),
                fshistoric=fs.isHistoric,
                data=data,
                geom=geoms[-1],
            ))

        self.write_table(index.TABLE_FLURSTUECK, values)

904 

905 

class _FsIndexIndexer(_Indexer):
    """Build the search index tables from all collected Flurstück records."""

    def __init__(self, runner: '_Runner'):
        super().__init__(runner)
        # table id -> rows to write.
        # Created per instance: as a class-level dict of lists it was shared by all
        # instances, so a second indexing run in the same process wrote the rows
        # of the first run again.
        self.entries = {
            index.TABLE_INDEXFLURSTUECK: [],
            index.TABLE_INDEXLAGE: [],
            index.TABLE_INDEXBUCHUNGSBLATT: [],
            index.TABLE_INDEXPERSON: [],
            index.TABLE_INDEXGEOM: [],
        }

    def collect(self):
        with ProgressIndicator(f'ALKIS: creating indexes', len(self.rr.fsdata.om.Flurstueck)) as progress:
            for fs in self.rr.fsdata.om.Flurstueck:
                for r in fs.recs:
                    self.add(fs, r)
                progress.update(1)

    def add(self, fs: dt.Flurstueck, r: dt.FlurstueckRecord):
        """Add the index rows for one Flurstück record ``r`` of ``fs``."""
        base = dict(
            fs=r.uid,
            fshistoric=r.isHistoric,
        )

        places = dict(
            land=r.land.text,
            land_t=index.text_key(r.land.text),
            landcode=r.land.code,

            regierungsbezirk=r.regierungsbezirk.text,
            regierungsbezirk_t=index.text_key(r.regierungsbezirk.text),
            regierungsbezirkcode=r.regierungsbezirk.code,

            kreis=r.kreis.text,
            kreis_t=index.text_key(r.kreis.text),
            kreiscode=r.kreis.code,

            gemeinde=r.gemeinde.text,
            gemeinde_t=index.text_key(r.gemeinde.text),
            gemeindecode=r.gemeinde.code,

            gemarkung=r.gemarkung.text,
            gemarkung_t=index.text_key(r.gemarkung.text),
            gemarkungcode=r.gemarkung.code,

        )

        self.entries[index.TABLE_INDEXFLURSTUECK].append(dict(
            **base,
            **places,

            amtlicheflaeche=r.amtlicheFlaeche,
            geomflaeche=r.geomFlaeche,

            flurnummer=r.flurnummer,
            zaehler=r.zaehler,
            nenner=r.nenner,
            flurstuecksfolge=r.flurstuecksfolge,
            flurstueckskennzeichen=r.flurstueckskennzeichen,

            x=r.x,
            y=r.y,
        ))

        self.entries[index.TABLE_INDEXGEOM].append(dict(
            **base,
            geomflaeche=r.geomFlaeche,
            x=r.x,
            y=r.y,
            geom=r.geom,
        ))

        for la in fs.lageList:
            for la_r in la.recs:
                self.entries[index.TABLE_INDEXLAGE].append(dict(
                    **base,
                    **places,
                    lageuid=la_r.uid,
                    lagehistoric=la_r.isHistoric,
                    strasse=la_r.strasse,
                    strasse_t=index.strasse_key(la_r.strasse),
                    hausnummer=la_r.hausnummer,
                    # fall back to the Flurstück coordinates if the Lage has none
                    x=la.x or r.x,
                    y=la.y or r.y,
                ))

        for bu in fs.buchungList:
            bb = self.rr.buchung.om.Buchungsblatt.get(bu.buchungsblattUid)
            if not bb:
                # unknown Buchungsblatt (should not happen with consistent data): skip instead of crashing
                continue

            for bb_r in bb.recs:
                self.entries[index.TABLE_INDEXBUCHUNGSBLATT].append(dict(
                    **base,
                    buchungsblattuid=bb_r.uid,
                    buchungsblattkennzeichen=bb_r.buchungsblattkennzeichen,
                    buchungsblatthistoric=bu.isHistoric,
                ))

            # each person is indexed once per Buchungsblatt
            pe_uids = set()

            for nn in bb.namensnummerList:
                for pe in nn.personList:
                    if pe.uid in pe_uids:
                        continue
                    pe_uids.add(pe.uid)
                    for pe_r in pe.recs:
                        self.entries[index.TABLE_INDEXPERSON].append(dict(
                            **base,
                            personuid=pe_r.uid,
                            personhistoric=pe_r.isHistoric,
                            name=pe_r.nachnameOderFirma,
                            name_t=index.text_key(pe_r.nachnameOderFirma),
                            vorname=pe_r.vorname,
                            vorname_t=index.text_key(pe_r.vorname),
                        ))

    def write(self):
        for table_id, values in self.entries.items():
            if not self.ix.has_table(table_id):
                # number the rows (1-based) before writing
                for n, v in enumerate(values, 1):
                    v['n'] = n
                self.write_table(table_id, values)

1025 

1026 

class _Runner:
    """Drive one complete ALKIS indexing run.

    Holds one indexer per index area, runs their load/collect phase in a
    fixed order and then writes all results. Source objects come from
    ``reader``. When ``with_cache`` is set, read results are serialized under
    ``<gws.c.CACHE_DIR>/alkis`` and reused by later runs.
    """

    def __init__(self, ix: index.Object, reader: dt.Reader, with_cache=False):
        self.ix: index.Object = ix
        self.reader: dt.Reader = reader

        self.withCache = with_cache
        self.cacheDir = gws.c.CACHE_DIR + '/alkis'
        if self.withCache:
            gws.u.ensure_dir(self.cacheDir)

        self.place = _PlaceIndexer(self)
        self.lage = _LageIndexer(self)
        self.buchung = _BuchungIndexer(self)
        self.part = _PartIndexer(self)
        self.fsdata = _FsDataIndexer(self)
        self.fsindex = _FsIndexIndexer(self)

        # Baseline process RSS; memory_info() reports growth relative to it.
        self.initMemory = gws.lib.osx.process_rss_size()

    def run(self):
        """Collect (or load from cache) all index data, then write every index.

        The steps run in a fixed order. Later indexers may look up objects
        collected by earlier ones, so check the indexers before reordering.
        """
        # NOTE(review): `conn` is not referenced below. The connection is
        # presumably kept open for the whole run on purpose; confirm before
        # removing this `with`.
        with self.ix.db.connect() as conn:
            with ProgressIndicator('ALKIS: indexing'):
                self.place.load_or_collect()
                self.memory_info()

                self.buchung.load_or_collect()
                self.memory_info()

                self.lage.load_or_collect()
                self.memory_info()

                self.fsdata.load_or_collect()
                gws.log.info(f'ALKIS: fs counts: {self.fsdata.counts}')
                self.memory_info()

                self.part.load_or_collect()
                self.memory_info()

                self.fsindex.collect()
                self.memory_info()

                self.place.write()
                self.buchung.write()
                self.lage.write()
                self.fsdata.write()
                self.part.write()
                self.fsindex.write()

    def memory_info(self):
        """Log how much the process RSS has grown since __init__ (only if it grew)."""
        v = gws.lib.osx.process_rss_size() - self.initMemory
        if v > 0:
            # stacklevel=2: attribute the log line to the caller of memory_info
            gws.log.info(f'ALKIS: memory used: {v:.2f} MB', stacklevel=2)

    def read_flat(self, cls):
        """Return all ``cls`` objects from the reader as a flat list.

        Uses the on-disk cache when ``withCache`` is set.
        """
        return self._cached('flat_', cls, self._read_flat)

    def read_grouped(self, cls):
        """Return ``(identifikator, records)`` pairs for all ``cls`` objects.

        Records within each group are sorted by ``lebenszeitintervall.beginnt``.
        Uses the on-disk cache when ``withCache`` is set.
        """
        return self._cached('grouped_', cls, self._read_grouped)

    def _cached(self, prefix, cls, read_fn):
        """Return ``read_fn(cls)``, going through the cache file ``<cacheDir>/<prefix><cls name>``.

        With caching enabled, an existing cache file is returned as-is.
        Otherwise the data is read and the file is (re)written.
        """
        cpath = self.cacheDir + '/' + prefix + cls.__name__
        # NOTE(review): if unserialize_from_path uses pickle, cacheDir must only
        # ever contain files written by this process.
        if self.withCache and gws.u.is_file(cpath):
            return gws.u.unserialize_from_path(cpath)

        rs = read_fn(cls)
        if self.withCache:
            gws.u.serialize_to_path(rs, cpath)
        return rs

    def _read_flat(self, cls):
        """Read every ``cls`` object from the reader into a list, with progress output."""
        cnt = self.reader.count(cls)
        if cnt <= 0:
            gws.log.warning(f'ALKIS: read {cls.__name__}: empty table')
            return []

        rs = []
        with ProgressIndicator(f'ALKIS: read {cls.__name__}', cnt) as progress:
            for ax in self.reader.read_all(cls):
                rs.append(ax)
                progress.update(1)
        return rs

    def _read_grouped(self, cls):
        """Read every ``cls`` object and group the records by ``identifikator``."""
        cnt = self.reader.count(cls)
        if cnt <= 0:
            gws.log.warning(f'ALKIS: read {cls.__name__}: empty table')
            return []

        groups = {}
        with ProgressIndicator(f'ALKIS: read {cls.__name__}', cnt) as progress:
            for ax in self.reader.read_all(cls):
                groups.setdefault(ax.identifikator, []).append(ax)
                progress.update(1)
            for g in groups.values():
                g.sort(key=_sortkey_lebenszeitintervall)

        return list(groups.items())

    def props_from(self, ax, atts):
        """Build a ``dt.Object`` from the attributes of ``ax`` named in ``atts``.

        ``atts`` is a sequence of dicts with at least a ``'name'`` key. Values
        that are ``gid.Object`` instances are skipped, dates are formatted as
        ``dd.mm.yyyy``, and empty values are dropped.
        """
        d = {}

        for a in atts:
            v = getattr(ax, a['name'], None)
            if isinstance(v, gid.Object):
                # @TODO handle properties which are objects
                continue
            if dtx.is_date(v):
                v = dtx.to_string('%d.%m.%Y', v)
            if not gws.u.is_empty(v):
                d[a['name']] = v

        return dt.Object(**d)

1145 

1146 

1147def _from_ax(cls, ax, **kwargs): 

1148 d = {} 

1149 

1150 if ax: 

1151 for k in cls.__annotations__: 

1152 v = getattr(ax, k, None) 

1153 if v: 

1154 d[k] = v 

1155 

1156 d['uid'] = ax.identifikator 

1157 d['beginnt'] = ax.lebenszeitintervall.beginnt 

1158 d['endet'] = ax.lebenszeitintervall.endet 

1159 d['isHistoric'] = d['endet'] is not None 

1160 if ax.anlass and ax.anlass[0].code != '000000': 

1161 d['anlass'] = ax.anlass[0] 

1162 if ax.geom: 

1163 d['geom'] = ax.geom 

1164 

1165 d.update(kwargs) 

1166 return cls(**d) 

1167 

1168 

1169def _anteil(ax): 

1170 try: 

1171 z = float(ax.anteil.zaehler) 

1172 z = str(int(z) if z.is_integer() else z) 

1173 n = float(ax.anteil.nenner) 

1174 n = str(int(n) if n.is_integer() else n) 

1175 return z + '/' + n 

1176 except (AttributeError, ValueError, TypeError): 

1177 pass 

1178 

1179 

def _meta_attributes(meta):
    """Return the entries of ``meta['attributes']`` whose ``'name'`` is in ``dt.PROPS``.

    The result is a new list ordered by each entry's ``'title'`` (stable sort).
    """
    selected = [attr for attr in meta['attributes'] if attr['name'] in dt.PROPS]
    selected.sort(key=lambda attr: attr['title'])
    return selected

1185 

1186 

def _geom_of(o):
    """Decode ``o.geom`` (hex-encoded WKB) into a shapely geometry.

    If ``o.geom`` is empty, logs a warning and returns None.
    """
    if o.geom:
        return shapely.wkb.loads(o.geom, hex=True)
    gws.log.warning(f'{o.__class__.__name__}:{o.uid}: no geometry')
    return None

1192 

1193 

1194def _pop(obj, attr): 

1195 v = getattr(obj, attr, None) 

1196 try: 

1197 delattr(obj, attr) 

1198 except AttributeError: 

1199 pass 

1200 return v 

1201 

1202 

1203def _sortkey_beginnt(o): 

1204 return o.beginnt 

1205 

1206 

1207def _sortkey_lebenszeitintervall(o): 

1208 return o.lebenszeitintervall.beginnt 

1209 

1210 

def _sortkey_namensnummer(nn: dt.Namensnummer):
    """Sort key for a Namensnummer, based on its last record.

    Orders by ``laufendeNummerNachDIN1421`` in natural order (see ``_natkey``),
    then by ``beginnt``. The last record is presumably the most recent one;
    confirm how ``recs`` is ordered for Namensnummer objects.
    """
    last_rec = nn.recs[-1]
    return _natkey(last_rec.laufendeNummerNachDIN1421), last_rec.beginnt

1213 

1214 

def _sortkey_buchungsstelle(bs: dt.Buchungsstelle):
    """Sort key for a Buchungsstelle, based on its last record.

    Orders by ``laufendeNummer`` in natural order (see ``_natkey``), then by
    ``beginnt``.
    """
    last_rec = bs.recs[-1]
    return _natkey(last_rec.laufendeNummer), last_rec.beginnt

1217 

1218 

1219def _sortkey_buchungsstelle_by_bblatt(bs: dt.Buchungsstelle): 

1220 return bs.buchungsblattkennzeichenList[0], bs.recs[-1].beginnt 

1221 

1222 

1223def _sortkey_part(pa: dt.Part): 

1224 return pa.name.text, -pa.geomFlaeche 

1225 

1226 

1227def _sortkey_gebaeude(ge: dt.Gebaeude): 

1228 # sort Gebaeude by area (big->small) 

1229 return ge.recs[-1].beginnt, -ge.recs[-1].geomFlaeche 

1230 

1231 

1232def _natkey(v): 

1233 if not v: 

1234 return [] 

1235 return [ 

1236 '{:080d}'.format(int(digits)) if digits else chars.lower() 

1237 for digits, chars in re.findall(r'(\d+)|(\D+)', v.strip()) 

1238 ] 

1239 

1240 

1241def _comma(a): 

1242 return ','.join(str(s) if s is not None else '' for s in a) 

1243 

1244 

1245def _str(x): 

1246 return None if x is None else str(x)