Coverage for gws-app / gws / base / auth / cli.py: 0%

51 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-03 10:12 +0100

1from typing import Optional, cast 

2 

3import gws 

4import gws.config 

5import gws.lib.datetimex as dtx 

6import gws.lib.cli as cli 

7 

8from . import manager 

9 

10gws.ext.new.cli('auth') 

11 

12 

13class RemoveParams(gws.CliParams): 

14 older: Optional[int] 

15 """Remove sessions older than N seconds.""" 

16 uid: Optional[list[str]] 

17 """Remove specific sessions.""" 

18 all: Optional[bool] 

19 """Remove all sessions.""" 

20 

21 

22class Object(gws.Node): 

23 

24 @gws.ext.command.cli('authSessions') 

25 def sessions(self, p: gws.EmptyRequest): 

26 """Show active authorization sessions""" 

27 

28 root = gws.config.load() 

29 sm = root.app.authMgr.sessionMgr 

30 

31 sm.cleanup() 

32 

33 rows = [] 

34 

35 for s in sm.get_all(): 

36 dc = dtx.total_difference(s.created).seconds 

37 du = dtx.total_difference(s.updated).seconds 

38 dd = dtx.total_difference(s.created, s.updated).seconds 

39 rows.append((du, dict( 

40 uid=s.uid, 

41 user=s.user.loginName, 

42 started=dtx.to_iso_string(s.created, sep=' ') + f' ({dtx.format_duration(dc)})', 

43 updated=dtx.to_iso_string(s.updated, sep=' ') + f' ({dtx.format_duration(du)})', 

44 duration=dtx.format_duration(dd), 

45 ))) 

46 

47 # oldest first 

48 rows.sort(key=lambda r: -r[0]) 

49 

50 cli.info('') 

51 cli.info(f'Active sessions: {len(rows)}') 

52 cli.info(cli.text_table([r[1] for r in rows], header='auto')) 

53 

54 @gws.ext.command.cli('authSessrem') 

55 def sessrem(self, p: RemoveParams): 

56 """Remove authorization sessions.""" 

57 

58 root = gws.config.load() 

59 sm = root.app.authMgr.sessionMgr 

60 

61 n = 0 

62 

63 for s in sm.get_all(): 

64 du = dtx.total_difference(s.updated).seconds 

65 if p.all: 

66 sm.delete(s) 

67 n += 1 

68 continue 

69 if p.older and du > p.older: 

70 sm.delete(s) 

71 n += 1 

72 continue 

73 if p.uid and s.uid in p.uid: 

74 sm.delete(s) 

75 n += 1 

76 continue 

77 

78 cli.info('') 

79 cli.info(f'Removed sessions: {n}')