Last active
April 16, 2020 19:39
-
-
Save OrangeTux/a23aeddc10d780df55efe2d3bb7705cb to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
from datetime import datetime | |
try: | |
import websockets | |
except ModuleNotFoundError: | |
print("This example relies on the 'websockets' package.") | |
print("Please install it by running: ") | |
print() | |
print(" $ pip install websockets") | |
import sys | |
sys.exit(1) | |
from ocpp.routing import on | |
from ocpp.v16 import ChargePoint as cp | |
from ocpp.v16.enums import Action, RegistrationStatus | |
from ocpp.v16 import call_result, call | |
class ChargePoint(cp): | |
@on(Action.BootNotification) | |
def on_boot_notitication(self, charge_point_vendor, charge_point_model, **kwargs): | |
return call_result.BootNotificationPayload( | |
current_time=datetime.utcnow().isoformat(), | |
interval=10, | |
status=RegistrationStatus.accepted | |
) | |
async def get_configuration(self): | |
response = await self.call(call.GetConfigurationPayload()) | |
# Setting is a dict with 3 keys: key, value and readonly. | |
# See section 7.29 in https://github.com/mobilityhouse/ocpp/blob/master/docs/v16/ocpp-1.6.pdf | |
for setting in response.configuration_key: | |
print(f"{setting['key']}: {setting['value']}") | |
async def on_connect(websocket, path): | |
""" For every new charge point that connects, create a ChargePoint instance | |
and start listening for messages. | |
""" | |
charge_point_id = path.strip('/') | |
cp = ChargePoint(charge_point_id, websocket) | |
await asyncio.gather(cp.start(), cp.get_configuration()) | |
async def main(): | |
server = await websockets.serve( | |
on_connect, | |
'0.0.0.0', | |
9000, | |
subprotocols=['ocpp1.6'] | |
) | |
await server.wait_closed() | |
if __name__ == '__main__': | |
try: | |
# asyncio.run() is used when running this example with Python 3.7 and | |
# higher. | |
asyncio.run(main()) | |
except AttributeError: | |
# For Python 3.6 a bit more code is required to run the main() task on | |
# an event loop. | |
loop = asyncio.get_event_loop() | |
loop.run_until_complete(main()) | |
loop.close() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment