from pycryptopp.publickey import rsa
+class NotMutableError(Exception):
+ pass
+
class NeedMoreDataError(Exception):
def __init__(self, needed_bytes, encprivkey_offset, encprivkey_length):
Exception.__init__(self)
def log_err(self, f):
log.err(f)
- def publish(self, newdata):
- """Publish the filenode's current contents. Returns a Deferred that
+ def publish(self, newdata, wait_for_numpeers=None):
+ """Publish the filenode's current contents. Returns a Deferred that
fires (with None) when the publish has done as much work as it's ever
going to do, or errbacks with ConsistencyError if it detects a
- simultaneous write."""
+ simultaneous write.
+
+ It will wait until at least wait_for_numpeers peers are connected
+ before it starts uploading
+
+ If wait_for_numpeers is None, then wait_for_numpeers is set to the
+ number of shares total (M).
+ """
+
+ self.log("starting publish")
+
+ if wait_for_numpeers is None:
+ # TODO: perhaps the default should be something like:
+ # wait_for_numpeers = self._node.get_total_shares()
+ wait_for_numpeers = 1
+ d = self._node._client.introducer_client.when_enough_peers(wait_for_numpeers)
+ d.addCallback(lambda dummy: self._after_enough_peers(newdata))
+ return d
+
+ def _after_enough_peers(self, newdata):
# 1: generate shares (SDMF: files are small, so we can do it in RAM)
# 2: perform peer selection, get candidate servers
# 2a: send queries to n+epsilon servers, to determine current shares
# 4a: may need to run recovery algorithm
# 5: when enough responses are back, we're done
- self.log("starting publish, data is %r" % (newdata,))
+ self.log("got enough peers")
self._storage_index = self._node.get_storage_index()
self._writekey = self._node.get_writekey()
def _got_query_results(self, datavs, peerid, permutedid,
reachable_peers, current_share_peers):
- self.log("_got_query_results")
-
assert isinstance(datavs, dict)
reachable_peers[peerid] = permutedid
for shnum, datav in datavs.items():
# one-per-peer in the normal permuted order.
while shares_needing_homes:
if not reachable_peers:
- raise NotEnoughPeersError("ran out of peers during upload")
+ prefix = idlib.b2a(self._node.get_storage_index())[:6]
+ raise NotEnoughPeersError("ran out of peers during upload of (%s); shares_needing_homes: %s, reachable_peers: %s" % (prefix, shares_needing_homes, reachable_peers,))
shnum = shares_needing_homes.pop(0)
possible_homes = reachable_peers.keys()
possible_homes.sort(lambda a,b:
key = hashutil.ssk_readkey_data_hash(IV, readkey)
enc = AES.new(key=key, mode=AES.MODE_CTR, counterstart="\x00"*16)
crypttext = enc.encrypt(newdata)
+ assert len(crypttext) == len(newdata)
# now apply FEC
self.MAX_SEGMENT_SIZE = 1024*1024
if surprised:
self._surprised = True
- def log_dispatch_map(self, dispatch_map):
+ def _log_dispatch_map(self, dispatch_map):
for shnum, places in dispatch_map.items():
sent_to = [(idlib.shortnodeid_b2a(peerid),
seqnum,
def _maybe_recover(self, (surprised, dispatch_map)):
self.log("_maybe_recover, surprised=%s, dispatch_map:" % surprised)
- self.log_dispatch_map(dispatch_map)
+ self._log_dispatch_map(dispatch_map)
if not surprised:
self.log(" no recovery needed")
return
self._current_roothash = None # ditto
self._current_seqnum = None # ditto
+ def __repr__(self):
+ return "<%s %x %s %s>" % (self.__class__.__name__, id(self), self.is_readonly() and 'RO' or 'RW', hasattr(self, '_uri') and self._uri.abbrev())
+
def init_from_uri(self, myuri):
# we have the URI, but we have not yet retrieved the public
# verification key, nor things like 'k' or 'N'. If and when someone
self._encprivkey = None
return self
- def create(self, initial_contents):
+ def create(self, initial_contents, wait_for_numpeers=None):
"""Call this when the filenode is first created. This will generate
- the keys, generate the initial shares, allocate shares, and upload
- the initial contents. Returns a Deferred that fires (with the
- MutableFileNode instance you should use) when it completes.
+ the keys, generate the initial shares, wait until at least numpeers
+ are connected, allocate shares, and upload the initial
+ contents. Returns a Deferred that fires (with the MutableFileNode
+ instance you should use) when it completes.
"""
self._required_shares = 3
self._total_shares = 10
# nobody knows about us yet"
self._current_seqnum = 0
self._current_roothash = "\x00"*32
- return self._publish(initial_contents)
+ return self._publish(initial_contents, wait_for_numpeers=wait_for_numpeers)
d.addCallback(_generated)
return d
verifier = signer.get_verifying_key()
return verifier, signer
- def _publish(self, initial_contents):
+ def _publish(self, initial_contents, wait_for_numpeers):
p = self.publish_class(self)
- d = p.publish(initial_contents)
+ d = p.publish(initial_contents, wait_for_numpeers=wait_for_numpeers)
d.addCallback(lambda res: self)
return d
ro.init_from_uri(self._uri.get_readonly())
return ro
+ def get_readonly_uri(self):
+ return self._uri.get_readonly().to_string()
+
def is_mutable(self):
return self._uri.is_mutable()
def is_readonly(self):
- # but maybe not you
return self._uri.is_readonly()
def __hash__(self):
r = Retrieve(self)
return r.retrieve()
- def replace(self, newdata):
+ def replace(self, newdata, wait_for_numpeers=None):
r = Retrieve(self)
d = r.retrieve()
- d.addCallback(lambda res: self._publish(newdata))
+ d.addCallback(lambda res: self._publish(newdata, wait_for_numpeers=wait_for_numpeers))
return d