I am new to socket programming so this might be a lack of my own understanding , but I am trying to create a clientless bot in python. The idea is to do this:
1. Login
2. Select char
3.Attack a training monk
4.Dance / Eat food
I want to do it off my raspberry pi.
Now I got the login and select char done fine. I can see my char login via my script and can also see incoming packets decrypted. I am catching ping request and responding to the server aswell. However after a while I receive a connection error:
[WinError 10053] An established connection was aborted by the software in your host machine
This maybe due to my own understanding of sockets I am currently working like this:
After choosing char open a socket to the gameword ip / port.
I listen on the socket on a loop for incoming data.
If there is data there I decrypt it.
For outgoing packets I do this:
If there is no data to receive on the socket I have a list with outgoing packets. I send the first item on this list , receive and reply then repeat until the outgoing packet list is empty.
Is this wrong? Should I be making a new socket for outgoing packets and keep one seperate for incomming packets?
Here is a video of my problem:
And here is the source code to my script (I know its not that great I was trying to get it working and to learn more about the protocol before clean up). I have left my ot login details in the script so you wont need to make a new char to test it.
Edit:
I have noticed that the error does not occur if I dont send packets to the server, I will experiment with new packet code for sending. I thought I had to keep the original listening socket for sending data , but now I think about it that could be wrong.
1. Login
2. Select char
3.Attack a training monk
4.Dance / Eat food
I want to do it off my raspberry pi.
Now I got the login and select char done fine. I can see my char login via my script and can also see incoming packets decrypted. I am catching ping request and responding to the server aswell. However after a while I receive a connection error:
[WinError 10053] An established connection was aborted by the software in your host machine
This maybe due to my own understanding of sockets I am currently working like this:
After choosing char open a socket to the gameword ip / port.
I listen on the socket on a loop for incoming data.
If there is data there I decrypt it.
For outgoing packets I do this:
If there is no data to receive on the socket I have a list with outgoing packets. I send the first item on this list , receive and reply then repeat until the outgoing packet list is empty.
Is this wrong? Should I be making a new socket for outgoing packets and keep one seperate for incomming packets?
Here is a video of my problem:
And here is the source code to my script (I know its not that great I was trying to get it working and to learn more about the protocol before clean up). I have left my ot login details in the script so you wont need to make a new char to test it.
Python:
#Tibia 8.60 clientless bot - DarkByte
import base64
import random
import socket
import struct
import zlib
#Default ot public rsa key used by most servers
ot_rsa = 109120132967399429278860960508995541528237502902798129123468757937266291492576446330739696001110603907230888610072655818825358503429057592827629436413108566029093628212635953836686562675849720620786279431090218017681061521755056710823876476444260558147179707119674283982419152118103759076030616683978566631413
class login_manager():
def __init__(self, rsa=ot_rsa):
self.RSA_KEY = rsa #Default is our ot_rsa
# self.xtea_key = bytes(random.randint(0,255) for i in range(16))
xtea_temp = "0bce009aa5178260cf6b6dd85611f51f"
self.xtea_key = bytes.fromhex(xtea_temp)
self.charlist = []
def add_packet_header(self,thepacket):
hex_len = (len(thepacket)+4) #Convert the length of the packet to its hex value , we add for the 4 adler bytes later on
ADLER_INT = zlib.adler32(thepacket) #Adler32 integer
result = struct.pack('=hI', len(thepacket)+4,ADLER_INT) #im learning struct this seems usefull.. h=short for the length var (we add 4 because the addler bytes are not added yet) I = integer for the addler digits
return result +thepacket
def EnterGameLoop(self,s,initialpacket):
#Enter the game
#
#We are now connect but need to handshake or we wont get pings to keep our connection alive
#GAMECLIENT1>( hex ) 03 00 98 09 00
#GAMECLIENT1>( hex )
#GAMECLIENT1>( hex ) 04 00 A0 01 00 01
send_queue = []
s.sendall(initialpacket)
#We are now connected... parse all packets as they come in
while True:
try:
s.settimeout(None) #No timeout
data = s.recv(1024) #Receave 1024 for any incoming packet (no packet should be more than this)
if data:
raw_data = (data)
encrypted_data = (data[6:])
decrypted_bytes = (self.xtea_decrypt(encrypted_data))
print(decrypted_bytes)
packetcode = decrypted_bytes[0]
print(packetcode)
if packetcode == 1:
print("Ping")
print("Server Says : ")
print(raw_data.hex())
thepacket = "01001e3333333333"
packetbytes = bytes.fromhex(thepacket)
packetbytes = self.xtea_encrypt(packetbytes)
print(packetbytes)
thepacket = self.add_packet_header(packetbytes)
print("Client Says : ")
print(thepacket.hex())
send_queue.append(thepacket)
else:
#no data parse the send queue
if len(send_queue) >0:
s.sendall(send_queue[0])
del send_queue[0]
except Exception as ex:
print(ex)
continue
def make_string_bytes(self,thestr):
#Make string format:
#bytearray
#1. Length of string
#2. 00 in hex (represents string
#3. The string
hex_len = (len(thestr)) #Convert the length of the string to its hex value
result = bytearray() #New byte array
result.append(hex_len) #Length of our string in hex
result.append(0) #Add a 0 (required by protocal)
b_array = bytearray(thestr.encode()) #put the string in a byte array
result.extend(b_array) #Add our byte string to the original bytearray with len and 00
return result
def xtea_encrypt(self,block,n=32,endian="="):
"""
Encrypt 64 bit data block using XTEA block cypher
* key = 128 bit (16 char)
* block = 64 bit (8 char)
* n = rounds (default 32)
* endian = byte order (see 'struct' doc - default big/network)
>>> z = xtea_encrypt('0123456789012345','ABCDEFGH')
>>> z.encode('hex')
'b67c01662ff6964a'
Only need to change byte order if sending/receiving from
alternative endian implementation
>>> z = xtea_encrypt('0123456789012345','ABCDEFGH',endian="<")
>>> z.encode('hex')
'ea0c3d7c1c22557f'
"""
v0,v1 = struct.unpack(endian+"2L",block)
k = struct.unpack(endian+"4L",self.xtea_key)
sum=0
delta = 0x9E3779B9
mask = 0xFFFFFFFF
for round in range(n):
v0 = (v0 + (((v1<<4 ^ v1>>5) + v1) ^ (sum + k[sum & 3]))) & mask
sum = (sum + delta) & mask
v1 = (v1 + (((v0<<4 ^ v0>>5) + v0) ^ (sum + k[sum>>11 & 3]))) & mask
return struct.pack(endian+"2L",v0,v1)
def xtea_decrypt_block(self, block):
v0, v1 = struct.unpack('=2I', block)
k = struct.unpack('=4I', self.xtea_key)
delta, mask, rounds = 0x9E3779B9, 0xFFFFFFFF, 32
sum = (delta * rounds) & mask
for round in range(rounds):
v1 = (v1 - (((v0 << 4 ^ v0 >> 5) + v0) ^ (sum + k[sum >> 11 & 3]))) & mask
sum = (sum - delta) & mask
v0 = (v0 - (((v1 << 4 ^ v1 >> 5) + v1) ^ (sum + k[sum & 3]))) & mask
return struct.pack('=2I', v0, v1)
def rsa_encrypt(self,thedata):
m = sum(x*pow(256, i) for i, x in enumerate(reversed(thedata)))
c = pow(m, 65537, self.RSA_KEY)
thedata = bytearray((c >> i) & 255 for i in reversed(range(0, 1024, 8)))
return thedata
def xtea_decrypt(self,thedata):
packet = b''.join(self.xtea_decrypt_block(thedata[i:i + 8]) for i in range(0, len(thedata), 8))
return packet
def pad_data(self,thepacket):
#Data for login / encryption must be 128bytes so if we are < this we can just pad random data
thepacket += bytearray(random.randint(0,255) for i in range(len(thepacket), 128))
print("rdd=" + str(len(thepacket)))
return (thepacket)
def handleEnterGamePackets(self,s):
print("MEGP")
headerSize = 6 #The recived header also contains our addler32 hash that tells us how big the rest of the packet data is
packetBytes = s.recv(headerSize) #Recive the header
complete_packetSize, adler32Checksum = struct.unpack('=HI', packetBytes)
packetBytes += s.recv(complete_packetSize-2)
loop_position=0
trimmed_bytes = packetBytes[2:]
print(trimmed_bytes.hex())
while loop_position < len(trimmed_bytes):
packetCode = trimmed_bytes[loop_position]
if packetCode == 31:
thepacket = self.createEnterGamePacket()
self.EnterGameLoop(s,thepacket)
break; #We are done here
loop_position = loop_position+1
def createEnterGamePacket(self):
print("Entering Game")
zero_byte = bytearray.fromhex("00") #used later
start_hex = "0A" #Packet id
start_hex = start_hex + "0200" #os (0200 = windows)
start_hex = start_hex + "5C03" #860 in hex (Version)
start_bytes = bytearray.fromhex(start_hex)
###Start Rsa Encrypted data:
enc_hex = zero_byte #Rsa always starts 00
key = self.xtea_key
enc_hex.extend(key)
enc_hex.append(0) #Rsa always starts 00 #Im using append because python kept repeating the entire xtea twice after this line if i use extend , i think its a python bug
acc_name_bytes = self.make_string_bytes(self.acc_name)
acc_char_bytes = self.make_string_bytes(self.char_name)
acc_pass_bytes = self.make_string_bytes(self.acc_password)
enc_hex.extend(acc_name_bytes)
enc_hex.extend(acc_char_bytes)
enc_hex.extend(acc_pass_bytes)
padded_data = self.pad_data(enc_hex) #Padded to 128bytes as required for rsa encrypt
encrypted_data = self.rsa_encrypt(padded_data)
start_bytes.extend(encrypted_data) #Add the encrypted data onto the none encrypted data
finaldata = start_bytes
finaldata = self.add_packet_header(finaldata)
return finaldata
def loginPacketHandler(self,s):
headerSize = 6 #The recived header also contains our addler32 hash that tells us how big the rest of the packet data is
packetBytes = s.recv(headerSize) #Recive the header
complete_packetSize, adler32Checksum = struct.unpack('=HI', packetBytes)
packetBytes += s.recv(complete_packetSize-2)
decrypted_bytes = (self.xtea_decrypt(packetBytes[headerSize:]))
print(decrypted_bytes)
if str(decrypted_bytes).find("Account name or password") >1: # Invalid password/username
print("Invalid Login data")
sys.exit()
#Valid login...
#Now we need to loop through each byte of our packet to look for certain bytes in chunks , these bytes act as a packetid
#and allow us to tell what each chunk the server is sending means for e.g packet id 20 us the server motd , 100 is the charlist ect
trimmed_bytes = decrypted_bytes[2:] #First 2 bytes are just packet length , means nothing to us at this point
loop_position =0
print(trimmed_bytes.hex())
while loop_position < len(trimmed_bytes):
# try:
packetCode = trimmed_bytes[loop_position]
print(packetCode)
if packetCode == 20:
#motd
motd_len = trimmed_bytes[loop_position+1] #The byte after our packet id is the length of our motd
motd = trimmed_bytes[loop_position+5:loop_position+3+motd_len]
print("Server Motd: " + (motd.decode()))
loop_position = motd_len +2 #Next loop position is just after out motd I add 1 because server motd always seems to have a 33(h) after it
if packetCode == 100:
#Char List
char_count = trimmed_bytes[loop_position+1] #The byte after our packet id is how many chars we have on our account
print("Getting information for {} chars".format(char_count))
char_position = loop_position+2 #Initial position is the byte after out char count
while len(self.charlist) < char_count: #Loop until we have all chars extracted
charname_len = (trimmed_bytes[char_position]) #The lenghth of this chars name
charname_start_pos = char_position +2 # There is always a 00 after our charname length , prob incase they ever wanted massive names for some reason skip it anyway
charname = trimmed_bytes[charname_start_pos:charname_start_pos+charname_len].decode()
char_position = charname_start_pos+charname_len #Reset char position to now be the end of our current chars name
#This is the worlds name , usually /"online" or "offline" on ot servers but we will get it anyway
world_name_len = (trimmed_bytes[char_position]) #The lenghth of the chars world name
world_name = (trimmed_bytes[char_position+2:char_position+2+world_name_len]).decode() #Get the world name
char_position = char_position+2+world_name_len #Set our char_position var to the end of world name
#Our next 6 bytes are our game worlds ip address and port , lets get the ip first:
ip_sub_1 = (trimmed_bytes[char_position]) # Part one of our ip adress
ip_sub_2 = (trimmed_bytes[char_position+1]) # Part two of our ip adress
ip_sub_3 = (trimmed_bytes[char_position+2]) # Part three of our ip adress
ip_sub_4 = (trimmed_bytes[char_position+3]) # Part four of our ip adress
server_ip_adress = "{}.{}.{}.{}".format(ip_sub_1,ip_sub_2,ip_sub_3,ip_sub_4)
char_position = char_position +4 # Skip the 4 bytes that made up our game world ip address
#This got messy , need a cleanup of the code here maybe look into struct
port1 = hex(trimmed_bytes[char_position]) #Get the first byte of our port
print(trimmed_bytes[char_position])
port_part1 = (port1[2:]) #Remove the 0x that shows before the port (as its a long)
port2 = hex(trimmed_bytes[char_position+1]) #Get the first byte of our port
port_part2 = (port2[2:]) #Remove the 0x that shows before the port (as its a long)
port = (int(port_part2 + port_part1,16))
char_position = char_position +2 # Skip the 2 bytes that made up our game port
loop_position = char_position
#Todo somewhere here...Get premium acc day I think its the byte im on tbh but the ot servers I play just leave u as a facc
self.char_name = charname
char_dict={"charname" : charname,
"world_name" : world_name,
"server_ip_adress" : server_ip_adress,
"port" : port,
}
self.charlist.append(char_dict)
# except:
# continue
loop_position = loop_position +1
def send_login_packet(self,acc_name,acc_password):
self.acc_name = acc_name
self.acc_password = acc_password
#This is the first packet that gets sent , it is rsa encrypted and contains the username/password for our account
start_hex = "01" #Packet id
start_hex = start_hex + "0200" #os (0200 = windows)
start_hex = start_hex + "5C03" #860 in hex (Version)
start_hex = start_hex + "ffda124e27db124ebf9c114e" #Tibia.spr , .dat and .pic signature should not need changing unless using a custom client
start_bytes = bytearray.fromhex(start_hex)
###Start Rsa Encryptes data:
enc_hex = bytearray.fromhex("00") #Rsa always starts 00
key = self.xtea_key
acc_name_bytes = self.make_string_bytes(acc_name)
acc_pass_bytes = self.make_string_bytes(acc_password)
enc_hex.extend(key)
enc_hex.extend(acc_name_bytes)
enc_hex.extend(acc_pass_bytes)
padded_data = self.pad_data(enc_hex) #Padded to 128bytes as required for rsa encrypt
encrypted_data = self.rsa_encrypt(padded_data)
start_bytes.extend(encrypted_data) #Add the encrypted data onto the none encrypted data
finaldata = start_bytes
finaldata = self.add_packet_header(finaldata)
with socket.socket() as c:
c.connect(('masiyah.se', 7171))
c.sendall(finaldata)
self.loginPacketHandler(c)
thenum = 0
print("Choose a char:")
for char in self.charlist:
thenum = thenum +1
print("{}. {}({})".format(thenum,char["charname"],char["world_name"]))
selection = input("Enter a number:")
selected_charname = (self.charlist[int(selection)-1]["charname"])
selected_server_ip_adress = (self.charlist[int(selection)-1]["server_ip_adress"])
selected_port = (self.charlist[int(selection)-1]["port"])
print("Logging in as {}".format(selected_charname))
with socket.socket() as c:# now connecting to game server to send packets
print(selected_port)
c.connect((selected_server_ip_adress, int(selected_port)))
self.handleEnterGamePackets(c)# We should get a challenge to login to char here somewhere
login_handler = login_manager()
acc_name = 'testbot'
acc_password = 'dreamscape21'
login_handler.send_login_packet(acc_name,acc_password)
Edit:
I have noticed that the error does not occur if I dont send packets to the server, I will experiment with new packet code for sending. I thought I had to keep the original listening socket for sending data , but now I think about it that could be wrong.
Last edited: