From: Brian Warner Date: Thu, 31 Jan 2008 01:49:02 +0000 (-0700) Subject: upload-helper: avoid duplicate uploads: check the grid to see if the file already... X-Git-Url: https://git.rkrishnan.org/%5B/%5D%20/uri/vdrive/?a=commitdiff_plain;h=81eeafc57473e7c419d8e03106161bae79d1e9c7;p=tahoe-lafs%2Ftahoe-lafs.git upload-helper: avoid duplicate uploads: check the grid to see if the file already exists --- diff --git a/src/allmydata/offloaded.py b/src/allmydata/offloaded.py index 6cb9f98a..2a5f6bfb 100644 --- a/src/allmydata/offloaded.py +++ b/src/allmydata/offloaded.py @@ -5,14 +5,125 @@ from twisted.application import service from twisted.internet import defer from foolscap import Referenceable from foolscap.eventual import eventually -from allmydata import upload, interfaces -from allmydata.util import idlib, log, observer, fileutil +from allmydata import upload, interfaces, storage, uri +from allmydata.util import idlib, log, observer, fileutil, hashutil class NotEnoughWritersError(Exception): pass +class CHKCheckerAndUEBFetcher: + """I check to see if a file is already present in the grid. I also fetch + the URI Extension Block, which is useful for an uploading client who + wants to avoid the work of encryption and encoding. + + I return False if the file is not completely healthy: i.e. if there are + less than 'N' shares present. + + If the file is completely healthy, I return a tuple of (sharemap, + UEB_data, UEB_hash). + """ + + def __init__(self, peer_getter, storage_index, logparent=None): + self._peer_getter = peer_getter + self._found_shares = set() + self._storage_index = storage_index + self._sharemap = {} + self._readers = set() + self._ueb_hash = None + self._ueb_data = None + self._logparent = logparent + + def log(self, *args, **kwargs): + if 'facility' not in kwargs: + kwargs['facility'] = "tahoe.helper.chk.checkandUEBfetch" + if 'parent' not in kwargs: + kwargs['parent'] = self._logparent + return log.msg(*args, **kwargs) + + def check(self): + d = self._get_all_shareholders(self._storage_index) + d.addCallback(self._get_uri_extension) + d.addCallback(self._done) + return d + + def _get_all_shareholders(self, storage_index): + dl = [] + for (pmpeerid, peerid, connection) in self._peer_getter(storage_index): + d = connection.callRemote("get_service", "storageserver") + d.addCallback(lambda ss: ss.callRemote("get_buckets", + storage_index)) + d.addCallbacks(self._got_response, self._got_error, + callbackArgs=(peerid,)) + dl.append(d) + return defer.DeferredList(dl) + + def _got_response(self, buckets, peerid): + # buckets is a dict: maps shum to an rref of the server who holds it + shnums_s = ",".join([str(shnum) for shnum in buckets]) + self.log("got_response: [%s] has %d shares (%s)" % + (idlib.shortnodeid_b2a(peerid), len(buckets), shnums_s), + level=log.NOISY) + self._found_shares.update(buckets.keys()) + for k in buckets: + if k not in self._sharemap: + self._sharemap[k] = [] + self._sharemap[k].append(peerid) + self._readers.update(buckets.values()) + + def _got_error(self, f): + if f.check(KeyError): + pass + log.err(f, parent=self._logparent) + pass + + def _get_uri_extension(self, res): + # assume that we can pull the UEB from any share. If we get an error, + # declare the whole file unavailable. + if not self._readers: + self.log("no readers, so no UEB", level=log.NOISY) + return + b = self._readers.pop() + rbp = storage.ReadBucketProxy(b) + d = rbp.startIfNecessary() + d.addCallback(lambda res: rbp.get_uri_extension()) + d.addCallback(self._got_uri_extension) + d.addErrback(self._ueb_error) + return d + + def _got_uri_extension(self, ueb): + self.log("_got_uri_extension", level=log.NOISY) + self._ueb_hash = hashutil.uri_extension_hash(ueb) + self._ueb_data = uri.unpack_extension(ueb) + + def _ueb_error(self, f): + # an error means the file is unavailable, but the overall check + # shouldn't fail. + self.log("UEB fetch failed", failure=f, level=log.WEIRD) + return None + + def _done(self, res): + if self._ueb_data: + found = len(self._found_shares) + total = self._ueb_data['total_shares'] + self.log(format="got %(found)d shares of %(total)d", + found=found, total=total, level=log.NOISY) + if found < total: + # not all shares are present in the grid + self.log("not enough to qualify, file not found in grid", + level=log.NOISY) + return False + # all shares are present + self.log("all shares present, file is found in grid", + level=log.NOISY) + return (self._sharemap, self._ueb_data, self._ueb_hash) + # no shares are present + self.log("unable to find UEB data, file not found in grid", + level=log.NOISY) + return False + + class CHKUploadHelper(Referenceable, upload.CHKUploader): """I am the helper-server -side counterpart to AssistedUploader. I handle peer selection, encoding, and share pushing. I read ciphertext from the @@ -63,13 +174,8 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader): # encoding). The caller might be useful. self.log("partial ciphertext already present", level=log.UNUSUAL) return ({}, self) - # we don't remember uploading this file, but it might already be in - # the grid. For now we do an unconditional upload. TODO: Do a quick - # checker run (send one query to each storage server) to see who has - # the file. Then accomodate a lazy uploader by retrieving the UEB - # from one of the shares and hash it. - #return ({'uri_extension_hash': hashutil.uri_extension_hash("")},self) - self.log("no record of having uploaded the file", level=log.NOISY) + # we don't remember uploading this file + self.log("no ciphertext yet", level=log.NOISY) return ({}, self) def remote_upload(self, reader): @@ -352,13 +458,47 @@ class Helper(Referenceable, service.MultiService): if storage_index in self._active_uploads: self.log("upload is currently active", parent=lp) uh = self._active_uploads[storage_index] - else: - self.log("creating new upload helper", parent=lp) - uh = self.chk_upload_helper_class(storage_index, self, - incoming_file, encoding_file, - lp) - self._active_uploads[storage_index] = uh - return uh.start() + return uh.start() + + d = self._check_for_chk_already_in_grid(storage_index, lp) + def _checked(upload_results): + if upload_results: + return (upload_results, None) + + # the file is not present in the grid, by which we mean there are + # less than 'N' shares available. + self.log("unable to find file in the grid", level=log.NOISY) + # We need an upload helper. Check our active uploads again in + # case there was a race. + if storage_index in self._active_uploads: + self.log("upload is currently active", parent=lp) + uh = self._active_uploads[storage_index] + else: + self.log("creating new upload helper", parent=lp) + uh = self.chk_upload_helper_class(storage_index, self, + incoming_file, encoding_file, + lp) + self._active_uploads[storage_index] = uh + return uh.start() + d.addCallback(_checked) + return d + + def _check_for_chk_already_in_grid(self, storage_index, lp): + # see if this file is already in the grid + lp2 = self.log("doing a quick check+UEBfetch", + parent=lp, level=log.NOISY) + c = CHKCheckerAndUEBFetcher(self.parent.get_permuted_peers, + storage_index, lp2) + d = c.check() + def _checked(res): + if res: + (sharemap, ueb_data, ueb_hash) = res + self.log("found file in grid", level=log.NOISY, parent=lp) + upload_results = {'uri_extension_hash': ueb_hash} + return upload_results + return False + d.addCallback(_checked) + return d def upload_finished(self, storage_index): del self._active_uploads[storage_index] diff --git a/src/allmydata/test/test_helper.py b/src/allmydata/test/test_helper.py index fd8cbf80..cf766ac7 100644 --- a/src/allmydata/test/test_helper.py +++ b/src/allmydata/test/test_helper.py @@ -43,6 +43,8 @@ class FakeClient(service.MultiService): return True def get_encoding_parameters(self): return self.DEFAULT_ENCODING_PARAMETERS + def get_permuted_peers(self, storage_index): + return [] def flush_but_dont_ignore(res): d = eventual.flushEventualQueue() diff --git a/src/allmydata/test/test_system.py b/src/allmydata/test/test_system.py index 842adb22..75de3da6 100644 --- a/src/allmydata/test/test_system.py +++ b/src/allmydata/test/test_system.py @@ -289,19 +289,35 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase): extra_node.getServiceNamed("storageserver").sizelimit = 0 d.addCallback(_added) + HELPER_DATA = "Data that needs help to upload" * 1000 def _upload_with_helper(res): - DATA = "Data that needs help to upload" * 1000 - u = upload.Data(DATA, contenthashkey=contenthashkey) + u = upload.Data(HELPER_DATA, contenthashkey=contenthashkey) d = self.extra_node.upload(u) def _uploaded(uri): return self.downloader.download_to_data(uri) d.addCallback(_uploaded) def _check(newdata): - self.failUnlessEqual(newdata, DATA) + self.failUnlessEqual(newdata, HELPER_DATA) d.addCallback(_check) return d d.addCallback(_upload_with_helper) + def _upload_duplicate_with_helper(res): + u = upload.Data(HELPER_DATA, contenthashkey=contenthashkey) + u.debug_stash_RemoteEncryptedUploadable = True + d = self.extra_node.upload(u) + def _uploaded(uri): + return self.downloader.download_to_data(uri) + d.addCallback(_uploaded) + def _check(newdata): + self.failUnlessEqual(newdata, HELPER_DATA) + self.failIf(hasattr(u, "debug_RemoteEncryptedUploadable"), + "uploadable started uploading, should have been avoided") + d.addCallback(_check) + return d + if contenthashkey: + d.addCallback(_upload_duplicate_with_helper) + def _upload_resumable(res): DATA = "Data that needs help to upload and gets interrupted" * 1000 u1 = upload.Data(DATA, contenthashkey=contenthashkey)