Coverage for gws-app/gws/plugin/alkis/data/indexer.py: 0%
671 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-16 22:59 +0200
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-16 22:59 +0200
1from typing import Optional, Iterable
3import re
4from typing import Generic, TypeVar
6import shapely
7import shapely.strtree
8import shapely.wkb
10import gws
11import gws.lib.osx
12import gws.lib.datetimex as dtx
13from gws.lib.cli import ProgressIndicator
14import gws.plugin.postgres.provider
16from . import types as dt
17from . import index
18from . import norbit6
20from .geo_info_dok import gid6 as gid
def run(ix: index.Object, data_schema: str, with_force=False, with_cache=False):
    """Build the ALKIS index ``ix`` from the data in ``data_schema``.

    With ``with_force`` the existing index is dropped first. Without it,
    nothing is done when the index status already reports ``complete``.
    ``with_cache`` is passed through to the runner.
    """
    if not with_force and ix.status().complete:
        return
    if with_force:
        ix.drop()

    reader = norbit6.Object(ix.db, schema=data_schema)
    _Runner(ix, reader, with_cache).run()
34##
36T = TypeVar("T")
39class _ObjectDict(Generic[T]):
40 def __init__(self, cls):
41 self.d = {}
42 self.cls = cls
44 def add(self, uid, recs) -> T:
45 o = self.cls(uid=uid, recs=recs)
46 o.isHistoric = all(r.isHistoric for r in recs)
47 self.d[o.uid] = o
48 return o
50 def get(self, uid, default=None) -> Optional[T]:
51 return self.d.get(uid, default)
53 def get_many(self, uids) -> list[T]:
54 res = {}
56 for uid in uids:
57 if uid not in res:
58 o = self.d.get(uid)
59 if o:
60 res[uid] = o
62 return list(res.values())
64 def get_from_ptr(self, obj: dt.Entity, attr):
65 uids = []
67 for r in obj.recs:
68 v = _pop(r, attr)
69 if isinstance(v, list):
70 uids.extend(v)
71 elif isinstance(v, str):
72 uids.append(v)
74 return self.get_many(uids)
76 def __iter__(self) -> Iterable[T]:
77 yield from self.d.values()
79 def __len__(self):
80 return len(self.d)
class _ObjectMap:
    """Container for everything an indexer collects (this is what gets cached)."""

    def __init__(self):
        # entity collections, one per entity class
        self.Anschrift: _ObjectDict[dt.Anschrift] = _ObjectDict(dt.Anschrift)
        self.Buchungsblatt: _ObjectDict[dt.Buchungsblatt] = _ObjectDict(dt.Buchungsblatt)
        self.Buchungsstelle: _ObjectDict[dt.Buchungsstelle] = _ObjectDict(dt.Buchungsstelle)
        self.Flurstueck: _ObjectDict[dt.Flurstueck] = _ObjectDict(dt.Flurstueck)
        self.Gebaeude: _ObjectDict[dt.Gebaeude] = _ObjectDict(dt.Gebaeude)
        self.Lage: _ObjectDict[dt.Lage] = _ObjectDict(dt.Lage)
        self.Namensnummer: _ObjectDict[dt.Namensnummer] = _ObjectDict(dt.Namensnummer)
        self.Part: _ObjectDict[dt.Part] = _ObjectDict(dt.Part)
        self.Person: _ObjectDict[dt.Person] = _ObjectDict(dt.Person)

        # plain lookup tables, filled by the place and lage indexers
        self.placeAll: dict = {}
        self.placeIdx: dict = {}
        self.catalog: dict = {}
