Python API

Client

class znsocket.Client(address: str, decode_responses: bool = True, namespace: str = '/znsocket', refresh_callbacks: dict[str, ~typing.Callable[[~typing.Any], None]] = <factory>, delay_between_calls: ~datetime.timedelta | None = None, retry: int = 1, connect_wait_timeout: int = 1, max_message_size_bytes: int | None = None, enable_compression: bool = True, compression_threshold: int = 1024)[source]

Bases: object

Client to interact with a znsocket server.

The Client class provides an interface to connect to and communicate with a znsocket server using websockets. It supports Redis-like commands, automatic message chunking for large data, compression, and provides automatic reconnection capabilities.

Large messages are automatically handled through compression and chunking: - Messages are compressed using gzip when they exceed the compression threshold - If compressed messages still exceed size limits, they are split into chunks - All chunking and compression operations are transparent to the user

Parameters:
  • address (str) – The address of the znsocket server (e.g., “http://localhost:5000”).

  • decode_responses (bool, optional) – Whether to decode responses from the server. Default is True.

  • namespace (str, optional) – The namespace to connect to. Default is “/znsocket”.

  • refresh_callbacks (dict, optional) – A dictionary of callbacks to call when a refresh event is received.

  • delay_between_calls (datetime.timedelta, optional) – The time to wait between calls. Default is None.

  • retry (int, optional) – The number of times to retry a failed call. Default is 1.

  • connect_wait_timeout (int, optional) – Timeout in seconds for connection establishment. Default is 1.

  • max_message_size_bytes (int, optional) – Maximum size in bytes for a single message before chunking. If None, uses server limit or defaults to 1MB. Messages exceeding this size are automatically chunked.

  • enable_compression (bool, optional) – Whether to enable gzip compression for large messages. Default is True.

  • compression_threshold (int, optional) – Minimum message size in bytes to trigger compression. Default is 1024 (1KB).

sio

The underlying socket.io client instance.

Type:

socketio.Client

Examples

Basic usage: >>> client = Client(”http://localhost:5000”) >>> client.hset(“mykey”, “field1”, “value1”) >>> client.hget(“mykey”, “field1”) ‘value1’

Large data with automatic chunking: >>> import numpy as np >>> client = Client(”http://localhost:5000”) >>> large_data = np.random.rand(1000, 1000) # ~8MB array >>> client.hset(“data”, “array”, large_data) # Automatically compressed/chunked >>> retrieved = client.hget(“data”, “array”)

Custom chunking configuration: >>> client = Client( … “http://localhost:5000”, … max_message_size_bytes=500000, # 500KB limit … enable_compression=True, … compression_threshold=2048 # Compress messages > 2KB … )

call(event: str, *args, **kwargs) Any[source]

Call an event on the server.

Automatically handles chunking for large messages that exceed the size limit.

Parameters:
  • event (str) – The event name to call on the server.

  • *args – Positional arguments to pass to the event.

  • **kwargs – Keyword arguments to pass to the event.

Returns:

The response from the server.

Return type:

Any

Raises:

socketio.exceptions.TimeoutError – If the server does not respond within the timeout period after all retries.

classmethod from_url(url, namespace: str = '/znsocket', **kwargs) Client[source]

Connect to a znsocket server using a URL.

Parameters:
  • url (str) – The URL of the znsocket server. Should be in the format “znsocket://127.0.0.1:5000”.

  • namespace (str, optional) – The namespace to connect to. Default is “/znsocket”.

  • **kwargs – Additional keyword arguments to pass to the Client constructor.

Returns:

A new Client instance connected to the specified server.

Return type:

Client

Examples

>>> client = Client.from_url("znsocket://127.0.0.1:5000")
>>> client.hset("key", "field", "value")
pipeline(*args, **kwargs) Pipeline[source]

Create a pipeline for batching Redis commands.

Parameters:
  • *args – Positional arguments to pass to the Pipeline constructor.

  • **kwargs – Keyword arguments to pass to the Pipeline constructor.

Returns:

A new Pipeline instance for batching commands.

Return type:

Pipeline

Examples

>>> client = Client("http://localhost:5000")
>>> pipe = client.pipeline()
>>> pipe.hset("key1", "field1", "value1")
>>> pipe.hset("key2", "field2", "value2")
>>> results = pipe.execute()
register_adapter_callback(key: str, callback: Callable[[tuple[list, dict]], Any]) None[source]

Register an adapter callback for a specific key.

Parameters:
  • key (str) – The adapter key to register the callback for.

  • callback (Callable[[tuple[list, dict]], Any]) – The callback function to handle adapter requests for this key.

Server

class znsocket.Server(port: int = 5000, max_http_buffer_size: int | None = None, async_mode: str | None = None, logger: bool = False, storage: str = 'memory')[source]

Bases: object

znsocket server implementation.

The Server class provides a websocket-based server that implements Redis-compatible operations with automatic support for large message handling through chunking and compression. It uses eventlet for async operations and socket.io for websocket communication.

Large Message Handling: - Automatically receives and reassembles chunked messages from clients - Supports compressed message decompression using gzip - Handles both single compressed messages and multi-chunk transmissions - Provides automatic cleanup of expired chunk storage

The server automatically handles three types of message transmission: 1. Normal messages: Small messages sent directly 2. Compressed messages: Large messages compressed and sent as single units 3. Chunked messages: Very large messages split into multiple chunks

Parameters:
  • port (int, optional) – The port number to bind the server to. Default is 5000.

  • max_http_buffer_size (int, optional) – Maximum size of HTTP buffer in bytes. This determines the largest single message the server can receive. Default is None (uses socket.io default of ~1MB). Messages larger than this limit must be chunked by clients.

  • async_mode (str, optional) – Async mode to use (‘eventlet’, ‘gevent’, etc.). Default is None.

  • logger (bool, optional) – Whether to enable logging. Default is False.

  • storage (str, optional) – Storage backend to use (‘memory’ or ‘redis’). Default is ‘memory’.

Examples

Basic server with default settings: >>> server = Server(port=5000) >>> server.run() # This will block and run the server

Server with larger buffer for handling big messages: >>> server = Server( … port=5000, … max_http_buffer_size=10 * 1024 * 1024, # 10MB buffer … logger=True … ) >>> server.run()

Server with Redis backend: >>> server = Server(port=5000, storage=’redis’) >>> server.run()

classmethod from_url(url: str, **kwargs) Server[source]

Create a Server instance from a URL.

Parameters:
  • url (str) – The URL to parse, should be in format “znsocket://host:port”.

  • **kwargs – Additional keyword arguments to pass to the Server constructor.

Returns:

A new Server instance configured with the port from the URL.

Return type:

Server

Raises:

ValueError – If the URL doesn’t start with “znsocket://”.

run() None[source]

Run the server (blocking).

Starts the znsocket server and blocks until the server is stopped. The server will listen on the configured port and handle incoming websocket connections.

Notes

This method blocks the current thread. To run the server in a non-blocking way, consider using threading or asyncio.

Storage

class znsocket.Storage(content: dict = <factory>)[source]

Bases: object

In-memory storage backend for znsocket server.

The Storage class provides Redis-compatible data storage operations including hash tables, lists, sets, and basic key-value operations. All data is stored in memory using Python data structures.

Parameters:

content (dict, optional) – Initial content for the storage. Default is an empty dictionary.

content

The internal storage dictionary containing all data.

Type:

dict

Examples

>>> storage = Storage()
>>> storage.hset("users", "user1", "John")
1
>>> storage.hget("users", "user1")
'John'
hget(name, key)[source]

Get the value of a hash field.

Parameters:
  • name (str) – The name of the hash.

  • key (str) – The field name to get.

Returns:

The value of the field, or None if the field does not exist.

Return type:

str or None

hset(name: str, key: str | None = None, value: str | None = None, mapping: dict | None = None, items: list | None = None)[source]

Set field(s) in a hash.

Parameters:
  • name (str) – The name of the hash.

  • key (str, optional) – The field name to set.

  • value (str, optional) – The value to set for the field.

  • mapping (dict, optional) – A dictionary of field-value pairs to set.

  • items (list, optional) – A list of alternating field-value pairs to set.

Returns:

The number of fields that were added.

Return type:

int

Raises:

DataError – If no key-value pairs are provided or if value is None when key is provided.

List

class znsocket.List(r: Client | Any, key: str, socket: Client | None = None, callbacks: ListCallbackTypedDict | None = None, repr_type: Literal['full', 'length', 'minimal'] = 'length', converter: list[Type[ConverterBase]] | None = None, convert_nan: bool = False, fallback: str | None = None, fallback_policy: Literal['copy', 'frozen'] | None = None)[source]

Bases: MutableSequence, ZnSocketObject

append(value: Any) None[source]

Append an item to the end of the list.

Parameters:

value (Any) – The item to append to the list.

Raises:
  • FrozenStorageError – If the list is backed by an adapter and is read-only.

  • ValueError – If attempting to create a circular reference to self.

Examples

>>> my_list = znsocket.List(client, "my_list")
>>> my_list.append("new_item")
>>> my_list[-1]
'new_item'
copy(key: str) List[source]

Copy the list to a new key.

Creates a new list with the same content but under a different key. This operation does not trigger any callbacks as the original data is not modified.

Parameters:

key (str) – The new key for the copied list.

Returns:

A new List instance pointing to the copied data.

Return type:

List

Raises:

ValueError – If the copy operation fails.

Examples

>>> original_list = znsocket.List(client, "original")
>>> original_list.extend([1, 2, 3])
>>> copied_list = original_list.copy("copied")
>>> len(copied_list)
3
extend(values: Iterable) None[source]

Extend the list with an iterable using redis pipelines.

insert(index: int, value: Any) None[source]

S.insert(index, value) – insert value before index

property key: str

The key of the list in the server.

Returns:

The prefixed key used to store this list in the server.

Return type:

str

pop(index: int = -1) Any[source]

Pop an item from the list.

Dict

class znsocket.Dict(r: Client | Any, key: str, socket: Client | None = None, callbacks: DictCallbackTypedDict | None = None, repr_type: Literal['full', 'keys', 'minimal'] = 'keys', converter: list[Type[ConverterBase]] | None = None, convert_nan: bool = False, fallback: str | None = None, fallback_policy: Literal['copy', 'frozen'] | None = None)[source]

Bases: MutableMapping, ZnSocketObject

copy(key: str) Dict[source]

Copy the dict to a new key.

This will not trigger any callbacks as the data is not modified.

items() a set-like object providing a view on D's items[source]
property key: str

The key in the server to store the data from this dict.

Returns:

The prefixed key used to store this dict in the server.

Return type:

str

keys() a set-like object providing a view on D's keys[source]
update(*args, **kwargs)[source]

Update the dict with another dict or iterable.

values() an object providing a view on D's values[source]

ListAdapter

class znsocket.ListAdapter(key: str, socket: Client, object: Sequence, item_transform_callback: ItemTransformCallback | None = None, converter: list[type] | None = None, convert_nan: bool = False, r: Client | None = None)[source]

Bases: object

Connect any object to a znsocket server to be used instead of loading data from the database.

The ListAdapter allows you to expose any sequence-like object through the znsocket server, making it accessible to clients as if it were a regular List. Data is transmitted via sockets through the server to the client on demand.

Parameters:
  • key (str) – The key identifier for this adapter in the server.

  • socket (Client) – The znsocket client connection to use for communication.

  • object (Sequence) – The sequence object to expose through the adapter.

  • item_transform_callback (ItemTransformCallback, optional) – Optional callback to transform list items when accessed. If provided, the callback will be called for each item access and can return transformed data along with the adapter type to create (“Dict”, “List”, or None).

  • converter (list[type], optional) – Optional list of znjson converters to use for encoding/decoding the data.

  • convert_nan (bool, optional) – Convert NaN and Infinity to None. Both are not native JSON values and cannot be encoded/decoded. Default is False.

  • r (Client, optional) – Alternative client connection. If None, uses the socket connection.

Examples

>>> client = znsocket.Client("http://localhost:5000")
>>> my_data = [1, 2, 3, 4, 5]
>>> adapter = znsocket.ListAdapter("my_adapter", client, my_data)
>>> # Now clients can access my_data as znsocket.List(client, "my_adapter")
>>> # With custom transformation callback
>>> def ase_transform(item, key, socket, converter=None, convert_nan=False):
...     from zndraw.converter import ASEConverter
...     import znsocket
...     transformed = ASEConverter().encode(item)
...     return znsocket.DictAdapter(key, socket, transformed, converter, convert_nan)
>>> ase_data = [atoms1, atoms2, atoms3]  # ASE Atoms objects
>>> adapter = znsocket.ListAdapter("ase_adapter", client, ase_data, ase_transform)
>>> # Accessing items creates adapters automatically
>>> client_list = znsocket.List(client, "ase_adapter")
>>> dict_item = client_list[0]  # Creates DictAdapter with key "ase_adapter:0"
>>> isinstance(dict_item, znsocket.Dict)
True
>>> dict_item["some_key"]  # Access the transformed data
map_callback(data: tuple[list, dict]) Any[source]

Map a callback to the object.

This method handles incoming requests from clients and routes them to the appropriate methods on the wrapped object.

Parameters:

data (tuple) – The request data containing arguments and keyword arguments.

Returns:

The result of the requested operation, encoded if necessary.

Return type:

Any

Raises:

NotImplementedError – If the requested method is not implemented by the adapter.

DictAdapter

class znsocket.DictAdapter(key: str, socket: Client, object: Mapping, converter: list[type] | None = None, convert_nan: bool = False, r: Client | None = None)[source]

Bases: object

Connect any object to a znsocket server to be used instead of loading data from the database.

The DictAdapter allows you to expose any mapping-like object through the znsocket server, making it accessible to clients as if it were a regular Dict. Data is transmitted via sockets through the server to the client on demand.

Parameters:
  • key (str) – The key identifier for this adapter in the server.

  • socket (Client) – The znsocket client connection to use for communication.

  • object (Mapping) – The mapping object to expose through the adapter.

  • converter (list[type], optional) – Optional list of znjson converters to use for encoding/decoding the data.

  • convert_nan (bool, optional) – Convert NaN and Infinity to None. Both are not native JSON values and cannot be encoded/decoded. Default is False.

  • r (Client, optional) – Alternative client connection. If None, uses the socket connection.

Examples

>>> client = znsocket.Client("http://localhost:5000")
>>> my_data = {"a": 1, "b": 2, "c": 3}
>>> adapter = znsocket.DictAdapter("my_adapter", client, my_data)
>>> # Now clients can access my_data as znsocket.Dict(client, "my_adapter")
map_callback(data)[source]

Map a callback to the object.

This method handles incoming requests from clients and routes them to the appropriate methods on the wrapped object.

Parameters:

data (tuple) – The request data containing arguments and keyword arguments.

Returns:

The result of the requested operation, encoded if necessary.

Return type:

Any

Raises:

NotImplementedError – If the requested method is not implemented by the adapter.

Segments

class znsocket.Segments(r: Client | Any, origin: List, key: str)[source]

Bases: ZnSocketObject, MutableSequence

Copy of a list object with piece table segments.

This class implements a copy-on-write list using segment-based storage for efficiency. It’s particularly useful for large objects where only small parts are modified, avoiding the need to copy the entire object.

The data is stored in segments represented as tuples: (start, end, data), where start and end are the start and end indices of the part taken from the data list.

Parameters:
  • r (Client or redis.Redis) – Connection to the server.

  • origin (List) – The original List object to create segments from.

  • key (str) – The key in the server to store the segments data.

Raises:

TypeError – If origin is not a List object.

Examples

>>> client = znsocket.Client("http://localhost:5000")
>>> original_list = znsocket.List(client, "original")
>>> original_list.extend([1, 2, 3, 4, 5])
>>> segments = znsocket.Segments(client, original_list, "segments")
>>> len(segments)
5
>>> segments[0]
1
classmethod from_list(origin: List, key: str) Segments[source]

Create a Segments object from a list.

Parameters:
  • origin (List) – The original List object to create segments from.

  • key (str) – The key in the server to store the segments data.

Returns:

A new Segments object based on the original list.

Return type:

Segments

Raises:

TypeError – If origin is not a List object.

get_raw() Any[source]

Get the raw data of the segments list.

Returns:

A list of segments, where each segment is a tuple of (start, end, data).

Return type:

list

insert(index: int, value: Any) None[source]

Insert an item in the segments list.

property key: str

The key in the server to store the data from this segments object.

Returns:

The prefixed key used to store this segments object in the server.

Return type:

str

Functions

znsocket.attach_events(sio: Server, namespace: str = '/znsocket', storage=None) None[source]

Attach event handlers to a socket.io server.

This function sets up all the event handlers needed for the znsocket server to respond to client requests. It handles Redis-compatible operations, pipeline commands, and adapter functionality.

Parameters:
  • sio (socketio.Server) – The socket.io server instance to attach events to.

  • namespace (str, optional) – The namespace to attach events to. Default is “/znsocket”.

  • storage (Storage, optional) – The storage backend to use. If None, a new Storage instance is created.

Returns:

The socket.io server instance with events attached.

Return type:

socketio.Server

Examples

>>> sio = socketio.Server()
>>> attach_events(sio)
>>> # Now sio can handle znsocket events