Coverage for gws-app / gws / gis / gdalx / __init__.py: 81%

411 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-03 10:12 +0100

1"""GDAL/OGR wrapper.""" 

2 

3from typing import Optional, Iterable, cast 

4 

5import contextlib 

6import numpy as np 

7 

8from osgeo import gdal 

9from osgeo import ogr 

10from osgeo import osr 

11 

12import gws 

13import gws.base.shape 

14import gws.lib.crs 

15import gws.lib.bounds 

16import gws.lib.image 

17import gws.lib.datetimex as datetimex 

18 

19 

class Error(gws.Error):
    """Error raised by this module when a GDAL/OGR operation cannot be performed."""

22 

23 

class DriverInfo(gws.Data):
    """Description of a single GDAL driver, as collected by ``_fetch_driver_infos``."""

    # Position of the driver in GDAL's driver list (argument to ``gdal.GetDriver``).
    index: int
    # Driver short name (``ShortName``), e.g. the value passed to ``GetDriverByName``.
    name: str
    # Driver long name (``LongName``).
    longName: str
    # Driver metadata copied into a plain dict.
    metaData: dict

29 

30 

class _DriverInfoCache(gws.Data):
    """Module-level cache of driver information, filled by ``_fetch_driver_infos``."""

    # All drivers, in GDAL enumeration order.
    infos: list[DriverInfo]
    # File extension -> list of driver short names that declare this extension.
    extToName: dict
    # Short names of drivers whose metadata has DCAP_VECTOR=YES.
    vectorNames: set[str]
    # Short names of drivers whose metadata has DCAP_RASTER=YES.
    rasterNames: set[str]

36 

37 

class _DataSetOptions(gws.Data):
    """Options collected by the ``open_*`` functions and carried by data sets and layers."""

    # File path of the data set.
    path: str
    # Open mode: 'r' (read), 'a' (update) or 'w' (create/write).
    mode: str
    # Driver name; when empty, the driver is guessed from the path extension.
    driver: str
    # Encoding used to decode string fields; also passed as ENCODING for Shapefile layers.
    encoding: str
    # CRS used when a data set or geometry column has no usable SRS.
    defaultCrs: gws.Crs
    # If set, features carry raw EWKT text instead of a parsed shape.
    geometryAsText: bool
    # Options passed to GDAL when opening or creating the data set.
    gdalOpts: dict

46 

47 

def drivers() -> list[DriverInfo]:
    """Return the descriptions of all GDAL drivers.

    Returns:
        The ``infos`` list of the module-level driver cache.
    """

    return _fetch_driver_infos().infos

53 

54 

@contextlib.contextmanager
def gdal_config(options: dict):
    """Temporarily set GDAL config options.

    Each option is set with ``gdal.SetConfigOption`` on entry. On exit,
    every option that was touched is set back to the value
    ``gdal.GetConfigOption`` reported before the change.

    Args:
        options: Mapping of GDAL config option names to values. An empty
            or ``None`` mapping changes nothing.
    """

    prev = {}

    # Options are set inside the ``try`` so that a failure part-way through
    # still restores the options that were already changed.
    try:
        for key, value in (options or {}).items():
            prev[key] = gdal.GetConfigOption(key)
            gdal.SetConfigOption(key, value)
        yield
    finally:
        for key, value in prev.items():
            gdal.SetConfigOption(key, value)

69 

70 

def open_raster(
    path: str,
    mode: str = 'r',
    driver: str = '',
    default_crs: Optional[gws.Crs] = None,
    options: dict = None,
) -> 'RasterDataSet':
    """Open or create a raster DataSet at the given path.

    Args:
        path: File path.
        mode: 'r' (=read), 'a' (=update), 'w' (=create/write)
        driver: Driver name, if omitted, will be suggested from the path extension.
        default_crs: Default CRS for geometries (fallback to Webmercator).
        options: Options for gdal.OpenEx/CreateDataSource.

    Returns:
        A RasterDataSet object.
    """

    gdal_opts = options or {}
    dso = _DataSetOptions(path=path, mode=mode, driver=driver, defaultCrs=default_crs, gdalOpts=gdal_opts)
    ds = _open(dso, need_raster=True)
    return cast(RasterDataSet, ds)

97 

98 

def open_vector(
    path: str,
    mode: str = 'r',
    driver: str = '',
    encoding: Optional[str] = 'utf8',
    default_crs: Optional[gws.Crs] = None,
    geometry_as_text: bool = False,
    options: dict = None,
) -> 'VectorDataSet':
    """Open or create a vector DataSet at the given path.

    Args:
        path: File path.
        mode: 'r' (=read), 'a' (=update), 'w' (=create/write)
        driver: Driver name, if omitted, will be suggested from the path extension.
        encoding: If not None, strings will be automatically decoded.
        default_crs: Default CRS for geometries (fallback to Webmercator).
        geometry_as_text: Don't interpret geometry, extract raw WKT.
        options: Options for gdal.OpenEx/CreateDataSource.

    Returns:
        A VectorDataSet object.
    """

    gdal_opts = options or {}
    dso = _DataSetOptions(
        path=path,
        mode=mode,
        driver=driver,
        defaultCrs=default_crs,
        encoding=encoding,
        geometryAsText=geometry_as_text,
        gdalOpts=gdal_opts,
    )
    ds = _open(dso, need_raster=False)
    return cast(VectorDataSet, ds)

