]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blobdiff - src/allmydata/control.py
control.py: minor improvements
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / control.py
index 73756605e449dcc471d9e7b373689e97c86775da..7bd8eedab13f68dbe27312ae251fefe76106ab5d 100644 (file)
@@ -1,12 +1,14 @@
 
-import os, time
+import os, time, tempfile
 from zope.interface import implements
 from twisted.application import service
 from twisted.internet import defer
-from foolscap import Referenceable
-from allmydata.interfaces import RIControlClient
+from twisted.internet.interfaces import IConsumer
+from foolscap.api import Referenceable
+from allmydata.interfaces import RIControlClient, IFileNode
 from allmydata.util import fileutil, mathutil
-from allmydata.immutable import upload, download
+from allmydata.immutable import upload
+from allmydata.mutable.publish import MutableData
 from twisted.python import log
 
 def get_memory_usage():
@@ -35,6 +37,22 @@ def log_memory_usage(where=""):
                                               stats["VmPeak"],
                                               where))
 
+class FileWritingConsumer:
+    implements(IConsumer)
+    def __init__(self, filename):
+        self.done = False
+        self.f = open(filename, "wb")
+    def registerProducer(self, p, streaming):
+        if streaming:
+            p.resumeProducing()
+        else:
+            while not self.done:
+                p.resumeProducing()
+    def write(self, data):
+        self.f.write(data)
+    def unregisterProducer(self):
+        self.done = True
+        self.f.close()
 
 class ControlServer(Referenceable, service.Service):
     implements(RIControlClient)
@@ -42,17 +60,40 @@ class ControlServer(Referenceable, service.Service):
     def remote_wait_for_client_connections(self, num_clients):
         return self.parent.debug_wait_for_client_connections(num_clients)
 
-    def remote_upload_from_file_to_uri(self, filename, convergence):
+    def remote_upload_random_data_from_file(self, size, convergence):
+        tempdir = tempfile.mkdtemp()
+        filename = os.path.join(tempdir, "data")
+        f = open(filename, "wb")
+        block = "a" * 8192
+        while size > 0:
+            l = min(size, 8192)
+            f.write(block[:l])
+            size -= l
+        f.close()
         uploader = self.parent.getServiceNamed("uploader")
         u = upload.FileName(filename, convergence=convergence)
         d = uploader.upload(u)
-        d.addCallback(lambda results: results.uri)
+        d.addCallback(lambda results: results.get_uri())
+        def _done(uri):
+            os.remove(filename)
+            os.rmdir(tempdir)
+            return uri
+        d.addCallback(_done)
         return d
 
-    def remote_download_from_uri_to_file(self, uri, filename):
-        downloader = self.parent.getServiceNamed("downloader")
-        d = downloader.download_to_filename(uri, filename)
-        d.addCallback(lambda res: filename)
+    def remote_download_to_tempfile_and_delete(self, uri):
+        tempdir = tempfile.mkdtemp()
+        filename = os.path.join(tempdir, "data")
+        filenode = self.parent.create_node_from_uri(uri, name=filename)
+        if not IFileNode.providedBy(filenode):
+            raise AssertionError("The URI does not reference a file.")
+        c = FileWritingConsumer(filename)
+        d = filenode.read(c)
+        def _done(res):
+            os.remove(filename)
+            os.rmdir(tempdir)
+            return None
+        d.addCallback(_done)
         return d
 
     def remote_speed_test(self, count, size, mutable):
@@ -70,31 +111,33 @@ class ControlServer(Referenceable, service.Service):
         # phase to take more than 10 seconds. Expect worst-case latency to be
         # 300ms.
         results = {}
-        conns = self.parent.introducer_client.get_all_connections_for("storage")
-        everyone = [(peerid,rref) for (peerid, service_name, rref) in conns]
+        sb = self.parent.get_storage_broker()
+        everyone = sb.get_connected_servers()
         num_pings = int(mathutil.div_ceil(10, (len(everyone) * 0.3)))
-        everyone = everyone * num_pings
+        everyone = list(everyone) * num_pings
         d = self._do_one_ping(None, everyone, results)
         return d
     def _do_one_ping(self, res, everyone_left, results):
         if not everyone_left:
             return results
-        peerid, connection = everyone_left.pop(0)
+        server = everyone_left.pop(0)
+        server_name = server.get_longname()
+        connection = server.get_rref()
         start = time.time()
         d = connection.callRemote("get_buckets", "\x00"*16)
         def _done(ignored):
             stop = time.time()
             elapsed = stop - start
-            if peerid in results:
-                results[peerid].append(elapsed)
+            if server_name in results:
+                results[server_name].append(elapsed)
             else:
-                results[peerid] = [elapsed]
+                results[server_name] = [elapsed]
         d.addCallback(_done)
         d.addCallback(self._do_one_ping, everyone_left, results)
         def _average(res):
             averaged = {}
-            for peerid,times in results.iteritems():
-                averaged[peerid] = sum(times) / len(times)
+            for server_name,times in results.iteritems():
+                averaged[server_name] = sum(times) / len(times)
             return averaged
         d.addCallback(_average)
         return d
@@ -158,12 +201,12 @@ class SpeedTest:
                 d1.addCallback(lambda n: n.get_uri())
             elif self.mutable_mode == "upload":
                 data = open(fn,"rb").read()
-                d1 = self._n.overwrite(data)
+                d1 = self._n.overwrite(MutableData(data))
                 d1.addCallback(lambda res: self._n.get_uri())
             else:
                 up = upload.FileName(fn, convergence=None)
                 d1 = self.parent.upload(up)
-                d1.addCallback(lambda results: results.uri)
+                d1.addCallback(lambda results: results.get_uri())
             d1.addCallback(_record_uri, i)
             d1.addCallback(_upload_one_file, i+1)
             return d1
@@ -181,7 +224,12 @@ class SpeedTest:
             if i >= self.count:
                 return
             n = self.parent.create_node_from_uri(self.uris[i])
-            d1 = n.download(download.FileHandle(Discard()))
+            if not IFileNode.providedBy(n):
+                raise AssertionError("The URI does not reference a file.")
+            if n.is_mutable():
+                d1 = n.download_best_version()
+            else:
+                d1 = n.read(DiscardingConsumer())
             d1.addCallback(_download_one_file, i+1)
             return d1
         d.addCallback(_download_one_file, 0)
@@ -197,10 +245,17 @@ class SpeedTest:
             os.unlink(fn)
         return res
 
-class Discard:
+class DiscardingConsumer:
+    implements(IConsumer)
+    def __init__(self):
+        self.done = False
+    def registerProducer(self, p, streaming):
+        if streaming:
+            p.resumeProducing()
+        else:
+            while not self.done:
+                p.resumeProducing()
     def write(self, data):
         pass
-    # download_to_filehandle explicitly does not close the filehandle it was
-    # given: that is reserved for the provider of the filehandle. Therefore
-    # the lack of a close() method on this otherwise filehandle-like object
-    # is a part of the test.
+    def unregisterProducer(self):
+        self.done = True