from allmydata.mutable.common import NeedMoreDataError, UnknownVersionError
from allmydata.interfaces import HASH_SIZE, SALT_SIZE, SDMF_VERSION, \
MDMF_VERSION, IMutableSlotWriter
-from allmydata.util import mathutil, observer
+from allmydata.util import mathutil
from twisted.python import failure
from twisted.internet import defer
from zope.interface import implements
if self._data == None:
self._data = ""
- self._queue_observers = observer.ObserverList()
- self._queue_errbacks = observer.ObserverList()
- self._readvs = []
-
def _maybe_fetch_offsets_and_header(self, force_remote=False):
"""
self._offsets['share_data'] = sharedata
- def get_block_and_salt(self, segnum, queue=False):
+ def get_block_and_salt(self, segnum):
"""
I return (block, salt), where block is the block data and
salt is the salt used to encrypt that segment.
readvs = [(share_offset, data)]
return readvs
d.addCallback(_then)
- d.addCallback(lambda readvs:
- self._read(readvs, queue=queue))
+ d.addCallback(lambda readvs: self._read(readvs))
def _process_results(results):
assert self.shnum in results
if self._version_number == 0:
return d
- def get_blockhashes(self, needed=None, queue=False, force_remote=False):
+ def get_blockhashes(self, needed=None, force_remote=False):
"""
I return the block hash tree
return readvs
d.addCallback(_then)
d.addCallback(lambda readvs:
- self._read(readvs, queue=queue, force_remote=force_remote))
+ self._read(readvs, force_remote=force_remote))
def _build_block_hash_tree(results):
assert self.shnum in results
return d
- def get_sharehashes(self, needed=None, queue=False, force_remote=False):
+ def get_sharehashes(self, needed=None, force_remote=False):
"""
I return the part of the share hash chain placed to validate
this share.
return readvs
d.addCallback(_make_readvs)
d.addCallback(lambda readvs:
- self._read(readvs, queue=queue, force_remote=force_remote))
+ self._read(readvs, force_remote=force_remote))
def _build_share_hash_chain(results):
assert self.shnum in results
return d
- def get_encprivkey(self, queue=False):
+ def get_encprivkey(self):
"""
I return the encrypted private key.
"""
readvs = [(privkey_offset, privkey_length)]
return readvs
d.addCallback(_make_readvs)
- d.addCallback(lambda readvs:
- self._read(readvs, queue=queue))
+ d.addCallback(lambda readvs: self._read(readvs))
def _process_results(results):
assert self.shnum in results
privkey = results[self.shnum][0]
return d
- def get_signature(self, queue=False):
+ def get_signature(self):
"""
I return the signature of my share.
"""
readvs = [(signature_offset, signature_length)]
return readvs
d.addCallback(_make_readvs)
- d.addCallback(lambda readvs:
- self._read(readvs, queue=queue))
+ d.addCallback(lambda readvs: self._read(readvs))
def _process_results(results):
assert self.shnum in results
signature = results[self.shnum][0]
return d
- def get_verification_key(self, queue=False):
+ def get_verification_key(self):
"""
I return the verification key.
"""
readvs = [(vk_offset, vk_length)]
return readvs
d.addCallback(_make_readvs)
- d.addCallback(lambda readvs:
- self._read(readvs, queue=queue))
+ d.addCallback(lambda readvs: self._read(readvs))
def _process_results(results):
assert self.shnum in results
verification_key = results[self.shnum][0]
return d
- def flush(self):
- """
- I flush my queue of read vectors.
- """
- d = self._read(self._readvs)
- def _then(results):
- self._readvs = []
- if isinstance(results, failure.Failure):
- self._queue_errbacks.notify(results)
- else:
- self._queue_observers.notify(results)
- self._queue_observers = observer.ObserverList()
- self._queue_errbacks = observer.ObserverList()
- d.addBoth(_then)
-
-
- def _read(self, readvs, force_remote=False, queue=False):
+ def _read(self, readvs, force_remote=False):
unsatisfiable = filter(lambda x: x[0] + x[1] > len(self._data), readvs)
# TODO: It's entirely possible to tweak this so that it just
# fulfills the requests that it can, and not demand that all
results = {self.shnum: results}
return defer.succeed(results)
else:
- if queue:
- start = len(self._readvs)
- self._readvs += readvs
- end = len(self._readvs)
- def _get_results(results, start, end):
- if not self.shnum in results:
- return {self._shnum: [""]}
- return {self.shnum: results[self.shnum][start:end]}
- d = defer.Deferred()
- d.addCallback(_get_results, start, end)
- self._queue_observers.subscribe(d.callback)
- self._queue_errbacks.subscribe(d.errback)
- return d
return self._rref.callRemote("slot_readv",
self._storage_index,
[self.shnum],