-import os, struct, time, weakref
+import os, sys, struct, time, weakref
from itertools import count
from zope.interface import implements
from twisted.internet import defer
def __repr__(self):
return "<%s -- You, oh user, tried to change a file or directory at the same time as another process was trying to change it. To avoid data loss, don't do this. Please see docs/write_coordination.html for details.>" % (self.__class__.__name__,)
+class UnrecoverableFileError(Exception):
+ pass
+
class CorruptShareError(Exception):
def __init__(self, peerid, shnum, reason):
self.args = (peerid, shnum, reason)
self.last_update_mode = None
self.last_update_time = 0
+ def dump(self, out=sys.stdout):
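+ # one line per (peer, share); e.g. (illustrative values only)
+ # "[xgru5adv]: sh#3 seq1-mnfa 3-of-10 len2000"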
+ print >>out, "servermap:"
+ for (peerid, shares) in self.servermap.items():
+ for (shnum, versionid, timestamp) in sorted(shares):
+ (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
+ offsets_tuple) = versionid
+ print >>out, ("[%s]: sh#%d seq%d-%s %d-of-%d len%d" %
+ (idlib.shortnodeid_b2a(peerid), shnum,
+ seqnum, base32.b2a(root_hash)[:4], k, N,
+ datalength))
+ return out
+
def make_versionmap(self):
"""Return a dict that maps versionid to sets of (shnum, peerid,
timestamp) tuples."""
versionmap.add(verinfo, (shnum, peerid, timestamp))
return versionmap
+ def shares_on_peer(self, peerid):
+ return set([shnum
+ for (shnum, versionid, timestamp)
+ in self.servermap.get(peerid, [])])
+
+ def version_on_peer(self, peerid, shnum):
+ shares = self.servermap.get(peerid, [])
+ for (sm_shnum, sm_versionid, sm_timestamp) in shares:
+ if sm_shnum == shnum:
+ return sm_versionid
+ return None
+
def shares_available(self):
"""Return a dict that maps versionid to tuples of
(num_distinct_shares, k) tuples."""
all_shares[versionid] = (len(s), k)
return all_shares
+ def highest_seqnum(self):
+ available = self.shares_available()
+ seqnums = [versionid[0]
+ for versionid in available.keys()]
+ seqnums.append(0)
+ return max(seqnums)
+
def recoverable_versions(self):
"""Return a set of versionids, one for each version that is currently
recoverable."""
# * if we only need the checkstring, then [0:75]
# * if we need to validate the checkstring sig, then [543ish:799ish]
# * if we need the verification key, then [107:436ish]
- # * the offset table at [75:107] tells us about the 'ish'
+ # * the offset table at [75:107] tells us about the 'ish'
+ # * if we need the encrypted private key, we want [-1216ish:]
+ # * but we can't read from negative offsets
+ # * the offset table tells us the 'ish', also the positive offset
# A future version of the SDMF slot format should consider using
# fixed-size slots so we can retrieve less data. For now, we'll just
# read 2000 bytes, which also happens to read enough actual data to
if mode == MODE_CHECK:
# we use unpack_prefix_and_signature, so we need 1k
self._read_size = 1000
+ self._need_privkey = False
+ if mode == MODE_WRITE and not self._node._privkey:
+ self._need_privkey = True
prefix = storage.si_b2a(self._storage_index)[:5]
self._log_number = log.msg("SharemapUpdater(%s): starting" % prefix)
# might not wait for all of their answers to come back)
self.num_peers_to_query = k + self.EPSILON
- # TODO: initial_peers_to_query needs to be ordered list of (peerid,
- # ss) tuples
-
if self.mode == MODE_CHECK:
initial_peers_to_query = dict(full_peerlist)
must_query = set(initial_peers_to_query.keys())
initial_peers_to_query, must_query = self._build_initial_querylist()
self.required_num_empty_peers = self.EPSILON
- # TODO: also populate self._filenode._privkey
+ # TODO: arrange to read lots of data from k-ish servers, to avoid
+ # the extra round trip required to read large directories. This
+ # might also avoid the round trip required to read the encrypted
+ # private key.
- # TODO: arrange to read 3KB from one peer who is likely to hold a
- # share, so we can avoid the latency of that extra roundtrip. 3KB
- # would get us the encprivkey from a dirnode with up to 7
- # entries, allowing us to make an update in 2 RTT instead of 3.
else:
initial_peers_to_query, must_query = self._build_initial_querylist()
# contains the overflow (peers that we should tap if we don't get
# enough responses)
- d = defer.succeed(initial_peers_to_query)
- d.addCallback(self._send_initial_requests)
- d.addCallback(lambda res: self._done_deferred)
- return d
+ self._send_initial_requests(initial_peers_to_query)
+ return self._done_deferred
def _build_initial_querylist(self):
initial_peers_to_query = {}
# might produce a result.
return None
- def _do_read(self, ss, peerid, storage_index, shnums, readv):
- d = ss.callRemote("slot_readv", storage_index, shnums, readv)
- return d
-
def _do_query(self, ss, peerid, storage_index, readsize):
self.log(format="sending query to [%(peerid)s], readsize=%(readsize)d",
peerid=idlib.shortnodeid_b2a(peerid),
# _query_failed) get logged, but we still want to check for doneness.
d.addErrback(log.err)
d.addBoth(self._check_for_done)
- d.addErrback(log.err)
+ d.addErrback(self._fatal_error)
return d
- def _deserialize_pubkey(self, pubkey_s):
- verifier = rsa.create_verifying_key_from_string(pubkey_s)
- return verifier
-
- def _try_to_extract_privkey(self, data, peerid, shnum):
- try:
- r = unpack_share(data)
- except NeedMoreDataError, e:
- # this share won't help us. oh well.
- offset = e.encprivkey_offset
- length = e.encprivkey_length
- self.log("shnum %d on peerid %s: share was too short (%dB) "
- "to get the encprivkey; [%d:%d] ought to hold it" %
- (shnum, idlib.shortnodeid_b2a(peerid), len(data),
- offset, offset+length))
- # NOTE: if uncoordinated writes are taking place, someone might
- # change the share (and most probably move the encprivkey) before
- # we get a chance to do one of these reads and fetch it. This
- # will cause us to see a NotEnoughPeersError(unable to fetch
- # privkey) instead of an UncoordinatedWriteError . This is a
- # nuisance, but it will go away when we move to DSA-based mutable
- # files (since the privkey will be small enough to fit in the
- # write cap).
-
- self._encprivkey_shares.append( (peerid, shnum, offset, length))
- return
-
- (seqnum, root_hash, IV, k, N, segsize, datalen,
- pubkey, signature, share_hash_chain, block_hash_tree,
- share_data, enc_privkey) = r
-
- return self._try_to_validate_privkey(enc_privkey, peerid, shnum)
-
- def _try_to_validate_privkey(self, enc_privkey, peerid, shnum):
- alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
- alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
- if alleged_writekey != self._writekey:
- self.log("invalid privkey from %s shnum %d" %
- (idlib.nodeid_b2a(peerid)[:8], shnum), level=log.WEIRD)
- return
-
- # it's good
- self.log("got valid privkey from shnum %d on peerid %s" %
- (shnum, idlib.shortnodeid_b2a(peerid)))
- self._privkey = rsa.create_signing_key_from_string(alleged_privkey_s)
- self._encprivkey = enc_privkey
- self._node._populate_encprivkey(self._encprivkey)
- self._node._populate_privkey(self._privkey)
+ def _do_read(self, ss, peerid, storage_index, shnums, readv):
+ d = ss.callRemote("slot_readv", storage_index, shnums, readv)
+ return d
def _got_results(self, datavs, peerid, readsize, stuff, started):
- self.log(format="got result from [%(peerid)s], %(numshares)d shares",
- peerid=idlib.shortnodeid_b2a(peerid),
- numshares=len(datavs),
- level=log.NOISY)
+ lp = self.log(format="got result from [%(peerid)s], %(numshares)d shares",
+ peerid=idlib.shortnodeid_b2a(peerid),
+ numshares=len(datavs),
+ level=log.NOISY)
self._queries_outstanding.discard(peerid)
self._must_query.discard(peerid)
self._queries_completed += 1
else:
self._empty_peers.add(peerid)
+ last_verinfo = None
+ last_shnum = None
for shnum,datav in datavs.items():
data = datav[0]
try:
- self._got_results_one_share(shnum, data, peerid)
+ verinfo = self._got_results_one_share(shnum, data, peerid)
+ last_verinfo = verinfo
+ last_shnum = shnum
except CorruptShareError, e:
# log it and give the other shares a chance to be processed
f = failure.Failure()
self._last_failure = f
self._servermap.problems.append(f)
pass
+
+ if self._need_privkey and last_verinfo:
+ # send them a request for the privkey. We send one request per
+ # server.
+ (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
+ offsets_tuple) = last_verinfo
+ o = dict(offsets_tuple)
+
+ self._queries_outstanding.add(peerid)
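+ # the encrypted private key is stored at the tail of the share, so a
+ # single read from its offset through EOF should cover the whole
+ # encprivkey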
+ readv = [ (o['enc_privkey'], (o['EOF'] - o['enc_privkey'])) ]
+ ss = self._servermap.connections[peerid]
+ d = self._do_read(ss, peerid, self._storage_index,
+ [last_shnum], readv)
+ d.addCallback(self._got_privkey_results, peerid, last_shnum)
+ d.addErrback(self._privkey_query_failed, peerid, last_shnum)
+ d.addErrback(log.err)
+ d.addCallback(self._check_for_done)
+ d.addErrback(self._fatal_error)
+
# all done!
- self.log("DONE")
+ self.log("_got_results done", parent=lp)
def _got_results_one_share(self, shnum, data, peerid):
self.log(format="_got_results: got shnum #%(shnum)d from peerid %(peerid)s",
"pubkey doesn't match fingerprint")
self._node._pubkey = self._deserialize_pubkey(pubkey_s)
+ if self._need_privkey:
+ self._try_to_extract_privkey(data, peerid, shnum)
+
(ig_version, ig_seqnum, ig_root_hash, ig_IV, ig_k, ig_N,
ig_segsize, ig_datalen, offsets) = unpack_header(data)
offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] )
self._servermap.servermap.add(peerid, (shnum, verinfo, timestamp))
# and the versionmap
self.versionmap.add(verinfo, (shnum, peerid, timestamp))
+ return verinfo
+
+ def _deserialize_pubkey(self, pubkey_s):
+ verifier = rsa.create_verifying_key_from_string(pubkey_s)
+ return verifier
+
+ def _try_to_extract_privkey(self, data, peerid, shnum):
+ try:
+ r = unpack_share(data)
+ except NeedMoreDataError, e:
+ # this share won't help us. oh well.
+ offset = e.encprivkey_offset
+ length = e.encprivkey_length
+ self.log("shnum %d on peerid %s: share was too short (%dB) "
+ "to get the encprivkey; [%d:%d] ought to hold it" %
+ (shnum, idlib.shortnodeid_b2a(peerid), len(data),
+ offset, offset+length))
+ # NOTE: if uncoordinated writes are taking place, someone might
+ # change the share (and most probably move the encprivkey) before
+ # we get a chance to do one of these reads and fetch it. This
+ # will cause us to see a NotEnoughPeersError(unable to fetch
+ # privkey) instead of an UncoordinatedWriteError . This is a
+ # nuisance, but it will go away when we move to DSA-based mutable
+ # files (since the privkey will be small enough to fit in the
+ # write cap).
+
+ self._encprivkey_shares.append( (peerid, shnum, offset, length))
+ return
+
+ (seqnum, root_hash, IV, k, N, segsize, datalen,
+ pubkey, signature, share_hash_chain, block_hash_tree,
+ share_data, enc_privkey) = r
+
+ return self._try_to_validate_privkey(enc_privkey, peerid, shnum)
+
+ def _try_to_validate_privkey(self, enc_privkey, peerid, shnum):
+
+ alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
+ alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
+ if alleged_writekey != self._writekey:
+ self.log("invalid privkey from %s shnum %d" %
+ (idlib.nodeid_b2a(peerid)[:8], shnum), level=log.WEIRD)
+ return
+
+ # it's good
+ self.log("got valid privkey from shnum %d on peerid %s" %
+ (shnum, idlib.shortnodeid_b2a(peerid)))
+ privkey = rsa.create_signing_key_from_string(alleged_privkey_s)
+ self._node._populate_encprivkey(enc_privkey)
+ self._node._populate_privkey(privkey)
+ self._need_privkey = False
def _query_failed(self, f, peerid):
self._queries_completed += 1
self._last_failure = f
+ def _got_privkey_results(self, datavs, peerid, shnum):
+ self._queries_outstanding.discard(peerid)
+ if not self._need_privkey:
+ return
+ if shnum not in datavs:
+ self.log("privkey wasn't there when we asked for it", level=log.WEIRD)
+ return
+ datav = datavs[shnum]
+ enc_privkey = datav[0]
+ self._try_to_validate_privkey(enc_privkey, peerid, shnum)
+
+ def _privkey_query_failed(self, f, peerid, shnum):
+ self._queries_outstanding.discard(peerid)
+ self.log("error during privkey query: %s %s" % (f, f.value),
+ level=log.WEIRD)
+ if not self._running:
+ return
+ self._servermap.problems.append(f)
+ self._last_failure = f
+
def _check_for_done(self, res):
# exit paths:
# return self._send_more_queries(outstanding) : send some more queries
num_not_responded = 0
num_not_found = 0
states = []
+ found_boundary = False
+
for i,(peerid,ss) in enumerate(self.full_peerlist):
if peerid in self._bad_peers:
# query failed
if num_not_found >= self.EPSILON:
self.log("MODE_WRITE: found our boundary, %s" %
"".join(states))
- # we need to know that we've gotten answers from
- # everybody to the left of here
- if last_not_responded == -1:
- # we're done
- self.log("have all our answers")
- return self._done()
- # still waiting for somebody
- return self._send_more_queries(num_not_responded)
+ found_boundary = True
+ break
elif peerid in self._good_peers:
# yes shares
last_not_responded = i
num_not_responded += 1
+ if found_boundary:
+ # we need to know that we've gotten answers from
+ # everybody to the left of here
+ if last_not_responded == -1:
+ # we're done
+ self.log("have all our answers")
+ # .. unless we're still waiting on the privkey
+ if self._need_privkey:
+ self.log("but we're still waiting for the privkey")
+ # if we found the boundary but we haven't yet found
+ # the privkey, we may need to look further. If
+ # somehow all the privkeys were corrupted (but the
+ # shares were readable), then this is likely to do an
+ # exhaustive search.
+ return self._send_more_queries(MAX_IN_FLIGHT)
+ return self._done()
+ # still waiting for somebody
+ return self._send_more_queries(num_not_responded)
+
# if we hit here, we didn't find our boundary, so we're still
# waiting for peers
self.log("MODE_WRITE: no boundary yet, %s" % "".join(states))
return self._send_more_queries(MAX_IN_FLIGHT)
def _send_more_queries(self, num_outstanding):
- assert self.extra_peers # we shouldn't get here with nothing in reserve
more_queries = []
while True:
- self.log(" there are %d queries outstanding" % len(self._queries_outstanding))
+ self.log(format=" there are %(outstanding)d queries outstanding",
+ outstanding=len(self._queries_outstanding),
+ level=log.NOISY)
active_queries = len(self._queries_outstanding) + len(more_queries)
if active_queries >= num_outstanding:
break
if not self._running:
return
self._running = False
- self._servermap.last_update_mode = self._mode
+ self._servermap.last_update_mode = self.mode
self._servermap.last_update_time = self._started
# the servermap will not be touched after this
eventually(self._done_deferred.callback, self._servermap)
+ def _fatal_error(self, f):
+ self.log("fatal error", failure=f, level=log.WEIRD)
+ self._done_deferred.errback(f)
+
class Marker:
pass
self._running = False
# res is either the new contents, or a Failure
if isinstance(res, failure.Failure):
- self.log("DONE, with failure", failure=res)
+ self.log("Retrieve done, with failure", failure=res)
else:
- self.log("DONE, success!: res=%s" % (res,))
+ self.log("Retrieve done, success!: res=%s" % (res,))
eventually(self._done_deferred.callback, res)
To make the initial publish, set servermap to None.
"""
+ # we limit the segment size as usual to constrain our memory footprint.
+ # The max segsize is higher for mutable files, because we want to support
+ # dirnodes with up to 10k children, and each child uses about 330 bytes.
+ # If you actually put that much into a directory you'll be using a
+ # footprint of around 14MB, which is higher than we'd like, but it is
+ # more important right now to support large directories than to make
+ # memory usage small when you use them. Once we implement MDMF (with
+ # multiple segments), we will drop this back down, probably to 128KiB.
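+ # (rough arithmetic: 10,000 children at about 330 bytes each is roughly
+ # 3.3MB of directory data, which fits within a single 3.5MB segment)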
+ MAX_SEGMENT_SIZE = 3500000
+
def __init__(self, filenode, servermap):
self._node = filenode
self._servermap = servermap
self._log_prefix = prefix = storage.si_b2a(self._storage_index)[:5]
num = self._node._client.log("Publish(%s): starting" % prefix)
self._log_number = num
+ self._running = True
def log(self, *args, **kwargs):
if 'parent' not in kwargs:
self.log("starting publish, datalen is %s" % len(newdata))
+ self.done_deferred = defer.Deferred()
+
self._writekey = self._node.get_writekey()
assert self._writekey, "need write capability to publish"
current_share_peers)
# TODO: add an errback too, probably to ignore that peer
- # we limit the segment size as usual to constrain our memory
- # footprint. The max segsize is higher for mutable files, because we
- # want to support dirnodes with up to 10k children, and each child
- # uses about 330 bytes. If you actually put that much into a
- # directory you'll be using a footprint of around 14MB, which is
- # higher than we'd like, but it is more important right now to
- # support large directories than to make memory usage small when you
- # use them. Once we implement MDMF (with multiple segments), we will
- # drop this back down, probably to 128KiB.
- self.MAX_SEGMENT_SIZE = 3500000
-
- segment_size = min(self.MAX_SEGMENT_SIZE, len(self.newdata))
- # this must be a multiple of self.required_shares
- segment_size = mathutil.next_multiple(segment_size,
- self.required_shares)
- self.segment_size = segment_size
- if segment_size:
- self.num_segments = mathutil.div_ceil(len(self.newdata),
- segment_size)
- else:
- self.num_segments = 0
- assert self.num_segments in [0, 1,] # SDMF restrictions
+ self.setup_encoding_parameters()
self.surprised = False
# When self.placed == self.goal, we're done.
self.placed = set() # (peerid, shnum) tuples
+ # we also keep a mapping from peerid to RemoteReference. Each time we
+ # pull a connection out of the full peerlist, we add it to this for
+ # use later.
+ self.connections = {}
+
# we use the servermap to populate the initial goal: this way we will
# try to update each existing share in place.
for (peerid, shares) in self._servermap.servermap.items():
for (shnum, versionid, timestamp) in shares:
self.goal.add( (peerid, shnum) )
+ self.connections[peerid] = self._servermap.connections[peerid]
# create the shares. We'll discard these as they are delivered. SDMF:
# we're allowed to hold everything in memory.
d = self._encrypt_and_encode()
d.addCallback(self._generate_shares)
d.addCallback(self.loop) # trigger delivery
+ d.addErrback(self._fatal_error)
return self.done_deferred
- def loop(self):
+ def setup_encoding_parameters(self):
+ segment_size = min(self.MAX_SEGMENT_SIZE, len(self.newdata))
+ # this must be a multiple of self.required_shares
+ segment_size = mathutil.next_multiple(segment_size,
+ self.required_shares)
+ self.segment_size = segment_size
+ if segment_size:
+ self.num_segments = mathutil.div_ceil(len(self.newdata),
+ segment_size)
+ else:
+ self.num_segments = 0
+ assert self.num_segments in [0, 1,] # SDMF restrictions
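+ # (for example, a 21-byte file with the default 3-of-10 encoding gets
+ # segment_size=21, already a multiple of k=3, and num_segments=1)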
+
+ def _fatal_error(self, f):
+ self.log("error during loop", failure=f, level=log.SCARY)
+ self._done(f)
+
+ def loop(self, ignored=None):
+ self.log("entering loop", level=log.NOISY)
self.update_goal()
# how far are we from our goal?
needed = self.goal - self.placed - self.outstanding
if needed:
# we need to send out new shares
- d = self.send_shares(needed)
+ self.log(format="need to send %(needed)d new shares",
+ needed=len(needed), level=log.NOISY)
+ d = self._send_shares(needed)
d.addCallback(self.loop)
d.addErrback(self._fatal_error)
return
if self.outstanding:
# queries are still pending, keep waiting
+ self.log(format="%(outstanding)d queries still outstanding",
+ outstanding=len(self.outstanding),
+ level=log.NOISY)
return
# no queries outstanding, no placements needed: we're done
- return self._done()
+ self.log("no queries outstanding, no placements needed: done",
+ level=log.OPERATIONAL)
+ return self._done(None)
def log_goal(self, goal):
logmsg = []
# TODO: 2: move those shares instead of copying them, to reduce future
# update work
- # this is CPU intensive but easy to analyze. We create a sort order
- # for each peerid. If the peerid is marked as bad, we don't even put
- # them in the list. Then we care about the number of shares which
- # have already been assigned to them. After that we care about their
- # permutation order.
+ # this is a bit CPU intensive but easy to analyze. We create a sort
+ # order for each peerid. If the peerid is marked as bad, we don't
+ # even put them in the list. Then we care about the number of shares
+ # which have already been assigned to them. After that we care about
+ # their permutation order.
old_assignments = DictOfSets()
for (peerid, shnum) in self.goal:
old_assignments.add(peerid, shnum)
peerlist = []
for i, (peerid, ss) in enumerate(self.full_peerlist):
- entry = (len(old_assignments[peerid]), i, peerid, ss)
- peerlist.add(entry)
+ entry = (len(old_assignments.get(peerid, [])), i, peerid, ss)
+ peerlist.append(entry)
peerlist.sort()
new_assignments = []
for shnum in homeless_shares:
(ignored1, ignored2, peerid, ss) = peerlist[i]
self.goal.add( (peerid, shnum) )
+ self.connections[peerid] = ss
i += 1
if i >= len(peerlist):
i = 0
self.shares = final_shares
self.root_hash = root_hash
+ # we also need to build up the version identifier for what we're
+ # pushing. Extract the offsets from one of our shares.
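+ # (this verinfo tuple uses the same field ordering that ServermapUpdater
+ # extracts from each share, with the salt in the IV position)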
+ assert final_shares
+ offsets = unpack_header(final_shares.values()[0])[-1]
+ offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] )
+ verinfo = (self._new_seqnum, root_hash, self.salt,
+ self.segment_size, len(self.newdata),
+ self.required_shares, self.total_shares,
+ prefix, offsets_tuple)
+ self.versioninfo = verinfo
+
+
def _send_shares(self, needed):
self.log("_send_shares")
peermap = DictOfSets()
for (peerid, shnum) in needed:
- peermap[peerid].add(shnum)
+ peermap.add(peerid, shnum)
# the next thing is to build up a bunch of test vectors. The
# semantics of Publish are that we perform the operation if the world
for (peerid, shnum) in needed:
testvs = []
- for (old_shnum, old_versionid, old_timestamp) in sm[peerid]:
+ for (old_shnum, old_versionid, old_timestamp) in sm.get(peerid,[]):
if old_shnum == shnum:
# an old version of that share already exists on the
# server, according to our servermap. We will create a
old_checkstring = pack_checkstring(old_seqnum,
old_root_hash,
old_salt)
- testv = [ (0, len(old_checkstring), "eq", old_checkstring) ]
+ testv = (0, len(old_checkstring), "eq", old_checkstring)
testvs.append(testv)
break
if not testvs:
# add a testv that requires the share not exist
- testv = [ (0, 1, 'eq', "") ]
+ #testv = (0, 1, 'eq', "")
+
+ # Unfortunately, foolscap-0.2.5 has a bug in the way inbound
+ # constraints are handled. If the same object is referenced
+ # multiple times inside the arguments, foolscap emits a
+ # 'reference' token instead of a distinct copy of the
+ # argument. The bug is that these 'reference' tokens are not
+ # accepted by the inbound constraint code. To work around
+ # this, we need to prevent python from interning the
+ # (constant) tuple, by creating a new copy of this vector
+ # each time. This bug is fixed in later versions of foolscap.
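+ # (a literal tuple here would be constant-folded, so every pass through
+ # this loop could hand foolscap the very same object for different
+ # shnums, triggering the bug described above)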
+ testv = tuple([0, 1, 'eq', ""])
testvs.append(testv)
# the write vector is simply the share
d.addCallbacks(self._got_write_answer, self._got_write_error,
callbackArgs=(peerid, shnums, started),
errbackArgs=(peerid, shnums, started))
- d.addErrback(self.error, peerid)
+ d.addErrback(self._fatal_error)
dl.append(d)
d = defer.DeferredList(dl)
def _do_testreadwrite(self, peerid, secrets,
tw_vectors, read_vector):
storage_index = self._storage_index
- ss = self._storage_servers[peerid]
+ ss = self.connections[peerid]
+ #print "SS[%s] is %s" % (idlib.shortnodeid_b2a(peerid), ss), ss.tracker.interfaceName
d = ss.callRemote("slot_testv_and_readv_and_writev",
storage_index,
secrets,
idlib.shortnodeid_b2a(peerid))
for shnum in shnums:
self.outstanding.discard( (peerid, shnum) )
+ sm = self._servermap.servermap
wrote, read_data = answer
if not wrote:
- # TODO: use the checkstring to add information to the log message
- #self.log("somebody modified the share on us:"
- # " shnum=%d: I thought they had #%d:R=%s,"
- # " but testv reported #%d:R=%s" %
- # (shnum,
- # seqnum, base32.b2a(root_hash)[:4],
- # old_seqnum, base32.b2a(old_root_hash)[:4]),
- # parent=lp, level=log.WEIRD)
self.log("our testv failed, so the write did not happen",
parent=lp, level=log.WEIRD)
self.surprised = True
self.bad_peers.add(peerid) # don't ask them again
+ # use the checkstring to add information to the log message
+ for (shnum,readv) in read_data.items():
+ checkstring = readv[0]
+ (other_seqnum,
+ other_roothash,
+ other_salt) = unpack_checkstring(checkstring)
+ expected_version = self._servermap.version_on_peer(peerid,
+ shnum)
+ (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
+ offsets_tuple) = expected_version
+ self.log("somebody modified the share on us:"
+ " shnum=%d: I thought they had #%d:R=%s,"
+ " but testv reported #%d:R=%s" %
+ (shnum,
+ seqnum, base32.b2a(root_hash)[:4],
+ other_seqnum, base32.b2a(other_roothash)[:4]),
+ parent=lp, level=log.NOISY)
# self.loop() will take care of finding new homes
return
- sm = self._servermap.servermap
for shnum in shnums:
self.placed.add( (peerid, shnum) )
# and update the servermap. We strip the old entry out..
newset = set([ t
- for t in sm[peerid]
+ for t in sm.get(peerid, [])
if t[0] != shnum ])
sm[peerid] = newset
# and add a new one
self.bad_peers.add(peerid)
self.log(format="error while writing shares %(shnums)s to peerid %(peerid)s",
shnums=list(shnums), peerid=idlib.shortnodeid_b2a(peerid),
+ failure=f,
level=log.UNUSUAL)
# self.loop() will take care of checking to see if we're done
return
raise UncoordinatedWriteError("I was surprised!")
def _done(self, res):
- now = time.time()
- self._status.timings["total"] = now - self._started
- self._status.set_active(False)
- self._status.set_status("Done")
- self._status.set_progress(1.0)
+ if not self._running:
+ return
+ self._running = False
+ #now = time.time()
+ #self._status.timings["total"] = now - self._started
+ #self._status.set_active(False)
+ #self._status.set_status("Done")
+ #self._status.set_progress(1.0)
+ self.done_deferred.callback(res)
return None
def get_status(self):
return self._status
- def _do_privkey_query(self, rref, peerid, shnum, offset, length):
- started = time.time()
- d = self._do_read(rref, peerid, self._storage_index,
- [shnum], [(offset, length)])
- d.addCallback(self._privkey_query_response, peerid, shnum, started)
- return d
-
- def _privkey_query_response(self, datav, peerid, shnum, started):
- elapsed = time.time() - started
- self._status.add_per_server_time(peerid, "read", elapsed)
-
- data = datav[shnum][0]
- self._try_to_validate_privkey(data, peerid, shnum)
-
- elapsed = time.time() - self._privkey_fetch_started
- self._status.timings["privkey_fetch"] = elapsed
- self._status.privkey_from = peerid
-
- def _obtain_privkey_done(self, target_info):
- elapsed = time.time() - self._obtain_privkey_started
- self._status.timings["privkey"] = elapsed
- return target_info
-
# use client.create_mutable_file() to make one of these
verifier = signer.get_verifying_key()
return verifier, signer
- def _publish(self, initial_contents):
- p = self.publish_class(self)
- self._client.notify_publish(p)
- d = p.publish(initial_contents)
- d.addCallback(lambda res: self)
- return d
-
def _encrypt_privkey(self, writekey, privkey):
enc = AES(writekey)
crypttext = enc.process(privkey)
d = self.obtain_lock()
d.addCallback(lambda res:
ServermapUpdater(self, servermap, mode).update())
- d.addCallback(self.release_lock)
+ d.addBoth(self.release_lock)
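+ # addBoth (rather than addCallback) so the lock is released even if
+ # the update fails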
return d
def download_version(self, servermap, versionid):
d = self.obtain_lock()
d.addCallback(lambda res:
Retrieve(self, servermap, versionid).download())
- d.addCallback(self.release_lock)
+ d.addBoth(self.release_lock)
return d
def publish(self, servermap, newdata):
d.addCallback(lambda res: Publish(self, servermap).publish(newdata))
# p = self.publish_class(self)
# self._client.notify_publish(p)
- d.addCallback(self.release_lock)
+ d.addBoth(self.release_lock)
return d
def modify(self, modifier, *args, **kwargs):
def download_to_data(self):
d = self.obtain_lock()
d.addCallback(lambda res: self.update_servermap(mode=MODE_ENOUGH))
- d.addCallback(lambda smap:
- self.download_version(smap,
- smap.best_recoverable_version()))
- d.addCallback(self.release_lock)
+ def _updated(smap):
+ goal = smap.best_recoverable_version()
+ if not goal:
+ raise UnrecoverableFileError("no recoverable versions")
+ return self.download_version(smap, goal)
+ d.addCallback(_updated)
+ d.addBoth(self.release_lock)
+ return d
+
+ def _publish(self, initial_contents):
+ p = Publish(self, None)
+ d = p.publish(initial_contents)
+ d.addCallback(lambda res: self)
return d
def update(self, newdata):
- return self._publish(newdata)
+ d = self.obtain_lock()
+ d.addCallback(lambda res: self.update_servermap(mode=MODE_WRITE))
+ d.addCallback(lambda smap:
+ Publish(self, smap).publish(newdata))
+ d.addBoth(self.release_lock)
+ return d
def overwrite(self, newdata):
- return self._publish(newdata)
+ return self.update(newdata)
+
class MutableWatcher(service.MultiService):
MAX_PUBLISH_STATUSES = 20
-import itertools, struct, re
+import struct
from cStringIO import StringIO
from twisted.trial import unittest
from twisted.internet import defer, reactor
from twisted.python import failure
-from allmydata import mutable, uri, dirnode, download
+from allmydata import mutable, uri, download
from allmydata.util import base32
from allmydata.util.idlib import shortnodeid_b2a
from allmydata.util.hashutil import tagged_hash
from allmydata.encode import NotEnoughPeersError
-from allmydata.interfaces import IURI, INewDirectoryURI, \
- IMutableFileURI, IUploadable, IFileURI
-from allmydata.filenode import LiteralFileNode
+from allmydata.interfaces import IURI, IMutableFileURI, IUploadable
from foolscap.eventual import eventually, fireEventually
from foolscap.logging import log
import sha
-#from allmydata.test.common import FakeMutableFileNode
-#FakeFilenode = FakeMutableFileNode
+# this "FastMutableFileNode" exists solely to speed up tests by using smaller
+# public/private keys. Once we switch to fast DSA-based keys, we can get rid
+# of this.
-class FakeFilenode(mutable.MutableFileNode):
- counter = itertools.count(1)
- all_contents = {}
- all_rw_friends = {}
+class FastMutableFileNode(mutable.MutableFileNode):
+ SIGNATURE_KEY_SIZE = 522
- def create(self, initial_contents):
- d = mutable.MutableFileNode.create(self, initial_contents)
- def _then(res):
- self.all_contents[self.get_uri()] = initial_contents
- return res
- d.addCallback(_then)
- return d
- def init_from_uri(self, myuri):
- mutable.MutableFileNode.init_from_uri(self, myuri)
- return self
- def _generate_pubprivkeys(self, key_size):
- count = self.counter.next()
- return FakePubKey(count), FakePrivKey(count)
- def _publish(self, initial_contents):
- self.all_contents[self.get_uri()] = initial_contents
- return defer.succeed(self)
-
- def download_to_data(self):
- if self.is_readonly():
- assert self.all_rw_friends.has_key(self.get_uri()), (self.get_uri(), id(self.all_rw_friends))
- return defer.succeed(self.all_contents[self.all_rw_friends[self.get_uri()]])
- else:
- return defer.succeed(self.all_contents[self.get_uri()])
- def update(self, newdata):
- self.all_contents[self.get_uri()] = newdata
- return defer.succeed(None)
- def overwrite(self, newdata):
- return self.update(newdata)
+# this "FakeStorage" exists to put the share data in RAM and avoid using real
+# network connections, both to speed up the tests and to reduce the amount of
+# non-mutable.py code being exercised.
class FakeStorage:
# this class replaces the collection of storage servers, allowing the
def read(self, peerid, storage_index):
shares = self._peers.get(peerid, {})
if self._sequence is None:
- return shares
+ return defer.succeed(shares)
d = defer.Deferred()
if not self._pending:
reactor.callLater(1.0, self._fire_readers)
shares[shnum] = f.getvalue()
-class FakePublish(mutable.Publish):
-
- def _do_read(self, ss, peerid, storage_index, shnums, readv):
- assert ss[0] == peerid
- assert shnums == []
+class FakeStorageServer:
+ def __init__(self, peerid, storage):
+ self.peerid = peerid
+ self.storage = storage
+ self.queries = 0
+ def callRemote(self, methname, *args, **kwargs):
+ def _call():
+ meth = getattr(self, methname)
+ return meth(*args, **kwargs)
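+ # fireEventually runs the method in a later reactor turn, roughly
+ # simulating the asynchrony of a real remote call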
d = fireEventually()
- d.addCallback(lambda res: self._storage.read(peerid, storage_index))
+ d.addCallback(lambda res: _call())
+ return d
+
+ def slot_readv(self, storage_index, shnums, readv):
+ d = self.storage.read(self.peerid, storage_index)
+ def _read(shares):
+ response = {}
+ for shnum in shares:
+ if shnums and shnum not in shnums:
+ continue
+ vector = response[shnum] = []
+ for (offset, length) in readv:
+ assert isinstance(offset, (int, long)), offset
+ assert isinstance(length, (int, long)), length
+ vector.append(shares[shnum][offset:offset+length])
+ return response
+ d.addCallback(_read)
return d
- def _do_testreadwrite(self, peerid, secrets,
- tw_vectors, read_vector):
- storage_index = self._node._uri.storage_index
+ def slot_testv_and_readv_and_writev(self, storage_index, secrets,
+ tw_vectors, read_vector):
# always-pass: parrot the test vectors back to them.
readv = {}
for shnum, (testv, writev, new_length) in tw_vectors.items():
for (offset, length, op, specimen) in testv:
assert op in ("le", "eq", "ge")
+ # TODO: this isn't right, the read is controlled by read_vector,
+ # not by testv
readv[shnum] = [ specimen
for (offset, length, op, specimen)
in testv ]
for (offset, data) in writev:
- self._storage.write(peerid, storage_index, shnum, offset, data)
+ self.storage.write(self.peerid, storage_index, shnum,
+ offset, data)
answer = (True, readv)
- return defer.succeed(answer)
-
+ return fireEventually(answer)
-
-class FakeNewDirectoryNode(dirnode.NewDirectoryNode):
- filenode_class = FakeFilenode
+# our "FakeClient" has just enough functionality of the real Client to let
+# the tests run.
class FakeClient:
+ mutable_file_node_class = FastMutableFileNode
+
def __init__(self, num_peers=10):
+ self._storage = FakeStorage()
self._num_peers = num_peers
self._peerids = [tagged_hash("peerid", "%d" % i)[:20]
for i in range(self._num_peers)]
+ self._connections = dict([(peerid, FakeStorageServer(peerid,
+ self._storage))
+ for peerid in self._peerids])
self.nodeid = "fakenodeid"
def log(self, msg, **kw):
def get_cancel_secret(self):
return "I hereby permit you to cancel my leases"
- def create_empty_dirnode(self):
- n = FakeNewDirectoryNode(self)
- d = n.create()
- d.addCallback(lambda res: n)
- return d
-
- def create_dirnode_from_uri(self, u):
- return FakeNewDirectoryNode(self).init_from_uri(u)
-
def create_mutable_file(self, contents=""):
- n = FakeFilenode(self)
+ n = self.mutable_file_node_class(self)
d = n.create(contents)
d.addCallback(lambda res: n)
return d
def create_node_from_uri(self, u):
u = IURI(u)
- if INewDirectoryURI.providedBy(u):
- return self.create_dirnode_from_uri(u)
- if IFileURI.providedBy(u):
- if isinstance(u, uri.LiteralFileURI):
- return LiteralFileNode(u, self)
- else:
- # CHK
- raise RuntimeError("not simulated")
assert IMutableFileURI.providedBy(u), u
- res = FakeFilenode(self).init_from_uri(u)
+ res = self.mutable_file_node_class(self).init_from_uri(u)
return res
def get_permuted_peers(self, service_name, key):
"""
@return: list of (peerid, connection,)
"""
- peers_and_connections = [(pid, (pid,)) for pid in self._peerids]
results = []
- for peerid, connection in peers_and_connections:
+ for (peerid, connection) in self._connections.items():
assert isinstance(peerid, str)
permuted = sha.new(key + peerid).digest()
results.append((permuted, peerid, connection))
#d.addCallback(self.create_mutable_file)
def _got_data(datav):
data = "".join(datav)
- #newnode = FakeFilenode(self)
+ #newnode = FastMutableFileNode(self)
return uri.LiteralFileURI(data)
d.addCallback(_got_data)
return d
-class FakePubKey:
- def __init__(self, count):
- self.count = count
- def serialize(self):
- return "PUBKEY-%d" % self.count
- def verify(self, msg, signature):
- if signature[:5] != "SIGN(":
- return False
- if signature[5:-1] != msg:
- return False
- if signature[-1] != ")":
- return False
- return True
-
-class FakePrivKey:
- def __init__(self, count):
- self.count = count
- def serialize(self):
- return "PRIVKEY-%d" % self.count
- def sign(self, data):
- return "SIGN(%s)" % data
-
class Filenode(unittest.TestCase):
def setUp(self):
def test_create(self):
d = self.client.create_mutable_file()
def _created(n):
- d = n.overwrite("contents 1")
+ self.failUnless(isinstance(n, FastMutableFileNode))
+ peer0 = self.client._peerids[0]
+ shnums = self.client._storage._peers[peer0].keys()
+ self.failUnlessEqual(len(shnums), 1)
+ d.addCallback(_created)
+ return d
+
+ def test_upload_and_download(self):
+ d = self.client.create_mutable_file()
+ def _created(n):
+ d = defer.succeed(None)
+ d.addCallback(lambda res: n.update_servermap())
+ d.addCallback(lambda smap: smap.dump(StringIO()))
+ d.addCallback(lambda sio:
+ self.failUnless("3-of-10" in sio.getvalue()))
+ d.addCallback(lambda res: n.overwrite("contents 1"))
d.addCallback(lambda res: self.failUnlessIdentical(res, None))
d.addCallback(lambda res: n.download_to_data())
d.addCallback(lambda res: self.failUnlessEqual(res, "contents 1"))
d.addCallback(_created)
return d
+ def test_upload_and_download_full_size_keys(self):
+ self.client.mutable_file_node_class = mutable.MutableFileNode
+ d = self.client.create_mutable_file()
+ def _created(n):
+ d = defer.succeed(None)
+ d.addCallback(lambda res: n.update_servermap())
+ d.addCallback(lambda smap: smap.dump(StringIO()))
+ d.addCallback(lambda sio:
+ self.failUnless("3-of-10" in sio.getvalue()))
+ d.addCallback(lambda res: n.overwrite("contents 1"))
+ d.addCallback(lambda res: self.failUnlessIdentical(res, None))
+ d.addCallback(lambda res: n.download_to_data())
+ d.addCallback(lambda res: self.failUnlessEqual(res, "contents 1"))
+ d.addCallback(lambda res: n.overwrite("contents 2"))
+ d.addCallback(lambda res: n.download_to_data())
+ d.addCallback(lambda res: self.failUnlessEqual(res, "contents 2"))
+ d.addCallback(lambda res: n.download(download.Data()))
+ d.addCallback(lambda res: self.failUnlessEqual(res, "contents 2"))
+ d.addCallback(lambda res: n.update("contents 3"))
+ d.addCallback(lambda res: n.download_to_data())
+ d.addCallback(lambda res: self.failUnlessEqual(res, "contents 3"))
+ return d
+ d.addCallback(_created)
+ return d
+
class Publish(unittest.TestCase):
def test_encrypt(self):
c = FakeClient()
- fn = FakeFilenode(c)
- # .create usually returns a Deferred, but we happen to know it's
- # synchronous
+ fn = FastMutableFileNode(c)
CONTENTS = "some initial contents"
- fn.create(CONTENTS)
- p = mutable.Publish(fn)
- target_info = None
- d = defer.maybeDeferred(p._encrypt_and_encode, target_info,
- CONTENTS, "READKEY", "IV"*8, 3, 10)
- def _done( ((shares, share_ids),
- required_shares, total_shares,
- segsize, data_length, target_info2) ):
+ d = fn.create(CONTENTS)
+ def _created(res):
+ p = mutable.Publish(fn, None)
+ p.salt = "SALT" * 4
+ p.readkey = "\x00" * 16
+ p.newdata = CONTENTS
+ p.required_shares = 3
+ p.total_shares = 10
+ p.setup_encoding_parameters()
+ return p._encrypt_and_encode()
+ d.addCallback(_created)
+ def _done(shares_and_shareids):
+ (shares, share_ids) = shares_and_shareids
self.failUnlessEqual(len(shares), 10)
for sh in shares:
self.failUnless(isinstance(sh, str))
self.failUnlessEqual(len(sh), 7)
self.failUnlessEqual(len(share_ids), 10)
- self.failUnlessEqual(required_shares, 3)
- self.failUnlessEqual(total_shares, 10)
- self.failUnlessEqual(segsize, 21)
- self.failUnlessEqual(data_length, len(CONTENTS))
- self.failUnlessIdentical(target_info, target_info2)
d.addCallback(_done)
return d
def test_generate(self):
c = FakeClient()
- fn = FakeFilenode(c)
- # .create usually returns a Deferred, but we happen to know it's
- # synchronous
+ fn = FastMutableFileNode(c)
CONTENTS = "some initial contents"
fn.create(CONTENTS)
p = mutable.Publish(fn)
self.failUnlessEqual(sorted(final_shares.keys()), range(10))
for i,sh in final_shares.items():
self.failUnless(isinstance(sh, str))
- self.failUnlessEqual(len(sh), 381)
# feed the share through the unpacker as a sanity-check
pieces = mutable.unpack_share(sh)
(u_seqnum, u_root_hash, IV, k, N, segsize, datalen,
self.failUnlessEqual(N, 10)
self.failUnlessEqual(segsize, 21)
self.failUnlessEqual(datalen, len(CONTENTS))
- self.failUnlessEqual(pubkey, FakePubKey(0).serialize())
+ self.failUnlessEqual(pubkey, p._pubkey.serialize())
sig_material = struct.pack(">BQ32s16s BBQQ",
- 0, seqnum, root_hash, IV,
+ 0, p._new_seqnum, root_hash, IV,
k, N, segsize, datalen)
- self.failUnlessEqual(signature,
- FakePrivKey(0).sign(sig_material))
+ self.failUnless(p._pubkey.verify(sig_material, signature))
+ #self.failUnlessEqual(signature, p._privkey.sign(sig_material))
self.failUnless(isinstance(share_hash_chain, dict))
self.failUnlessEqual(len(share_hash_chain), 4) # ln2(10)++
for shnum,share_hash in share_hash_chain.items():
self.failUnlessEqual(len(share_hash), 32)
self.failUnless(isinstance(block_hash_tree, list))
self.failUnlessEqual(len(block_hash_tree), 1) # very small tree
- self.failUnlessEqual(IV, "IV"*8)
+ self.failUnlessEqual(IV, "SALT"*4)
self.failUnlessEqual(len(share_data), len("%07d" % 1))
- self.failUnlessEqual(enc_privkey, "encprivkey")
- self.failUnlessIdentical(target_info, target_info2)
- d.addCallback(_done)
- return d
-
- def setup_for_sharemap(self, num_peers):
- c = FakeClient(num_peers)
- fn = FakeFilenode(c)
- s = FakeStorage()
- # .create usually returns a Deferred, but we happen to know it's
- # synchronous
- CONTENTS = "some initial contents"
- fn.create(CONTENTS)
- p = FakePublish(fn)
- p._storage_index = "\x00"*32
- p._new_seqnum = 3
- p._read_size = 1000
- #r = mutable.Retrieve(fn)
- p._storage = s
- return c, p
-
- def shouldFail(self, expected_failure, which, call, *args, **kwargs):
- substring = kwargs.pop("substring", None)
- d = defer.maybeDeferred(call, *args, **kwargs)
- def _done(res):
- if isinstance(res, failure.Failure):
- res.trap(expected_failure)
- if substring:
- self.failUnless(substring in str(res),
- "substring '%s' not in '%s'"
- % (substring, str(res)))
- else:
- self.fail("%s was supposed to raise %s, not get '%s'" %
- (which, expected_failure, res))
- d.addBoth(_done)
- return d
-
- def test_sharemap_20newpeers(self):
- c, p = self.setup_for_sharemap(20)
-
- total_shares = 10
- d = p._query_peers(total_shares)
- def _done(target_info):
- (target_map, shares_per_peer) = target_info
- shares_per_peer = {}
- for shnum in target_map:
- for (peerid, old_seqnum, old_R) in target_map[shnum]:
- #print "shnum[%d]: send to %s [oldseqnum=%s]" % \
- # (shnum, idlib.b2a(peerid), old_seqnum)
- if peerid not in shares_per_peer:
- shares_per_peer[peerid] = 1
- else:
- shares_per_peer[peerid] += 1
- # verify that we're sending only one share per peer
- for peerid, count in shares_per_peer.items():
- self.failUnlessEqual(count, 1)
- d.addCallback(_done)
+ self.failUnlessEqual(enc_privkey, fn.get_encprivkey())
+ d.addCallback(_generated)
return d
- def test_sharemap_3newpeers(self):
- c, p = self.setup_for_sharemap(3)
-
- total_shares = 10
- d = p._query_peers(total_shares)
- def _done(target_info):
- (target_map, shares_per_peer) = target_info
- shares_per_peer = {}
- for shnum in target_map:
- for (peerid, old_seqnum, old_R) in target_map[shnum]:
- if peerid not in shares_per_peer:
- shares_per_peer[peerid] = 1
- else:
- shares_per_peer[peerid] += 1
- # verify that we're sending 3 or 4 shares per peer
- for peerid, count in shares_per_peer.items():
- self.failUnless(count in (3,4), count)
- d.addCallback(_done)
- return d
-
- def test_sharemap_nopeers(self):
- c, p = self.setup_for_sharemap(0)
-
- total_shares = 10
- d = self.shouldFail(NotEnoughPeersError, "test_sharemap_nopeers",
- p._query_peers, total_shares)
- return d
-
- def test_write(self):
- total_shares = 10
- c, p = self.setup_for_sharemap(20)
- p._privkey = FakePrivKey(0)
- p._encprivkey = "encprivkey"
- p._pubkey = FakePubKey(0)
- # make some fake shares
- CONTENTS = "some initial contents"
- shares_and_ids = ( ["%07d" % i for i in range(10)], range(10) )
- d = defer.maybeDeferred(p._query_peers, total_shares)
- IV = "IV"*8
- d.addCallback(lambda target_info:
- p._generate_shares( (shares_and_ids,
- 3, total_shares,
- 21, # segsize
- len(CONTENTS),
- target_info),
- 3, # seqnum
- IV))
- d.addCallback(p._send_shares, IV)
- def _done((surprised, dispatch_map)):
- self.failIf(surprised, "surprised!")
- d.addCallback(_done)
- return d
-
-class FakeRetrieve(mutable.Retrieve):
- def _do_read(self, ss, peerid, storage_index, shnums, readv):
- d = fireEventually()
- d.addCallback(lambda res: self._storage.read(peerid, storage_index))
- def _read(shares):
- response = {}
- for shnum in shares:
- if shnums and shnum not in shnums:
- continue
- vector = response[shnum] = []
- for (offset, length) in readv:
- assert isinstance(offset, (int, long)), offset
- assert isinstance(length, (int, long)), length
- vector.append(shares[shnum][offset:offset+length])
- return response
- d.addCallback(_read)
- return d
-
-class FakeServermapUpdater(mutable.ServermapUpdater):
-
- def _do_read(self, ss, peerid, storage_index, shnums, readv):
- d = fireEventually()
- d.addCallback(lambda res: self._storage.read(peerid, storage_index))
- def _read(shares):
- response = {}
- for shnum in shares:
- if shnums and shnum not in shnums:
- continue
- vector = response[shnum] = []
- for (offset, length) in readv:
- vector.append(shares[shnum][offset:offset+length])
- return response
- d.addCallback(_read)
- return d
-
- def _deserialize_pubkey(self, pubkey_s):
- mo = re.search(r"^PUBKEY-(\d+)$", pubkey_s)
- if not mo:
- raise RuntimeError("mangled pubkey")
- count = mo.group(1)
- return FakePubKey(int(count))
+ # TODO: when we publish to 20 peers, we should get one share per peer on 10 of them
+ # when we publish to 3 peers, we should get either 3 or 4 shares per peer
+ # when we publish to zero peers, we should get a NotEnoughPeersError
-class Sharemap(unittest.TestCase):
+class Servermap(unittest.TestCase):
def setUp(self):
# publish a file and create shares, which can then be manipulated
# later.
num_peers = 20
self._client = FakeClient(num_peers)
- self._fn = FakeFilenode(self._client)
- self._storage = FakeStorage()
- d = self._fn.create("")
- def _created(res):
- p = FakePublish(self._fn)
- p._storage = self._storage
- contents = "New contents go here"
- return p.publish(contents)
+ self._storage = self._client._storage
+ d = self._client.create_mutable_file("New contents go here")
+ def _created(node):
+ self._fn = node
d.addCallback(_created)
return d
- def make_servermap(self, storage, mode=mutable.MODE_CHECK):
- smu = FakeServermapUpdater(self._fn, mutable.ServerMap(), mode)
- smu._storage = storage
+ def make_servermap(self, mode=mutable.MODE_CHECK):
+ smu = mutable.ServermapUpdater(self._fn, mutable.ServerMap(), mode)
d = smu.update()
return d
- def update_servermap(self, storage, oldmap, mode=mutable.MODE_CHECK):
- smu = FakeServermapUpdater(self._fn, oldmap, mode)
- smu._storage = storage
+ def update_servermap(self, oldmap, mode=mutable.MODE_CHECK):
+ smu = mutable.ServermapUpdater(self._fn, oldmap, mode)
d = smu.update()
return d
return sm
def test_basic(self):
- s = self._storage # unmangled
d = defer.succeed(None)
ms = self.make_servermap
us = self.update_servermap
- d.addCallback(lambda res: ms(s, mode=mutable.MODE_CHECK))
+ d.addCallback(lambda res: ms(mode=mutable.MODE_CHECK))
d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 10))
- d.addCallback(lambda res: ms(s, mode=mutable.MODE_WRITE))
+ d.addCallback(lambda res: ms(mode=mutable.MODE_WRITE))
d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 10))
- d.addCallback(lambda res: ms(s, mode=mutable.MODE_ENOUGH))
+ d.addCallback(lambda res: ms(mode=mutable.MODE_ENOUGH))
# this mode stops at k+epsilon, and epsilon=k, so 6 shares
d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 6))
- d.addCallback(lambda res: ms(s, mode=mutable.MODE_ANYTHING))
+ d.addCallback(lambda res: ms(mode=mutable.MODE_ANYTHING))
# this mode stops at 'k' shares
d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 3))
# increasing order of number of servers queried, since once a server
# gets into the servermap, we'll always ask it for an update.
d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 3))
- d.addCallback(lambda sm: us(s, sm, mode=mutable.MODE_ENOUGH))
+ d.addCallback(lambda sm: us(sm, mode=mutable.MODE_ENOUGH))
d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 6))
- d.addCallback(lambda sm: us(s, sm, mode=mutable.MODE_WRITE))
- d.addCallback(lambda sm: us(s, sm, mode=mutable.MODE_CHECK))
+ d.addCallback(lambda sm: us(sm, mode=mutable.MODE_WRITE))
+ d.addCallback(lambda sm: us(sm, mode=mutable.MODE_CHECK))
d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 10))
- d.addCallback(lambda sm: us(s, sm, mode=mutable.MODE_ANYTHING))
+ d.addCallback(lambda sm: us(sm, mode=mutable.MODE_ANYTHING))
d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 10))
return d
self.failUnlessEqual(len(sm.shares_available()), 0)
def test_no_shares(self):
- s = self._storage
- s._peers = {} # delete all shares
+ self._client._storage._peers = {} # delete all shares
ms = self.make_servermap
d = defer.succeed(None)
- d.addCallback(lambda res: ms(s, mode=mutable.MODE_CHECK))
+ d.addCallback(lambda res: ms(mode=mutable.MODE_CHECK))
d.addCallback(lambda sm: self.failUnlessNoneRecoverable(sm))
- d.addCallback(lambda res: ms(s, mode=mutable.MODE_ANYTHING))
+ d.addCallback(lambda res: ms(mode=mutable.MODE_ANYTHING))
d.addCallback(lambda sm: self.failUnlessNoneRecoverable(sm))
- d.addCallback(lambda res: ms(s, mode=mutable.MODE_WRITE))
+ d.addCallback(lambda res: ms(mode=mutable.MODE_WRITE))
d.addCallback(lambda sm: self.failUnlessNoneRecoverable(sm))
- d.addCallback(lambda res: ms(s, mode=mutable.MODE_ENOUGH))
+ d.addCallback(lambda res: ms(mode=mutable.MODE_ENOUGH))
d.addCallback(lambda sm: self.failUnlessNoneRecoverable(sm))
return d
self.failUnlessEqual(sm.shares_available().values()[0], (2,3) )
def test_not_quite_enough_shares(self):
- s = self._storage
+ s = self._client._storage
ms = self.make_servermap
num_shares = len(s._peers)
for peerid in s._peers:
d = defer.succeed(None)
- d.addCallback(lambda res: ms(s, mode=mutable.MODE_CHECK))
+ d.addCallback(lambda res: ms(mode=mutable.MODE_CHECK))
d.addCallback(lambda sm: self.failUnlessNotQuiteEnough(sm))
- d.addCallback(lambda res: ms(s, mode=mutable.MODE_ANYTHING))
+ d.addCallback(lambda res: ms(mode=mutable.MODE_ANYTHING))
d.addCallback(lambda sm: self.failUnlessNotQuiteEnough(sm))
- d.addCallback(lambda res: ms(s, mode=mutable.MODE_WRITE))
+ d.addCallback(lambda res: ms(mode=mutable.MODE_WRITE))
d.addCallback(lambda sm: self.failUnlessNotQuiteEnough(sm))
- d.addCallback(lambda res: ms(s, mode=mutable.MODE_ENOUGH))
+ d.addCallback(lambda res: ms(mode=mutable.MODE_ENOUGH))
d.addCallback(lambda sm: self.failUnlessNotQuiteEnough(sm))
return d
class Roundtrip(unittest.TestCase):
-
def setUp(self):
# publish a file and create shares, which can then be manipulated
# later.
self.CONTENTS = "New contents go here"
num_peers = 20
self._client = FakeClient(num_peers)
- self._fn = FakeFilenode(self._client)
- self._storage = FakeStorage()
- d = self._fn.create("")
- def _created(res):
- p = FakePublish(self._fn)
- p._storage = self._storage
- return p.publish(self.CONTENTS)
+ self._storage = self._client._storage
+ d = self._client.create_mutable_file(self.CONTENTS)
+ def _created(node):
+ self._fn = node
d.addCallback(_created)
return d
def make_servermap(self, mode=mutable.MODE_ENOUGH, oldmap=None):
if oldmap is None:
oldmap = mutable.ServerMap()
- smu = FakeServermapUpdater(self._fn, oldmap, mode)
- smu._storage = self._storage
+ smu = mutable.ServermapUpdater(self._fn, oldmap, mode)
d = smu.update()
return d
def do_download(self, servermap, version=None):
if version is None:
version = servermap.best_recoverable_version()
- r = FakeRetrieve(self._fn, servermap, version)
- r._storage = self._storage
+ r = mutable.Retrieve(self._fn, servermap, version)
return r.download()
def test_basic(self):
allproblems = [str(f) for f in servermap.problems]
self.failUnless(substring in "".join(allproblems))
return
- r = FakeRetrieve(self._fn, servermap, ver)
- r._storage = self._storage
+ r = mutable.Retrieve(self._fn, servermap, ver)
if should_succeed:
d1 = r.download()
d1.addCallback(lambda new_contents:
self.failUnless("pubkey doesn't match fingerprint"
in str(servermap.problems[0]))
ver = servermap.best_recoverable_version()
- r = FakeRetrieve(self._fn, servermap, ver)
- r._storage = self._storage
+ r = mutable.Retrieve(self._fn, servermap, ver)
return r.download()
d.addCallback(_do_retrieve)
d.addCallback(lambda new_contents:
self.failUnlessEqual(new_contents, self.CONTENTS))
return d
- def _encode(self, c, s, fn, k, n, data):
+
+class MultipleEncodings(unittest.TestCase):
+ def setUp(self):
+ self.CONTENTS = "New contents go here"
+ num_peers = 20
+ self._client = FakeClient(num_peers)
+ self._storage = self._client._storage
+ d = self._client.create_mutable_file(self.CONTENTS)
+ def _created(node):
+ self._fn = node
+ d.addCallback(_created)
+ return d
+
+ def _encode(self, k, n, data):
# encode 'data' into a peerid->shares dict.
- fn2 = FakeFilenode(c)
+ fn2 = FastMutableFileNode(self._client)
# init_from_uri populates _uri, _writekey, _readkey, _storage_index,
# and _fingerprint
+ fn = self._fn
fn2.init_from_uri(fn.get_uri())
# then we copy over other fields that are normally fetched from the
# existing shares
fn2._required_shares = k
fn2._total_shares = n
- p2 = FakePublish(fn2)
- p2._storage = s
- p2._storage._peers = {} # clear existing storage
+ s = self._client._storage
+ s._peers = {} # clear existing storage
+ p2 = mutable.Publish(fn2, None)
d = p2.publish(data)
def _published(res):
shares = s._peers
d.addCallback(_published)
return d
-class MultipleEncodings(unittest.TestCase):
-
- def publish(self):
- # publish a file and create shares, which can then be manipulated
- # later.
- self.CONTENTS = "New contents go here"
- num_peers = 20
- self._client = FakeClient(num_peers)
- self._fn = FakeFilenode(self._client)
- self._storage = FakeStorage()
- d = self._fn.create("")
- def _created(res):
- p = FakePublish(self._fn)
- p._storage = self._storage
- return p.publish(self.CONTENTS)
- d.addCallback(_created)
- return d
-
def make_servermap(self, mode=mutable.MODE_ENOUGH, oldmap=None):
if oldmap is None:
oldmap = mutable.ServerMap()
- smu = FakeServermapUpdater(self._fn, oldmap, mode)
- smu._storage = self._storage
+ smu = mutable.ServermapUpdater(self._fn, oldmap, mode)
d = smu.update()
return d
# we encode the same file in two different ways (3-of-10 and 4-of-9),
# then mix up the shares, to make sure that download survives seeing
# a variety of encodings. This is actually kind of tricky to set up.
- c, s, fn, p, r = self.setup_for_publish(20)
- # we ignore fn, p, and r
contents1 = "Contents for encoding 1 (3-of-10) go here"
contents2 = "Contents for encoding 2 (4-of-9) go here"
# we make a retrieval object that doesn't know what encoding
# parameters to use
- fn3 = FakeFilenode(c)
- fn3.init_from_uri(fn.get_uri())
+ fn3 = FastMutableFileNode(self._client)
+ fn3.init_from_uri(self._fn.get_uri())
# now we upload a file through fn1, and grab its shares
- d = self._encode(c, s, fn, 3, 10, contents1)
+ d = self._encode(3, 10, contents1)
def _encoded_1(shares):
self._shares1 = shares
d.addCallback(_encoded_1)
- d.addCallback(lambda res: self._encode(c, s, fn, 4, 9, contents2))
+ d.addCallback(lambda res: self._encode(4, 9, contents2))
def _encoded_2(shares):
self._shares2 = shares
d.addCallback(_encoded_2)
- d.addCallback(lambda res: self._encode(c, s, fn, 4, 7, contents3))
+ d.addCallback(lambda res: self._encode(4, 7, contents3))
def _encoded_3(shares):
self._shares3 = shares
d.addCallback(_encoded_3)
sharemap = {}
- for i,peerid in enumerate(c._peerids):
+ for i,peerid in enumerate(self._client._peerids):
peerid_s = shortnodeid_b2a(peerid)
for shnum in self._shares1.get(peerid, {}):
if shnum < len(places):
which = places[shnum]
else:
which = "x"
- s._peers[peerid] = peers = {}
+ self._client._storage._peers[peerid] = peers = {}
in_1 = shnum in self._shares1[peerid]
in_2 = shnum in self._shares2.get(peerid, {})
in_3 = shnum in self._shares3.get(peerid, {})
# now sort the sequence so that share 0 is returned first
new_sequence = [sharemap[shnum]
for shnum in sorted(sharemap.keys())]
- s._sequence = new_sequence
+ self._client._storage._sequence = new_sequence
log.msg("merge done")
d.addCallback(_merge)
- def _retrieve(res):
- r3 = FakeRetrieve(fn3)
- r3._storage = s
- return r3.retrieve()
- d.addCallback(_retrieve)
+ d.addCallback(lambda res: fn3.download_to_data())
def _retrieved(new_contents):
# the current specified behavior is "first version recoverable"
self.failUnlessEqual(new_contents, contents1)
c.add("v1", 1, 10, xdata[10:20], "time1")
#self.failUnlessEqual(c.read("v1", 1, 0, 20), (xdata[:20], "time0"))
-