]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blobdiff - src/allmydata/mutable.py
decentralized directories: integration and testing
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / mutable.py
index cb80557f05d93b25f2b5f3f2eab947945130c94b..fadcea5a4273d7b447fe2ee7a194358e75bd61f9 100644 (file)
@@ -14,6 +14,9 @@ from allmydata.encode import NotEnoughPeersError
 from pycryptopp.publickey import rsa
 
 
+class NotMutableError(Exception):
+    pass
+
 class NeedMoreDataError(Exception):
     def __init__(self, needed_bytes, encprivkey_offset, encprivkey_length):
         Exception.__init__(self)
@@ -613,12 +616,31 @@ class Publish:
     def log_err(self, f):
         log.err(f)
 
-    def publish(self, newdata):
-        """Publish the filenode's current contents. Returns a Deferred that
+    def publish(self, newdata, wait_for_numpeers=None):
+        """Publish the filenode's current contents.  Returns a Deferred that
         fires (with None) when the publish has done as much work as it's ever
         going to do, or errbacks with ConsistencyError if it detects a
-        simultaneous write."""
+        simultaneous write.
+
+        It will wait until at least wait_for_numpeers peers are connected
+        before it starts uploading
+
+        If wait_for_numpeers is None, then wait_for_numpeers is set to the
+        number of shares total (M).
+        """
+
+        self.log("starting publish")
+
+        if wait_for_numpeers is None:
+            # TODO: perhaps the default should be something like:
+            # wait_for_numpeers = self._node.get_total_shares()
+            wait_for_numpeers = 1
 
+        d = self._node._client.introducer_client.when_enough_peers(wait_for_numpeers)
+        d.addCallback(lambda dummy: self._after_enough_peers(newdata))
+        return d
+
+    def _after_enough_peers(self, newdata):
         # 1: generate shares (SDMF: files are small, so we can do it in RAM)
         # 2: perform peer selection, get candidate servers
         #  2a: send queries to n+epsilon servers, to determine current shares
@@ -628,7 +650,7 @@ class Publish:
         # 4a: may need to run recovery algorithm
         # 5: when enough responses are back, we're done
 
-        self.log("starting publish, data is %r" % (newdata,))
+        self.log("got enough peers")
 
         self._storage_index = self._node.get_storage_index()
         self._writekey = self._node.get_writekey()
@@ -725,8 +747,6 @@ class Publish:
 
     def _got_query_results(self, datavs, peerid, permutedid,
                            reachable_peers, current_share_peers):
-        self.log("_got_query_results")
-
         assert isinstance(datavs, dict)
         reachable_peers[peerid] = permutedid
         for shnum, datav in datavs.items():
@@ -829,7 +849,8 @@ class Publish:
         # one-per-peer in the normal permuted order.
         while shares_needing_homes:
             if not reachable_peers:
-                raise NotEnoughPeersError("ran out of peers during upload")
+                prefix = idlib.b2a(self._node.get_storage_index())[:6]
+                raise NotEnoughPeersError("ran out of peers during upload of (%s); shares_needing_homes: %s, reachable_peers: %s" % (prefix, shares_needing_homes, reachable_peers,))
             shnum = shares_needing_homes.pop(0)
             possible_homes = reachable_peers.keys()
             possible_homes.sort(lambda a,b:
@@ -888,6 +909,7 @@ class Publish:
         key = hashutil.ssk_readkey_data_hash(IV, readkey)
         enc = AES.new(key=key, mode=AES.MODE_CTR, counterstart="\x00"*16)
         crypttext = enc.encrypt(newdata)
+        assert len(crypttext) == len(newdata)
 
         # now apply FEC
         self.MAX_SEGMENT_SIZE = 1024*1024
@@ -1100,7 +1122,7 @@ class Publish:
         if surprised:
             self._surprised = True
 
-    def log_dispatch_map(self, dispatch_map):
+    def _log_dispatch_map(self, dispatch_map):
         for shnum, places in dispatch_map.items():
             sent_to = [(idlib.shortnodeid_b2a(peerid),
                         seqnum,
@@ -1110,7 +1132,7 @@ class Publish:
 
     def _maybe_recover(self, (surprised, dispatch_map)):
         self.log("_maybe_recover, surprised=%s, dispatch_map:" % surprised)
-        self.log_dispatch_map(dispatch_map)
+        self._log_dispatch_map(dispatch_map)
         if not surprised:
             self.log(" no recovery needed")
             return
@@ -1139,6 +1161,9 @@ class MutableFileNode:
         self._current_roothash = None # ditto
         self._current_seqnum = None # ditto
 
+    def __repr__(self):
+        return "<%s %x %s %s>" % (self.__class__.__name__, id(self), self.is_readonly() and 'RO' or 'RW', hasattr(self, '_uri') and self._uri.abbrev())
+
     def init_from_uri(self, myuri):
         # we have the URI, but we have not yet retrieved the public
         # verification key, nor things like 'k' or 'N'. If and when someone
@@ -1160,11 +1185,12 @@ class MutableFileNode:
         self._encprivkey = None
         return self
 
-    def create(self, initial_contents):
+    def create(self, initial_contents, wait_for_numpeers=None):
         """Call this when the filenode is first created. This will generate
-        the keys, generate the initial shares, allocate shares, and upload
-        the initial contents. Returns a Deferred that fires (with the
-        MutableFileNode instance you should use) when it completes.
+        the keys, generate the initial shares, wait until at least numpeers
+        are connected, allocate shares, and upload the initial
+        contents. Returns a Deferred that fires (with the MutableFileNode
+        instance you should use) when it completes.
         """
         self._required_shares = 3
         self._total_shares = 10
@@ -1183,7 +1209,7 @@ class MutableFileNode:
             # nobody knows about us yet"
             self._current_seqnum = 0
             self._current_roothash = "\x00"*32
-            return self._publish(initial_contents)
+            return self._publish(initial_contents, wait_for_numpeers=wait_for_numpeers)
         d.addCallback(_generated)
         return d
 
@@ -1193,9 +1219,9 @@ class MutableFileNode:
         verifier = signer.get_verifying_key()
         return verifier, signer
 
-    def _publish(self, initial_contents):
+    def _publish(self, initial_contents, wait_for_numpeers):
         p = self.publish_class(self)
-        d = p.publish(initial_contents)
+        d = p.publish(initial_contents, wait_for_numpeers=wait_for_numpeers)
         d.addCallback(lambda res: self)
         return d
 
@@ -1276,10 +1302,12 @@ class MutableFileNode:
         ro.init_from_uri(self._uri.get_readonly())
         return ro
 
+    def get_readonly_uri(self):
+        return self._uri.get_readonly().to_string()
+
     def is_mutable(self):
         return self._uri.is_mutable()
     def is_readonly(self):
-        # but maybe not you
         return self._uri.is_readonly()
 
     def __hash__(self):
@@ -1313,8 +1341,8 @@ class MutableFileNode:
         r = Retrieve(self)
         return r.retrieve()
 
-    def replace(self, newdata):
+    def replace(self, newdata, wait_for_numpeers=None):
         r = Retrieve(self)
         d = r.retrieve()
-        d.addCallback(lambda res: self._publish(newdata))
+        d.addCallback(lambda res: self._publish(newdata, wait_for_numpeers=wait_for_numpeers))
         return d