Flesh out "tahoe magic-folder status" command
src/allmydata/immutable/upload.py
index d152c44d6af0f5159154bba83d8050b861cacb3f..99168f4e07952efe53a18a9154a554dd0115cf70 100644
@@ -16,12 +16,12 @@ from allmydata.util import base32, dictutil, idlib, log, mathutil
 from allmydata.util.happinessutil import servers_of_happiness, \
                                          shares_by_server, merge_servers, \
                                          failure_message
-from allmydata.util.assertutil import precondition
+from allmydata.util.assertutil import precondition, _assert
 from allmydata.util.rrefutil import add_version_to_remote_reference
 from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
      IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
      NoServersError, InsufficientVersionError, UploadUnhappinessError, \
-     DEFAULT_MAX_SEGMENT_SIZE
+     DEFAULT_MAX_SEGMENT_SIZE, IProgress
 from allmydata.immutable import layout
 from pycryptopp.cipher.aes import AES
 
@@ -32,8 +32,10 @@ from cStringIO import StringIO
 class TooFullError(Exception):
     pass
 
-class UploadResults(Copyable, RemoteCopy):
-    implements(IUploadResults)
+# HelperUploadResults are what we get from the Helper, and to retain
+# backwards compatibility with old Helpers we can't change the format. We
+# convert them into a local UploadResults upon receipt.
+class HelperUploadResults(Copyable, RemoteCopy):
     # note: don't change this string, it needs to match the value used on the
     # helper, and it does *not* need to match the fully-qualified
     # package/module/class name
@@ -55,6 +57,53 @@ class UploadResults(Copyable, RemoteCopy):
         self.preexisting_shares = None # count of shares already present
         self.pushed_shares = None # count of shares we pushed
 
+class UploadResults:
+    implements(IUploadResults)
+
+    def __init__(self, file_size,
+                 ciphertext_fetched, # how much the helper fetched
+                 preexisting_shares, # count of shares already present
+                 pushed_shares, # count of shares we pushed
+                 sharemap, # {shnum: set(server)}
+                 servermap, # {server: set(shnum)}
+                 timings, # dict of name to number of seconds
+                 uri_extension_data,
+                 uri_extension_hash,
+                 verifycapstr):
+        self._file_size = file_size
+        self._ciphertext_fetched = ciphertext_fetched
+        self._preexisting_shares = preexisting_shares
+        self._pushed_shares = pushed_shares
+        self._sharemap = sharemap
+        self._servermap = servermap
+        self._timings = timings
+        self._uri_extension_data = uri_extension_data
+        self._uri_extension_hash = uri_extension_hash
+        self._verifycapstr = verifycapstr
+
+    def set_uri(self, uri):
+        self._uri = uri
+
+    def get_file_size(self):
+        return self._file_size
+    def get_uri(self):
+        return self._uri
+    def get_ciphertext_fetched(self):
+        return self._ciphertext_fetched
+    def get_preexisting_shares(self):
+        return self._preexisting_shares
+    def get_pushed_shares(self):
+        return self._pushed_shares
+    def get_sharemap(self):
+        return self._sharemap
+    def get_servermap(self):
+        return self._servermap
+    def get_timings(self):
+        return self._timings
+    def get_uri_extension_data(self):
+        return self._uri_extension_data
+    def get_verifycapstr(self):
+        return self._verifycapstr
 
 # our current uri_extension is 846 bytes for small files, a few bytes
 # more for larger ones (since the filesize is encoded in decimal in a
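
For orientation, a minimal sketch of how a caller might build and query the
new UploadResults; every value below is made-up sample data, and the keyword
names are exactly the constructor arguments added above:

    ur = UploadResults(file_size=1024,
                       ciphertext_fetched=0,        # no helper involved
                       preexisting_shares=2,        # shares the grid already had
                       pushed_shares=8,
                       sharemap={0: set(["server-a"])},
                       servermap={"server-a": set([0])},
                       timings={"total": 1.7},
                       uri_extension_data={},
                       uri_extension_hash="x" * 32,
                       verifycapstr="URI:CHK-Verifier:...")
    ur.set_uri("URI:CHK:...")
    assert ur.get_preexisting_shares() + ur.get_pushed_shares() == 10
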
@@ -95,6 +144,8 @@ class ServerTracker:
         return ("<ServerTracker for server %s and SI %s>"
                 % (self._server.get_name(), si_b2a(self.storage_index)[:5]))
 
+    def get_server(self):
+        return self._server
     def get_serverid(self):
         return self._server.get_serverid()
     def get_name(self):
@@ -220,9 +271,9 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
             v0 = server.get_rref().version
             v1 = v0["http://allmydata.org/tahoe/protocols/storage/v1"]
             return v1["maximum-immutable-share-size"]
-        writable_servers = [server for server in all_servers
+        writeable_servers = [server for server in all_servers
                             if _get_maxsize(server) >= allocated_size]
-        readonly_servers = set(all_servers[:2*total_shares]) - set(writable_servers)
+        readonly_servers = set(all_servers[:2*total_shares]) - set(writeable_servers)
 
         # decide upon the renewal/cancel secrets, to include them in the
         # allocate_buckets query.
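
A worked toy example of the partition computed above, with made-up server
capacities standing in for _get_maxsize(); only the list comprehension and
the set arithmetic mirror the real code:

    all_servers = ["s1", "s2", "s3", "s4"]
    total_shares = 1
    allocated_size = 50
    maxsizes = {"s1": 100, "s2": 10, "s3": 100, "s4": 100}
    writeable_servers = [s for s in all_servers
                         if maxsizes[s] >= allocated_size]
    # only the first 2*N servers are considered as sources of existing shares
    readonly_servers = set(all_servers[:2*total_shares]) - set(writeable_servers)
    assert writeable_servers == ["s1", "s3", "s4"]
    assert readonly_servers == set(["s2"])
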
@@ -257,7 +308,7 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
         # second-pass list and repeat the "second" pass (really the third,
         # fourth, etc pass), until all shares are assigned, or we've run out
         # of potential servers.
-        self.first_pass_trackers = _make_trackers(writable_servers)
+        self.first_pass_trackers = _make_trackers(writeable_servers)
         self.second_pass_trackers = [] # servers worth asking again
         self.next_pass_trackers = [] # servers that we have asked again
         self._started_second_pass = False
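
A hedged sketch of the queue rotation those three tracker lists implement
(illustrative only; the real loop is elsewhere in Tahoe2ServerSelector and
is not part of this hunk):

    def pop_tracker(first_pass, second_pass, next_pass):
        # drain the first-pass queue, then keep cycling the retry queues:
        # trackers asked again accumulate in next_pass, and when second_pass
        # empties, next_pass becomes the new second_pass
        if first_pass:
            return first_pass.pop(0)
        if not second_pass and next_pass:
            second_pass.extend(next_pass)
            del next_pass[:]
        if second_pass:
            return second_pass.pop(0)
        return None  # out of potential servers
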
@@ -572,7 +623,9 @@ class EncryptAnUploadable:
     implements(IEncryptedUploadable)
     CHUNKSIZE = 50*1024
 
-    def __init__(self, original, log_parent=None):
+    def __init__(self, original, log_parent=None, progress=None):
+        precondition(original.default_params_set,
+                     "set_default_encoding_parameters not called on %r before wrapping with EncryptAnUploadable" % (original,))
         self.original = IUploadable(original)
         self._log_number = log_parent
         self._encryptor = None
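
The new precondition leans on the default_params_set flag added to
BaseUploadable further down. A small sketch of the failure it guards
against, assuming assertutil.precondition(expr, message) raises
AssertionError when expr is falsy:

    from allmydata.util.assertutil import precondition

    class NotSeededUploadable:
        default_params_set = False   # set_default_encoding_parameters never ran

    try:
        precondition(NotSeededUploadable.default_params_set,
                     "set_default_encoding_parameters not called")
    except AssertionError:
        pass  # the early, loud failure this change is after
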
@@ -583,6 +636,7 @@ class EncryptAnUploadable:
         self._file_size = None
         self._ciphertext_bytes_read = 0
         self._status = None
+        self._progress = progress
 
     def set_upload_status(self, upload_status):
         self._status = IUploadStatus(upload_status)
@@ -603,6 +657,8 @@ class EncryptAnUploadable:
             self._file_size = size
             if self._status:
                 self._status.set_size(size)
+            if self._progress:
+                self._progress.set_progress_total(size)
             return size
         d.addCallback(_got_size)
         return d
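
The progress object only ever receives set_progress_total() (here) and
set_progress() (in LiteralUploader below), so a minimal stand-in consistent
with this diff looks like the following; the authoritative interface is
allmydata.interfaces.IProgress:

    class ProgressStub:
        """Records progress updates; a stand-in for an IProgress provider."""
        def __init__(self):
            self.total = 0
            self.current = 0
        def set_progress_total(self, size):
            self.total = size
        def set_progress(self, value):
            self.current = value
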
@@ -841,18 +897,17 @@ class UploadStatus:
 class CHKUploader:
     server_selector_class = Tahoe2ServerSelector
 
-    def __init__(self, storage_broker, secret_holder):
+    def __init__(self, storage_broker, secret_holder, progress=None):
         # server_selector needs storage_broker and secret_holder
         self._storage_broker = storage_broker
         self._secret_holder = secret_holder
         self._log_number = self.log("CHKUploader starting", parent=None)
         self._encoder = None
-        self._results = UploadResults()
         self._storage_index = None
         self._upload_status = UploadStatus()
         self._upload_status.set_helper(False)
         self._upload_status.set_active(True)
-        self._upload_status.set_results(self._results)
+        self._progress = progress
 
         # locate_all_shareholders() will create the following attribute:
         # self._server_trackers = {} # k: shnum, v: instance of ServerTracker
@@ -896,8 +951,11 @@ class CHKUploader:
         eu = IEncryptedUploadable(encrypted)
 
         started = time.time()
-        self._encoder = e = encode.Encoder(self._log_number,
-                                           self._upload_status)
+        self._encoder = e = encode.Encoder(
+            self._log_number,
+            self._upload_status,
+            progress=self._progress,
+        )
         d = e.set_encrypted_uploadable(eu)
         d.addCallback(self.locate_all_shareholders, started)
         d.addCallback(self.set_shareholders, e)
@@ -950,7 +1008,7 @@ class CHKUploader:
                    for st in upload_trackers], already_serverids)
         self.log(msgtempl % values, level=log.OPERATIONAL)
         # record already-present shares in self._results
-        self._results.preexisting_shares = len(already_serverids)
+        self._count_preexisting_shares = len(already_serverids)
 
         self._server_trackers = {} # k: shnum, v: instance of ServerTracker
         for tracker in upload_trackers:
@@ -973,23 +1031,32 @@ class CHKUploader:
         encoder.set_shareholders(buckets, servermap)
 
     def _encrypted_done(self, verifycap):
-        """ Returns a Deferred that will fire with the UploadResults instance. """
-        r = self._results
-        for shnum in self._encoder.get_shares_placed():
-            server_tracker = self._server_trackers[shnum]
-            serverid = server_tracker.get_serverid()
-            r.sharemap.add(shnum, serverid)
-            r.servermap.add(serverid, shnum)
-        r.pushed_shares = len(self._encoder.get_shares_placed())
+        """Returns a Deferred that will fire with the UploadResults instance."""
+        e = self._encoder
+        sharemap = dictutil.DictOfSets()
+        servermap = dictutil.DictOfSets()
+        for shnum in e.get_shares_placed():
+            server = self._server_trackers[shnum].get_server()
+            sharemap.add(shnum, server)
+            servermap.add(server, shnum)
         now = time.time()
-        r.file_size = self._encoder.file_size
-        r.timings["total"] = now - self._started
-        r.timings["storage_index"] = self._storage_index_elapsed
-        r.timings["peer_selection"] = self._server_selection_elapsed
-        r.timings.update(self._encoder.get_times())
-        r.uri_extension_data = self._encoder.get_uri_extension_data()
-        r.verifycapstr = verifycap.to_string()
-        return r
+        timings = {}
+        timings["total"] = now - self._started
+        timings["storage_index"] = self._storage_index_elapsed
+        timings["peer_selection"] = self._server_selection_elapsed
+        timings.update(e.get_times())
+        ur = UploadResults(file_size=e.file_size,
+                           ciphertext_fetched=0,
+                           preexisting_shares=self._count_preexisting_shares,
+                           pushed_shares=len(e.get_shares_placed()),
+                           sharemap=sharemap,
+                           servermap=servermap,
+                           timings=timings,
+                           uri_extension_data=e.get_uri_extension_data(),
+                           uri_extension_hash=e.get_uri_extension_hash(),
+                           verifycapstr=verifycap.to_string())
+        self._upload_status.set_results(ur)
+        return ur
 
     def get_upload_status(self):
         return self._upload_status
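
The sharemap/servermap built in _encrypted_done use dictutil.DictOfSets. A
minimal sketch matching how .add() is used above (the real class lives in
allmydata.util.dictutil):

    class DictOfSets(dict):
        def add(self, key, value):
            # accumulate values into a per-key set
            if key in self:
                self[key].add(value)
            else:
                self[key] = set([value])

    m = DictOfSets()
    m.add(0, "server-a")
    m.add(0, "server-b")
    assert m[0] == set(["server-a", "server-b"])
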
@@ -1013,14 +1080,13 @@ def read_this_many_bytes(uploadable, size, prepend_data=[]):
 
 class LiteralUploader:
 
-    def __init__(self):
-        self._results = UploadResults()
+    def __init__(self, progress=None):
         self._status = s = UploadStatus()
         s.set_storage_index(None)
         s.set_helper(False)
         s.set_progress(0, 1.0)
         s.set_active(False)
-        s.set_results(self._results)
+        self._progress = progress
 
     def start(self, uploadable):
         uploadable = IUploadable(uploadable)
@@ -1028,7 +1094,8 @@ class LiteralUploader:
         def _got_size(size):
             self._size = size
             self._status.set_size(size)
-            self._results.file_size = size
+            if self._progress:
+                self._progress.set_progress_total(size)
             return read_this_many_bytes(uploadable, size)
         d.addCallback(_got_size)
         d.addCallback(lambda data: uri.LiteralFileURI("".join(data)))
@@ -1037,11 +1104,24 @@ class LiteralUploader:
         return d
 
     def _build_results(self, uri):
-        self._results.uri = uri
+        ur = UploadResults(file_size=self._size,
+                           ciphertext_fetched=0,
+                           preexisting_shares=0,
+                           pushed_shares=0,
+                           sharemap={},
+                           servermap={},
+                           timings={},
+                           uri_extension_data=None,
+                           uri_extension_hash=None,
+                           verifycapstr=None)
+        ur.set_uri(uri)
         self._status.set_status("Finished")
         self._status.set_progress(1, 1.0)
         self._status.set_progress(2, 1.0)
-        return self._results
+        self._status.set_results(ur)
+        if self._progress:
+            self._progress.set_progress(self._size)
+        return ur
 
     def close(self):
         pass
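
For context on why LiteralUploader's results are trivial (zero shares,
empty maps): files at or below URI_LIT_SIZE_THRESHOLD bytes never touch a
storage server; the cap itself carries the data. A toy illustration,
assuming Tahoe's usual unpadded lowercase base32:

    from base64 import b32encode

    data = "hello"
    litcap = "URI:LIT:" + b32encode(data).rstrip("=").lower()
    # => "URI:LIT:nbswy3dp"; decoding the suffix recovers the file body
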
@@ -1122,8 +1202,9 @@ class RemoteEncryptedUploadable(Referenceable):
 
 class AssistedUploader:
 
-    def __init__(self, helper):
+    def __init__(self, helper, storage_broker):
         self._helper = helper
+        self._storage_broker = storage_broker
         self._log_number = log.msg("AssistedUploader starting")
         self._storage_index = None
         self._upload_status = s = UploadStatus()
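
AssistedUploader now takes the StorageBroker because helper results name
servers by bare serverid, while the new UploadResults wants server objects;
_build_verifycap below calls get_stub_server() to convert. As used there, it
must return a server-ish object even for servers we never connected to. A
stand-in consistent with that use:

    class FakeStorageBroker:
        def get_stub_server(self, serverid):
            # a real broker returns an IServer; a tagged string is enough
            # to demonstrate the serverid -> server mapping
            return "<server %s>" % serverid.encode("hex")
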
@@ -1179,7 +1260,7 @@ class AssistedUploader:
         d.addCallback(self._contacted_helper)
         return d
 
-    def _contacted_helper(self, (upload_results, upload_helper)):
+    def _contacted_helper(self, (helper_upload_results, upload_helper)):
         now = time.time()
         elapsed = now - self._time_contacting_helper_start
         self._elapsed_time_contacting_helper = elapsed
@@ -1197,8 +1278,7 @@ class AssistedUploader:
             return d
         self.log("helper says file is already uploaded", level=log.OPERATIONAL)
         self._upload_status.set_progress(1, 1.0)
-        self._upload_status.set_results(upload_results)
-        return upload_results
+        return helper_upload_results
 
     def _convert_old_upload_results(self, upload_results):
         # pre-1.3.0 helpers return upload results which contain a mapping
@@ -1217,30 +1297,56 @@ class AssistedUploader:
         if str in [type(v) for v in sharemap.values()]:
             upload_results.sharemap = None
 
-    def _build_verifycap(self, upload_results):
+    def _build_verifycap(self, helper_upload_results):
         self.log("upload finished, building readcap", level=log.OPERATIONAL)
-        self._convert_old_upload_results(upload_results)
+        self._convert_old_upload_results(helper_upload_results)
         self._upload_status.set_status("Building Readcap")
-        r = upload_results
-        assert r.uri_extension_data["needed_shares"] == self._needed_shares
-        assert r.uri_extension_data["total_shares"] == self._total_shares
-        assert r.uri_extension_data["segment_size"] == self._segment_size
-        assert r.uri_extension_data["size"] == self._size
-        r.verifycapstr = uri.CHKFileVerifierURI(self._storage_index,
-                                             uri_extension_hash=r.uri_extension_hash,
-                                             needed_shares=self._needed_shares,
-                                             total_shares=self._total_shares, size=self._size
-                                             ).to_string()
+        hur = helper_upload_results
+        assert hur.uri_extension_data["needed_shares"] == self._needed_shares
+        assert hur.uri_extension_data["total_shares"] == self._total_shares
+        assert hur.uri_extension_data["segment_size"] == self._segment_size
+        assert hur.uri_extension_data["size"] == self._size
+
+        # the helper supplies no verifycap when the file was already present
+        # on the grid, so rebuild the verify cap from its component fields
+        v = uri.CHKFileVerifierURI(self._storage_index,
+                                   uri_extension_hash=hur.uri_extension_hash,
+                                   needed_shares=self._needed_shares,
+                                   total_shares=self._total_shares,
+                                   size=self._size)
+        timings = {}
+        timings["storage_index"] = self._storage_index_elapsed
+        timings["contacting_helper"] = self._elapsed_time_contacting_helper
+        for key,val in hur.timings.items():
+            if key == "total":
+                key = "helper_total"
+            timings[key] = val
         now = time.time()
-        r.file_size = self._size
-        r.timings["storage_index"] = self._storage_index_elapsed
-        r.timings["contacting_helper"] = self._elapsed_time_contacting_helper
-        if "total" in r.timings:
-            r.timings["helper_total"] = r.timings["total"]
-        r.timings["total"] = now - self._started
+        timings["total"] = now - self._started
+
+        gss = self._storage_broker.get_stub_server
+        sharemap = {}
+        servermap = {}
+        for shnum, serverids in hur.sharemap.items():
+            sharemap[shnum] = set([gss(serverid) for serverid in serverids])
+        # if the file was already in the grid, hur.servermap is an empty dict
+        for serverid, shnums in hur.servermap.items():
+            servermap[gss(serverid)] = set(shnums)
+
+        ur = UploadResults(file_size=self._size,
+                           # zero when the file was already present (the helper fetched nothing)
+                           ciphertext_fetched=hur.ciphertext_fetched,
+                           preexisting_shares=hur.preexisting_shares,
+                           pushed_shares=hur.pushed_shares,
+                           sharemap=sharemap,
+                           servermap=servermap,
+                           timings=timings,
+                           uri_extension_data=hur.uri_extension_data,
+                           uri_extension_hash=hur.uri_extension_hash,
+                           verifycapstr=v.to_string())
+
         self._upload_status.set_status("Finished")
-        self._upload_status.set_results(r)
-        return r
+        self._upload_status.set_results(ur)
+        return ur
 
     def get_upload_status(self):
         return self._upload_status
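
A worked example of the timings merge performed above: the helper's own
"total" is renamed to "helper_total" so it cannot clobber the client-side
wall-clock total (numbers are made up):

    hur_timings = {"total": 2.0, "cumulative_fetch": 0.5}   # from the helper
    timings = {"storage_index": 0.1, "contacting_helper": 0.3}
    for key, val in hur_timings.items():
        if key == "total":
            key = "helper_total"
        timings[key] = val
    timings["total"] = 2.6   # measured at our end, includes helper time
    assert timings["helper_total"] == 2.0
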
@@ -1248,9 +1354,7 @@ class AssistedUploader:
 class BaseUploadable:
     # this is overridden by max_segment_size
     default_max_segment_size = DEFAULT_MAX_SEGMENT_SIZE
-    default_encoding_param_k = 3 # overridden by encoding_parameters
-    default_encoding_param_happy = 7
-    default_encoding_param_n = 10
+    default_params_set = False
 
     max_segment_size = None
     encoding_param_k = None
@@ -1276,8 +1380,10 @@ class BaseUploadable:
             self.default_encoding_param_n = default_params["n"]
         if "max_segment_size" in default_params:
             self.default_max_segment_size = default_params["max_segment_size"]
+        self.default_params_set = True
 
     def get_all_encoding_parameters(self):
+        _assert(self.default_params_set, "set_default_encoding_parameters not called on %r" % (self,))
         if self._all_encoding_parameters:
             return defer.succeed(self._all_encoding_parameters)
 
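
A hedged usage sketch of the new contract: every uploadable must be seeded
via set_default_encoding_parameters() before get_all_encoding_parameters()
is consulted, or the _assert fires. Data is the in-module uploadable defined
further down; the values are the historical 3-of-10 defaults:

    u = Data("some bytes to upload", convergence=None)
    u.set_default_encoding_parameters({"k": 3, "happy": 7, "n": 10,
                                       "max_segment_size": DEFAULT_MAX_SEGMENT_SIZE})
    d = u.get_all_encoding_parameters()
    def _got(params):
        k, happy, n, segsize = params
        assert (k, n) == (3, 10)
    d.addCallback(_got)
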
@@ -1363,7 +1469,7 @@ class FileHandle(BaseUploadable):
     def get_size(self):
         if self._size is not None:
             return defer.succeed(self._size)
-        self._filehandle.seek(0,2)
+        self._filehandle.seek(0, os.SEEK_END)
         size = self._filehandle.tell()
         self._size = size
         self._filehandle.seek(0)
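
The os.SEEK_END change is behavior-preserving: seek(0, 2) and
seek(0, os.SEEK_END) are the same call, the named constant just reads better
(this assumes os is imported at module top, which the new line requires
anyway). A standalone check:

    import os
    from StringIO import StringIO

    f = StringIO("0123456789")
    f.seek(0, os.SEEK_END)   # same effect as f.seek(0, 2)
    assert f.tell() == 10
    f.seek(0)                # rewind, as get_size() does
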
@@ -1409,11 +1515,13 @@ class Uploader(service.MultiService, log.PrefixingLogMixin):
     name = "uploader"
     URI_LIT_SIZE_THRESHOLD = 55
 
-    def __init__(self, helper_furl=None, stats_provider=None):
+    def __init__(self, helper_furl=None, stats_provider=None, history=None, progress=None):
         self._helper_furl = helper_furl
         self.stats_provider = stats_provider
+        self._history = history
         self._helper = None
         self._all_uploads = weakref.WeakKeyDictionary() # for debugging
+        self._progress = progress
         log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload")
         service.MultiService.__init__(self)
 
@@ -1447,12 +1555,13 @@ class Uploader(service.MultiService, log.PrefixingLogMixin):
         return (self._helper_furl, bool(self._helper))
 
 
-    def upload(self, uploadable, history=None):
+    def upload(self, uploadable, progress=None):
         """
         Returns a Deferred that will fire with the UploadResults instance.
         """
         assert self.parent
         assert self.running
+        assert progress is None or IProgress.providedBy(progress)
 
         uploadable = IUploadable(uploadable)
         d = uploadable.get_size()
@@ -1461,37 +1570,40 @@ class Uploader(service.MultiService, log.PrefixingLogMixin):
             precondition(isinstance(default_params, dict), default_params)
             precondition("max_segment_size" in default_params, default_params)
             uploadable.set_default_encoding_parameters(default_params)
+            if progress:
+                progress.set_progress_total(size)
 
             if self.stats_provider:
                 self.stats_provider.count('uploader.files_uploaded', 1)
                 self.stats_provider.count('uploader.bytes_uploaded', size)
 
             if size <= self.URI_LIT_SIZE_THRESHOLD:
-                uploader = LiteralUploader()
+                uploader = LiteralUploader(progress=progress)
                 return uploader.start(uploadable)
             else:
                 eu = EncryptAnUploadable(uploadable, self._parentmsgid)
                 d2 = defer.succeed(None)
+                storage_broker = self.parent.get_storage_broker()
                 if self._helper:
-                    uploader = AssistedUploader(self._helper)
+                    uploader = AssistedUploader(self._helper, storage_broker)
                     d2.addCallback(lambda x: eu.get_storage_index())
                     d2.addCallback(lambda si: uploader.start(eu, si))
                 else:
                     storage_broker = self.parent.get_storage_broker()
                     secret_holder = self.parent._secret_holder
-                    uploader = CHKUploader(storage_broker, secret_holder)
+                    uploader = CHKUploader(storage_broker, secret_holder, progress=progress)
                     d2.addCallback(lambda x: uploader.start(eu))
 
                 self._all_uploads[uploader] = None
-                if history:
-                    history.add_upload(uploader.get_upload_status())
+                if self._history:
+                    self._history.add_upload(uploader.get_upload_status())
                 def turn_verifycap_into_read_cap(uploadresults):
                     # Generate the uri from the verifycap plus the key.
                     d3 = uploadable.get_encryption_key()
                     def put_readcap_into_results(key):
-                        v = uri.from_string(uploadresults.verifycapstr)
+                        v = uri.from_string(uploadresults.get_verifycapstr())
                         r = uri.CHKFileURI(key, v.uri_extension_hash, v.needed_shares, v.total_shares, v.size)
-                        uploadresults.uri = r.to_string()
+                        uploadresults.set_uri(r.to_string())
                         return uploadresults
                     d3.addCallback(put_readcap_into_results)
                     return d3
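
A toy model of the cap upgrade performed in put_readcap_into_results: a
read cap is the verify cap plus the AES key, which is why a helper can hold
the verify cap without gaining read access (tuples stand in for the
allmydata.uri classes):

    def make_verifycap(ueb_hash, needed, total, size):
        return ("verify", ueb_hash, needed, total, size)

    def add_key(verifycap, key):
        _, ueb_hash, needed, total, size = verifycap
        return ("read", key, ueb_hash, needed, total, size)

    vc = make_verifycap("u" * 32, 3, 10, 1024)
    rc = add_key(vc, "k" * 16)
    assert rc[:2] == ("read", "k" * 16)
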