136 

137 

def _open(dso: _DataSetOptions, need_raster):
    """Open or create a data set according to the given options.

    Args:
        dso: Data set options; ``mode`` and ``defaultCrs`` are filled
            with defaults ('r' and Webmercator) when empty.
        need_raster: If true, a raster driver and RasterDataSet are used,
            otherwise a vector driver and VectorDataSet.

    Returns:
        A RasterDataSet or VectorDataSet wrapping the GDAL data set.

    Raises:
        Error: If the mode is invalid, or GDAL returns no data set.
    """

    if not dso.mode:
        dso.mode = 'r'
    # Compare against the individual modes: a substring test against 'rwa'
    # would also accept multi-character values such as 'rw' or 'wa'.
    if dso.mode not in ('r', 'w', 'a'):
        raise Error(f'invalid open mode {dso.mode!r}')

    gdal.UseExceptions()

    drv = _driver_from_args(dso.path, dso.driver, need_raster)
    dso.defaultCrs = dso.defaultCrs or gws.lib.crs.WEBMERCATOR

    if dso.mode == 'w':
        gd = drv.CreateDataSource(dso.path, _option_list(dso.gdalOpts))
        if gd is None:
            raise Error(f'cannot create {dso.path!r}')
        if need_raster:
            return RasterDataSet(dso, gd)
        return VectorDataSet(dso, gd)

    # Combine the open flags for the access mode and the data set kind.
    flags = gdal.OF_VERBOSE_ERROR
    if dso.mode == 'r':
        flags |= gdal.OF_READONLY
    if dso.mode == 'a':
        flags |= gdal.OF_UPDATE
    if need_raster:
        flags |= gdal.OF_RASTER
    else:
        flags |= gdal.OF_VECTOR

    gd = gdal.OpenEx(dso.path, flags, open_options=_option_list(dso.gdalOpts))
    if gd is None:
        raise Error(f'cannot open {dso.path!r}')

    if need_raster:
        return RasterDataSet(dso, gd)
    return VectorDataSet(dso, gd)

174 

175 

def open_from_image(
    image: gws.Image,
    bounds: gws.Bounds,
    rotation: gws.Size = None,
    options: dict = None,
) -> 'RasterDataSet':
    """Create an in-memory Dataset from an Image.

    Args:
        image: Image object.
        bounds: Geographic bounds; its CRS becomes the data set CRS.
        rotation: GeoTransform (x, y) rotation terms; zero rotation if omitted.
        options: Driver-specific creation options.

    Returns:
        A RasterDataSet backed by the GDAL 'MEM' driver, with one byte
        band per image channel.
    """

    gdal.UseExceptions()

    drv = gdal.GetDriverByName('MEM')
    img_array = image.to_array()
    # A single-band image may come as a 2-D (rows, cols) array; give it an
    # explicit band axis so the per-band code below works for it too.
    # NOTE(review): assumes to_array() returns a numpy array -- confirm.
    if img_array.ndim == 2:
        img_array = img_array[:, :, np.newaxis]
    band_count = img_array.shape[2]

    gd = drv.Create(
        '',
        xsize=img_array.shape[1],
        ysize=img_array.shape[0],
        bands=band_count,
        eType=gdal.GDT_Byte,
        options=_option_list(options),
    )
    # GDAL band numbers are 1-based.
    for band in range(band_count):
        gd.GetRasterBand(band + 1).WriteArray(img_array[:, :, band])

    gt = _bounds_to_geotransform(bounds, (gd.RasterXSize, gd.RasterYSize), rotation)

    gd.SetGeoTransform(gt)
    gd.SetSpatialRef(_srs_from_srid(bounds.crs.srid))

    dso = _DataSetOptions(path='')
    return RasterDataSet(dso, gd)

216 

217 

218## 

219 

220 

class _DataSet:
    """Base wrapper around an opened GDAL data set.

    Instances can be used as context managers; leaving the ``with``
    block closes the data set.
    """

    # The wrapped GDAL data set; set to None once closed.
    gdDataset: gdal.Dataset
    # Driver of the wrapped data set.
    gdDriver: gdal.Driver
    # Options the data set was opened with.
    dso: _DataSetOptions
    # Driver description, as returned by ``GetDescription``.
    driverName: str

    def __init__(self, dso: _DataSetOptions, gd_dataset):
        self.gdDataset = gd_dataset
        self.gdDriver = self.gdDataset.GetDriver()
        self.driverName = self.gdDriver.GetDescription()
        self.dso = dso

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
        # Do not suppress exceptions raised inside the ``with`` block.
        return False

    def close(self):
        """Flush the data set and drop the reference to it.

        Calling ``close`` on an already closed data set does nothing, so
        an explicit ``close()`` inside a ``with`` block is safe.
        """

        gd = self.gdDataset
        if gd is None:
            return
        try:
            gd.FlushCache()
        finally:
            # Drop the reference even if flushing fails.
            setattr(self, 'gdDataset', None)

    def crs(self) -> Optional[gws.Crs]:
        """Return the CRS of the data set, or None if no SRID can be determined."""

        srid = _srid_from_srs(self.gdDataset.GetSpatialRef())
        return gws.lib.crs.get(srid) if srid else None

    def set_crs(self, crs: gws.Crs):
        """Set the spatial reference of the data set from the given CRS."""

        srs = _srs_from_srid(crs.srid)
        self.gdDataset.SetSpatialRef(srs)

251 

252 

class RasterDataSet(_DataSet):
    """Raster data set wrapper."""

    def to_image(self) -> gws.Image:
        """Read all bands into an 8-bit array and build an Image from it."""

        ds = self.gdDataset
        n_bands = ds.RasterCount
        width = ds.RasterXSize
        height = ds.RasterYSize

        # Array layout is (rows, columns, bands).
        pixels = np.zeros((height, width, n_bands), dtype=np.uint8)

        # GDAL band numbers are 1-based.
        for band_no in range(1, n_bands + 1):
            pixels[:, :, band_no - 1] = ds.GetRasterBand(band_no).ReadAsArray(0, 0, width, height)

        return gws.lib.image.from_array(pixels)

    def create_copy(self, path: str, driver: str = '', strict=False, options: dict = None):
        """Write a copy of this data set, including its metadata, to another path.

        Args:
            path: Destination path.
            driver: Driver name, if omitted, will be suggested from the path extension.
            strict: If True, fail if some options are not supported.
            options: Driver-specific creation options.
        """

        gdal.UseExceptions()

        target_driver = _driver_from_args(path, driver, need_raster=True)
        strict_flag = 1 if strict else 0
        copy = target_driver.CreateCopy(
            path,
            self.gdDataset,
            strict=strict_flag,
            options=_option_list(options),
        )
        copy.SetMetadata(self.gdDataset.GetMetadata())
        copy.FlushCache()
        # Release the reference to the copied data set.
        del copy

    def bounds(self) -> gws.Bounds:
        """Return the bounds computed from the geotransform and the raster size.

        Uses the data set CRS, or the default CRS when the data set has none.
        """

        geotransform = self.gdDataset.GetGeoTransform()
        size = (self.gdDataset.RasterXSize, self.gdDataset.RasterYSize)
        crs = self.crs() or self.dso.defaultCrs
        return _geotransform_to_bounds(geotransform, size, crs)

299 

300 

class VectorDataSet(_DataSet):
    """Vector data set wrapper."""

    @contextlib.contextmanager
    def transaction(self):
        """Run the ``with`` body inside a data set transaction.

        The transaction is committed when the body completes, and rolled
        back (with the exception re-raised) when it raises.
        """

        self.gdDataset.StartTransaction()
        try:
            yield self
            self.gdDataset.CommitTransaction()
        except BaseException:
            self.gdDataset.RollbackTransaction()
            raise

    def create_layer(
        self,
        name: str,
        columns: dict[str, gws.AttributeType],
        geometry_type: gws.GeometryType = None,
        crs: gws.Crs = None,
        overwrite=False,
        options: dict = None,
    ) -> 'VectorLayer':
        """Create a new layer with the given columns.

        Args:
            name: Layer name.
            columns: Column definitions.
            geometry_type: Geometry type.
            crs: CRS for geometries.
            overwrite: If True, overwrite existing layer.
            options: Driver-specific creation options.

        Returns:
            The new layer.
        """

        layer_opts = {**(options or {})}
        if overwrite:
            layer_opts['OVERWRITE'] = 'YES'

        # The encoding is only passed on as a layer option for Shapefile drivers.
        encoding = (self.dso.encoding or '').upper()
        if encoding and 'Shapefile' in self.gdDriver.GetName():
            layer_opts['ENCODING'] = encoding

        ogr_geom_type = ogr.wkbUnknown
        layer_srs = None

        if geometry_type:
            ogr_geom_type = _GEOM_TO_OGR.get(geometry_type)
            if not ogr_geom_type:
                gws.log.warning(f'gdal: unsupported {geometry_type=}')
                ogr_geom_type = ogr.wkbUnknown
            crs = crs or self.dso.defaultCrs
            layer_srs = _srs_from_srid(crs.srid)

        new_layer = self.gdDataset.CreateLayer(
            name,
            geom_type=ogr_geom_type,
            srs=layer_srs,
            options=_option_list(layer_opts),
        )

        for column_name, column_type in columns.items():
            field_defn = ogr.FieldDefn(column_name, _ATTR_TO_OGR[column_type])
            # Booleans are stored as integer fields with the boolean subtype.
            if column_type == gws.AttributeType.bool:
                field_defn.SetSubType(ogr.OFSTBoolean)
            new_layer.CreateField(field_defn)

        return VectorLayer(self, new_layer)

    def layers(self) -> list['VectorLayer']:
        """Return all layers of the data set, in index order."""

        result = []
        for index in range(self.gdDataset.GetLayerCount()):
            result.append(VectorLayer(self, self.gdDataset.GetLayerByIndex(index)))
        return result

    def layer(self, name_or_index: str | int) -> Optional['VectorLayer']:
        """Return the layer with the given name or index, or None if there is none."""

        if isinstance(name_or_index, int):
            found = self.gdDataset.GetLayerByIndex(name_or_index)
        elif isinstance(name_or_index, str):
            found = self.gdDataset.GetLayerByName(name_or_index)
        else:
            found = None

        if not found:
            return None
        return VectorLayer(self, found)

    def require_layer(self, name_or_index: str | int) -> 'VectorLayer':
        """Return the layer with the given name or index.

        Raises:
            Error: If the layer is not found.
        """

        found = self.layer(name_or_index)
        if not found:
            raise Error(f'layer {name_or_index} not found')
        return found

