X-Git-Url: https://git.rvb.name/pyrungps.git/blobdiff_plain/1690b8b7c8ccfdf60be2b33b314e6c26bdef86e6..94f62add438cdf546fdae207ede72c05daf66c00:/pygpx.py diff --git a/pygpx.py b/pygpx.py deleted file mode 100644 index 49ccdff..0000000 --- a/pygpx.py +++ /dev/null @@ -1,662 +0,0 @@ -# coding: UTF-8 -#------------------------ -# Работа с GPX-файлами -#------------------------ -from lxml import etree -import os -import math -import datetime -from dateutil.parser import parse -from pprint import pprint - -def deg2rad(deg): -# Преобразование из градусов в радианы. - return deg / (180 / math.pi) - -tagprefix = { '1.0': '{http://www.topografix.com/GPX/1/0}', '1.1': '{http://www.topografix.com/GPX/1/1}'} - -def StripPrefix(tag,version): - nstag=tagprefix[version] - return tag.replace(nstag,'') - -def formattime(dt): - return dt.strftime("%Y-%m-%dT%H:%M:%S")+"."+str(int(dt.microsecond/1000)).zfill(3)+"Z" - -class Link: - # Ссылка. Может присутствовать в точке, треке, файле. - def __init__(self, node, version): - self.version = version - self.href = node.get("href") - self.text = None - self.type = None - if node is not None: - for child in node: - if child.tag == tagprefix[version] + "text": - self.text = child.text - elif child.tag == tagprefix[version] + "type": - self.type = child.text - else: - raise ValueError("Неизвестный тип узла: '%s' " % child.tag) - - def write(self,root): - linknode = etree.SubElement(root,"link"); - linknode.set("href",self.href) - if self.text: - etree.SubElement(linknode,"text").text = self.text - if self.type: - etree.SubElement(linknode,"type").text = self.type - -class GPXTrackPt: - # Точка с координатами. Может присутствовать в последовательности точек в треке, маршруте, а также в списке путевых точек.# - - def __init__(self, node, version): - # Извлекаем данные из GPX-тэгов.# - self.version = version - # Сначала обязательные.# - if node is not None: - self.lat = float(node.get("lat")) - self.lon = float(node.get("lon")) - self.elevation = None - self.time = None - self.speed = None - self.link = None - self.additional_info = {} - # Потом необязательные.# - if node is not None: - for child in node: - if child.tag == tagprefix[version] + "time": - self.time = parse(child.text) - elif child.tag == tagprefix[version] + "ele": - self.elevation = float(child.text) - # Стандартом скорость не предусмотрена, но де-факто в Run.GPS используется.# - elif child.tag == tagprefix[version] + "speed": - self.speed = float(child.text) - elif child.tag == tagprefix[version] + "link": - self.link = Link(child,version) - elif StripPrefix(child.tag,version) in ['magvar','geoidheight','name','cmt','desc', - 'src','sym','type','fix','sat','hdop','vdop','pdop','ageofdgpsdata','dgpsid']: - self.additional_info[StripPrefix(child.tag,version)]=child.text - elif child.tag == tagprefix[version] + "extensions": - pass - else: - raise ValueError("Неизвестный тип узла: '%s'" % child.tag) - - for i in self.additional_info.keys(): - val = self.additional_info[i]; - if val: - if not val.strip(' \n\t'): - del self.additional_info[i] - - - def write(self,root,tag ="wpt"): - wptnode = etree.SubElement(root,tag) - wptnode.set("lat",repr(self.lat)); - wptnode.set("lon",repr(self.lon)); - if self.elevation: - etree.SubElement(wptnode,"ele").text = repr(self.elevation) - if self.time: - etree.SubElement(wptnode,"time").text = formattime(self.time) - if self.speed: - etree.SubElement(wptnode,"speed").text = repr(self.speed) - if self.link: - self.link.write(wptnode) - for i in self.additional_info.keys(): - etree.SubElement(wptnode,i).text = self.additional_info[i] - - def distance(self, other): - # Расстояние между двумя точками на глобусе.# - try: - # http://www.platoscave.net/blog/2009/oct/5/calculate-distance-latitude-longitude-python/ - - radius = 6378700.0 # meters - - lat1, lon1 = self.lat, self.lon - lat2, lon2 = other.lat, other.lon - - dlat = math.radians(lat2-lat1) - dlon = math.radians(lon2-lon1) - a = math.sin(dlat/2) * math.sin(dlat/2) + math.cos(math.radians(lat1)) \ - * math.cos(math.radians(lat2)) * math.sin(dlon/2) * math.sin(dlon/2) - c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a)) - d = radius * c - - except ValueError, e: - raise ValueError(e) - return d - - def duration(self, other): - # Время между двумя точками.# - return other.time - self.time - -class GPXRoute: - # Маршрут.# - - def __init__(self, node, version): - self.name = None - self.version = version - self.rtepts = [] - self.additional_info = {} - self.link = None - if node is not None: - for child in node: - if child.tag == tagprefix[version] + "name": - self.name = child.text - elif child.tag == tagprefix[version] + "link": - self.link = Link(child,version) - elif StripPrefix(child.tag,version) in [ "cmt", "desc", "src", "number", "type" ]: - self.additional_info[StripPrefix(child.tag,version)]=child.text - elif child.tag == tagprefix[version] + "rtept": - self.rtepts.append(GPXTrackPt(child, version)) - elif child.tag == tagprefix[version] + "extensions": - pass - else: - raise ValueError("Неизвестный тип узла <%s>" % child.tag) - for i in self.additional_info.keys(): - val = self.additional_info[i]; - if val: - if not val.strip(' \n\t'): - del self.additional_info[i] - - def write(self,root): - rtenode = etree.SubElement(root,"rte") - if self.name: - etree.SubElement(rtenode,"name").text = self.name - if self.link: - self.link.write(rtenode) - for i in self.additional_info.keys(): - print "storing %s " % ( i ) - etree.SubElement(rtenode,i).text = self.additional_info[i] - for i in self.rtepts: - i.write(rtenode,"rtept") - -class GPXTrackSeg: - # Один сегмент трека.# - - def __init__(self, node, version): - self.version = version - self.trkpts = [] - self.elevation_gain = 0.0 - self.elevation_loss = 0.0 - if node is not None: - for child in node: - if child.tag == tagprefix[version] + "trkpt": - self.trkpts.append(GPXTrackPt(child, self.version)) - else: - raise ValueError("Неизвестный тип узла <%s>" % node.nodeName) - self._get_elevation() - - def write(self,root): - trksegnode = etree.SubElement(root,"trkseg") - for i in self.trkpts: - i.write(trksegnode,"trkpt") - - def _get_elevation(self): - - elev_data = [] - for pt in self.trkpts: - if pt.elevation: - elev_data.append(pt.elevation) - - gain = 0.0 - loss = 0.0 - last_elevation = None - - window_size = 5 - i = 0 - moving_averages = [] - - while i < len(elev_data) - window_size + 1: - this_window = elev_data[i : i + window_size] - window_average = sum(this_window) / window_size - moving_averages.append(window_average) - i += 1 - - if len(moving_averages)>2: - elev_data = moving_averages - - for pt in elev_data: - if last_elevation is not None: - try: - if pt > last_elevation: - gain += pt - last_elevation - else: - loss += last_elevation - pt - except: - pass - last_elevation=pt - - self.elevation_gain = gain - self.elevation_loss = loss - - def distance(self): - # Расчет длины сегмента.# - _length = 0.0 - last_pt = None - for pt in self.trkpts: - if last_pt is not None: - _length += last_pt.distance(pt) - last_pt = pt - return _length - - def filtered_distance(self,max_speed): - # Расчет длины сегмента с фильтрацией предположительно сбойных участков.# - _length = 0.0 - last_pt = None - for pt in self.trkpts: - if last_pt is not None: - _delta = last_pt.distance(pt) - _time_delta = pt.time - last_pt.time - _time_delta_s = _time_delta.days*86400 + _time_delta.seconds + _time_delta.microseconds/1000000 - if _time_delta_s > 0: - if _delta/_time_delta_s < max_speed: - _length += _delta - else: - _length += _delta - last_pt = pt - return _length - - def duration(self): - # Расчет продолжительности сегмента.# - return self.trkpts[0].duration(self.trkpts[-1]) - - def bound_box(self): - # Обрамляющий прямоугольник для сегмента.# - minlat =360 - maxlat = -360 - minlon = 360 - maxlon = -360 - for pt in self.trkpts: - if pt.latmaxlat: - maxlat=pt.lat - if pt.lonmaxlon: - maxlon=pt.lon - return ((minlat,minlon),(maxlat,maxlon)) - -class GPXTrack: - # Трек.# - - def __init__(self, node, version): - # Создаем трек из GPX-данных.# - - self.version = version - self.trksegs = [] - self.sport= None - self.additional_info = {} - self.name = None - self.link = None - - if node is not None: - for child in node: - if child.tag == tagprefix[version] + "name": - self.name = child.text - elif child.tag == tagprefix[version] + "trkseg": - if len(child) > 0: - self.trksegs.append(GPXTrackSeg(child, self.version)) - elif child.tag == tagprefix[version] + "link": - self.link = Link(child,version) - elif StripPrefix(child.tag,version) in [ "number", "desc", "cmt", "src", "type" ]: - self.additional_info[StripPrefix(child.tag,version)] = child.text - elif child.tag == tagprefix[version] + "extensions": - for ext in child: - # Из расширений извлекаем вид спорта - специфично для Run.GPS.# - if ext.tag == tagprefix[version] + "sport": - self.sport = ext.text - - for i in self.additional_info.keys(): - val = self.additional_info[i]; - if val: - if not val.strip(' \n\t'): - del self.additional_info[i] - - def write(self,root): - trknode = etree.SubElement(root,"trk") - if self.name: - etree.SubElement(trknode,"name").text = self.name - if self.sport: - etree.SubElement(etree.SubElement(trknode,"extensions"),"sport").text=self.sport - if self.link: - self.link.write(trknode) - for i in self.additional_info.keys(): - etree.SubElement(trknode,i).text = self.additional_info[i] - for i in self.trksegs: - i.write(trknode) - - def elevation_gain(self): - # Набор высоты по треку.# - return sum([trkseg.elevation_gain for trkseg in self.trksegs]) - - def elevation_loss(self): - # Сброс высоты по треку.# - return sum([trkseg.elevation_loss for trkseg in self.trksegs]) - - def distance(self): - # Длина трека.# - try: - return sum([trkseg.distance() for trkseg in self.trksegs]) - except IndexError: - print "Пустой сегмент трека, расчет длины невозможен." - - def filtered_distance(self,max_speed=100): - # Длина трека с фильтрацией сбойных участков.# - try: - return sum([trkseg.filtered_distance(max_speed=max_speed) for trkseg in self.trksegs]) - except IndexError: - print "Пустой сегмент трека, расчет длины невозможен." - - def duration(self): - # Продолжительность трека, не включая паузы между сегментами.# - dur = datetime.timedelta(0) - for trkseg in self.trksegs: - try: - dur += trkseg.duration() - except: - pass - return dur - - def full_duration(self): - # Продолжительность трека, включая паузы между сегментами.# - try: - return self.start().duration(self.end()) - except: - return None - - def start(self): - # Стартовая точка.# - try: - return self.trksegs[0].trkpts[0] - except IndexError: - return None - - def end(self): - # Финишная точка.# - try: - return self.trksegs[-1].trkpts[-1] - except IndexError: - return None - - def start_time(self): - # Время старта.# - try: - return self.start().time - except: - return None - - def end_time(self): - # Время финиша.# - try: - return self.end().time - except: - return None - - def bound_box(self): - # Обрамляющий прямоугольник для всего трека.# - minlat =360 - maxlat = -360 - minlon = 360 - maxlon = -360 - for trkseg in self.trksegs: - for pt in trkseg.trkpts: - if pt.latmaxlat: - maxlat=pt.lat - if pt.lonmaxlon: - maxlon=pt.lon - return ((minlat,minlon),(maxlat,maxlon)) - - -class GPX: - # Работа с GPX-документами.# - - def __init__(self): - # Создание пустого GPX-объекта# - PATH = os.path.dirname(__file__) - self.creator = None - self.time = None - self.tracks = [] - self.routes = [] - self.waypoints = [] - self.version = "" - self.author = None - self.name = None - self.link = None - self.copyright = None - self.meta = {} - - def ReadTree(self,tree): - # Загрузка из дерева etree.# - root = tree - - # Test if this is a GPX file or not and if it's version 1.1 - if root.tag == "{http://www.topografix.com/GPX/1/1}gpx" and root.get("version") == "1.1": - self.version = "1.1" - elif root.tag == "{http://www.topografix.com/GPX/1/0}gpx" and root.get("version") == "1.0": - self.version = "1.0" - elif root.get("version") not in [ "1.1", "1.0"]: - raise ValueError("Версия формата %s не поддерживается. Используйте GPX 1.1." % root.get("version")) - else: - raise ValueError("Неизвестный формат дерева") - # attempt to validate the xml file against the schema - - # initalize the GPX document for parsing. - self._init_version(root) - - - def ReadFile(self, fd): - # Загрузка из файла.# - gpx_doc = etree.parse(fd) - root = gpx_doc.getroot() - - # Test if this is a GPX file or not and if it's version 1.1 - if root.tag == "{http://www.topografix.com/GPX/1/1}gpx" and root.get("version") == "1.1": - self.version = "1.1" - elif root.tag == "{http://www.topografix.com/GPX/1/0}gpx" and root.get("version") == "1.0": - self.version = "1.0" - elif root.get("version") not in [ "1.1", "1.0"]: - raise ValueError("Версия формата %s не поддерживается. Используйте GPX 1.1." % root.get("version")) - else: - raise ValueError("Неизвестный формат файла") - # initalize the GPX document for parsing. - self._init_version(root) - - def _init_version(self,root): - # Инициализация объекта.# - self.creator = root.get("creator") - if root is not None: - for child in root: - if child.tag == tagprefix[self.version] + "author": - self.author = child.text - elif child.tag == tagprefix[self.version] + "trk": - self.tracks.append(GPXTrack(child, self.version)) - elif child.tag == tagprefix[self.version] + "rte": - self.routes.append(GPXRoute(child, self.version)) - elif child.tag == tagprefix[self.version] + "wpt": - self.waypoints.append(GPXTrackPt(child, self.version)) - elif child.tag == tagprefix[self.version] + "metadata": - for meta in child: - if meta.tag == tagprefix[self.version] + "author": - self.author = meta.text - elif meta.tag == tagprefix[self.version] + "name": - self.name = meta.text - elif meta.tag == tagprefix[self.version] + "bounds": - # Границы пропускаем - их будем вычислять сами заново - pass - elif meta.tag == tagprefix[self.version] + "copyright": - self.copyright = meta.text - elif meta.tag == tagprefix[self.version] + "link": - self.link = Link(meta,self.version) - else: - self.meta[StripPrefix(meta.tag,self.version)]=meta.text - - for i in self.meta.keys(): - val = self.meta[i]; - if val: - if not val.strip(' \n\t'): - del self.meta[i] - - def elevation_gain(self): - # Суммарный набор высоты.# - return sum([track.elevation_gain() for track in self.tracks]) - - def elevation_loss(self): - # Суммарная потеря высоты.# - return sum([track.elevation_loss() for track in self.tracks]) - - def distance(self): - # Суммарная дистанция.# - try: - return sum([track.distance() for track in self.tracks]) - except IndexError: - print "Пустой файл!" - - def filtered_distance(self): - # Суммарная дистанция с фильтрацией сбойных данных.# - try: - return sum([track.filtered_distance() for track in self.tracks]) - except IndexError: - print "Пустой файл!" - - def duration(self): - # Суммарная продолжительность, не включая паузы.# - dur = datetime.timedelta(0) - for track in self.tracks: - dur += track.duration() - return dur - - def full_duration(self): - # Суммарная продолжительность, включая паузы# - if hasattr(self.start(), 'duration'): - return self.start().duration(self.end()) - else: - return None - - def start(self): - # Точка старта.# - return self.tracks[0].start() - - def end(self): - # Точка финиша.# - return self.tracks[-1].end() - - def start_time(self): - # Время старта.# - if self.start(): - return self.start().time - else: - return None - - def end_time(self): - # Время финиша.# - return self.end().time - - def bound_box(self): - # Обрамляющий прямоугольник для всех треков в файле.# - minlat =360 - maxlat = -360 - minlon = 360 - maxlon = -360 - for track in self.tracks: - for trkseg in track.trksegs: - for pt in trkseg.trkpts: - if pt.latmaxlat: - maxlat=pt.lat - if pt.lonmaxlon: - maxlon=pt.lon - return ((minlat,minlon),(maxlat,maxlon)) - - def FixNames(self,linkname): - pprint(linkname) - pprint(type(linkname)) - if type(linkname) is unicode: - goodname=linkname - else: - goodname = linkname.decode('UTF-8') - badname = goodname.encode('ascii','replace') - if self.name and self.name.startswith(badname): - self.name=self.name.replace(badname,goodname) - for i in self.tracks: - if i.name and i.name.startswith(badname): - i.name = i.name.replace(badname,goodname) - - def ProcessTrackSegs(self,seconds_gap=120): - for tr in self.tracks: - # Объединяем все точки трека в единую последовательность - all_trackpoints = [] - for trseg in tr.trksegs: - all_trackpoints.extend(trseg.trkpts) - new_segs = [] - current_seg = None - prev_pt = None - for pt in all_trackpoints: - if not current_seg: - # Начало нового сегмента - current_seg = GPXTrackSeg(None,self.version) - current_seg.trkpts.append(pt) - else: - delta = pt.time - prev_pt.time - if delta.days > 0 or delta.seconds > seconds_gap: - # Нашли место разрыва - new_segs.append(current_seg) - current_seg = None - else: - current_seg.trkpts.append(pt) - prev_pt=pt - # Если есть, что закинуть в хвост - пишем - if current_seg is not None: - new_segs.append(current_seg) - # Обработали - for trs in new_segs: - trs._get_elevation() - tr.trksegs = new_segs - - - def XMLTree(self): - root = etree.Element("gpx", attrib ={"creator": "pygpx", - "version": "1.1", - "xmlns": "http://www.topografix.com/GPX/1/1"}); - - # meta subtree - - meta = etree.SubElement( root, "metadata" ); - if self.name: - metarecord = etree.SubElement( meta, "name" ) - metarecord.text = self.name - if self.author: - metarecord = etree.SubElement( meta, "author" ) - metarecord.text = self.author - if self.copyright: - metarecord = etree.SubElement( meta, "copyright" ) - metarecord.text = self.copyright - if self.link: - self.link.write(meta) - for metatag in self.meta.iterkeys(): - metarecord = etree.SubElement( meta, metatag ) - metarecord.text = self.meta[metatag] - - # wpts subtree - - for i in self.waypoints: - i.write(root) - - # routes subtree - - for i in self.routes: - i.write(root) - - # tracks subtree - - for i in self.tracks: - i.write(root) - - return root - \ No newline at end of file