helper status: include percentage fetched+pushed, add helper-uploads to the upload...
authorBrian Warner <warner@allmydata.com>
Tue, 15 Apr 2008 01:36:27 +0000 (18:36 -0700)
committerBrian Warner <warner@allmydata.com>
Tue, 15 Apr 2008 01:36:27 +0000 (18:36 -0700)
src/allmydata/client.py
src/allmydata/offloaded.py
src/allmydata/test/test_web.py
src/allmydata/web/status.py

index 90abef3ce0d2125bad26c03a463792b58d09cb9a..bcab46027e7404cf42b762d0d89b68ff87a38b71 100644 (file)
@@ -369,3 +369,11 @@ class Client(node.Node, testutil.PollMixin):
     def list_recent_retrieve(self):
         watcher = self.getServiceNamed("mutable-watcher")
         return watcher.list_recent_retrieve()
+
+    def list_active_helper_statuses(self):
+        helper = self.getServiceNamed("helper")
+        return helper.get_active_upload_statuses()
+    def list_recent_helper_statuses(self):
+        helper = self.getServiceNamed("helper")
+        return helper.get_recent_upload_statuses()
+
index 03dcfeea44e10c01f2c684b053bce4e7ec873884..81c61156aaf1deb988626ae3a25b6831582adb60 100644 (file)
@@ -143,6 +143,9 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
         self._results = results
         self._upload_status = upload.UploadStatus()
         self._upload_status.set_helper(False)
+        self._upload_status.set_storage_index(storage_index)
+        self._upload_status.set_status("fetching ciphertext")
+        self._upload_status.set_progress(0, 1.0)
         self._helper.log("CHKUploadHelper starting for SI %s" % upload_id,
                          parent=log_number)
 
@@ -209,6 +212,10 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
         del self._reader
 
     def _failed(self, f):
+        self.log(format="CHKUploadHelper(%(si)s) failed",
+                 si=storage.si_b2a(self._storage_index)[:5],
+                 failure=f,
+                 level=log.UNUSUAL)
         self._finished_observers.fire(f)
         self._helper.upload_finished(self._storage_index, 0)
         del self._reader
@@ -306,6 +313,7 @@ class CHKCiphertextFetcher(AskUntilSuccessMixin):
 
     def _got_size(self, size):
         self.log("total size is %d bytes" % size, level=log.NOISY)
+        self._upload_helper._upload_status.set_size(size)
         self._expected_size = size
 
     def _start_reading(self, res):
@@ -365,16 +373,16 @@ class CHKCiphertextFetcher(AskUntilSuccessMixin):
         needed = self._expected_size - self._have
         fetch_size = min(needed, self.CHUNK_SIZE)
         if fetch_size == 0:
+            self._upload_helper._upload_status.set_progress(1, 1.0)
             return True # all done
-        percent = 0
+        percent = 0.0
         if self._expected_size:
             percent = 1.0 * (self._have+fetch_size) / self._expected_size
-            percent = int(100*percent)
         self.log(format="fetching %(start)d-%(end)d of %(total)d (%(percent)d%%)",
                  start=self._have,
                  end=self._have+fetch_size,
                  total=self._expected_size,
-                 percent=percent,
+                 percent=int(100.0*percent),
                  level=log.NOISY)
         d = self.call("read_encrypted", self._have, fetch_size)
         def _got_data(ciphertext_v):
@@ -383,6 +391,7 @@ class CHKCiphertextFetcher(AskUntilSuccessMixin):
                 self._have += len(data)
                 self._ciphertext_fetched += len(data)
                 self._upload_helper._helper.count("chk_upload_helper.fetched_bytes", len(data))
+                self._upload_helper._upload_status.set_progress(1, percent)
             return False # not done
         d.addCallback(_got_data)
         return d
@@ -432,6 +441,7 @@ class LocalCiphertextReader(AskUntilSuccessMixin):
         self._status = interfaces.IUploadStatus(upload_status)
 
     def start(self):