389 

390 

class VectorLayer:
    """Wrapper around a single OGR layer of a vector data set."""

    # Layer name, taken from the layer definition.
    name: str
    # Options of the data set this layer belongs to.
    dso: _DataSetOptions
    # The wrapped OGR layer.
    gdLayer: ogr.Layer
    # The layer (feature) definition.
    gdDefn: ogr.FeatureDefn

    def __init__(self, ds: VectorDataSet, gd_layer: ogr.Layer):
        self.gdLayer = gd_layer
        self.gdDefn = self.gdLayer.GetLayerDefn()
        self.name = self.gdDefn.GetName()
        self.dso = ds.dso

    def describe(self) -> gws.DataSetDescription:
        """Describe the layer: FID column, attribute columns and geometry columns.

        Fields whose OGR type has no entry in ``_OGR_TO_ATTR`` are left out.
        Integer fields with the boolean subtype are reported as ``bool``.
        If there are several geometry columns, the last one provides the
        layer-level geometry name, type and SRID.
        """

        desc = gws.DataSetDescription(
            columns=[],
            columnMap={},
            fullName=self.name,
            geometryName='',
            geometrySrid=0,
            geometryType='',
            name=self.name,
            schema='',
        )

        cols = []

        fid_col = self.gdLayer.GetFIDColumn()
        if fid_col:
            cols.append(
                gws.ColumnDescription(
                    name=fid_col,
                    type=_OGR_TO_ATTR[ogr.OFTInteger],
                    nativeType=ogr.OFTInteger,
                    isPrimaryKey=True,
                    columnIndex=0,
                )
            )

        for i in range(self.gdDefn.GetFieldCount()):
            fdef = self.gdDefn.GetFieldDefn(i)
            typ = fdef.GetType()
            if typ not in _OGR_TO_ATTR:
                continue
            attr_type = _OGR_TO_ATTR[typ]
            # Booleans are integer fields with a boolean *subtype*; the field
            # type alone cannot tell them apart from plain integers.
            if typ == ogr.OFTInteger and fdef.GetSubType() == ogr.OFSTBoolean:
                attr_type = gws.AttributeType.bool
            cols.append(
                gws.ColumnDescription(
                    name=fdef.GetName(),
                    type=attr_type,
                    nativeType=typ,
                    columnIndex=i,
                )
            )

        for i in range(self.gdDefn.GetGeomFieldCount()):
            fdef: ogr.GeomFieldDefn = self.gdDefn.GetGeomFieldDefn(i)
            typ = fdef.GetType()
            cols.append(
                gws.ColumnDescription(
                    name=fdef.GetName() or 'geom',
                    type=gws.AttributeType.geometry,
                    nativeType=typ,
                    columnIndex=i,
                    geometryType=_OGR_TO_GEOM.get(typ) or gws.GeometryType.geometry,
                    geometrySrid=_srid_from_srs(fdef.GetSpatialRef()) or self.dso.defaultCrs.srid,
                )
            )

        desc.columns = cols
        desc.columnMap = {c.name: c for c in cols}

        for c in cols:
            # NB take the last geom
            if c.geometryType:
                desc.geometryName = c.name
                desc.geometryType = c.geometryType
                desc.geometrySrid = c.geometrySrid

        return desc

    def insert(self, records: list[gws.FeatureRecord]) -> list[int]:
        """Create a feature for each record and return the new feature ids.

        Args:
            records: Records to insert. An integer ``uid`` is used as the
                feature id; attributes that are None are not set.

        Returns:
            Feature ids, in the order of ``records``.

        Raises:
            Error: If an attribute value cannot be stored in its field.
        """

        desc = self.describe()
        fids = []

        for rec in records:
            gd_feature = ogr.Feature(self.gdDefn)
            if desc.geometryType and rec.shape:
                gd_feature.SetGeometry(
                    ogr.CreateGeometryFromWkt(
                        rec.shape.to_wkt(),
                        _srs_from_srid(rec.shape.crs.srid),
                    )
                )

            if rec.uid and isinstance(rec.uid, int):
                gd_feature.SetFID(rec.uid)

            for col in desc.columns:
                # Geometry and the FID are handled above.
                if col.geometryType or col.isPrimaryKey:
                    continue
                val = rec.attributes.get(col.name)
                if val is None:
                    continue
                try:
                    _attr_to_ogr(gd_feature, int(col.nativeType), col.columnIndex, val, self.dso.encoding)
                except Exception as exc:
                    raise Error(f'field cannot be set: {col.name=} {val=}') from exc

            self.gdLayer.CreateFeature(gd_feature)
            fids.append(gd_feature.GetFID())

        return fids

    def count(self, force=False):
        """Return the feature count; ``force`` is passed on to ``GetFeatureCount``."""

        return self.gdLayer.GetFeatureCount(force=1 if force else 0)

    def get_all(self) -> list[gws.FeatureRecord]:
        """Return all features of the layer as a list of records."""

        return list(self.iter_features())

    def iter_features(self) -> Iterable[gws.FeatureRecord]:
        """Iterate over all features of the layer, starting from the first one."""

        self.gdLayer.ResetReading()

        while True:
            gd_feature = self.gdLayer.GetNextFeature()
            if not gd_feature:
                break
            yield self._feature_record(gd_feature)

    def get(self, fid: int) -> Optional[gws.FeatureRecord]:
        """Return the feature with the given id, or None if GDAL returns no feature."""

        gd_feature = self.gdLayer.GetFeature(fid)
        if gd_feature:
            return self._feature_record(gd_feature)
        return None

    def _feature_record(self, gd_feature):
        """Convert an OGR feature into a FeatureRecord.

        The record ``uid`` is the feature id as a string. Depending on
        ``geometryAsText``, the geometry is stored as EWKT text in
        ``ewkt`` or as a parsed shape in ``shape``.
        """

        rec = gws.FeatureRecord(
            attributes={},
            shape=None,
            meta={'layerName': self.name},
            uid=str(gd_feature.GetFID()),
        )

        for i in range(gd_feature.GetFieldCount()):
            gd_field_defn: ogr.FieldDefn = gd_feature.GetFieldDefnRef(i)
            name = gd_field_defn.GetName()
            val = _attr_from_ogr(gd_feature, gd_field_defn.type, i, self.dso.encoding)
            rec.attributes[name] = val

        cnt = gd_feature.GetGeomFieldCount()
        if cnt > 0:
            # NB take the last geom
            # @TODO multigeometry support
            gd_geom = gd_feature.GetGeomFieldRef(cnt - 1)
            if gd_geom:
                srid = _srid_from_srs(gd_geom.GetSpatialReference()) or self.dso.defaultCrs.srid
                wkt = gd_geom.ExportToWkt()
                if self.dso.geometryAsText:
                    rec.ewkt = f'SRID={srid};{wkt}'
                else:
                    rec.shape = gws.base.shape.from_wkt(wkt, gws.lib.crs.get(srid))

        return rec

