from twisted.internet import defer
from foolscap import Referenceable
from foolscap.eventual import eventually
-from allmydata import upload, interfaces
-from allmydata.util import idlib, log, observer, fileutil
+from allmydata import upload, interfaces, storage, uri
+from allmydata.util import idlib, log, observer, fileutil, hashutil
class NotEnoughWritersError(Exception):
    """Raised when there are not enough writers available to proceed."""
    pass
class CHKCheckerAndUEBFetcher:
    """I check to see if a file is already present in the grid. I also fetch
    the URI Extension Block, which is useful for an uploading client who
    wants to avoid the work of encryption and encoding.

    I return False if the file is not completely healthy: i.e. if there are
    fewer than 'N' shares present.

    If the file is completely healthy, I return a tuple of (sharemap,
    UEB_data, UEB_hash).
    """

    def __init__(self, peer_getter, storage_index, logparent=None):
        # peer_getter: callable that, given a storage index, yields
        # (pmpeerid, peerid, connection) tuples for candidate servers
        self._peer_getter = peer_getter
        self._found_shares = set()   # share numbers seen on any server
        self._storage_index = storage_index
        self._sharemap = {}          # shnum -> list of peerids holding it
        self._readers = set()        # bucket rrefs usable for UEB retrieval
        self._ueb_hash = None
        self._ueb_data = None
        self._logparent = logparent

    def log(self, *args, **kwargs):
        # Route all of our messages to one facility, nested under our
        # log parent, unless the caller overrides either.
        if 'facility' not in kwargs:
            kwargs['facility'] = "tahoe.helper.chk.checkandUEBfetch"
        if 'parent' not in kwargs:
            kwargs['parent'] = self._logparent
        return log.msg(*args, **kwargs)

    def check(self):
        """Query all servers, then fetch the UEB and judge file health.

        Returns a Deferred that fires with False (file not completely
        present) or with (sharemap, UEB_data, UEB_hash).
        """
        d = self._get_all_shareholders(self._storage_index)
        d.addCallback(self._get_uri_extension)
        d.addCallback(self._done)
        return d

    def _get_all_shareholders(self, storage_index):
        # Ask every peer which shares of this file it holds; collect all
        # answers (and errors) before moving on.
        dl = []
        for (pmpeerid, peerid, connection) in self._peer_getter(storage_index):
            d = connection.callRemote("get_service", "storageserver")
            d.addCallback(lambda ss: ss.callRemote("get_buckets",
                                                   storage_index))
            d.addCallbacks(self._got_response, self._got_error,
                           callbackArgs=(peerid,))
            dl.append(d)
        return defer.DeferredList(dl)

    def _got_response(self, buckets, peerid):
        # buckets is a dict: maps shnum to an rref of the server who holds it
        shnums_s = ",".join([str(shnum) for shnum in buckets])
        self.log("got_response: [%s] has %d shares (%s)" %
                 (idlib.shortnodeid_b2a(peerid), len(buckets), shnums_s),
                 level=log.NOISY)
        self._found_shares.update(buckets.keys())
        for k in buckets:
            if k not in self._sharemap:
                self._sharemap[k] = []
            self._sharemap[k].append(peerid)
        self._readers.update(buckets.values())

    def _got_error(self, f):
        # A KeyError means this peer is not running a storage service,
        # which is expected and not worth reporting (previously this branch
        # was a no-op and the error was logged anyway). Anything else gets
        # logged; either way the overall check proceeds with the answers
        # from the other peers.
        if f.check(KeyError):
            return
        log.err(f, parent=self._logparent)

    def _get_uri_extension(self, res):
        # assume that we can pull the UEB from any share. If we get an error,
        # declare the whole file unavailable.
        if not self._readers:
            self.log("no readers, so no UEB", level=log.NOISY)
            return
        b = self._readers.pop()
        rbp = storage.ReadBucketProxy(b)
        d = rbp.startIfNecessary()
        d.addCallback(lambda res: rbp.get_uri_extension())
        d.addCallback(self._got_uri_extension)
        d.addErrback(self._ueb_error)
        return d

    def _got_uri_extension(self, ueb):
        self.log("_got_uri_extension", level=log.NOISY)
        # Hash the raw serialized UEB before unpacking, so an uploader can
        # compare it against the hash embedded in the file's URI.
        self._ueb_hash = hashutil.uri_extension_hash(ueb)
        self._ueb_data = uri.unpack_extension(ueb)

    def _ueb_error(self, f):
        # an error means the file is unavailable, but the overall check
        # shouldn't fail.
        self.log("UEB fetch failed", failure=f, level=log.WEIRD)
        return None

    def _done(self, res):
        # Judge healthiness: we need the UEB plus all 'total_shares' shares.
        if self._ueb_data:
            found = len(self._found_shares)
            total = self._ueb_data['total_shares']
            self.log(format="got %(found)d shares of %(total)d",
                     found=found, total=total, level=log.NOISY)
            if found < total:
                # not all shares are present in the grid
                self.log("not enough to qualify, file not found in grid",
                         level=log.NOISY)
                return False
            # all shares are present
            self.log("all shares present, file is found in grid",
                     level=log.NOISY)
            return (self._sharemap, self._ueb_data, self._ueb_hash)
        # no shares are present
        self.log("unable to find UEB data, file not found in grid",
                 level=log.NOISY)
        return False
