#96: add flag to enable pushing data to ourselves, defaulting to False
author     Brian Warner <warner@allmydata.com>
           Fri, 10 Aug 2007 01:30:24 +0000 (18:30 -0700)
committer  Brian Warner <warner@allmydata.com>
           Fri, 10 Aug 2007 01:30:24 +0000 (18:30 -0700)
src/allmydata/client.py
src/allmydata/introducer.py
src/allmydata/node.py
src/allmydata/test/check_memory.py
src/allmydata/test/test_upload.py
src/allmydata/upload.py
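
The new option is file-based: when a file named "push_to_ourselves" exists in a node's base directory, Client.init_options() sets push_to_ourselves to True; when the file is absent it stays None, and CHKUploader later falls back to the default of False. A minimal sketch of turning the flag on for a node (the ~/.tahoe base directory is only an assumed example path):

    import os

    basedir = os.path.expanduser("~/.tahoe")  # assumed node basedir for illustration
    # an empty marker file is enough; init_options() only checks for existence
    open(os.path.join(basedir, "push_to_ourselves"), "w").close()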

diff --git a/src/allmydata/client.py b/src/allmydata/client.py
index e54e195ee065cab4514aa6c389ae8ab2ece18c6d..8f31a9bba2aa48dfa3a7ce4d48ac68decbca358a 100644
@@ -1,5 +1,6 @@
 
 import os, sha, stat, time, re
+from base64 import b32decode
 from foolscap import Referenceable, SturdyRef
 from zope.interface import implements
 from allmydata.interfaces import RIClient
@@ -29,6 +30,7 @@ class Client(node.Node, Referenceable):
     MY_FURL_FILE = "myself.furl"
     SUICIDE_PREVENTION_HOTLINE_FILE = "suicide_prevention_hotline"
     SIZELIMIT_FILE = "sizelimit"
+    PUSH_TO_OURSELVES_FILE = "push_to_ourselves"
 
     # we're pretty narrow-minded right now
     OLDEST_SUPPORTED_VERSION = allmydata.__version__
@@ -38,6 +40,7 @@ class Client(node.Node, Referenceable):
         self.my_furl = None
         self.introducer_client = None
         self.init_storage()
+        self.init_options()
         self.add_service(Uploader())
         self.add_service(Downloader())
         self.add_service(VirtualDrive())
@@ -87,6 +90,12 @@ class Client(node.Node, Referenceable):
         no_storage = os.path.exists(NOSTORAGE_FILE)
         self.add_service(StorageServer(storedir, sizelimit, no_storage))
 
+    def init_options(self):
+        self.push_to_ourselves = None
+        filename = os.path.join(self.basedir, self.PUSH_TO_OURSELVES_FILE)
+        if os.path.exists(filename):
+            self.push_to_ourselves = True
+
     def _check_hotline(self, hotline_file):
         if os.path.exists(hotline_file):
             mtime = os.stat(hotline_file)[stat.ST_MTIME]
@@ -141,18 +150,25 @@ class Client(node.Node, Referenceable):
             return []
         return self.introducer_client.get_all_peerids()
 
-    def get_permuted_peers(self, key):
+    def get_permuted_peers(self, key, include_myself=True):
         """
         @return: list of (permuted-peerid, peerid, connection,)
         """
         results = []
+        myid = b32decode(self.tub.tubID.upper())
         for peerid, connection in self.introducer_client.get_all_peers():
             assert isinstance(peerid, str)
+            if not include_myself and peerid == myid:
+                self.log("get_permuted_peers: removing myself from the list")
+                continue
             permuted = bytes_to_long(sha.new(key + peerid).digest())
             results.append((permuted, peerid, connection))
         results.sort()
         return results
 
+    def get_push_to_ourselves(self):
+        return self.push_to_ourselves
+
     def get_encoding_parameters(self):
         if not self.introducer_client:
             return None
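
For context, get_permuted_peers() ranks every known peer by the SHA-1 digest of the storage key concatenated with the peerid, and the new include_myself argument drops the local tub's id before that ranking is built. A standalone sketch of the same selection logic, assuming a plain list of (peerid, connection) pairs and using a hex-digest conversion in place of the codebase's bytes_to_long helper:

    import sha  # the Python 2 hash module used throughout this codebase

    def permuted_peers(key, peers, myid, include_myself=True):
        # peers: iterable of (peerid, connection) pairs; key: storage index
        results = []
        for peerid, connection in peers:
            if not include_myself and peerid == myid:
                continue  # leave the local node out of the candidate list
            permuted = long(sha.new(key + peerid).hexdigest(), 16)
            results.append((permuted, peerid, connection))
        results.sort()  # order candidates by their permuted value, smallest first
        return results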
diff --git a/src/allmydata/introducer.py b/src/allmydata/introducer.py
index 4d2f088f12068287d07992943758b7190509e2d4..2e889eb2a6c4f63bbafb50b2bcd866893bc8639f 100644
@@ -112,8 +112,8 @@ class IntroducerClient(service.Service, Referenceable):
         self.log(" introducing ourselves: %s, %s" % (self, self.my_furl))
         self._connected = True
         d = introducer.callRemote("hello",
-                             node=self,
-                             furl=self.my_furl)
+                                  node=self,
+                                  furl=self.my_furl)
         introducer.notifyOnDisconnect(self._disconnected)
 
     def _disconnected(self):
diff --git a/src/allmydata/node.py b/src/allmydata/node.py
index 4c24f18d5489faf2150212d1d191d7c0f5c6aa05..8c99a89fb7a3fa5c6e2a923c6b862d365dd65b16 100644
@@ -36,6 +36,8 @@ class Node(service.MultiService):
         self.tub = Tub(certFile=certfile)
         self.tub.setOption("logLocalFailures", True)
         self.tub.setOption("logRemoteFailures", True)
+        # I think self.nodeid is kind of whacked. Shouldn't it equal the
+        # fingerprint portion of our furl?
         self.nodeid = b32encode(self.tub.tubID).lower()
         f = open(os.path.join(self.basedir, self.NODEIDFILE), "w")
         f.write(b32encode(self.nodeid).lower() + "\n")
diff --git a/src/allmydata/test/check_memory.py b/src/allmydata/test/check_memory.py
index 71e0246a5eb155325ba9590bfc06d972064e9411..604647c3b4cf46dee18e74b10eb504154e198e5b 100644
@@ -161,6 +161,10 @@ this file are ignored.
             f = open(os.path.join(clientdir, "debug_no_storage"), "w")
             f.write("no_storage\n")
             f.close()
+        if self.mode == "upload-self":
+            f = open(os.path.join(clientdir, "push_to_ourselves"), "w")
+            f.write("push_to_ourselves\n")
+            f.close()
         self.keepalive_file = os.path.join(clientdir,
                                            "suicide_prevention_hotline")
         # now start updating the mtime.
@@ -273,7 +277,7 @@ this file are ignored.
             name = '%d' % size
             print
             print "uploading %s" % name
-            if self.mode == "upload":
+            if self.mode in ("upload", "upload-self"):
                 files[name] = self.create_data(name, size)
                 d = control.callRemote("upload_from_file_to_uri", files[name])
                 def _done(uri):
diff --git a/src/allmydata/test/test_upload.py b/src/allmydata/test/test_upload.py
index 15148085dc21ea405dbeae8d7b8f3290b9fa9bd9..48ece34518d74c70de1aac9e48b42b216425e17e 100644
@@ -129,9 +129,11 @@ class FakeBucketWriter:
 class FakeClient:
     def __init__(self, mode="good"):
         self.mode = mode
-    def get_permuted_peers(self, storage_index):
+    def get_permuted_peers(self, storage_index, include_myself):
         return [ ("%20d"%fakeid, "%20d"%fakeid, FakePeer(self.mode),)
                  for fakeid in range(50) ]
+    def get_push_to_ourselves(self):
+        return None
     def get_encoding_parameters(self):
         return None
 
diff --git a/src/allmydata/upload.py b/src/allmydata/upload.py
index 65bcd2dc16222974a2b5ee3291c13100c780eb54..055e82cc8d038ccb32fdca5d99959e9aee974f65 100644
@@ -88,7 +88,8 @@ class Tahoe3PeerSelector:
 
     def get_shareholders(self, client,
                          storage_index, share_size, block_size,
-                         num_segments, total_shares, shares_of_happiness):
+                         num_segments, total_shares, shares_of_happiness,
+                         push_to_ourselves):
         """
         @return: a set of PeerTracker instances that have agreed to hold some
             shares for us
@@ -99,8 +100,9 @@ class Tahoe3PeerSelector:
 
         # we are responsible for locating the shareholders. self._encoder is
         # responsible for handling the data and sending out the shares.
-        peers = client.get_permuted_peers(storage_index)
-        assert peers
+        peers = client.get_permuted_peers(storage_index, push_to_ourselves)
+
+        assert peers, "peer selection left us with zero peers for our data"
 
         # this needed_hashes computation should mirror
         # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
@@ -398,10 +400,11 @@ class CHKUploader:
         block_size = encoder.get_param("block_size")
         num_segments = encoder.get_param("num_segments")
         k,desired,n = encoder.get_param("share_counts")
+        push_to_ourselves = self._options.get("push_to_ourselves", False)
 
         gs = peer_selector.get_shareholders
         d = gs(self._client, storage_index, share_size, block_size,
-               num_segments, n, desired)
+               num_segments, n, desired, push_to_ourselves)
         return d
 
     def set_shareholders(self, used_peers, encoder):
@@ -554,6 +557,10 @@ class Uploader(service.MultiService):
         # this returns the URI
         assert self.parent
         assert self.running
+        push_to_ourselves = self.parent.get_push_to_ourselves()
+        if push_to_ourselves is not None:
+            options["push_to_ourselves"] = push_to_ourselves
+
         uploadable = IUploadable(uploadable)
         d = uploadable.get_size()
         def _got_size(size):
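
Read end to end, the flag travels from the node's configuration down to peer selection along the path shown in the hunks above. The condensed sketch below restates that path, with keyword arguments added purely for readability:

    # Uploader.upload: copy the client-level setting into the per-upload options
    push_to_ourselves = self.parent.get_push_to_ourselves()
    if push_to_ourselves is not None:
        options["push_to_ourselves"] = push_to_ourselves

    # CHKUploader: an option that was never set defaults to False
    push_to_ourselves = self._options.get("push_to_ourselves", False)

    # Tahoe3PeerSelector.get_shareholders: only keep the local node in the
    # candidate list when we are allowed to push shares to ourselves
    peers = client.get_permuted_peers(storage_index,
                                      include_myself=push_to_ourselves)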