Python API
Client
- class znsocket.Client(address: str, decode_responses: bool = True, namespace: str = '/znsocket', refresh_callbacks: dict[str, typing.Callable[[typing.Any], None]] = <factory>, delay_between_calls: datetime.timedelta | None = None, retry: int = 1, connect_wait_timeout: int = 1, max_message_size_bytes: int | None = None, enable_compression: bool = True, compression_threshold: int = 1024)
Bases:
object
Client to interact with a znsocket server.
The Client class provides an interface to connect to and communicate with a znsocket server using websockets. It supports Redis-like commands, automatic chunking and compression of large messages, and automatic reconnection.
Large messages are automatically handled through compression and chunking:
- Messages are compressed using gzip when they exceed the compression threshold
- If compressed messages still exceed size limits, they are split into chunks
- All chunking and compression operations are transparent to the user
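The compress-then-chunk order described above can be illustrated with a minimal standard-library sketch. It is not the znsocket implementation: the helper names `prepare_message` and `restore_message`, the JSON encoding, and the return format are assumptions made for illustration only.

```python
import gzip
import json


def prepare_message(payload, max_size=1_000_000, threshold=1024):
    """Return (chunks, compressed) for a JSON-serializable payload.

    Hypothetical helper: compress first, then split only if still too large.
    """
    raw = json.dumps(payload).encode("utf-8")
    compressed = len(raw) > threshold
    body = gzip.compress(raw) if compressed else raw
    # Slice the (possibly compressed) body into pieces of at most max_size bytes.
    chunks = [body[i:i + max_size] for i in range(0, len(body), max_size)]
    return chunks, compressed


def restore_message(chunks, compressed):
    """Reassemble the chunks and undo the optional gzip step."""
    body = b"".join(chunks)
    raw = gzip.decompress(body) if compressed else body
    return json.loads(raw.decode("utf-8"))


# A tiny limit forces several chunks, so the round trip exercises both steps.
chunks, compressed = prepare_message({"values": list(range(5000))}, max_size=64)
restored = restore_message(chunks, compressed)
```

Compressing before splitting means that many messages above the threshold still fit into a single transmission, and chunking is only needed for the remainder.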
- Parameters:
address (str) – The address of the znsocket server (e.g., “http://localhost:5000”).
decode_responses (bool, optional) – Whether to decode responses from the server. Default is True.
namespace (str, optional) – The namespace to connect to. Default is “/znsocket”.
refresh_callbacks (dict, optional) – A dictionary of callbacks to call when a refresh event is received.
delay_between_calls (datetime.timedelta, optional) – The time to wait between calls. Default is None.
retry (int, optional) – The number of times to retry a failed call. Default is 1.
connect_wait_timeout (int, optional) – Timeout in seconds for connection establishment. Default is 1.
max_message_size_bytes (int, optional) – Maximum size in bytes for a single message before chunking. If None, uses server limit or defaults to 1MB. Messages exceeding this size are automatically chunked.
enable_compression (bool, optional) – Whether to enable gzip compression for large messages. Default is True.
compression_threshold (int, optional) – Minimum message size in bytes to trigger compression. Default is 1024 (1KB).
- sio
The underlying socket.io client instance.
- Type:
socketio.Client
Examples
Basic usage:
>>> client = Client("http://localhost:5000")
>>> client.hset("mykey", "field1", "value1")
>>> client.hget("mykey", "field1")
'value1'

Large data with automatic chunking:
>>> import numpy as np
>>> client = Client("http://localhost:5000")
>>> large_data = np.random.rand(1000, 1000)  # ~8MB array
>>> client.hset("data", "array", large_data)  # Automatically compressed/chunked
>>> retrieved = client.hget("data", "array")

Custom chunking configuration:
>>> client = Client(
...     "http://localhost:5000",
...     max_message_size_bytes=500000,  # 500KB limit
...     enable_compression=True,
...     compression_threshold=2048  # Compress messages > 2KB
... )
- call(event: str, *args, **kwargs) -> Any
Call an event on the server.
Automatically handles chunking for large messages that exceed the size limit.
- Parameters:
event (str) – The event name to call on the server.
*args – Positional arguments to pass to the event.
**kwargs – Keyword arguments to pass to the event.
- Returns:
The response from the server.
- Return type:
Any
- Raises:
socketio.exceptions.TimeoutError – If the server does not respond within the timeout period after all retries.
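How the retry and delay_between_calls settings might interact with the timeout error can be sketched as follows. This is an illustration under assumptions, not the library's code: the helper `call_with_retry` is hypothetical, the built-in TimeoutError stands in for socketio.exceptions.TimeoutError, and it is assumed that retry counts additional attempts after the first one.

```python
import time


def call_with_retry(send, retry=1, delay=0.0):
    """Call send() once, then up to `retry` more times on timeout.

    Hypothetical helper; the last TimeoutError is re-raised once all
    attempts are used up.
    """
    for attempt in range(retry + 1):
        try:
            return send()
        except TimeoutError:
            if attempt == retry:
                raise
            # Wait before the next attempt (assumed role of the delay setting).
            time.sleep(delay)
```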
- classmethod from_url(url, namespace: str = '/znsocket', **kwargs) -> Client
Connect to a znsocket server using a URL.
- Parameters:
url (str) – The URL of the znsocket server. Should be in the format “znsocket://127.0.0.1:5000”.
namespace (str, optional) – The namespace to connect to. Default is “/znsocket”.
**kwargs – Additional keyword arguments to pass to the Client constructor.
- Returns:
A new Client instance connected to the specified server.
- Return type:
Client
Examples
>>> client = Client.from_url("znsocket://127.0.0.1:5000")
>>> client.hset("key", "field", "value")
- pipeline(*args, **kwargs) -> Pipeline
Create a pipeline for batching Redis commands.
- Parameters:
*args – Positional arguments to pass to the Pipeline constructor.
**kwargs – Keyword arguments to pass to the Pipeline constructor.
- Returns:
A new Pipeline instance for batching commands.
- Return type:
Pipeline
Examples
>>> client = Client("http://localhost:5000")
>>> pipe = client.pipeline()
>>> pipe.hset("key1", "field1", "value1")
>>> pipe.hset("key2", "field2", "value2")
>>> results = pipe.execute()
- register_adapter_callback(key: str, callback: Callable[[tuple[list, dict]], Any]) -> None
Register an adapter callback for a specific key.
- Parameters:
key (str) – The adapter key to register the callback for.
callback (Callable[[tuple[list, dict]], Any]) – The callback function to handle adapter requests for this key.
Server
- class znsocket.Server(port: int = 5000, max_http_buffer_size: int | None = None, async_mode: str | None = None, logger: bool = False, storage: str = 'memory')
Bases:
object
znsocket server implementation.
The Server class provides a websocket-based server that implements Redis-compatible operations with automatic support for large message handling through chunking and compression. It uses eventlet for async operations and socket.io for websocket communication.
Large Message Handling:
- Automatically receives and reassembles chunked messages from clients
- Decompresses gzip-compressed messages
- Handles both single compressed messages and multi-chunk transmissions
- Provides automatic cleanup of expired chunk storage

The server automatically handles three types of message transmission:
1. Normal messages: Small messages sent directly
2. Compressed messages: Large messages compressed and sent as single units
3. Chunked messages: Very large messages split into multiple chunks
- Parameters:
port (int, optional) – The port number to bind the server to. Default is 5000.
max_http_buffer_size (int, optional) – Maximum size of HTTP buffer in bytes. This determines the largest single message the server can receive. Default is None (uses socket.io default of ~1MB). Messages larger than this limit must be chunked by clients.
async_mode (str, optional) – Async mode to use (‘eventlet’, ‘gevent’, etc.). Default is None.
logger (bool, optional) – Whether to enable logging. Default is False.
storage (str, optional) – Storage backend to use (‘memory’ or ‘redis’). Default is ‘memory’.
Examples
Basic server with default settings:
>>> server = Server(port=5000)
>>> server.run()  # This will block and run the server

Server with larger buffer for handling big messages:
>>> server = Server(
...     port=5000,
...     max_http_buffer_size=10 * 1024 * 1024,  # 10MB buffer
...     logger=True
... )
>>> server.run()

Server with Redis backend:
>>> server = Server(port=5000, storage='redis')
>>> server.run()
- classmethod from_url(url: str, **kwargs) -> Server
Create a Server instance from a URL.
- Parameters:
url (str) – The URL to parse, should be in format “znsocket://host:port”.
**kwargs – Additional keyword arguments to pass to the Server constructor.
- Returns:
A new Server instance configured with the port from the URL.
- Return type:
Server
- Raises:
ValueError – If the URL doesn’t start with “znsocket://”.
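The URL handling described for from_url can be approximated with the standard library. The sketch below is an assumption about the general shape of such parsing, not the actual znsocket code; `parse_znsocket_url` is a hypothetical helper name, and rejecting URLs without a host or port is an extra check added here.

```python
from urllib.parse import urlparse


def parse_znsocket_url(url: str) -> tuple[str, int]:
    """Return (host, port) from a URL of the form znsocket://host:port."""
    if not url.startswith("znsocket://"):
        raise ValueError(f"URL must start with 'znsocket://', got {url!r}")
    parsed = urlparse(url)
    if parsed.hostname is None or parsed.port is None:
        raise ValueError(f"URL must contain a host and a port, got {url!r}")
    return parsed.hostname, parsed.port


host, port = parse_znsocket_url("znsocket://127.0.0.1:5000")
```

The extracted port is the value that would then be handed to the Server constructor.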
- run() -> None
Run the server (blocking).
Starts the znsocket server and blocks until the server is stopped. The server will listen on the configured port and handle incoming websocket connections.
Notes
This method blocks the current thread. To run the server in a non-blocking way, consider using threading or asyncio.
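One way to follow the threading suggestion is to start the blocking call in a daemon thread. The helper `run_in_background` below is hypothetical and works with any blocking callable; whether a given async mode of the server cooperates with plain threads is not covered by this sketch.

```python
import threading


def run_in_background(blocking_run):
    """Start a blocking callable in a daemon thread and return the thread."""
    thread = threading.Thread(target=blocking_run, daemon=True)
    thread.start()
    return thread


# Intended use (assumes a configured Server instance named `server`):
# thread = run_in_background(server.run)
```

A daemon thread does not keep the interpreter alive, so the server stops when the main program exits.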
Storage
- class znsocket.Storage(content: dict = <factory>)
Bases:
object
In-memory storage backend for znsocket server.
The Storage class provides Redis-compatible data storage operations including hash tables, lists, sets, and basic key-value operations. All data is stored in memory using Python data structures.
- Parameters:
content (dict, optional) – Initial content for the storage. Default is an empty dictionary.
- content
The internal storage dictionary containing all data.
- Type:
dict
Examples
>>> storage = Storage()
>>> storage.hset("users", "user1", "John")
1
>>> storage.hget("users", "user1")
'John'
- hget(name, key)
Get the value of a hash field.
- Parameters:
name (str) – The name of the hash.
key (str) – The field name to get.
- Returns:
The value of the field, or None if the field does not exist.
- Return type:
str or None
- hset(name: str, key: str | None = None, value: str | None = None, mapping: dict | None = None, items: list | None = None)
Set field(s) in a hash.
- Parameters:
name (str) – The name of the hash.
key (str, optional) – The field name to set.
value (str, optional) – The value to set for the field.
mapping (dict, optional) – A dictionary of field-value pairs to set.
items (list, optional) – A list of alternating field-value pairs to set.
- Returns:
The number of fields that were added.
- Return type:
int
- Raises:
DataError – If no key-value pairs are provided or if value is None when key is provided.
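The parameter combinations above can be illustrated with a small dictionary-backed function. This is a sketch only: the free function `hset`, the local `DataError` class, and the choice to count only newly created fields (the Redis convention) are assumptions, not the Storage implementation.

```python
class DataError(Exception):
    """Stand-in for the DataError named above (defined here for the sketch)."""


def hset(store, name, key=None, value=None, mapping=None, items=None):
    """Set fields in store[name] and return how many fields were new."""
    if key is None and not mapping and not items:
        raise DataError("hset requires at least one key-value pair")
    if key is not None and value is None:
        raise DataError("a value is required when a key is given")
    pairs = []
    if key is not None:
        pairs.append((key, value))
    if items:
        # items alternates field, value, field, value, ...
        pairs.extend(zip(items[::2], items[1::2]))
    if mapping:
        pairs.extend(mapping.items())
    bucket = store.setdefault(name, {})
    added = 0
    for field, val in pairs:
        if field not in bucket:
            added += 1
        bucket[field] = val
    return added


store = {}
hset(store, "users", "user1", "John")
hset(store, "users", mapping={"user2": "Ann"}, items=["user3", "Bo"])
```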
List
- class znsocket.List(r: Client | Any, key: str, socket: Client | None = None, callbacks: ListCallbackTypedDict | None = None, repr_type: Literal['full', 'length', 'minimal'] = 'length', converter: list[Type[ConverterBase]] | None = None, convert_nan: bool = False, fallback: str | None = None, fallback_policy: Literal['copy', 'frozen'] | None = None)
Bases:
MutableSequence, ZnSocketObject
- append(value: Any) -> None
Append an item to the end of the list.
- Parameters:
value (Any) – The item to append to the list.
- Raises:
FrozenStorageError – If the list is backed by an adapter and is read-only.
ValueError – If attempting to create a circular reference to self.
Examples
>>> my_list = znsocket.List(client, "my_list")
>>> my_list.append("new_item")
>>> my_list[-1]
'new_item'
- copy(key: str) -> List
Copy the list to a new key.
Creates a new list with the same content but under a different key. This operation does not trigger any callbacks as the original data is not modified.
- Parameters:
key (str) – The new key for the copied list.
- Returns:
A new List instance pointing to the copied data.
- Return type:
List
- Raises:
ValueError – If the copy operation fails.
Examples
>>> original_list = znsocket.List(client, "original")
>>> original_list.extend([1, 2, 3])
>>> copied_list = original_list.copy("copied")
>>> len(copied_list)
3
- property key: str
The key of the list in the server.
- Returns:
The prefixed key used to store this list in the server.
- Return type:
str
Dict
- class znsocket.Dict(r: Client | Any, key: str, socket: Client | None = None, callbacks: DictCallbackTypedDict | None = None, repr_type: Literal['full', 'keys', 'minimal'] = 'keys', converter: list[Type[ConverterBase]] | None = None, convert_nan: bool = False, fallback: str | None = None, fallback_policy: Literal['copy', 'frozen'] | None = None)
Bases:
MutableMapping, ZnSocketObject
- copy(key: str) -> Dict
Copy the dict to a new key.
This will not trigger any callbacks as the data is not modified.
- property key: str
The key in the server to store the data from this dict.
- Returns:
The prefixed key used to store this dict in the server.
- Return type:
str
ListAdapter
- class znsocket.ListAdapter(key: str, socket: Client, object: Sequence, item_transform_callback: ItemTransformCallback | None = None, converter: list[type] | None = None, convert_nan: bool = False, r: Client | None = None)
Bases:
object
Connect any object to a znsocket server to be used instead of loading data from the database.
The ListAdapter allows you to expose any sequence-like object through the znsocket server, making it accessible to clients as if it were a regular List. Data is transmitted via sockets through the server to the client on demand.
- Parameters:
key (str) – The key identifier for this adapter in the server.
socket (Client) – The znsocket client connection to use for communication.
object (Sequence) – The sequence object to expose through the adapter.
item_transform_callback (ItemTransformCallback, optional) – Optional callback to transform list items when accessed. If provided, the callback will be called for each item access and can return transformed data along with the adapter type to create (“Dict”, “List”, or None).
converter (list[type], optional) – Optional list of znjson converters to use for encoding/decoding the data.
convert_nan (bool, optional) – Convert NaN and Infinity to None. Both are not native JSON values and cannot be encoded/decoded. Default is False.
r (Client, optional) – Alternative client connection. If None, uses the socket connection.
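The effect of convert_nan can be pictured as a preprocessing step before JSON encoding. The function `replace_non_finite` below is a hypothetical illustration of that idea using only the standard library; it is not the converter znsocket uses internally.

```python
import json
import math


def replace_non_finite(value):
    """Recursively replace NaN and +/-Infinity floats with None."""
    if isinstance(value, float) and not math.isfinite(value):
        return None
    if isinstance(value, dict):
        return {k: replace_non_finite(v) for k, v in value.items()}
    if isinstance(value, (list, tuple)):
        return [replace_non_finite(v) for v in value]
    return value


cleaned = replace_non_finite({"a": float("nan"), "b": [1.0, float("inf")]})
# allow_nan=False makes json.dumps reject any non-finite float left behind.
encoded = json.dumps(cleaned, allow_nan=False)
```

Without such a step, strict JSON encoding raises an error for NaN and Infinity, because neither is a valid JSON value.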
Examples
>>> client = znsocket.Client("http://localhost:5000")
>>> my_data = [1, 2, 3, 4, 5]
>>> adapter = znsocket.ListAdapter("my_adapter", client, my_data)
>>> # Now clients can access my_data as znsocket.List(client, "my_adapter")

>>> # With custom transformation callback
>>> def ase_transform(item, key, socket, converter=None, convert_nan=False):
...     from zndraw.converter import ASEConverter
...     import znsocket
...     transformed = ASEConverter().encode(item)
...     return znsocket.DictAdapter(key, socket, transformed, converter, convert_nan)
>>> ase_data = [atoms1, atoms2, atoms3]  # ASE Atoms objects
>>> adapter = znsocket.ListAdapter("ase_adapter", client, ase_data, ase_transform)

>>> # Accessing items creates adapters automatically
>>> client_list = znsocket.List(client, "ase_adapter")
>>> dict_item = client_list[0]  # Creates DictAdapter with key "ase_adapter:0"
>>> isinstance(dict_item, znsocket.Dict)
True
>>> dict_item["some_key"]  # Access the transformed data
- map_callback(data: tuple[list, dict]) -> Any
Map a callback to the object.
This method handles incoming requests from clients and routes them to the appropriate methods on the wrapped object.
- Parameters:
data (tuple) – The request data containing arguments and keyword arguments.
- Returns:
The result of the requested operation, encoded if necessary.
- Return type:
Any
- Raises:
NotImplementedError – If the requested method is not implemented by the adapter.
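The routing described for map_callback can be sketched as a small dispatcher over the wrapped object. The request layout is an assumption made for this example: the method name is taken from kwargs["method"] and positional arguments from the list. The real wire format may differ, and `dispatch` is a hypothetical name.

```python
def dispatch(obj, data):
    """Route an (args, kwargs) request to an operation on obj."""
    args, kwargs = data
    method = kwargs["method"]  # assumed location of the method name
    if method == "__len__":
        return len(obj)
    if method == "__getitem__":
        return obj[args[0]]
    raise NotImplementedError(f"Method {method!r} is not implemented")


wrapped = [10, 20, 30]
length = dispatch(wrapped, ([], {"method": "__len__"}))
second = dispatch(wrapped, ([1], {"method": "__getitem__"}))
```

Only the operations listed explicitly are served; any other request fails with NotImplementedError, which keeps the exposed surface of the wrapped object small.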
DictAdapter
- class znsocket.DictAdapter(key: str, socket: Client, object: Mapping, converter: list[type] | None = None, convert_nan: bool = False, r: Client | None = None)
Bases:
object
Connect any object to a znsocket server to be used instead of loading data from the database.
The DictAdapter allows you to expose any mapping-like object through the znsocket server, making it accessible to clients as if it were a regular Dict. Data is transmitted via sockets through the server to the client on demand.
- Parameters:
key (str) – The key identifier for this adapter in the server.
socket (Client) – The znsocket client connection to use for communication.
object (Mapping) – The mapping object to expose through the adapter.
converter (list[type], optional) – Optional list of znjson converters to use for encoding/decoding the data.
convert_nan (bool, optional) – Convert NaN and Infinity to None. Both are not native JSON values and cannot be encoded/decoded. Default is False.
r (Client, optional) – Alternative client connection. If None, uses the socket connection.
Examples
>>> client = znsocket.Client("http://localhost:5000")
>>> my_data = {"a": 1, "b": 2, "c": 3}
>>> adapter = znsocket.DictAdapter("my_adapter", client, my_data)
>>> # Now clients can access my_data as znsocket.Dict(client, "my_adapter")
- map_callback(data)
Map a callback to the object.
This method handles incoming requests from clients and routes them to the appropriate methods on the wrapped object.
- Parameters:
data (tuple) – The request data containing arguments and keyword arguments.
- Returns:
The result of the requested operation, encoded if necessary.
- Return type:
Any
- Raises:
NotImplementedError – If the requested method is not implemented by the adapter.
Segments
- class znsocket.Segments(r: Client | Any, origin: List, key: str)
Bases:
ZnSocketObject, MutableSequence
Copy of a list object with piece table segments.
This class implements a copy-on-write list using segment-based storage for efficiency. It’s particularly useful for large objects where only small parts are modified, avoiding the need to copy the entire object.
The data is stored in segments represented as tuples: (start, end, data), where start and end are the start and end indices of the part taken from the data list.
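- Parameters:
r (Client | Any) – The client connection used to communicate with the server.
origin (List) – The List object the segments are taken from.
key (str) – The key in the server under which the segments object is stored.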
- Raises:
TypeError – If origin is not a List object.
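The piece table idea behind (start, end, data) segments can be shown with a purely local sketch. The class `PieceTable` below is hypothetical and keeps everything in memory; it uses the strings "origin" and "added" in place of the data reference, which is an assumption about the representation rather than the format Segments stores on the server.

```python
class PieceTable:
    """Copy-on-write view over a list, stored as (start, end, source) segments."""

    def __init__(self, origin):
        self.origin = origin  # shared with the original, never modified
        self.added = []       # values written after the copy was taken
        self.segments = [(0, len(origin), "origin")] if origin else []

    def __len__(self):
        return sum(end - start for start, end, _ in self.segments)

    def _locate(self, index):
        """Return (segment position, offset inside that segment)."""
        if index < 0:
            index += len(self)
        if index < 0:
            raise IndexError("index out of range")
        for pos, (start, end, _) in enumerate(self.segments):
            if index < end - start:
                return pos, index
            index -= end - start
        raise IndexError("index out of range")

    def __getitem__(self, index):
        pos, offset = self._locate(index)
        start, _, source = self.segments[pos]
        data = self.origin if source == "origin" else self.added
        return data[start + offset]

    def __setitem__(self, index, value):
        pos, offset = self._locate(index)
        start, end, source = self.segments[pos]
        self.added.append(value)
        new = (len(self.added) - 1, len(self.added), "added")
        pieces = [(start, start + offset, source), new, (start + offset + 1, end, source)]
        # Drop empty pieces produced when the write hits a segment boundary.
        self.segments[pos:pos + 1] = [p for p in pieces if p[0] < p[1]]


original = [1, 2, 3, 4, 5]
table = PieceTable(original)
table[2] = 99  # splits the single segment into three; original is untouched
```

A single write stores one new value and rewrites a few tuples, so the cost of a modification does not depend on the size of the original list.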
Examples
>>> client = znsocket.Client("http://localhost:5000")
>>> original_list = znsocket.List(client, "original")
>>> original_list.extend([1, 2, 3, 4, 5])
>>> segments = znsocket.Segments(client, original_list, "segments")
>>> len(segments)
5
>>> segments[0]
1
- classmethod from_list(origin: List, key: str) -> Segments
Create a Segments object from a list.
- get_raw() -> Any
Get the raw data of the segments list.
- Returns:
A list of segments, where each segment is a tuple of (start, end, data).
- Return type:
list
- property key: str
The key in the server to store the data from this segments object.
- Returns:
The prefixed key used to store this segments object in the server.
- Return type:
str
Functions
- znsocket.attach_events(sio: Server, namespace: str = '/znsocket', storage=None) -> None
Attach event handlers to a socket.io server.
This function sets up all the event handlers needed for the znsocket server to respond to client requests. It handles Redis-compatible operations, pipeline commands, and adapter functionality.
- Parameters:
sio (socketio.Server) – The socket.io server instance to attach events to.
namespace (str, optional) – The namespace to attach events to. Default is “/znsocket”.
storage (Storage, optional) – The storage backend to use. If None, a new Storage instance is created.
- Returns:
The socket.io server instance with events attached.
- Return type:
socketio.Server
Examples
>>> import socketio
>>> from znsocket import attach_events
>>> sio = socketio.Server()
>>> attach_events(sio)
>>> # Now sio can handle znsocket events