Created
October 30, 2022 18:09
-
-
Save MaxAntony/7210a9fb60f9a247150afe2248b1e141 to your computer and use it in GitHub Desktop.
Conectarse a Telegram y encender una PC vía WOL (Wake-on-LAN) desde una Raspberry Pi Pico W
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import struct | |
import socket | |
import network | |
import utime | |
import time | |
import gc | |
import ujson | |
import urequests | |
# --- User configuration -----------------------------------------------------
# Placeholder values: replace them with real ones before flashing the board.

# Credentials passed to WLAN.connect() further down in this script.
wifi_config = {
    'ssid': 'nombrewifi',
    'password': 'contrasenawifi',
}

# Bot token appended to the Telegram API base URL by the ubot class.
utelegram_config = {
    'token': '5601296679:tokenbot',
}

# MAC address of the machine to wake, as six colon-separated hex octets.
MAC = 'XX:XX:XX:XX:XX:XX'

# Address the Wake-on-LAN datagram is sent to.
BROADCASTADDRESS = '192.168.100.255'
class ubot:
    """Small Telegram bot client that polls ``getUpdates`` and dispatches text commands.

    Handlers are callables taking the raw update dict. A handler registered
    for a command is chosen by the first space-separated word of the message
    text; any other text goes to the default handler, if one is set.

    Attributes:
        url: Base URL of the bot API, including the token.
        commands: Mapping of command word to handler.
        default_handler: Handler for text that matches no command, or None.
        message_offset: ``update_id`` of the last update seen (0 = none yet).
        sleep_btw_updates: Seconds to sleep between polls in ``listen``.
    """

    def __init__(self, token, offset=0):
        """Create the client and prime the offset with one poll.

        The update returned by this first poll (if any) only advances
        ``message_offset``; it is not dispatched to any handler.

        Args:
            token: Telegram bot token.
            offset: Initial ``update_id`` offset; 0 means "not known yet".
        """
        self.url = 'https://api.telegram.org/bot' + token
        self.commands = {}
        self.default_handler = None
        self.message_offset = offset
        self.sleep_btw_updates = 3

        self._select_update(self.read_messages())

    def _select_update(self, messages):
        """Advance ``message_offset`` and return the update to process, or None.

        With no offset yet (0) the last update in the batch is taken;
        otherwise the first update whose id is >= the current offset.
        """
        if not messages:
            return None

        if self.message_offset == 0:
            chosen = messages[-1]
        else:
            chosen = None
            for message in messages:
                if message['update_id'] >= self.message_offset:
                    chosen = message
                    break
            if chosen is None:
                return None

        self.message_offset = chosen['update_id']
        return chosen

    def send(self, chat_id, text):
        """Post ``text`` to ``chat_id`` via ``sendMessage``.

        Returns:
            True if the request completed, False if it raised. The HTTP
            status of the reply is not inspected.
        """
        data = {'chat_id': chat_id, 'text': text}
        headers = {'Content-type': 'application/json', 'Accept': 'text/plain'}
        try:
            response = urequests.post(self.url + '/sendMessage', json=data, headers=headers)
            response.close()
            return True
        except Exception:
            # Best-effort send. Unlike a bare ``except``, this lets
            # KeyboardInterrupt/SystemExit through so the loop can be stopped.
            return False

    def read_messages(self):
        """Fetch at most one pending update via ``getUpdates``.

        Returns:
            A list of update dicts (possibly empty), or None when the
            request failed or the reply could not be parsed.
        """
        self.query_updates = {
            'offset': self.message_offset + 1,
            'limit': 1,
            'timeout': 30,
            'allowed_updates': ['message']}

        response = None
        try:
            response = urequests.post(self.url + '/getUpdates', json=self.query_updates)
            update_messages = response.json()
        except ValueError:
            return None
        except OSError:
            print("OSError: request timed out")
            return None
        finally:
            # Always release the underlying socket; the listen loop polls
            # forever, so an unclosed response per poll would exhaust them.
            if response is not None:
                response.close()

        result = []
        if 'result' in update_messages:
            result.extend(update_messages['result'])
        return result

    def listen(self):
        """Poll and dispatch forever, sleeping ``sleep_btw_updates`` seconds per cycle."""
        while True:
            self.read_once()
            time.sleep(self.sleep_btw_updates)
            gc.collect()

    def read_once(self):
        """Poll once and dispatch the selected update, if any, to its handler."""
        update = self._select_update(self.read_messages())
        if update is not None:
            self.message_handler(update)

    def register(self, command, handler):
        """Register ``handler`` for messages whose first word is ``command``."""
        self.commands[command] = handler

    def set_default_handler(self, handler):
        """Set the handler used for text that matches no registered command."""
        self.default_handler = handler

    def set_sleep_btw_updates(self, sleep_time):
        """Set the pause, in seconds, between polls in ``listen``."""
        self.sleep_btw_updates = sleep_time

    def message_handler(self, message):
        """Dispatch one update dict to the matching handler.

        Updates without a ``message`` entry, or whose message has no
        ``text``, are ignored.
        """
        payload = message.get('message')
        if not payload or 'text' not in payload:
            return

        command = payload['text'].split(' ')[0]
        if command in self.commands:
            self.commands[command](message)
        elif self.default_handler:
            self.default_handler(message)
# When true, a progress banner is printed before waiting for the network.
debug = True

# Activate the station interface and ask it to join the configured network.
sta_if = network.WLAN(network.STA_IF)
sta_if.active(True)
sta_if.scan()  # result is discarded
sta_if.connect(wifi_config['ssid'], wifi_config['password'])

if debug:
    print('WAITING FOR NETWORK - sleep 20')
print(sta_if.isconnected())

# Poll once per second until the interface reports a connection.
# There is no upper bound on this wait.
while not sta_if.isconnected():
    print("red no disponible aun")
    utime.sleep(1)
def get_message(message):
    """Reply to the sender with the received text converted to upper case.

    Args:
        message: Update dict; reads ``message.chat.id`` and ``message.text``.
    """
    incoming = message['message']
    bot.send(incoming['chat']['id'], incoming['text'].upper())
def reply_ping(message):
    """Print the raw update and answer the sender with ``pong``.

    Args:
        message: Update dict; reads ``message.chat.id``.
    """
    print(message)
    chat_id = message['message']['chat']['id']
    bot.send(chat_id, 'pong')
def _build_magic_packet(mac):
    """Return the magic-packet payload for ``mac``: 6 x 0xFF, then the MAC 16 times."""
    octets = mac.split(':')
    if len(octets) != 6:
        raise ValueError('MAC address must have 6 octets: ' + mac)
    address = bytes([int(octet, 16) for octet in octets])
    return b'\xff' * 6 + address * 16


def sendWOL(mac=None, broadcast=None, port=1926):
    """Send one Wake-on-LAN magic packet as a UDP datagram.

    Called with no arguments it behaves as before: the module-level MAC is
    sent to BROADCASTADDRESS on port 1926.

    Args:
        mac: Target MAC as six colon-separated hex octets; defaults to MAC.
        broadcast: Destination address; defaults to BROADCASTADDRESS.
        port: Destination UDP port; defaults to 1926, the port this script
            has always sent to.

    Returns:
        The value returned by ``socket.sendto``.

    Raises:
        ValueError: If ``mac`` is not six valid hex octets.
    """
    if mac is None:
        mac = MAC
    if broadcast is None:
        broadcast = BROADCASTADDRESS

    # Build (and validate) the payload before opening any socket.
    payload = _build_magic_packet(mac)

    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        return s.sendto(payload, (broadcast, port))
    finally:
        # This function is called repeatedly per command; close the socket
        # each time so sockets are not leaked.
        s.close()
def start_pc(message):
    """Handle the wake command: send five magic packets and report progress.

    Args:
        message: Update dict; reads ``message.chat.id``.
    """
    # NOTE(review): the source this was recovered from had lost its
    # indentation; the per-packet reply and gc.collect() are assumed to be
    # inside the loop - confirm against the original file.
    chat_id = message['message']['chat']['id']
    bot.send(chat_id, 'Encendiendo la computadora')
    for _ in range(5):
        sendWOL()
        bot.send(chat_id, 'Magic packet enviado')
        gc.collect()
    bot.send(chat_id, 'finalizado envio de paquetes')
# Entry point: start the bot only when the Wi-Fi link is up.
if not sta_if.isconnected():
    print('NOT CONNECTED - aborting')
else:
    print('conectando a telegram')
    # Module-level name: the handler functions above refer to ``bot`` directly.
    bot = ubot(utelegram_config['token'])
    print('conexion a bot exitosa')

    # Command routing; any other text goes to the default handler.
    bot.register('/ping', reply_ping)
    bot.register('/startpc', start_pc)
    bot.set_default_handler(get_message)

    print('BOT LISTENING')
    bot.listen()  # loops forever
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment