Last active
August 25, 2024 14:57
-
-
Save d3d9/c2c796b44220eaf70eeaf876b2f6e100 to your computer and use it in GitHub Desktop.
im Zusammenhang mit https://github.com/d3d9/hst-svg-netzplan. depdata aus https://github.com/d3d9/dm_tomatrixled.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
from dataclasses import dataclass, field | |
from datetime import datetime, timezone | |
from enum import Enum | |
from requests import get | |
from typing import Set, List, Dict, Callable, Union, Optional, Any, Tuple, Iterable | |
import xml.etree.ElementTree as ET | |
@dataclass | |
class Meldung: | |
symbol: str | |
text: str | |
color: Optional[str] = None | |
efa: bool = False | |
# blocking: bool = False | |
def __post_init__(self): | |
if self.text is None: | |
self.text = "" | |
class MOT(Enum): | |
TRAIN = 1 | |
HISPEED = 2 | |
TRAM = 3 | |
BUS = 4 | |
HANGING = 5 | |
trainTMOTefa = {0, 1, 13, 14, 15, 16, 18} | |
trainMOT = {MOT.TRAIN, MOT.HISPEED} | |
@dataclass | |
class Departure: | |
linenum: str | |
direction: str | |
direction_planned: str | |
deptime: datetime | |
deptime_planned: datetime | |
realtime: bool | |
delay: int = 0 # minutes | |
messages: Union[List[str], List[Meldung]] = field(default_factory=list) # str als Zwischenschritt | |
coursesummary: Optional[str] = None | |
mot: Optional[MOT] = None | |
# accessibility? | |
# operator? | |
platformno: Optional[str] = None | |
platformno_planned: Optional[str] = None | |
platformtype: Optional[str] = None | |
stopname: Optional[str] = None | |
stopid: Optional[str] = None | |
place: Optional[str] = None | |
cancelled: Optional[bool] = None | |
earlytermination: Optional[bool] = None | |
headsign: Optional[str] = None | |
color: Optional[str] = None | |
arrtime: Optional[datetime] = None | |
arrtime_planned: Optional[datetime] = None | |
disp_countdown: Optional[int] = None # minutes | |
disp_linenum: Optional[str] = None | |
disp_direction: Optional[str] = None | |
type_getpayload = Dict[str, Union[str, int, Iterable[Union[str, int]]]] | |
type_data = Dict[str, Any] | |
type_depmsgdata = Tuple[List[Departure], List[Meldung], type_data] | |
type_depfnlist = List[Tuple[Callable[..., type_depmsgdata], | |
List[Dict[str, Any]]]] | |
type_depfns = Dict[Tuple[str, bool], type_depfnlist] | |
def readefaxml(root: ET.Element, tz: timezone, | |
ignore_infoTypes: Optional[Set] = None, ignore_infoIDs: Optional[Set] = None, | |
content_for_short_titles: bool = True) -> type_depmsgdata: | |
deps: List[Departure] = [] | |
stop_messages: List[Meldung] = [] | |
# (itdStopInfoList bei Abfahrten bzw. infoLink bei itdOdvName) | |
# treten (alle?) auch bei einzelnen Abfahrten auf.. erstmal keine daten hierbei | |
# evtl. auslesen und schauen, was wirklich haltbezogen ist und nicht anderswo dabei ist | |
_itdOdv = root.find('itdDepartureMonitorRequest').find('itdOdv') | |
_itdOdvPlace = _itdOdv.find('itdOdvPlace') | |
if _itdOdvPlace.get('state') != "identified": | |
return deps, stop_messages, {} | |
place = _itdOdvPlace.findtext('odvPlaceElem') | |
_itdOdvName = _itdOdv.find('itdOdvName') | |
if _itdOdvName.get('state') != "identified": | |
return deps, stop_messages, {} | |
_itdOdvNameElem = _itdOdvName.find('odvNameElem') | |
stopname = _itdOdvNameElem.text or _itdOdvNameElem[0].tail or next((t.tail for t in _itdOdvNameElem if t is not None), None) | |
for dep in root.iter('itdDeparture'): | |
servingline = dep.find('itdServingLine') | |
_itdNoTrain = servingline.find('itdNoTrain') | |
_itdNoTrainName = _itdNoTrain.get('name', '') | |
linenum = servingline.attrib['number'] | |
_trainName = servingline.attrib.get('trainName', '') | |
if linenum.endswith(" "+_itdNoTrainName): | |
linenum = linenum.replace(" "+_itdNoTrainName, "") | |
elif linenum.endswith(" "+_trainName): | |
linenum = linenum.replace(" "+_trainName, "") | |
countdown = int(dep.attrib['countdown']) | |
isrealtime = bool(int(servingline.attrib['realtime'])) | |
if isrealtime: | |
delay = int(_itdNoTrain.attrib['delay']) | |
else: | |
delay = 0 | |
cancelled = delay == -9999 | |
messages: List[str] = [] | |
genAttrList = dep.find('genAttrList') | |
direction_planned = servingline.get('direction') | |
direction_actual = direction_planned | |
earlytermination = False | |
_earlytermv = genAttrList.findtext("./genAttrElem[name='EarlyTermination']/value") if genAttrList else None | |
if (not cancelled) and _earlytermv: | |
direction_actual = _earlytermv | |
earlytermination = True | |
# Beobachtungen bzgl. Steigänderung: | |
# genAttrElem mit name platformChange und value changed | |
# platform bei itdDeparture entspricht originaler, platformName der neuen.. | |
# haben aber eigentlich unterschiedliche Bedeutungen | |
# (bei Bussen steht dann da z. B. "Bstg. 1" in platformName | |
# Auseinanderhalten eigentlich sinnvoll, bei platformChange muss aber wohl ne Ausnahme gemacht werden | |
# weiter beobachten, wie sowas in weiteren Fällen aussieht.. | |
# Sowas wie "Aachen, Hbf,Aachen" verbessern | |
_ds = direction_actual.split(",") | |
if len(_ds) > 1 and direction_actual.startswith(_ds[-1].strip()): | |
disp_direction = ",".join(_ds[:-1]) | |
else: | |
disp_direction = direction_actual | |
itddatetime = dep.find('itdDateTime') | |
itddatea = itddatetime.find('itdDate').attrib | |
itdtimea = itddatetime.find('itdTime').attrib | |
deptime_planned = datetime(int(itddatea['year']), int(itddatea['month']), int(itddatea['day']), int(itdtimea['hour']), int(itdtimea['minute']), tzinfo=tz) | |
deptime = deptime_planned | |
if isrealtime and not cancelled: | |
itdrtdatetime = dep.find('itdRTDateTime') | |
itdrtdatea = itdrtdatetime.find('itdDate').attrib | |
itdrttimea = itdrtdatetime.find('itdTime').attrib | |
deptime = datetime(int(itdrtdatea['year']), int(itdrtdatea['month']), int(itdrtdatea['day']), int(itdrttimea['hour']), int(itdrttimea['minute']), tzinfo=tz) | |
for _infoLink in dep.iter('infoLink'): | |
if ((ignore_infoTypes and _infoLink.findtext("./paramList/param[name='infoType']/value") in ignore_infoTypes) | |
or (ignore_infoIDs and _infoLink.findtext("./paramList/param[name='infoID']/value") in ignore_infoIDs)): | |
continue | |
_iLTtext = _infoLink.findtext('infoLinkText') | |
if _iLTtext: | |
# kurze, inhaltslose (DB-)Meldungstitel | |
if content_for_short_titles and _iLTtext in {"Störung.", "Bauarbeiten.", "Information."}: | |
_infoLink_infoText = _infoLink.find('infoText') | |
if _infoLink_infoText is None: continue | |
_iLiTcontent = _infoLink_infoText.findtext('content') | |
if _iLiTcontent: | |
messages.append(f"{_iLTtext[:-1]}: {_iLiTcontent}") | |
continue | |
# else: weiter, nächste Zeile | |
messages.append(_iLTtext) | |
else: | |
_infoLink_infoText = _infoLink.find('infoText') | |
if _infoLink_infoText is None: continue | |
_iLiTsubject = _infoLink_infoText.findtext('subject') | |
_iLiTsubtitle = _infoLink_infoText.findtext('subtitle') | |
_msg = "" | |
if _iLiTsubject: _msg += (_iLiTsubject + (" " if _iLiTsubject.endswith(":") else ": ")) | |
if _iLiTsubtitle: _msg += _iLiTsubtitle | |
if _msg: messages.append(_msg) | |
itdNoTrainText = servingline.findtext('itdNoTrain') | |
if itdNoTrainText: | |
messages.append(f"{linenum}: {itdNoTrainText}") | |
mot = None | |
motType = int(servingline.get('motType')) | |
if motType in {5, 6, 7, 10, 17, 19}: | |
mot = MOT.BUS | |
elif motType in {0, 1, 13, 14, 15, 16, 18}: | |
if motType in {15, 16} or (genAttrList and any(s in {"HIGHSPEEDTRAIN", "LONG_DISTANCE_TRAINS"} for s in (x.findtext('value') for x in genAttrList.findall('genAttrElem')))): | |
mot = MOT.HISPEED | |
else: | |
mot = MOT.TRAIN | |
elif motType in {2, 3, 4, 8}: | |
mot = MOT.TRAM | |
elif motType == 11: | |
mot = MOT.HANGING | |
deps.append(Departure(linenum=linenum, | |
direction=direction_actual, | |
direction_planned=direction_planned, | |
deptime=deptime, | |
deptime_planned=deptime_planned, | |
realtime=isrealtime, | |
delay=delay, | |
messages=messages, | |
coursesummary=servingline.findtext('itdRouteDescText'), | |
mot=mot, | |
platformno=dep.get('platform'), | |
platformtype=dep.get('pointType', ""), | |
stopname=(dep.get('nameWO') or stopname), | |
stopid=dep.get('gid'), | |
place=place, | |
cancelled=cancelled, | |
earlytermination=earlytermination, | |
disp_countdown=countdown, | |
disp_direction=disp_direction)) | |
return deps, stop_messages, {} | |
def getefadeps(serverurl: str, timeout: Union[int, float], ifopt: str, limit: int, tz: timezone, | |
# added: | |
getdeps_placelist = ["Hagen ", "HA-"], | |
getdeps_mincountdown = 0, | |
# | |
userealtime: bool = True, exclMOT: Optional[Set[int]] = None, inclMOT: Optional[Set[int]] = None, | |
ignore_infoTypes: Optional[Set] = None, ignore_infoIDs: Optional[Set] = None, content_for_short_titles: bool = True) -> type_depmsgdata: | |
payload: type_getpayload = {'name_dm': ifopt, 'type_dm': 'any', 'mode': 'direct', 'useRealtime': int(userealtime), 'limit': str(limit)} | |
if inclMOT: | |
payload['includedMeans'] = inclMOT | |
elif exclMOT: | |
payload['excludedMeans'] = exclMOT | |
payload['useProxFootSearch'] = 0 | |
payload['deleteAssignedStops_dm'] = 1 | |
r = get(serverurl, timeout=timeout, params=payload) | |
r.raise_for_status() | |
try: | |
root = ET.fromstring(r.content) | |
deps, messages, data = readefaxml(root, tz, ignore_infoTypes, ignore_infoIDs, content_for_short_titles) | |
nowtime = datetime.now(tz) | |
for dep in deps: | |
# ggf. anders runden? | |
# dep.disp_countdown = dep.disp_countdown if dep.disp_countdown is not None else int(round((dep.deptime-nowtime).total_seconds()/60)) | |
dep.disp_countdown = dep.disp_countdown if dep.disp_countdown is not None else int((dep.deptime-nowtime.replace(second=0, microsecond=0)).total_seconds()/60) | |
dep.disp_linenum = (dep.disp_linenum or dep.linenum) | |
if not dep.disp_direction: | |
if dep.headsign: | |
dep.disp_direction = dep.headsign.replace("\n", "/") | |
else: | |
dep.disp_direction = dep.direction | |
if getdeps_placelist: | |
for place in getdeps_placelist: # auslagern, feiner machen, +abk.verz.? | |
dep.disp_direction = dep.disp_direction.replace(place, "") | |
if dep.mot is None: | |
dep.mot = MOT.BUS | |
if dep.delay is None: | |
dep.delay = 0 | |
sorteddeps = sorted([dep for dep in deps if (dep.disp_countdown or 0) >= getdeps_mincountdown], | |
key=lambda dep: (dep.disp_countdown, not dep.cancelled, -dep.delay, not dep.earlytermination, dep.disp_linenum)) | |
return sorteddeps, messages, data | |
except Exception as e: | |
# logger.debug(f"request data:\n{r.content}") | |
print("error. request data: \n{r.content}") | |
print(e) | |
raise e |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3.7 | |
from flask import Flask, request, abort, make_response | |
from flask_cors import CORS | |
from datetime import datetime | |
from dataclasses import asdict | |
from depdata import getefadeps | |
from enum import Enum | |
from json import dumps, JSONEncoder | |
from sys import stderr | |
from time import sleep | |
from traceback import print_exc | |
app = Flask(__name__) | |
CORS(app) | |
#efaserver = 'https://openservice.vrr.de/vrr/XML_DM_REQUEST' | |
efaserver = 'https://efa.vrr.de/vrr/XML_DM_REQUEST' | |
servertimeout = 6 | |
removekeys = ("direction", "platformno_planned", "stopname", "headsign", | |
"color", "arrtime", "arrtime_planned", "disp_linenum") | |
replacements = { | |
"Stadtmitte/Volme Galerie": "Stadtmitte", | |
"Hbf": "Hagen Hbf", | |
"Hauptbahnhof": "Hagen Hbf", | |
"Ennepetal Busbahnhof": "Ennepetal Busbf", | |
"Breckerfeld Busbahnhof": "Breckerfeld Busbf", | |
"Breckerf. Wengeberg Penninckw.": "Breckerfeld-Wengeberg", | |
"Schwerte, Bahnhof": "Schwerte Bf", | |
"Essen Hauptbahnhof": "Essen Hbf", | |
"MG Hbf /Europaplatz": "Mönchengladbach Hbf", | |
"Kierspe, Feuerwehrgerätehaus": "Kierspe Feuerwehrgeräteh." | |
} | |
manydeps = {"de:05914:2003", "de:05914:2007", "de:05914:2008", "de:05914:2006", | |
"de:05914:2059", "de:05914:2060", "de:05914:2194", "de:05914:2020", | |
"de:05914:2016", "de:05914:2280", "de:05914:2083"} | |
limit_normal = 20 | |
limit_high = 30 | |
class CustomEncoder(JSONEncoder): | |
def default(self, o): | |
if isinstance(o, datetime): | |
return o.isoformat() | |
if isinstance(o, Enum): | |
return o.name | |
return JSONEncoder.default(self, o) | |
def cleandir(_d): | |
if _d.startswith("Hagen "): # fuer direction_planned | |
_d = _d[6:] | |
if _d in replacements: | |
_d = replacements[_d] | |
if "f." in _d: | |
_d = _d.replace("Hbf.", "Hbf").replace("Bf.", "Bf") | |
return _d | |
def cleandict(depd): | |
for k in removekeys: | |
depd.pop(k, None) | |
depd["disp_direction"] = cleandir(depd["disp_direction"]) | |
if depd["earlytermination"]: | |
depd["direction_planned"] = cleandir(depd["direction_planned"]) | |
else: | |
depd.pop("direction_planned", None) | |
return depd | |
@app.route("/") | |
def deps(): | |
if not "stopid" in request.args: | |
abort(500, description="no get parameter stopid") | |
ifopt = request.args.get("stopid") | |
_last_e = None | |
for _try in range(1,4): | |
try: | |
efa = getefadeps(efaserver, servertimeout, ifopt, limit_high if ifopt in manydeps else limit_normal, datetime.utcnow().astimezone().tzinfo) | |
break | |
except Exception as e: | |
sleep(0.3 * _try) | |
print(f"\n! {e.__class__.__name__}, try {_try}", file=stderr) | |
print_exc() | |
_last_e = e | |
else: | |
abort(500, description=f"EFA request: {_last_e.__class__.__name__}\n{_last_e}") | |
d = [cleandict(asdict(_dep)) for di, _dep in enumerate(efa[0]) if (di < 10 or _dep.disp_countdown <= 180)] | |
o = dumps(d, cls=CustomEncoder) | |
r = make_response(o) | |
r.mimetype = 'application/json' | |
return r | |
if __name__ == "__main__": | |
app.run(host='0.0.0.0', ssl_context=('fullchain.pem', 'privkey.pem')) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[Unit] | |
Description=depflask | |
[Service] | |
ExecStart=/bin/sh -c 'gunicorn -w8 --access-logfile - --certfile fullchain.pem --keyfile=privkey.pem --bind 0.0.0.0:8007 depflask:app' | |
Restart=always | |
User=root | |
Group=root | |
WorkingDirectory=/var/depflask | |
Environment="PYTHONUNBUFFERED=TRUE" | |
[Install] | |
WantedBy=multi-user.target |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
document.querySelectorAll("*").forEach(obj => { | |
if (!obj.style) return; | |
if (!!obj.style.opacity && obj.style.opacity == "1") obj.style.opacity = null; | |
if (!!obj.style.fill && obj.style.fill == "rgb(0, 0, 0)") obj.style.fill = null; | |
if (!!obj.style.fill && obj.style.fill == "none") obj.style.fillRule = null; | |
if (!!obj.style.fillRule && obj.style.fillRule == "nonzero") obj.style.fillRule = null; | |
if (!!obj.style.fillOpacity && obj.style.fillOpacity == "1") obj.style.fillOpacity = null; | |
if (!!obj.style.stroke && obj.style.stroke == "none") obj.style.stroke = null; | |
if (!!obj.style.strokeWidth && parseFloat(obj.style.strokeWidth) == 1) obj.style.strokeWidth = null; | |
if (!!obj.style.strokeOpacity && obj.style.strokeOpacity == "1") obj.style.strokeOpacity = null; | |
if (!!obj.style.strokeDashoffset && parseFloat(obj.style.strokeDashoffset) == 0) obj.style.strokeDashoffset = null; | |
if (!!obj.style.strokeDasharray && obj.style.strokeDasharray == "none") obj.style.strokeDasharray = null; | |
if (!!obj.style.strokeMiterlimit && obj.style.strokeMiterlimit == 4) obj.style.strokeMiterlimit = null; | |
if (!!obj.style.strokeLinecap && obj.style.strokeLinecap == "butt") obj.style.strokeLinecap = null; | |
if (!!obj.style.strokeLinejoin && obj.style.strokeLinejoin == "miter") obj.style.strokeLinejoin = null; | |
if (!!obj.style.display && obj.style.display == "inline") obj.style.display = null; | |
if (!!obj.attributes.style && obj.attributes.style.value.includes("-inkscape-font-specification")) | |
obj.attributes.style.value = | |
obj.attributes.style.value | |
.replace("-inkscape-font-specification:'Fira Sans';", "") | |
.replace("-inkscape-font-specification:'Fira Sans, Normal';", "") | |
.replace("-inkscape-font-specification:'Fira Sans Normal';", "") | |
.replace("-inkscape-font-specification:'Fira Sans Medium';", "") | |
.replace("-inkscape-font-specification:'Fira Sans Italic';", "") | |
.replace("-inkscape-font-specification:'Fira Sans Light Italic';", "") | |
.replace("-inkscape-font-specification:'Fira Sans Semi-Bold';", "") | |
.replace("-inkscape-font-specification:'Fira Sans Semi-Bold Italic';", "") | |
.replace("-inkscape-font-specification:'Fira Sans, Bold';", "") | |
.replace("-inkscape-font-specification:'Fira Sans Bold';", "") | |
.replace("-inkscape-font-specification:'Fira Sans'", "") | |
.replace("-inkscape-font-specification:'Fira Sans, Normal'", "") | |
.replace("-inkscape-font-specification:'Fira Sans Normal'", "") | |
.replace("-inkscape-font-specification:'Fira Sans Medium'", "") | |
.replace("-inkscape-font-specification:'Fira Sans Italic'", "") | |
.replace("-inkscape-font-specification:'Fira Sans Light Italic'", "") | |
.replace("-inkscape-font-specification:'Fira Sans Semi-Bold'", "") | |
.replace("-inkscape-font-specification:'Fira Sans Semi-Bold Italic'", "") | |
.replace("-inkscape-font-specification:'Fira Sans, Bold'", "") | |
.replace("-inkscape-font-specification:'Fira Sans Bold'", ""); | |
Array.from(obj.attributes).forEach(attr => { | |
if (attr.namespaceURI == "http://www.inkscape.org/namespaces/inkscape" || attr.namespaceURI == "http://sodipodi.sourceforge.net/DTD/sodipodi-0.dtd") { | |
obj.removeAttribute(attr.nodeName); | |
} | |
}); | |
if (obj.tagName == "tspan") return; | |
if (!!obj.style.fontStyle && obj.style.fontStyle == "normal") obj.style.fontStyle = null; | |
if (!!obj.style.fontVariant && obj.style.fontVariant == "normal") obj.style.fontVariant = null; | |
if (!!obj.style.fontWeight && obj.style.fontWeight == "normal") obj.style.fontWeight = null; | |
if (!!obj.style.fontStretch && obj.style.fontStretch == "normal") obj.style.fontStretch = null; | |
if (!!obj.style.writingMode && obj.style.writingMode == "horizontal-tb") obj.style.writingMode = null; | |
if (!!obj.style.textAnchor && obj.style.textAnchor == "start") obj.style.textAnchor = null; | |
}); | |
stylestrings = {} | |
document.querySelectorAll("*").forEach(obj => { | |
if (!obj.style || !obj.style.cssText || (obj.parentElement && obj.parentElement.classList.contains("bficon"))) return; | |
if (obj.style.cssText in stylestrings) stylestrings[obj.style.cssText].push(obj); | |
else stylestrings[obj.style.cssText] = [obj]; | |
}); | |
var svg = document.getElementById("svg2"); | |
var style = document.createElementNS(svg.namespaceURI, 'style'); | |
style.setAttribute("type", "text/css"); | |
svg.prepend(style); | |
i = 0; | |
csscontent = ""; | |
for (stylestring in stylestrings) { | |
elems = stylestrings[stylestring]; | |
if (elems.length < 3) continue; | |
classname = "_gen" + i; | |
csscontent += "."+classname+" {\n"+stylestring+"\n}\n\n"; | |
elems.forEach(obj => { | |
obj.classList.add(classname); | |
obj.removeAttribute("style"); | |
}); | |
i++; | |
} | |
var CDATASectionNode = document.createCDATASection(csscontent); | |
style.appendChild(CDATASectionNode); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment