mcdreforged.minecraft.rcon.rcon_connection 源代码

Simple rcon client implement
import socket
import struct
import time
from logging import Logger
from threading import RLock
from typing import Optional

class PacketType:

class Packet:
	def __init__(self, packet_type=None, payload=None):
		self.packet_id = 0
		self.packet_type = packet_type
		self.payload = payload

	def flush(self):
		data = struct.pack('<ii', self.packet_id, self.packet_type) + bytes(self.payload + '\x00\x00', encoding='utf8')
		return struct.pack('<i', len(data)) + data

[文档] class RconConnection: """ A simply rcon client for connect to any Minecraft servers that supports rcon protocol """ BUFFER_SIZE = 2 ** 10
[文档] def __init__(self, address: str, port: int, password: str, *, logger: Optional[Logger] = None): """ Create a rcon client instance :param address: The address of the rcon server :param port: The port if the rcon server :param password: The password of the rcon connection :keyword logger: Optional, an instance of ``logging.Logger``. It's used to output some warning information like failing to receive a packet """ self.logger = logger self.address = address self.port = port self.password = password self.socket = None self.command_lock = RLock()
def __del__(self): self.disconnect() def __send(self, data): if type(data) is Packet: data = data.flush() self.socket.send(data) time.sleep(0.03) # MC-72390 def __receive(self, length): data = bytes() while len(data) < length: data += self.socket.recv(min(self.BUFFER_SIZE, length - len(data))) return data def __receive_packet(self): length = struct.unpack('<i', self.__receive(4))[0] data = self.__receive(length) packet = Packet() packet.packet_id = struct.unpack('<i', data[0:4])[0] packet.packet_type = struct.unpack('<i', data[4:8])[0] packet.payload = data[8:-2].decode('utf8') return packet
[文档] def connect(self) -> bool: """ Start a connection to the rcon server and try to log in :return: If connect and login success """ if self.socket is not None: try: self.disconnect() except Exception: pass self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.socket.connect((self.address, self.port)) self.__send(Packet(PacketType.LOGIN_REQUEST, self.password)) success = self.__receive_packet().packet_id != PacketType.LOGIN_FAIL if not success: self.disconnect() return success
[文档] def disconnect(self): """ Disconnect from the server """ if self.socket is None: return self.socket.close() self.socket = None
[文档] def send_command(self, command: str, max_retry_time: int = 3) -> Optional[str]: """ Send a command to the rcon server :param command: The command you want to send to the server :param max_retry_time: The maximum retry time of the operation :return: The command execution result form the server, or None if *max_retry_time* retries exceeded """ with self.command_lock: for i in range(max_retry_time): try: self.__send(Packet(PacketType.COMMAND_REQUEST, command)) self.__send(Packet(PacketType.ENDING_PACKET, 'lol')) result = '' while True: packet = self.__receive_packet() if packet.payload == 'Unknown request {}'.format(hex(PacketType.ENDING_PACKET)[2:]): break result += packet.payload return result except Exception: if self.logger is not None: self.logger.warning('Rcon Fail to received packet') try: self.disconnect() if self.connect(): # next try continue except Exception: pass break return None
if __name__ == '__main__': rcon = RconConnection('localhost', 25575, 'password') print('Login success? ', rcon.connect()) while True: print('Server ->', rcon.send_command(input('Server <- ')))