Coverage for gws-app / gws / plugin / alkis / data / indexer.py: 0%
673 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-03 10:12 +0100
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-03 10:12 +0100
import re
from collections import deque
from typing import Optional, Iterable
from typing import Generic, TypeVar

import shapely
import shapely.strtree
import shapely.wkb

import gws
import gws.lib.osx
import gws.lib.datetimex as dtx
from gws.lib.cli import ProgressIndicator
import gws.plugin.postgres.provider

from . import types as dt
from . import index
from . import norbit6

from .geo_info_dok import gid6 as gid
def run(ix: index.Object, data_schema: str, with_force=False, with_cache=False):
    """Build the ALKIS index ``ix`` from the source data in ``data_schema``.

    Args:
        ix: Target index object.
        data_schema: Database schema with the source data, read via ``norbit6.Object``.
        with_force: Drop the existing index first and rebuild it from scratch.
        with_cache: Let the runner use its on-disk cache for intermediate results.

    Raises:
        gws.Error: If the index schema does not exist.
    """
    if not ix.has_schema():
        raise gws.Error(f'ALKIS: schema {ix.schema!r} does not exist')

    if not with_force and ix.status().complete:
        # the index is already complete, nothing to do
        return
    if with_force:
        ix.drop()

    _Runner(ix, norbit6.Object(ix.db, schema=data_schema), with_cache).run()
37##
39T = TypeVar("T")
42class _ObjectDict(Generic[T]):
43 def __init__(self, cls):
44 self.d = {}
45 self.cls = cls
47 def add(self, uid, recs) -> T:
48 o = self.cls(uid=uid, recs=recs)
49 o.isHistoric = all(r.isHistoric for r in recs)
50 self.d[o.uid] = o
51 return o
53 def get(self, uid, default=None) -> Optional[T]:
54 return self.d.get(uid, default)
56 def get_many(self, uids) -> list[T]:
57 res = {}
59 for uid in uids:
60 if uid not in res:
61 o = self.d.get(uid)
62 if o:
63 res[uid] = o
65 return list(res.values())
67 def get_from_ptr(self, obj: dt.Entity, attr):
68 uids = []
70 for r in obj.recs:
71 v = _pop(r, attr)
72 if isinstance(v, list):
73 uids.extend(v)
74 elif isinstance(v, str):
75 uids.append(v)
77 return self.get_many(uids)
79 def __iter__(self) -> Iterable[T]:
80 yield from self.d.values()
82 def __len__(self):
83 return len(self.d)
class _ObjectMap:
    """All entity registries and lookup tables that one indexer collects (and caches)."""

    def __init__(self):
        # one registry per ALKIS entity type
        self.Anschrift: _ObjectDict[dt.Anschrift] = _ObjectDict(dt.Anschrift)
        self.Buchungsblatt: _ObjectDict[dt.Buchungsblatt] = _ObjectDict(dt.Buchungsblatt)
        self.Buchungsstelle: _ObjectDict[dt.Buchungsstelle] = _ObjectDict(dt.Buchungsstelle)
        self.Flurstueck: _ObjectDict[dt.Flurstueck] = _ObjectDict(dt.Flurstueck)
        self.Gebaeude: _ObjectDict[dt.Gebaeude] = _ObjectDict(dt.Gebaeude)
        self.Lage: _ObjectDict[dt.Lage] = _ObjectDict(dt.Lage)
        self.Namensnummer: _ObjectDict[dt.Namensnummer] = _ObjectDict(dt.Namensnummer)
        self.Part: _ObjectDict[dt.Part] = _ObjectDict(dt.Part)
        self.Person: _ObjectDict[dt.Person] = _ObjectDict(dt.Person)

        # places: place uid -> dt.Place
        self.placeAll: dict = {}
        # places: place uid -> EnumPair(code, name), used for lookups
        self.placeIdx: dict = {}
        # Lagebezeichnung catalog: lage key -> Bezeichnung (street name)
        self.catalog: dict = {}
104class _Indexer:
105 CACHE_KEY: str = ''
107 def __init__(self, runner: '_Runner'):
108 self.rr = runner
109 self.ix: index.Object = runner.ix
110 self.om = _ObjectMap()
112 def load_or_collect(self):
113 if not self.load_cache():
114 self.collect()
115 self.store_cache()
117 def load_cache(self):
118 if not self.rr.withCache or not self.CACHE_KEY:
119 return False
120 cpath = self.rr.cacheDir + '/' + self.CACHE_KEY
121 if not gws.u.is_file(cpath):
122 return False
123 om = gws.u.unserialize_from_path(cpath)
124 if not om:
125 return False
126 gws.log.info(f'ALKIS: use cache {self.CACHE_KEY!r}')
127 self.om = om
128 return True
130 def store_cache(self):
131 if not self.rr.withCache or not self.CACHE_KEY:
132 return
133 cpath = self.rr.cacheDir + '/' + self.CACHE_KEY
134 gws.u.serialize_to_path(self.om, cpath)
135 gws.log.info(f'ALKIS: store cache {self.CACHE_KEY!r}')
137 def collect(self):
138 pass
140 def write_table(self, table_id, values):
141 if self.ix.has_table(table_id):
142 return
143 with ProgressIndicator(f'ALKIS: write {table_id!r}', len(values)) as progress:
144 self.ix.create_table(table_id, values, progress)
146 def write(self):
147 pass
class _PlaceIndexer(_Indexer):
    """Index places (Administration- und Verwaltungseinheiten).

    References: https://de.wikipedia.org/wiki/Amtlicher_Gemeindeschl%C3%BCssel
    """

    CACHE_KEY = 'obj_place'

    # placeholder values returned by the get_* lookups for unknown places
    empty1 = dt.EnumPair(code='0', text='')
    empty2 = dt.EnumPair(code='00', text='')

    def add(self, kind, ax, key_obj, **kwargs):
        """Register a place of type ``kind`` built from ``ax``.

        ``kwargs`` are passed to ``dt.Place`` (the parent places). Places whose
        ``endet`` is set are skipped.

        Returns:
            The place's EnumPair(code, bezeichnung), or None if it was skipped.
        """
        if ax.endet:
            return None

        code = self.code(kind, key_obj)
        pair = dt.EnumPair(code, ax.bezeichnung)

        place = dt.Place(**kwargs)
        place.uid = kind + code
        place.kind = kind
        setattr(place, kind, pair)

        self.om.placeAll[place.uid] = place
        self.om.placeIdx[place.uid] = pair
        return pair

    def collect(self):
        self.om.placeAll = {}
        self.om.placeIdx = {}

        # parents are registered before children, so the get_* lookups below can resolve them

        for ax in self.rr.read_flat(gid.AX_Bundesland):
            self.add('land', ax, ax.schluessel)

        for ax in self.rr.read_flat(gid.AX_Regierungsbezirk):
            key = ax.schluessel
            self.add('regierungsbezirk', ax, key, land=self.get_land(key))

        for ax in self.rr.read_flat(gid.AX_KreisRegion):
            key = ax.schluessel
            self.add(
                'kreis', ax, key,
                land=self.get_land(key),
                regierungsbezirk=self.get_regierungsbezirk(key),
            )

        for ax in self.rr.read_flat(gid.AX_Gemeinde):
            key = ax.gemeindekennzeichen
            self.add(
                'gemeinde', ax, key,
                land=self.get_land(key),
                regierungsbezirk=self.get_regierungsbezirk(key),
                kreis=self.get_kreis(key),
            )

        # @TODO map Gemarkung to Gemeinde (see https://de.wikipedia.org/wiki/Liste_der_Gemarkungen_in_Nordrhein-Westfalen etc)

        for ax in self.rr.read_flat(gid.AX_Gemarkung):
            if str(ax.schluessel.gemarkungsnummer) in self.ix.excludeGemarkung:
                continue
            key = ax.schluessel
            self.add('gemarkung', ax, key, land=self.get_land(key))

        for cls, kind in (
            (gid.AX_Buchungsblattbezirk, 'buchungsblattbezirk'),
            (gid.AX_Dienststelle, 'dienststelle'),
        ):
            for ax in self.rr.read_flat(cls):
                key = ax.schluessel
                self.add(kind, ax, key, land=self.get_land(key))

    def write(self):
        rows = [
            dict(uid=place.uid, data=index.serialize(place))
            for place in self.om.placeAll.values()
        ]
        self.write_table(index.TABLE_PLACE, rows)

    # lookups: return the registered EnumPair, or an empty placeholder

    def get_land(self, o):
        return self.get('land', o) or self.empty2

    def get_regierungsbezirk(self, o):
        return self.get('regierungsbezirk', o) or self.empty1

    def get_kreis(self, o):
        return self.get('kreis', o) or self.empty2

    def get_gemeinde(self, o):
        return self.get('gemeinde', o) or self.empty1

    def get_gemarkung(self, o):
        return self.get('gemarkung', o) or self.empty1

    def get_buchungsblattbezirk(self, o):
        return self.get('buchungsblattbezirk', o) or self.empty1

    def get_dienststelle(self, o):
        return self.get('dienststelle', o) or self.empty1

    def get(self, kind, o):
        """Return the EnumPair of the ``kind`` place identified by key object ``o``, or None."""
        uid = kind + self.code(kind, o)
        return self.om.placeIdx.get(uid)

    def is_empty(self, p: dt.EnumPair):
        """True if ``p`` is one of the empty placeholders (code '0' or '00')."""
        return p.code in ('0', '00')

    # place kind -> function building the place code from a key object
    CODES = {
        'land': lambda k: k.land,
        'regierungsbezirk': lambda k: k.land + (k.regierungsbezirk or '0'),
        'kreis': lambda k: k.land + (k.regierungsbezirk or '0') + k.kreis,
        'gemeinde': lambda k: k.land + (k.regierungsbezirk or '0') + k.kreis + k.gemeinde,
        'gemarkung': lambda k: k.land + k.gemarkungsnummer,
        'buchungsblattbezirk': lambda k: k.land + k.bezirk,
        'dienststelle': lambda k: k.land + k.stelle,
    }

    def code(self, kind, o):
        return self.CODES[kind](o)
class _LageIndexer(_Indexer):
    """Index Lagebezeichnungen and link them to the Gebaeude that point to them."""

    CACHE_KEY = 'obj_lage'

    def collect(self):
        # the steps depend on each other and must run in this order
        self._collect_catalog()
        self._collect_lage()
        self._assign_lage_coordinates()
        self._collect_gebaeude()
        self._link_gebaeude_to_lage()

    def _collect_catalog(self):
        """Fill the street catalog: lage key -> Bezeichnung."""
        for ax in self.rr.read_flat(gid.AX_LagebezeichnungKatalogeintrag):
            self.om.catalog[self.lage_key(ax.schluessel)] = ax.bezeichnung

    def _collect_lage(self):
        """Create Lage entities with and without Hausnummer."""
        for cls in (gid.AX_LagebezeichnungMitHausnummer, gid.AX_LagebezeichnungOhneHausnummer):
            for uid, axs in self.rr.read_grouped(cls):
                recs = [
                    _from_ax(
                        dt.LageRecord,
                        ax,
                        strasse=self.strasse(ax),
                        hausnummer=index.normalize_hausnummer(ax.hausnummer),
                    )
                    for ax in axs
                ]
                self.om.Lage.add(uid, recs)

    def _assign_lage_coordinates(self):
        """Use the PTO (art=HNR) geometry centroid for Lage coordinates.

        PTO.dientZurDarstellungVon -> lage.uid
        """
        for pto in self.rr.read_flat(gid.AP_PTO):
            if pto.lebenszeitintervall.endet is not None:
                continue
            if (_pop(pto, 'art') or '').upper() != 'HNR':
                continue

            uids = _pop(pto, 'dientZurDarstellungVon')
            if not uids or not isinstance(uids, list):
                continue

            geom = _geom_of(pto)
            if not geom:
                continue

            centroid = geom.centroid
            x, y = centroid.x, centroid.y
            for la in self.om.Lage.get_many(uids):
                la.x, la.y = x, y

    def _collect_gebaeude(self):
        """Read related Gebaeude, compute their geometric area and drop their geometries."""
        atts = _meta_attributes(gid.METADATA['AX_Gebaeude'])

        for uid, axs in self.rr.read_grouped(gid.AX_Gebaeude):
            recs = [
                _from_ax(
                    dt.GebaeudeRecord,
                    ax,
                    name=', '.join(ax.name) if ax.name else None,
                    amtlicheFlaeche=ax.grundflaeche or 0,
                    props=self.rr.props_from(ax, atts),
                    _zeigtAuf=ax.zeigtAuf,
                )
                for ax in axs
            ]
            self.om.Gebaeude.add(uid, recs)

        for ge in self.om.Gebaeude:
            for rec in ge.recs:
                geom = _geom_of(rec)
                rec.geomFlaeche = round(geom.area, 2) if geom else 0
                # omit Gebaeude geometries for now
                _pop(rec, 'geom')

    def _link_gebaeude_to_lage(self):
        """Fill ``gebaeudeList`` of every Lage.

        AX_Gebaeude.zeigtAuf -> AX_LagebezeichnungMitHausnummer
        """
        for la in self.om.Lage:
            la.gebaeudeList = []
        for ge in self.om.Gebaeude:
            for la in self.om.Lage.get_from_ptr(ge, '_zeigtAuf'):
                la.gebaeudeList.append(ge)

    def strasse(self, ax):
        """Return the street name of a Lage record, either inline or from the catalog ('' if unknown)."""
        ref = ax.lagebezeichnung
        if isinstance(ref, str):
            return ref
        return self.om.catalog.get(self.lage_key(ref), '')

    def lage_key(self, r):
        """Return the catalog key: land, regierungsbezirk, kreis, gemeinde and lage joined with commas."""
        names = ('land', 'regierungsbezirk', 'kreis', 'gemeinde', 'lage')
        return _comma([getattr(r, name) for name in names])

    def write(self):
        rows = [
            dict(uid=la.uid, rc=len(la.recs), data=index.serialize(la))
            for la in self.om.Lage
        ]
        self.write_table(index.TABLE_LAGE, rows)
class _BuchungIndexer(_Indexer):
    """Index Buchungsblätter with their Buchungsstellen, Namensnummern, Personen and Anschriften."""

    CACHE_KEY = 'obj_buchungsblatt'

    def __init__(self, runner: '_Runner'):
        super().__init__(runner)
        # buchungsblattkennzeichen -> Buchungsblatt.
        # Kept per instance: a class-level dict would be shared by every instance.
        # It is not part of the cached object map, so load_cache rebuilds it.
        self.buchungsblattkennzeichenMap: dict[str, dt.Buchungsblatt] = {}

    def load_cache(self):
        """Restore ``om`` from the cache and rebuild the Kennzeichen map from it."""
        if not super().load_cache():
            return False
        self.buchungsblattkennzeichenMap = {
            bb.buchungsblattkennzeichen: bb
            for bb in self.om.Buchungsblatt
        }
        return True

    def collect(self):
        for uid, axs in self.rr.read_grouped(gid.AX_Anschrift):
            self.om.Anschrift.add(uid, [
                _from_ax(
                    dt.AnschriftRecord,
                    ax,
                    ort=ax.ort_AmtlichesOrtsnamensverzeichnis or ax.ort_Post,
                    plz=ax.postleitzahlPostzustellung,
                    telefon=ax.telefon[0] if ax.telefon else None,
                )
                for ax in axs
            ])

        for uid, axs in self.rr.read_grouped(gid.AX_Person):
            self.om.Person.add(uid, [
                _from_ax(
                    dt.PersonRecord,
                    ax,
                    anrede=ax.anrede.text if ax.anrede else None,
                    _hat=ax.hat,
                )
                for ax in axs
            ])

        # AX_Person.hat -> [AX_Anschrift]
        for pe in self.om.Person:
            pe.anschriftList = self.om.Anschrift.get_from_ptr(pe, '_hat')

        for uid, axs in self.rr.read_grouped(gid.AX_Namensnummer):
            self.om.Namensnummer.add(uid, [
                _from_ax(
                    dt.NamensnummerRecord,
                    ax,
                    anteil=_anteil(ax),
                    _benennt=ax.benennt,
                    _istBestandteilVon=ax.istBestandteilVon,
                )
                for ax in axs
            ])

        # AX_Namensnummer.benennt -> AX_Person
        for nn in self.om.Namensnummer:
            nn.laufendeNummer = nn.recs[-1].laufendeNummerNachDIN1421
            nn.personList = self.om.Person.get_from_ptr(nn, '_benennt')

        for uid, axs in self.rr.read_grouped(gid.AX_Buchungsstelle):
            self.om.Buchungsstelle.add(uid, [
                _from_ax(
                    dt.BuchungsstelleRecord,
                    ax,
                    anteil=_anteil(ax),
                    _an=ax.an,
                    _zu=ax.zu,
                    _istBestandteilVon=ax.istBestandteilVon,
                )
                for ax in axs
            ])

        for bs in self.om.Buchungsstelle:
            bs.laufendeNummer = bs.recs[-1].laufendeNummer
            # filled later by _FsDataIndexer.process_buchung
            bs.fsUids = []
            bs.flurstueckskennzeichenList = []

        for uid, axs in self.rr.read_grouped(gid.AX_Buchungsblatt):
            self.om.Buchungsblatt.add(uid, [
                _from_ax(
                    dt.BuchungsblattRecord,
                    ax,
                    buchungsblattbezirk=self.rr.place.get_buchungsblattbezirk(ax.buchungsblattbezirk),
                )
                for ax in axs
            ])

        self.buchungsblattkennzeichenMap = {}
        for bb in self.om.Buchungsblatt:
            bb.buchungsstelleList = []
            bb.namensnummerList = []
            bb.buchungsblattkennzeichen = bb.recs[-1].buchungsblattkennzeichen
            self.buchungsblattkennzeichenMap[bb.buchungsblattkennzeichen] = bb

        # AX_Buchungsstelle.istBestandteilVon -> AX_Buchungsblatt
        for bs in self.om.Buchungsstelle:
            bb_list = self.om.Buchungsblatt.get_from_ptr(bs, '_istBestandteilVon')
            bs.buchungsblattUids = [bb.uid for bb in bb_list]
            bs.buchungsblattkennzeichenList = [bb.buchungsblattkennzeichen for bb in bb_list]
            for bb in bb_list:
                bb.buchungsstelleList.append(bs)

        # AX_Namensnummer.istBestandteilVon -> AX_Buchungsblatt
        for nn in self.om.Namensnummer:
            bb_list = self.om.Buchungsblatt.get_from_ptr(nn, '_istBestandteilVon')
            nn.buchungsblattUids = [bb.uid for bb in bb_list]
            nn.buchungsblattkennzeichenList = [bb.buchungsblattkennzeichen for bb in bb_list]
            for bb in bb_list:
                bb.namensnummerList.append(nn)

        for bb in self.om.Buchungsblatt:
            bb.buchungsstelleList.sort(key=_sortkey_buchungsstelle)
            bb.namensnummerList.sort(key=_sortkey_namensnummer)

        # AX_Buchungsstelle.an -> [AX_Buchungsstelle]
        # AX_Buchungsstelle.zu -> [AX_Buchungsstelle]
        # see Erläuterungen zu ALKIS Version 6, page 116-119

        for bs in self.om.Buchungsstelle:
            bs.childUids = []
            bs.parentUids = []
            bs.parentkennzeichenList = []

        for bs in self.om.Buchungsstelle:
            # Ordered set (dict keys), so that parentUids have a reproducible order.
            # A plain set of str iterates in hash order, which differs between runs.
            parent_uids = {}
            parent_knz = set()

            for r in bs.recs:
                for attr in ('_an', '_zu'):
                    ptr = _pop(r, attr)
                    # a pointer may be missing, a single uid or a list of uids
                    if isinstance(ptr, str):
                        ptr = [ptr]
                    parent_uids.update(dict.fromkeys(ptr or []))

            for parent_bs in self.om.Buchungsstelle.get_many(parent_uids):
                parent_bs.childUids.append(bs.uid)
                bs.parentUids.append(parent_bs.uid)
                for bb_knz in parent_bs.buchungsblattkennzeichenList:
                    parent_knz.add(bb_knz + '.' + parent_bs.laufendeNummer)

            bs.parentkennzeichenList = sorted(parent_knz)

    def write(self):
        values = []

        for bb in self.om.Buchungsblatt:
            values.append(dict(
                uid=bb.uid,
                rc=len(bb.recs),
                data=index.serialize(bb),
            ))

        self.write_table(index.TABLE_BUCHUNGSBLATT, values)
class _PartIndexer(_Indexer):
    """Index Flurstück parts: intersections of geometric ALKIS objects with Flurstücke.

    The computed parts are stored in the object map (``om.computedParts``), so they
    survive a cache round trip. Without that, a cache hit would write an empty table.
    """

    CACHE_KEY = 'obj_part'
    # intersections with a (rounded) area below this value are ignored
    MIN_PART_AREA = 1

    stree: shapely.strtree.STRtree

    def __init__(self, runner: '_Runner'):
        super().__init__(runner)
        # Per-instance containers: class-level lists would be shared by all instances.
        # computed parts (one per source object and Flurstück), written by write()
        self.parts: list[dt.Part] = []
        # Flurstücke and their latest geometries, index-aligned for the STRtree
        self.fs_list: list = []
        self.fs_geom: list = []

    def load_cache(self):
        """Restore the object map and the computed parts from the cache. Return True on success."""
        if not super().load_cache():
            return False
        parts = getattr(self.om, 'computedParts', None)
        if parts is None:
            # cache written without computed parts: discard it and collect again
            self.om = _ObjectMap()
            return False
        self.parts = parts
        return True

    def collect(self):
        self.parts = []
        self.fs_list = []
        self.fs_geom = []

        for kind in dt.Part.KIND:
            self.collect_kind(kind)

        for fs in self.rr.fsdata.om.Flurstueck:
            self.fs_list.append(fs)
            # NB take only the most recent fs geometry into account
            self.fs_geom.append(_geom_of(fs.recs[-1]))

        self.stree = shapely.strtree.STRtree(self.fs_geom)

        with ProgressIndicator('ALKIS: parts', len(self.om.Part)) as progress:
            for pa in self.om.Part:
                self.compute_intersections(pa)
                progress.update(1)

        self.parts.sort(key=_sortkey_part)

        for pa in self.parts:
            pa.isHistoric = all(r.isHistoric for r in pa.recs)

        # keep the computed parts in the object map, so that store_cache persists them
        self.om.computedParts = self.parts

    def collect_kind(self, kind):
        """Collect all geometric object classes whose metadata key matches the Part ``kind``."""
        _, key = dt.Part.KIND[kind]
        classes = [
            getattr(gid, meta['name'])
            for meta in gid.METADATA.values()
            if (
                meta['kind'] == 'object'
                and meta['geom']
                and re.search(key + r'/\w+/', meta['key'])
            )
        ]

        for cls in classes:
            self.collect_class(kind, cls)

    def collect_class(self, kind, cls):
        meta = gid.METADATA[cls.__name__]
        atts = _meta_attributes(meta)

        for uid, axs in self.rr.read_grouped(cls):
            pa = self.om.Part.add(uid, [
                _from_ax(
                    dt.PartRecord,
                    ax,
                    props=self.rr.props_from(ax, atts),
                )
                for ax in axs
            ])
            pa.kind = kind
            pa.name = dt.EnumPair(meta['uid'], meta['title'])

    def compute_intersections(self, pa: dt.Part):
        """Intersect every record geometry of ``pa`` with the Flurstücke and append the results to ``self.parts``."""
        parts_map = {}

        for r in pa.recs:
            geom = _geom_of(r)
            if not geom:
                continue

            for i in self.stree.query(geom):
                part_geom = shapely.intersection(self.fs_geom[i], geom)
                part_area = round(part_geom.area, 2)
                if part_area < self.MIN_PART_AREA:
                    continue

                fs = self.fs_list[i]

                part = parts_map.setdefault(fs.uid, dt.Part(
                    uid=pa.uid,
                    recs=[],
                    kind=pa.kind,
                    name=pa.name,
                    fs=fs.uid,
                ))

                # computed area corrected with respect to FS's "amtlicheFlaeche"
                part_area_corrected = round(
                    fs.recs[-1].amtlicheFlaeche * (part_area / fs.recs[-1].geomFlaeche),
                    2)

                part.recs.append(dt.PartRecord(
                    uid=r.uid,
                    beginnt=r.beginnt,
                    endet=r.endet,
                    anlass=r.anlass,
                    props=r.props,
                    geomFlaeche=part_area,
                    amtlicheFlaeche=part_area_corrected,
                    isHistoric=r.endet is not None,
                ))

                part.geom = shapely.wkb.dumps(part_geom, srid=self.ix.crs.srid, hex=True)
                part.geomFlaeche = part_area
                part.amtlicheFlaeche = part_area_corrected

        self.parts.extend(parts_map.values())

    def write(self):
        values = []

        for n, pa in enumerate(self.parts, 1):
            # serialize without the geometry, which goes into its own column
            geom = _pop(pa, 'geom')
            data = index.serialize(pa)
            pa.geom = geom
            values.append(dict(
                n=n,
                fs=pa.fs,
                uid=pa.uid,
                beginnt=pa.recs[-1].beginnt,
                endet=pa.recs[-1].endet,
                kind=pa.kind,
                name=pa.name.text,
                parthistoric=pa.isHistoric,
                data=data,
                geom=geom,
            ))

        self.write_table(index.TABLE_PART, values)
