Coverage for gws-app / gws / gis / cache / cli.py: 0%
63 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-03 10:17 +0100
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-03 10:17 +0100
1"""Command-line cache commands."""
3from typing import Optional
5import gws
6import gws.base.action
7import gws.lib.uom
8import gws.lib.cli as cli
9import gws.config
11from . import core
# NOTE(review): bare expression whose result is discarded; presumably a
# marker that declares the 'cache' CLI extension for the gws tooling --
# confirm against the gws.ext documentation.
gws.ext.new.cli('cache')
# Parameters accepted by ``Object.do_status`` (the ``cacheStatus`` command).
# ``layer`` is optional; the handler normalizes it with ``gws.u.to_list``
# before handing it to ``core.status``.
class StatusParams(gws.CliParams):
    layer: Optional[list[str]]
    """list of layer IDs"""
# Parameters accepted by ``Object.do_drop`` (the ``cacheDrop`` command).
# ``layer`` is optional; the handler normalizes it with ``gws.u.to_list``
# before handing it to ``core.drop``.
class DropParams(gws.CliParams):
    layer: Optional[list[str]]
    """list of layer IDs"""
# Parameters accepted by ``Object.do_seed`` (the ``cacheSeed`` command).
# The handler normalizes ``layer`` with ``gws.u.to_list``, converts every
# entry of ``levels`` to ``int`` (an empty/missing value becomes ``[]``),
# and passes ``concurrency or 0`` on to ``core.seed``.
class SeedParams(gws.CliParams):
    layer: list[str]
    """list of layer IDs"""
    levels: list[int]
    """zoom levels to build the cache for"""
    concurrency: int = 0
    """number of concurrent threads to use for seeding (0 for auto)"""
# CLI handlers for the cache commands (status, cleanup, drop, seed).
# Every handler loads the configuration tree with ``gws.config.loader.load()``
# and delegates the actual work to the sibling ``core`` module.
#
# The method docstrings are kept to their original one-line form on purpose:
# they may be surfaced as command help by the gws tooling (TODO confirm).
class Object(gws.Node):

    @gws.ext.command.cli('cacheStatus')
    def do_status(self, p: StatusParams):
        """Display the cache status."""

        # p.layer: optional list of layer IDs, normalized to a list first.
        root = gws.config.loader.load()
        status = core.status(root, gws.u.to_list(p.layer))

        for e in status.entries:
            cli.info('')
            cli.info('=' * 80)

            cli.info(f'CACHE {e.uid}')
            cli.info(f'DIR {e.dirname}')

            # One "uid (type 'title')" item per layer that uses this cache;
            # layers without a title are shown with '?'.
            ls = [
                f"{la.uid} ({la.extType} {gws.u.get(la, 'title', '?')!r})"
                for la in e.layers
            ]
            cli.info(f"LAYER {', '.join(ls)}")

            # One table row per zoom level, in ascending level order.
            table = [
                {
                    'level': z,
                    'scale': '1:' + str(round(gws.lib.uom.res_to_scale(g.res))),
                    'grid': f'{g.maxX} x {g.maxY}',
                    'total': g.totalTiles,
                    'cached': g.cachedTiles,
                    # Floor division keeps the percentage exact, whereas
                    # int(100 * (29 / 100)) truncates to 28. A grid with no
                    # tiles reports 0 instead of raising ZeroDivisionError.
                    # Assumes tile counts are non-negative integers.
                    '%%': (
                        int(100 * g.cachedTiles // g.totalTiles)
                        if g.totalTiles
                        else 0
                    ),
                }
                for z, g in sorted(e.grids.items())
            ]
            cli.info('')
            cli.info(cli.text_table(table, ['level', 'scale', 'grid', 'total', 'cached', '%%']))

        # Cache directories reported as stale by core.status are listed once,
        # after all active caches.
        if status.staleDirs:
            cli.info('')
            cli.info('=' * 80)
            cli.info(f'{len(status.staleDirs)} STALE CACHES ("gws cache cleanup" to remove):')
            for d in status.staleDirs:
                cli.info(f' {d}')

    @gws.ext.command.cli('cacheCleanup')
    def do_cleanup(self, p: gws.CliParams):
        """Remove stale cache directories."""

        # p carries no command-specific options and is not read here.
        root = gws.config.loader.load()
        core.cleanup(root)

    @gws.ext.command.cli('cacheDrop')
    def do_drop(self, p: DropParams):
        """Remove active cache directories."""

        # p.layer: optional list of layer IDs, normalized to a list first.
        root = gws.config.loader.load()
        core.drop(root, gws.u.to_list(p.layer))

    @gws.ext.command.cli('cacheSeed')
    def do_seed(self, p: SeedParams):
        """Seed cache for layers."""

        root = gws.config.loader.load()
        status = core.status(root, gws.u.to_list(p.layer))

        # Levels may arrive as strings from the command line; coerce each one
        # to int. A missing or empty value is passed on as an empty list.
        levels = [int(x) for x in gws.u.to_list(p.levels)] if p.levels else []

        # Concurrency falls back to 0 when unset or falsy.
        core.seed(root, status.entries, levels, p.concurrency or 0)