550 

551 

552## 

553 

554 

def _driver_from_args(path, driver_name, need_raster):
    """Find the GDAL/OGR driver for a path and an optional driver name.

    Args:
        path: File path; its extension is used when ``driver_name`` is empty.
        driver_name: Explicit driver short name, or an empty value.
        need_raster: If true, a raster driver is required, otherwise a
            vector driver.

    Returns:
        The driver object from ``gdal.GetDriverByName`` (raster) or
        ``ogr.GetDriverByName`` (vector).

    Raises:
        Error: If no driver or several drivers match the extension, or
            the driver does not support the requested kind of data.
    """

    di = _fetch_driver_infos()

    if not driver_name:
        ext = path.split('.')[-1]
        # Try the extension as given first, then lower-cased, so that
        # e.g. 'FOO.SHP' resolves like 'foo.shp'.
        names = di.extToName.get(ext) or di.extToName.get(ext.lower())
        if not names:
            raise Error(f'no default driver found for {path!r}')
        if len(names) > 1:
            if ext.lower() in ('tif', 'tiff'):
                driver_name = 'GTiff'
            else:
                # Several drivers claim this extension: keep only those that
                # support the requested kind and accept a unique match.
                wanted = di.rasterNames if need_raster else di.vectorNames
                capable = [n for n in names if n in wanted]
                if len(capable) != 1:
                    raise Error(f'multiple drivers found for {path!r}: {names}')
                driver_name = capable[0]
        else:
            driver_name = names[0]

    is_vector = driver_name in di.vectorNames
    is_raster = driver_name in di.rasterNames

    if need_raster:
        if not is_raster:
            raise Error(f'driver {driver_name!r} is not raster')
        return gdal.GetDriverByName(driver_name)

    if not is_vector:
        raise Error(f'driver {driver_name!r} is not vector')
    return ogr.GetDriverByName(driver_name)

