Source code for znsocket.objects.segments_obj

import json
import typing as t
from collections.abc import MutableSequence

import redis

from znsocket.abc import (
    ZnSocketObject,
)
from znsocket.client import Client

if t.TYPE_CHECKING:
    from znsocket import List


class Segments(ZnSocketObject, MutableSequence):
    """Copy of a list object with piece table segments.

    This class implements a copy-on-write list using segment-based storage
    for efficiency. It's particularly useful for large objects where only
    small parts are modified, avoiding the need to copy the entire object.

    The data is stored as a server-side list of JSON-encoded segments
    ``(start, end, key)``, where ``key`` is the server key of the ``List``
    that holds the data and ``start`` / ``end`` delimit the half-open range
    taken from that list. New values are appended to a private ``List`` and
    referenced by single-element segments; the origin list is only read.

    Parameters
    ----------
    r : Client or redis.Redis
        Connection to the server.
    origin : List
        The original List object to create segments from.
    key : str
        The key in the server to store the segments data.

    Raises
    ------
    TypeError
        If origin is not a List object.

    Examples
    --------
    >>> client = znsocket.Client("http://localhost:5000")
    >>> original_list = znsocket.List(client, "original")
    >>> original_list.extend([1, 2, 3, 4, 5])
    >>> segments = znsocket.Segments(client, original_list, "segments")
    >>> len(segments)
    5
    >>> segments[0]
    1
    """

    def __init__(self, r: Client | t.Any, origin: "List", key: str) -> None:
        # Imported lazily; at module level ``List`` is only available for
        # type checking.
        from znsocket import List

        if not isinstance(origin, List):
            raise TypeError("origin must be a List")

        self.redis: redis.Redis = r
        self._origin = origin
        self._key = key
        self.converter = origin.converter
        self.convert_nan = origin.convert_nan

        # Start with one segment that spans the whole origin list.
        # NOTE(review): this push is unconditional, so constructing a second
        # Segments object on an existing key prepends another full-range
        # segment -- confirm callers always use a fresh key.
        self.redis.lpush(self.key, json.dumps((0, len(origin), origin.key)))

        # Private list that receives every value written through this object.
        self._list = List(
            r=self.redis,
            key=key,
            converter=self.converter,
            convert_nan=self.convert_nan,
        )

    @property
    def key(self) -> str:
        """The key in the server to store the data from this segments object.

        Returns
        -------
        str
            The prefixed key used to store this segments object in the server.
        """
        return f"znsocket.Segments:{self._key}"

    # ------------------------------------------------------------------
    # internal helpers
    # ------------------------------------------------------------------

    def _load_segments(self) -> list[list[t.Any]]:
        """Fetch and JSON-decode all segments with a single ``lrange`` call."""
        return [json.loads(raw) for raw in self.redis.lrange(self.key, 0, -1)]

    @staticmethod
    def _total_length(segments: list[list[t.Any]]) -> int:
        """Return the number of items described by ``segments``."""
        return sum(end - start for start, end, _ in segments)

    @staticmethod
    def _locate(
        segments: list[list[t.Any]], index: int
    ) -> tuple[int, int, int, str, int] | None:
        """Find the segment that contains the logical position ``index``.

        Returns
        -------
        tuple or None
            ``(segment_idx, start, end, target, pos_in_segment)`` where
            ``pos_in_segment`` is the index into the target list, or ``None``
            if no segment covers ``index``.
        """
        offset = 0
        for segment_idx, (start, end, target) in enumerate(segments):
            span = end - start
            if offset <= index < offset + span:
                return segment_idx, start, end, target, index - offset + start
            offset += span
        return None

    def _replace_segment(
        self, segment_idx: int, marker: str, pieces: list[tuple[int, int, str]]
    ) -> None:
        """Replace the segment at ``segment_idx`` with ``pieces`` (in order).

        The segment is first overwritten with the placeholder ``marker``;
        each piece is then inserted directly before the placeholder, which
        keeps the pieces in the given order, and finally the placeholder is
        removed. An empty ``pieces`` list simply drops the segment.

        NOTE(review): these are separate server commands, so another client
        reading in between may see the placeholder -- confirm whether
        concurrent access needs to be supported.
        """
        self.redis.lset(self.key, segment_idx, marker)
        for piece in pieces:
            self.redis.linsert(self.key, "BEFORE", marker, json.dumps(piece))
        self.redis.lrem(self.key, 0, marker)

    # ------------------------------------------------------------------
    # public API
    # ------------------------------------------------------------------

    def get_raw(self) -> t.Any:
        """Get the raw data of the segments list.

        Returns
        -------
        list
            A list of segments. Each segment is decoded from JSON and is
            therefore a list ``[start, end, key]``.
        """
        return self._load_segments()

    @classmethod
    def from_list(cls, origin: "List", key: str) -> "Segments":
        """Create a Segments object from a list.

        The connection is taken from ``origin.redis``.

        Parameters
        ----------
        origin : List
            The original List object to create segments from.
        key : str
            The key in the server to store the segments data.

        Returns
        -------
        Segments
            A new Segments object based on the original list.

        Raises
        ------
        TypeError
            If origin is not a List object.
        """
        from znsocket import List

        # Checked here as well so that a wrong type fails with TypeError
        # rather than on the ``origin.redis`` attribute access below.
        if not isinstance(origin, List):
            raise TypeError("origin must be a List")

        return cls(origin.redis, origin, key)

    def __len__(self) -> int:
        """Return the number of items, i.e. the sum of all segment lengths."""
        return self._total_length(self._load_segments())

    def __getitem__(self, index: int | slice | list[int]) -> t.Any:
        """Get one or several items from the segments list.

        Parameters
        ----------
        index : int, slice or list of int
            Position(s) to read. Negative integers count from the end.

        Returns
        -------
        Any
            The item itself for an integer index, otherwise a list of items
            in the order requested by ``index``.

        Raises
        ------
        IndexError
            If any requested position is out of range.
        """
        from znsocket import List

        # Read the segments once; the length is derived from the same data.
        segments = self._load_segments()
        length = self._total_length(segments)

        single_item = isinstance(index, int)
        if single_item:
            positions = [index]
        elif isinstance(index, slice):
            positions = list(range(*index.indices(length)))
        else:
            positions = list(index)

        # convert any negative indices to positive
        positions = [i + length if i < 0 else i for i in positions]
        if any(not 0 <= i < length for i in positions):
            raise IndexError("list index out of range")

        # Collect values keyed by logical position so the result can be
        # returned in the requested order rather than in segment order.
        values: dict[int, t.Any] = {}
        offset = 0
        for start, end, target in segments:
            span = end - start
            wanted = sorted({i for i in positions if offset <= i < offset + span})
            if wanted:
                # ``target`` is a prefixed key; List expects the part after
                # the first ":".
                lst = List(
                    r=self.redis,
                    key=target.split(":", 1)[1],
                    converter=self.converter,
                    convert_nan=self.convert_nan,
                )
                for i in wanted:
                    values[i] = lst[i - offset + start]
            offset += span

        items = [values[i] for i in positions]
        return items[0] if single_item else items

    def __setitem__(self, index: int, value: t.Any) -> None:
        """Set an item in the segments list.

        If the position already points into this object's private list the
        value is overwritten in place. Otherwise the value is appended to the
        private list and the covering segment is split around it::

            (start, end, target)
                -> (start, pos, target), (n - 1, n, own), (pos + 1, end, target)

        Raises
        ------
        IndexError
            If ``index`` is out of range. Nothing is written in that case.
        """
        segments = self._load_segments()
        if index < 0:
            index += self._total_length(segments)

        hit = self._locate(segments, index)
        if hit is None:
            raise IndexError("list index out of range")
        segment_idx, start, end, target, pos_in_segment = hit

        if target == self._list.key:
            # Already our own copy: no new storage and no segment change.
            self._list[pos_in_segment] = value
            return

        # Append only now, so no orphaned value is left behind on the other
        # code paths.
        self._list.append(value)
        new_end = len(self._list)

        pieces: list[tuple[int, int, str]] = []
        if start < pos_in_segment:
            pieces.append((start, pos_in_segment, target))
        pieces.append((new_end - 1, new_end, self._list.key))
        if pos_in_segment + 1 < end:
            pieces.append((pos_in_segment + 1, end, target))
        self._replace_segment(segment_idx, "__SETITEM_IDENTIFIER__", pieces)

    def __delitem__(self, index: int) -> None:
        """Delete an item from the segments list.

        Only the segment table changes: the covering segment is replaced by
        the (possibly empty) parts before and after the deleted position.

        Raises
        ------
        IndexError
            If ``index`` is out of range.
        """
        segments = self._load_segments()
        if index < 0:
            index += self._total_length(segments)

        hit = self._locate(segments, index)
        if hit is None:
            raise IndexError("list index out of range")
        segment_idx, start, end, target, pos_in_segment = hit

        pieces: list[tuple[int, int, str]] = []
        if start < pos_in_segment:
            pieces.append((start, pos_in_segment, target))
        if pos_in_segment + 1 < end:
            pieces.append((pos_in_segment + 1, end, target))
        self._replace_segment(segment_idx, "__DELITEM_IDENTIFIER__", pieces)

    def insert(self, index: int, value: t.Any) -> None:
        """Insert an item in the segments list.

        The value is appended to this object's private list and a
        single-element segment referencing it is placed before ``index``.
        ``index == len(self)`` appends at the end.

        Raises
        ------
        IndexError
            If ``index`` is outside ``[-len(self), len(self)]``. Nothing is
            written in that case.
        """
        segments = self._load_segments()
        total = self._total_length(segments)
        if index < 0:
            index += total

        hit = self._locate(segments, index)
        # Validate before touching the private list so a failed insert does
        # not leave an orphaned value behind.
        if hit is None and index != total:
            raise IndexError("list index out of range")

        self._list.append(value)
        new_end = len(self._list)
        new_segment = (new_end - 1, new_end, self._list.key)

        if hit is None:
            # index == total: append after the last segment.
            self.redis.rpush(self.key, json.dumps(new_segment))
            return

        segment_idx, start, end, target, pos_in_segment = hit
        pieces: list[tuple[int, int, str]] = []
        if start < pos_in_segment:
            pieces.append((start, pos_in_segment, target))
        pieces.append(new_segment)
        # The located position is always inside the segment, so the tail
        # (pos_in_segment, end) is never empty.
        pieces.append((pos_in_segment, end, target))
        self._replace_segment(segment_idx, "__INSERT_IDENTIFIER__", pieces)