from allmydata.mutable.common import NeedMoreDataError, UnknownVersionError
from allmydata.interfaces import HASH_SIZE, SALT_SIZE, SDMF_VERSION, \
MDMF_VERSION, IMutableSlotWriter
-from allmydata.util import mathutil, observer
+from allmydata.util import mathutil
from twisted.python import failure
from twisted.internet import defer
from zope.interface import implements
if self._data == None:
self._data = ""
- self._queue_observers = observer.ObserverList()
- self._queue_errbacks = observer.ObserverList()
- self._readvs = []
-
def _maybe_fetch_offsets_and_header(self, force_remote=False):
"""
self._offsets['share_data'] = sharedata
- def get_block_and_salt(self, segnum, queue=False):
+ def get_block_and_salt(self, segnum):
"""
I return (block, salt), where block is the block data and
salt is the salt used to encrypt that segment.
readvs = [(share_offset, data)]
return readvs
d.addCallback(_then)
- d.addCallback(lambda readvs:
- self._read(readvs, queue=queue))
+ d.addCallback(lambda readvs: self._read(readvs))
def _process_results(results):
assert self.shnum in results
if self._version_number == 0:
return d
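A minimal caller sketch, not part of the patch: `reader` stands in for an MDMFSlotReadProxy and `segnum` for a valid segment index. With the queue argument gone, the read is issued as soon as the accessor is called:

    d = reader.get_block_and_salt(segnum)
    def _got_block_and_salt(block_and_salt):
        # block is this segment's share data; salt is the per-segment
        # encryption salt (SALT_SIZE bytes) used to encrypt it.
        block, salt = block_and_salt
        return (block, salt)
    d.addCallback(_got_block_and_salt)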
- def get_blockhashes(self, needed=None, queue=False, force_remote=False):
+ def get_blockhashes(self, needed=None, force_remote=False):
"""
I return the block hash tree
return readvs
d.addCallback(_then)
d.addCallback(lambda readvs:
- self._read(readvs, queue=queue, force_remote=force_remote))
+ self._read(readvs, force_remote=force_remote))
def _build_block_hash_tree(results):
assert self.shnum in results
return d
- def get_sharehashes(self, needed=None, queue=False, force_remote=False):
+ def get_sharehashes(self, needed=None, force_remote=False):
"""
I return the part of the share hash chain placed to validate
this share.
return readvs
d.addCallback(_make_readvs)
d.addCallback(lambda readvs:
- self._read(readvs, queue=queue, force_remote=force_remote))
+ self._read(readvs, force_remote=force_remote))
def _build_share_hash_chain(results):
assert self.shnum in results
return d
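A hedged sketch of the new call-site batching, assuming the same `reader` proxy and the `needed`/`need` hash sets computed in the retrieve code further down. Grouping now happens with an explicit DeferredList rather than the old queue-then-flush mechanism:

    from twisted.internet import defer

    d1 = reader.get_blockhashes(needed, force_remote=True)
    d2 = reader.get_sharehashes(need, force_remote=True)
    # Each accessor performs its own slot_readv now; there is no shared
    # read queue left to flush.
    dl = defer.DeferredList([d1, d2], consumeErrors=True)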
- def get_encprivkey(self, queue=False):
+ def get_encprivkey(self):
"""
I return the encrypted private key.
"""
readvs = [(privkey_offset, privkey_length)]
return readvs
d.addCallback(_make_readvs)
- d.addCallback(lambda readvs:
- self._read(readvs, queue=queue))
+ d.addCallback(lambda readvs: self._read(readvs))
def _process_results(results):
assert self.shnum in results
privkey = results[self.shnum][0]
return d
- def get_signature(self, queue=False):
+ def get_signature(self):
"""
I return the signature of my share.
"""
readvs = [(signature_offset, signature_length)]
return readvs
d.addCallback(_make_readvs)
- d.addCallback(lambda readvs:
- self._read(readvs, queue=queue))
+ d.addCallback(lambda readvs: self._read(readvs))
def _process_results(results):
assert self.shnum in results
signature = results[self.shnum][0]
return d
- def get_verification_key(self, queue=False):
+ def get_verification_key(self):
"""
I return the verification key.
"""
readvs = [(vk_offset, vk_length)]
return readvs
d.addCallback(_make_readvs)
- d.addCallback(lambda readvs:
- self._read(readvs, queue=queue))
+ d.addCallback(lambda readvs: self._read(readvs))
def _process_results(results):
assert self.shnum in results
verification_key = results[self.shnum][0]
return d
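The fixed-location fields follow the same pattern. A sketch of fetching them together, mirroring the names the retrieve loop below already uses (`self._need_privkey` is that loop's flag, not a new attribute):

    ds = []
    ds.append(reader.get_signature())
    ds.append(reader.get_verification_key())
    if self._need_privkey:
        ds.append(reader.get_encprivkey())
    d = deferredutil.gatherResults(ds)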
- def flush(self):
- """
- I flush my queue of read vectors.
- """
- d = self._read(self._readvs)
- def _then(results):
- self._readvs = []
- if isinstance(results, failure.Failure):
- self._queue_errbacks.notify(results)
- else:
- self._queue_observers.notify(results)
- self._queue_observers = observer.ObserverList()
- self._queue_errbacks = observer.ObserverList()
- d.addBoth(_then)
-
-
- def _read(self, readvs, force_remote=False, queue=False):
+ def _read(self, readvs, force_remote=False):
unsatisfiable = filter(lambda x: x[0] + x[1] > len(self._data), readvs)
# TODO: It's entirely possible to tweak this so that it just
# fulfills the requests that it can, and not demand that all
results = {self.shnum: results}
return defer.succeed(results)
else:
- if queue:
- start = len(self._readvs)
- self._readvs += readvs
- end = len(self._readvs)
- def _get_results(results, start, end):
- if not self.shnum in results:
- return {self._shnum: [""]}
- return {self.shnum: results[self.shnum][start:end]}
- d = defer.Deferred()
- d.addCallback(_get_results, start, end)
- self._queue_observers.subscribe(d.callback)
- self._queue_errbacks.subscribe(d.errback)
- return d
return self._rref.callRemote("slot_readv",
self._storage_index,
[self.shnum],
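Condensed, _read now has exactly two paths: satisfy every read vector from the locally cached share data, or issue one immediate slot_readv. A sketch of the post-patch control flow, reconstructed from the surrounding lines, so treat the details as illustrative:

    def _read(self, readvs, force_remote=False):
        unsatisfiable = filter(lambda x: x[0] + x[1] > len(self._data),
                               readvs)
        if not unsatisfiable and not force_remote:
            # Every (offset, length) pair fits inside the cached data.
            results = [self._data[offset:offset+length]
                       for (offset, length) in readvs]
            return defer.succeed({self.shnum: results})
        # One immediate remote read; no queueing, no flush() needed.
        return self._rref.callRemote("slot_readv", self._storage_index,
                                     [self.shnum], readvs)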
ds = []
for reader in self._active_readers:
started = time.time()
- d = reader.get_block_and_salt(segnum, queue=True)
+ d = reader.get_block_and_salt(segnum)
d2 = self._get_needed_hashes(reader, segnum)
dl = defer.DeferredList([d, d2], consumeErrors=True)
dl.addCallback(self._validate_block, segnum, reader, started)
dl.addErrback(self._validation_or_decoding_failed, [reader])
ds.append(dl)
- reader.flush()
dl = defer.DeferredList(ds)
if self._verify:
dl.addCallback(lambda ignored: "")
#needed.discard(0)
self.log("getting blockhashes for segment %d, share %d: %s" % \
(segnum, reader.shnum, str(needed)))
- d1 = reader.get_blockhashes(needed, queue=True, force_remote=True)
+ d1 = reader.get_blockhashes(needed, force_remote=True)
if self.share_hash_tree.needed_hashes(reader.shnum):
need = self.share_hash_tree.needed_hashes(reader.shnum)
self.log("also need sharehashes for share %d: %s" % (reader.shnum,
str(need)))
- d2 = reader.get_sharehashes(need, queue=True, force_remote=True)
+ d2 = reader.get_sharehashes(need, force_remote=True)
else:
d2 = defer.succeed({}) # the logic in the next method
# expects a dict
# public key. We use this to validate the signature.
if not self._node.get_pubkey():
# fetch and set the public key.
- d = reader.get_verification_key(queue=True)
+ d = reader.get_verification_key()
d.addCallback(lambda results, shnum=shnum, peerid=peerid:
self._try_to_set_pubkey(results, peerid, shnum, lp))
# XXX: Make self._pubkey_query_failed?
# to get the version information. In MDMF, this lives at
# the end of the share, so unless the file is quite small,
# we'll need to do a remote fetch to get it.
- d3 = reader.get_signature(queue=True)
+ d3 = reader.get_signature()
d3.addErrback(lambda error, shnum=shnum, peerid=peerid:
self._got_corrupt_share(error, shnum, peerid, data, lp))
# Once we have all three of these responses, we can move on
# Does the node already have a privkey? If not, we'll try to
# fetch it here.
if self._need_privkey:
- d4 = reader.get_encprivkey(queue=True)
+ d4 = reader.get_encprivkey()
d4.addCallback(lambda results, shnum=shnum, peerid=peerid:
self._try_to_validate_privkey(results, peerid, shnum, lp))
d4.addErrback(lambda error, shnum=shnum, peerid=peerid:
# make the two routines share the value without
# introducing more roundtrips?
ds.append(reader.get_verinfo())
- ds.append(reader.get_blockhashes(queue=True))
- ds.append(reader.get_block_and_salt(self.start_segment,
- queue=True))
- ds.append(reader.get_block_and_salt(self.end_segment,
- queue=True))
+ ds.append(reader.get_blockhashes())
+ ds.append(reader.get_block_and_salt(self.start_segment))
+ ds.append(reader.get_block_and_salt(self.end_segment))
d5 = deferredutil.gatherResults(ds)
d5.addCallback(self._got_update_results_one_share, shnum)
else:
dl = defer.DeferredList([d, d2, d3, d4, d5])
dl.addBoth(self._turn_barrier)
- reader.flush()
dl.addCallback(lambda results, shnum=shnum, peerid=peerid:
self._got_signature_one_share(results, shnum, peerid, lp))
dl.addErrback(lambda error, shnum=shnum, data=data:
d = defer.Deferred()
if not self._pending:
self._pending_timer = reactor.callLater(1.0, self._fire_readers)
- self._pending[peerid] = (d, shares)
+ if peerid not in self._pending:
+ self._pending[peerid] = []
+            self._pending[peerid].append((d, shares))
return d
def _fire_readers(self):
self._pending = {}
for peerid in self._sequence:
if peerid in pending:
-                d, shares = pending.pop(peerid)
-                eventually(d.callback, shares)
-        for (d, shares) in pending.values():
-            eventually(d.callback, shares)
+                for (d, shares) in pending.pop(peerid):
+                    eventually(d.callback, shares)
+        for peerid in pending:
+            for (d, shares) in pending[peerid]:
+                eventually(d.callback, shares)
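This also fixes a latent bug in the fake server: the old code kept a single (d, shares) tuple per peerid, so a second read from the same peer overwrote the first Deferred. A usage sketch (fixture names assumed from the surrounding test module):

    storage = FakeStorage()
    d1 = storage.read("peer1", "si1")  # first read arms the 1.0s timer
    d2 = storage.read("peer1", "si1")  # second read is appended, not lost
    # When _fire_readers runs, both d1 and d2 fire with peer1's shares.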
def write(self, peerid, storage_index, shnum, offset, data):
if peerid not in self._peers:
return d
- def test_reader_queue(self):
- self.write_test_share_to_server('si1')
- mr = MDMFSlotReadProxy(self.rref, "si1", 0)
- d1 = mr.get_block_and_salt(0, queue=True)
- d2 = mr.get_blockhashes(queue=True)
- d3 = mr.get_sharehashes(queue=True)
- d4 = mr.get_signature(queue=True)
- d5 = mr.get_verification_key(queue=True)
- dl = defer.DeferredList([d1, d2, d3, d4, d5])
- mr.flush()
- def _print(results):
- self.failUnlessEqual(len(results), 5)
- # We have one read for version information and offsets, and
- # one for everything else.
- self.failUnlessEqual(self.rref.read_count, 2)
- block, salt = results[0][1] # results[0] is a boolean that says
- # whether or not the operation
- # worked.
- self.failUnlessEqual(self.block, block)
- self.failUnlessEqual(self.salt, salt)
-
- blockhashes = results[1][1]
- self.failUnlessEqual(self.block_hash_tree, blockhashes)
-
- sharehashes = results[2][1]
- self.failUnlessEqual(self.share_hash_chain, sharehashes)
-
- signature = results[3][1]
- self.failUnlessEqual(self.signature, signature)
-
- verification_key = results[4][1]
- self.failUnlessEqual(self.verification_key, verification_key)
- dl.addCallback(_print)
- return dl
-
-
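With test_reader_queue removed, a hedged sketch of how the same accessors might still be exercised, shown here as an illustration rather than the project's actual replacement test. Each call now triggers its own read, so the old read_count == 2 assertion no longer applies:

    def test_reader_no_queue(self):
        self.write_test_share_to_server('si1')
        mr = MDMFSlotReadProxy(self.rref, "si1", 0)
        d = mr.get_block_and_salt(0)
        def _check(block_and_salt):
            block, salt = block_and_salt
            self.failUnlessEqual(self.block, block)
            self.failUnlessEqual(self.salt, salt)
        d.addCallback(_check)
        return d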
def test_sdmf_writer(self):
# Go through the motions of writing an SDMF share to the storage
# server. Then read the storage server to see that the share got