101class _Indexer:
102 CACHE_KEY: str = ''
104 def __init__(self, runner: '_Runner'):
105 self.rr = runner
106 self.ix: index.Object = runner.ix
107 self.om = _ObjectMap()
109 def load_or_collect(self):
110 if not self.load_cache():
111 self.collect()
112 self.store_cache()
114 def load_cache(self):
115 if not self.rr.withCache or not self.CACHE_KEY:
116 return False
117 cpath = self.rr.cacheDir + '/' + self.CACHE_KEY
118 if not gws.u.is_file(cpath):
119 return False
120 om = gws.u.unserialize_from_path(cpath)
121 if not om:
122 return False
123 gws.log.info(f'ALKIS: use cache {self.CACHE_KEY!r}')
124 self.om = om
125 return True
127 def store_cache(self):
128 if not self.rr.withCache or not self.CACHE_KEY:
129 return
130 cpath = self.rr.cacheDir + '/' + self.CACHE_KEY
131 gws.u.serialize_to_path(self.om, cpath)
132 gws.log.info(f'ALKIS: store cache {self.CACHE_KEY!r}')
134 def collect(self):
135 pass
137 def write_table(self, table_id, values):
138 if self.ix.has_table(table_id):
139 return
140 with ProgressIndicator(f'ALKIS: write {table_id!r}', len(values)) as progress:
141 self.ix.create_table(table_id, values, progress)
143 def write(self):
144 pass
class _PlaceIndexer(_Indexer):
    """Index places (Administration- und Verwaltungseinheiten).

    A place is stored under ``kind + code``; the code is assembled from the
    parts of the object's key, see ``CODES``.

    References: https://de.wikipedia.org/wiki/Amtlicher_Gemeindeschl%C3%BCssel
    """

    CACHE_KEY = 'obj_place'

    # placeholders returned by the ``get_*`` lookups when a place is unknown
    empty1 = dt.EnumPair(code='0', text='')
    empty2 = dt.EnumPair(code='00', text='')

    # kind -> function building the place code from a key object
    CODES = {
        'land': lambda o: o.land,
        'regierungsbezirk': lambda o: o.land + (o.regierungsbezirk or '0'),
        'kreis': lambda o: o.land + (o.regierungsbezirk or '0') + o.kreis,
        'gemeinde': lambda o: o.land + (o.regierungsbezirk or '0') + o.kreis + o.gemeinde,
        'gemarkung': lambda o: o.land + o.gemarkungsnummer,
        'buchungsblattbezirk': lambda o: o.land + o.bezirk,
        'dienststelle': lambda o: o.land + o.stelle,
    }

    def add(self, kind, ax, key_obj, **kwargs):
        """Register the place described by ``ax`` and return its EnumPair.

        Objects with a truthy ``ax.endet`` are skipped (returns None).
        ``kwargs`` are the parent places passed on to ``dt.Place``.
        """
        # NOTE(review): other code in this file reads 'lebenszeitintervall.endet';
        # confirm that the gid objects expose 'endet' directly as well.
        if ax.endet:
            return

        code = self.code(kind, key_obj)
        value = dt.EnumPair(code, ax.bezeichnung)

        place = dt.Place(**kwargs)
        place.uid = kind + code
        place.kind = kind
        setattr(place, kind, value)

        self.om.placeAll[place.uid] = place
        self.om.placeIdx[place.uid] = value

        return value

    def collect(self):
        """Read all place classes; parents are read first so children can refer to them."""
        self.om.placeAll = {}
        self.om.placeIdx = {}

        read = self.rr.read_flat

        for ax in read(gid.AX_Bundesland):
            self.add('land', ax, ax.schluessel)

        for ax in read(gid.AX_Regierungsbezirk):
            key = ax.schluessel
            self.add('regierungsbezirk', ax, key, land=self.get_land(key))

        for ax in read(gid.AX_KreisRegion):
            key = ax.schluessel
            self.add(
                'kreis', ax, key,
                land=self.get_land(key),
                regierungsbezirk=self.get_regierungsbezirk(key),
            )

        for ax in read(gid.AX_Gemeinde):
            key = ax.gemeindekennzeichen
            self.add(
                'gemeinde', ax, key,
                land=self.get_land(key),
                regierungsbezirk=self.get_regierungsbezirk(key),
                kreis=self.get_kreis(key),
            )

        # @TODO map Gemarkung to Gemeinde (see https://de.wikipedia.org/wiki/Liste_der_Gemarkungen_in_Nordrhein-Westfalen etc)

        for ax in read(gid.AX_Gemarkung):
            key = ax.schluessel
            if str(key.gemarkungsnummer) in self.ix.excludeGemarkung:
                continue
            self.add('gemarkung', ax, key, land=self.get_land(key))

        for ax in read(gid.AX_Buchungsblattbezirk):
            key = ax.schluessel
            self.add('buchungsblattbezirk', ax, key, land=self.get_land(key))

        for ax in read(gid.AX_Dienststelle):
            key = ax.schluessel
            self.add('dienststelle', ax, key, land=self.get_land(key))

    def write(self):
        """Write all collected places into the place table."""
        values = [
            dict(uid=place.uid, data=index.serialize(place))
            for place in self.om.placeAll.values()
        ]
        self.write_table(index.TABLE_PLACE, values)

    # lookups: return the indexed EnumPair or an 'empty' placeholder

    def get_land(self, o):
        return self.get('land', o) or self.empty2

    def get_regierungsbezirk(self, o):
        return self.get('regierungsbezirk', o) or self.empty1

    def get_kreis(self, o):
        return self.get('kreis', o) or self.empty2

    def get_gemeinde(self, o):
        return self.get('gemeinde', o) or self.empty1

    def get_gemarkung(self, o):
        return self.get('gemarkung', o) or self.empty1

    def get_buchungsblattbezirk(self, o):
        return self.get('buchungsblattbezirk', o) or self.empty1

    def get_dienststelle(self, o):
        return self.get('dienststelle', o) or self.empty1

    def get(self, kind, o):
        """Return the EnumPair indexed for key object ``o`` of ``kind`` or None."""
        return self.om.placeIdx.get(kind + self.code(kind, o))

    def is_empty(self, p: dt.EnumPair):
        """Tell whether ``p`` carries one of the placeholder codes."""
        return p.code in ('0', '00')

    def code(self, kind, o):
        """Build the place code of ``kind`` from key object ``o``."""
        return self.CODES[kind](o)
class _LageIndexer(_Indexer):
    """Index Lage objects (with and without house number) and the Gebaeude pointing at them."""

    CACHE_KEY = 'obj_lage'

    def collect(self):
        """Run the collection steps; each step relies on the results of the previous ones."""
        self._collect_catalog()
        self._collect_lage()
        self._assign_hnr_coordinates()
        self._collect_gebaeude()
        self._link_gebaeude()

    def _collect_catalog(self):
        # street catalog: lage key -> name
        for ax in self.rr.read_flat(gid.AX_LagebezeichnungKatalogeintrag):
            self.om.catalog[self.lage_key(ax.schluessel)] = ax.bezeichnung

    def _collect_lage(self):
        lage_classes = (
            gid.AX_LagebezeichnungMitHausnummer,
            gid.AX_LagebezeichnungOhneHausnummer,
        )
        for cls in lage_classes:
            for uid, axs in self.rr.read_grouped(cls):
                recs = [
                    _from_ax(
                        dt.LageRecord,
                        ax,
                        strasse=self.strasse(ax),
                        hausnummer=index.normalize_hausnummer(ax.hausnummer),
                    )
                    for ax in axs
                ]
                self.om.Lage.add(uid, recs)

    def _assign_hnr_coordinates(self):
        # use the PTO (art=HNR) geometry for lage coordinates
        # PTO.dientZurDarstellungVon -> lage.uid
        for pto in self.rr.read_flat(gid.AP_PTO):
            if pto.lebenszeitintervall.endet is not None:
                continue

            art = _pop(pto, 'art') or ''
            if art.upper() != 'HNR':
                continue

            uids = _pop(pto, 'dientZurDarstellungVon')
            if not uids or not isinstance(uids, list):
                continue

            geom = _geom_of(pto)
            if not geom:
                continue

            center = geom.centroid
            for la in self.om.Lage.get_many(uids):
                la.x = center.x
                la.y = center.y

    def _collect_gebaeude(self):
        # read related Gebaeude records
        atts = _meta_attributes(gid.METADATA['AX_Gebaeude'])

        for uid, axs in self.rr.read_grouped(gid.AX_Gebaeude):
            recs = [
                _from_ax(
                    dt.GebaeudeRecord,
                    ax,
                    name=', '.join(ax.name) if ax.name else None,
                    amtlicheFlaeche=ax.grundflaeche or 0,
                    props=self.rr.props_from(ax, atts),
                    _zeigtAuf=ax.zeigtAuf,
                )
                for ax in axs
            ]
            self.om.Gebaeude.add(uid, recs)

        for ge in self.om.Gebaeude:
            for rec in ge.recs:
                geom = _geom_of(rec)
                rec.geomFlaeche = round(geom.area, 2) if geom else 0
                # omit Gebaeude geometries for now
                _pop(rec, 'geom')

    def _link_gebaeude(self):
        for la in self.om.Lage:
            la.gebaeudeList = []

        # AX_Gebaeude.zeigtAuf -> AX_LagebezeichnungMitHausnummer
        for ge in self.om.Gebaeude:
            for la in self.om.Lage.get_from_ptr(ge, '_zeigtAuf'):
                la.gebaeudeList.append(ge)

    def strasse(self, ax):
        """Return the street name of ``ax``: given directly or looked up in the catalog ('' if unknown)."""
        bez = ax.lagebezeichnung
        if isinstance(bez, str):
            return bez
        return self.om.catalog.get(self.lage_key(bez), '')

    def lage_key(self, r):
        """Build the catalog key from the land/regierungsbezirk/kreis/gemeinde/lage parts of ``r``."""
        return _comma([r.land, r.regierungsbezirk, r.kreis, r.gemeinde, r.lage])

    def write(self):
        """Write all Lage objects into the lage table."""
        values = [
            dict(uid=la.uid, rc=len(la.recs), data=index.serialize(la))
            for la in self.om.Lage
        ]
        self.write_table(index.TABLE_LAGE, values)
class _BuchungIndexer(_Indexer):
    """Index Buchungsblatt objects with their Buchungsstellen, Namensnummern, Personen and Anschriften."""

    CACHE_KEY = 'obj_buchungsblatt'

    # Buchungsblattkennzeichen -> Buchungsblatt.
    # Created per instance (a class-level dict would be shared by all indexers)
    # and rebuilt whenever the object map is collected or loaded from cache.
    buchungsblattkennzeichenMap: dict[str, dt.Buchungsblatt]

    def __init__(self, runner: '_Runner'):
        super().__init__(runner)
        self.buchungsblattkennzeichenMap = {}

    def load_cache(self):
        """Load the cached object map and re-create the kennzeichen lookup, which is not part of it."""
        if not super().load_cache():
            return False
        # assumes cached Buchungsblatt objects keep the 'buchungsblattkennzeichen'
        # attribute assigned in collect() - TODO confirm against the serializer
        self._rebuild_kennzeichen_map()
        return True

    def _rebuild_kennzeichen_map(self):
        self.buchungsblattkennzeichenMap = {
            bb.buchungsblattkennzeichen: bb
            for bb in self.om.Buchungsblatt
        }

    @staticmethod
    def _uid_list(v):
        """Normalize a reference value (None, a single uid or an iterable of uids) to a list."""
        if not v:
            return []
        if isinstance(v, str):
            return [v]
        return list(v)

    def collect(self):
        """Read all Buchung-related classes and resolve the references between them."""
        for uid, axs in self.rr.read_grouped(gid.AX_Anschrift):
            self.om.Anschrift.add(uid, [
                _from_ax(
                    dt.AnschriftRecord,
                    ax,
                    ort=ax.ort_AmtlichesOrtsnamensverzeichnis or ax.ort_Post,
                    plz=ax.postleitzahlPostzustellung,
                    telefon=ax.telefon[0] if ax.telefon else None
                )
                for ax in axs
            ])

        for uid, axs in self.rr.read_grouped(gid.AX_Person):
            self.om.Person.add(uid, [
                _from_ax(
                    dt.PersonRecord,
                    ax,
                    anrede=ax.anrede.text if ax.anrede else None,
                    _hat=ax.hat,
                )
                for ax in axs
            ])

        # AX_Person.hat -> [AX_Anschrift]
        for pe in self.om.Person:
            pe.anschriftList = self.om.Anschrift.get_from_ptr(pe, '_hat')

        for uid, axs in self.rr.read_grouped(gid.AX_Namensnummer):
            self.om.Namensnummer.add(uid, [
                _from_ax(
                    dt.NamensnummerRecord,
                    ax,
                    anteil=_anteil(ax),
                    _benennt=ax.benennt,
                    _istBestandteilVon=ax.istBestandteilVon
                )
                for ax in axs
            ])

        # AX_Namensnummer.benennt -> AX_Person
        for nn in self.om.Namensnummer:
            nn.laufendeNummer = nn.recs[-1].laufendeNummerNachDIN1421
            nn.personList = self.om.Person.get_from_ptr(nn, '_benennt')

        for uid, axs in self.rr.read_grouped(gid.AX_Buchungsstelle):
            self.om.Buchungsstelle.add(uid, [
                _from_ax(
                    dt.BuchungsstelleRecord,
                    ax,
                    anteil=_anteil(ax),
                    _an=ax.an,
                    _zu=ax.zu,
                    _istBestandteilVon=ax.istBestandteilVon,
                )
                for ax in axs
            ])

        for bs in self.om.Buchungsstelle:
            bs.laufendeNummer = bs.recs[-1].laufendeNummer
            bs.fsUids = []
            bs.flurstueckskennzeichenList = []

        for uid, axs in self.rr.read_grouped(gid.AX_Buchungsblatt):
            self.om.Buchungsblatt.add(uid, [
                _from_ax(
                    dt.BuchungsblattRecord,
                    ax,
                    buchungsblattbezirk=self.rr.place.get_buchungsblattbezirk(ax.buchungsblattbezirk),
                )
                for ax in axs
            ])

        for bb in self.om.Buchungsblatt:
            bb.buchungsstelleList = []
            bb.namensnummerList = []
            bb.buchungsblattkennzeichen = bb.recs[-1].buchungsblattkennzeichen

        self._rebuild_kennzeichen_map()

        # AX_Buchungsstelle.istBestandteilVon -> AX_Buchungsblatt
        for bs in self.om.Buchungsstelle:
            bb_list = self.om.Buchungsblatt.get_from_ptr(bs, '_istBestandteilVon')
            bs.buchungsblattUids = [bb.uid for bb in bb_list]
            bs.buchungsblattkennzeichenList = [bb.buchungsblattkennzeichen for bb in bb_list]
            for bb in bb_list:
                bb.buchungsstelleList.append(bs)

        # AX_Namensnummer.istBestandteilVon -> AX_Buchungsblatt
        for nn in self.om.Namensnummer:
            bb_list = self.om.Buchungsblatt.get_from_ptr(nn, '_istBestandteilVon')
            nn.buchungsblattUids = [bb.uid for bb in bb_list]
            nn.buchungsblattkennzeichenList = [bb.buchungsblattkennzeichen for bb in bb_list]
            for bb in bb_list:
                bb.namensnummerList.append(nn)

        for bb in self.om.Buchungsblatt:
            bb.buchungsstelleList.sort(key=_sortkey_buchungsstelle)
            bb.namensnummerList.sort(key=_sortkey_namensnummer)

        # AX_Buchungsstelle.an -> [AX_Buchungsstelle]
        # AX_Buchungsstelle.zu -> [AX_Buchungsstelle]
        # see Erläuterungen zu ALKIS Version 6, page 116-119

        for bs in self.om.Buchungsstelle:
            bs.childUids = []
            bs.parentUids = []
            bs.parentkennzeichenList = []

        for bs in self.om.Buchungsstelle:
            # a list (de-duplicated by get_many) keeps the parent order stable
            # between runs; a missing 'an'/'zu' reference is treated as empty
            parent_uids = []
            for r in bs.recs:
                parent_uids.extend(self._uid_list(_pop(r, '_an')))
                parent_uids.extend(self._uid_list(_pop(r, '_zu')))

            parent_knz = set()
            for parent_bs in self.om.Buchungsstelle.get_many(parent_uids):
                parent_bs.childUids.append(bs.uid)
                bs.parentUids.append(parent_bs.uid)
                for bb_knz in parent_bs.buchungsblattkennzeichenList:
                    parent_knz.add(bb_knz + '.' + parent_bs.laufendeNummer)

            bs.parentkennzeichenList = sorted(parent_knz)

    def write(self):
        """Write all Buchungsblatt objects into the buchungsblatt table."""
        values = []

        for bb in self.om.Buchungsblatt:
            values.append(dict(
                uid=bb.uid,
                rc=len(bb.recs),
                data=index.serialize(bb),
            ))

        self.write_table(index.TABLE_BUCHUNGSBLATT, values)
class _PartIndexer(_Indexer):
    """Index 'parts': intersections of geometry-bearing objects with Flurstücke.

    The source objects are kept in ``om.Part`` (this is what is cached);
    the per-Flurstück intersections are kept in ``self.parts``.
    """

    CACHE_KEY = 'obj_part'
    # intersections with a smaller (rounded) area are ignored
    MIN_PART_AREA = 1

    # all per-instance, see __init__ (class-level lists would be shared
    # between indexers and grow with every run in the same process)
    parts: list[dt.Part]
    fs_list: list
    fs_geom: list

    stree: shapely.strtree.STRtree

    def __init__(self, runner: '_Runner'):
        super().__init__(runner)
        self.parts = []
        self.fs_list = []
        self.fs_geom = []

    def load_or_collect(self):
        """Load or collect the source objects, then always compute the intersections.

        The intersections are not part of the cached object map, so they
        have to be computed after a cache hit as well; otherwise ``write``
        would produce an empty table.
        """
        if not self.load_cache():
            self.collect_objects()
            self.store_cache()
        self.compute_parts()

    def collect(self):
        """Collect the source objects and compute their intersections with Flurstücke."""
        self.collect_objects()
        self.compute_parts()

    def collect_objects(self):
        """Read the objects of every part kind into ``om.Part``."""
        for kind in dt.Part.KIND:
            self.collect_kind(kind)

    def compute_parts(self):
        """Intersect every object in ``om.Part`` with the Flurstück geometries."""
        self.parts = []
        self.fs_list = []
        self.fs_geom = []

        for fs in self.rr.fsdata.om.Flurstueck:
            self.fs_list.append(fs)
            # NB take only the most recent fs geometry into account
            self.fs_geom.append(_geom_of(fs.recs[-1]))

        self.stree = shapely.strtree.STRtree(self.fs_geom)

        with ProgressIndicator('ALKIS: parts', len(self.om.Part)) as progress:
            for pa in self.om.Part:
                self.compute_intersections(pa)
                progress.update(1)

        self.parts.sort(key=_sortkey_part)

        for pa in self.parts:
            pa.isHistoric = all(r.isHistoric for r in pa.recs)

    def collect_kind(self, kind):
        """Collect all geometry-bearing object classes whose metadata key matches ``kind``."""
        _, key = dt.Part.KIND[kind]
        pattern = key + r'/\w+/'

        classes = [
            getattr(gid, meta['name'])
            for meta in gid.METADATA.values()
            if (
                meta['kind'] == 'object'
                and meta['geom']
                and re.search(pattern, meta['key'])
            )
        ]

        for cls in classes:
            self.collect_class(kind, cls)

    def collect_class(self, kind, cls):
        """Read the objects of ``cls`` into ``om.Part`` and tag them with kind and name."""
        meta = gid.METADATA[cls.__name__]
        atts = _meta_attributes(meta)

        for uid, axs in self.rr.read_grouped(cls):
            pa = self.om.Part.add(uid, [
                _from_ax(
                    dt.PartRecord,
                    ax,
                    props=self.rr.props_from(ax, atts),
                )
                for ax in axs
            ])
            pa.kind = kind
            pa.name = dt.EnumPair(meta['uid'], meta['title'])

    def compute_intersections(self, pa: dt.Part):
        """Append to ``self.parts`` one Part per Flurstück that ``pa`` intersects."""
        # fs uid -> Part
        parts_map = {}

        for r in pa.recs:
            geom = _geom_of(r)
            if not geom:
                continue

            # the tree yields positions in fs_geom / fs_list
            for i in self.stree.query(geom):
                part_geom = shapely.intersection(self.fs_geom[i], geom)
                part_area = round(part_geom.area, 2)
                if part_area < self.MIN_PART_AREA:
                    continue

                fs = self.fs_list[i]
                fs_rec = fs.recs[-1]

                part = parts_map.get(fs.uid)
                if part is None:
                    part = parts_map[fs.uid] = dt.Part(
                        uid=pa.uid,
                        recs=[],
                        kind=pa.kind,
                        name=pa.name,
                        fs=fs.uid,
                    )

                # computed area corrected with respect to FS's "amtlicheFlaeche"
                part_area_corrected = round(
                    fs_rec.amtlicheFlaeche * (part_area / fs_rec.geomFlaeche),
                    2)

                part.recs.append(dt.PartRecord(
                    uid=r.uid,
                    beginnt=r.beginnt,
                    endet=r.endet,
                    anlass=r.anlass,
                    props=r.props,
                    geomFlaeche=part_area,
                    amtlicheFlaeche=part_area_corrected,
                    isHistoric=r.endet is not None,
                ))

                # the Part itself carries the values of the last intersecting record
                part.geom = shapely.wkb.dumps(part_geom, srid=self.ix.crs.srid, hex=True)
                part.geomFlaeche = part_area
                part.amtlicheFlaeche = part_area_corrected

        self.parts.extend(parts_map.values())

    def write(self):
        """Write the computed parts into the part table."""
        values = []

        for n, pa in enumerate(self.parts, 1):
            # the geometry goes into its own column, not into the serialized data
            geom = _pop(pa, 'geom')
            data = index.serialize(pa)
            pa.geom = geom
            values.append(dict(
                n=n,
                fs=pa.fs,
                uid=pa.uid,
                beginnt=pa.recs[-1].beginnt,
                endet=pa.recs[-1].endet,
                kind=pa.kind,
                name=pa.name.text,
                parthistoric=pa.isHistoric,
                data=data,
                geom=geom,
            ))

        self.write_table(index.TABLE_PART, values)
class _FsDataIndexer(_Indexer):
    """Index Flurstück objects together with their Lage, Gebaeude and Buchung relations."""

    CACHE_KEY = 'obj_flurstueck'

    def __init__(self, runner: '_Runner'):
        super().__init__(runner)
        # per-outcome counters, updated by record()
        self.counts = {
            'ok': 0,
            'no_geometry': 0,
            'excluded': 0,
        }

    def collect(self):
        """Read current and historic Flurstücke and resolve their relations."""
        for uid, axs in self.rr.read_grouped(gid.AX_Flurstueck):
            recs = gws.u.compact(self.record(ax) for ax in axs)
            if recs:
                self.om.Flurstueck.add(uid, recs)

        for uid, axs in self.rr.read_grouped(gid.AX_HistorischesFlurstueck):
            recs = gws.u.compact(self.record(ax) for ax in axs)
            if not recs:
                continue
            # For a historic FS, 'beginnt' is basically when the history beginnt
            # (see comments for AX_HistorischesFlurstueck in gid6).
            # we set F.endet=F.beginnt to designate this one as 'historic'
            for rec in recs:
                rec.endet = rec.beginnt
                rec.isHistoric = True
            self.om.Flurstueck.add(uid, recs)

        for fs in self.om.Flurstueck:
            latest = fs.recs[-1]
            fs.flurstueckskennzeichen = latest.flurstueckskennzeichen
            fs.fsnummer = index.make_fsnummer(latest)
            fs.x = latest.x
            fs.y = latest.y
            self.process_lage(fs)
            self.process_gebaeude(fs)
            self.process_buchung(fs)

        self._link_vorgaenger()

    def _link_vorgaenger(self):
        # check the 'nachfolgerFlurstueckskennzeichen' array
        # and mark each referenced FS as a "vorgaenger" FS
        # It is a M:N relation, therefore 'vorgaengerFlurstueckskennzeichen' is also an array
        knz_to_fs = {}
        for fs in self.om.Flurstueck:
            knz_to_fs[fs.flurstueckskennzeichen] = fs

        for fs in self.om.Flurstueck:
            successors = fs.recs[-1].nachfolgerFlurstueckskennzeichen
            if not successors:
                continue
            for nf_knz in successors:
                nf_fs = knz_to_fs.get(nf_knz)
                if not nf_fs:
                    gws.log.warning(f'ALKIS: nachfolgerFlurstueck missing {fs.flurstueckskennzeichen!r}->{nf_knz!r}')
                    continue
                nf_rec = nf_fs.recs[-1]
                if not nf_rec.vorgaengerFlurstueckskennzeichen:
                    nf_rec.vorgaengerFlurstueckskennzeichen = []
                nf_rec.vorgaengerFlurstueckskennzeichen.append(fs.flurstueckskennzeichen)

    def record(self, ax):
        """Convert ``ax`` to a FlurstueckRecord.

        Returns None (and counts the reason) when the Gemarkung or Gemeinde
        is unknown or when the object has no geometry.
        """
        place = self.rr.place

        r: dt.FlurstueckRecord = _from_ax(
            dt.FlurstueckRecord,
            ax,
            amtlicheFlaeche=ax.amtlicheFlaeche or 0,
            flurnummer=_str(ax.flurnummer),
            zaehler=_str(ax.flurstuecksnummer.zaehler),
            nenner=_str(ax.flurstuecksnummer.nenner),
            zustaendigeStelle=[place.get_dienststelle(p) for p in (ax.zustaendigeStelle or [])],

            # raw references, resolved (and removed) later
            _weistAuf=ax.weistAuf,
            _zeigtAuf=ax.zeigtAuf,
            _istGebucht=ax.istGebucht,
            _buchung=ax.buchung,
        )

        # basic data

        r.gemarkung = place.get_gemarkung(ax.gemarkung)
        r.gemeinde = place.get_gemeinde(ax.gemeindezugehoerigkeit)
        r.regierungsbezirk = place.get_regierungsbezirk(ax.gemeindezugehoerigkeit)
        r.kreis = place.get_kreis(ax.gemeindezugehoerigkeit)
        r.land = place.get_land(ax.gemeindezugehoerigkeit)

        if place.is_empty(r.gemarkung) or place.is_empty(r.gemeinde):
            # exclude Flurstücke that refer to Gemeinde/Gemarkung
            # which do not exist in the reference AX tables
            self.counts['excluded'] += 1
            return None

        # geometry

        geom = _geom_of(r)
        if not geom:
            self.counts['no_geometry'] += 1
            return None

        r.geomFlaeche = round(geom.area, 2)
        r.x = round(geom.centroid.x, 2)
        r.y = round(geom.centroid.y, 2)

        self.counts['ok'] += 1
        return r

    def process_lage(self, fs: dt.Flurstueck):
        """Fill ``fs.lageList`` from the Lage references of the Flurstück records."""
        lage = self.rr.lage.om.Lage

        # AX_Flurstueck.weistAuf -> AX_LagebezeichnungMitHausnummer
        # AX_Flurstueck.zeigtAuf -> AX_LagebezeichnungOhneHausnummer
        fs.lageList = []
        fs.lageList.extend(lage.get_from_ptr(fs, '_weistAuf'))
        fs.lageList.extend(lage.get_from_ptr(fs, '_zeigtAuf'))

    def process_gebaeude(self, fs: dt.Flurstueck):
        """Fill ``fs.gebaeudeList`` and the area totals of the Gebaeude that have not ended."""
        ge_map = {
            ge.uid: ge
            for la in fs.lageList
            for ge in la.gebaeudeList
        }

        fs.gebaeudeList = sorted(ge_map.values(), key=_sortkey_gebaeude)

        current = [ge.recs[-1] for ge in fs.gebaeudeList if not ge.recs[-1].endet]
        fs.gebaeudeAmtlicheFlaeche = sum(rec.amtlicheFlaeche for rec in current)
        fs.gebaeudeGeomFlaeche = sum(rec.geomFlaeche for rec in current)

    def process_buchung(self, fs: dt.Flurstueck):
        """Fill ``fs.buchungList``: one Buchung per Buchungsblatt related to the Flurstück."""
        bs_historic_map = {}
        bs_seen = set()
        buchung_map = {}

        # for each Flurstück record, we collect all related Buchungsstellen (with respect to parent-child relations)
        # then group Buchungsstellen by their Buchungsblatt
        # and create Buchung objects for a FS

        for r in fs.recs:
            hist_buchung = _pop(r, '_buchung')
            if hist_buchung:
                bs_list = self.historic_buchungsstelle_list(r, hist_buchung)
            else:
                bs_list = self.buchungsstelle_list(r)

            for bs in bs_list:
                # a Buchungsstelle referred to by an expired Flurstück might not be expired itself,
                # so we have to track its state separately by wrapping it in a BuchungsstelleReference
                # a BuchungsstelleReference is historic if its Flurstück is
                bs_historic_map[bs.uid] = r.isHistoric

                if bs.uid in bs_seen:
                    continue
                bs_seen.add(bs.uid)

                # populate Flurstück references in a Buchungsstelle
                if fs.uid not in bs.fsUids:
                    bs.fsUids.append(fs.uid)
                    bs.flurstueckskennzeichenList.append(fs.flurstueckskennzeichen)

                # create Buchung records by grouping Buchungsstellen
                for bb_uid in bs.buchungsblattUids:
                    bu = buchung_map.setdefault(bb_uid, dt.Buchung(recs=[], buchungsblattUid=bb_uid))
                    bu.recs.append(dt.BuchungsstelleReference(buchungsstelle=bs))

        fs.buchungList = list(buchung_map.values())

        for bu in fs.buchungList:
            for ref in bu.recs:
                ref.isHistoric = bs_historic_map[ref.buchungsstelle.uid]
            bu.isHistoric = all(ref.isHistoric for ref in bu.recs)

        return fs

    def historic_buchungsstelle_list(self, r: dt.FlurstueckRecord, hist_buchung):
        """Create stand-in Buchungsstellen for the 'buchung' entries of a historic Flurstück.

        Entries whose Buchungsblattkennzeichen is unknown are skipped.
        """
        # an AX_HistorischesFlurstueck with a special 'buchung' reference

        kennzeichen_map = self.rr.buchung.buchungsblattkennzeichenMap
        bs_list = []

        for bu in hist_buchung:
            bb = kennzeichen_map.get(bu.buchungsblattkennzeichen)
            if not bb:
                continue

            lfd_nr = bu.laufendeNummerDerBuchungsstelle

            # create a fake historic Buchungstelle
            bs_list.append(dt.Buchungsstelle(
                uid=bb.uid + '_' + lfd_nr,
                recs=[
                    dt.BuchungsstelleRecord(
                        endet=r.endet,
                        laufendeNummer=lfd_nr,
                        isHistoric=True,
                    )
                ],
                buchungsblattUids=[bb.uid],
                buchungsblattkennzeichenList=[bb.buchungsblattkennzeichen],
                parentUids=[],
                childUids=[],
                fsUids=[],
                parentkennzeichenList=[],
                flurstueckskennzeichenList=[],
                laufendeNummer=lfd_nr,
                isHistoric=True,
            ))

        return bs_list

    def buchungsstelle_list(self, r: dt.FlurstueckRecord):
        """Return the Buchungsstelle of ``r`` with its ancestors before and its descendants after it."""
        # AX_Flurstueck.istGebucht -> AX_Buchungsstelle

        stellen = self.rr.buchung.om.Buchungsstelle

        this_bs = stellen.get(_pop(r, '_istGebucht'))
        if not this_bs:
            return []

        bs_list = []

        # A Flurstück points to a Buchungsstelle (F.istGebucht -> B).
        # A Buchungsstelle can have parent (B.an -> parent.uid) and child (child.an -> B.uid) Buchungsstellen
        # (these references are populated in _BuchungIndexer above).
        # Our task here, given F.istGebucht -> B, collect B's parents and children
        # These are Buchungsstellen that directly or indirectly mention the current Flurstück.

        # walk up: every visited Buchungsstelle is put in front of the list
        queue: list[dt.Buchungsstelle] = [this_bs]
        while queue:
            bs = queue.pop(0)
            bs_list.insert(0, bs)
            for uid in bs.parentUids:
                queue.append(stellen.get(uid))

        # remove this_bs
        bs_list.pop()

        # walk down: this_bs first, then its descendants level by level
        queue: list[dt.Buchungsstelle] = [this_bs]
        while queue:
            bs = queue.pop(0)
            bs_list.append(bs)
            # sort related (child) Buchungsstellen by their BB-Kennzeichen
            child_bs_list = stellen.get_many(bs.childUids)
            child_bs_list.sort(key=_sortkey_buchungsstelle_by_bblatt)
            queue.extend(child_bs_list)

        return bs_list

    def write(self):
        """Write all Flurstück objects into the flurstueck table."""
        values = []

        for fs in self.om.Flurstueck:
            # keep geometries out of the serialized data, then put them back
            geoms = [_pop(r, 'geom') for r in fs.recs]
            data = index.serialize(fs)
            for r, g in zip(fs.recs, geoms):
                r.geom = g

            values.append(dict(
                uid=fs.uid,
                rc=len(fs.recs),
                fshistoric=fs.isHistoric,
                data=data,
                geom=geoms[-1],
            ))

        self.write_table(index.TABLE_FLURSTUECK, values)
class _FsIndexIndexer(_Indexer):
    """Build the flat search tables (Flurstück, Lage, Buchungsblatt, Person, geometry)."""

    def __init__(self, runner: '_Runner'):
        super().__init__(runner)
        # table id -> rows; created per instance so that rows from an
        # earlier indexer object in the same process are not carried over
        self.entries = {
            index.TABLE_INDEXFLURSTUECK: [],
            index.TABLE_INDEXLAGE: [],
            index.TABLE_INDEXBUCHUNGSBLATT: [],
            index.TABLE_INDEXPERSON: [],
            index.TABLE_INDEXGEOM: [],
        }

    def collect(self):
        """Create index rows for every record of every collected Flurstück."""
        flurstuecke = self.rr.fsdata.om.Flurstueck
        with ProgressIndicator('ALKIS: creating indexes', len(flurstuecke)) as progress:
            for fs in flurstuecke:
                for r in fs.recs:
                    self.add(fs, r)
                progress.update(1)

    def add(self, fs: dt.Flurstueck, r: dt.FlurstueckRecord):
        """Append the index rows for record ``r`` of Flurstück ``fs`` to ``self.entries``."""
        # columns shared by all tables
        base = dict(
            fs=r.uid,
            fshistoric=r.isHistoric,
        )

        places = dict(
            land=r.land.text,
            land_t=index.text_key(r.land.text),
            landcode=r.land.code,

            regierungsbezirk=r.regierungsbezirk.text,
            regierungsbezirk_t=index.text_key(r.regierungsbezirk.text),
            regierungsbezirkcode=r.regierungsbezirk.code,

            kreis=r.kreis.text,
            kreis_t=index.text_key(r.kreis.text),
            kreiscode=r.kreis.code,

            gemeinde=r.gemeinde.text,
            gemeinde_t=index.text_key(r.gemeinde.text),
            gemeindecode=r.gemeinde.code,

            gemarkung=r.gemarkung.text,
            gemarkung_t=index.text_key(r.gemarkung.text),
            gemarkungcode=r.gemarkung.code,
        )

        self.entries[index.TABLE_INDEXFLURSTUECK].append(dict(
            **base,
            **places,

            amtlicheflaeche=r.amtlicheFlaeche,
            geomflaeche=r.geomFlaeche,

            flurnummer=r.flurnummer,
            zaehler=r.zaehler,
            nenner=r.nenner,
            flurstuecksfolge=r.flurstuecksfolge,
            flurstueckskennzeichen=r.flurstueckskennzeichen,

            x=r.x,
            y=r.y,
        ))

        self.entries[index.TABLE_INDEXGEOM].append(dict(
            **base,
            geomflaeche=r.geomFlaeche,
            x=r.x,
            y=r.y,
            geom=r.geom,
        ))

        for la in fs.lageList:
            for la_r in la.recs:
                self.entries[index.TABLE_INDEXLAGE].append(dict(
                    **base,
                    **places,
                    lageuid=la_r.uid,
                    lagehistoric=la_r.isHistoric,
                    strasse=la_r.strasse,
                    strasse_t=index.strasse_key(la_r.strasse),
                    hausnummer=la_r.hausnummer,
                    # fall back to the Flurstück coordinates
                    x=la.x or r.x,
                    y=la.y or r.y,
                ))

        for bu in fs.buchungList:
            bb = self.rr.buchung.om.Buchungsblatt.get(bu.buchungsblattUid)

            for bb_r in bb.recs:
                self.entries[index.TABLE_INDEXBUCHUNGSBLATT].append(dict(
                    **base,
                    buchungsblattuid=bb_r.uid,
                    buchungsblattkennzeichen=bb_r.buchungsblattkennzeichen,
                    buchungsblatthistoric=bu.isHistoric,
                ))

            # each person is indexed once per Buchungsblatt
            pe_uids = set()

            for nn in bb.namensnummerList:
                for pe in nn.personList:
                    if pe.uid in pe_uids:
                        continue
                    pe_uids.add(pe.uid)
                    for pe_r in pe.recs:
                        self.entries[index.TABLE_INDEXPERSON].append(dict(
                            **base,
                            personuid=pe_r.uid,
                            personhistoric=pe_r.isHistoric,
                            name=pe_r.nachnameOderFirma,
                            name_t=index.text_key(pe_r.nachnameOderFirma),
                            vorname=pe_r.vorname,
                            vorname_t=index.text_key(pe_r.vorname),
                        ))

    def write(self):
        """Number the rows of every missing index table and write it."""
        for table_id, values in self.entries.items():
            # checked here as well to skip the numbering of existing tables
            if not self.ix.has_table(table_id):
                for n, v in enumerate(values, 1):
                    v['n'] = n
                self.write_table(table_id, values)
class _Runner:
    """Owns the indexing steps, runs them in order and provides (cached) reading helpers."""

    def __init__(self, ix: index.Object, reader: dt.Reader, with_cache=False):
        self.ix: index.Object = ix
        self.reader: dt.Reader = reader

        self.withCache = with_cache
        self.cacheDir = gws.c.CACHE_DIR + '/alkis'
        if self.withCache:
            gws.u.ensure_dir(self.cacheDir)

        self.place = _PlaceIndexer(self)
        self.lage = _LageIndexer(self)
        self.buchung = _BuchungIndexer(self)
        self.part = _PartIndexer(self)
        self.fsdata = _FsDataIndexer(self)
        self.fsindex = _FsIndexIndexer(self)

        # baseline for memory_info()
        self.initMemory = gws.lib.osx.process_rss_size()

    def run(self):
        """Collect all steps (later steps read the results of earlier ones), then write them."""
        with self.ix.db.connect():
            with ProgressIndicator('ALKIS: indexing'):
                self.place.load_or_collect()
                self.memory_info()

                self.buchung.load_or_collect()
                self.memory_info()

                self.lage.load_or_collect()
                self.memory_info()

                self.fsdata.load_or_collect()
                gws.log.info(f'ALKIS: fs counts: {self.fsdata.counts}')
                self.memory_info()

                self.part.load_or_collect()
                self.memory_info()

                self.fsindex.collect()
                self.memory_info()

                for step in (self.place, self.buchung, self.lage, self.fsdata, self.part, self.fsindex):
                    step.write()

    def memory_info(self):
        """Log the growth of the process RSS since the runner was created, if any."""
        v = gws.lib.osx.process_rss_size() - self.initMemory
        if v > 0:
            # NB: must stay a direct call so that stacklevel=2 points at our caller
            gws.log.info(f'ALKIS: memory used: {v:.2f} MB', stacklevel=2)

    def read_flat(self, cls):
        """Return all objects of ``cls`` as a list, using the file cache when enabled."""
        return self._read_cached('/flat_', cls, self._read_flat)

    def read_grouped(self, cls):
        """Return ``(identifikator, objects)`` pairs for ``cls``, using the file cache when enabled."""
        return self._read_cached('/grouped_', cls, self._read_grouped)

    def _read_cached(self, prefix, cls, loader):
        # shared cache wrapper of read_flat / read_grouped
        cpath = self.cacheDir + prefix + cls.__name__
        if self.withCache and gws.u.is_file(cpath):
            return gws.u.unserialize_from_path(cpath)

        rs = loader(cls)
        if self.withCache:
            gws.u.serialize_to_path(rs, cpath)

        return rs

    def _read_flat(self, cls):
        cnt = self.reader.count(cls)
        if cnt <= 0:
            gws.log.warning(f'ALKIS: read {cls.__name__}: empty table')
            return []

        rs = []
        with ProgressIndicator(f'ALKIS: read {cls.__name__}', cnt) as progress:
            for ax in self.reader.read_all(cls):
                rs.append(ax)
                progress.update(1)
        return rs

    def _read_grouped(self, cls):
        cnt = self.reader.count(cls)
        if cnt <= 0:
            gws.log.warning(f'ALKIS: read {cls.__name__}: empty table')
            return []

        groups = {}
        with ProgressIndicator(f'ALKIS: read {cls.__name__}', cnt) as progress:
            for ax in self.reader.read_all(cls):
                groups.setdefault(ax.identifikator, []).append(ax)
                progress.update(1)

        # versions of one object, oldest first
        for versions in groups.values():
            versions.sort(key=_sortkey_lebenszeitintervall)

        return list(groups.items())

    def props_from(self, ax, atts):
        """Build a ``dt.Object`` from the non-empty attributes of ``ax`` listed in ``atts``.

        Object-valued attributes are skipped, dates are formatted as ``dd.mm.yyyy``.
        """
        d = {}

        for a in atts:
            name = a['name']
            v = getattr(ax, name, None)
            if isinstance(v, gid.Object):
                # @TODO handle properties which are objects
                continue
            if dtx.is_date(v):
                v = dtx.to_string('%d.%m.%Y', v)
            if not gws.u.is_empty(v):
                d[name] = v

        return dt.Object(**d)