class _FsDataIndexer(_Indexer):
    """Index Flurstücke (current and historic) with their Lage, Gebaeude and Buchung data."""

    CACHE_KEY = 'obj_flurstueck'

    def __init__(self, runner: '_Runner'):
        super().__init__(runner)
        # statistics of processed Flurstück records (see record())
        self.counts = {
            'ok': 0,
            'no_geometry': 0,
            'excluded': 0,
        }

    def collect(self):
        for uid, axs in self.rr.read_grouped(gid.AX_Flurstueck):
            recs = gws.u.compact(self.record(ax) for ax in axs)
            if recs:
                self.om.Flurstueck.add(uid, recs)

        for uid, axs in self.rr.read_grouped(gid.AX_HistorischesFlurstueck):
            recs = gws.u.compact(self.record(ax) for ax in axs)
            if not recs:
                continue
            # For a historic FS, 'beginnt' is basically when the history beginnt
            # (see comments for AX_HistorischesFlurstueck in gid6).
            # we set F.endet=F.beginnt to designate this one as 'historic'
            for r in recs:
                r.endet = r.beginnt
                r.isHistoric = True
            self.om.Flurstueck.add(uid, recs)

        for fs in self.om.Flurstueck:
            fs.flurstueckskennzeichen = fs.recs[-1].flurstueckskennzeichen
            fs.fsnummer = index.make_fsnummer(fs.recs[-1])
            fs.x = fs.recs[-1].x
            fs.y = fs.recs[-1].y
            self.process_lage(fs)
            self.process_gebaeude(fs)
            self.process_buchung(fs)

        # check the 'nachfolgerFlurstueckskennzeichen' array
        # and mark each referenced FS as a "vorgaenger" FS
        # It is a M:N relation, therefore 'vorgaengerFlurstueckskennzeichen' is also an array

        knz_to_fs = {
            fs.flurstueckskennzeichen: fs
            for fs in self.om.Flurstueck
        }
        for fs in self.om.Flurstueck:
            nfs = fs.recs[-1].nachfolgerFlurstueckskennzeichen
            if not nfs:
                continue
            for nf_knz in nfs:
                nf_fs = knz_to_fs.get(nf_knz)
                if not nf_fs:
                    gws.log.warning(f'ALKIS: nachfolgerFlurstueck missing {fs.flurstueckskennzeichen!r}->{nf_knz!r}')
                    continue
                if not nf_fs.recs[-1].vorgaengerFlurstueckskennzeichen:
                    nf_fs.recs[-1].vorgaengerFlurstueckskennzeichen = []
                nf_fs.recs[-1].vorgaengerFlurstueckskennzeichen.append(fs.flurstueckskennzeichen)

    def record(self, ax):
        """Build a FlurstueckRecord from ``ax``.

        Returns None (and updates ``self.counts``) if the record refers to an
        unknown Gemarkung/Gemeinde or has no geometry.
        """
        r: dt.FlurstueckRecord = _from_ax(
            dt.FlurstueckRecord,
            ax,
            amtlicheFlaeche=ax.amtlicheFlaeche or 0,
            flurnummer=_str(ax.flurnummer),
            zaehler=_str(ax.flurstuecksnummer.zaehler),
            nenner=_str(ax.flurstuecksnummer.nenner),
            zustaendigeStelle=[self.rr.place.get_dienststelle(p) for p in (ax.zustaendigeStelle or [])],

            _weistAuf=ax.weistAuf,
            _zeigtAuf=ax.zeigtAuf,
            _istGebucht=ax.istGebucht,
            _buchung=ax.buchung,
        )

        # basic data

        r.gemarkung = self.rr.place.get_gemarkung(ax.gemarkung)
        r.gemeinde = self.rr.place.get_gemeinde(ax.gemeindezugehoerigkeit)
        r.regierungsbezirk = self.rr.place.get_regierungsbezirk(ax.gemeindezugehoerigkeit)
        r.kreis = self.rr.place.get_kreis(ax.gemeindezugehoerigkeit)
        r.land = self.rr.place.get_land(ax.gemeindezugehoerigkeit)

        if self.rr.place.is_empty(r.gemarkung) or self.rr.place.is_empty(r.gemeinde):
            # exclude Flurstücke that refer to Gemeinde/Gemarkung
            # which do not exist in the reference AX tables
            self.counts['excluded'] += 1
            return None

        # geometry

        geom = _geom_of(r)
        if not geom:
            self.counts['no_geometry'] += 1
            return None

        r.geomFlaeche = round(geom.area, 2)
        r.x = round(geom.centroid.x, 2)
        r.y = round(geom.centroid.y, 2)

        self.counts['ok'] += 1
        return r

    def process_lage(self, fs: dt.Flurstueck):
        fs.lageList = []

        # AX_Flurstueck.weistAuf -> AX_LagebezeichnungMitHausnummer
        # AX_Flurstueck.zeigtAuf -> AX_LagebezeichnungOhneHausnummer
        fs.lageList.extend(self.rr.lage.om.Lage.get_from_ptr(fs, '_weistAuf'))
        fs.lageList.extend(self.rr.lage.om.Lage.get_from_ptr(fs, '_zeigtAuf'))

    def process_gebaeude(self, fs: dt.Flurstueck):
        ge_map = {}

        for la in fs.lageList:
            for ge in la.gebaeudeList:
                ge_map[ge.uid] = ge

        fs.gebaeudeList = list(ge_map.values())
        fs.gebaeudeList.sort(key=_sortkey_gebaeude)

        # totals over Gebaeude whose latest record has not ended
        fs.gebaeudeAmtlicheFlaeche = sum(ge.recs[-1].amtlicheFlaeche for ge in fs.gebaeudeList if not ge.recs[-1].endet)
        fs.gebaeudeGeomFlaeche = sum(ge.recs[-1].geomFlaeche for ge in fs.gebaeudeList if not ge.recs[-1].endet)

    def process_buchung(self, fs: dt.Flurstueck):
        bs_historic_map = {}
        bs_seen = set()
        buchung_map = {}

        # for each Flurstück record, we collect all related Buchungsstellen (with respect to parent-child relations)
        # then group Buchungsstellen by their Buchungsblatt
        # and create Buchung objects for a FS

        for r in fs.recs:
            hist_buchung = _pop(r, '_buchung')
            if hist_buchung:
                bs_list = self.historic_buchungsstelle_list(r, hist_buchung)
            else:
                bs_list = self.buchungsstelle_list(r)

            for bs in bs_list:
                # a Buchungsstelle referred to by an expired Flurstück might not be expired itself,
                # so we have to track its state separately by wrapping it in a BuchungsstelleReference
                # a BuchungsstelleReference is historic if its Flurstück is
                bs_historic_map[bs.uid] = r.isHistoric

                if bs.uid in bs_seen:
                    continue
                bs_seen.add(bs.uid)

                # populate Flurstück references in a Buchungsstelle
                if fs.uid not in bs.fsUids:
                    bs.fsUids.append(fs.uid)
                    bs.flurstueckskennzeichenList.append(fs.flurstueckskennzeichen)

                # create Buchung records by grouping Buchungsstellen
                for bb_uid in bs.buchungsblattUids:
                    bu = buchung_map.setdefault(bb_uid, dt.Buchung(recs=[], buchungsblattUid=bb_uid))
                    bu.recs.append(dt.BuchungsstelleReference(buchungsstelle=bs))

        fs.buchungList = list(buchung_map.values())

        for bu in fs.buchungList:
            for ref in bu.recs:
                ref.isHistoric = bs_historic_map[ref.buchungsstelle.uid]
            bu.isHistoric = all(ref.isHistoric for ref in bu.recs)

        return fs

    def historic_buchungsstelle_list(self, r: dt.FlurstueckRecord, hist_buchung):
        """Create fake historic Buchungsstellen for an AX_HistorischesFlurstueck 'buchung' reference."""
        bs_list = []

        for bu in hist_buchung:
            bb = self.rr.buchung.buchungsblattkennzeichenMap.get(bu.buchungsblattkennzeichen)
            if not bb:
                continue
            # create a fake historic Buchungstelle
            bs_list.append(dt.Buchungsstelle(
                uid=bb.uid + '_' + bu.laufendeNummerDerBuchungsstelle,
                recs=[
                    dt.BuchungsstelleRecord(
                        endet=r.endet,
                        laufendeNummer=bu.laufendeNummerDerBuchungsstelle,
                        isHistoric=True,
                    )
                ],
                buchungsblattUids=[bb.uid],
                buchungsblattkennzeichenList=[bb.buchungsblattkennzeichen],
                parentUids=[],
                childUids=[],
                fsUids=[],
                parentkennzeichenList=[],
                flurstueckskennzeichenList=[],
                laufendeNummer=bu.laufendeNummerDerBuchungsstelle,
                isHistoric=True,
            ))

        return bs_list

    def buchungsstelle_list(self, r: dt.FlurstueckRecord):
        """Return the Buchungsstellen related to a Flurstück record.

        A Flurstück points to a Buchungsstelle B (F.istGebucht -> B). B can have parent
        (B.an -> parent.uid) and child (child.an -> B.uid) Buchungsstellen; these links
        are filled in _BuchungIndexer.collect. The result contains:

        - B's ancestors, in reversed breadth-first order (farthest first);
        - then B and its descendants in breadth-first order, with children sorted by
          their Buchungsblatt-Kennzeichen.

        Each Buchungsstelle is visited at most once per direction. Cyclic an/zu
        references in the source data therefore cannot cause an endless loop.
        """
        store = self.rr.buchung.om.Buchungsstelle

        # AX_Flurstueck.istGebucht -> AX_Buchungsstelle
        this_bs = store.get(_pop(r, '_istGebucht'))
        if not this_bs:
            return []

        # ancestors, breadth-first, this_bs itself excluded
        ancestors = []
        seen = {this_bs.uid}
        queue = deque([this_bs])
        while queue:
            bs = queue.popleft()
            for uid in bs.parentUids:
                if uid in seen:
                    continue
                parent = store.get(uid)
                if parent is None:
                    continue
                seen.add(uid)
                ancestors.append(parent)
                queue.append(parent)

        bs_list = ancestors[::-1]

        # this_bs and its descendants, breadth-first
        seen = {this_bs.uid}
        queue = deque([this_bs])
        while queue:
            bs = queue.popleft()
            bs_list.append(bs)
            # sort related (child) Buchungsstellen by their BB-Kennzeichen
            children = [c for c in store.get_many(bs.childUids) if c.uid not in seen]
            children.sort(key=_sortkey_buchungsstelle_by_bblatt)
            seen.update(c.uid for c in children)
            queue.extend(children)

        return bs_list

    def write(self):
        values = []

        for fs in self.om.Flurstueck:
            # serialize without geometries (the latest one goes into its own column), then restore them
            geoms = [_pop(r, 'geom') for r in fs.recs]
            data = index.serialize(fs)
            for r, g in zip(fs.recs, geoms):
                r.geom = g

            values.append(dict(
                uid=fs.uid,
                rc=len(fs.recs),
                fshistoric=fs.isHistoric,
                data=data,
                geom=geoms[-1],
            ))

        self.write_table(index.TABLE_FLURSTUECK, values)
