Source code for eolib.data.eo_reader

from typing import Optional
from eolib.data.number_encoding_utils import decode_number
from eolib.data.string_encoding_utils import decode_string


[docs] class EoReader(object): """ A class for reading EO data from a sequence of bytes. ``EoReader`` features a chunked reading mode, which is important for accurate emulation of the official game client. See the `chunked reading documentation <https://github.com/Cirras/eo-protocol/blob/master/docs/chunks.md>`_ for more information. """ _data: memoryview _position: int _chunked_reading_mode: bool _chunk_start: int _next_break: int
[docs] def __init__(self, data: bytes): """ Creates a new `EoReader` instance for the specified data. Args: data (bytes): The byte array containing the input data. """ self._data = memoryview(data) self._position = 0 self._chunked_reading_mode = False self._chunk_start = 0 self._next_break = -1
[docs] def slice(self, index: Optional[int] = None, length: Optional[int] = None) -> "EoReader": """ Creates a new `EoReader` whose input data is a shared subsequence of this reader's data. The input data of the new reader will start at position `index` in this reader and contain up to `length` bytes. The two reader's position and chunked reading mode will be independent. The new reader's position will be zero, and its chunked reading mode will be false. Args: index (int, optional): The position in this reader at which the data of the new reader will start; must be non-negative. Defaults to the current reader position. length (int, optional): The length of the shared subsequence of data to supply to the new reader; must be non-negative. Defaults to the length of the remaining data starting from `index`. Returns: The new reader. Raises: ValueError: If `index` or `length` is negative. """ if index is None: index = self.position if length is None: length = max(0, len(self._data) - index) if index < 0: raise ValueError(f"negative index: {index}") if length < 0: raise ValueError(f"negative length: {length}") begin = max(0, min(len(self._data), index)) end = begin + min(len(self._data) - begin, length) return EoReader(self._data[begin:end])
[docs] def get_byte(self) -> int: """ Reads a raw byte from the input data. Returns: A raw byte. """ return self._read_byte()
[docs] def get_bytes(self, length: int) -> bytearray: """ Reads an array of raw bytes from the input data. Args: length (int): The number of bytes to read. Returns: An array of raw bytes. """ return self._read_bytes(length)
[docs] def get_char(self) -> int: """ Reads an encoded 1-byte integer from the input data. Returns: A decoded 1-byte integer. """ return decode_number(self._read_bytes(1))
[docs] def get_short(self) -> int: """ Reads an encoded 2-byte integer from the input data. Returns: A decoded 2-byte integer. """ return decode_number(self._read_bytes(2))
[docs] def get_three(self) -> int: """ Reads an encoded 3-byte integer from the input data. Returns: A decoded 3-byte integer. """ return decode_number(self._read_bytes(3))
[docs] def get_int(self) -> int: """ Reads an encoded 4-byte integer from the input data. Returns: A decoded 4-byte integer. """ return decode_number(self._read_bytes(4))
[docs] def get_string(self) -> str: """ Reads a string from the input data. Returns: A string. """ string_bytes = self._read_bytes(self.remaining) return self._decode_ansi(string_bytes)
[docs] def get_fixed_string(self, length: int, padded: bool = False) -> str: """ Reads a string with a fixed length from the input data. Args: length (int): The length of the string. padded (bool, optional): True if the string is padded with trailing `0xFF` bytes. Returns: A decoded string. Raises: ValueError: If the length is negative. """ if length < 0: raise ValueError("Negative length") bytes_ = self._read_bytes(length) if padded: bytes_ = self._remove_padding(bytes_) return self._decode_ansi(bytes_)
[docs] def get_encoded_string(self) -> str: """ Reads an encoded string from the input data. Returns: A decoded string. """ bytes_ = self._read_bytes(self.remaining) decode_string(bytes_) return self._decode_ansi(bytes_)
[docs] def get_fixed_encoded_string(self, length: int, padded: bool = False) -> str: """ Reads an encoded string with a fixed length from the input data. Args: length (int): The length of the string. padded (bool, optional): True if the string is padded with trailing `0xFF` bytes. Returns: A decoded string. Raises: ValueError: If the length is negative. """ if length < 0: raise ValueError("Negative length") bytes_ = self._read_bytes(length) decode_string(bytes_) if padded: bytes_ = self._remove_padding(bytes_) return self._decode_ansi(bytes_)
@property def chunked_reading_mode(self) -> bool: """ Gets or sets the chunked reading mode for the reader. In chunked reading mode: - The reader will treat ``0xFF`` bytes as the end of the current chunk. - :meth:`next_chunk` can be called to move to the next chunk. """ return self._chunked_reading_mode @chunked_reading_mode.setter def chunked_reading_mode(self, chunked_reading_mode: bool) -> None: self._chunked_reading_mode = chunked_reading_mode if self._next_break == -1: self._next_break = self._find_next_break_index() @property def remaining(self) -> int: """ If chunked reading mode is enabled, gets the number of bytes remaining in the current chunk. Otherwise, gets the total number of bytes remaining in the input data. """ if self.chunked_reading_mode: return self._next_break - min(self.position, self._next_break) else: return len(self._data) - self.position
[docs] def next_chunk(self) -> None: """ Moves the reader position to the start of the next chunk in the input data. Raises: RuntimeError: If not in chunked reading mode. """ if not self.chunked_reading_mode: raise RuntimeError("Not in chunked reading mode.") self._position = self._next_break if self._position < len(self._data): # Skip the break byte self._position += 1 self._chunk_start = self._position self._next_break = self._find_next_break_index()
@property def position(self) -> int: """ Gets the current position in the input data. """ return self._position def _read_byte(self) -> int: """ Reads a raw byte from the input data. Returns: A raw byte. """ if self.remaining > 0: byte = self._data[self._position] self._position += 1 return byte return 0 def _read_bytes(self, length) -> bytearray: """ Reads an array of raw bytes from the input data. Args: length (int): The number of bytes to read. Returns: An array of raw bytes. """ length = min(length, self.remaining) result = bytearray(self._data[self._position : self._position + length]) self._position += length return result def _find_next_break_index(self) -> int: """ Finds the index of the next break byte (0xFF) in the input data. Returns: The index of the next break byte, or the length of the data if not found. """ for i in range(self._chunk_start, len(self._data)): if self._data[i] == 0xFF: return i return len(self._data) @staticmethod def _remove_padding(array: bytearray) -> bytearray: """ Removes padding (trailing 0xFF bytes) from a sequence of bytes. Args: array (bytearray): The sequence of bytes. Returns: The bytes without padding. """ padding_start = array.find(bytes([0xFF])) if padding_start != -1: return array[:padding_start] return array @staticmethod def _decode_ansi(bytes: bytearray) -> str: """ Decodes windows-1252 bytes to a string. Args: bytes (bytearray): The sequence of bytes to decode. Returns: The decoded string. """ return bytes.decode('windows-1252', 'replace')