immutable: refactor uploader to do just encoding-and-uploading, not encryption
author Zooko O'Whielacronx <zooko@zooko.com>
Wed, 7 Jan 2009 04:48:22 +0000 (21:48 -0700)
committer Zooko O'Whielacronx <zooko@zooko.com>
Wed, 7 Jan 2009 04:48:22 +0000 (21:48 -0700)
This makes Uploader take an EncryptedUploadable object instead of an Uploadable object.  I also changed it to return a verify cap instead of a tuple of the bits of data that one finds in a verify cap.
This will facilitate hooking together an Uploader and a Downloader to make a Repairer.
Also move offloaded.py into src/allmydata/immutable/.
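
In outline, the new division of labor looks like this (a sketch pieced together
from the upload.py hunk below, not a verbatim excerpt; "uploadable" stands for
any IUploadable):

    # encryption now happens outside the uploader
    eu = upload.EncryptAnUploadable(uploadable, logparent)
    d = uploader.start(eu)                  # CHKUploader: encode and push only
    def _verifycap_to_readcap(results):
        # the verify cap plus the encryption key yields the read cap
        d2 = uploadable.get_encryption_key()
        def _combine(key):
            v = uri.from_string(results.verifycapstr)
            results.uri = uri.CHKFileURI(key, v.uri_extension_hash,
                                         v.needed_shares, v.total_shares,
                                         v.size).to_string()
            return results
        d2.addCallback(_combine)
        return d2
    d.addCallback(_verifycap_to_readcap)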

src/allmydata/client.py
src/allmydata/immutable/encode.py
src/allmydata/immutable/offloaded.py [new file with mode: 0644]
src/allmydata/immutable/upload.py
src/allmydata/interfaces.py
src/allmydata/offloaded.py [deleted file]
src/allmydata/test/test_encode.py
src/allmydata/test/test_helper.py
src/allmydata/test/test_system.py

diff --git a/src/allmydata/client.py b/src/allmydata/client.py
index a9b9a52f905bfa053447a1c98b3eb4bb80443670..d1465d6d5582789483e373d92bc4f7b22cc014a7 100644 (file)
@@ -15,7 +15,7 @@ from allmydata.storage import StorageServer
 from allmydata.immutable.upload import Uploader
 from allmydata.immutable.download import Downloader
 from allmydata.immutable.filenode import FileNode, LiteralFileNode
-from allmydata.offloaded import Helper
+from allmydata.immutable.offloaded import Helper
 from allmydata.control import ControlServer
 from allmydata.introducer.client import IntroducerClient
 from allmydata.util import hashutil, base32, pollmixin, cachedir
diff --git a/src/allmydata/immutable/encode.py b/src/allmydata/immutable/encode.py
index 7bc4ce0f621d80acca48c11792312fad8c5c21df..3ef9f3ae1fc85a01ba03ac4ffa088af505274066 100644 (file)
@@ -197,6 +197,8 @@ class Encoder(object):
         self.landlords = landlords.copy()
 
     def start(self):
+        """ Returns a Deferred that will fire with the verify cap (an instance of
+        uri.CHKFileVerifierURI)."""
         self.log("%s starting" % (self,))
         #paddedsize = self._size + mathutil.pad_size(self._size, self.needed_shares)
         assert self._codec
@@ -637,8 +639,8 @@ class Encoder(object):
 
         # update our sharemap
         self._shares_placed = set(self.landlords.keys())
-        return (self.uri_extension_hash, self.required_shares,
-                self.num_shares, self.file_size)
+        return uri.CHKFileVerifierURI(self._storage_index, self.uri_extension_hash,
+                                      self.required_shares, self.num_shares, self.file_size)
 
     def err(self, f):
         self.log("upload failed", failure=f, level=log.UNUSUAL)
diff --git a/src/allmydata/immutable/offloaded.py b/src/allmydata/immutable/offloaded.py
new file mode 100644 (file)
index 0000000..7ba3943
--- /dev/null
@@ -0,0 +1,665 @@
+
+import os, stat, time, weakref
+from zope.interface import implements
+from twisted.application import service
+from twisted.internet import defer
+from foolscap import Referenceable, DeadReferenceError
+from foolscap.eventual import eventually
+import allmydata
+from allmydata import interfaces, storage, uri
+from allmydata.immutable import upload
+from allmydata.immutable.layout import ReadBucketProxy
+from allmydata.util.assertutil import precondition
+from allmydata.util import idlib, log, observer, fileutil, hashutil
+
+
+class NotEnoughWritersError(Exception):
+    pass
+
+
+class CHKCheckerAndUEBFetcher:
+    """I check to see if a file is already present in the grid. I also fetch
+    the URI Extension Block, which is useful for an uploading client who
+    wants to avoid the work of encryption and encoding.
+
+    I return False if the file is not completely healthy: i.e. if there are
+    fewer than 'N' shares present.
+
+    If the file is completely healthy, I return a tuple of (sharemap,
+    UEB_data, UEB_hash).
+    """
+
+    def __init__(self, peer_getter, storage_index, logparent=None):
+        self._peer_getter = peer_getter
+        self._found_shares = set()
+        self._storage_index = storage_index
+        self._sharemap = {}
+        self._readers = set()
+        self._ueb_hash = None
+        self._ueb_data = None
+        self._logparent = logparent
+
+    def log(self, *args, **kwargs):
+        if 'facility' not in kwargs:
+            kwargs['facility'] = "tahoe.helper.chk.checkandUEBfetch"
+        if 'parent' not in kwargs:
+            kwargs['parent'] = self._logparent
+        return log.msg(*args, **kwargs)
+
+    def check(self):
+        d = self._get_all_shareholders(self._storage_index)
+        d.addCallback(self._get_uri_extension)
+        d.addCallback(self._done)
+        return d
+
+    def _get_all_shareholders(self, storage_index):
+        dl = []
+        for (peerid, ss) in self._peer_getter("storage", storage_index):
+            d = ss.callRemote("get_buckets", storage_index)
+            d.addCallbacks(self._got_response, self._got_error,
+                           callbackArgs=(peerid,))
+            dl.append(d)
+        return defer.DeferredList(dl)
+
+    def _got_response(self, buckets, peerid):
+        # buckets is a dict: maps shnum to an rref of the server that holds it
+        shnums_s = ",".join([str(shnum) for shnum in buckets])
+        self.log("got_response: [%s] has %d shares (%s)" %
+                 (idlib.shortnodeid_b2a(peerid), len(buckets), shnums_s),
+                 level=log.NOISY)
+        self._found_shares.update(buckets.keys())
+        for k in buckets:
+            if k not in self._sharemap:
+                self._sharemap[k] = []
+            self._sharemap[k].append(peerid)
+        self._readers.update( [ (bucket, peerid)
+                                for bucket in buckets.values() ] )
+
+    def _got_error(self, f):
+        if f.check(DeadReferenceError):
+            return
+        log.err(f, parent=self._logparent)
+        pass
+
+    def _get_uri_extension(self, res):
+        # assume that we can pull the UEB from any share. If we get an error,
+        # declare the whole file unavailable.
+        if not self._readers:
+            self.log("no readers, so no UEB", level=log.NOISY)
+            return
+        b,peerid = self._readers.pop()
+        rbp = ReadBucketProxy(b, peerid, storage.si_b2a(self._storage_index))
+        d = rbp.get_uri_extension()
+        d.addCallback(self._got_uri_extension)
+        d.addErrback(self._ueb_error)
+        return d
+
+    def _got_uri_extension(self, ueb):
+        self.log("_got_uri_extension", level=log.NOISY)
+        self._ueb_hash = hashutil.uri_extension_hash(ueb)
+        self._ueb_data = uri.unpack_extension(ueb)
+
+    def _ueb_error(self, f):
+        # an error means the file is unavailable, but the overall check
+        # shouldn't fail.
+        self.log("UEB fetch failed", failure=f, level=log.WEIRD, umid="sJLKVg")
+        return None
+
+    def _done(self, res):
+        if self._ueb_data:
+            found = len(self._found_shares)
+            total = self._ueb_data['total_shares']
+            self.log(format="got %(found)d shares of %(total)d",
+                     found=found, total=total, level=log.NOISY)
+            if found < total:
+                # not all shares are present in the grid
+                self.log("not enough to qualify, file not found in grid",
+                         level=log.NOISY)
+                return False
+            # all shares are present
+            self.log("all shares present, file is found in grid",
+                     level=log.NOISY)
+            return (self._sharemap, self._ueb_data, self._ueb_hash)
+        # no shares are present
+        self.log("unable to find UEB data, file not found in grid",
+                 level=log.NOISY)
+        return False
+
+
+class CHKUploadHelper(Referenceable, upload.CHKUploader):
+    """I am the helper-server -side counterpart to AssistedUploader. I handle
+    peer selection, encoding, and share pushing. I read ciphertext from the
+    remote AssistedUploader.
+    """
+    implements(interfaces.RICHKUploadHelper)
+    VERSION = { "http://allmydata.org/tahoe/protocols/helper/chk-upload/v1" :
+                 { },
+                "application-version": str(allmydata.__version__),
+                }
+
+    def __init__(self, storage_index, helper,
+                 incoming_file, encoding_file,
+                 results, log_number):
+        self._storage_index = storage_index
+        self._helper = helper
+        self._incoming_file = incoming_file
+        self._encoding_file = encoding_file
+        self._upload_id = storage.si_b2a(storage_index)[:5]
+        self._log_number = log_number
+        self._results = results
+        self._upload_status = upload.UploadStatus()
+        self._upload_status.set_helper(False)
+        self._upload_status.set_storage_index(storage_index)
+        self._upload_status.set_status("fetching ciphertext")
+        self._upload_status.set_progress(0, 1.0)
+        self._helper.log("CHKUploadHelper starting for SI %s" % self._upload_id,
+                         parent=log_number)
+
+        self._client = helper.parent
+        self._fetcher = CHKCiphertextFetcher(self, incoming_file, encoding_file,
+                                             self._log_number)
+        self._reader = LocalCiphertextReader(self, storage_index, encoding_file)
+        self._finished_observers = observer.OneShotObserverList()
+
+        d = self._fetcher.when_done()
+        d.addCallback(lambda res: self._reader.start())
+        d.addCallback(lambda res: self.start_encrypted(self._reader))
+        d.addCallback(self._finished)
+        d.addErrback(self._failed)
+
+    def log(self, *args, **kwargs):
+        if 'facility' not in kwargs:
+            kwargs['facility'] = "tahoe.helper.chk"
+        return upload.CHKUploader.log(self, *args, **kwargs)
+
+    def start(self):
+        self._started = time.time()
+        # determine if we need to upload the file. If so, return ({},self) .
+        # If not, return (UploadResults,None) .
+        self.log("deciding whether to upload the file or not", level=log.NOISY)
+        if os.path.exists(self._encoding_file):
+            # we have the whole file, and we might be encoding it (or the
+            # encode/upload might have failed, and we need to restart it).
+            self.log("ciphertext already in place", level=log.UNUSUAL)
+            return (self._results, self)
+        if os.path.exists(self._incoming_file):
+            # we have some of the file, but not all of it (otherwise we'd be
+            # encoding). The caller might be useful.
+            self.log("partial ciphertext already present", level=log.UNUSUAL)
+            return (self._results, self)
+        # we don't remember uploading this file
+        self.log("no ciphertext yet", level=log.NOISY)
+        return (self._results, self)
+
+    def remote_get_version(self):
+        return self.VERSION
+
+    def remote_upload(self, reader):
+        # reader is an RIEncryptedUploadable. I am specified to return an
+        # UploadResults dictionary.
+
+        # let our fetcher pull ciphertext from the reader.
+        self._fetcher.add_reader(reader)
+        # and also hashes
+        self._reader.add_reader(reader)
+
+        # and inform the client when the upload has finished
+        return self._finished_observers.when_fired()
+
+    def _finished(self, uploadresults):
+        precondition(isinstance(uploadresults.verifycapstr, str), uploadresults.verifycapstr)
+        assert interfaces.IUploadResults.providedBy(uploadresults), uploadresults
+        r = uploadresults
+        v = uri.from_string(r.verifycapstr)
+        r.uri_extension_hash = v.uri_extension_hash
+        f_times = self._fetcher.get_times()
+        r.timings["cumulative_fetch"] = f_times["cumulative_fetch"]
+        r.ciphertext_fetched = self._fetcher.get_ciphertext_fetched()
+        r.timings["total_fetch"] = f_times["total"]
+        self._reader.close()
+        os.unlink(self._encoding_file)
+        self._finished_observers.fire(r)
+        self._helper.upload_finished(self._storage_index, v.size)
+        del self._reader
+
+    def _failed(self, f):
+        self.log(format="CHKUploadHelper(%(si)s) failed",
+                 si=storage.si_b2a(self._storage_index)[:5],
+                 failure=f,
+                 level=log.UNUSUAL)
+        self._finished_observers.fire(f)
+        self._helper.upload_finished(self._storage_index, 0)
+        del self._reader
+
+class AskUntilSuccessMixin:
+    # create me with a _reader array
+    _last_failure = None
+
+    def add_reader(self, reader):
+        self._readers.append(reader)
+
+    def call(self, *args, **kwargs):
+        if not self._readers:
+            raise NotEnoughWritersError("ran out of assisted uploaders, last failure was %s" % self._last_failure)
+        rr = self._readers[0]
+        d = rr.callRemote(*args, **kwargs)
+        def _err(f):
+            self._last_failure = f
+            if rr in self._readers:
+                self._readers.remove(rr)
+            self._upload_helper.log("call to assisted uploader %s failed" % rr,
+                                    failure=f, level=log.UNUSUAL)
+            # we can try again with someone else who's left
+            return self.call(*args, **kwargs)
+        d.addErrback(_err)
+        return d
+
+class CHKCiphertextFetcher(AskUntilSuccessMixin):
+    """I use one or more remote RIEncryptedUploadable instances to gather
+    ciphertext on disk. When I'm done, the file I create can be used by a
+    LocalCiphertextReader to satisfy the ciphertext needs of a CHK upload
+    process.
+
+    I begin pulling ciphertext as soon as a reader is added. I remove readers
+    when they have any sort of error. If the last reader is removed, I fire
+    my when_done() Deferred with a failure.
+
+    I fire my when_done() Deferred (with None) immediately after I have moved
+    the ciphertext to 'encoded_file'.
+    """
+
+    def __init__(self, helper, incoming_file, encoded_file, logparent):
+        self._upload_helper = helper
+        self._incoming_file = incoming_file
+        self._encoding_file = encoded_file
+        self._upload_id = helper._upload_id
+        self._log_parent = logparent
+        self._done_observers = observer.OneShotObserverList()
+        self._readers = []
+        self._started = False
+        self._f = None
+        self._times = {
+            "cumulative_fetch": 0.0,
+            "total": 0.0,
+            }
+        self._ciphertext_fetched = 0
+
+    def log(self, *args, **kwargs):
+        if "facility" not in kwargs:
+            kwargs["facility"] = "tahoe.helper.chkupload.fetch"
+        if "parent" not in kwargs:
+            kwargs["parent"] = self._log_parent
+        return log.msg(*args, **kwargs)
+
+    def add_reader(self, reader):
+        AskUntilSuccessMixin.add_reader(self, reader)
+        eventually(self._start)
+
+    def _start(self):
+        if self._started:
+            return
+        self._started = True
+        started = time.time()
+
+        if os.path.exists(self._encoding_file):
+            self.log("ciphertext already present, bypassing fetch",
+                     level=log.UNUSUAL)
+            # we'll still need the plaintext hashes (when
+            # LocalCiphertextReader.get_plaintext_hashtree_leaves() is
+            # called), and currently the easiest way to get them is to ask
+            # the sender for the last byte of ciphertext. That will provoke
+            # them into reading and hashing (but not sending) everything
+            # else.
+            have = os.stat(self._encoding_file)[stat.ST_SIZE]
+            d = self.call("read_encrypted", have-1, 1)
+            d.addCallback(self._done2, started)
+            return
+
+        # first, find out how large the file is going to be
+        d = self.call("get_size")
+        d.addCallback(self._got_size)
+        d.addCallback(self._start_reading)
+        d.addCallback(self._done)
+        d.addCallback(self._done2, started)
+        d.addErrback(self._failed)
+
+    def _got_size(self, size):
+        self.log("total size is %d bytes" % size, level=log.NOISY)
+        self._upload_helper._upload_status.set_size(size)
+        self._expected_size = size
+
+    def _start_reading(self, res):
+        # then find out how much crypttext we have on disk
+        if os.path.exists(self._incoming_file):
+            self._have = os.stat(self._incoming_file)[stat.ST_SIZE]
+            self._upload_helper._helper.count("chk_upload_helper.resumes")
+            self.log("we already have %d bytes" % self._have, level=log.NOISY)
+        else:
+            self._have = 0
+            self.log("we do not have any ciphertext yet", level=log.NOISY)
+        self.log("starting ciphertext fetch", level=log.NOISY)
+        self._f = open(self._incoming_file, "ab")
+
+        # now loop to pull the data from the readers
+        d = defer.Deferred()
+        self._loop(d)
+        # this Deferred will be fired once the last byte has been written to
+        # self._f
+        return d
+
+    # read data in 50kB chunks. We should choose a more considered number
+    # here, possibly letting the client specify it. The goal should be to
+    # keep RTT*bandwidth below 10% of the chunk size, to reduce the
+    # upload bandwidth lost because this protocol is non-windowing. Too
+    # large, however, means more memory consumption for both ends. Something
+    # that can be transferred in, say, 10 seconds sounds about right. On my
+    # home DSL line (50kBps upstream), that suggests 500kB. Most lines are
+    # slower, maybe 10kBps, which suggests 100kB, and that's a bit more
+    # memory than I want to hang on to, so I'm going to go with 50kB and see
+    # how that works.
+    CHUNK_SIZE = 50*1024
+
+    def _loop(self, fire_when_done):
+        # this slightly weird structure is needed because Deferreds don't do
+        # tail-recursion, so it is important to let each one retire promptly.
+        # Simply chaining them will cause a stack overflow at the end of a
+        # transfer that involves more than a few hundred chunks.
+        # 'fire_when_done' lives a long time, but the Deferreds returned by
+        # the inner _fetch() call do not.
+        start = time.time()
+        d = defer.maybeDeferred(self._fetch)
+        def _done(finished):
+            elapsed = time.time() - start
+            self._times["cumulative_fetch"] += elapsed
+            if finished:
+                self.log("finished reading ciphertext", level=log.NOISY)
+                fire_when_done.callback(None)
+            else:
+                self._loop(fire_when_done)
+        def _err(f):
+            self.log(format="[%(si)s] ciphertext read failed",
+                     si=self._upload_id, failure=f, level=log.UNUSUAL)
+            fire_when_done.errback(f)
+        d.addCallbacks(_done, _err)
+        return None
+
+    def _fetch(self):
+        needed = self._expected_size - self._have
+        fetch_size = min(needed, self.CHUNK_SIZE)
+        if fetch_size == 0:
+            self._upload_helper._upload_status.set_progress(1, 1.0)
+            return True # all done
+        percent = 0.0
+        if self._expected_size:
+            percent = 1.0 * (self._have+fetch_size) / self._expected_size
+        self.log(format="fetching [%(si)s] %(start)d-%(end)d of %(total)d (%(percent)d%%)",
+                 si=self._upload_id,
+                 start=self._have,
+                 end=self._have+fetch_size,
+                 total=self._expected_size,
+                 percent=int(100.0*percent),
+                 level=log.NOISY)
+        d = self.call("read_encrypted", self._have, fetch_size)
+        def _got_data(ciphertext_v):
+            for data in ciphertext_v:
+                self._f.write(data)
+                self._have += len(data)
+                self._ciphertext_fetched += len(data)
+                self._upload_helper._helper.count("chk_upload_helper.fetched_bytes", len(data))
+                self._upload_helper._upload_status.set_progress(1, percent)
+            return False # not done
+        d.addCallback(_got_data)
+        return d
+
+    def _done(self, res):
+        self._f.close()
+        self._f = None
+        self.log(format="done fetching ciphertext, size=%(size)d",
+                 size=os.stat(self._incoming_file)[stat.ST_SIZE],
+                 level=log.NOISY)
+        os.rename(self._incoming_file, self._encoding_file)
+
+    def _done2(self, _ignored, started):
+        self.log("done2", level=log.NOISY)
+        elapsed = time.time() - started
+        self._times["total"] = elapsed
+        self._readers = []
+        self._done_observers.fire(None)
+
+    def _failed(self, f):
+        if self._f:
+            self._f.close()
+        self._readers = []
+        self._done_observers.fire(f)
+
+    def when_done(self):
+        return self._done_observers.when_fired()
+
+    def get_times(self):
+        return self._times
+
+    def get_ciphertext_fetched(self):
+        return self._ciphertext_fetched
+
+
+class LocalCiphertextReader(AskUntilSuccessMixin):
+    implements(interfaces.IEncryptedUploadable)
+
+    def __init__(self, upload_helper, storage_index, encoding_file):
+        self._readers = []
+        self._upload_helper = upload_helper
+        self._storage_index = storage_index
+        self._encoding_file = encoding_file
+        self._status = None
+
+    def start(self):
+        self._upload_helper._upload_status.set_status("pushing")
+        self._size = os.stat(self._encoding_file)[stat.ST_SIZE]
+        self.f = open(self._encoding_file, "rb")
+
+    def get_size(self):
+        return defer.succeed(self._size)
+
+    def get_all_encoding_parameters(self):
+        return self.call("get_all_encoding_parameters")
+
+    def get_storage_index(self):
+        return defer.succeed(self._storage_index)
+
+    def read_encrypted(self, length, hash_only):
+        assert hash_only is False
+        d = defer.maybeDeferred(self.f.read, length)
+        d.addCallback(lambda data: [data])
+        return d
+    def get_plaintext_hashtree_leaves(self, first, last, num_segments):
+        return self.call("get_plaintext_hashtree_leaves", first, last,
+                         num_segments)
+    def get_plaintext_hash(self):
+        return self.call("get_plaintext_hash")
+    def close(self):
+        self.f.close()
+        # ??. I'm not sure if it makes sense to forward the close message.
+        return self.call("close")
+
+
+
+class Helper(Referenceable, service.MultiService):
+    implements(interfaces.RIHelper, interfaces.IStatsProducer)
+    # this is the non-distributed version. When we need to have multiple
+    # helpers, this object will become the HelperCoordinator, and will query
+    # the farm of Helpers to see if anyone has the storage_index of interest,
+    # and send the request off to them. If nobody has it, we'll choose a
+    # helper at random.
+
+    name = "helper"
+    VERSION = { "http://allmydata.org/tahoe/protocols/helper/v1" :
+                 { },
+                "application-version": str(allmydata.__version__),
+                }
+    chk_upload_helper_class = CHKUploadHelper
+    MAX_UPLOAD_STATUSES = 10
+
+    def __init__(self, basedir, stats_provider=None):
+        self._basedir = basedir
+        self._chk_incoming = os.path.join(basedir, "CHK_incoming")
+        self._chk_encoding = os.path.join(basedir, "CHK_encoding")
+        fileutil.make_dirs(self._chk_incoming)
+        fileutil.make_dirs(self._chk_encoding)
+        self._active_uploads = {}
+        self._all_uploads = weakref.WeakKeyDictionary() # for debugging
+        self._all_upload_statuses = weakref.WeakKeyDictionary()
+        self._recent_upload_statuses = []
+        self.stats_provider = stats_provider
+        if stats_provider:
+            stats_provider.register_producer(self)
+        self._counters = {"chk_upload_helper.upload_requests": 0,
+                          "chk_upload_helper.upload_already_present": 0,
+                          "chk_upload_helper.upload_need_upload": 0,
+                          "chk_upload_helper.resumes": 0,
+                          "chk_upload_helper.fetched_bytes": 0,
+                          "chk_upload_helper.encoded_bytes": 0,
+                          }
+        service.MultiService.__init__(self)
+
+    def setServiceParent(self, parent):
+        service.MultiService.setServiceParent(self, parent)
+
+    def log(self, *args, **kwargs):
+        if 'facility' not in kwargs:
+            kwargs['facility'] = "tahoe.helper"
+        return self.parent.log(*args, **kwargs)
+
+    def count(self, key, value=1):
+        if self.stats_provider:
+            self.stats_provider.count(key, value)
+        self._counters[key] += value
+
+    def get_stats(self):
+        OLD = 86400*2 # 48 hours
+        now = time.time()
+        inc_count = inc_size = inc_size_old = 0
+        enc_count = enc_size = enc_size_old = 0
+        inc = os.listdir(self._chk_incoming)
+        enc = os.listdir(self._chk_encoding)
+        for f in inc:
+            s = os.stat(os.path.join(self._chk_incoming, f))
+            size = s[stat.ST_SIZE]
+            mtime = s[stat.ST_MTIME]
+            inc_count += 1
+            inc_size += size
+            if now - mtime > OLD:
+                inc_size_old += size
+        for f in enc:
+            s = os.stat(os.path.join(self._chk_encoding, f))
+            size = s[stat.ST_SIZE]
+            mtime = s[stat.ST_MTIME]
+            enc_count += 1
+            enc_size += size
+            if now - mtime > OLD:
+                enc_size_old += size
+        stats = { 'chk_upload_helper.active_uploads': len(self._active_uploads),
+                  'chk_upload_helper.incoming_count': inc_count,
+                  'chk_upload_helper.incoming_size': inc_size,
+                  'chk_upload_helper.incoming_size_old': inc_size_old,
+                  'chk_upload_helper.encoding_count': enc_count,
+                  'chk_upload_helper.encoding_size': enc_size,
+                  'chk_upload_helper.encoding_size_old': enc_size_old,
+                  }
+        stats.update(self._counters)
+        return stats
+
+    def remote_get_version(self):
+        return self.VERSION
+
+    def remote_upload_chk(self, storage_index):
+        self.count("chk_upload_helper.upload_requests")
+        r = upload.UploadResults()
+        started = time.time()
+        si_s = storage.si_b2a(storage_index)
+        lp = self.log(format="helper: upload_chk query for SI %(si)s", si=si_s)
+        incoming_file = os.path.join(self._chk_incoming, si_s)
+        encoding_file = os.path.join(self._chk_encoding, si_s)
+        if storage_index in self._active_uploads:
+            self.log("upload is currently active", parent=lp)
+            uh = self._active_uploads[storage_index]
+            return uh.start()
+
+        d = self._check_for_chk_already_in_grid(storage_index, r, lp)
+        def _checked(already_present):
+            elapsed = time.time() - started
+            r.timings['existence_check'] = elapsed
+            if already_present:
+                # the necessary results are placed in the UploadResults
+                self.count("chk_upload_helper.upload_already_present")
+                self.log("file already found in grid", parent=lp)
+                return (r, None)
+
+            self.count("chk_upload_helper.upload_need_upload")
+            # the file is not present in the grid, by which we mean there are
+            # fewer than 'N' shares available.
+            self.log("unable to find file in the grid", parent=lp,
+                     level=log.NOISY)
+            # We need an upload helper. Check our active uploads again in
+            # case there was a race.
+            if storage_index in self._active_uploads:
+                self.log("upload is currently active", parent=lp)
+                uh = self._active_uploads[storage_index]
+            else:
+                self.log("creating new upload helper", parent=lp)
+                uh = self.chk_upload_helper_class(storage_index, self,
+                                                  incoming_file, encoding_file,
+                                                  r, lp)
+                self._active_uploads[storage_index] = uh
+                self._add_upload(uh)
+            return uh.start()
+        d.addCallback(_checked)
+        def _err(f):
+            self.log("error while checking for chk-already-in-grid",
+                     failure=f, level=log.WEIRD, parent=lp, umid="jDtxZg")
+            return f
+        d.addErrback(_err)
+        return d
+
+    def _check_for_chk_already_in_grid(self, storage_index, results, lp):
+        # see if this file is already in the grid
+        lp2 = self.log("doing a quick check+UEBfetch",
+                       parent=lp, level=log.NOISY)
+        c = CHKCheckerAndUEBFetcher(self.parent.get_permuted_peers,
+                                    storage_index, lp2)
+        d = c.check()
+        def _checked(res):
+            if res:
+                (sharemap, ueb_data, ueb_hash) = res
+                self.log("found file in grid", level=log.NOISY, parent=lp)
+                results.uri_extension_hash = ueb_hash
+                results.sharemap = {}
+                for shnum, peerids in sharemap.items():
+                    peers_s = ",".join(["[%s]" % idlib.shortnodeid_b2a(peerid)
+                                        for peerid in peerids])
+                    results.sharemap[shnum] = "Found on " + peers_s
+                results.uri_extension_data = ueb_data
+                results.preexisting_shares = len(sharemap)
+                results.pushed_shares = 0
+                return True
+            return False
+        d.addCallback(_checked)
+        return d
+
+    def _add_upload(self, uh):
+        self._all_uploads[uh] = None
+        s = uh.get_upload_status()
+        self._all_upload_statuses[s] = None
+        self._recent_upload_statuses.append(s)
+        while len(self._recent_upload_statuses) > self.MAX_UPLOAD_STATUSES:
+            self._recent_upload_statuses.pop(0)
+
+    def upload_finished(self, storage_index, size):
+        # this is called with size=0 if the upload failed
+        self.count("chk_upload_helper.encoded_bytes", size)
+        uh = self._active_uploads[storage_index]
+        del self._active_uploads[storage_index]
+        s = uh.get_upload_status()
+        s.set_active(False)
+
+    def get_all_upload_statuses(self):
+        return self._all_upload_statuses
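
A side note on _loop()/_fetch() above: this is the standard Twisted idiom for
iterating without building an unbounded Deferred chain. Stripped to a generic
skeleton (a sketch, not code from this patch; "fetch_once" is a placeholder
for any callable returning a Deferred that fires True when finished):

    from twisted.internet import defer

    def loop(fire_when_done, fetch_once):
        d = defer.maybeDeferred(fetch_once)
        def _done(finished):
            if finished:
                fire_when_done.callback(None)    # long-lived Deferred fires once
            else:
                loop(fire_when_done, fetch_once) # short-lived d retires promptly
        d.addCallbacks(_done, fire_when_done.errback)

    done = defer.Deferred()
    loop(done, fetch_once)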
diff --git a/src/allmydata/immutable/upload.py b/src/allmydata/immutable/upload.py
index fd4a601d1d941a106e9d61a3fa7e57b1ce2a38c5..1790be93b6779a2afff3d7a0e967c183c3142edf 100644 (file)
@@ -13,7 +13,7 @@ from allmydata.util.hashutil import file_renewal_secret_hash, \
      storage_index_hash, plaintext_segment_hasher, convergence_hasher
 from allmydata import storage, hashtree, uri
 from allmydata.immutable import encode
-from allmydata.util import base32, idlib, mathutil
+from allmydata.util import base32, idlib, log, mathutil
 from allmydata.util.assertutil import precondition
 from allmydata.util.rrefutil import get_versioned_remote_reference
 from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
@@ -662,27 +662,21 @@ class CHKUploader:
             kwargs["facility"] = "tahoe.upload"
         return self._client.log(*args, **kwargs)
 
-    def start(self, uploadable):
+    def start(self, encrypted_uploadable):
         """Start uploading the file.
 
-        This method returns a Deferred that will fire with the URI (a
-        string)."""
+        Returns a Deferred that will fire with the UploadResults instance.
+        """
 
         self._started = time.time()
-        uploadable = IUploadable(uploadable)
-        self.log("starting upload of %s" % uploadable)
+        eu = IEncryptedUploadable(encrypted_uploadable)
+        self.log("starting upload of %s" % eu)
 
-        eu = EncryptAnUploadable(uploadable, self._log_number)
         eu.set_upload_status(self._upload_status)
         d = self.start_encrypted(eu)
-        def _uploaded(res):
-            d1 = uploadable.get_encryption_key()
-            d1.addCallback(lambda key: self._compute_uri(res, key))
-            return d1
-        d.addCallback(_uploaded)
-        def _done(res):
+        def _done(uploadresults):
             self._upload_status.set_active(False)
-            return res
+            return uploadresults
         d.addBoth(_done)
         return d
 
@@ -696,6 +690,7 @@ class CHKUploader:
         return self._encoder.abort()
 
     def start_encrypted(self, encrypted):
+        """ Returns a Deferred that will fire with the UploadResults instance. """
         eu = IEncryptedUploadable(encrypted)
 
         started = time.time()
@@ -706,7 +701,6 @@ class CHKUploader:
         d.addCallback(self.set_shareholders, e)
         d.addCallback(lambda res: e.start())
         d.addCallback(self._encrypted_done)
-        # this fires with the uri_extension_hash and other data
         return d
 
     def locate_all_shareholders(self, encoder, started):
@@ -761,7 +755,8 @@ class CHKUploader:
         assert len(buckets) == sum([len(peer.buckets) for peer in used_peers])
         encoder.set_shareholders(buckets)
 
-    def _encrypted_done(self, res):
+    def _encrypted_done(self, verifycap):
+        """ Returns a Deferred that will fire with the UploadResults instance. """
         r = self._results
         for shnum in self._encoder.get_shares_placed():
             peer_tracker = self._sharemap[shnum]
@@ -779,19 +774,7 @@ class CHKUploader:
         r.timings["peer_selection"] = self._peer_selection_elapsed
         r.timings.update(self._encoder.get_times())
         r.uri_extension_data = self._encoder.get_uri_extension_data()
-        return res
-
-    def _compute_uri(self, (uri_extension_hash,
-                            needed_shares, total_shares, size),
-                     key):
-        u = uri.CHKFileURI(key=key,
-                           uri_extension_hash=uri_extension_hash,
-                           needed_shares=needed_shares,
-                           total_shares=total_shares,
-                           size=size,
-                           )
-        r = self._results
-        r.uri = u.to_string()
+        r.verifycapstr = verifycap.to_string()
         return r
 
     def get_upload_status(self):
@@ -948,26 +931,23 @@ class AssistedUploader:
             kwargs["parent"] = self._log_number
         return log.msg(*args, **kwargs)
 
-    def start(self, uploadable):
+    def start(self, encrypted_uploadable, storage_index):
+        """Start uploading the file.
+
+        Returns a Deferred that will fire with the UploadResults instance.
+        """
+        precondition(isinstance(storage_index, str), storage_index)
         self._started = time.time()
-        u = IUploadable(uploadable)
-        eu = EncryptAnUploadable(u, self._log_number)
+        eu = IEncryptedUploadable(encrypted_uploadable)
         eu.set_upload_status(self._upload_status)
         self._encuploadable = eu
+        self._storage_index = storage_index
         d = eu.get_size()
         d.addCallback(self._got_size)
         d.addCallback(lambda res: eu.get_all_encoding_parameters())
         d.addCallback(self._got_all_encoding_parameters)
-        # when we get the encryption key, that will also compute the storage
-        # index, so this only takes one pass.
-        # TODO: I'm not sure it's cool to switch back and forth between
-        # the Uploadable and the IEncryptedUploadable that wraps it.
-        d.addCallback(lambda res: u.get_encryption_key())
-        d.addCallback(self._got_encryption_key)
-        d.addCallback(lambda res: eu.get_storage_index())
-        d.addCallback(self._got_storage_index)
         d.addCallback(self._contact_helper)
-        d.addCallback(self._build_readcap)
+        d.addCallback(self._build_verifycap)
         def _done(res):
             self._upload_status.set_active(False)
             return res
@@ -985,13 +965,6 @@ class AssistedUploader:
         self._total_shares = n
         self._segment_size = segment_size
 
-    def _got_encryption_key(self, key):
-        self._key = key
-
-    def _got_storage_index(self, storage_index):
-        self._storage_index = storage_index
-
-
     def _contact_helper(self, res):
         now = self._time_contacting_helper_start = time.time()
         self._storage_index_elapsed = now - self._started
@@ -1023,7 +996,7 @@ class AssistedUploader:
         self._upload_status.set_results(upload_results)
         return upload_results
 
-    def _build_readcap(self, upload_results):
+    def _build_verifycap(self, upload_results):
         self.log("upload finished, building readcap")
         self._upload_status.set_status("Building Readcap")
         r = upload_results
@@ -1031,13 +1004,11 @@ class AssistedUploader:
         assert r.uri_extension_data["total_shares"] == self._total_shares
         assert r.uri_extension_data["segment_size"] == self._segment_size
         assert r.uri_extension_data["size"] == self._size
-        u = uri.CHKFileURI(key=self._key,
-                           uri_extension_hash=r.uri_extension_hash,
-                           needed_shares=self._needed_shares,
-                           total_shares=self._total_shares,
-                           size=self._size,
-                           )
-        r.uri = u.to_string()
+        r.verifycapstr = uri.CHKFileVerifierURI(self._storage_index,
+                                             uri_extension_hash=r.uri_extension_hash,
+                                             needed_shares=self._needed_shares,
+                                             total_shares=self._total_shares, size=self._size
+                                             ).to_string()
         now = time.time()
         r.file_size = self._size
         r.timings["storage_index"] = self._storage_index_elapsed
@@ -1207,13 +1178,12 @@ class Data(FileHandle):
         assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
         FileHandle.__init__(self, StringIO(data), convergence=convergence)
 
-class Uploader(service.MultiService):
+class Uploader(service.MultiService, log.PrefixingLogMixin):
     """I am a service that allows file uploading. I am a service-child of the
     Client.
     """
     implements(IUploader)
     name = "uploader"
-    uploader_class = CHKUploader
     URI_LIT_SIZE_THRESHOLD = 55
     MAX_UPLOAD_STATUSES = 10
 
@@ -1224,6 +1194,7 @@ class Uploader(service.MultiService):
         self._all_uploads = weakref.WeakKeyDictionary() # for debugging
         self._all_upload_statuses = weakref.WeakKeyDictionary()
         self._recent_upload_statuses = []
+        log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload")
         service.MultiService.__init__(self)
 
     def startService(self):
@@ -1233,7 +1204,7 @@ class Uploader(service.MultiService):
                                       self._got_helper)
 
     def _got_helper(self, helper):
-        log.msg("got helper connection, getting versions")
+        self.log("got helper connection, getting versions")
         default = { "http://allmydata.org/tahoe/protocols/helper/v1" :
                     { },
                     "application-version": "unknown: no get_version()",
@@ -1257,7 +1228,9 @@ class Uploader(service.MultiService):
 
 
     def upload(self, uploadable):
-        # this returns the URI
+        """
+        Returns a Deferred that will fire with the UploadResults instance.
+        """
         assert self.parent
         assert self.running
 
@@ -1275,12 +1248,31 @@ class Uploader(service.MultiService):
 
             if size <= self.URI_LIT_SIZE_THRESHOLD:
                 uploader = LiteralUploader(self.parent)
-            elif self._helper:
-                uploader = AssistedUploader(self._helper)
+                return uploader.start(uploadable)
             else:
-                uploader = self.uploader_class(self.parent)
-            self._add_upload(uploader)
-            return uploader.start(uploadable)
+                eu = EncryptAnUploadable(uploadable, self._parentmsgid)
+                d2 = defer.succeed(None)
+                if self._helper:
+                    uploader = AssistedUploader(self._helper)
+                    d2.addCallback(lambda x: eu.get_storage_index())
+                    d2.addCallback(lambda si: uploader.start(eu, si))
+                else:
+                    uploader = CHKUploader(self.parent)
+                    d2.addCallback(lambda x: uploader.start(eu))
+
+                self._add_upload(uploader)
+                def turn_verifycap_into_read_cap(uploadresults):
+                    # Generate the uri from the verifycap plus the key.
+                    d3 = uploadable.get_encryption_key()
+                    def put_readcap_into_results(key):
+                        v = uri.from_string(uploadresults.verifycapstr)
+                        r = uri.CHKFileURI(key, v.uri_extension_hash, v.needed_shares, v.total_shares, v.size)
+                        uploadresults.uri = r.to_string()
+                        return uploadresults
+                    d3.addCallback(put_readcap_into_results)
+                    return d3
+                d2.addCallback(turn_verifycap_into_read_cap)
+                return d2
         d.addCallback(_got_size)
         def _done(res):
             uploadable.close()
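
For context, the client half of the helper exchange (AssistedUploader's
_contact_helper, not shown in this commitdiff) goes roughly like this (a
hedged sketch; "helper_rref" and "reader" are stand-ins, and the remote method
names match remote_upload_chk and remote_upload in offloaded.py above):

    # ask the helper whether the file is already in the grid
    d = helper_rref.callRemote("upload_chk", storage_index)
    def _got((upload_results, upload_helper)):
        if upload_helper is None:
            return upload_results   # all shares already present; results complete
        # otherwise hand it our RIEncryptedUploadable and wait for UploadResults
        return upload_helper.callRemote("upload", reader)
    d.addCallback(_got)
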
diff --git a/src/allmydata/interfaces.py b/src/allmydata/interfaces.py
index 9828db9a8daa4e61d470cb3e2c22398abeee9ee0..178a9e94a5909a59f366776c1eb8ded53130cdf7 100644 (file)
@@ -1212,10 +1212,9 @@ class IEncoder(Interface):
         set_encrypted_uploadable() and set_shareholders() must be called
         before this can be invoked.
 
-        This returns a Deferred that fires with a tuple of
-        (uri_extension_hash, needed_shares, total_shares, size) when the
-        upload process is complete. This information, plus the encryption
-        key, is sufficient to construct the URI.
+        This returns a Deferred that fires with a verify cap when the upload process is
+        complete. The verifycap, plus the encryption key, is sufficient to construct the read
+        cap.
         """
 
 class IDecoder(Interface):
diff --git a/src/allmydata/offloaded.py b/src/allmydata/offloaded.py
deleted file mode 100644 (file)
index 10097e3..0000000
+++ /dev/null
@@ -1,662 +0,0 @@
-
-import os, stat, time, weakref
-from zope.interface import implements
-from twisted.application import service
-from twisted.internet import defer
-from foolscap import Referenceable, DeadReferenceError
-from foolscap.eventual import eventually
-import allmydata
-from allmydata import interfaces, storage, uri
-from allmydata.immutable import upload
-from allmydata.immutable.layout import ReadBucketProxy
-from allmydata.util import idlib, log, observer, fileutil, hashutil
-
-
-class NotEnoughWritersError(Exception):
-    pass
-
-
-class CHKCheckerAndUEBFetcher:
-    """I check to see if a file is already present in the grid. I also fetch
-    the URI Extension Block, which is useful for an uploading client who
-    wants to avoid the work of encryption and encoding.
-
-    I return False if the file is not completely healthy: i.e. if there are
-    less than 'N' shares present.
-
-    If the file is completely healthy, I return a tuple of (sharemap,
-    UEB_data, UEB_hash).
-    """
-
-    def __init__(self, peer_getter, storage_index, logparent=None):
-        self._peer_getter = peer_getter
-        self._found_shares = set()
-        self._storage_index = storage_index
-        self._sharemap = {}
-        self._readers = set()
-        self._ueb_hash = None
-        self._ueb_data = None
-        self._logparent = logparent
-
-    def log(self, *args, **kwargs):
-        if 'facility' not in kwargs:
-            kwargs['facility'] = "tahoe.helper.chk.checkandUEBfetch"
-        if 'parent' not in kwargs:
-            kwargs['parent'] = self._logparent
-        return log.msg(*args, **kwargs)
-
-    def check(self):
-        d = self._get_all_shareholders(self._storage_index)
-        d.addCallback(self._get_uri_extension)
-        d.addCallback(self._done)
-        return d
-
-    def _get_all_shareholders(self, storage_index):
-        dl = []
-        for (peerid, ss) in self._peer_getter("storage", storage_index):
-            d = ss.callRemote("get_buckets", storage_index)
-            d.addCallbacks(self._got_response, self._got_error,
-                           callbackArgs=(peerid,))
-            dl.append(d)
-        return defer.DeferredList(dl)
-
-    def _got_response(self, buckets, peerid):
-        # buckets is a dict: maps shum to an rref of the server who holds it
-        shnums_s = ",".join([str(shnum) for shnum in buckets])
-        self.log("got_response: [%s] has %d shares (%s)" %
-                 (idlib.shortnodeid_b2a(peerid), len(buckets), shnums_s),
-                 level=log.NOISY)
-        self._found_shares.update(buckets.keys())
-        for k in buckets:
-            if k not in self._sharemap:
-                self._sharemap[k] = []
-            self._sharemap[k].append(peerid)
-        self._readers.update( [ (bucket, peerid)
-                                for bucket in buckets.values() ] )
-
-    def _got_error(self, f):
-        if f.check(DeadReferenceError):
-            return
-        log.err(f, parent=self._logparent)
-        pass
-
-    def _get_uri_extension(self, res):
-        # assume that we can pull the UEB from any share. If we get an error,
-        # declare the whole file unavailable.
-        if not self._readers:
-            self.log("no readers, so no UEB", level=log.NOISY)
-            return
-        b,peerid = self._readers.pop()
-        rbp = ReadBucketProxy(b, peerid, storage.si_b2a(self._storage_index))
-        d = rbp.get_uri_extension()
-        d.addCallback(self._got_uri_extension)
-        d.addErrback(self._ueb_error)
-        return d
-
-    def _got_uri_extension(self, ueb):
-        self.log("_got_uri_extension", level=log.NOISY)
-        self._ueb_hash = hashutil.uri_extension_hash(ueb)
-        self._ueb_data = uri.unpack_extension(ueb)
-
-    def _ueb_error(self, f):
-        # an error means the file is unavailable, but the overall check
-        # shouldn't fail.
-        self.log("UEB fetch failed", failure=f, level=log.WEIRD, umid="sJLKVg")
-        return None
-
-    def _done(self, res):
-        if self._ueb_data:
-            found = len(self._found_shares)
-            total = self._ueb_data['total_shares']
-            self.log(format="got %(found)d shares of %(total)d",
-                     found=found, total=total, level=log.NOISY)
-            if found < total:
-                # not all shares are present in the grid
-                self.log("not enough to qualify, file not found in grid",
-                         level=log.NOISY)
-                return False
-            # all shares are present
-            self.log("all shares present, file is found in grid",
-                     level=log.NOISY)
-            return (self._sharemap, self._ueb_data, self._ueb_hash)
-        # no shares are present
-        self.log("unable to find UEB data, file not found in grid",
-                 level=log.NOISY)
-        return False
-
-
-class CHKUploadHelper(Referenceable, upload.CHKUploader):
-    """I am the helper-server -side counterpart to AssistedUploader. I handle
-    peer selection, encoding, and share pushing. I read ciphertext from the
-    remote AssistedUploader.
-    """
-    implements(interfaces.RICHKUploadHelper)
-    VERSION = { "http://allmydata.org/tahoe/protocols/helper/chk-upload/v1" :
-                 { },
-                "application-version": str(allmydata.__version__),
-                }
-
-    def __init__(self, storage_index, helper,
-                 incoming_file, encoding_file,
-                 results, log_number):
-        self._storage_index = storage_index
-        self._helper = helper
-        self._incoming_file = incoming_file
-        self._encoding_file = encoding_file
-        self._upload_id = storage.si_b2a(storage_index)[:5]
-        self._log_number = log_number
-        self._results = results
-        self._upload_status = upload.UploadStatus()
-        self._upload_status.set_helper(False)
-        self._upload_status.set_storage_index(storage_index)
-        self._upload_status.set_status("fetching ciphertext")
-        self._upload_status.set_progress(0, 1.0)
-        self._helper.log("CHKUploadHelper starting for SI %s" % self._upload_id,
-                         parent=log_number)
-
-        self._client = helper.parent
-        self._fetcher = CHKCiphertextFetcher(self, incoming_file, encoding_file,
-                                             self._log_number)
-        self._reader = LocalCiphertextReader(self, storage_index, encoding_file)
-        self._finished_observers = observer.OneShotObserverList()
-
-        d = self._fetcher.when_done()
-        d.addCallback(lambda res: self._reader.start())
-        d.addCallback(lambda res: self.start_encrypted(self._reader))
-        d.addCallback(self._finished)
-        d.addErrback(self._failed)
-
-    def log(self, *args, **kwargs):
-        if 'facility' not in kwargs:
-            kwargs['facility'] = "tahoe.helper.chk"
-        return upload.CHKUploader.log(self, *args, **kwargs)
-
-    def start(self):
-        self._started = time.time()
-        # determine if we need to upload the file. If so, return ({},self) .
-        # If not, return (UploadResults,None) .
-        self.log("deciding whether to upload the file or not", level=log.NOISY)
-        if os.path.exists(self._encoding_file):
-            # we have the whole file, and we might be encoding it (or the
-            # encode/upload might have failed, and we need to restart it).
-            self.log("ciphertext already in place", level=log.UNUSUAL)
-            return (self._results, self)
-        if os.path.exists(self._incoming_file):
-            # we have some of the file, but not all of it (otherwise we'd be
-            # encoding). The caller might be useful.
-            self.log("partial ciphertext already present", level=log.UNUSUAL)
-            return (self._results, self)
-        # we don't remember uploading this file
-        self.log("no ciphertext yet", level=log.NOISY)
-        return (self._results, self)
-
-    def remote_get_version(self):
-        return self.VERSION
-
-    def remote_upload(self, reader):
-        # reader is an RIEncryptedUploadable. I am specified to return an
-        # UploadResults dictionary.
-
-        # let our fetcher pull ciphertext from the reader.
-        self._fetcher.add_reader(reader)
-        # and also hashes
-        self._reader.add_reader(reader)
-
-        # and inform the client when the upload has finished
-        return self._finished_observers.when_fired()
-
-    def _finished(self, res):
-        (uri_extension_hash, needed_shares, total_shares, size) = res
-        r = self._results
-        r.uri_extension_hash = uri_extension_hash
-        f_times = self._fetcher.get_times()
-        r.timings["cumulative_fetch"] = f_times["cumulative_fetch"]
-        r.ciphertext_fetched = self._fetcher.get_ciphertext_fetched()
-        r.timings["total_fetch"] = f_times["total"]
-        self._reader.close()
-        os.unlink(self._encoding_file)
-        self._finished_observers.fire(r)
-        self._helper.upload_finished(self._storage_index, size)
-        del self._reader
-
-    def _failed(self, f):
-        self.log(format="CHKUploadHelper(%(si)s) failed",
-                 si=storage.si_b2a(self._storage_index)[:5],
-                 failure=f,
-                 level=log.UNUSUAL)
-        self._finished_observers.fire(f)
-        self._helper.upload_finished(self._storage_index, 0)
-        del self._reader
-
-class AskUntilSuccessMixin:
-    # create me with a _reader array
-    _last_failure = None
-
-    def add_reader(self, reader):
-        self._readers.append(reader)
-
-    def call(self, *args, **kwargs):
-        if not self._readers:
-            raise NotEnoughWritersError("ran out of assisted uploaders, last failure was %s" % self._last_failure)
-        rr = self._readers[0]
-        d = rr.callRemote(*args, **kwargs)
-        def _err(f):
-            self._last_failure = f
-            if rr in self._readers:
-                self._readers.remove(rr)
-            self._upload_helper.log("call to assisted uploader %s failed" % rr,
-                                    failure=f, level=log.UNUSUAL)
-            # we can try again with someone else who's left
-            return self.call(*args, **kwargs)
-        d.addErrback(_err)
-        return d
-
-class CHKCiphertextFetcher(AskUntilSuccessMixin):
-    """I use one or more remote RIEncryptedUploadable instances to gather
-    ciphertext on disk. When I'm done, the file I create can be used by a
-    LocalCiphertextReader to satisfy the ciphertext needs of a CHK upload
-    process.
-
-    I begin pulling ciphertext as soon as a reader is added. I remove readers
-    when they have any sort of error. If the last reader is removed, I fire
-    my when_done() Deferred with a failure.
-
-    I fire my when_done() Deferred (with None) immediately after I have moved
-    the ciphertext to 'encoded_file'.
-    """
-
-    def __init__(self, helper, incoming_file, encoded_file, logparent):
-        self._upload_helper = helper
-        self._incoming_file = incoming_file
-        self._encoding_file = encoded_file
-        self._upload_id = helper._upload_id
-        self._log_parent = logparent
-        self._done_observers = observer.OneShotObserverList()
-        self._readers = []
-        self._started = False
-        self._f = None
-        self._times = {
-            "cumulative_fetch": 0.0,
-            "total": 0.0,
-            }
-        self._ciphertext_fetched = 0
-
-    def log(self, *args, **kwargs):
-        if "facility" not in kwargs:
-            kwargs["facility"] = "tahoe.helper.chkupload.fetch"
-        if "parent" not in kwargs:
-            kwargs["parent"] = self._log_parent
-        return log.msg(*args, **kwargs)
-
-    def add_reader(self, reader):
-        AskUntilSuccessMixin.add_reader(self, reader)
-        eventually(self._start)
-
-    def _start(self):
-        if self._started:
-            return
-        self._started = True
-        started = time.time()
-
-        if os.path.exists(self._encoding_file):
-            self.log("ciphertext already present, bypassing fetch",
-                     level=log.UNUSUAL)
-            # we'll still need the plaintext hashes (when
-            # LocalCiphertextReader.get_plaintext_hashtree_leaves() is
-            # called), and currently the easiest way to get them is to ask
-            # the sender for the last byte of ciphertext. That will provoke
-            # them into reading and hashing (but not sending) everything
-            # else.
-            have = os.stat(self._encoding_file)[stat.ST_SIZE]
-            d = self.call("read_encrypted", have-1, 1)
-            d.addCallback(self._done2, started)
-            return
-
-        # first, find out how large the file is going to be
-        d = self.call("get_size")
-        d.addCallback(self._got_size)
-        d.addCallback(self._start_reading)
-        d.addCallback(self._done)
-        d.addCallback(self._done2, started)
-        d.addErrback(self._failed)
-
-    def _got_size(self, size):
-        self.log("total size is %d bytes" % size, level=log.NOISY)
-        self._upload_helper._upload_status.set_size(size)
-        self._expected_size = size
-
-    def _start_reading(self, res):
-        # then find out how much crypttext we have on disk
-        if os.path.exists(self._incoming_file):
-            self._have = os.stat(self._incoming_file)[stat.ST_SIZE]
-            self._upload_helper._helper.count("chk_upload_helper.resumes")
-            self.log("we already have %d bytes" % self._have, level=log.NOISY)
-        else:
-            self._have = 0
-            self.log("we do not have any ciphertext yet", level=log.NOISY)
-        self.log("starting ciphertext fetch", level=log.NOISY)
-        self._f = open(self._incoming_file, "ab")
-
-        # now loop to pull the data from the readers
-        d = defer.Deferred()
-        self._loop(d)
-        # this Deferred will be fired once the last byte has been written to
-        # self._f
-        return d
-
-    # read data in 50kB chunks. We should choose a more carefully considered
-    # number here, possibly letting the client specify it. The goal is to
-    # keep RTT*bandwidth below 10% of the chunk size, to reduce the upload
-    # bandwidth lost because this protocol is non-windowing. Too large,
-    # however, means more memory consumption at both ends. Something that
-    # can be transferred in, say, 10 seconds sounds about right. On my home
-    # DSL line (50kBps upstream), that suggests 500kB. Most lines are
-    # slower, maybe 10kBps, which suggests 100kB, and that's a bit more
-    # memory than I want to hang on to, so I'm going to go with 50kB and
-    # see how that works.
-    CHUNK_SIZE = 50*1024
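# A back-of-the-envelope check (illustrative only, not part of the patch)
# of the sizing argument in the comment above: a chunk should take roughly
# ten seconds to transfer at the sender's upstream rate.
def suggested_chunk_size(upstream_bytes_per_sec, target_seconds=10):
    return upstream_bytes_per_sec * target_seconds

assert suggested_chunk_size(50 * 1000) == 500 * 1000  # 50kBps DSL -> ~500kB
assert suggested_chunk_size(10 * 1000) == 100 * 1000  # 10kBps line -> ~100kB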
-
-    def _loop(self, fire_when_done):
-        # this slightly weird structure is needed because Deferreds don't do
-        # tail-recursion, so it is important to let each one retire promptly.
-        # Simply chaining them will cause a stack overflow at the end of a
-        # transfer that involves more than a few hundred chunks.
-        # 'fire_when_done' lives a long time, but the Deferreds returned by
-        # the inner _fetch() call do not.
-        start = time.time()
-        d = defer.maybeDeferred(self._fetch)
-        def _done(finished):
-            elapsed = time.time() - start
-            self._times["cumulative_fetch"] += elapsed
-            if finished:
-                self.log("finished reading ciphertext", level=log.NOISY)
-                fire_when_done.callback(None)
-            else:
-                self._loop(fire_when_done)
-        def _err(f):
-            self.log(format="[%(si)s] ciphertext read failed",
-                     si=self._upload_id, failure=f, level=log.UNUSUAL)
-            fire_when_done.errback(f)
-        d.addCallbacks(_done, _err)
-        return None
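# A minimal standalone sketch (assumed names, not part of the patch) of the
# same pattern as _loop() above: one long-lived Deferred reports overall
# completion, while each iteration's short-lived Deferred retires promptly,
# so the stack does not grow with the number of chunks. fetch_once() is
# assumed to fire asynchronously, as a network round-trip does.
from twisted.internet import defer

def loop_until_done(fetch_once):
    done = defer.Deferred()               # lives for the whole transfer
    def _iterate():
        d = defer.maybeDeferred(fetch_once)
        def _next(finished):
            if finished:
                done.callback(None)       # signal overall completion
            else:
                _iterate()                # loop without chaining Deferreds
        d.addCallbacks(_next, done.errback)
    _iterate()
    return done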
-
-    def _fetch(self):
-        needed = self._expected_size - self._have
-        fetch_size = min(needed, self.CHUNK_SIZE)
-        if fetch_size == 0:
-            self._upload_helper._upload_status.set_progress(1, 1.0)
-            return True # all done
-        percent = 0.0
-        if self._expected_size:
-            percent = 1.0 * (self._have+fetch_size) / self._expected_size
-        self.log(format="fetching [%(si)s] %(start)d-%(end)d of %(total)d (%(percent)d%%)",
-                 si=self._upload_id,
-                 start=self._have,
-                 end=self._have+fetch_size,
-                 total=self._expected_size,
-                 percent=int(100.0*percent),
-                 level=log.NOISY)
-        d = self.call("read_encrypted", self._have, fetch_size)
-        def _got_data(ciphertext_v):
-            for data in ciphertext_v:
-                self._f.write(data)
-                self._have += len(data)
-                self._ciphertext_fetched += len(data)
-                self._upload_helper._helper.count("chk_upload_helper.fetched_bytes", len(data))
-                self._upload_helper._upload_status.set_progress(1, percent)
-            return False # not done
-        d.addCallback(_got_data)
-        return d
-
-    def _done(self, res):
-        self._f.close()
-        self._f = None
-        self.log(format="done fetching ciphertext, size=%(size)d",
-                 size=os.stat(self._incoming_file)[stat.ST_SIZE],
-                 level=log.NOISY)
-        os.rename(self._incoming_file, self._encoding_file)
-
-    def _done2(self, _ignored, started):
-        self.log("done2", level=log.NOISY)
-        elapsed = time.time() - started
-        self._times["total"] = elapsed
-        self._readers = []
-        self._done_observers.fire(None)
-
-    def _failed(self, f):
-        if self._f:
-            self._f.close()
-        self._readers = []
-        self._done_observers.fire(f)
-
-    def when_done(self):
-        return self._done_observers.when_fired()
-
-    def get_times(self):
-        return self._times
-
-    def get_ciphertext_fetched(self):
-        return self._ciphertext_fetched
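# A simplified sketch of the one-shot observer pattern behind when_done().
# (The real allmydata.util.observer.OneShotObserverList also uses eventual
# send to fire watchers in a later reactor turn; this stripped-down version
# is for illustration only.) Every caller gets its own Deferred, and all of
# them fire exactly once with the same result.
from twisted.internet import defer

class OneShotObserverSketch(object):
    def __init__(self):
        self._fired = False
        self._result = None
        self._watchers = []

    def when_fired(self):
        d = defer.Deferred()
        if self._fired:
            d.callback(self._result)      # late subscribers fire at once
        else:
            self._watchers.append(d)
        return d

    def fire(self, result):
        assert not self._fired, "one-shot: may only fire once"
        self._fired = True
        self._result = result
        watchers, self._watchers = self._watchers, []
        for d in watchers:
            d.callback(result)            # a Failure here triggers errbacks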
-
-
-class LocalCiphertextReader(AskUntilSuccessMixin):
-    implements(interfaces.IEncryptedUploadable)
-
-    def __init__(self, upload_helper, storage_index, encoding_file):
-        self._readers = []
-        self._upload_helper = upload_helper
-        self._storage_index = storage_index
-        self._encoding_file = encoding_file
-        self._status = None
-
-    def start(self):
-        self._upload_helper._upload_status.set_status("pushing")
-        self._size = os.stat(self._encoding_file)[stat.ST_SIZE]
-        self.f = open(self._encoding_file, "rb")
-
-    def get_size(self):
-        return defer.succeed(self._size)
-
-    def get_all_encoding_parameters(self):
-        return self.call("get_all_encoding_parameters")
-
-    def get_storage_index(self):
-        return defer.succeed(self._storage_index)
-
-    def read_encrypted(self, length, hash_only):
-        assert hash_only is False
-        d = defer.maybeDeferred(self.f.read, length)
-        d.addCallback(lambda data: [data])
-        return d
-
-    def get_plaintext_hashtree_leaves(self, first, last, num_segments):
-        return self.call("get_plaintext_hashtree_leaves", first, last,
-                         num_segments)
-
-    def get_plaintext_hash(self):
-        return self.call("get_plaintext_hash")
-
-    def close(self):
-        self.f.close()
-        # XXX I'm not sure whether it makes sense to forward the close
-        # message to the remote sender.
-        return self.call("close")
-
-
-
-class Helper(Referenceable, service.MultiService):
-    implements(interfaces.RIHelper, interfaces.IStatsProducer)
-    # this is the non-distributed version. When we need to have multiple
-    # helpers, this object will become the HelperCoordinator, and will query
-    # the farm of Helpers to see if anyone has the storage_index of interest,
-    # and send the request off to them. If nobody has it, we'll choose a
-    # helper at random.
-
-    name = "helper"
-    VERSION = { "http://allmydata.org/tahoe/protocols/helper/v1" :
-                 { },
-                "application-version": str(allmydata.__version__),
-                }
-    chk_upload_helper_class = CHKUploadHelper
-    MAX_UPLOAD_STATUSES = 10
-
-    def __init__(self, basedir, stats_provider=None):
-        self._basedir = basedir
-        self._chk_incoming = os.path.join(basedir, "CHK_incoming")
-        self._chk_encoding = os.path.join(basedir, "CHK_encoding")
-        fileutil.make_dirs(self._chk_incoming)
-        fileutil.make_dirs(self._chk_encoding)
-        self._active_uploads = {}
-        self._all_uploads = weakref.WeakKeyDictionary() # for debugging
-        self._all_upload_statuses = weakref.WeakKeyDictionary()
-        self._recent_upload_statuses = []
-        self.stats_provider = stats_provider
-        if stats_provider:
-            stats_provider.register_producer(self)
-        self._counters = {"chk_upload_helper.upload_requests": 0,
-                          "chk_upload_helper.upload_already_present": 0,
-                          "chk_upload_helper.upload_need_upload": 0,
-                          "chk_upload_helper.resumes": 0,
-                          "chk_upload_helper.fetched_bytes": 0,
-                          "chk_upload_helper.encoded_bytes": 0,
-                          }
-        service.MultiService.__init__(self)
-
-    def setServiceParent(self, parent):
-        service.MultiService.setServiceParent(self, parent)
-
-    def log(self, *args, **kwargs):
-        if 'facility' not in kwargs:
-            kwargs['facility'] = "tahoe.helper"
-        return self.parent.log(*args, **kwargs)
-
-    def count(self, key, value=1):
-        if self.stats_provider:
-            self.stats_provider.count(key, value)
-        self._counters[key] += value
-
-    def get_stats(self):
-        OLD = 86400*2 # 48 hours
-        now = time.time()
-        inc_count = inc_size = inc_size_old = 0
-        enc_count = enc_size = enc_size_old = 0
-        inc = os.listdir(self._chk_incoming)
-        enc = os.listdir(self._chk_encoding)
-        for f in inc:
-            s = os.stat(os.path.join(self._chk_incoming, f))
-            size = s[stat.ST_SIZE]
-            mtime = s[stat.ST_MTIME]
-            inc_count += 1
-            inc_size += size
-            if now - mtime > OLD:
-                inc_size_old += size
-        for f in enc:
-            s = os.stat(os.path.join(self._chk_encoding, f))
-            size = s[stat.ST_SIZE]
-            mtime = s[stat.ST_MTIME]
-            enc_count += 1
-            enc_size += size
-            if now - mtime > OLD:
-                enc_size_old += size
-        stats = { 'chk_upload_helper.active_uploads': len(self._active_uploads),
-                  'chk_upload_helper.incoming_count': inc_count,
-                  'chk_upload_helper.incoming_size': inc_size,
-                  'chk_upload_helper.incoming_size_old': inc_size_old,
-                  'chk_upload_helper.encoding_count': enc_count,
-                  'chk_upload_helper.encoding_size': enc_size,
-                  'chk_upload_helper.encoding_size_old': enc_size_old,
-                  }
-        stats.update(self._counters)
-        return stats
-
-    def remote_get_version(self):
-        return self.VERSION
-
-    def remote_upload_chk(self, storage_index):
-        self.count("chk_upload_helper.upload_requests")
-        r = upload.UploadResults()
-        started = time.time()
-        si_s = storage.si_b2a(storage_index)
-        lp = self.log(format="helper: upload_chk query for SI %(si)s", si=si_s)
-        incoming_file = os.path.join(self._chk_incoming, si_s)
-        encoding_file = os.path.join(self._chk_encoding, si_s)
-        if storage_index in self._active_uploads:
-            self.log("upload is currently active", parent=lp)
-            uh = self._active_uploads[storage_index]
-            return uh.start()
-
-        d = self._check_for_chk_already_in_grid(storage_index, r, lp)
-        def _checked(already_present):
-            elapsed = time.time() - started
-            r.timings['existence_check'] = elapsed
-            if already_present:
-                # the necessary results are placed in the UploadResults
-                self.count("chk_upload_helper.upload_already_present")
-                self.log("file already found in grid", parent=lp)
-                return (r, None)
-
-            self.count("chk_upload_helper.upload_need_upload")
-            # the file is not present in the grid, by which we mean there are
-            # fewer than 'N' shares available.
-            self.log("unable to find file in the grid", parent=lp,
-                     level=log.NOISY)
-            # We need an upload helper. Check our active uploads again in
-            # case there was a race.
-            if storage_index in self._active_uploads:
-                self.log("upload is currently active", parent=lp)
-                uh = self._active_uploads[storage_index]
-            else:
-                self.log("creating new upload helper", parent=lp)
-                uh = self.chk_upload_helper_class(storage_index, self,
-                                                  incoming_file, encoding_file,
-                                                  r, lp)
-                self._active_uploads[storage_index] = uh
-                self._add_upload(uh)
-            return uh.start()
-        d.addCallback(_checked)
-        def _err(f):
-            self.log("error while checking for chk-already-in-grid",
-                     failure=f, level=log.WEIRD, parent=lp, umid="jDtxZg")
-            return f
-        d.addErrback(_err)
-        return d
-
-    def _check_for_chk_already_in_grid(self, storage_index, results, lp):
-        # see if this file is already in the grid
-        lp2 = self.log("doing a quick check+UEBfetch",
-                       parent=lp, level=log.NOISY)
-        c = CHKCheckerAndUEBFetcher(self.parent.get_permuted_peers,
-                                    storage_index, lp2)
-        d = c.check()
-        def _checked(res):
-            if res:
-                (sharemap, ueb_data, ueb_hash) = res
-                self.log("found file in grid", level=log.NOISY, parent=lp)
-                results.uri_extension_hash = ueb_hash
-                results.sharemap = {}
-                for shnum, peerids in sharemap.items():
-                    peers_s = ",".join(["[%s]" % idlib.shortnodeid_b2a(peerid)
-                                        for peerid in peerids])
-                    results.sharemap[shnum] = "Found on " + peers_s
-                results.uri_extension_data = ueb_data
-                results.preexisting_shares = len(sharemap)
-                results.pushed_shares = 0
-                return True
-            return False
-        d.addCallback(_checked)
-        return d
-
-    def _add_upload(self, uh):
-        self._all_uploads[uh] = None
-        s = uh.get_upload_status()
-        self._all_upload_statuses[s] = None
-        self._recent_upload_statuses.append(s)
-        while len(self._recent_upload_statuses) > self.MAX_UPLOAD_STATUSES:
-            self._recent_upload_statuses.pop(0)
-
-    def upload_finished(self, storage_index, size):
-        # this is called with size=0 if the upload failed
-        self.count("chk_upload_helper.encoded_bytes", size)
-        uh = self._active_uploads[storage_index]
-        del self._active_uploads[storage_index]
-        s = uh.get_upload_status()
-        s.set_active(False)
-
-    def get_all_upload_statuses(self):
-        return self._all_upload_statuses
index 39e0adae2223e00b034466306e6ee31580107174..1bcec9aa01872820f326436b1f1ffa55ebe36524 100644 (file)
@@ -304,9 +304,9 @@ class Encode(unittest.TestCase):
         d.addCallback(_ready)
 
         def _check(res):
-            (uri_extension_hash, required_shares, num_shares, file_size) = res
-            self.failUnless(isinstance(uri_extension_hash, str))
-            self.failUnlessEqual(len(uri_extension_hash), 32)
+            verifycap = res
+            self.failUnless(isinstance(verifycap.uri_extension_hash, str))
+            self.failUnlessEqual(len(verifycap.uri_extension_hash), 32)
             for i,peer in enumerate(all_shareholders):
                 self.failUnless(peer.closed)
                 self.failUnlessEqual(len(peer.blocks), NUM_SEGMENTS)
@@ -475,7 +475,7 @@ class Roundtrip(unittest.TestCase, testutil.ShouldFailMixin):
 
     def recover(self, (res, key, shareholders), AVAILABLE_SHARES,
                 recover_mode, target=None):
-        (uri_extension_hash, required_shares, num_shares, file_size) = res
+        verifycap = res
 
         if "corrupt_key" in recover_mode:
             # we corrupt the key, so that the decrypted data is corrupted and
@@ -485,10 +485,10 @@ class Roundtrip(unittest.TestCase, testutil.ShouldFailMixin):
             key = flip_bit(key)
 
         u = uri.CHKFileURI(key=key,
-                           uri_extension_hash=uri_extension_hash,
-                           needed_shares=required_shares,
-                           total_shares=num_shares,
-                           size=file_size)
+                           uri_extension_hash=verifycap.uri_extension_hash,
+                           needed_shares=verifycap.needed_shares,
+                           total_shares=verifycap.total_shares,
+                           size=verifycap.size)
 
         client = FakeClient()
         if not target:
index fe815101d0fae1f50dd2cf26bbafd39817bd1e28..163b51ec5dae4bace310cf753d3521a75adb8e98 100644 (file)
@@ -5,8 +5,9 @@ from twisted.application import service
 from foolscap import Tub, eventual
 from foolscap.logging import log
 
-from allmydata import offloaded, storage
-from allmydata.immutable import upload
+from allmydata import storage
+from allmydata.immutable import offloaded, upload
+from allmydata import uri
 from allmydata.util import hashutil, fileutil, mathutil
 from pycryptopp.cipher.aes import AES
 
@@ -27,8 +28,10 @@ class CHKUploadHelper_fake(offloaded.CHKUploadHelper):
                             "size": size,
                             }
                 self._results.uri_extension_data = ueb_data
-                return (hashutil.uri_extension_hash(""),
-                        needed_shares, total_shares, size)
+                self._results.verifycapstr = uri.CHKFileVerifierURI(self._storage_index, "x"*32,
+                                                                 needed_shares, total_shares,
+                                                                 size).to_string()
+                return self._results
             d2.addCallback(_got_parms)
             return d2
         d.addCallback(_got_size)
index 32968192257e93168c7dd7e98bf6f76b00cf2957..cdd5708d44d5c735db9289bf86b970f29be66bc7 100644 (file)
@@ -8,8 +8,8 @@ from twisted.internet import threads # CLI tests use deferToThread
 from twisted.internet.error import ConnectionDone, ConnectionLost
 from twisted.internet.interfaces import IConsumer, IPushProducer
 import allmydata
-from allmydata import uri, storage, offloaded
-from allmydata.immutable import download, upload, filenode
+from allmydata import uri, storage
+from allmydata.immutable import download, filenode, offloaded, upload
 from allmydata.util import idlib, mathutil
 from allmydata.util import log, base32
 from allmydata.scripts import runner