# This file is part of Pyrakoon, a distributed key-value store client.
#
# Copyright (C) 2010 Incubaid BVBA
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
'''Arakoon protocol implementation'''
import struct
import inspect
import operator
import itertools
try:
import cStringIO as StringIO
except ImportError:
import StringIO
from pyrakoon import utils
# Result codes
RESULT_SUCCESS = 0x0000
'''Success return code''' #pylint: disable=W0105
PROTOCOL_VERSION = 0x00000001
'''Protocol version''' #pylint: disable=W0105
# Wrappers for serialization communication
[docs]class Request(object): #pylint: disable=R0903
'''Wrapper for data requests generated by :meth:`Type.receive`'''
__slots__ = '_count',
def __init__(self, count):
self._count = count
def __repr__(self):
return 'Request(%d)' % self.count
count = property(operator.attrgetter('_count'),
doc='Number of requested bytes')
[docs]class Result(object): #pylint: disable=R0903
'''Wrapper for value results generated by :meth:`Type.receive`'''
__slots__ = '_value',
def __init__(self, value):
self._value = value
def __repr__(self):
return 'Result(%r)' % self.value
value = property(operator.attrgetter('_value'),
doc='Result value')
# Type definitions
[docs]class Type(object):
'''Base type for Arakoon serializable types'''
PACKER = None
'''
:class:`~struct.Struct` instance used by default :meth:`serialize` and
:meth:`receive` implementations
:type: :class:`struct.Struct`
''' #pylint: disable=W0105
[docs] def check(self, value):
'''Check whether a value is valid for this type
:param value: Value to test
:type value: :obj:`object`
:return: Whether the value is valid for this type
:rtype: :class:`bool`
'''
raise NotImplementedError
[docs] def serialize(self, value):
'''Serialize value
:param value: Value to serialize
:type value: :obj:`object`
:return: Iterable of bytes of the serialized value
:rtype: iterable of :class:`str`
'''
if not self.PACKER:
raise NotImplementedError
yield self.PACKER.pack(value)
[docs] def receive(self):
'''Receive and parse a result from the server
This method is a coroutine which yields :class:`Request` instances, and
finally a :class:`Result`. When a :class:`Request` instance is yielded,
the number of bytes as specified in the :attr:`~Request.count`
attribute should be sent back.
If finally a :class:`Result` instance is yield, its
:attr:`~Result.value` attribute contains the actual message result.
:see: :meth:`Message.receive`
'''
if not self.PACKER:
raise NotImplementedError
data = yield Request(self.PACKER.size)
result, = self.PACKER.unpack(data)
yield Result(result)
[docs]class String(Type):
'''String type'''
[docs] def check(self, value):
if not isinstance(value, str):
raise TypeError
[docs] def serialize(self, value):
length = len(value)
for bytes_ in UINT32.serialize(length):
yield bytes_
yield struct.pack('<%ds' % length, value)
[docs] def receive(self):
length_receiver = UINT32.receive()
request = length_receiver.next() #pylint: disable=E1101
while isinstance(request, Request):
value = yield request
request = length_receiver.send(value) #pylint: disable=E1101
if not isinstance(request, Result):
raise TypeError
length = request.value
if length == 0:
result = ''
else:
data = yield Request(length)
result, = struct.unpack('<%ds' % length, data)
yield Result(result)
STRING = String()
[docs]class UnsignedInteger(Type):
'''Unsigned integer type'''
def __init__(self, bits, pack):
'''Initialize an unsigned integer type
:param bits: Bits containing the value
:type bits: :class:`int`
:param pack: Struct type, passed to `struct.Struct`
:type pack: :class:`str`
'''
super(UnsignedInteger, self).__init__()
self.MAX_INT = (2 ** bits) - 1 #pylint: disable=C0103
self.PACKER = struct.Struct(pack) #pylint: disable=C0103
[docs] def check(self, value):
if not isinstance(value, (int, long)):
raise TypeError
if value < 0:
raise ValueError('Unsigned integer expected')
if value > self.MAX_INT:
raise ValueError('Integer overflow')
UINT32 = UnsignedInteger(32, '<I')
UINT64 = UnsignedInteger(64, '<Q')
[docs]class SignedInteger(Type):
'''Signed integer type'''
def __init__(self, bits, pack):
'''Initialize an unsigned integer type
:param bits: Bits containing the value
:type bits: :class:`int`
:param pack: Struct type, passed to `struct.Struct`
:type pack: :class:`str`
'''
super(SignedInteger, self).__init__()
self.MAX_INT = ((2 ** bits) / 2) - 1 #pylint: disable=C0103
self.PACKER = struct.Struct(pack) #pylint: disable=C0103
[docs] def check(self, value):
if not isinstance(value, (int, long)):
raise TypeError
if abs(value) > self.MAX_INT:
raise ValueError('Integer overflow')
INT32 = SignedInteger(32, '<i')
INT64 = SignedInteger(64, '<q')
[docs]class Float(Type):
'''Float type'''
PACKER = struct.Struct('<d')
[docs] def check(self, value):
if not isinstance(value, float):
raise TypeError
FLOAT = Float()
[docs]class Bool(Type):
'''Bool type'''
PACKER = struct.Struct('<c')
TRUE = chr(1)
FALSE = chr(0)
[docs] def check(self, value):
if not isinstance(value, bool):
raise TypeError
[docs] def serialize(self, value):
if value:
yield self.PACKER.pack(self.TRUE)
else:
yield self.PACKER.pack(self.FALSE)
[docs] def receive(self):
value_receiver = super(Bool, self).receive()
request = value_receiver.next() #pylint: disable=E1101
while isinstance(request, Request):
value = yield request
request = value_receiver.send(value) #pylint: disable=E1101
if not isinstance(request, Result):
raise TypeError
value = request.value
if value == self.TRUE:
yield Result(True)
elif value == self.FALSE:
yield Result(False)
else:
raise ValueError('Unexpected bool value "0x%02x"' % ord(value))
BOOL = Bool()
[docs]class Unit(Type): #pylint: disable=R0921
'''Unit type'''
[docs] def check(self, value):
raise NotImplementedError('Unit can\'t be checked')
[docs] def serialize(self, value):
raise NotImplementedError('Unit can\'t be serialized')
[docs] def receive(self):
yield Result(None)
UNIT = Unit()
[docs]class Step(Type):
'''Step type'''
[docs] def check(self, value):
from pyrakoon import sequence
if not isinstance(value, sequence.Step):
raise TypeError
if isinstance(value, sequence.Sequence):
for step in value.steps:
if not isinstance(step, sequence.Step):
raise TypeError
if isinstance(step, sequence.Sequence):
self.check(step)
[docs] def serialize(self, value):
for part in value.serialize():
yield part
[docs] def receive(self):
raise NotImplementedError('Steps can\'t be received')
STEP = Step()
[docs]class Option(Type):
'''Option type'''
def __init__(self, inner_type):
super(Option, self).__init__()
self._inner_type = inner_type
[docs] def check(self, value):
if value is None:
return
self._inner_type.check(value)
[docs] def serialize(self, value):
if value is None:
for bytes_ in BOOL.serialize(False):
yield bytes_
else:
for bytes_ in BOOL.serialize(True):
yield bytes_
for bytes_ in self._inner_type.serialize(value):
yield bytes_
[docs] def receive(self):
has_value_receiver = BOOL.receive()
request = has_value_receiver.next() #pylint: disable=E1101
while isinstance(request, Request):
value = yield request
request = has_value_receiver.send(value) #pylint: disable=E1101
if not isinstance(request, Result):
raise TypeError
has_value = request.value
if not has_value:
yield Result(None)
else:
receiver = self._inner_type.receive()
request = receiver.next() #pylint: disable=E1101
while isinstance(request, Request):
value = yield request
request = receiver.send(value) #pylint: disable=E1101
if not isinstance(request, Result):
raise TypeError
yield Result(request.value)
[docs]class List(Type):
'''List type'''
def __init__(self, inner_type):
super(List, self).__init__()
self._inner_type = inner_type
[docs] def check(self, value):
# Get rid of the usual suspects
if isinstance(value, (str, unicode, )):
raise TypeError
values = tuple(value)
for value in values:
self._inner_type.check(value)
[docs] def serialize(self, value):
values = tuple(value)
for bytes_ in UINT32.serialize(len(values)):
yield bytes_
for value in values:
for bytes_ in self._inner_type.serialize(value):
yield bytes_
[docs] def receive(self):
count_receiver = UINT32.receive()
request = count_receiver.next() #pylint: disable=E1101
while isinstance(request, Request):
value = yield request
request = count_receiver.send(value) #pylint: disable=E1101
if not isinstance(request, Result):
raise TypeError
count = request.value
values = [None] * count
for idx in xrange(count - 1, -1, -1):
receiver = self._inner_type.receive()
request = receiver.next() #pylint: disable=E1101
while isinstance(request, Request):
value = yield request
request = receiver.send(value) #pylint: disable=E1101
if not isinstance(request, Result):
raise TypeError
value = request.value
# Note: can't 'yield' value, otherwise we might not read all values
# from the stream, and leave it in an unclean state
values[idx] = value
yield Result(values)
[docs]class Array(Type):
'''Array type'''
def __init__(self, inner_type):
super(Array, self).__init__()
self._inner_type = inner_type
[docs] def check(self, value):
raise NotImplementedError('Arrays can\'t be checked')
[docs] def serialize(self, value):
raise NotImplementedError('Arrays can\'t be serialized')
[docs] def receive(self):
count_receiver = UINT32.receive()
request = count_receiver.next() #pylint: disable=E1101
while isinstance(request, Request):
value = yield request
request = count_receiver.send(value) #pylint: disable=E1101
if not isinstance(request, Result):
raise TypeError
count = request.value
values = [None] * count
for idx in xrange(0, count):
receiver = self._inner_type.receive()
request = receiver.next() #pylint: disable=E1101
while isinstance(request, Request):
value = yield request
request = receiver.send(value) #pylint: disable=E1101
if not isinstance(request, Result):
raise TypeError
value = request.value
# Note: can't 'yield' value, otherwise we might not read all values
# from the stream, and leave it in an unclean state
values[idx] = value
yield Result(values)
[docs]class Product(Type):
'''Product type'''
def __init__(self, *inner_types):
super(Product, self).__init__()
self._inner_types = tuple(inner_types)
[docs] def check(self, value):
# Get rid of the usual suspects
if isinstance(value, (str, unicode, )):
raise TypeError
values = tuple(value)
if len(values) != len(self._inner_types):
raise ValueError
for type_, value_ in zip(self._inner_types, values):
type_.check(value_)
[docs] def serialize(self, value):
values = tuple(value)
for type_, value_ in zip(self._inner_types, values):
for bytes_ in type_.serialize(value_):
yield bytes_
[docs] def receive(self):
values = []
for type_ in self._inner_types:
receiver = type_.receive()
request = receiver.next() #pylint: disable=E1101
while isinstance(request, Request):
value = yield request
request = receiver.send(value) #pylint: disable=E1101
if not isinstance(request, Result):
raise TypeError
value = request.value
values.append(value)
yield Result(tuple(values))
[docs]class StatisticsType(Type):
'''Statistics type'''
#pylint: disable=R0912
[docs] def check(self, value):
raise NotImplementedError('Statistics can\'t be checked')
[docs] def serialize(self, value):
raise NotImplementedError('Statistics can\'t be serialized')
[docs] def receive(self):
buffer_receiver = STRING.receive()
request = buffer_receiver.next() #pylint: disable=E1101
while isinstance(request, Request):
value = yield request
request = buffer_receiver.send(value) #pylint: disable=E1101
if not isinstance(request, Result):
raise TypeError
read = StringIO.StringIO(request.value).read
class NamedField(Type):
'''NamedField type'''
FIELD_TYPE_INT = 1
FIELD_TYPE_INT64 = 2
FIELD_TYPE_FLOAT = 3
FIELD_TYPE_STRING = 4
FIELD_TYPE_LIST = 5
def check(self, value):
raise NotImplementedError('NamedFields can\'t be checked')
def serialize(self, value):
raise NotImplementedError('NamedFields can\'t be serialized')
@classmethod
def receive(cls):
type_receiver = INT32.receive()
request = type_receiver.next() #pylint: disable=E1101
while isinstance(request, Request):
value = yield request
#pylint: disable=E1101
request = type_receiver.send(value)
if not isinstance(request, Result):
raise TypeError
type_ = request.value
name_receiver = STRING.receive()
request = name_receiver.next() #pylint: disable=E1101
while isinstance(request, Request):
value = yield request
#pylint: disable=E1101
request = name_receiver.send(value)
if not isinstance(request, Result):
raise TypeError
name = request.value
if type_ == cls.FIELD_TYPE_INT:
value_receiver = INT32.receive()
elif type_ == cls.FIELD_TYPE_INT64:
value_receiver = INT64.receive()
elif type_ == cls.FIELD_TYPE_FLOAT:
value_receiver = FLOAT.receive()
elif type_ == cls.FIELD_TYPE_STRING:
value_receiver = STRING.receive()
elif type_ == cls.FIELD_TYPE_LIST:
value_receiver = List(NamedField).receive()
else:
raise ValueError('Unknown named field type %d' % type_)
request = value_receiver.next() #pylint: disable=E1103
while isinstance(request, Request):
value = yield request
#pylint: disable=E1103
request = value_receiver.send(value)
if not isinstance(request, Result):
raise TypeError
value = request.value
if type_ == cls.FIELD_TYPE_LIST:
result = dict()
map(result.update, value) #pylint: disable=W0141
value = result
yield Result({name: value})
result = utils.read_blocking(NamedField.receive(), read)
if 'arakoon_stats' not in result:
raise ValueError('Missing expected \'arakoon_stats\' value')
yield Result(result['arakoon_stats'])
STATISTICS = StatisticsType()
# Protocol message definitions
ALLOW_DIRTY_ARG = ('allow_dirty', BOOL, False)
'''Well-known `allow_dirty` argument''' #pylint: disable=W0105
[docs]class Message(object):
'''Base type for Arakoon command messages'''
MASK = 0xb1ff0000
'''Generic command mask value''' #pylint: disable=W0105
TAG = None
'''Tag (code) of the command''' #pylint: disable=W0105
ARGS = None
'''Arguments required for the command''' #pylint: disable=W0105
RETURN_TYPE = None
'''Return type of the command''' #pylint: disable=W0105
DOC = None
'''Docstring for methods exposing this command''' #pylint: disable=W0105
_tag_bytes = None
'''Serialized representation of :attr:`TAG`''' #pylint: disable=W0105
[docs] def serialize(self):
'''Serialize the command
:return: Iterable of bytes of the serialized version of the command
:rtype: iterable of :class:`str`
'''
if self._tag_bytes is None:
self._tag_bytes = ''.join(UINT32.serialize(self.TAG))
yield self._tag_bytes
for arg in self.ARGS:
if len(arg) == 2:
name, type_ = arg
elif len(arg) == 3:
name, type_, _ = arg
else:
raise ValueError
for bytes_ in type_.serialize(getattr(self, name)):
yield bytes_
[docs] def receive(self):
'''Read and deserialize the return value of the command
Running as a coroutine, this method can read and parse the server
result value once this command has been submitted.
This method yields values of type :class:`Request` to request more data
(which should then be injected using the :meth:`send` method of the
coroutine). The number of requested bytes is provided in the
:attr:`~Request.count` attribute of the :class:`Request` object.
Finally a :class:`Result` value is generated, which contains the server
result in its :attr:`~Result.value` attribute.
:raise ArakoonError: Server returned an error code
:see: :func:`pyrakoon.utils.process_blocking`
'''
from pyrakoon import errors
code_receiver = UINT32.receive()
request = code_receiver.next() #pylint: disable=E1101
while isinstance(request, Request):
value = yield request
request = code_receiver.send(value) #pylint: disable=E1101
if not isinstance(request, Result):
raise TypeError
code = request.value
if code == RESULT_SUCCESS:
result_receiver = self.RETURN_TYPE.receive()
else:
# Error
result_receiver = STRING.receive()
request = result_receiver.next() #pylint: disable=E1103
while isinstance(request, Request):
value = yield request
request = result_receiver.send(value) #pylint: disable=E1103
if not isinstance(request, Result):
raise TypeError
result = request.value
if code == RESULT_SUCCESS:
yield Result(result)
else:
if code in errors.ERROR_MAP:
raise errors.ERROR_MAP[code](result)
else:
raise errors.ArakoonError(
'Unknown error code 0x%x, server said: %s' % \
(code, result))
[docs]class Hello(Message):
'''"hello" message'''
__slots__ = '_client_id', '_cluster_id',
TAG = 0x0001 | Message.MASK
ARGS = ('client_id', STRING), ('cluster_id', STRING),
RETURN_TYPE = STRING
DOC = utils.format_doc('''
Send a "hello" command to the server
This method will return the string returned by the server when
receiving a "hello" command.
:param client_id: Identifier of the client
:type client_id: :class:`str`
:param cluster_id: Identifier of the cluster connecting to. \
This must match the cluster configuration.
:type cluster_id: :class:`str`
:return: Message returned by the server
:rtype: :class:`str`
''')
def __init__(self, client_id, cluster_id):
super(Hello, self).__init__()
self._client_id = client_id
self._cluster_id = cluster_id
client_id = property(operator.attrgetter('_client_id'))
cluster_id = property(operator.attrgetter('_cluster_id'))
[docs]class WhoMaster(Message):
'''"who_master" message'''
__slots__ = ()
TAG = 0x0002 | Message.MASK
ARGS = ()
RETURN_TYPE = Option(STRING)
DOC = utils.format_doc('''
Send a "who_master" command to the server
This method returns the name of the current master node in the Arakoon
cluster.
:return: Name of cluster master node
:rtype: :class:`str`
''')
[docs]class Exists(Message):
'''"exists" message'''
__slots__ = '_allow_dirty', '_key',
TAG = 0x0007 | Message.MASK
ARGS = ALLOW_DIRTY_ARG, ('key', STRING),
RETURN_TYPE = BOOL
DOC = utils.format_doc('''
Send an "exists" command to the server
This method returns a boolean which tells whether the given `key` is
set on the server.
:param key: Key to test
:type key: :class:`str`
:param allow_dirty: Allow reads from slave nodes
:type allow_dirty: :class:`bool`
:return: Whether the given key is set on the server
:rtype: :class:`bool`
''')
def __init__(self, allow_dirty, key):
super(Exists, self).__init__()
self._allow_dirty = allow_dirty
self._key = key
key = property(operator.attrgetter('_key'))
allow_dirty = property(operator.attrgetter('_allow_dirty'))
[docs]class Get(Message):
'''"get" message'''
__slots__ = '_allow_dirty', '_key',
TAG = 0x0008 | Message.MASK
ARGS = ALLOW_DIRTY_ARG, ('key', STRING),
RETURN_TYPE = STRING
DOC = utils.format_doc('''
Send a "get" command to the server
This method returns the value of the requested key.
:param key: Key to retrieve
:type key: :class:`str`
:param allow_dirty: Allow reads from slave nodes
:type allow_dirty: :class:`bool`
:return: Value for the given key
:rtype: :class:`str`
''')
def __init__(self, allow_dirty, key):
super(Get, self).__init__()
self._allow_dirty = allow_dirty
self._key = key
allow_dirty = property(operator.attrgetter('_allow_dirty'))
key = property(operator.attrgetter('_key'))
[docs]class Set(Message):
'''"set" message'''
__slots__ = '_key', '_value',
TAG = 0x0009 | Message.MASK
ARGS = ('key', STRING), ('value', STRING),
RETURN_TYPE = UNIT
DOC = utils.format_doc('''
Send a "set" command to the server
This method sets a given key to a given value on the server.
:param key: Key to set
:type key: :class:`str`
:param value: Value to set
:type value: :class:`str`
''')
def __init__(self, key, value):
super(Set, self).__init__()
self._key = key
self._value = value
key = property(operator.attrgetter('_key'))
value = property(operator.attrgetter('_value'))
[docs]class Delete(Message):
'''"delete" message'''
__slots__ = '_key',
TAG = 0x000a | Message.MASK
ARGS = ('key', STRING),
RETURN_TYPE = UNIT
DOC = utils.format_doc('''
Send a "delete" command to the server
This method deletes a given key from the cluster.
:param key: Key to delete
:type key: :class:`str`
''')
def __init__(self, key):
super(Delete, self).__init__()
self._key = key
key = property(operator.attrgetter('_key'))
[docs]class PrefixKeys(Message):
'''"prefix_keys" message'''
__slots__ = '_allow_dirty', '_prefix', '_max_elements',
TAG = 0x000c | Message.MASK
ARGS = ALLOW_DIRTY_ARG, ('prefix', STRING), ('max_elements', INT32, -1),
RETURN_TYPE = List(STRING)
DOC = utils.format_doc('''
Send a "prefix_keys" command to the server
This method retrieves a list of keys from the cluster matching a given
prefix. A maximum number of returned keys can be provided. If set to
*-1* (the default), all matching keys will be returned.
:param prefix: Prefix to match
:type prefix: :class:`str`
:param max_elements: Maximum number of keys to return
:type max_elements: :class:`int`
:param allow_dirty: Allow reads from slave nodes
:type allow_dirty: :class:`bool`
:return: Keys matching the given prefix
:rtype: iterable of :class:`str`
''')
def __init__(self, allow_dirty, prefix, max_elements):
super(PrefixKeys, self).__init__()
self._allow_dirty = allow_dirty
self._prefix = prefix
self._max_elements = max_elements
allow_dirty = property(operator.attrgetter('_allow_dirty'))
prefix = property(operator.attrgetter('_prefix'))
max_elements = property(operator.attrgetter('_max_elements'))
[docs]class TestAndSet(Message):
'''"test_and_set" message'''
__slots__ = '_key', '_test_value', '_set_value',
TAG = 0x000d | Message.MASK
ARGS = ('key', STRING), ('test_value', Option(STRING)), \
('set_value', Option(STRING)),
RETURN_TYPE = Option(STRING)
DOC = utils.format_doc('''
Send a "test_and_set" command to the server
When `test_value` is not :data:`None`, the value for `key` will only be
modified if the existing value on the server is equal to `test_value`.
When `test_value` is :data:`None`, the `key` will only be set of there
was no value set for the `key` before.
When `set_value` is :data:`None`, the `key` will be deleted on the server.
The original value for `key` is returned.
:param key: Key to act on
:type key: :class:`str`
:param test_value: Expected value to test for
:type test_value: :class:`str` or :data:`None`
:param set_value: New value to set
:type set_value: :class:`str` or :data:`None`
:return: Original value of `key`
:rtype: :class:`str`
''')
def __init__(self, key, test_value, set_value):
super(TestAndSet, self).__init__()
self._key = key
self._test_value = test_value
self._set_value = set_value
key = property(operator.attrgetter('_key'))
test_value = property(operator.attrgetter('_test_value'))
set_value = property(operator.attrgetter('_set_value'))
[docs]class Sequence(Message):
'''"sequence" and "synced_sequence" message'''
__slots__ = '_steps', '_sync',
ARGS = ('steps', List(STEP)), ('sync', BOOL, False),
RETURN_TYPE = UNIT
DOC = utils.format_doc('''
Send a "sequence" or "synced_sequence" command to the server
The operations passed to the constructor should be instances of
implementations of the :class:`pyrakoon.sequence.Step` class. These
operations will be executed in an all-or-nothing transaction.
:param steps: Steps to execute
:type steps: iterable of :class:`pyrakoon.sequence.Step`
:param sync: Use *synced_sequence*
:type sync: :class:`bool`
''')
def __init__(self, steps, sync):
from pyrakoon import sequence
super(Sequence, self).__init__()
#pylint: disable=W0142
if len(steps) == 1 and isinstance(steps[0], sequence.Sequence):
self._sequence = steps[0]
else:
self._sequence = sequence.Sequence(steps)
self._sync = sync
sequence = property(operator.attrgetter('_sequence'))
sync = property(operator.attrgetter('_sync'))
[docs] def serialize(self):
tag = (0x0010 if not self.sync else 0x0024) | Message.MASK
for bytes_ in UINT32.serialize(tag):
yield bytes_
sequence_bytes = ''.join(self.sequence.serialize())
for bytes_ in STRING.serialize(sequence_bytes):
yield bytes_
[docs]class Range(Message):
'''"Range" message'''
__slots__ = '_allow_dirty', '_begin_key', '_begin_inclusive', '_end_key', \
'_end_inclusive', '_max_elements',
TAG = 0x000b | Message.MASK
ARGS = ALLOW_DIRTY_ARG, \
('begin_key', Option(STRING)), ('begin_inclusive', BOOL), \
('end_key', Option(STRING)), ('end_inclusive', BOOL), \
('max_elements', INT32, -1),
RETURN_TYPE = List(STRING)
DOC = utils.format_doc('''
Send a "range" command to the server
The operation will return a list of keys, in the range between
`begin_key` and `end_key`. The `begin_inclusive` and `end_inclusive`
flags denote whether the delimiters should be included.
The `max_elements` flag can limit the number of returned keys. If it is
negative, all matching keys are returned.
:param begin_key: Begin of range
:type begin_key: :class:`str`
:param begin_inclusive: `begin_key` is in- or exclusive
:type begin_inclusive: :class:`bool`
:param end_key: End of range
:type end_key: :class:`str`
:param end_inclusive: `end_key` is in- or exclusive
:param max_elements: Maximum number of keys to return
:type max_elements: :class:`int`
:param allow_dirty: Allow reads from slave nodes
:type allow_dirty: :class:`bool`
:return: List of matching keys
:rtype: iterable of :class:`str`
''')
#pylint: disable=R0913
def __init__(self, allow_dirty, begin_key, begin_inclusive,
end_key, end_inclusive, max_elements):
super(Range, self).__init__()
self._allow_dirty = allow_dirty
self._begin_key = begin_key
self._begin_inclusive = begin_inclusive
self._end_key = end_key
self._end_inclusive = end_inclusive
self._max_elements = max_elements
allow_dirty = property(operator.attrgetter('_allow_dirty'))
begin_key = property(operator.attrgetter('_begin_key'))
begin_inclusive = property(operator.attrgetter('_begin_inclusive'))
end_key = property(operator.attrgetter('_end_key'))
end_inclusive = property(operator.attrgetter('_end_inclusive'))
max_elements = property(operator.attrgetter('_max_elements'))
[docs]class RangeEntries(Message):
'''"RangeEntries" message'''
__slots__ = '_allow_dirty', '_begin_key', '_begin_inclusive', '_end_key', \
'_end_inclusive', '_max_elements',
TAG = 0x000f | Message.MASK
ARGS = ALLOW_DIRTY_ARG, \
('begin_key', Option(STRING)), ('begin_inclusive', BOOL), \
('end_key', Option(STRING)), ('end_inclusive', BOOL), \
('max_elements', INT32, -1),
RETURN_TYPE = List(Product(STRING, STRING))
DOC = utils.format_doc('''
Send a "range_entries" command to the server
The operation will return a list of (key, value) tuples, for keys in the
range between `begin_key` and `end_key`. The `begin_inclusive` and
`end_inclusive` flags denote whether the delimiters should be included.
The `max_elements` flag can limit the number of returned items. If it is
negative, all matching items are returned.
:param begin_key: Begin of range
:type begin_key: :class:`str`
:param begin_inclusive: `begin_key` is in- or exclusive
:type begin_inclusive: :class:`bool`
:param end_key: End of range
:type end_key: :class:`str`
:param end_inclusive: `end_key` is in- or exclusive
:type end_inclusive: :class:`bool`
:param max_elements: Maximum number of items to return
:type max_elements: :class:`int`
:param allow_dirty: Allow reads from slave nodes
:type allow_dirty: :class:`bool`
:return: List of matching (key, value) pairs
:rtype: iterable of `(str, str)`
''')
#pylint: disable=R0913
def __init__(self, allow_dirty, begin_key, begin_inclusive,
end_key, end_inclusive, max_elements):
super(RangeEntries, self).__init__()
self._allow_dirty = allow_dirty
self._begin_key = begin_key
self._begin_inclusive = begin_inclusive
self._end_key = end_key
self._end_inclusive = end_inclusive
self._max_elements = max_elements
allow_dirty = property(operator.attrgetter('_allow_dirty'))
begin_key = property(operator.attrgetter('_begin_key'))
begin_inclusive = property(operator.attrgetter('_begin_inclusive'))
end_key = property(operator.attrgetter('_end_key'))
end_inclusive = property(operator.attrgetter('_end_inclusive'))
max_elements = property(operator.attrgetter('_max_elements'))
[docs]class MultiGet(Message):
'''"multi_get" message'''
__slots__ = '_allow_dirty', '_keys',
TAG = 0x0011 | Message.MASK
ARGS = ALLOW_DIRTY_ARG, ('keys', List(STRING)),
RETURN_TYPE = List(STRING)
DOC = utils.format_doc('''
Send a "multi_get" command to the server
This method returns a list of the values for all requested keys.
:param keys: Keys to look up
:type keys: iterable of :class:`str`
:param allow_dirty: Allow reads from slave nodes
:type allow_dirty: :class:`bool`
:return: Requested values
:rtype: iterable of :class:`str`
''')
def __init__(self, allow_dirty, keys):
super(MultiGet, self).__init__()
self._allow_dirty = allow_dirty
self._keys = keys
allow_dirty = property(operator.attrgetter('_allow_dirty'))
keys = property(operator.attrgetter('_keys'))
[docs]class MultiGetOption(Message):
'''"multi_get_option" message'''
__slots__ = '_allow_dirty', '_keys',
TAG = 0x0031 | Message.MASK
ARGS = ALLOW_DIRTY_ARG, ('keys', List(STRING)),
RETURN_TYPE = Array(Option(STRING))
DOC = utils.format_doc('''
Send a "multi_get_option" command to the server
This method returns a list of value options for all requested keys.
:param keys: Keys to look up
:type keys: iterable of :class:`str`
:param allow_dirty: Allow reads from slave nodes
:type allow_dirty: :class:`bool`
:return: Requested values
:rtype: iterable of (`str` or `None`)
''')
def __init__(self, allow_dirty, keys):
super(MultiGetOption, self).__init__()
self._allow_dirty = allow_dirty
self._keys = keys
allow_dirty = property(operator.attrgetter('_allow_dirty'))
keys = property(operator.attrgetter('_keys'))
[docs]class ExpectProgressPossible(Message):
'''"expect_progress_possible" message'''
__slots__ = ()
TAG = 0x0012 | Message.MASK
ARGS = ()
RETURN_TYPE = BOOL
DOC = utils.format_doc('''
Send a "expect_progress_possible" command to the server
This method returns whether the master thinks progress is possible.
:return: Whether the master thinks progress is possible
:rtype: :class:`bool`
''')
[docs]class GetKeyCount(Message):
'''"get_key_count" message'''
__slots__ = ()
TAG = 0x001a | Message.MASK
ARGS = ()
RETURN_TYPE = UINT64
DOC = utils.format_doc('''
Send a "get_key_count" command to the server
This method returns the number of items stored in Arakoon.
:return: Number of items stored in the database
:rtype: :class:`int`
''')
[docs]class UserFunction(Message):
'''"user_function" message'''
__slots__ = '_function', '_arg',
TAG = 0x0015 | Message.MASK
ARGS = ('function', STRING), ('argument', Option(STRING)),
RETURN_TYPE = Option(STRING)
DOC = utils.format_doc('''
Send a "user_function" command to the server
This method returns the result of the function invocation.
:param function: Name of the user function to invoke
:type function: :class:`str`
:param argument: Argument to pass to the function
:type argument: :class:`str` or :data:`None`
:return: Result of the function invocation
:rtype: :class:`str` or :data:`None`
''')
def __init__(self, function, argument):
super(UserFunction, self).__init__()
self._function = function
self._argument = argument
function = property(operator.attrgetter('_function'))
argument = property(operator.attrgetter('_argument'))
[docs]class Confirm(Message):
'''"confirm" message'''
__slots__ = '_key', '_value',
TAG = 0x001c | Message.MASK
ARGS = ('key', STRING), ('value', STRING),
RETURN_TYPE = UNIT
DOC = utils.format_doc('''
Send a "confirm" command to the server
This method sets a given key to a given value on the server, unless
the value bound to the key is already equal to the provided value, in
which case the action becomes a no-op.
:param key: Key to set
:type key: :class:`str`
:param value: Value to set
:type value: :class:`str`
''')
def __init__(self, key, value):
super(Confirm, self).__init__()
self._key = key
self._value = value
key = property(operator.attrgetter('_key'))
value = property(operator.attrgetter('_value'))
[docs]class Assert(Message):
'''"assert" message'''
__slots__ = '_allow_dirty', '_key', '_value',
TAG = 0x0016 | Message.MASK
ARGS = ALLOW_DIRTY_ARG, ('key', STRING), ('value', Option(STRING)),
RETURN_TYPE = UNIT
DOC = utils.format_doc('''
Send an "assert" command to the server
`assert key vo` throws an exception if the value associated with the
key is not what was expected.
:param key: Key to check
:type key: :class:`str`
:param value: Optional value to compare
:type value: :class:`str` or :data:`None`
:param allow_dirty: Allow reads from slave nodes
:type allow_dirty: :class:`bool`
''')
def __init__(self, allow_dirty, key, value):
super(Assert, self).__init__()
self._allow_dirty = allow_dirty
self._key = key
self._value = value
allow_dirty = property(operator.attrgetter('_allow_dirty'))
key = property(operator.attrgetter('_key'))
value = property(operator.attrgetter('_value'))
[docs]class AssertExists(Message):
'''"assert_exists" message'''
__slots__ = '_allow_dirty', '_key',
TAG = 0x0029 | Message.MASK
ARGS = ALLOW_DIRTY_ARG, ('key', STRING),
RETURN_TYPE = UNIT
DOC = utils.format_doc('''
Send an "assert_exists" command to the server
`assert_exists key` throws an exception if the key doesn't exist in
the database.
:param key: Key to check
:type key: :class:`str`
:param allow_dirty: Allow reads from slave nodes
:type allow_dirty: :class:`bool`
''')
def __init__(self, allow_dirty, key):
super(AssertExists, self).__init__()
self._allow_dirty = allow_dirty
self._key = key
allow_dirty = property(operator.attrgetter('_allow_dirty'))
key = property(operator.attrgetter('_key'))
[docs]class RevRangeEntries(Message):
'''"rev_range_entries" message'''
__slots__ = '_allow_dirty', '_begin_key', '_begin_inclusive', '_end_key', \
'_end_inclusive', '_max_elements',
TAG = 0x0023 | Message.MASK
ARGS = ALLOW_DIRTY_ARG, \
('begin_key', Option(STRING)), ('begin_inclusive', BOOL), \
('end_key', Option(STRING)), ('end_inclusive', BOOL), \
('max_elements', INT32, -1),
RETURN_TYPE = List(Product(STRING, STRING))
DOC = utils.format_doc('''
Send a "rev_range_entries" command to the server
The operation will return a list of (key, value) tuples, for keys in
the reverse range between `begin_key` and `end_key`. The
`begin_inclusive` and `end_inclusive` flags denote whether the
delimiters should be included.
The `max_elements` flag can limit the number of returned items. If it is
negative, all matching items are returned.
:param begin_key: Begin of range
:type begin_key: :class:`str`
:param begin_inclusive: `begin_key` is in- or exclusive
:type begin_inclusive: :class:`bool`
:param end_key: End of range
:type end_key: :class:`str`
:param end_inclusive: `end_key` is in- or exclusive
:type end_inclusive: :class:`bool`
:param max_elements: Maximum number of items to return
:type max_elements: :class:`int`
:param allow_dirty: Allow reads from slave nodes
:type allow_dirty: :class:`bool`
:return: List of matching (key, value) pairs
:rtype: iterable of `(str, str)`
''')
#pylint: disable=R0913
def __init__(self, allow_dirty, begin_key, begin_inclusive,
end_key, end_inclusive, max_elements):
super(RevRangeEntries, self).__init__()
self._allow_dirty = allow_dirty
self._begin_key = begin_key
self._begin_inclusive = begin_inclusive
self._end_key = end_key
self._end_inclusive = end_inclusive
self._max_elements = max_elements
allow_dirty = property(operator.attrgetter('_allow_dirty'))
begin_key = property(operator.attrgetter('_begin_key'))
begin_inclusive = property(operator.attrgetter('_begin_inclusive'))
end_key = property(operator.attrgetter('_end_key'))
end_inclusive = property(operator.attrgetter('_end_inclusive'))
max_elements = property(operator.attrgetter('_max_elements'))
[docs]class Statistics(Message):
'''"statistics" message'''
__slots__ = ()
TAG = 0x0013 | Message.MASK
ARGS = ()
RETURN_TYPE = STATISTICS
DOC = utils.format_doc('''
Send a "statistics" command to the server
This method returns some server statistics.
:return: Server statistics
:rtype: `Statistics`
''')
[docs]class Version(Message):
'''"version" message'''
__slots__ = ()
TAG = 0x0028 | Message.MASK
ARGS = ()
RETURN_TYPE = Product(INT32, INT32, INT32, STRING)
DOC = utils.format_doc('''
Send a "version" command to the server
This method returns the server version.
:return: Server version
:rtype: `(int, int, int, str)`
''')
[docs]class DeletePrefix(Message):
'''"delete_prefix" message'''
__slots__ = '_prefix',
TAG = 0x0027 | Message.MASK
ARGS = ('prefix', STRING),
RETURN_TYPE = UINT32
DOC = utils.format_doc('''
Send a "delete_prefix" command to the server
`delete_prefix prefix` will delete all key/value-pairs from the
database where given `prefix` is a prefix of `key`.
:param prefix: Prefix of binding keys to delete
:type prefix: :class:`str`
:return: Number of deleted bindings
:rtype: :class:`int`
''')
def __init__(self, prefix):
super(DeletePrefix, self).__init__()
self._prefix = prefix
prefix = property(operator.attrgetter('_prefix'))
[docs]class Replace(Message):
'''"replace" message'''
__slots__ = '_key', '_value',
TAG = 0x0033 | Message.MASK
ARGS = ('key', STRING), ('value', Option(STRING)),
RETURN_TYPE = Option(STRING)
DOC = utils.format_doc('''
Send a "replace" command to the server
`replace key value` will replace the value bound to the given key with
the provided value, and return the old value bound to the key.
If `value` is :data:`None`, the key is deleted.
If the key was not present in the database, :data:`None` is returned.
:param key: Key to replace
:type key: :class:`str`
:param value: Value to set
:type value: :class:`str` or :data:`None`
:return: Original value bound to the key
:rtype: :class:`str` or :data:`None`
''')
def __init__(self, key, value):
self._key = key
self._value = value
key = property(operator.attrgetter('_key'))
value = property(operator.attrgetter('_value'))
[docs]class Nop(Message):
'''"nop" message'''
__slots__ = ()
TAG = 0x0041 | Message.MASK
ARGS = ()
RETURN_TYPE = UNIT
DOC = utils.format_doc('''
Send a "nop" command to the server
This enforces consensus throughout a cluster, but has no further
effects.
''')
[docs]class GetCurrentState(Message):
'''"get_current_state" message'''
__slots__ = ()
TAG = 0x0032 | Message.MASK
ARGS = ()
RETURN_TYPE = STRING
DOC = utils.format_doc('''
Send a "get_current_state" command to the server
This call returns a string representing the current state of the node,
and can be used for troubleshooting purposes.
:return: State of the server
:rtype: :class:`str`
''')
def sanity_check():
'''Sanity check for some invariants on types defined in this module'''
for (_name, value) in globals().iteritems():
if inspect.isclass(value) \
and getattr(value, '__module__', None) == __name__:
# A `Message` which has `ALLOW_DIRTY_ARG` in its `ARGS` must have
# an `allow_dirty` attribute, the constructor must take such
# argument, and if `__slots__` is defined, there should be an
# `_allow_dirty` field
if issubclass(value, Message):
if ALLOW_DIRTY_ARG in (value.ARGS or []): #pylint: disable=C0325
assert hasattr(value, 'allow_dirty')
argspec = inspect.getargspec(value.__init__)
assert 'allow_dirty' in argspec.args
if hasattr(value, '__slots__'):
assert '_allow_dirty' in value.__slots__
sanity_check()
del sanity_check
[docs]def build_prologue(cluster):
'''Return the string to send as prologue
:param cluster: Name of the cluster to which a connection is made
:type cluster: :class:`str`
:return: Prologue to send to the Arakoon server
:rtype: :class:`str`
'''
return ''.join(itertools.chain(
UINT32.serialize(Message.MASK),
UINT32.serialize(PROTOCOL_VERSION),
STRING.serialize(cluster)))