-# coding: UTF-8
-#------------------------
-# Работа с GPX-файлами
-#------------------------
-from lxml import etree
-import os
-import math
-import datetime
-from dateutil.parser import parse
-from pprint import pprint
-
-def deg2rad(deg):
-# Преобразование из градусов в радианы.
- return deg / (180 / math.pi)
-
-tagprefix = { '1.0': '{http://www.topografix.com/GPX/1/0}', '1.1': '{http://www.topografix.com/GPX/1/1}'}
-
-def StripPrefix(tag,version):
- nstag=tagprefix[version]
- return tag.replace(nstag,'')
-
-def formattime(dt):
- return dt.strftime("%Y-%m-%dT%H:%M:%S")+"."+str(int(dt.microsecond/1000)).zfill(3)+"Z"
-
-class Link:
- # Ссылка. Может присутствовать в точке, треке, файле.
- def __init__(self, node, version):
- self.version = version
- self.href = node.get("href")
- self.text = None
- self.type = None
- if node is not None:
- for child in node:
- if child.tag == tagprefix[version] + "text":
- self.text = child.text
- elif child.tag == tagprefix[version] + "type":
- self.type = child.text
- else:
- raise ValueError("Неизвестный тип узла: '%s' " % child.tag)
-
- def write(self,root):
- linknode = etree.SubElement(root,"link");
- linknode.set("href",self.href)
- if self.text:
- etree.SubElement(linknode,"text").text = self.text
- if self.type:
- etree.SubElement(linknode,"type").text = self.type
-
-class GPXTrackPt:
- # Точка с координатами. Может присутствовать в последовательности точек в треке, маршруте, а также в списке путевых точек.#
-
- def __init__(self, node, version):
- # Извлекаем данные из GPX-тэгов.#
- self.version = version
- # Сначала обязательные.#
- if node is not None:
- self.lat = float(node.get("lat"))
- self.lon = float(node.get("lon"))
- self.elevation = None
- self.time = None
- self.speed = None
- self.link = None
- self.additional_info = {}
- # Потом необязательные.#
- if node is not None:
- for child in node:
- if child.tag == tagprefix[version] + "time":
- self.time = parse(child.text)
- elif child.tag == tagprefix[version] + "ele":
- self.elevation = float(child.text)
- # Стандартом скорость не предусмотрена, но де-факто в Run.GPS используется.#
- elif child.tag == tagprefix[version] + "speed":
- self.speed = float(child.text)
- elif child.tag == tagprefix[version] + "link":
- self.link = Link(child,version)
- elif StripPrefix(child.tag,version) in ['magvar','geoidheight','name','cmt','desc',
- 'src','sym','type','fix','sat','hdop','vdop','pdop','ageofdgpsdata','dgpsid']:
- self.additional_info[StripPrefix(child.tag,version)]=child.text
- elif child.tag == tagprefix[version] + "extensions":
- pass
- else:
- raise ValueError("Неизвестный тип узла: '%s'" % child.tag)
-
- for i in self.additional_info.keys():
- val = self.additional_info[i];
- if val:
- if not val.strip(' \n\t'):
- del self.additional_info[i]
-
-
- def write(self,root,tag ="wpt"):
- wptnode = etree.SubElement(root,tag)
- wptnode.set("lat",repr(self.lat));
- wptnode.set("lon",repr(self.lon));
- if self.elevation:
- etree.SubElement(wptnode,"ele").text = repr(self.elevation)
- if self.time:
- etree.SubElement(wptnode,"time").text = formattime(self.time)
- if self.speed:
- etree.SubElement(wptnode,"speed").text = repr(self.speed)
- if self.link:
- self.link.write(wptnode)
- for i in self.additional_info.keys():
- etree.SubElement(wptnode,i).text = self.additional_info[i]
-
- def distance(self, other):
- # Расстояние между двумя точками на глобусе.#
- try:
- # http://www.platoscave.net/blog/2009/oct/5/calculate-distance-latitude-longitude-python/
-
- radius = 6378700.0 # meters
-
- lat1, lon1 = self.lat, self.lon
- lat2, lon2 = other.lat, other.lon
-
- dlat = math.radians(lat2-lat1)
- dlon = math.radians(lon2-lon1)
- a = math.sin(dlat/2) * math.sin(dlat/2) + math.cos(math.radians(lat1)) \
- * math.cos(math.radians(lat2)) * math.sin(dlon/2) * math.sin(dlon/2)
- c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
- d = radius * c
-
- except ValueError, e:
- raise ValueError(e)
- return d
-
- def duration(self, other):
- # Время между двумя точками.#
- return other.time - self.time
-
-class GPXRoute:
- # Маршрут.#
-
- def __init__(self, node, version):
- self.name = None
- self.version = version
- self.rtepts = []
- self.additional_info = {}
- self.link = None
- if node is not None:
- for child in node:
- if child.tag == tagprefix[version] + "name":
- self.name = child.text
- elif child.tag == tagprefix[version] + "link":
- self.link = Link(child,version)
- elif StripPrefix(child.tag,version) in [ "cmt", "desc", "src", "number", "type" ]:
- self.additional_info[StripPrefix(child.tag,version)]=child.text
- elif child.tag == tagprefix[version] + "rtept":
- self.rtepts.append(GPXTrackPt(child, version))
- elif child.tag == tagprefix[version] + "extensions":
- pass
- else:
- raise ValueError("Неизвестный тип узла <%s>" % child.tag)
- for i in self.additional_info.keys():
- val = self.additional_info[i];
- if val:
- if not val.strip(' \n\t'):
- del self.additional_info[i]
-
- def write(self,root):
- rtenode = etree.SubElement(root,"rte")
- if self.name:
- etree.SubElement(rtenode,"name").text = self.name
- if self.link:
- self.link.write(rtenode)
- for i in self.additional_info.keys():
- print "storing %s " % ( i )
- etree.SubElement(rtenode,i).text = self.additional_info[i]
- for i in self.rtepts:
- i.write(rtenode,"rtept")
-
-class GPXTrackSeg:
- # Один сегмент трека.#
-
- def __init__(self, node, version):
- self.version = version
- self.trkpts = []
- self.elevation_gain = 0.0
- self.elevation_loss = 0.0
- if node is not None:
- for child in node:
- if child.tag == tagprefix[version] + "trkpt":
- self.trkpts.append(GPXTrackPt(child, self.version))
- else:
- raise ValueError("Неизвестный тип узла <%s>" % node.nodeName)
- self._get_elevation()
-
- def write(self,root):
- trksegnode = etree.SubElement(root,"trkseg")
- for i in self.trkpts:
- i.write(trksegnode,"trkpt")
-
- def _get_elevation(self):
-
- elev_data = []
- for pt in self.trkpts:
- if pt.elevation:
- elev_data.append(pt.elevation)
-
- gain = 0.0
- loss = 0.0
- last_elevation = None
-
- window_size = 5
- i = 0
- moving_averages = []
-
- while i < len(elev_data) - window_size + 1:
- this_window = elev_data[i : i + window_size]
- window_average = sum(this_window) / window_size
- moving_averages.append(window_average)
- i += 1
-
- if len(moving_averages)>2:
- elev_data = moving_averages
-
- for pt in elev_data:
- if last_elevation is not None:
- try:
- if pt > last_elevation:
- gain += pt - last_elevation
- else:
- loss += last_elevation - pt
- except:
- pass
- last_elevation=pt
-
- self.elevation_gain = gain
- self.elevation_loss = loss
-
- def distance(self):
- # Расчет длины сегмента.#
- _length = 0.0
- last_pt = None
- for pt in self.trkpts:
- if last_pt is not None:
- _length += last_pt.distance(pt)
- last_pt = pt
- return _length
-
- def filtered_distance(self,max_speed):
- # Расчет длины сегмента с фильтрацией предположительно сбойных участков.#
- _length = 0.0
- last_pt = None
- for pt in self.trkpts:
- if last_pt is not None:
- _delta = last_pt.distance(pt)
- _time_delta = pt.time - last_pt.time
- _time_delta_s = _time_delta.days*86400 + _time_delta.seconds + _time_delta.microseconds/1000000
- if _time_delta_s > 0:
- if _delta/_time_delta_s < max_speed:
- _length += _delta
- else:
- _length += _delta
- last_pt = pt
- return _length
-
- def duration(self):
- # Расчет продолжительности сегмента.#
- return self.trkpts[0].duration(self.trkpts[-1])
-
- def bound_box(self):
- # Обрамляющий прямоугольник для сегмента.#
- minlat =360
- maxlat = -360
- minlon = 360
- maxlon = -360
- for pt in self.trkpts:
- if pt.lat<minlat:
- minlat=pt.lat
- if pt.lat>maxlat:
- maxlat=pt.lat
- if pt.lon<minlon:
- minlon=pt.lon
- if pt.lon>maxlon:
- maxlon=pt.lon
- return ((minlat,minlon),(maxlat,maxlon))
-
-class GPXTrack:
- # Трек.#
-
- def __init__(self, node, version):
- # Создаем трек из GPX-данных.#
-
- self.version = version
- self.trksegs = []
- self.sport= None
- self.additional_info = {}
- self.name = None
- self.link = None
-
- if node is not None:
- for child in node:
- if child.tag == tagprefix[version] + "name":
- self.name = child.text
- elif child.tag == tagprefix[version] + "trkseg":
- if len(child) > 0:
- self.trksegs.append(GPXTrackSeg(child, self.version))
- elif child.tag == tagprefix[version] + "link":
- self.link = Link(child,version)
- elif StripPrefix(child.tag,version) in [ "number", "desc", "cmt", "src", "type" ]:
- self.additional_info[StripPrefix(child.tag,version)] = child.text
- elif child.tag == tagprefix[version] + "extensions":
- for ext in child:
- # Из расширений извлекаем вид спорта - специфично для Run.GPS.#
- if ext.tag == tagprefix[version] + "sport":
- self.sport = ext.text
-
- for i in self.additional_info.keys():
- val = self.additional_info[i];
- if val:
- if not val.strip(' \n\t'):
- del self.additional_info[i]
-
- def write(self,root):
- trknode = etree.SubElement(root,"trk")
- if self.name:
- etree.SubElement(trknode,"name").text = self.name
- if self.sport:
- etree.SubElement(etree.SubElement(trknode,"extensions"),"sport").text=self.sport
- if self.link:
- self.link.write(trknode)
- for i in self.additional_info.keys():
- etree.SubElement(trknode,i).text = self.additional_info[i]
- for i in self.trksegs:
- i.write(trknode)
-
- def elevation_gain(self):
- # Набор высоты по треку.#
- return sum([trkseg.elevation_gain for trkseg in self.trksegs])
-
- def elevation_loss(self):
- # Сброс высоты по треку.#
- return sum([trkseg.elevation_loss for trkseg in self.trksegs])
-
- def distance(self):
- # Длина трека.#
- try:
- return sum([trkseg.distance() for trkseg in self.trksegs])
- except IndexError:
- print "Пустой сегмент трека, расчет длины невозможен."
-
- def filtered_distance(self,max_speed=100):
- # Длина трека с фильтрацией сбойных участков.#
- try:
- return sum([trkseg.filtered_distance(max_speed=max_speed) for trkseg in self.trksegs])
- except IndexError:
- print "Пустой сегмент трека, расчет длины невозможен."
-
- def duration(self):
- # Продолжительность трека, не включая паузы между сегментами.#
- dur = datetime.timedelta(0)
- for trkseg in self.trksegs:
- try:
- dur += trkseg.duration()
- except:
- pass
- return dur
-
- def full_duration(self):
- # Продолжительность трека, включая паузы между сегментами.#
- try:
- return self.start().duration(self.end())
- except:
- return None
-
- def start(self):
- # Стартовая точка.#
- try:
- return self.trksegs[0].trkpts[0]
- except IndexError:
- return None
-
- def end(self):
- # Финишная точка.#
- try:
- return self.trksegs[-1].trkpts[-1]
- except IndexError:
- return None
-
- def start_time(self):
- # Время старта.#
- try:
- return self.start().time
- except:
- return None
-
- def end_time(self):
- # Время финиша.#
- try:
- return self.end().time
- except:
- return None
-
- def bound_box(self):
- # Обрамляющий прямоугольник для всего трека.#
- minlat =360
- maxlat = -360
- minlon = 360
- maxlon = -360
- for trkseg in self.trksegs:
- for pt in trkseg.trkpts:
- if pt.lat<minlat:
- minlat=pt.lat
- if pt.lat>maxlat:
- maxlat=pt.lat
- if pt.lon<minlon:
- minlon=pt.lon
- if pt.lon>maxlon:
- maxlon=pt.lon
- return ((minlat,minlon),(maxlat,maxlon))
-
-
-class GPX:
- # Работа с GPX-документами.#
-
- def __init__(self):
- # Создание пустого GPX-объекта#
- PATH = os.path.dirname(__file__)
- self.creator = None
- self.time = None
- self.tracks = []
- self.routes = []
- self.waypoints = []
- self.version = ""
- self.author = None
- self.name = None
- self.link = None
- self.copyright = None
- self.meta = {}
-
- def ReadTree(self,tree):
- # Загрузка из дерева etree.#
- root = tree
-
- # Test if this is a GPX file or not and if it's version 1.1
- if root.tag == "{http://www.topografix.com/GPX/1/1}gpx" and root.get("version") == "1.1":
- self.version = "1.1"
- elif root.tag == "{http://www.topografix.com/GPX/1/0}gpx" and root.get("version") == "1.0":
- self.version = "1.0"
- elif root.get("version") not in [ "1.1", "1.0"]:
- raise ValueError("Версия формата %s не поддерживается. Используйте GPX 1.1." % root.get("version"))
- else:
- raise ValueError("Неизвестный формат дерева")
- # attempt to validate the xml file against the schema
-
- # initalize the GPX document for parsing.
- self._init_version(root)
-
-
- def ReadFile(self, fd):
- # Загрузка из файла.#
- gpx_doc = etree.parse(fd)
- root = gpx_doc.getroot()
-
- # Test if this is a GPX file or not and if it's version 1.1
- if root.tag == "{http://www.topografix.com/GPX/1/1}gpx" and root.get("version") == "1.1":
- self.version = "1.1"
- elif root.tag == "{http://www.topografix.com/GPX/1/0}gpx" and root.get("version") == "1.0":
- self.version = "1.0"
- elif root.get("version") not in [ "1.1", "1.0"]:
- raise ValueError("Версия формата %s не поддерживается. Используйте GPX 1.1." % root.get("version"))
- else:
- raise ValueError("Неизвестный формат файла")
- # initalize the GPX document for parsing.
- self._init_version(root)
-
- def _init_version(self,root):
- # Инициализация объекта.#
- self.creator = root.get("creator")
- if root is not None:
- for child in root:
- if child.tag == tagprefix[self.version] + "author":
- self.author = child.text
- elif child.tag == tagprefix[self.version] + "trk":
- self.tracks.append(GPXTrack(child, self.version))
- elif child.tag == tagprefix[self.version] + "rte":
- self.routes.append(GPXRoute(child, self.version))
- elif child.tag == tagprefix[self.version] + "wpt":
- self.waypoints.append(GPXTrackPt(child, self.version))
- elif child.tag == tagprefix[self.version] + "metadata":
- for meta in child:
- if meta.tag == tagprefix[self.version] + "author":
- self.author = meta.text
- elif meta.tag == tagprefix[self.version] + "name":
- self.name = meta.text
- elif meta.tag == tagprefix[self.version] + "bounds":
- # Границы пропускаем - их будем вычислять сами заново
- pass
- elif meta.tag == tagprefix[self.version] + "copyright":
- self.copyright = meta.text
- elif meta.tag == tagprefix[self.version] + "link":
- self.link = Link(meta,self.version)
- else:
- self.meta[StripPrefix(meta.tag,self.version)]=meta.text
-
- for i in self.meta.keys():
- val = self.meta[i];
- if val:
- if not val.strip(' \n\t'):
- del self.meta[i]
-
- def elevation_gain(self):
- # Суммарный набор высоты.#
- return sum([track.elevation_gain() for track in self.tracks])
-
- def elevation_loss(self):
- # Суммарная потеря высоты.#
- return sum([track.elevation_loss() for track in self.tracks])
-
- def distance(self):
- # Суммарная дистанция.#
- try:
- return sum([track.distance() for track in self.tracks])
- except IndexError:
- print "Пустой файл!"
-
- def filtered_distance(self):
- # Суммарная дистанция с фильтрацией сбойных данных.#
- try:
- return sum([track.filtered_distance() for track in self.tracks])
- except IndexError:
- print "Пустой файл!"
-
- def duration(self):
- # Суммарная продолжительность, не включая паузы.#
- dur = datetime.timedelta(0)
- for track in self.tracks:
- dur += track.duration()
- return dur
-
- def full_duration(self):
- # Суммарная продолжительность, включая паузы#
- if hasattr(self.start(), 'duration'):
- return self.start().duration(self.end())
- else:
- return None
-
- def start(self):
- # Точка старта.#
- return self.tracks[0].start()
-
- def end(self):
- # Точка финиша.#
- return self.tracks[-1].end()
-
- def start_time(self):
- # Время старта.#
- if self.start():
- return self.start().time
- else:
- return None
-
- def end_time(self):
- # Время финиша.#
- return self.end().time
-
- def bound_box(self):
- # Обрамляющий прямоугольник для всех треков в файле.#
- minlat =360
- maxlat = -360
- minlon = 360
- maxlon = -360
- for track in self.tracks:
- for trkseg in track.trksegs:
- for pt in trkseg.trkpts:
- if pt.lat<minlat:
- minlat=pt.lat
- if pt.lat>maxlat:
- maxlat=pt.lat
- if pt.lon<minlon:
- minlon=pt.lon
- if pt.lon>maxlon:
- maxlon=pt.lon
- return ((minlat,minlon),(maxlat,maxlon))
-
- def FixNames(self,linkname):
- pprint(linkname)
- pprint(type(linkname))
- if type(linkname) is unicode:
- goodname=linkname
- else:
- goodname = linkname.decode('UTF-8')
- badname = goodname.encode('ascii','replace')
- if self.name and self.name.startswith(badname):
- self.name=self.name.replace(badname,goodname)
- for i in self.tracks:
- if i.name and i.name.startswith(badname):
- i.name = i.name.replace(badname,goodname)
-
- def ProcessTrackSegs(self,seconds_gap=120):
- for tr in self.tracks:
- # Объединяем все точки трека в единую последовательность
- all_trackpoints = []
- for trseg in tr.trksegs:
- all_trackpoints.extend(trseg.trkpts)
- new_segs = []
- current_seg = None
- prev_pt = None
- for pt in all_trackpoints:
- if not current_seg:
- # Начало нового сегмента
- current_seg = GPXTrackSeg(None,self.version)
- current_seg.trkpts.append(pt)
- else:
- delta = pt.time - prev_pt.time
- if delta.days > 0 or delta.seconds > seconds_gap:
- # Нашли место разрыва
- new_segs.append(current_seg)
- current_seg = None
- else:
- current_seg.trkpts.append(pt)
- prev_pt=pt
- # Если есть, что закинуть в хвост - пишем
- if current_seg is not None:
- new_segs.append(current_seg)
- # Обработали
- for trs in new_segs:
- trs._get_elevation()
- tr.trksegs = new_segs
-
-
- def XMLTree(self):
- root = etree.Element("gpx", attrib ={"creator": "pygpx",
- "version": "1.1",
- "xmlns": "http://www.topografix.com/GPX/1/1"});
-
- # meta subtree
-
- meta = etree.SubElement( root, "metadata" );
- if self.name:
- metarecord = etree.SubElement( meta, "name" )
- metarecord.text = self.name
- if self.author:
- metarecord = etree.SubElement( meta, "author" )
- metarecord.text = self.author
- if self.copyright:
- metarecord = etree.SubElement( meta, "copyright" )
- metarecord.text = self.copyright
- if self.link:
- self.link.write(meta)
- for metatag in self.meta.iterkeys():
- metarecord = etree.SubElement( meta, metatag )
- metarecord.text = self.meta[metatag]
-
- # wpts subtree
-
- for i in self.waypoints:
- i.write(root)
-
- # routes subtree
-
- for i in self.routes:
- i.write(root)
-
- # tracks subtree
-
- for i in self.tracks:
- i.write(root)
-
- return root
-
\ No newline at end of file