Source code for znsocket.objects.dict_obj

import typing as t
from collections.abc import MutableMapping

import znjson

from znsocket.abc import (
    DictCallbackTypedDict,
    DictRepr,
    RefreshDataTypeDict,
    RefreshTypeDict,
    ZnSocketObject,
)
from znsocket.client import Client
from znsocket.utils import decode, encode, handle_error


[docs] class Dict(MutableMapping, ZnSocketObject): def __init__( self, r: Client | t.Any, key: str, socket: Client | None = None, callbacks: DictCallbackTypedDict | None = None, repr_type: DictRepr = "keys", converter: list[t.Type[znjson.ConverterBase]] | None = None, convert_nan: bool = False, fallback: str | None = None, fallback_policy: t.Literal["copy", "frozen"] | None = None, ): """Synchronized dict object. The content of this dict is stored/read from the server. The data is not stored in this object at all, making it suitable for distributed applications and real-time synchronization. Parameters ---------- r : znsocket.Client or redis.Redis Connection to the server. key : str The key in the server to store the data from this dict. socket : znsocket.Client, optional Socket connection for callbacks. If None, the connection from `r` will be used if it is a Client. callbacks : dict[str, Callable], optional Optional function callbacks for methods which modify the database. repr_type : {'keys', 'minimal', 'full'}, optional Control the `repr` appearance of the object. Reduce for better performance. Default is 'keys'. converter : list[znjson.ConverterBase], optional Optional list of znjson converters to use for encoding/decoding the data. convert_nan : bool, optional Convert NaN and Infinity to None. Both are not native JSON values and cannot be encoded/decoded. Default is False. fallback : str, optional The key of a fallback dict to use if this dict is empty. fallback_policy : {'copy', 'frozen'}, optional The policy to use for the fallback dict. 'copy': Copy the fallback dict to this dict on initialization. 'frozen': Use the fallback dict as a read-only source of data. fallback : str, optional The key of a fallback dict to use if this dict is empty. fallback_policy : {'copy', 'frozen'}, optional The policy to use for the fallback dict. 'copy': Copy the fallback dict to this dict on initialization. 'frozen': Use the fallback dict as a read-only source of data. Examples -------- >>> client = znsocket.Client("http://localhost:5000") >>> my_dict = znsocket.Dict(client, "my_dict") >>> my_dict["key1"] = "value1" >>> my_dict["key2"] = "value2" >>> len(my_dict) 2 >>> my_dict["key1"] 'value1' """ self.redis = r self.socket = socket if socket else (r if isinstance(r, Client) else None) self.converter = converter self._key = key self.repr_type = repr_type self.convert_nan = convert_nan self.fallback = fallback self.fallback_policy = fallback_policy self._callbacks = { "setitem": None, "delitem": None, } if callbacks: self._callbacks.update(callbacks) self._adapter_available = False if self.socket is not None: # check from the server if the adapter is available self._adapter_available = self.socket.call("check_adapter", key=self.key) self._fallback_object: t.Optional["Dict"] = None if ( self.fallback is not None and int(self.redis.hlen(self.key)) == 0 and not self._adapter_available # TODO: what should happen, if adapter is available but empty ): self._fallback_object = type(self)( r=self.redis, key=self.fallback, socket=self.socket, convert_nan=self.convert_nan, converter=self.converter, repr_type=self.repr_type, ) if len(self._fallback_object) > 0 and self.fallback_policy == "copy": self._fallback_object.copy(self._key) @property def key(self) -> str: """The key in the server to store the data from this dict. Returns ------- str The prefixed key used to store this dict in the server. """ return f"znsocket.Dict:{self._key}" def __getitem__(self, key: str) -> t.Any: from znsocket.objects.list_obj import List value = self.redis.hget(self.key, key) if value is None: if self._adapter_available: value = self.socket.call( "adapter:get", key=self.key, method="__getitem__", dict_key=key ) if value is None and self._fallback_object is not None: return self._fallback_object.get(key, None) if value is None: raise KeyError(key) value = decode(self, value) if isinstance(value, str): if value.startswith("znsocket.List:"): ref_key = value.split(":", 1)[1] value = List(r=self.redis, key=ref_key) elif value.startswith("znsocket.Dict:"): ref_key = value.split(":", 1)[1] value = Dict(r=self.redis, key=ref_key, repr_type=self.repr_type) return value def __setitem__(self, key: str, value: t.Any) -> None: from znsocket.objects.list_obj import List if self._adapter_available: from znsocket.exceptions import FrozenStorageError raise FrozenStorageError(key=self.key) if isinstance(value, List): value = value.key if isinstance(value, Dict): if value.key == self.key: raise ValueError("Can not set circular reference to self") value = value.key self.redis.hset(self.key, key, encode(self, value)) if callback := self._callbacks["setitem"]: callback(key, value) if self.socket is not None: refresh: RefreshTypeDict = {"keys": [key]} refresh_data: RefreshDataTypeDict = {"target": self.key, "data": refresh} self.socket.sio.emit("refresh", refresh_data, namespace="/znsocket") def __delitem__(self, key: str) -> None: if self._adapter_available: from znsocket.exceptions import FrozenStorageError raise FrozenStorageError(key=self.key) if not self.redis.hexists(self.key, key): raise KeyError(key) self.redis.hdel(self.key, key) if callback := self._callbacks["delitem"]: callback(key) if self.socket is not None: refresh: RefreshTypeDict = {"keys": [key]} refresh_data: RefreshDataTypeDict = {"target": self.key, "data": refresh} self.socket.sio.emit("refresh", refresh_data, namespace="/znsocket") def __iter__(self): return iter(self.keys()) def __len__(self) -> int: result = self.redis.hlen(self.key) if result == 0 and self._adapter_available: result = int( self.socket.call("adapter:get", key=self.key, method="__len__") ) if result == 0 and self._fallback_object is not None: result = len(self._fallback_object) return result
[docs] def keys(self) -> list[str]: result = self.redis.hkeys(self.key) if len(result) == 0 and self._adapter_available: result = self.socket.call("adapter:get", key=self.key, method="keys") if len(result) == 0 and self._fallback_object is not None: result = self._fallback_object.keys() return result
[docs] def values(self) -> list[t.Any]: # noqa: C901 vals = self.redis.hvals(self.key) if len(vals) == 0 and self._adapter_available: adapter_values = self.socket.call( "adapter:get", key=self.key, method="values" ) response = [] for value in adapter_values: value = decode(self, value) if isinstance(value, str): if value.startswith("znsocket.List:"): from znsocket import List ref_key = value.split(":", 1)[1] value = List(r=self.redis, key=ref_key) elif value.startswith("znsocket.Dict:"): ref_key = value.split(":", 1)[1] value = Dict( r=self.redis, key=ref_key, repr_type=self.repr_type ) response.append(value) return response if len(vals) == 0 and self._fallback_object is not None: return self._fallback_object.values() response = [] for v in vals: value = decode(self, v) if isinstance(value, str): if value.startswith("znsocket.List:"): from znsocket import List ref_key = value.split(":", 1)[1] value = List(r=self.redis, key=ref_key) elif value.startswith("znsocket.Dict:"): ref_key = value.split(":", 1)[1] value = Dict(r=self.redis, key=ref_key, repr_type=self.repr_type) response.append(value) return response
[docs] def items(self) -> list[t.Tuple[str, t.Any]]: # noqa: C901 from znsocket.objects.list_obj import List all_items = self.redis.hgetall(self.key) if len(all_items) == 0 and self._adapter_available: adapter_items = self.socket.call( "adapter:get", key=self.key, method="items" ) response = [] for k, v in adapter_items: value = decode(self, v) if isinstance(value, str): if value.startswith("znsocket.List:"): ref_key = value.split(":", 1)[1] value = List(r=self.redis, key=ref_key) elif value.startswith("znsocket.Dict:"): ref_key = value.split(":", 1)[1] value = Dict( r=self.redis, key=ref_key, repr_type=self.repr_type ) response.append((k, value)) return response if len(all_items) == 0 and self._fallback_object is not None: return self._fallback_object.items() response = [] for k, v in all_items.items(): value = decode(self, v) if isinstance(value, str): if value.startswith("znsocket.List:"): ref_key = value.split(":", 1)[1] value = List(r=self.redis, key=ref_key) elif value.startswith("znsocket.Dict:"): ref_key = value.split(":", 1)[1] value = Dict(r=self.redis, key=ref_key, repr_type=self.repr_type) response.append((k, value)) return response
def __contains__(self, key: str) -> bool: result = self.redis.hexists(self.key, key) if not result and self._adapter_available: result = self.socket.call( "adapter:get", key=self.key, method="__contains__", dict_key=key ) if not result and self._fallback_object is not None: result = key in self._fallback_object return result def __repr__(self) -> str: if self.repr_type == "keys": return f"Dict(keys={self.keys()})" elif self.repr_type == "minimal": return "Dict(<unknown>)" elif self.repr_type == "full": data = dict(self.items()) return f"Dict({data})" else: raise ValueError(f"Invalid repr_type: {self.repr_type}") def __eq__(self, value: object) -> bool: if isinstance(value, Dict): return dict(self) == dict(value) elif isinstance(value, dict): return dict(self) == value return False
[docs] def copy(self, key: str) -> "Dict": """Copy the dict to a new key. This will not trigger any callbacks as the data is not modified. """ if self._adapter_available: result = self.socket.call( "adapter:get", key=self.key, method="copy", target=key ) handle_error(result) return Dict( r=self.redis, key=key, socket=self.socket, converter=self.converter, convert_nan=self.convert_nan, ) else: if not self.redis.copy(self.key, f"znsocket.Dict:{key}"): raise ValueError("Could not copy dict") return Dict( r=self.redis, key=key, socket=self.socket, converter=self.converter, convert_nan=self.convert_nan, )
def on_refresh(self, callback: t.Callable[[RefreshDataTypeDict], None]) -> None: if self.socket is None: raise ValueError("No socket connection available") self.socket.refresh_callbacks[self.key] = callback
[docs] def update(self, *args, **kwargs): # noqa: C901 """Update the dict with another dict or iterable.""" from znsocket.objects.list_obj import List if self._adapter_available: from znsocket.exceptions import FrozenStorageError raise FrozenStorageError(key=self.key) if len(args) > 1: raise TypeError("update expected at most 1 argument, got %d" % len(args)) if args: other = args[0] if isinstance(other, Dict): other = dict(other) elif isinstance(other, MutableMapping): pass else: raise TypeError( "update expected at most 1 argument, got %d" % len(args) ) else: other = kwargs pipeline = self.redis.pipeline() for key, value in other.items(): if isinstance(value, Dict): if value.key == self.key: raise ValueError("Can not set circular reference to self") value = value.key if isinstance(value, List): value = value.key pipeline.hset(self.key, key, encode(self, value)) pipeline.execute() if self.socket is not None: refresh: RefreshTypeDict = {"keys": list(other.keys())} refresh_data: RefreshDataTypeDict = {"target": self.key, "data": refresh} self.socket.sio.emit("refresh", refresh_data, namespace="/znsocket")
def __or__(self, value: "dict|Dict") -> dict: if isinstance(value, Dict): value = dict(value) return dict(self) | value