Source code for znsocket.objects.list_obj

import typing as t
from collections.abc import MutableSequence

import redis.exceptions
import znjson

from znsocket.abc import (
    ListCallbackTypedDict,
    ListRepr,
    RefreshDataTypeDict,
    RefreshTypeDict,
    ZnSocketObject,
)
from znsocket.client import Client
from znsocket.exceptions import FrozenStorageError
from znsocket.utils import decode, encode, handle_error


# TODO: cache for self.key
def _used_fallback(self: "List") -> bool:
    result = int(self.redis.llen(self.key))
    # I don't know
    # if result == 0 and self._adapter_available:
    #     result = int(
    #         self.socket.call("adapter:get", key=self.key, method="__len__")
    #     )
    if result == 0 and self.fallback is not None:
        return True
    return False


[docs] class List(MutableSequence, ZnSocketObject): def __init__( self, r: Client | t.Any, key: str, socket: Client | None = None, callbacks: ListCallbackTypedDict | None = None, repr_type: ListRepr = "length", converter: list[t.Type[znjson.ConverterBase]] | None = None, convert_nan: bool = False, fallback: str | None = None, fallback_policy: t.Literal["copy", "frozen"] | None = None, ): """Synchronized list object. The content of this list is stored/read from the server. The data is not stored in this object at all, making it suitable for distributed applications and real-time synchronization. Parameters ---------- r : znsocket.Client or redis.Redis Connection to the server. key : str The key in the server to store the data from this list. socket : znsocket.Client, optional Socket connection for callbacks. If None, the connection from `r` will be used if it is a Client. callbacks : dict[str, Callable], optional Optional function callbacks for methods which modify the database. repr_type : {'length', 'minimal', 'full'}, optional Control the `repr` appearance of the object. Reduce for better performance. Default is 'length'. converter : list[znjson.ConverterBase], optional Optional list of znjson converters to use for encoding/decoding the data. convert_nan : bool, optional Convert NaN and Infinity to None. Both are not native JSON values and cannot be encoded/decoded. Default is False. fallback : str, optional The key of a fallback list to use if this list is empty. fallback_policy : {'copy', 'frozen'}, optional The policy to use for the fallback list. 'copy': Copy the fallback list to this list on initialization. 'frozen': Use the fallback list as a read-only source of data. Examples -------- >>> client = znsocket.Client("http://localhost:5000") >>> my_list = znsocket.List(client, "my_list") >>> my_list.append("item1") >>> my_list.append("item2") >>> len(my_list) 2 >>> my_list[0] 'item1' """ self.redis = r self._key = key self.repr_type = repr_type self.socket = socket if socket else (r if isinstance(r, Client) else None) self.converter = converter self._on_refresh = lambda x: None self.convert_nan = convert_nan self.fallback = fallback self.fallback_policy = fallback_policy if isinstance(r, Client): self._pipeline_kwargs = {} else: self._pipeline_kwargs = {} self._callbacks = { "setitem": None, "delitem": None, "insert": None, "append": None, } if callbacks: self._callbacks.update(callbacks) self._adapter_available = False if self.socket is not None: # check from the server if the adapter is available self._adapter_available = self.socket.call("check_adapter", key=self.key) # If fallback policy is "copy" and the list is empty, copy from fallback if ( self.fallback is not None and self.fallback_policy == "copy" and int(self.redis.llen(self.key)) == 0 and not self._adapter_available ): fallback_lst = type(self)( r=self.redis, key=self.fallback, socket=self.socket, convert_nan=self.convert_nan, converter=self.converter, ) if len(fallback_lst) > 0: # here we use the internal key, because we create a new list object. fallback_lst.copy(self._key) @property def key(self) -> str: """The key of the list in the server. Returns ------- str The prefixed key used to store this list in the server. """ return f"znsocket.List:{self._key}" def __len__(self) -> int: result = int(self.redis.llen(self.key)) if result == 0 and self._adapter_available: result = int( self.socket.call("adapter:get", key=self.key, method="__len__") ) if ( result == 0 and self.fallback is not None and self.fallback_policy is not None and self.fallback_policy != "copy" ): # Use fallback for length if policy is "frozen" or other non-copy policies # If policy is "copy", data should have been copied during initialization fallback_lst = type(self)( r=self.redis, key=self.fallback, socket=self.socket, convert_nan=self.convert_nan, ) result = len(fallback_lst) return result def __getitem__(self, index: int | list | slice) -> t.Any | list[t.Any]: # noqa C901 from znsocket.objects.dict_obj import Dict single_item = isinstance(index, int) original_slice = isinstance(index, slice) if single_item: index = [index] elif original_slice: # If we have an adapter and it's a slice, use the efficient slice method if self._adapter_available: start, stop, step = index.indices(len(self)) adapter_values = self.socket.call( "adapter:get", key=self.key, method="slice", start=start, stop=stop, step=step, ) items = [] for value in adapter_values: item = decode(self, value) if isinstance(item, str): if item.startswith("znsocket.List:"): key = item.split(":", 1)[1] item = List(r=self.redis, key=key) elif item.startswith("znsocket.Dict:"): key = item.split(":", 1)[1] item = Dict(r=self.redis, key=key) items.append(item) return items else: # Fallback to individual index calls for non-adapter slices index = list(range(*index.indices(len(self)))) pipeline = self.redis.pipeline(**self._pipeline_kwargs) for i in index: pipeline.lindex(self.key, i) data = pipeline.execute() items = [] for idx, value in zip(index, data): if value is None: if self._adapter_available: value = self.socket.call( "adapter:get", key=self.key, method="__getitem__", index=idx, ) if ( value is None and self.fallback is not None and self.fallback_policy != "copy" ): # Only use fallback for item access if policy is not "copy" # If policy is "copy", data should have been copied during initialization fallback_lst = type(self)( r=self.redis, key=self.fallback, socket=self.socket, convert_nan=self.convert_nan, converter=self.converter, ) try: value = fallback_lst[idx] value = encode(self, value) except IndexError: # Fallback doesn't have this index either pass if value is None: # check if the value is still None raise IndexError("list index out of range") item = decode(self, value) if isinstance(item, str): if item.startswith("znsocket.List:"): key = item.split(":", 1)[1] item = List(r=self.redis, key=key) elif item.startswith("znsocket.Dict:"): key = item.split(":", 1)[1] item = Dict(r=self.redis, key=key) items.append(item) return items[0] if single_item else items def __setitem__(self, index: int | list | slice, value: t.Any) -> None: # noqa C901 from znsocket.objects.dict_obj import Dict if self._adapter_available: raise FrozenStorageError(key=self.key) single_item = isinstance(index, int) if single_item: index = [index] value = [value] LENGTH = len(self) if isinstance(index, slice): index = list(range(*index.indices(LENGTH))) if any(not isinstance(i, int) for i in index): raise TypeError("list indices must be integers or slices") if len(index) != len(value): raise ValueError( f"attempt to assign sequence of size {len(value)} to extended slice of size {len(index)}" ) pipeline = self.redis.pipeline(**self._pipeline_kwargs) for i, v in zip(index, value): if i >= LENGTH or i < -LENGTH: raise IndexError("list index out of range") if isinstance(v, Dict): v = v.key if isinstance(v, List): if value.key == self.key: raise ValueError("Can not set circular reference to self") v = v.key pipeline.lset(self.key, i, encode(self, v)) pipeline.execute() if callback := self._callbacks["setitem"]: callback(index, value) if self.socket is not None: refresh: RefreshTypeDict = {"indices": index} refresh_data: RefreshDataTypeDict = {"target": self.key, "data": refresh} self.socket.sio.emit("refresh", refresh_data, namespace="/znsocket") def __delitem__(self, index: int | list | slice) -> None: if self._adapter_available: raise FrozenStorageError(key=self.key) single_item = isinstance(index, int) if single_item: index = [index] if isinstance(index, slice): index = list(range(*index.indices(len(self)))) if len(index) == 0: return # nothing to delete pipeline = self.redis.pipeline(**self._pipeline_kwargs) for i in index: pipeline.lset(self.key, i, "__DELETED__") pipeline.lrem(self.key, 0, "__DELETED__") try: pipeline.execute() except redis.exceptions.ResponseError: raise IndexError("list index out of range") if self._callbacks["delitem"]: self._callbacks["delitem"](index) if self.socket is not None: refresh: RefreshTypeDict = {"start": min(index), "stop": None} refresh_data: RefreshDataTypeDict = {"target": self.key, "data": refresh} self.socket.sio.emit("refresh", refresh_data, namespace="/znsocket")
[docs] def insert(self, index: int, value: t.Any) -> None: from znsocket.objects.dict_obj import Dict if self._adapter_available: raise FrozenStorageError(key=self.key) if isinstance(value, Dict): value = value.key if isinstance(value, List): if value.key == self.key: raise ValueError("Can not set circular reference to self") value = value.key if index >= len(self): self.redis.rpush(self.key, encode(self, value)) elif index == 0: self.redis.lpush(self.key, encode(self, value)) else: pivot = self.redis.lindex(self.key, index) self.redis.linsert(self.key, "BEFORE", pivot, encode(self, value)) if callback := self._callbacks["insert"]: callback(index, value) if self.socket is not None: refresh: RefreshTypeDict = {"start": index, "stop": None} refresh_data: RefreshDataTypeDict = {"target": self.key, "data": refresh} self.socket.sio.emit("refresh", refresh_data, namespace="/znsocket")
def __eq__(self, value: object) -> bool: if isinstance(value, List): return self[:] == value[:] elif isinstance(value, list): return self[:] == value return False def __repr__(self) -> str: if self.repr_type == "length": return f"List(len={len(self)})" elif self.repr_type == "minimal": return "List(<unknown>)" elif self.repr_type == "full": data = self.redis.lrange(self.key, 0, -1) data = [decode(self, i) for i in data] return f"List({data})" else: raise ValueError(f"Invalid repr_type: {self.repr_type}")
[docs] def append(self, value: t.Any) -> None: """Append an item to the end of the list. Parameters ---------- value : Any The item to append to the list. Raises ------ FrozenStorageError If the list is backed by an adapter and is read-only. ValueError If attempting to create a circular reference to self. Examples -------- >>> my_list = znsocket.List(client, "my_list") >>> my_list.append("new_item") >>> my_list[-1] 'new_item' """ from znsocket.objects.dict_obj import Dict if self._adapter_available: raise FrozenStorageError(key=self.key) # check if the list has a fallback option and it would go to the fallback list # For frozen policy, we need to copy fallback data before modifying if list is empty if ( self.fallback is not None and self.fallback_policy is not None and int(self.redis.llen(self.key)) == 0 and not self._adapter_available ): fallback_lst = type(self)( r=self.redis, key=self.fallback, socket=self.socket, convert_nan=self.convert_nan, converter=self.converter, ) if len(fallback_lst) > 0: # here we use the internal key, because we create a new list object. fallback_lst.copy(self._key) if callback := self._callbacks["append"]: callback(value) if isinstance(value, Dict): value = value.key if isinstance(value, List): if value.key == self.key: raise ValueError("Can not set circular reference to self") value = value.key self.redis.rpush(self.key, encode(self, value)) if self.socket is not None: refresh: RefreshTypeDict = {"indices": [len(self) - 1]} refresh_data: RefreshDataTypeDict = {"target": self.key, "data": refresh} self.socket.sio.emit("refresh", refresh_data, namespace="/znsocket")
[docs] def extend(self, values: t.Iterable) -> None: """Extend the list with an iterable using redis pipelines.""" from znsocket.objects.dict_obj import Dict if self._adapter_available: raise FrozenStorageError(key=self.key) if self.socket is not None: refresh: RefreshTypeDict = {"start": len(self), "stop": None} pipe = self.redis.pipeline(**self._pipeline_kwargs) for value in values: if isinstance(value, Dict): value = value.key if isinstance(value, List): if value.key == self.key: raise ValueError("Can not set circular reference to self") value = value.key pipe.rpush(self.key, encode(self, value)) pipe.execute() if self.socket is not None: refresh_data: RefreshDataTypeDict = {"target": self.key, "data": refresh} self.socket.sio.emit("refresh", refresh_data, namespace="/znsocket")
[docs] def pop(self, index: int = -1) -> t.Any: """Pop an item from the list.""" if self._adapter_available: raise FrozenStorageError(key=self.key) if index < 0: index = len(self) + index value = self.redis.lindex(self.key, index) if value is None: raise IndexError("pop index out of range") pipeline = self.redis.pipeline(**self._pipeline_kwargs) pipeline.lset(self.key, index, "__DELETED__") pipeline.lrem(self.key, 0, "__DELETED__") try: pipeline.execute() except redis.exceptions.ResponseError: raise IndexError("pop index out of range") if self.socket is not None: refresh: RefreshTypeDict = {"start": index, "stop": None} refresh_data: RefreshDataTypeDict = {"target": self.key, "data": refresh} self.socket.sio.emit("refresh", refresh_data, namespace="/znsocket") return decode(self, value)
[docs] def copy(self, key: str) -> "List": """Copy the list to a new key. Creates a new list with the same content but under a different key. This operation does not trigger any callbacks as the original data is not modified. Parameters ---------- key : str The new key for the copied list. Returns ------- List A new List instance pointing to the copied data. Raises ------ ValueError If the copy operation fails. Examples -------- >>> original_list = znsocket.List(client, "original") >>> original_list.extend([1, 2, 3]) >>> copied_list = original_list.copy("copied") >>> len(copied_list) 3 """ # TODO!! currently, it is not possible to do copy(list.key) # because it will be prefixed with "znsocket.List:" twice! # The same is true for the Dict object. if self._adapter_available: success = self.socket.call( "adapter:get", method="copy", key=self.key, target=key, ) handle_error(success) elif not self.redis.copy(self.key, f"znsocket.List:{key}"): raise ValueError("Could not copy list") return List( r=self.redis, key=key, socket=self.socket, converter=self.converter, convert_nan=self.convert_nan, )
def on_refresh(self, callback: t.Callable[[RefreshDataTypeDict], None]) -> None: if self.socket is None: raise ValueError("No socket connection available") self.socket.refresh_callbacks[self.key] = callback