class _FsIndexIndexer(_Indexer):
    """Build the flat search index tables: one row per Flurstück record and related entity record."""

    def __init__(self, runner: '_Runner'):
        super().__init__(runner)
        # table id -> list of row dicts.
        # Kept per instance: as a class attribute these lists would be shared by all
        # instances and keep growing on every run in the same process.
        self.entries = {
            index.TABLE_INDEXFLURSTUECK: [],
            index.TABLE_INDEXLAGE: [],
            index.TABLE_INDEXBUCHUNGSBLATT: [],
            index.TABLE_INDEXPERSON: [],
            index.TABLE_INDEXGEOM: [],
        }

    def collect(self):
        with ProgressIndicator('ALKIS: creating indexes', len(self.rr.fsdata.om.Flurstueck)) as progress:
            for fs in self.rr.fsdata.om.Flurstueck:
                for r in fs.recs:
                    self.add(fs, r)
                progress.update(1)

    def add(self, fs: dt.Flurstueck, r: dt.FlurstueckRecord):
        """Add index rows for one Flurstück record ``r`` of ``fs``."""
        base = dict(
            fs=r.uid,
            fshistoric=r.isHistoric,
        )

        places = dict(
            land=r.land.text,
            land_t=index.text_key(r.land.text),
            landcode=r.land.code,

            regierungsbezirk=r.regierungsbezirk.text,
            regierungsbezirk_t=index.text_key(r.regierungsbezirk.text),
            regierungsbezirkcode=r.regierungsbezirk.code,

            kreis=r.kreis.text,
            kreis_t=index.text_key(r.kreis.text),
            kreiscode=r.kreis.code,

            gemeinde=r.gemeinde.text,
            gemeinde_t=index.text_key(r.gemeinde.text),
            gemeindecode=r.gemeinde.code,

            gemarkung=r.gemarkung.text,
            gemarkung_t=index.text_key(r.gemarkung.text),
            gemarkungcode=r.gemarkung.code,
        )

        self.entries[index.TABLE_INDEXFLURSTUECK].append(dict(
            **base,
            **places,

            amtlicheflaeche=r.amtlicheFlaeche,
            geomflaeche=r.geomFlaeche,

            flurnummer=r.flurnummer,
            zaehler=r.zaehler,
            nenner=r.nenner,
            flurstuecksfolge=r.flurstuecksfolge,
            flurstueckskennzeichen=r.flurstueckskennzeichen,

            x=r.x,
            y=r.y,
        ))

        self.entries[index.TABLE_INDEXGEOM].append(dict(
            **base,
            geomflaeche=r.geomFlaeche,
            x=r.x,
            y=r.y,
            geom=r.geom,
        ))

        for la in fs.lageList:
            for la_r in la.recs:
                self.entries[index.TABLE_INDEXLAGE].append(dict(
                    **base,
                    **places,
                    lageuid=la_r.uid,
                    lagehistoric=la_r.isHistoric,
                    strasse=la_r.strasse,
                    strasse_t=index.strasse_key(la_r.strasse),
                    hausnummer=la_r.hausnummer,
                    # fall back to the Flurstück centroid when the Lage has no PTO coordinates
                    x=la.x or r.x,
                    y=la.y or r.y,
                ))

        for bu in fs.buchungList:
            bb = self.rr.buchung.om.Buchungsblatt.get(bu.buchungsblattUid)
            if not bb:
                # e.g. a Buchungsblatt missing from an inconsistent cache: skip instead of crashing
                continue

            for bb_r in bb.recs:
                self.entries[index.TABLE_INDEXBUCHUNGSBLATT].append(dict(
                    **base,
                    buchungsblattuid=bb_r.uid,
                    buchungsblattkennzeichen=bb_r.buchungsblattkennzeichen,
                    buchungsblatthistoric=bu.isHistoric,
                ))

            pe_uids = set()

            for nn in bb.namensnummerList:
                for pe in nn.personList:
                    if pe.uid in pe_uids:
                        continue
                    pe_uids.add(pe.uid)
                    for pe_r in pe.recs:
                        self.entries[index.TABLE_INDEXPERSON].append(dict(
                            **base,
                            personuid=pe_r.uid,
                            personhistoric=pe_r.isHistoric,
                            name=pe_r.nachnameOderFirma,
                            name_t=index.text_key(pe_r.nachnameOderFirma),
                            vorname=pe_r.vorname,
                            vorname_t=index.text_key(pe_r.vorname),
                        ))

    def write(self):
        for table_id, values in self.entries.items():
            if not self.ix.has_table(table_id):
                # 1-based row numbers
                for n, v in enumerate(values, 1):
                    v['n'] = n
                self.write_table(table_id, values)
