Created
November 29, 2010 18:30
-
-
Save l1am9111/720332 to your computer and use it in GitHub Desktop.
SA-MP Query class for python
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from samp import Query, Rcon | |
import sys | |
ip = "127.0.0.1" | |
port = 7777 | |
if len(sys.argv) >= 3: | |
ip = str(sys.argv[1]) | |
port = int(sys.argv[2]) | |
query = Query(ip, port) | |
info = query.GetInformation() | |
print info | |
print query.GetRules() | |
if info['players'] <= 100: | |
print query.GetPlayers() | |
print query.GetDetailedPlayers() | |
else: print 'can\' get players because players is above 100' | |
print query.Ping() | |
query.Close() | |
rcon = Rcon(ip, port, "password") | |
output = "\n".join(["%s" % line for line in rcon.Send("cmdlist")]) | |
print output | |
rcon.Close() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from samp import Query, Rcon | |
ip = raw_input('ip? ') | |
port = int(raw_input('port? ')) | |
password = raw_input('password? ') | |
rcon = Rcon(ip, port, password) | |
while 1: | |
try: | |
command = raw_input('> ') | |
print "\n".join(["%s" % line for line in rcon.Send(command)]) | |
except KeyboardInterrupt: | |
rcon.Close() | |
break; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# SA-MP query by Killerkid | |
import socket, struct, random, datetime | |
from cStringIO import StringIO | |
class Query: | |
def __init__(self, ip, port): | |
self.ip, self.port = socket.gethostbyname(ip), port | |
self.data = StringIO("") | |
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | |
self.sock.connect((ip, port)) | |
self.sock.settimeout(1) | |
def CreatePacket(self, opcode): | |
ips = self.ip.split('.'); | |
packet = "SAMP{0}{1}{2}{3}{4}{5}{6}".format(chr(int(ips[0])), chr(int(ips[1])), chr(int(ips[2])), chr(int(ips[3])), chr(self.port & 0xFF), chr(self.port >> 8 & 0xFF), opcode) | |
if opcode == 'p': | |
packet += struct.pack("BBBB", random.randint(0, 255), random.randint(0, 255), random.randint(0, 255), random.randint(0, 255)) | |
return packet | |
def GetInformation(self): | |
try: | |
self.sock.send(self.CreatePacket('i')) | |
info = {} | |
self.data = StringIO(self.sock.recv(2048)) | |
self.data.read(11) | |
info['passworded'] = struct.unpack('?', self.data.read(1))[0] | |
info['players'] = struct.unpack('h', self.data.read(2))[0] | |
info['maxplayers'] = struct.unpack('h', self.data.read(2))[0] | |
info['hostname'] = self.data.read(struct.unpack('i', self.data.read(4))[0]) | |
info['gamemode'] = self.data.read(struct.unpack('i', self.data.read(4))[0]) | |
info['mapname'] = self.data.read(struct.unpack('i', self.data.read(4))[0]) | |
except socket.timeout: | |
info['error'] = 1 | |
return info | |
def GetRules(self): | |
try: | |
self.sock.send(self.CreatePacket('r')) | |
rules = {} | |
self.data = StringIO(self.sock.recv(2048)) | |
self.data.read(11) | |
rulecount = struct.unpack('h', self.data.read(2))[0] | |
for i in range(rulecount): | |
name = self.data.read(struct.unpack('b', self.data.read(1))[0]) | |
rules[name] = self.data.read(struct.unpack('b', self.data.read(1))[0]) | |
except socket.timeout: | |
rules['error'] = 1 | |
return rules | |
def GetPlayers(self): | |
try: | |
self.sock.send(self.CreatePacket('c')) | |
players = [] | |
self.data = StringIO(self.sock.recv(2048)) | |
self.data.read(11) | |
playercount = struct.unpack('h', self.data.read(2))[0] | |
for i in range(playercount): | |
name = self.data.read(struct.unpack('b', self.data.read(1))[0]) | |
players.append([name, struct.unpack('i', self.data.read(4))[0]]) | |
except socket.timeout: | |
players = {'error': 1} | |
return players | |
def GetDetailedPlayers(self): | |
try: | |
self.sock.send(self.CreatePacket('d')) | |
players = [] | |
self.data = StringIO(self.sock.recv(2048)) | |
self.data.read(11) | |
playercount = struct.unpack('h', self.data.read(2))[0] | |
for i in range(playercount): | |
playerid = struct.unpack('b', self.data.read(1))[0] | |
name = self.data.read(struct.unpack('b', self.data.read(1))[0]) | |
score = struct.unpack('i', self.data.read(4))[0] | |
ping = struct.unpack('i', self.data.read(4))[0] | |
players.append([playerid, name, score, ping]) | |
except socket.timeout: | |
players = {'error': 1} | |
return players | |
def Close(self): | |
self.sock.close() | |
def Ping(self): | |
packet = self.CreatePacket('p') | |
a = datetime.datetime.now() | |
self.sock.send(packet) | |
self.sock.recv(2048) | |
b = datetime.datetime.now() | |
c = b - a | |
return int((c.days * 24 * 60 * 60 + c.seconds) * 1000 + c.microseconds / 1000.0) | |
class Rcon: | |
def __init__(self, ip, port, password): | |
self.ip, self.port, self.password = socket.gethostbyname(ip), port, password | |
self.data = StringIO("") | |
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | |
self.sock.connect((ip, port)) | |
self.sock.settimeout(0.5) | |
def CreatePacket(self, opcode, password, command): | |
ips = self.ip.split('.'); | |
packet = "SAMP{0}{1}{2}{3}{4}{5}{6}{7}{8}{9}{10}{11}{12}".format(chr(int(ips[0])), chr(int(ips[1])), chr(int(ips[2])), chr(int(ips[3])), chr(self.port & 0xFF), chr(self.port >> 8 & 0xFF), opcode, chr(len(password) & 0xFF), chr(len(password) >> 8 & 0xFF), password, chr(len(command) & 0xFF), chr(len(command) >> 8 & 0xFF), command) | |
return packet | |
def Send(self, command): | |
self.sock.send(self.CreatePacket('x', self.password, command)) | |
output = [] | |
while 1: | |
try: | |
self.data = StringIO(self.sock.recv(2048)) | |
self.data.read(11) | |
strlen = struct.unpack('h', self.data.read(2))[0] | |
if strlen == 0: break | |
output += [self.data.read(strlen)] | |
except: break; | |
return output | |
def Close(self): | |
self.sock.close() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment