Source code for pyrakoon.protocol

# This file is part of Pyrakoon, a distributed key-value store client.
#
# Copyright (C) 2010 Incubaid BVBA
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

'''Arakoon protocol implementation'''

import struct
import inspect
import operator
import itertools

try:
    import cStringIO as StringIO
except ImportError:
    import StringIO

from pyrakoon import utils

# Result codes
RESULT_SUCCESS = 0x0000
'''Success return code''' #pylint: disable=W0105

PROTOCOL_VERSION = 0x00000001
'''Protocol version''' #pylint: disable=W0105


# Wrappers for serialization communication
[docs]class Request(object): #pylint: disable=R0903 '''Wrapper for data requests generated by :meth:`Type.receive`''' __slots__ = '_count', def __init__(self, count): self._count = count def __repr__(self): return 'Request(%d)' % self.count count = property(operator.attrgetter('_count'), doc='Number of requested bytes')
[docs]class Result(object): #pylint: disable=R0903 '''Wrapper for value results generated by :meth:`Type.receive`''' __slots__ = '_value', def __init__(self, value): self._value = value def __repr__(self): return 'Result(%r)' % self.value value = property(operator.attrgetter('_value'), doc='Result value') # Type definitions
[docs]class Type(object): '''Base type for Arakoon serializable types''' PACKER = None ''' :class:`~struct.Struct` instance used by default :meth:`serialize` and :meth:`receive` implementations :type: :class:`struct.Struct` ''' #pylint: disable=W0105
[docs] def check(self, value): '''Check whether a value is valid for this type :param value: Value to test :type value: :obj:`object` :return: Whether the value is valid for this type :rtype: :class:`bool` ''' raise NotImplementedError
[docs] def serialize(self, value): '''Serialize value :param value: Value to serialize :type value: :obj:`object` :return: Iterable of bytes of the serialized value :rtype: iterable of :class:`str` ''' if not self.PACKER: raise NotImplementedError yield self.PACKER.pack(value)
[docs] def receive(self): '''Receive and parse a result from the server This method is a coroutine which yields :class:`Request` instances, and finally a :class:`Result`. When a :class:`Request` instance is yielded, the number of bytes as specified in the :attr:`~Request.count` attribute should be sent back. If finally a :class:`Result` instance is yield, its :attr:`~Result.value` attribute contains the actual message result. :see: :meth:`Message.receive` ''' if not self.PACKER: raise NotImplementedError data = yield Request(self.PACKER.size) result, = self.PACKER.unpack(data) yield Result(result)
[docs]class String(Type): '''String type'''
[docs] def check(self, value): if not isinstance(value, str): raise TypeError
[docs] def serialize(self, value): length = len(value) for bytes_ in UINT32.serialize(length): yield bytes_ yield struct.pack('<%ds' % length, value)
[docs] def receive(self): length_receiver = UINT32.receive() request = length_receiver.next() #pylint: disable=E1101 while isinstance(request, Request): value = yield request request = length_receiver.send(value) #pylint: disable=E1101 if not isinstance(request, Result): raise TypeError length = request.value if length == 0: result = '' else: data = yield Request(length) result, = struct.unpack('<%ds' % length, data) yield Result(result)
STRING = String()
[docs]class UnsignedInteger(Type): '''Unsigned integer type''' def __init__(self, bits, pack): '''Initialize an unsigned integer type :param bits: Bits containing the value :type bits: :class:`int` :param pack: Struct type, passed to `struct.Struct` :type pack: :class:`str` ''' super(UnsignedInteger, self).__init__() self.MAX_INT = (2 ** bits) - 1 #pylint: disable=C0103 self.PACKER = struct.Struct(pack) #pylint: disable=C0103
[docs] def check(self, value): if not isinstance(value, (int, long)): raise TypeError if value < 0: raise ValueError('Unsigned integer expected') if value > self.MAX_INT: raise ValueError('Integer overflow')
UINT32 = UnsignedInteger(32, '<I') UINT64 = UnsignedInteger(64, '<Q')
[docs]class SignedInteger(Type): '''Signed integer type''' def __init__(self, bits, pack): '''Initialize an unsigned integer type :param bits: Bits containing the value :type bits: :class:`int` :param pack: Struct type, passed to `struct.Struct` :type pack: :class:`str` ''' super(SignedInteger, self).__init__() self.MAX_INT = ((2 ** bits) / 2) - 1 #pylint: disable=C0103 self.PACKER = struct.Struct(pack) #pylint: disable=C0103
[docs] def check(self, value): if not isinstance(value, (int, long)): raise TypeError if abs(value) > self.MAX_INT: raise ValueError('Integer overflow')
INT32 = SignedInteger(32, '<i') INT64 = SignedInteger(64, '<q')
[docs]class Float(Type): '''Float type''' PACKER = struct.Struct('<d')
[docs] def check(self, value): if not isinstance(value, float): raise TypeError
FLOAT = Float()
[docs]class Bool(Type): '''Bool type''' PACKER = struct.Struct('<c') TRUE = chr(1) FALSE = chr(0)
[docs] def check(self, value): if not isinstance(value, bool): raise TypeError
[docs] def serialize(self, value): if value: yield self.PACKER.pack(self.TRUE) else: yield self.PACKER.pack(self.FALSE)
[docs] def receive(self): value_receiver = super(Bool, self).receive() request = value_receiver.next() #pylint: disable=E1101 while isinstance(request, Request): value = yield request request = value_receiver.send(value) #pylint: disable=E1101 if not isinstance(request, Result): raise TypeError value = request.value if value == self.TRUE: yield Result(True) elif value == self.FALSE: yield Result(False) else: raise ValueError('Unexpected bool value "0x%02x"' % ord(value))
BOOL = Bool()
[docs]class Unit(Type): #pylint: disable=R0921 '''Unit type'''
[docs] def check(self, value): raise NotImplementedError('Unit can\'t be checked')
[docs] def serialize(self, value): raise NotImplementedError('Unit can\'t be serialized')
[docs] def receive(self): yield Result(None)
UNIT = Unit()
[docs]class Step(Type): '''Step type'''
[docs] def check(self, value): from pyrakoon import sequence if not isinstance(value, sequence.Step): raise TypeError if isinstance(value, sequence.Sequence): for step in value.steps: if not isinstance(step, sequence.Step): raise TypeError if isinstance(step, sequence.Sequence): self.check(step)
[docs] def serialize(self, value): for part in value.serialize(): yield part
[docs] def receive(self): raise NotImplementedError('Steps can\'t be received')
STEP = Step()
[docs]class Option(Type): '''Option type''' def __init__(self, inner_type): super(Option, self).__init__() self._inner_type = inner_type
[docs] def check(self, value): if value is None: return self._inner_type.check(value)
[docs] def serialize(self, value): if value is None: for bytes_ in BOOL.serialize(False): yield bytes_ else: for bytes_ in BOOL.serialize(True): yield bytes_ for bytes_ in self._inner_type.serialize(value): yield bytes_
[docs] def receive(self): has_value_receiver = BOOL.receive() request = has_value_receiver.next() #pylint: disable=E1101 while isinstance(request, Request): value = yield request request = has_value_receiver.send(value) #pylint: disable=E1101 if not isinstance(request, Result): raise TypeError has_value = request.value if not has_value: yield Result(None) else: receiver = self._inner_type.receive() request = receiver.next() #pylint: disable=E1101 while isinstance(request, Request): value = yield request request = receiver.send(value) #pylint: disable=E1101 if not isinstance(request, Result): raise TypeError yield Result(request.value)
[docs]class List(Type): '''List type''' def __init__(self, inner_type): super(List, self).__init__() self._inner_type = inner_type
[docs] def check(self, value): # Get rid of the usual suspects if isinstance(value, (str, unicode, )): raise TypeError values = tuple(value) for value in values: self._inner_type.check(value)
[docs] def serialize(self, value): values = tuple(value) for bytes_ in UINT32.serialize(len(values)): yield bytes_ for value in values: for bytes_ in self._inner_type.serialize(value): yield bytes_
[docs] def receive(self): count_receiver = UINT32.receive() request = count_receiver.next() #pylint: disable=E1101 while isinstance(request, Request): value = yield request request = count_receiver.send(value) #pylint: disable=E1101 if not isinstance(request, Result): raise TypeError count = request.value values = [None] * count for idx in xrange(count - 1, -1, -1): receiver = self._inner_type.receive() request = receiver.next() #pylint: disable=E1101 while isinstance(request, Request): value = yield request request = receiver.send(value) #pylint: disable=E1101 if not isinstance(request, Result): raise TypeError value = request.value # Note: can't 'yield' value, otherwise we might not read all values # from the stream, and leave it in an unclean state values[idx] = value yield Result(values)
[docs]class Array(Type): '''Array type''' def __init__(self, inner_type): super(Array, self).__init__() self._inner_type = inner_type
[docs] def check(self, value): raise NotImplementedError('Arrays can\'t be checked')
[docs] def serialize(self, value): raise NotImplementedError('Arrays can\'t be serialized')
[docs] def receive(self): count_receiver = UINT32.receive() request = count_receiver.next() #pylint: disable=E1101 while isinstance(request, Request): value = yield request request = count_receiver.send(value) #pylint: disable=E1101 if not isinstance(request, Result): raise TypeError count = request.value values = [None] * count for idx in xrange(0, count): receiver = self._inner_type.receive() request = receiver.next() #pylint: disable=E1101 while isinstance(request, Request): value = yield request request = receiver.send(value) #pylint: disable=E1101 if not isinstance(request, Result): raise TypeError value = request.value # Note: can't 'yield' value, otherwise we might not read all values # from the stream, and leave it in an unclean state values[idx] = value yield Result(values)
[docs]class Product(Type): '''Product type''' def __init__(self, *inner_types): super(Product, self).__init__() self._inner_types = tuple(inner_types)
[docs] def check(self, value): # Get rid of the usual suspects if isinstance(value, (str, unicode, )): raise TypeError values = tuple(value) if len(values) != len(self._inner_types): raise ValueError for type_, value_ in zip(self._inner_types, values): type_.check(value_)
[docs] def serialize(self, value): values = tuple(value) for type_, value_ in zip(self._inner_types, values): for bytes_ in type_.serialize(value_): yield bytes_
[docs] def receive(self): values = [] for type_ in self._inner_types: receiver = type_.receive() request = receiver.next() #pylint: disable=E1101 while isinstance(request, Request): value = yield request request = receiver.send(value) #pylint: disable=E1101 if not isinstance(request, Result): raise TypeError value = request.value values.append(value) yield Result(tuple(values))
[docs]class StatisticsType(Type): '''Statistics type''' #pylint: disable=R0912
[docs] def check(self, value): raise NotImplementedError('Statistics can\'t be checked')
[docs] def serialize(self, value): raise NotImplementedError('Statistics can\'t be serialized')
[docs] def receive(self): buffer_receiver = STRING.receive() request = buffer_receiver.next() #pylint: disable=E1101 while isinstance(request, Request): value = yield request request = buffer_receiver.send(value) #pylint: disable=E1101 if not isinstance(request, Result): raise TypeError read = StringIO.StringIO(request.value).read class NamedField(Type): '''NamedField type''' FIELD_TYPE_INT = 1 FIELD_TYPE_INT64 = 2 FIELD_TYPE_FLOAT = 3 FIELD_TYPE_STRING = 4 FIELD_TYPE_LIST = 5 def check(self, value): raise NotImplementedError('NamedFields can\'t be checked') def serialize(self, value): raise NotImplementedError('NamedFields can\'t be serialized') @classmethod def receive(cls): type_receiver = INT32.receive() request = type_receiver.next() #pylint: disable=E1101 while isinstance(request, Request): value = yield request #pylint: disable=E1101 request = type_receiver.send(value) if not isinstance(request, Result): raise TypeError type_ = request.value name_receiver = STRING.receive() request = name_receiver.next() #pylint: disable=E1101 while isinstance(request, Request): value = yield request #pylint: disable=E1101 request = name_receiver.send(value) if not isinstance(request, Result): raise TypeError name = request.value if type_ == cls.FIELD_TYPE_INT: value_receiver = INT32.receive() elif type_ == cls.FIELD_TYPE_INT64: value_receiver = INT64.receive() elif type_ == cls.FIELD_TYPE_FLOAT: value_receiver = FLOAT.receive() elif type_ == cls.FIELD_TYPE_STRING: value_receiver = STRING.receive() elif type_ == cls.FIELD_TYPE_LIST: value_receiver = List(NamedField).receive() else: raise ValueError('Unknown named field type %d' % type_) request = value_receiver.next() #pylint: disable=E1103 while isinstance(request, Request): value = yield request #pylint: disable=E1103 request = value_receiver.send(value) if not isinstance(request, Result): raise TypeError value = request.value if type_ == cls.FIELD_TYPE_LIST: result = dict() map(result.update, value) #pylint: disable=W0141 value = result yield Result({name: value}) result = utils.read_blocking(NamedField.receive(), read) if 'arakoon_stats' not in result: raise ValueError('Missing expected \'arakoon_stats\' value') yield Result(result['arakoon_stats'])
STATISTICS = StatisticsType() # Protocol message definitions ALLOW_DIRTY_ARG = ('allow_dirty', BOOL, False) '''Well-known `allow_dirty` argument''' #pylint: disable=W0105
[docs]class Message(object): '''Base type for Arakoon command messages''' MASK = 0xb1ff0000 '''Generic command mask value''' #pylint: disable=W0105 TAG = None '''Tag (code) of the command''' #pylint: disable=W0105 ARGS = None '''Arguments required for the command''' #pylint: disable=W0105 RETURN_TYPE = None '''Return type of the command''' #pylint: disable=W0105 DOC = None '''Docstring for methods exposing this command''' #pylint: disable=W0105 _tag_bytes = None '''Serialized representation of :attr:`TAG`''' #pylint: disable=W0105
[docs] def serialize(self): '''Serialize the command :return: Iterable of bytes of the serialized version of the command :rtype: iterable of :class:`str` ''' if self._tag_bytes is None: self._tag_bytes = ''.join(UINT32.serialize(self.TAG)) yield self._tag_bytes for arg in self.ARGS: if len(arg) == 2: name, type_ = arg elif len(arg) == 3: name, type_, _ = arg else: raise ValueError for bytes_ in type_.serialize(getattr(self, name)): yield bytes_
[docs] def receive(self): '''Read and deserialize the return value of the command Running as a coroutine, this method can read and parse the server result value once this command has been submitted. This method yields values of type :class:`Request` to request more data (which should then be injected using the :meth:`send` method of the coroutine). The number of requested bytes is provided in the :attr:`~Request.count` attribute of the :class:`Request` object. Finally a :class:`Result` value is generated, which contains the server result in its :attr:`~Result.value` attribute. :raise ArakoonError: Server returned an error code :see: :func:`pyrakoon.utils.process_blocking` ''' from pyrakoon import errors code_receiver = UINT32.receive() request = code_receiver.next() #pylint: disable=E1101 while isinstance(request, Request): value = yield request request = code_receiver.send(value) #pylint: disable=E1101 if not isinstance(request, Result): raise TypeError code = request.value if code == RESULT_SUCCESS: result_receiver = self.RETURN_TYPE.receive() else: # Error result_receiver = STRING.receive() request = result_receiver.next() #pylint: disable=E1103 while isinstance(request, Request): value = yield request request = result_receiver.send(value) #pylint: disable=E1103 if not isinstance(request, Result): raise TypeError result = request.value if code == RESULT_SUCCESS: yield Result(result) else: if code in errors.ERROR_MAP: raise errors.ERROR_MAP[code](result) else: raise errors.ArakoonError( 'Unknown error code 0x%x, server said: %s' % \ (code, result))
[docs]class Hello(Message): '''"hello" message''' __slots__ = '_client_id', '_cluster_id', TAG = 0x0001 | Message.MASK ARGS = ('client_id', STRING), ('cluster_id', STRING), RETURN_TYPE = STRING DOC = utils.format_doc(''' Send a "hello" command to the server This method will return the string returned by the server when receiving a "hello" command. :param client_id: Identifier of the client :type client_id: :class:`str` :param cluster_id: Identifier of the cluster connecting to. \ This must match the cluster configuration. :type cluster_id: :class:`str` :return: Message returned by the server :rtype: :class:`str` ''') def __init__(self, client_id, cluster_id): super(Hello, self).__init__() self._client_id = client_id self._cluster_id = cluster_id client_id = property(operator.attrgetter('_client_id')) cluster_id = property(operator.attrgetter('_cluster_id'))
[docs]class WhoMaster(Message): '''"who_master" message''' __slots__ = () TAG = 0x0002 | Message.MASK ARGS = () RETURN_TYPE = Option(STRING) DOC = utils.format_doc(''' Send a "who_master" command to the server This method returns the name of the current master node in the Arakoon cluster. :return: Name of cluster master node :rtype: :class:`str` ''')
[docs]class Exists(Message): '''"exists" message''' __slots__ = '_allow_dirty', '_key', TAG = 0x0007 | Message.MASK ARGS = ALLOW_DIRTY_ARG, ('key', STRING), RETURN_TYPE = BOOL DOC = utils.format_doc(''' Send an "exists" command to the server This method returns a boolean which tells whether the given `key` is set on the server. :param key: Key to test :type key: :class:`str` :param allow_dirty: Allow reads from slave nodes :type allow_dirty: :class:`bool` :return: Whether the given key is set on the server :rtype: :class:`bool` ''') def __init__(self, allow_dirty, key): super(Exists, self).__init__() self._allow_dirty = allow_dirty self._key = key key = property(operator.attrgetter('_key')) allow_dirty = property(operator.attrgetter('_allow_dirty'))
[docs]class Get(Message): '''"get" message''' __slots__ = '_allow_dirty', '_key', TAG = 0x0008 | Message.MASK ARGS = ALLOW_DIRTY_ARG, ('key', STRING), RETURN_TYPE = STRING DOC = utils.format_doc(''' Send a "get" command to the server This method returns the value of the requested key. :param key: Key to retrieve :type key: :class:`str` :param allow_dirty: Allow reads from slave nodes :type allow_dirty: :class:`bool` :return: Value for the given key :rtype: :class:`str` ''') def __init__(self, allow_dirty, key): super(Get, self).__init__() self._allow_dirty = allow_dirty self._key = key allow_dirty = property(operator.attrgetter('_allow_dirty')) key = property(operator.attrgetter('_key'))
[docs]class Set(Message): '''"set" message''' __slots__ = '_key', '_value', TAG = 0x0009 | Message.MASK ARGS = ('key', STRING), ('value', STRING), RETURN_TYPE = UNIT DOC = utils.format_doc(''' Send a "set" command to the server This method sets a given key to a given value on the server. :param key: Key to set :type key: :class:`str` :param value: Value to set :type value: :class:`str` ''') def __init__(self, key, value): super(Set, self).__init__() self._key = key self._value = value key = property(operator.attrgetter('_key')) value = property(operator.attrgetter('_value'))
[docs]class Delete(Message): '''"delete" message''' __slots__ = '_key', TAG = 0x000a | Message.MASK ARGS = ('key', STRING), RETURN_TYPE = UNIT DOC = utils.format_doc(''' Send a "delete" command to the server This method deletes a given key from the cluster. :param key: Key to delete :type key: :class:`str` ''') def __init__(self, key): super(Delete, self).__init__() self._key = key key = property(operator.attrgetter('_key'))
[docs]class PrefixKeys(Message): '''"prefix_keys" message''' __slots__ = '_allow_dirty', '_prefix', '_max_elements', TAG = 0x000c | Message.MASK ARGS = ALLOW_DIRTY_ARG, ('prefix', STRING), ('max_elements', INT32, -1), RETURN_TYPE = List(STRING) DOC = utils.format_doc(''' Send a "prefix_keys" command to the server This method retrieves a list of keys from the cluster matching a given prefix. A maximum number of returned keys can be provided. If set to *-1* (the default), all matching keys will be returned. :param prefix: Prefix to match :type prefix: :class:`str` :param max_elements: Maximum number of keys to return :type max_elements: :class:`int` :param allow_dirty: Allow reads from slave nodes :type allow_dirty: :class:`bool` :return: Keys matching the given prefix :rtype: iterable of :class:`str` ''') def __init__(self, allow_dirty, prefix, max_elements): super(PrefixKeys, self).__init__() self._allow_dirty = allow_dirty self._prefix = prefix self._max_elements = max_elements allow_dirty = property(operator.attrgetter('_allow_dirty')) prefix = property(operator.attrgetter('_prefix')) max_elements = property(operator.attrgetter('_max_elements'))
[docs]class TestAndSet(Message): '''"test_and_set" message''' __slots__ = '_key', '_test_value', '_set_value', TAG = 0x000d | Message.MASK ARGS = ('key', STRING), ('test_value', Option(STRING)), \ ('set_value', Option(STRING)), RETURN_TYPE = Option(STRING) DOC = utils.format_doc(''' Send a "test_and_set" command to the server When `test_value` is not :data:`None`, the value for `key` will only be modified if the existing value on the server is equal to `test_value`. When `test_value` is :data:`None`, the `key` will only be set of there was no value set for the `key` before. When `set_value` is :data:`None`, the `key` will be deleted on the server. The original value for `key` is returned. :param key: Key to act on :type key: :class:`str` :param test_value: Expected value to test for :type test_value: :class:`str` or :data:`None` :param set_value: New value to set :type set_value: :class:`str` or :data:`None` :return: Original value of `key` :rtype: :class:`str` ''') def __init__(self, key, test_value, set_value): super(TestAndSet, self).__init__() self._key = key self._test_value = test_value self._set_value = set_value key = property(operator.attrgetter('_key')) test_value = property(operator.attrgetter('_test_value')) set_value = property(operator.attrgetter('_set_value'))
[docs]class Sequence(Message): '''"sequence" and "synced_sequence" message''' __slots__ = '_steps', '_sync', ARGS = ('steps', List(STEP)), ('sync', BOOL, False), RETURN_TYPE = UNIT DOC = utils.format_doc(''' Send a "sequence" or "synced_sequence" command to the server The operations passed to the constructor should be instances of implementations of the :class:`pyrakoon.sequence.Step` class. These operations will be executed in an all-or-nothing transaction. :param steps: Steps to execute :type steps: iterable of :class:`pyrakoon.sequence.Step` :param sync: Use *synced_sequence* :type sync: :class:`bool` ''') def __init__(self, steps, sync): from pyrakoon import sequence super(Sequence, self).__init__() #pylint: disable=W0142 if len(steps) == 1 and isinstance(steps[0], sequence.Sequence): self._sequence = steps[0] else: self._sequence = sequence.Sequence(steps) self._sync = sync sequence = property(operator.attrgetter('_sequence')) sync = property(operator.attrgetter('_sync'))
[docs] def serialize(self): tag = (0x0010 if not self.sync else 0x0024) | Message.MASK for bytes_ in UINT32.serialize(tag): yield bytes_ sequence_bytes = ''.join(self.sequence.serialize()) for bytes_ in STRING.serialize(sequence_bytes): yield bytes_
[docs]class Range(Message): '''"Range" message''' __slots__ = '_allow_dirty', '_begin_key', '_begin_inclusive', '_end_key', \ '_end_inclusive', '_max_elements', TAG = 0x000b | Message.MASK ARGS = ALLOW_DIRTY_ARG, \ ('begin_key', Option(STRING)), ('begin_inclusive', BOOL), \ ('end_key', Option(STRING)), ('end_inclusive', BOOL), \ ('max_elements', INT32, -1), RETURN_TYPE = List(STRING) DOC = utils.format_doc(''' Send a "range" command to the server The operation will return a list of keys, in the range between `begin_key` and `end_key`. The `begin_inclusive` and `end_inclusive` flags denote whether the delimiters should be included. The `max_elements` flag can limit the number of returned keys. If it is negative, all matching keys are returned. :param begin_key: Begin of range :type begin_key: :class:`str` :param begin_inclusive: `begin_key` is in- or exclusive :type begin_inclusive: :class:`bool` :param end_key: End of range :type end_key: :class:`str` :param end_inclusive: `end_key` is in- or exclusive :param max_elements: Maximum number of keys to return :type max_elements: :class:`int` :param allow_dirty: Allow reads from slave nodes :type allow_dirty: :class:`bool` :return: List of matching keys :rtype: iterable of :class:`str` ''') #pylint: disable=R0913 def __init__(self, allow_dirty, begin_key, begin_inclusive, end_key, end_inclusive, max_elements): super(Range, self).__init__() self._allow_dirty = allow_dirty self._begin_key = begin_key self._begin_inclusive = begin_inclusive self._end_key = end_key self._end_inclusive = end_inclusive self._max_elements = max_elements allow_dirty = property(operator.attrgetter('_allow_dirty')) begin_key = property(operator.attrgetter('_begin_key')) begin_inclusive = property(operator.attrgetter('_begin_inclusive')) end_key = property(operator.attrgetter('_end_key')) end_inclusive = property(operator.attrgetter('_end_inclusive')) max_elements = property(operator.attrgetter('_max_elements'))
[docs]class RangeEntries(Message): '''"RangeEntries" message''' __slots__ = '_allow_dirty', '_begin_key', '_begin_inclusive', '_end_key', \ '_end_inclusive', '_max_elements', TAG = 0x000f | Message.MASK ARGS = ALLOW_DIRTY_ARG, \ ('begin_key', Option(STRING)), ('begin_inclusive', BOOL), \ ('end_key', Option(STRING)), ('end_inclusive', BOOL), \ ('max_elements', INT32, -1), RETURN_TYPE = List(Product(STRING, STRING)) DOC = utils.format_doc(''' Send a "range_entries" command to the server The operation will return a list of (key, value) tuples, for keys in the range between `begin_key` and `end_key`. The `begin_inclusive` and `end_inclusive` flags denote whether the delimiters should be included. The `max_elements` flag can limit the number of returned items. If it is negative, all matching items are returned. :param begin_key: Begin of range :type begin_key: :class:`str` :param begin_inclusive: `begin_key` is in- or exclusive :type begin_inclusive: :class:`bool` :param end_key: End of range :type end_key: :class:`str` :param end_inclusive: `end_key` is in- or exclusive :type end_inclusive: :class:`bool` :param max_elements: Maximum number of items to return :type max_elements: :class:`int` :param allow_dirty: Allow reads from slave nodes :type allow_dirty: :class:`bool` :return: List of matching (key, value) pairs :rtype: iterable of `(str, str)` ''') #pylint: disable=R0913 def __init__(self, allow_dirty, begin_key, begin_inclusive, end_key, end_inclusive, max_elements): super(RangeEntries, self).__init__() self._allow_dirty = allow_dirty self._begin_key = begin_key self._begin_inclusive = begin_inclusive self._end_key = end_key self._end_inclusive = end_inclusive self._max_elements = max_elements allow_dirty = property(operator.attrgetter('_allow_dirty')) begin_key = property(operator.attrgetter('_begin_key')) begin_inclusive = property(operator.attrgetter('_begin_inclusive')) end_key = property(operator.attrgetter('_end_key')) end_inclusive = property(operator.attrgetter('_end_inclusive')) max_elements = property(operator.attrgetter('_max_elements'))
[docs]class MultiGet(Message): '''"multi_get" message''' __slots__ = '_allow_dirty', '_keys', TAG = 0x0011 | Message.MASK ARGS = ALLOW_DIRTY_ARG, ('keys', List(STRING)), RETURN_TYPE = List(STRING) DOC = utils.format_doc(''' Send a "multi_get" command to the server This method returns a list of the values for all requested keys. :param keys: Keys to look up :type keys: iterable of :class:`str` :param allow_dirty: Allow reads from slave nodes :type allow_dirty: :class:`bool` :return: Requested values :rtype: iterable of :class:`str` ''') def __init__(self, allow_dirty, keys): super(MultiGet, self).__init__() self._allow_dirty = allow_dirty self._keys = keys allow_dirty = property(operator.attrgetter('_allow_dirty')) keys = property(operator.attrgetter('_keys'))
[docs]class MultiGetOption(Message): '''"multi_get_option" message''' __slots__ = '_allow_dirty', '_keys', TAG = 0x0031 | Message.MASK ARGS = ALLOW_DIRTY_ARG, ('keys', List(STRING)), RETURN_TYPE = Array(Option(STRING)) DOC = utils.format_doc(''' Send a "multi_get_option" command to the server This method returns a list of value options for all requested keys. :param keys: Keys to look up :type keys: iterable of :class:`str` :param allow_dirty: Allow reads from slave nodes :type allow_dirty: :class:`bool` :return: Requested values :rtype: iterable of (`str` or `None`) ''') def __init__(self, allow_dirty, keys): super(MultiGetOption, self).__init__() self._allow_dirty = allow_dirty self._keys = keys allow_dirty = property(operator.attrgetter('_allow_dirty')) keys = property(operator.attrgetter('_keys'))
[docs]class ExpectProgressPossible(Message): '''"expect_progress_possible" message''' __slots__ = () TAG = 0x0012 | Message.MASK ARGS = () RETURN_TYPE = BOOL DOC = utils.format_doc(''' Send a "expect_progress_possible" command to the server This method returns whether the master thinks progress is possible. :return: Whether the master thinks progress is possible :rtype: :class:`bool` ''')
[docs]class GetKeyCount(Message): '''"get_key_count" message''' __slots__ = () TAG = 0x001a | Message.MASK ARGS = () RETURN_TYPE = UINT64 DOC = utils.format_doc(''' Send a "get_key_count" command to the server This method returns the number of items stored in Arakoon. :return: Number of items stored in the database :rtype: :class:`int` ''')
[docs]class UserFunction(Message): '''"user_function" message''' __slots__ = '_function', '_arg', TAG = 0x0015 | Message.MASK ARGS = ('function', STRING), ('argument', Option(STRING)), RETURN_TYPE = Option(STRING) DOC = utils.format_doc(''' Send a "user_function" command to the server This method returns the result of the function invocation. :param function: Name of the user function to invoke :type function: :class:`str` :param argument: Argument to pass to the function :type argument: :class:`str` or :data:`None` :return: Result of the function invocation :rtype: :class:`str` or :data:`None` ''') def __init__(self, function, argument): super(UserFunction, self).__init__() self._function = function self._argument = argument function = property(operator.attrgetter('_function')) argument = property(operator.attrgetter('_argument'))
[docs]class Confirm(Message): '''"confirm" message''' __slots__ = '_key', '_value', TAG = 0x001c | Message.MASK ARGS = ('key', STRING), ('value', STRING), RETURN_TYPE = UNIT DOC = utils.format_doc(''' Send a "confirm" command to the server This method sets a given key to a given value on the server, unless the value bound to the key is already equal to the provided value, in which case the action becomes a no-op. :param key: Key to set :type key: :class:`str` :param value: Value to set :type value: :class:`str` ''') def __init__(self, key, value): super(Confirm, self).__init__() self._key = key self._value = value key = property(operator.attrgetter('_key')) value = property(operator.attrgetter('_value'))
[docs]class Assert(Message): '''"assert" message''' __slots__ = '_allow_dirty', '_key', '_value', TAG = 0x0016 | Message.MASK ARGS = ALLOW_DIRTY_ARG, ('key', STRING), ('value', Option(STRING)), RETURN_TYPE = UNIT DOC = utils.format_doc(''' Send an "assert" command to the server `assert key vo` throws an exception if the value associated with the key is not what was expected. :param key: Key to check :type key: :class:`str` :param value: Optional value to compare :type value: :class:`str` or :data:`None` :param allow_dirty: Allow reads from slave nodes :type allow_dirty: :class:`bool` ''') def __init__(self, allow_dirty, key, value): super(Assert, self).__init__() self._allow_dirty = allow_dirty self._key = key self._value = value allow_dirty = property(operator.attrgetter('_allow_dirty')) key = property(operator.attrgetter('_key')) value = property(operator.attrgetter('_value'))
[docs]class AssertExists(Message): '''"assert_exists" message''' __slots__ = '_allow_dirty', '_key', TAG = 0x0029 | Message.MASK ARGS = ALLOW_DIRTY_ARG, ('key', STRING), RETURN_TYPE = UNIT DOC = utils.format_doc(''' Send an "assert_exists" command to the server `assert_exists key` throws an exception if the key doesn't exist in the database. :param key: Key to check :type key: :class:`str` :param allow_dirty: Allow reads from slave nodes :type allow_dirty: :class:`bool` ''') def __init__(self, allow_dirty, key): super(AssertExists, self).__init__() self._allow_dirty = allow_dirty self._key = key allow_dirty = property(operator.attrgetter('_allow_dirty')) key = property(operator.attrgetter('_key'))
[docs]class RevRangeEntries(Message): '''"rev_range_entries" message''' __slots__ = '_allow_dirty', '_begin_key', '_begin_inclusive', '_end_key', \ '_end_inclusive', '_max_elements', TAG = 0x0023 | Message.MASK ARGS = ALLOW_DIRTY_ARG, \ ('begin_key', Option(STRING)), ('begin_inclusive', BOOL), \ ('end_key', Option(STRING)), ('end_inclusive', BOOL), \ ('max_elements', INT32, -1), RETURN_TYPE = List(Product(STRING, STRING)) DOC = utils.format_doc(''' Send a "rev_range_entries" command to the server The operation will return a list of (key, value) tuples, for keys in the reverse range between `begin_key` and `end_key`. The `begin_inclusive` and `end_inclusive` flags denote whether the delimiters should be included. The `max_elements` flag can limit the number of returned items. If it is negative, all matching items are returned. :param begin_key: Begin of range :type begin_key: :class:`str` :param begin_inclusive: `begin_key` is in- or exclusive :type begin_inclusive: :class:`bool` :param end_key: End of range :type end_key: :class:`str` :param end_inclusive: `end_key` is in- or exclusive :type end_inclusive: :class:`bool` :param max_elements: Maximum number of items to return :type max_elements: :class:`int` :param allow_dirty: Allow reads from slave nodes :type allow_dirty: :class:`bool` :return: List of matching (key, value) pairs :rtype: iterable of `(str, str)` ''') #pylint: disable=R0913 def __init__(self, allow_dirty, begin_key, begin_inclusive, end_key, end_inclusive, max_elements): super(RevRangeEntries, self).__init__() self._allow_dirty = allow_dirty self._begin_key = begin_key self._begin_inclusive = begin_inclusive self._end_key = end_key self._end_inclusive = end_inclusive self._max_elements = max_elements allow_dirty = property(operator.attrgetter('_allow_dirty')) begin_key = property(operator.attrgetter('_begin_key')) begin_inclusive = property(operator.attrgetter('_begin_inclusive')) end_key = property(operator.attrgetter('_end_key')) end_inclusive = property(operator.attrgetter('_end_inclusive')) max_elements = property(operator.attrgetter('_max_elements'))
[docs]class Statistics(Message): '''"statistics" message''' __slots__ = () TAG = 0x0013 | Message.MASK ARGS = () RETURN_TYPE = STATISTICS DOC = utils.format_doc(''' Send a "statistics" command to the server This method returns some server statistics. :return: Server statistics :rtype: `Statistics` ''')
[docs]class Version(Message): '''"version" message''' __slots__ = () TAG = 0x0028 | Message.MASK ARGS = () RETURN_TYPE = Product(INT32, INT32, INT32, STRING) DOC = utils.format_doc(''' Send a "version" command to the server This method returns the server version. :return: Server version :rtype: `(int, int, int, str)` ''')
[docs]class DeletePrefix(Message): '''"delete_prefix" message''' __slots__ = '_prefix', TAG = 0x0027 | Message.MASK ARGS = ('prefix', STRING), RETURN_TYPE = UINT32 DOC = utils.format_doc(''' Send a "delete_prefix" command to the server `delete_prefix prefix` will delete all key/value-pairs from the database where given `prefix` is a prefix of `key`. :param prefix: Prefix of binding keys to delete :type prefix: :class:`str` :return: Number of deleted bindings :rtype: :class:`int` ''') def __init__(self, prefix): super(DeletePrefix, self).__init__() self._prefix = prefix prefix = property(operator.attrgetter('_prefix'))
[docs]class Replace(Message): '''"replace" message''' __slots__ = '_key', '_value', TAG = 0x0033 | Message.MASK ARGS = ('key', STRING), ('value', Option(STRING)), RETURN_TYPE = Option(STRING) DOC = utils.format_doc(''' Send a "replace" command to the server `replace key value` will replace the value bound to the given key with the provided value, and return the old value bound to the key. If `value` is :data:`None`, the key is deleted. If the key was not present in the database, :data:`None` is returned. :param key: Key to replace :type key: :class:`str` :param value: Value to set :type value: :class:`str` or :data:`None` :return: Original value bound to the key :rtype: :class:`str` or :data:`None` ''') def __init__(self, key, value): self._key = key self._value = value key = property(operator.attrgetter('_key')) value = property(operator.attrgetter('_value'))
[docs]class Nop(Message): '''"nop" message''' __slots__ = () TAG = 0x0041 | Message.MASK ARGS = () RETURN_TYPE = UNIT DOC = utils.format_doc(''' Send a "nop" command to the server This enforces consensus throughout a cluster, but has no further effects. ''')
[docs]class GetCurrentState(Message): '''"get_current_state" message''' __slots__ = () TAG = 0x0032 | Message.MASK ARGS = () RETURN_TYPE = STRING DOC = utils.format_doc(''' Send a "get_current_state" command to the server This call returns a string representing the current state of the node, and can be used for troubleshooting purposes. :return: State of the server :rtype: :class:`str` ''')
def sanity_check(): '''Sanity check for some invariants on types defined in this module''' for (_name, value) in globals().iteritems(): if inspect.isclass(value) \ and getattr(value, '__module__', None) == __name__: # A `Message` which has `ALLOW_DIRTY_ARG` in its `ARGS` must have # an `allow_dirty` attribute, the constructor must take such # argument, and if `__slots__` is defined, there should be an # `_allow_dirty` field if issubclass(value, Message): if ALLOW_DIRTY_ARG in (value.ARGS or []): #pylint: disable=C0325 assert hasattr(value, 'allow_dirty') argspec = inspect.getargspec(value.__init__) assert 'allow_dirty' in argspec.args if hasattr(value, '__slots__'): assert '_allow_dirty' in value.__slots__ sanity_check() del sanity_check
[docs]def build_prologue(cluster): '''Return the string to send as prologue :param cluster: Name of the cluster to which a connection is made :type cluster: :class:`str` :return: Prologue to send to the Arakoon server :rtype: :class:`str` ''' return ''.join(itertools.chain( UINT32.serialize(Message.MASK), UINT32.serialize(PROTOCOL_VERSION), STRING.serialize(cluster)))