class _Runner:
    """Run the ALKIS indexers and give them (optionally cached) access to the source data."""

    def __init__(self, ix: index.Object, reader: dt.Reader, with_cache=False):
        self.ix: index.Object = ix
        self.reader: dt.Reader = reader

        self.withCache = with_cache
        self.cacheDir = gws.c.CACHE_DIR + '/alkis'
        if self.withCache:
            gws.u.ensure_dir(self.cacheDir)

        self.place = _PlaceIndexer(self)
        self.lage = _LageIndexer(self)
        self.buchung = _BuchungIndexer(self)
        self.part = _PartIndexer(self)
        self.fsdata = _FsDataIndexer(self)
        self.fsindex = _FsIndexIndexer(self)

        # baseline for memory_info()
        self.initMemory = gws.lib.osx.process_rss_size()

    def run(self):
        """Collect all data, then write all index tables."""
        with self.ix.db.connect():
            with ProgressIndicator(f'ALKIS: indexing'):
                # collection order matters: later indexers read earlier ones' object maps
                self.place.load_or_collect()
                self.memory_info()

                self.buchung.load_or_collect()
                self.memory_info()

                self.lage.load_or_collect()
                self.memory_info()

                self.fsdata.load_or_collect()
                gws.log.info(f'ALKIS: fs counts: {self.fsdata.counts}')
                self.memory_info()

                self.part.load_or_collect()
                self.memory_info()

                self.fsindex.collect()
                self.memory_info()

                self.place.write()
                self.buchung.write()
                self.lage.write()
                self.fsdata.write()
                self.part.write()
                self.fsindex.write()

    def memory_info(self):
        """Log the RSS growth since the runner was created (in MB)."""
        growth = gws.lib.osx.process_rss_size() - self.initMemory
        if growth > 0:
            # stacklevel=2: attribute the log line to the caller
            gws.log.info(f'ALKIS: memory used: {growth:.2f} MB', stacklevel=2)

    def _read_cached(self, prefix, cls, read_fn):
        """Return ``read_fn(cls)``, going through the cache file ``<cacheDir>/<prefix><class name>`` when caching is on."""
        cpath = self.cacheDir + '/' + prefix + cls.__name__
        if self.withCache and gws.u.is_file(cpath):
            return gws.u.unserialize_from_path(cpath)

        result = read_fn(cls)
        if self.withCache:
            gws.u.serialize_to_path(result, cpath)
        return result

    def read_flat(self, cls):
        """Return all AX objects of ``cls`` as a list."""
        return self._read_cached('flat_', cls, self._read_flat)

    def _read_flat(self, cls):
        total = self.reader.count(cls)
        if total <= 0:
            gws.log.warning(f'ALKIS: read {cls.__name__}: empty table')
            return []

        with ProgressIndicator(f'ALKIS: read {cls.__name__}', total) as progress:
            objects = []
            for ax in self.reader.read_all(cls):
                objects.append(ax)
                progress.update(1)
        return objects

    def read_grouped(self, cls):
        """Return ``(identifikator, [ax, ...])`` pairs for ``cls``, each group sorted by lifetime start."""
        return self._read_cached('grouped_', cls, self._read_grouped)

    def _read_grouped(self, cls):
        total = self.reader.count(cls)
        if total <= 0:
            gws.log.warning(f'ALKIS: read {cls.__name__}: empty table')
            return []

        by_uid = {}
        with ProgressIndicator(f'ALKIS: read {cls.__name__}', total) as progress:
            for ax in self.reader.read_all(cls):
                by_uid.setdefault(ax.identifikator, []).append(ax)
                progress.update(1)

        for group in by_uid.values():
            group.sort(key=_sortkey_lebenszeitintervall)

        return list(by_uid.items())

    def props_from(self, ax, atts):
        """Build a ``dt.Object`` from the non-empty attribute values of ``ax`` listed in ``atts``.

        Dates are formatted as dd.mm.yyyy; object-valued attributes are skipped.
        """
        props = {}

        for att in atts:
            name = att['name']
            value = getattr(ax, name, None)
            if isinstance(value, gid.Object):
                # @TODO handle properties which are objects
                continue
            if dtx.is_date(value):
                value = dtx.to_string('%d.%m.%Y', value)
            if gws.u.is_empty(value):
                continue
            props[name] = value

        return dt.Object(**props)