+
class CHKUploadHelper(Referenceable, upload.CHKUploader):
"""I am the helper-server -side counterpart to AssistedUploader. I handle
peer selection, encoding, and share pushing. I read ciphertext from the
# encoding). The caller might be useful.
self.log("partial ciphertext already present", level=log.UNUSUAL)
return ({}, self)
- # we don't remember uploading this file, but it might already be in
- # the grid. For now we do an unconditional upload. TODO: Do a quick
- # checker run (send one query to each storage server) to see who has
- # the file. Then accomodate a lazy uploader by retrieving the UEB
- # from one of the shares and hash it.
- #return ({'uri_extension_hash': hashutil.uri_extension_hash("")},self)
- self.log("no record of having uploaded the file", level=log.NOISY)
+ # we don't remember uploading this file
+ self.log("no ciphertext yet", level=log.NOISY)
return ({}, self)
def remote_upload(self, reader):
if storage_index in self._active_uploads:
self.log("upload is currently active", parent=lp)
uh = self._active_uploads[storage_index]
- else:
- self.log("creating new upload helper", parent=lp)
- uh = self.chk_upload_helper_class(storage_index, self,
- incoming_file, encoding_file,
- lp)
- self._active_uploads[storage_index] = uh
- return uh.start()
+ return uh.start()
+
+ d = self._check_for_chk_already_in_grid(storage_index, lp)
+ def _checked(upload_results):
+ if upload_results:
+ return (upload_results, None)
+
+ # the file is not present in the grid, by which we mean there are
+ # less than 'N' shares available.
+ self.log("unable to find file in the grid", level=log.NOISY)
+ # We need an upload helper. Check our active uploads again in
+ # case there was a race.
+ if storage_index in self._active_uploads:
+ self.log("upload is currently active", parent=lp)
+ uh = self._active_uploads[storage_index]
+ else:
+ self.log("creating new upload helper", parent=lp)
+ uh = self.chk_upload_helper_class(storage_index, self,
+ incoming_file, encoding_file,
+ lp)
+ self._active_uploads[storage_index] = uh
+ return uh.start()
+ d.addCallback(_checked)
+ return d
+
+ def _check_for_chk_already_in_grid(self, storage_index, lp):
+ # see if this file is already in the grid
+ lp2 = self.log("doing a quick check+UEBfetch",
+ parent=lp, level=log.NOISY)
+ c = CHKCheckerAndUEBFetcher(self.parent.get_permuted_peers,
+ storage_index, lp2)
+ d = c.check()
+ def _checked(res):
+ if res:
+ (sharemap, ueb_data, ueb_hash) = res
+ self.log("found file in grid", level=log.NOISY, parent=lp)
+ upload_results = {'uri_extension_hash': ueb_hash}
+ return upload_results
+ return False
+ d.addCallback(_checked)
+ return d
def upload_finished(self, storage_index):
del self._active_uploads[storage_index]