From: Brian Warner Date: Fri, 10 Aug 2007 01:30:24 +0000 (-0700) Subject: #96: add flag to enable pushing data to ourselves, defaulting to False X-Git-Tag: allmydata-tahoe-0.5.0~83 X-Git-Url: https://git.rkrishnan.org/%5B/frontends/%22file://%22%3C?a=commitdiff_plain;h=998802fd6d0977640469063a3cdaa26f3389a50f;p=tahoe-lafs%2Ftahoe-lafs.git #96: add flag to enable pushing data to ourselves, defaulting to False --- diff --git a/src/allmydata/client.py b/src/allmydata/client.py index e54e195e..8f31a9bb 100644 --- a/src/allmydata/client.py +++ b/src/allmydata/client.py @@ -1,5 +1,6 @@ import os, sha, stat, time, re +from base64 import b32decode from foolscap import Referenceable, SturdyRef from zope.interface import implements from allmydata.interfaces import RIClient @@ -29,6 +30,7 @@ class Client(node.Node, Referenceable): MY_FURL_FILE = "myself.furl" SUICIDE_PREVENTION_HOTLINE_FILE = "suicide_prevention_hotline" SIZELIMIT_FILE = "sizelimit" + PUSH_TO_OURSELVES_FILE = "push_to_ourselves" # we're pretty narrow-minded right now OLDEST_SUPPORTED_VERSION = allmydata.__version__ @@ -38,6 +40,7 @@ class Client(node.Node, Referenceable): self.my_furl = None self.introducer_client = None self.init_storage() + self.init_options() self.add_service(Uploader()) self.add_service(Downloader()) self.add_service(VirtualDrive()) @@ -87,6 +90,12 @@ class Client(node.Node, Referenceable): no_storage = os.path.exists(NOSTORAGE_FILE) self.add_service(StorageServer(storedir, sizelimit, no_storage)) + def init_options(self): + self.push_to_ourselves = None + filename = os.path.join(self.basedir, self.PUSH_TO_OURSELVES_FILE) + if os.path.exists(filename): + self.push_to_ourselves = True + def _check_hotline(self, hotline_file): if os.path.exists(hotline_file): mtime = os.stat(hotline_file)[stat.ST_MTIME] @@ -141,18 +150,25 @@ class Client(node.Node, Referenceable): return [] return self.introducer_client.get_all_peerids() - def get_permuted_peers(self, key): + def get_permuted_peers(self, key, include_myself=True): """ @return: list of (permuted-peerid, peerid, connection,) """ results = [] + myid = b32decode(self.tub.tubID.upper()) for peerid, connection in self.introducer_client.get_all_peers(): assert isinstance(peerid, str) + if not include_myself and peerid == myid: + self.log("get_permuted_peers: removing myself from the list") + continue permuted = bytes_to_long(sha.new(key + peerid).digest()) results.append((permuted, peerid, connection)) results.sort() return results + def get_push_to_ourselves(self): + return self.push_to_ourselves + def get_encoding_parameters(self): if not self.introducer_client: return None diff --git a/src/allmydata/introducer.py b/src/allmydata/introducer.py index 4d2f088f..2e889eb2 100644 --- a/src/allmydata/introducer.py +++ b/src/allmydata/introducer.py @@ -112,8 +112,8 @@ class IntroducerClient(service.Service, Referenceable): self.log(" introducing ourselves: %s, %s" % (self, self.my_furl)) self._connected = True d = introducer.callRemote("hello", - node=self, - furl=self.my_furl) + node=self, + furl=self.my_furl) introducer.notifyOnDisconnect(self._disconnected) def _disconnected(self): diff --git a/src/allmydata/node.py b/src/allmydata/node.py index 4c24f18d..8c99a89f 100644 --- a/src/allmydata/node.py +++ b/src/allmydata/node.py @@ -36,6 +36,8 @@ class Node(service.MultiService): self.tub = Tub(certFile=certfile) self.tub.setOption("logLocalFailures", True) self.tub.setOption("logRemoteFailures", True) + # I think self.nodeid is kind of whacked. Shouldn't it equal the + # fingerprint portion of our furl? self.nodeid = b32encode(self.tub.tubID).lower() f = open(os.path.join(self.basedir, self.NODEIDFILE), "w") f.write(b32encode(self.nodeid).lower() + "\n") diff --git a/src/allmydata/test/check_memory.py b/src/allmydata/test/check_memory.py index 71e0246a..604647c3 100644 --- a/src/allmydata/test/check_memory.py +++ b/src/allmydata/test/check_memory.py @@ -161,6 +161,10 @@ this file are ignored. f = open(os.path.join(clientdir, "debug_no_storage"), "w") f.write("no_storage\n") f.close() + if self.mode == "upload-self": + f = open(os.path.join(clientdir, "push_to_ourselves"), "w") + f.write("push_to_ourselves\n") + f.close() self.keepalive_file = os.path.join(clientdir, "suicide_prevention_hotline") # now start updating the mtime. @@ -273,7 +277,7 @@ this file are ignored. name = '%d' % size print print "uploading %s" % name - if self.mode == "upload": + if self.mode in ("upload", "upload-self"): files[name] = self.create_data(name, size) d = control.callRemote("upload_from_file_to_uri", files[name]) def _done(uri): diff --git a/src/allmydata/test/test_upload.py b/src/allmydata/test/test_upload.py index 15148085..48ece345 100644 --- a/src/allmydata/test/test_upload.py +++ b/src/allmydata/test/test_upload.py @@ -129,9 +129,11 @@ class FakeBucketWriter: class FakeClient: def __init__(self, mode="good"): self.mode = mode - def get_permuted_peers(self, storage_index): + def get_permuted_peers(self, storage_index, include_myself): return [ ("%20d"%fakeid, "%20d"%fakeid, FakePeer(self.mode),) for fakeid in range(50) ] + def get_push_to_ourselves(self): + return None def get_encoding_parameters(self): return None diff --git a/src/allmydata/upload.py b/src/allmydata/upload.py index 65bcd2dc..055e82cc 100644 --- a/src/allmydata/upload.py +++ b/src/allmydata/upload.py @@ -88,7 +88,8 @@ class Tahoe3PeerSelector: def get_shareholders(self, client, storage_index, share_size, block_size, - num_segments, total_shares, shares_of_happiness): + num_segments, total_shares, shares_of_happiness, + push_to_ourselves): """ @return: a set of PeerTracker instances that have agreed to hold some shares for us @@ -99,8 +100,9 @@ class Tahoe3PeerSelector: # we are responsible for locating the shareholders. self._encoder is # responsible for handling the data and sending out the shares. - peers = client.get_permuted_peers(storage_index) - assert peers + peers = client.get_permuted_peers(storage_index, push_to_ourselves) + + assert peers, "peer selection left us with zero peers for our data" # this needed_hashes computation should mirror # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree @@ -398,10 +400,11 @@ class CHKUploader: block_size = encoder.get_param("block_size") num_segments = encoder.get_param("num_segments") k,desired,n = encoder.get_param("share_counts") + push_to_ourselves = self._options.get("push_to_ourselves", False) gs = peer_selector.get_shareholders d = gs(self._client, storage_index, share_size, block_size, - num_segments, n, desired) + num_segments, n, desired, push_to_ourselves) return d def set_shareholders(self, used_peers, encoder): @@ -554,6 +557,10 @@ class Uploader(service.MultiService): # this returns the URI assert self.parent assert self.running + push_to_ourselves = self.parent.get_push_to_ourselves() + if push_to_ourselves is not None: + options["push_to_ourselves"] = push_to_ourselves + uploadable = IUploadable(uploadable) d = uploadable.get_size() def _got_size(size):