1144def _from_ax(cls, ax, **kwargs):
1145 d = {}
1147 if ax:
1148 for k in cls.__annotations__:
1149 v = getattr(ax, k, None)
1150 if v:
1151 d[k] = v
1153 d['uid'] = ax.identifikator
1154 d['beginnt'] = ax.lebenszeitintervall.beginnt
1155 d['endet'] = ax.lebenszeitintervall.endet
1156 d['isHistoric'] = d['endet'] is not None
1157 if ax.anlass and ax.anlass[0].code != '000000':
1158 d['anlass'] = ax.anlass[0]
1159 if ax.geom:
1160 d['geom'] = ax.geom
1162 d.update(kwargs)
1163 return cls(**d)
1166def _anteil(ax):
1167 try:
1168 z = float(ax.anteil.zaehler)
1169 z = str(int(z) if z.is_integer() else z)
1170 n = float(ax.anteil.nenner)
1171 n = str(int(n) if n.is_integer() else n)
1172 return z + '/' + n
1173 except (AttributeError, ValueError, TypeError):
1174 pass
def _meta_attributes(meta):
    """Return the attributes of ``meta`` whose name is listed in ``dt.PROPS``, sorted by title."""
    selected = [a for a in meta['attributes'] if a['name'] in dt.PROPS]
    selected.sort(key=lambda a: a['title'])
    return selected
def _geom_of(o):
    """Parse the hex WKB in ``o.geom`` into a shapely geometry; logs a warning and returns None if it is empty."""
    if o.geom:
        return shapely.wkb.loads(o.geom, hex=True)

    gws.log.warning(f'{o.__class__.__name__}:{o.uid}: no geometry')
    return None
1191def _pop(obj, attr):
1192 v = getattr(obj, attr, None)
1193 try:
1194 delattr(obj, attr)
1195 except AttributeError:
1196 pass
1197 return v
1200def _sortkey_beginnt(o):
1201 return o.beginnt
1204def _sortkey_lebenszeitintervall(o):
1205 return o.lebenszeitintervall.beginnt
def _sortkey_namensnummer(nn: dt.Namensnummer):
    """Sort key: natural order of the latest 'laufendeNummerNachDIN1421', then its 'beginnt'."""
    latest = nn.recs[-1]
    return _natkey(latest.laufendeNummerNachDIN1421), latest.beginnt
def _sortkey_buchungsstelle(bs: dt.Buchungsstelle):
    """Sort key: natural order of the latest 'laufendeNummer', then its 'beginnt'."""
    latest = bs.recs[-1]
    return _natkey(latest.laufendeNummer), latest.beginnt
def _sortkey_buchungsstelle_by_bblatt(bs: dt.Buchungsstelle):
    """Sort key: the first Buchungsblattkennzeichen, then 'beginnt' of the latest record."""
    first_knz = bs.buchungsblattkennzeichenList[0]
    return first_knz, bs.recs[-1].beginnt
def _sortkey_part(pa: dt.Part):
    """Sort key: name text, then area (big->small)."""
    name_text = pa.name.text
    return name_text, -pa.geomFlaeche
def _sortkey_gebaeude(ge: dt.Gebaeude):
    """Sort key: 'beginnt' of the latest record, then its area (big->small)."""
    latest = ge.recs[-1]
    return latest.beginnt, -latest.geomFlaeche
1229def _natkey(v):
1230 if not v:
1231 return []
1232 return [
1233 '{:080d}'.format(int(digits)) if digits else chars.lower()
1234 for digits, chars in re.findall(r'(\d+)|(\D+)', v.strip())
1235 ]
1238def _comma(a):
1239 return ','.join(str(s) if s is not None else '' for s in a)
1242def _str(x):
1243 return None if x is None else str(x)