Coverage for gws-app/gws/lib/xmlx/serializer.py: 91%

118 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-16 22:59 +0200

1from typing import Optional 

2 

3import gws 

4 

5from . import error, namespace, util 

6 

7 

8class Serializer: 

9 def __init__(self, el: gws.XmlElement, opts: Optional[gws.XmlOptions]): 

10 self.root = el 

11 self.buf = [] 

12 

13 self.opts = opts or gws.XmlOptions() 

14 self.defaultNamespace = self.opts.defaultNamespace 

15 

16 self.nsIndexXmlns = {} 

17 self.nsIndexUri = {} 

18 if self.opts.namespaces: 

19 for xmlns, ns in self.opts.namespaces.items(): 

20 self.nsIndexXmlns[xmlns] = ns 

21 self.nsIndexUri[ns.uri] = ns 

22 

23 def to_string(self): 

24 if self.opts.withXmlDeclaration or self.opts.doctype: 

25 self.buf.append(_XML_DECL) 

26 if self.opts.doctype: 

27 self.buf.append(f'<!DOCTYPE {self.opts.doctype}>') 

28 

29 self._el_to_string(self.root, is_root=True) 

30 

31 return ''.join(self.buf) 

32 

33 def to_list(self): 

34 return self._el_to_list(self.root) 

35 

36 ## 

37 

38 def _el_to_list(self, el): 

39 name = self._make_name(el.tag) 

40 attr = {self._make_name(k): v for k, v in el.attrib.items()} 

41 text = (el.text or '').strip() 

42 tail = (el.tail or '').strip() 

43 

44 sub = [self._el_to_list(c) for c in el] 

45 

46 if self.opts.foldTags and len(sub) == 1 and (not attr and not text and not tail): 

47 # single wrapper tag, create 'tag/subtag 

48 inner = sub[0] 

49 inner[0] = name + '/' + inner[0] 

50 return inner 

51 

52 if len(sub) == 1: 

53 sub = sub[0] 

54 

55 res = [name, attr, text, sub, tail] 

56 return [x for x in res if x] 

57 

58 def _el_to_string(self, el, is_root=False): 

59 open_pos = len(self.buf) 

60 self.buf.append('') 

61 

62 open_tag = self._make_name(el.tag) 

63 close_tag = open_tag 

64 

65 if el.attrib: 

66 atts = self._process_atts(el.attrib) 

67 else: 

68 atts = {} 

69 

70 s = self._text_to_string(el.text) 

71 if s: 

72 self.buf.append(s) 

73 

74 for c in el: 

75 self._el_to_string(c) 

76 

77 if is_root and self.opts.withNamespaceDeclarations: 

78 atts.update(self._namespace_declarations()) 

79 

80 if atts: 

81 open_tag += ' ' + ' '.join(f'{k}="{v}"' for k, v in atts.items()) 

82 

83 if len(self.buf) > open_pos + 1: 

84 self.buf[open_pos] = f'<{open_tag}>' 

85 self.buf.append(f'</{close_tag}>') 

86 else: 

87 self.buf[open_pos] += f'<{open_tag}/>' 

88 

89 s = self._text_to_string(el.tail) 

90 if s: 

91 self.buf.append(s) 

92 

93 # def _process_root_atts(self, attrib): 

94 # atts = {} 

95 

96 # for key, val in attrib.items(): 

97 # if key == namespace.XMLNS: 

98 # if self.opts.removeNamespaces: 

99 # continue 

100 # ns = namespace.find_by_uri(val) 

101 # if not ns: 

102 # raise error.NamespaceError(f'unknown default namespace {val!r}') 

103 # self.defaultNamespace = ns 

104 # continue 

105 

106 # if key.startswith(namespace.XMLNS + ':'): 

107 # if self.opts.removeNamespaces: 

108 # continue 

109 # ns = namespace.find_by_uri(val) 

110 # if not ns: 

111 # raise error.NamespaceError(f'unknown namespace {val!r} for {key!r}') 

112 # self.namespaceMap[key.split(':')[1]] = ns 

113 # continue 

114 

115 # if val is None: 

116 # continue 

117 # n = self._make_name(key) 

118 # if n: 

119 # atts[n] = self._value_to_string(val) 

120 

121 # return atts 

122 

123 def _process_atts(self, attrib): 

124 atts = {} 

125 

126 for key, val in attrib.items(): 

127 if val is None: 

128 continue 

129 n = self._make_name(key) 

130 if n: 

131 atts[n] = self._value_to_string(val) 

132 

133 return atts 

134 

135 def _namespace_declarations(self): 

136 xmlns_to_ns = {} 

137 

138 if self.defaultNamespace: 

139 xmlns_to_ns[''] = self.defaultNamespace 

140 

141 for xmlns, ns in self.nsIndexXmlns.items(): 

142 if self.defaultNamespace and ns.uid == self.defaultNamespace.uid: 

143 continue 

144 if self.opts.customXmlns and ns.uid in self.opts.customXmlns: 

145 xmlns = self.opts.customXmlns[ns.uid] 

146 xmlns_to_ns[xmlns] = ns 

147 

148 return namespace.declarations( 

149 xmlns_to_ns, 

150 with_schema_locations=self.opts.withSchemaLocations, 

151 ) 

152 

153 def _text_to_string(self, arg): 

154 s, ok = util.atom_to_string(arg) 

155 if not ok: 

156 s = str(arg) 

157 if self.opts.compactWhitespace: 

158 s = ' '.join(s.strip().split()) 

159 return util.escape_text(s) 

160 

161 def _value_to_string(self, arg): 

162 s, ok = util.atom_to_string(arg) 

163 if not ok: 

164 s = str(arg) 

165 return util.escape_attribute(s.strip()) 

166 

167 def _make_name(self, name): 

168 if self.opts.removeNamespaces: 

169 return namespace.unqualify_name(name) 

170 

171 xmlns, uri, pname = namespace.split_name(name) 

172 if not xmlns and not uri: 

173 return pname 

174 

175 ns = None 

176 if xmlns: 

177 if xmlns == namespace.XMLNS: 

178 return name 

179 ns = self.nsIndexXmlns.get(xmlns) or namespace.find_by_xmlns(xmlns) 

180 else: 

181 ns = self.nsIndexUri.get(uri) or namespace.find_by_uri(uri) 

182 

183 if not ns: 

184 raise error.NamespaceError(f'unknown namespace for {name!r}') 

185 

186 if self.defaultNamespace and ns.uid == self.defaultNamespace.uid: 

187 return pname 

188 

189 xmlns = ns.xmlns or ns.uid 

190 if self.opts.customXmlns and ns.uid in self.opts.customXmlns: 

191 xmlns = self.opts.customXmlns[ns.uid] 

192 

193 self.nsIndexXmlns[xmlns] = ns 

194 if uri: 

195 self.nsIndexUri[uri] = ns 

196 

197 return xmlns + ':' + pname 

198 

199 

200_XML_DECL = '<?xml version="1.0" encoding="UTF-8"?>'