1147def _from_ax(cls, ax, **kwargs):
1148 d = {}
1150 if ax:
1151 for k in cls.__annotations__:
1152 v = getattr(ax, k, None)
1153 if v:
1154 d[k] = v
1156 d['uid'] = ax.identifikator
1157 d['beginnt'] = ax.lebenszeitintervall.beginnt
1158 d['endet'] = ax.lebenszeitintervall.endet
1159 d['isHistoric'] = d['endet'] is not None
1160 if ax.anlass and ax.anlass[0].code != '000000':
1161 d['anlass'] = ax.anlass[0]
1162 if ax.geom:
1163 d['geom'] = ax.geom
1165 d.update(kwargs)
1166 return cls(**d)
1169def _anteil(ax):
1170 try:
1171 z = float(ax.anteil.zaehler)
1172 z = str(int(z) if z.is_integer() else z)
1173 n = float(ax.anteil.nenner)
1174 n = str(int(n) if n.is_integer() else n)
1175 return z + '/' + n
1176 except (AttributeError, ValueError, TypeError):
1177 pass
def _meta_attributes(meta):
    """Return the attributes of a gid metadata entry whose names are in ``dt.PROPS``, sorted by title."""
    selected = [att for att in meta['attributes'] if att['name'] in dt.PROPS]
    selected.sort(key=lambda att: att['title'])
    return selected
def _geom_of(o):
    """Decode the hex WKB in ``o.geom`` into a shapely geometry.

    If ``o`` has no geometry, log a warning and return None.
    """
    wkb_hex = o.geom
    if wkb_hex:
        return shapely.wkb.loads(wkb_hex, hex=True)
    gws.log.warning(f'{o.__class__.__name__}:{o.uid}: no geometry')
    return None
1194def _pop(obj, attr):
1195 v = getattr(obj, attr, None)
1196 try:
1197 delattr(obj, attr)
1198 except AttributeError:
1199 pass
1200 return v
1203def _sortkey_beginnt(o):
1204 return o.beginnt
1207def _sortkey_lebenszeitintervall(o):
1208 return o.lebenszeitintervall.beginnt
def _sortkey_namensnummer(nn: 'dt.Namensnummer'):
    """Sort key for Namensnummern: natural order of the latest record's DIN 1421 number, then its start."""
    latest = nn.recs[-1]
    return _natkey(latest.laufendeNummerNachDIN1421), latest.beginnt
def _sortkey_buchungsstelle(bs: 'dt.Buchungsstelle'):
    """Sort key for Buchungsstellen: natural order of the latest record's laufendeNummer, then its start."""
    latest = bs.recs[-1]
    return _natkey(latest.laufendeNummer), latest.beginnt
1219def _sortkey_buchungsstelle_by_bblatt(bs: dt.Buchungsstelle):
1220 return bs.buchungsblattkennzeichenList[0], bs.recs[-1].beginnt
1223def _sortkey_part(pa: dt.Part):
1224 return pa.name.text, -pa.geomFlaeche
1227def _sortkey_gebaeude(ge: dt.Gebaeude):
1228 # sort Gebaeude by area (big->small)
1229 return ge.recs[-1].beginnt, -ge.recs[-1].geomFlaeche
1232def _natkey(v):
1233 if not v:
1234 return []
1235 return [
1236 '{:080d}'.format(int(digits)) if digits else chars.lower()
1237 for digits, chars in re.findall(r'(\d+)|(\D+)', v.strip())
1238 ]
1241def _comma(a):
1242 return ','.join(str(s) if s is not None else '' for s in a)
1245def _str(x):
1246 return None if x is None else str(x)