X-Git-Url: https://git.rvb.name/pyrungps.git/blobdiff_plain/1690b8b7c8ccfdf60be2b33b314e6c26bdef86e6..94f62add438cdf546fdae207ede72c05daf66c00:/pyrungps/pygpx.py diff --git a/pyrungps/pygpx.py b/pyrungps/pygpx.py new file mode 100644 index 0000000..e80077d --- /dev/null +++ b/pyrungps/pygpx.py @@ -0,0 +1,656 @@ +# coding: UTF-8 +#------------------------ +# Работа с GPX-файлами +#------------------------ +from lxml import etree +import os +import math +import datetime +from dateutil.parser import parse +from pprint import pprint + +def deg2rad(deg): +# Преобразование из градусов в радианы. + return deg / (180 / math.pi) + +tagprefix = { '1.0': '{http://www.topografix.com/GPX/1/0}', '1.1': '{http://www.topografix.com/GPX/1/1}'} + +def StripPrefix(tag,version): + nstag=tagprefix[version] + return tag.replace(nstag,'') + +def formattime(dt): + return dt.strftime("%Y-%m-%dT%H:%M:%S")+"."+str(int(dt.microsecond/1000)).zfill(3)+"Z" + +class Link: + # Ссылка. Может присутствовать в точке, треке, файле. + def __init__(self, node, version): + self.version = version + self.href = node.get("href") + self.text = None + self.type = None + if node is not None: + for child in node: + if child.tag == tagprefix[version] + "text": + self.text = child.text + elif child.tag == tagprefix[version] + "type": + self.type = child.text + else: + raise ValueError("Неизвестный тип узла: '%s' " % child.tag) + + def write(self,root): + linknode = etree.SubElement(root,"link"); + linknode.set("href",self.href) + if self.text: + etree.SubElement(linknode,"text").text = self.text + if self.type: + etree.SubElement(linknode,"type").text = self.type + +class GPXTrackPt: + # Точка с координатами. Может присутствовать в последовательности точек в треке, маршруте, а также в списке путевых точек.# + + def __init__(self, node, version): + # Извлекаем данные из GPX-тэгов.# + self.version = version + # Сначала обязательные.# + if node is not None: + self.lat = float(node.get("lat")) + self.lon = float(node.get("lon")) + self.elevation = None + self.time = None + self.speed = None + self.link = None + self.additional_info = {} + # Потом необязательные.# + if node is not None: + for child in node: + if child.tag == tagprefix[version] + "time": + self.time = parse(child.text) + elif child.tag == tagprefix[version] + "ele": + self.elevation = float(child.text) + # Стандартом скорость не предусмотрена, но де-факто в Run.GPS используется.# + elif child.tag == tagprefix[version] + "speed": + self.speed = float(child.text) + elif child.tag == tagprefix[version] + "link": + self.link = Link(child,version) + elif StripPrefix(child.tag,version) in ['magvar','geoidheight','name','cmt','desc', + 'src','sym','type','fix','sat','hdop','vdop','pdop','ageofdgpsdata','dgpsid']: + self.additional_info[StripPrefix(child.tag,version)]=child.text + elif child.tag == tagprefix[version] + "extensions": + pass + else: + raise ValueError("Неизвестный тип узла: '%s'" % child.tag) + + for i in self.additional_info.keys(): + val = self.additional_info[i]; + if val: + if not val.strip(' \n\t'): + del self.additional_info[i] + + + def write(self,root,tag ="wpt"): + wptnode = etree.SubElement(root,tag) + wptnode.set("lat",repr(self.lat)); + wptnode.set("lon",repr(self.lon)); + if self.elevation: + etree.SubElement(wptnode,"ele").text = repr(self.elevation) + if self.time: + etree.SubElement(wptnode,"time").text = formattime(self.time) + if self.speed: + etree.SubElement(wptnode,"speed").text = repr(self.speed) + if self.link: + self.link.write(wptnode) + for i in self.additional_info.keys(): + etree.SubElement(wptnode,i).text = self.additional_info[i] + + def distance(self, other): + # Расстояние между двумя точками на глобусе.# + # http://www.platoscave.net/blog/2009/oct/5/calculate-distance-latitude-longitude-python/ + + radius = 6378700.0 # meters + + lat1, lon1 = self.lat, self.lon + lat2, lon2 = other.lat, other.lon + + dlat = math.radians(lat2-lat1) + dlon = math.radians(lon2-lon1) + a = math.sin(dlat/2) * math.sin(dlat/2) + math.cos(math.radians(lat1)) \ + * math.cos(math.radians(lat2)) * math.sin(dlon/2) * math.sin(dlon/2) + c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a)) + d = radius * c + + return d + + def duration(self, other): + # Время между двумя точками.# + return other.time - self.time + +class GPXRoute: + # Маршрут.# + + def __init__(self, node, version): + self.name = None + self.version = version + self.rtepts = [] + self.additional_info = {} + self.link = None + if node is not None: + for child in node: + if child.tag == tagprefix[version] + "name": + self.name = child.text + elif child.tag == tagprefix[version] + "link": + self.link = Link(child,version) + elif StripPrefix(child.tag,version) in [ "cmt", "desc", "src", "number", "type" ]: + self.additional_info[StripPrefix(child.tag,version)]=child.text + elif child.tag == tagprefix[version] + "rtept": + self.rtepts.append(GPXTrackPt(child, version)) + elif child.tag == tagprefix[version] + "extensions": + pass + else: + raise ValueError("Неизвестный тип узла <%s>" % child.tag) + for i in self.additional_info.keys(): + val = self.additional_info[i]; + if val: + if not val.strip(' \n\t'): + del self.additional_info[i] + + def write(self,root): + rtenode = etree.SubElement(root,"rte") + if self.name: + etree.SubElement(rtenode,"name").text = self.name + if self.link: + self.link.write(rtenode) + for i in self.additional_info.keys(): + print("storing %s " % ( i )) + etree.SubElement(rtenode,i).text = self.additional_info[i] + for i in self.rtepts: + i.write(rtenode,"rtept") + +class GPXTrackSeg: + # Один сегмент трека.# + + def __init__(self, node, version): + self.version = version + self.trkpts = [] + self.elevation_gain = 0.0 + self.elevation_loss = 0.0 + if node is not None: + for child in node: + if child.tag == tagprefix[version] + "trkpt": + self.trkpts.append(GPXTrackPt(child, self.version)) + else: + raise ValueError("Неизвестный тип узла <%s>" % node.nodeName) + self._get_elevation() + + def write(self,root): + trksegnode = etree.SubElement(root,"trkseg") + for i in self.trkpts: + i.write(trksegnode,"trkpt") + + def _get_elevation(self): + + elev_data = [] + for pt in self.trkpts: + if pt.elevation: + elev_data.append(pt.elevation) + + gain = 0.0 + loss = 0.0 + last_elevation = None + + window_size = 5 + i = 0 + moving_averages = [] + + while i < len(elev_data) - window_size + 1: + this_window = elev_data[i : i + window_size] + window_average = sum(this_window) / window_size + moving_averages.append(window_average) + i += 1 + + if len(moving_averages)>2: + elev_data = moving_averages + + for pt in elev_data: + if last_elevation is not None: + try: + if pt > last_elevation: + gain += pt - last_elevation + else: + loss += last_elevation - pt + except: + pass + last_elevation=pt + + self.elevation_gain = gain + self.elevation_loss = loss + + def distance(self): + # Расчет длины сегмента.# + _length = 0.0 + last_pt = None + for pt in self.trkpts: + if last_pt is not None: + _length += last_pt.distance(pt) + last_pt = pt + return _length + + def filtered_distance(self,max_speed): + # Расчет длины сегмента с фильтрацией предположительно сбойных участков.# + _length = 0.0 + last_pt = None + for pt in self.trkpts: + if last_pt is not None: + _delta = last_pt.distance(pt) + _time_delta = pt.time - last_pt.time + _time_delta_s = _time_delta.days*86400 + _time_delta.seconds + _time_delta.microseconds/1000000 + if _time_delta_s > 0: + if _delta/_time_delta_s < max_speed: + _length += _delta + else: + _length += _delta + last_pt = pt + return _length + + def duration(self): + # Расчет продолжительности сегмента.# + return self.trkpts[0].duration(self.trkpts[-1]) + + def bound_box(self): + # Обрамляющий прямоугольник для сегмента.# + minlat =360 + maxlat = -360 + minlon = 360 + maxlon = -360 + for pt in self.trkpts: + if pt.latmaxlat: + maxlat=pt.lat + if pt.lonmaxlon: + maxlon=pt.lon + return ((minlat,minlon),(maxlat,maxlon)) + +class GPXTrack: + # Трек.# + + def __init__(self, node, version): + # Создаем трек из GPX-данных.# + + self.version = version + self.trksegs = [] + self.sport= None + self.additional_info = {} + self.name = None + self.link = None + + if node is not None: + for child in node: + if child.tag == tagprefix[version] + "name": + self.name = child.text + elif child.tag == tagprefix[version] + "trkseg": + if len(child) > 0: + self.trksegs.append(GPXTrackSeg(child, self.version)) + elif child.tag == tagprefix[version] + "link": + self.link = Link(child,version) + elif StripPrefix(child.tag,version) in [ "number", "desc", "cmt", "src", "type" ]: + self.additional_info[StripPrefix(child.tag,version)] = child.text + elif child.tag == tagprefix[version] + "extensions": + for ext in child: + # Из расширений извлекаем вид спорта - специфично для Run.GPS.# + if ext.tag == tagprefix[version] + "sport": + self.sport = ext.text + + for i in self.additional_info.keys(): + val = self.additional_info[i]; + if val: + if not val.strip(' \n\t'): + del self.additional_info[i] + + def write(self,root): + trknode = etree.SubElement(root,"trk") + if self.name: + etree.SubElement(trknode,"name").text = self.name + if self.sport: + etree.SubElement(etree.SubElement(trknode,"extensions"),"sport").text=self.sport + if self.link: + self.link.write(trknode) + for i in self.additional_info.keys(): + etree.SubElement(trknode,i).text = self.additional_info[i] + for i in self.trksegs: + i.write(trknode) + + def elevation_gain(self): + # Набор высоты по треку.# + return sum([trkseg.elevation_gain for trkseg in self.trksegs]) + + def elevation_loss(self): + # Сброс высоты по треку.# + return sum([trkseg.elevation_loss for trkseg in self.trksegs]) + + def distance(self): + # Длина трека.# + try: + return sum([trkseg.distance() for trkseg in self.trksegs]) + except IndexError: + print("Пустой сегмент трека, расчет длины невозможен.") + + def filtered_distance(self,max_speed=100): + # Длина трека с фильтрацией сбойных участков.# + try: + return sum([trkseg.filtered_distance(max_speed=max_speed) for trkseg in self.trksegs]) + except IndexError: + print("Пустой сегмент трека, расчет длины невозможен.") + + def duration(self): + # Продолжительность трека, не включая паузы между сегментами.# + dur = datetime.timedelta(0) + for trkseg in self.trksegs: + try: + dur += trkseg.duration() + except: + pass + return dur + + def full_duration(self): + # Продолжительность трека, включая паузы между сегментами.# + try: + return self.start().duration(self.end()) + except: + return None + + def start(self): + # Стартовая точка.# + try: + return self.trksegs[0].trkpts[0] + except IndexError: + return None + + def end(self): + # Финишная точка.# + try: + return self.trksegs[-1].trkpts[-1] + except IndexError: + return None + + def start_time(self): + # Время старта.# + try: + return self.start().time + except: + return None + + def end_time(self): + # Время финиша.# + try: + return self.end().time + except: + return None + + def bound_box(self): + # Обрамляющий прямоугольник для всего трека.# + minlat =360 + maxlat = -360 + minlon = 360 + maxlon = -360 + for trkseg in self.trksegs: + for pt in trkseg.trkpts: + if pt.latmaxlat: + maxlat=pt.lat + if pt.lonmaxlon: + maxlon=pt.lon + return ((minlat,minlon),(maxlat,maxlon)) + + +class GPX: + # Работа с GPX-документами.# + + def __init__(self): + # Создание пустого GPX-объекта# + PATH = os.path.dirname(__file__) + self.creator = None + self.time = None + self.tracks = [] + self.routes = [] + self.waypoints = [] + self.version = "" + self.author = None + self.name = None + self.link = None + self.copyright = None + self.meta = {} + + def ReadTree(self,tree): + # Загрузка из дерева etree.# + root = tree + + # Test if this is a GPX file or not and if it's version 1.1 + if root.tag == "{http://www.topografix.com/GPX/1/1}gpx" and root.get("version") == "1.1": + self.version = "1.1" + elif root.tag == "{http://www.topografix.com/GPX/1/0}gpx" and root.get("version") == "1.0": + self.version = "1.0" + elif root.get("version") not in [ "1.1", "1.0"]: + raise ValueError("Версия формата %s не поддерживается. Используйте GPX 1.1." % root.get("version")) + else: + raise ValueError("Неизвестный формат дерева") + # attempt to validate the xml file against the schema + + # initalize the GPX document for parsing. + self._init_version(root) + + + def ReadFile(self, fd): + # Загрузка из файла.# + gpx_doc = etree.parse(fd) + root = gpx_doc.getroot() + + # Test if this is a GPX file or not and if it's version 1.1 + if root.tag == "{http://www.topografix.com/GPX/1/1}gpx" and root.get("version") == "1.1": + self.version = "1.1" + elif root.tag == "{http://www.topografix.com/GPX/1/0}gpx" and root.get("version") == "1.0": + self.version = "1.0" + elif root.get("version") not in [ "1.1", "1.0"]: + raise ValueError("Версия формата %s не поддерживается. Используйте GPX 1.1." % root.get("version")) + else: + raise ValueError("Неизвестный формат файла") + # initalize the GPX document for parsing. + self._init_version(root) + + def _init_version(self,root): + # Инициализация объекта.# + self.creator = root.get("creator") + if root is not None: + for child in root: + if child.tag == tagprefix[self.version] + "author": + self.author = child.text + elif child.tag == tagprefix[self.version] + "trk": + self.tracks.append(GPXTrack(child, self.version)) + elif child.tag == tagprefix[self.version] + "rte": + self.routes.append(GPXRoute(child, self.version)) + elif child.tag == tagprefix[self.version] + "wpt": + self.waypoints.append(GPXTrackPt(child, self.version)) + elif child.tag == tagprefix[self.version] + "metadata": + for meta in child: + if meta.tag == tagprefix[self.version] + "author": + self.author = meta.text + elif meta.tag == tagprefix[self.version] + "name": + self.name = meta.text + elif meta.tag == tagprefix[self.version] + "bounds": + # Границы пропускаем - их будем вычислять сами заново + pass + elif meta.tag == tagprefix[self.version] + "copyright": + self.copyright = meta.text + elif meta.tag == tagprefix[self.version] + "link": + self.link = Link(meta,self.version) + else: + self.meta[StripPrefix(meta.tag,self.version)]=meta.text + + for i in self.meta.keys(): + val = self.meta[i]; + if val: + if not val.strip(' \n\t'): + del self.meta[i] + + def elevation_gain(self): + # Суммарный набор высоты.# + return sum([track.elevation_gain() for track in self.tracks]) + + def elevation_loss(self): + # Суммарная потеря высоты.# + return sum([track.elevation_loss() for track in self.tracks]) + + def distance(self): + # Суммарная дистанция.# + try: + return sum([track.distance() for track in self.tracks]) + except IndexError: + print("Пустой файл!") + + def filtered_distance(self): + # Суммарная дистанция с фильтрацией сбойных данных.# + try: + return sum([track.filtered_distance() for track in self.tracks]) + except IndexError: + print("Пустой файл!") + + def duration(self): + # Суммарная продолжительность, не включая паузы.# + dur = datetime.timedelta(0) + for track in self.tracks: + dur += track.duration() + return dur + + def full_duration(self): + # Суммарная продолжительность, включая паузы# + if hasattr(self.start(), 'duration'): + return self.start().duration(self.end()) + else: + return None + + def start(self): + # Точка старта.# + return self.tracks[0].start() + + def end(self): + # Точка финиша.# + return self.tracks[-1].end() + + def start_time(self): + # Время старта.# + if self.start(): + return self.start().time + else: + return None + + def end_time(self): + # Время финиша.# + return self.end().time + + def bound_box(self): + # Обрамляющий прямоугольник для всех треков в файле.# + minlat =360 + maxlat = -360 + minlon = 360 + maxlon = -360 + for track in self.tracks: + for trkseg in track.trksegs: + for pt in trkseg.trkpts: + if pt.latmaxlat: + maxlat=pt.lat + if pt.lonmaxlon: + maxlon=pt.lon + return ((minlat,minlon),(maxlat,maxlon)) + + def FixNames(self,linkname): + pprint(linkname) + pprint(type(linkname)) + goodname = linkname + badname = goodname.encode('ascii','replace') + if self.name and self.name.startswith(badname): + self.name=self.name.replace(badname,goodname) + for i in self.tracks: + if i.name and i.name.startswith(badname): + i.name = i.name.replace(badname,goodname) + + def ProcessTrackSegs(self,seconds_gap=120): + for tr in self.tracks: + # Объединяем все точки трека в единую последовательность + all_trackpoints = [] + for trseg in tr.trksegs: + all_trackpoints.extend(trseg.trkpts) + new_segs = [] + current_seg = None + prev_pt = None + for pt in all_trackpoints: + if not current_seg: + # Начало нового сегмента + current_seg = GPXTrackSeg(None,self.version) + current_seg.trkpts.append(pt) + else: + delta = pt.time - prev_pt.time + if delta.days > 0 or delta.seconds > seconds_gap: + # Нашли место разрыва + new_segs.append(current_seg) + current_seg = None + else: + current_seg.trkpts.append(pt) + prev_pt=pt + # Если есть, что закинуть в хвост - пишем + if current_seg is not None: + new_segs.append(current_seg) + # Обработали + for trs in new_segs: + trs._get_elevation() + tr.trksegs = new_segs + + + def XMLTree(self): + root = etree.Element("gpx", attrib ={"creator": "pygpx", + "version": "1.1", + "xmlns": "http://www.topografix.com/GPX/1/1"}); + + # meta subtree + + meta = etree.SubElement( root, "metadata" ); + if self.name: + metarecord = etree.SubElement( meta, "name" ) + metarecord.text = self.name + if self.author: + metarecord = etree.SubElement( meta, "author" ) + metarecord.text = self.author + if self.copyright: + metarecord = etree.SubElement( meta, "copyright" ) + metarecord.text = self.copyright + if self.link: + self.link.write(meta) + for metatag in self.meta.keys(): + metarecord = etree.SubElement( meta, metatag ) + metarecord.text = self.meta[metatag] + + # wpts subtree + + for i in self.waypoints: + i.write(root) + + # routes subtree + + for i in self.routes: + i.write(root) + + # tracks subtree + + for i in self.tracks: + i.write(root) + + return root + \ No newline at end of file