582 

583 

# Driver information, collected once on first use by _fetch_driver_infos.
_di_cache: Optional[_DriverInfoCache] = None


def _fetch_driver_infos() -> _DriverInfoCache:
    """Return the driver information cache, building it on first call.

    Enumerates all GDAL drivers and records, for each one, its
    description, the file extensions it declares and whether its
    metadata marks it as vector and/or raster capable.
    """

    global _di_cache

    if _di_cache is not None:
        return _di_cache

    # Fill a local object and publish it only when complete, so that an
    # error during enumeration cannot leave a partial cache behind.
    cache = _DriverInfoCache(
        infos=[],
        extToName={},
        vectorNames=set(),
        rasterNames=set(),
    )

    for n in range(gdal.GetDriverCount()):
        drv = gdal.GetDriver(n)
        inf = DriverInfo(
            index=n,
            name=str(drv.ShortName),
            longName=str(drv.LongName),
            metaData=dict(drv.GetMetadata() or {}),
        )
        cache.infos.append(inf)

        # The extensions metadata item is a whitespace-separated list.
        for e in inf.metaData.get(gdal.DMD_EXTENSIONS, '').split():
            cache.extToName.setdefault(e, []).append(inf.name)
        if inf.metaData.get('DCAP_VECTOR') == 'YES':
            cache.vectorNames.add(inf.name)
        if inf.metaData.get('DCAP_RASTER') == 'YES':
            cache.rasterNames.add(inf.name)

    _di_cache = cache
    return cache

618 

619 

# Cache used by _srid_from_srs: SRS name -> authority code (0 if none was found).
_name_to_srid = {}

621 

622 

def _srs_from_srid(srid):
    """Create an OSR spatial reference from an EPSG code.

    Args:
        srid: EPSG code.

    Returns:
        An ``osr.SpatialReference`` object.

    Raises:
        Error: If the import reports a non-zero error code.
    """

    srs = osr.SpatialReference()
    # When OSR exceptions are not enabled, a failed import is only reported
    # through the return code; do not hand out an empty SRS in that case.
    err = srs.ImportFromEPSG(srid)
    if err:
        raise Error(f'cannot create SRS for {srid=}')
    return srs

627 

628 

def _srid_from_srs(srs):
    """Return the authority code of an OSR spatial reference, or 0.

    Results are cached by SRS name in ``_name_to_srid``, so all spatial
    references with the same name yield the same code. A warning is
    logged when the SRS has no name or no authority code.

    NOTE(review): the code is returned exactly as ``GetAuthorityCode``
    gives it; confirm whether callers expect an int.
    """

    if not srs:
        return 0

    srs_name = srs.GetName()
    if not srs_name:
        wkt = srs.ExportToWkt()
        gws.log.warning(f'gdalx: no name for SRS {wkt!r}')
        return 0

    try:
        return _name_to_srid[srs_name]
    except KeyError:
        pass

    code = srs.GetAuthorityCode(None)
    if not code:
        wkt = srs.ExportToWkt()
        gws.log.warning(f'gdalx: no srid for SRS {wkt!r}')
        code = 0

    _name_to_srid[srs_name] = code
    return code

650 

651 

def _attr_from_ogr(gd_feature: ogr.Feature, gtype: int, idx: int, encoding: str = 'utf8'):
    """Read a field value from an OGR feature as a Python value.

    Args:
        gd_feature: The feature to read from.
        gtype: OGR field *type* (an ``ogr.OFT*`` constant).
        idx: Field index.
        encoding: Encoding for string fields; if empty, raw bytes are returned.

    Returns:
        The converted value, or None if the field is null, the type is
        not handled, or a date/time value is invalid.
    """

    if gd_feature.IsFieldNull(idx):
        return None

    if gtype == ogr.OFTString:
        b = gd_feature.GetFieldAsBinary(idx)
        if encoding:
            return b.decode(encoding)
        return bytes(b)

    if gtype in {ogr.OFTDate, ogr.OFTTime, ogr.OFTDateTime}:
        # python GetFieldAsDateTime appears to use float seconds, as in
        # GetFieldAsDateTime (int i, int *pnYear, int *pnMonth, int *pnDay, int *pnHour, int *pnMinute, float *pfSecond, int *pnTZFlag)
        #
        v = gd_feature.GetFieldAsDateTime(idx)
        sec, fsec = divmod(v[5], 1)
        try:
            return datetimex.new(v[0], v[1], v[2], v[3], v[4], int(sec), int(fsec * 1e6), tz=_tzflag_to_tz(v[6]))
        except ValueError:
            return None

    # NB: ``gtype`` is a field type. Subtype constants (ogr.OFST*) share
    # numeric values with unrelated field types (OFSTBoolean equals
    # OFTIntegerList, OFSTFloat32 equals OFTRealList), so they must never be
    # compared with it; the subtype is read from the field definition instead.
    if gtype == ogr.OFTInteger:
        value = gd_feature.GetFieldAsInteger(idx)
        if gd_feature.GetFieldDefnRef(idx).GetSubType() == ogr.OFSTBoolean:
            return value != 0
        return value
    if gtype == ogr.OFTInteger64:
        return gd_feature.GetFieldAsInteger64(idx)
    if gtype == ogr.OFTIntegerList:
        return gd_feature.GetFieldAsIntegerList(idx)
    if gtype == ogr.OFTInteger64List:
        return gd_feature.GetFieldAsInteger64List(idx)
    if gtype == ogr.OFTReal:
        return gd_feature.GetFieldAsDouble(idx)
    if gtype == ogr.OFTRealList:
        return gd_feature.GetFieldAsDoubleList(idx)
    if gtype == ogr.OFTStringList:
        return gd_feature.GetFieldAsStringList(idx)
    if gtype == ogr.OFTBinary:
        return gd_feature.GetFieldAsBinary(idx)

    return None

685 

686 

687def _tzflag_to_tz(tzflag): 

688 # see gdal/ogr/ogrutils.cpp OGRGetISO8601DateTime 

689 

690 if tzflag == 0 or tzflag == 1: 

691 return '' 

692 if tzflag == 100: 

693 return 'UTC' 

694 if tzflag % 4 != 0: 

695 # @TODO 

696 raise Error(f'unsupported timezone {tzflag=}') 

697 hrs = (100 - tzflag) // 4 

698 return f'Etc/GMT{hrs:+}' 

699 

700 

def _attr_to_ogr(gd_feature: ogr.Feature, gtype: int, idx: int, value, encoding):
    """Store a Python value in a field of an OGR feature.

    Args:
        gd_feature: The feature to modify.
        gtype: OGR field *type* (an ``ogr.OFT*`` constant).
        idx: Field index.
        value: Value to store; converted according to ``gtype``.
        encoding: Not used by this function.

    Returns:
        Whatever ``SetField`` returns.
    """

    if gtype == ogr.OFTDate:
        return gd_feature.SetField(idx, datetimex.to_iso_date_string(value))
    if gtype == ogr.OFTTime:
        return gd_feature.SetField(idx, datetimex.to_iso_time_string(value))
    if gtype == ogr.OFTDateTime:
        return gd_feature.SetField(idx, datetimex.to_iso_string(value))

    # NB: only field types (ogr.OFT*) are compared here. Subtype constants
    # share numeric values with unrelated field types (OFSTBoolean equals
    # OFTIntegerList, OFSTFloat32 equals OFTRealList), so testing them would
    # send list values down the scalar branches. Booleans arrive as
    # OFTInteger and are stored as 0/1 by the integer branch.
    if gtype in {ogr.OFTInteger, ogr.OFTInteger64}:
        return gd_feature.SetField(idx, int(value))
    if gtype in {ogr.OFTIntegerList, ogr.OFTInteger64List}:
        return gd_feature.SetField(idx, [int(x) for x in value])
    if gtype == ogr.OFTReal:
        return gd_feature.SetField(idx, float(value))
    if gtype == ogr.OFTRealList:
        return gd_feature.SetField(idx, [float(x) for x in value])

    return gd_feature.SetField(idx, value)

720 

721 

def is_attribute_supported(typ):
    """Tell whether the attribute type has an OGR field type in ``_ATTR_TO_OGR``."""

    supported_types = _ATTR_TO_OGR
    return typ in supported_types

724 

725 

def _bounds_to_geotransform(bounds: gws.Bounds, px_size: gws.Size, rotation: gws.Size | None) -> tuple[float, float, float, float, float, float]:
    """Build a GDAL geotransform tuple from bounds and a pixel size.

    The origin is the (min x, max y) corner of the extent, and the y
    resolution is computed as (min y - max y) / height.

    Args:
        bounds: Bounds whose extent is (min x, min y, max x, max y).
        px_size: Raster size in pixels, (width, height).
        rotation: Optional (x, y) rotation terms; zeros if omitted.
    """

    extent = bounds.extent
    left, bottom, right, top = extent[0], extent[1], extent[2], extent[3]

    x_res = (right - left) / px_size[0]
    y_res = (bottom - top) / px_size[1]

    x_rot, y_rot = (rotation[0], rotation[1]) if rotation else (0.0, 0.0)

    return (left, x_res, x_rot, top, y_rot, y_res)

733 

734 

def _geotransform_to_bounds(gt: tuple[float, float, float, float, float, float], px_size: gws.Size, crs: gws.Crs) -> gws.Bounds:
    """Compute bounds from a GDAL geotransform and a pixel size.

    Only the origin and the two resolution terms of the geotransform
    are used; the rotation terms are ignored.

    Args:
        gt: Geotransform tuple.
        px_size: Raster size in pixels, (width, height).
        crs: CRS of the resulting bounds.
    """

    origin_x, x_res, origin_y, y_res = gt[0], gt[1], gt[3], gt[5]

    far_x = origin_x + x_res * px_size[0]
    far_y = origin_y + y_res * px_size[1]

    extent = (origin_x, far_y, far_x, origin_y)
    return gws.lib.bounds.from_extent(extent, crs, always_xy=True)

741 

742 

743def _option_list(opts: dict | None) -> list[str]: 

744 if not opts: 

745 return [] 

746 return [f'{k}={v}' for k, v in opts.items()] 

747 

748 

# Attribute type -> OGR field type used when creating fields.
# Booleans map to integer fields; create_layer additionally sets the
# boolean subtype on them.
_ATTR_TO_OGR = {
    gws.AttributeType.bool: ogr.OFTInteger,
    gws.AttributeType.bytes: ogr.OFTBinary,
    gws.AttributeType.date: ogr.OFTDate,
    gws.AttributeType.datetime: ogr.OFTDateTime,
    gws.AttributeType.float: ogr.OFTReal,
    gws.AttributeType.floatlist: ogr.OFTRealList,
    gws.AttributeType.int: ogr.OFTInteger,
    gws.AttributeType.intlist: ogr.OFTIntegerList,
    gws.AttributeType.str: ogr.OFTString,
    gws.AttributeType.strlist: ogr.OFTStringList,
    gws.AttributeType.time: ogr.OFTTime,
}

762 

# OGR field type -> attribute type.
# Only field types (ogr.OFT*) belong here. Subtype constants such as
# ogr.OFSTBoolean share numeric values with unrelated field types
# (OFSTBoolean equals OFTIntegerList), so a subtype key would be silently
# overwritten by the field type with the same value.
_OGR_TO_ATTR = {
    ogr.OFTBinary: gws.AttributeType.bytes,
    ogr.OFTDate: gws.AttributeType.date,
    ogr.OFTDateTime: gws.AttributeType.datetime,
    ogr.OFTReal: gws.AttributeType.float,
    ogr.OFTRealList: gws.AttributeType.floatlist,
    ogr.OFTInteger: gws.AttributeType.int,
    ogr.OFTIntegerList: gws.AttributeType.intlist,
    ogr.OFTInteger64: gws.AttributeType.int,
    ogr.OFTInteger64List: gws.AttributeType.intlist,
    ogr.OFTString: gws.AttributeType.str,
    ogr.OFTStringList: gws.AttributeType.strlist,
    ogr.OFTTime: gws.AttributeType.time,
}

778 

# Geometry type -> OGR geometry type constant (ogr.wkb*), used when creating layers.
_GEOM_TO_OGR = {
    gws.GeometryType.curve: ogr.wkbCurve,
    gws.GeometryType.geometrycollection: ogr.wkbGeometryCollection,
    gws.GeometryType.linestring: ogr.wkbLineString,
    gws.GeometryType.multicurve: ogr.wkbMultiCurve,
    gws.GeometryType.multilinestring: ogr.wkbMultiLineString,
    gws.GeometryType.multipoint: ogr.wkbMultiPoint,
    gws.GeometryType.multipolygon: ogr.wkbMultiPolygon,
    gws.GeometryType.multisurface: ogr.wkbMultiSurface,
    gws.GeometryType.point: ogr.wkbPoint,
    gws.GeometryType.polygon: ogr.wkbPolygon,
    gws.GeometryType.polyhedralsurface: ogr.wkbPolyhedralSurface,
    gws.GeometryType.surface: ogr.wkbSurface,
}

793 

# OGR geometry type constant (ogr.wkb*) -> geometry type.
# This is the exact inverse of _GEOM_TO_OGR, with entries in the same order.
_OGR_TO_GEOM = {ogr_type: geom_type for geom_type, ogr_type in _GEOM_TO_OGR.items()}