+        self._upload_helper._upload_status.set_status("pushing")
         self._size = os.stat(self._encoding_file)[stat.ST_SIZE]
         self.f = open(self._encoding_file, "rb")
 
@@ -471,6 +481,7 @@ class Helper(Referenceable, service.MultiService):
 
     name = "helper"
     chk_upload_helper_class = CHKUploadHelper
+    MAX_UPLOAD_STATUSES = 10
 
     def __init__(self, basedir, stats_provider=None):
         self._basedir = basedir
@@ -479,6 +490,7 @@ class Helper(Referenceable, service.MultiService):
         fileutil.make_dirs(self._chk_incoming)
         fileutil.make_dirs(self._chk_encoding)
         self._active_uploads = {}
+        self._recent_upload_statuses = []
         self.stats_provider = stats_provider
         if stats_provider:
             stats_provider.register_producer(self)
@@ -612,4 +624,16 @@ class Helper(Referenceable, service.MultiService):
 
     def upload_finished(self, storage_index, size):
         self.count("chk_upload_helper.encoded_bytes", size)
+        uh = self._active_uploads[storage_index]
         del self._active_uploads[storage_index]
+        s = uh.get_upload_status()
+        s.set_active(False)
+        self._recent_upload_statuses.append(s)
+        while len(self._recent_upload_statuses) > self.MAX_UPLOAD_STATUSES:
+            self._recent_upload_statuses.pop(0)
+
+    def get_active_upload_statuses(self):
+        return [u.get_upload_status() for u in self._active_uploads.values()]
+
+    def get_recent_upload_statuses(self):
+        return self._recent_upload_statuses
index c53b49aecf6707f54b78e216c6cb442bb4983fae..73a149a97fa0908ea61dee541bd89c042137041c 100644 (file)
@@ -94,7 +94,10 @@ class FakeClient(service.MultiService):
         return []
     def list_recent_retrieve(self):
         return []
-
+    def list_active_helper_statuses(self):
+        return []
+    def list_recent_helper_statuses(self):
+        return []
 
 class WebMixin(object):
     def setUp(self):
index 1b07045db0f4d8e7b7aeae279f92226a9d8a7322..917c265f5cb32fc789449c39fed7d0afaab85216 100644 (file)
@@ -659,17 +659,21 @@ class Status(rend.Page):
     addSlash = True
 
     def data_active_operations(self, ctx, data):
-        active =  (IClient(ctx).list_active_uploads() +
-                   IClient(ctx).list_active_downloads() +
-                   IClient(ctx).list_active_publish() +
-                   IClient(ctx).list_active_retrieve())
+        client = IClient(ctx)
+        active =  (client.list_active_uploads() +
+                   client.list_active_downloads() +
+                   client.list_active_publish() +
+                   client.list_active_retrieve() +
+                   client.list_active_helper_statuses())
         return active
 
     def data_recent_operations(self, ctx, data):
-        recent = [o for o in (IClient(ctx).list_recent_uploads() +
-                              IClient(ctx).list_recent_downloads() +
-                              IClient(ctx).list_recent_publish() +
-                              IClient(ctx).list_recent_retrieve())
+        client = IClient(ctx)
+        recent = [o for o in (client.list_recent_uploads() +
+                              client.list_recent_downloads() +
+                              client.list_recent_publish() +
+                              client.list_recent_retrieve() +
+                              client.list_recent_helper_statuses())
                   if not o.get_active()]
         recent.sort(lambda a,b: cmp(a.get_started(), b.get_started()))
         recent.reverse()
@@ -735,6 +739,12 @@ class Status(rend.Page):
                 s = u.get_upload_status()
                 if s.get_counter() == count:
                     return UploadStatusPage(s)
+            for s in (client.list_active_helper_statuses() +
+                      client.list_recent_helper_statuses()):
+                if s.get_counter() == count:
+                    # immutable-upload helpers use the same status object as
+                    # a regular immutable-upload
+                    return UploadStatusPage(s)
         if stype == "down":
             for s in client.list_recent_downloads():
                 if s.get_counter() == count: