]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blobdiff - src/allmydata/control.py
fix check-memory test, with new new (safe) control-port methods
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / control.py
index dc27d9f7e48973e743584f9e22393ac56e259d58..a78daf62fe3f391f7c31d4ba2e00b73ebd8376a0 100644 (file)
@@ -1,11 +1,14 @@
 
-import os, sys, time
+import os, time, tempfile
 from zope.interface import implements
 from twisted.application import service
 from twisted.internet import defer
-from foolscap import Referenceable
-from allmydata.interfaces import RIControlClient
-from allmydata.util import testutil, fileutil, mathutil
+from twisted.internet.interfaces import IConsumer
+from foolscap.api import Referenceable
+from allmydata.interfaces import RIControlClient, IFileNode
+from allmydata.util import fileutil, mathutil
+from allmydata.immutable import upload
+from allmydata.mutable.publish import MutableData
 from twisted.python import log
 
 def get_memory_usage():
@@ -34,28 +37,68 @@ def log_memory_usage(where=""):
                                               stats["VmPeak"],
                                               where))
 
+class FileWritingConsumer:
+    implements(IConsumer)
+    def __init__(self, filename):
+        self.done = False
+        self.f = open(filename, "wb")
+    def registerProducer(self, p, streaming):
+        if streaming:
+            p.resumeProducing()
+        else:
+            while not self.done:
+                p.resumeProducing()
+    def write(self, data):
+        self.f.write(data)
+    def unregisterProducer(self):
+        self.done = True
+        self.f.close()
 
-class ControlServer(Referenceable, service.Service, testutil.PollMixin):
+class ControlServer(Referenceable, service.Service):
     implements(RIControlClient)
 
     def remote_wait_for_client_connections(self, num_clients):
         return self.parent.debug_wait_for_client_connections(num_clients)
 
-    def remote_upload_from_file_to_uri(self, filename):
+    def remote_upload_random_data_from_file(self, size, convergence):
+        filename = tempfile.NamedTemporaryFile(delete=False).name
+        f = open(filename, "wb")
+        block = "a" * 8192
+        while size > 0:
+            l = min(size, 8192)
+            f.write(block[:l])
+            size -= l
+        f.close()
         uploader = self.parent.getServiceNamed("uploader")
-        d = uploader.upload_filename(filename)
+        u = upload.FileName(filename, convergence=convergence)
+        d = uploader.upload(u)
+        d.addCallback(lambda results: results.get_uri())
+        def _done(uri):
+            os.remove(filename)
+            return uri
+        d.addCallback(_done)
         return d
 
-    def remote_download_from_uri_to_file(self, uri, filename):
-        downloader = self.parent.getServiceNamed("downloader")
-        d = downloader.download_to_filename(uri, filename)
-        d.addCallback(lambda res: filename)
+    def remote_download_to_tempfile_and_delete(self, uri):
+        tempdir = tempfile.mkdtemp()
+        filename = os.path.join(tempdir, "data")
+        filenode = self.parent.create_node_from_uri(uri, name=filename)
+        if not IFileNode.providedBy(filenode):
+            raise AssertionError("The URI does not reference a file.")
+        c = FileWritingConsumer(filename)
+        d = filenode.read(c)
+        def _done(res):
+            os.remove(filename)
+            os.rmdir(tempdir)
+            return None
+        d.addCallback(_done)
         return d
 
-    def remote_speed_test(self, count, size):
+    def remote_speed_test(self, count, size, mutable):
         assert size > 8
-        log.msg("speed_test: count=%d, size=%d" % (count, size))
-        st = SpeedTest(self.parent, count, size)
+        log.msg("speed_test: count=%d, size=%d, mutable=%s" % (count, size,
+                                                               mutable))
+        st = SpeedTest(self.parent, count, size, mutable)
         return st.run()
 
     def remote_get_memory_usage(self):
@@ -66,39 +109,43 @@ class ControlServer(Referenceable, service.Service, testutil.PollMixin):
         # phase to take more than 10 seconds. Expect worst-case latency to be
         # 300ms.
         results = {}
-        everyone = list(self.parent.introducer_client.get_all_peers())
+        sb = self.parent.get_storage_broker()
+        everyone = sb.get_connected_servers()
         num_pings = int(mathutil.div_ceil(10, (len(everyone) * 0.3)))
-        everyone = everyone * num_pings
+        everyone = list(everyone) * num_pings
         d = self._do_one_ping(None, everyone, results)
         return d
     def _do_one_ping(self, res, everyone_left, results):
         if not everyone_left:
             return results
-        peerid, connection = everyone_left.pop(0)
+        server = everyone_left.pop(0)
+        server_name = server.get_longname()
+        connection = server.get_rref()
         start = time.time()
-        d = connection.callRemote("get_nodeid")
+        d = connection.callRemote("get_buckets", "\x00"*16)
         def _done(ignored):
             stop = time.time()
             elapsed = stop - start
-            if peerid in results:
-                results[peerid].append(elapsed)
+            if server_name in results:
+                results[server_name].append(elapsed)
             else:
-                results[peerid] = [elapsed]
+                results[server_name] = [elapsed]
         d.addCallback(_done)
         d.addCallback(self._do_one_ping, everyone_left, results)
         def _average(res):
             averaged = {}
-            for peerid,times in results.iteritems():
-                averaged[peerid] = sum(times) / len(times)
+            for server_name,times in results.iteritems():
+                averaged[server_name] = sum(times) / len(times)
             return averaged
         d.addCallback(_average)
         return d
 
 class SpeedTest:
-    def __init__(self, parent, count, size):
+    def __init__(self, parent, count, size, mutable):
         self.parent = parent
         self.count = count
         self.size = size
+        self.mutable_mode = mutable
         self.uris = {}
         self.basedir = os.path.join(self.parent.basedir, "_speed_test_data")
 
@@ -127,34 +174,60 @@ class SpeedTest:
             f.close()
 
     def do_upload(self):
-        uploader = self.parent.getServiceNamed("uploader")
-        start = time.time()
         d = defer.succeed(None)
+        def _create_slot(res):
+            d1 = self.parent.create_mutable_file("")
+            def _created(n):
+                self._n = n
+            d1.addCallback(_created)
+            return d1
+        if self.mutable_mode == "upload":
+            d.addCallback(_create_slot)
+        def _start(res):
+            self._start = time.time()
+        d.addCallback(_start)
+
         def _record_uri(uri, i):
             self.uris[i] = uri
         def _upload_one_file(ignored, i):
             if i >= self.count:
                 return
             fn = os.path.join(self.basedir, str(i))
-            d1 = uploader.upload_filename(fn)
+            if self.mutable_mode == "create":
+                data = open(fn,"rb").read()
+                d1 = self.parent.create_mutable_file(data)
+                d1.addCallback(lambda n: n.get_uri())
+            elif self.mutable_mode == "upload":
+                data = open(fn,"rb").read()
+                d1 = self._n.overwrite(MutableData(data))
+                d1.addCallback(lambda res: self._n.get_uri())
+            else:
+                up = upload.FileName(fn, convergence=None)
+                d1 = self.parent.upload(up)
+                d1.addCallback(lambda results: results.get_uri())
             d1.addCallback(_record_uri, i)
             d1.addCallback(_upload_one_file, i+1)
             return d1
         d.addCallback(_upload_one_file, 0)
         def _upload_done(ignored):
             stop = time.time()
-            self.upload_time = stop - start
+            self.upload_time = stop - self._start
         d.addCallback(_upload_done)
         return d
 
     def do_download(self):
-        downloader = self.parent.getServiceNamed("downloader")
         start = time.time()
         d = defer.succeed(None)
         def _download_one_file(ignored, i):
             if i >= self.count:
                 return
-            d1 = downloader.download_to_filehandle(self.uris[i], Discard())
+            n = self.parent.create_node_from_uri(self.uris[i])
+            if not IFileNode.providedBy(n):
+                raise AssertionError("The URI does not reference a file.")
+            if n.is_mutable():
+                d1 = n.download_best_version()
+            else:
+                d1 = n.read(DiscardingConsumer())
             d1.addCallback(_download_one_file, i+1)
             return d1
         d.addCallback(_download_one_file, 0)
@@ -170,10 +243,17 @@ class SpeedTest:
             os.unlink(fn)
         return res
 
-class Discard:
+class DiscardingConsumer:
+    implements(IConsumer)
+    def __init__(self):
+        self.done = False
+    def registerProducer(self, p, streaming):
+        if streaming:
+            p.resumeProducing()
+        else:
+            while not self.done:
+                p.resumeProducing()
     def write(self, data):
         pass
-    # download_to_filehandle explicitly does not close the filehandle it was
-    # given: that is reserved for the provider of the filehandle. Therefore
-    # the lack of a close() method on this otherwise filehandle-like object
-    # is a part of the test.
+    def unregisterProducer(self):
+        self.done = True