]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blobdiff - src/allmydata/control.py
remove control-port upload/download file interfaces
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / control.py
index 09d10d617df5fad4e563acfb99b1d6c8a793380c..c29d5572fd6d3dd39f7aae9f7855f8fc32616978 100644 (file)
@@ -3,10 +3,12 @@ import os, time
 from zope.interface import implements
 from twisted.application import service
 from twisted.internet import defer
+from twisted.internet.interfaces import IConsumer
 from foolscap.api import Referenceable
-from allmydata.interfaces import RIControlClient
+from allmydata.interfaces import RIControlClient, IFileNode
 from allmydata.util import fileutil, mathutil
-from allmydata.immutable import upload, download
+from allmydata.immutable import upload
+from allmydata.mutable.publish import MutableData
 from twisted.python import log
 
 def get_memory_usage():
@@ -35,6 +37,22 @@ def log_memory_usage(where=""):
                                               stats["VmPeak"],
                                               where))
 
+class FileWritingConsumer:
+    implements(IConsumer)
+    def __init__(self, filename):
+        self.done = False
+        self.f = open(filename, "wb")
+    def registerProducer(self, p, streaming):
+        if streaming:
+            p.resumeProducing()
+        else:
+            while not self.done:
+                p.resumeProducing()
+    def write(self, data):
+        self.f.write(data)
+    def unregisterProducer(self):
+        self.done = True
+        self.f.close()
 
 class ControlServer(Referenceable, service.Service):
     implements(RIControlClient)
@@ -42,19 +60,6 @@ class ControlServer(Referenceable, service.Service):
     def remote_wait_for_client_connections(self, num_clients):
         return self.parent.debug_wait_for_client_connections(num_clients)
 
-    def remote_upload_from_file_to_uri(self, filename, convergence):
-        uploader = self.parent.getServiceNamed("uploader")
-        u = upload.FileName(filename, convergence=convergence)
-        d = uploader.upload(u)
-        d.addCallback(lambda results: results.uri)
-        return d
-
-    def remote_download_from_uri_to_file(self, uri, filename):
-        filenode = self.parent.create_node_from_uri(uri)
-        d = filenode.download_to_filename(filename)
-        d.addCallback(lambda res: filename)
-        return d
-
     def remote_speed_test(self, count, size, mutable):
         assert size > 8
         log.msg("speed_test: count=%d, size=%d, mutable=%s" % (count, size,
@@ -71,7 +76,7 @@ class ControlServer(Referenceable, service.Service):
         # 300ms.
         results = {}
         sb = self.parent.get_storage_broker()
-        everyone = sb.get_all_servers()
+        everyone = sb.get_connected_servers()
         num_pings = int(mathutil.div_ceil(10, (len(everyone) * 0.3)))
         everyone = list(everyone) * num_pings
         d = self._do_one_ping(None, everyone, results)
@@ -79,22 +84,24 @@ class ControlServer(Referenceable, service.Service):
     def _do_one_ping(self, res, everyone_left, results):
         if not everyone_left:
             return results
-        peerid, connection = everyone_left.pop(0)
+        server = everyone_left.pop(0)
+        server_name = server.get_longname()
+        connection = server.get_rref()
         start = time.time()
         d = connection.callRemote("get_buckets", "\x00"*16)
         def _done(ignored):
             stop = time.time()
             elapsed = stop - start
-            if peerid in results:
-                results[peerid].append(elapsed)
+            if server_name in results:
+                results[server_name].append(elapsed)
             else:
-                results[peerid] = [elapsed]
+                results[server_name] = [elapsed]
         d.addCallback(_done)
         d.addCallback(self._do_one_ping, everyone_left, results)
         def _average(res):
             averaged = {}
-            for peerid,times in results.iteritems():
-                averaged[peerid] = sum(times) / len(times)
+            for server_name,times in results.iteritems():
+                averaged[server_name] = sum(times) / len(times)
             return averaged
         d.addCallback(_average)
         return d
@@ -158,12 +165,12 @@ class SpeedTest:
                 d1.addCallback(lambda n: n.get_uri())
             elif self.mutable_mode == "upload":
                 data = open(fn,"rb").read()
-                d1 = self._n.overwrite(data)
+                d1 = self._n.overwrite(MutableData(data))
                 d1.addCallback(lambda res: self._n.get_uri())
             else:
                 up = upload.FileName(fn, convergence=None)
                 d1 = self.parent.upload(up)
-                d1.addCallback(lambda results: results.uri)
+                d1.addCallback(lambda results: results.get_uri())
             d1.addCallback(_record_uri, i)
             d1.addCallback(_upload_one_file, i+1)
             return d1
@@ -181,7 +188,12 @@ class SpeedTest:
             if i >= self.count:
                 return
             n = self.parent.create_node_from_uri(self.uris[i])
-            d1 = n.download(download.FileHandle(Discard()))
+            if not IFileNode.providedBy(n):
+                raise AssertionError("The URI does not reference a file.")
+            if n.is_mutable():
+                d1 = n.download_best_version()
+            else:
+                d1 = n.read(DiscardingConsumer())
             d1.addCallback(_download_one_file, i+1)
             return d1
         d.addCallback(_download_one_file, 0)
@@ -197,10 +209,17 @@ class SpeedTest:
             os.unlink(fn)
         return res
 
-class Discard:
+class DiscardingConsumer:
+    implements(IConsumer)
+    def __init__(self):
+        self.done = False
+    def registerProducer(self, p, streaming):
+        if streaming:
+            p.resumeProducing()
+        else:
+            while not self.done:
+                p.resumeProducing()
     def write(self, data):
         pass
-    # download_to_filehandle explicitly does not close the filehandle it was
-    # given: that is reserved for the provider of the filehandle. Therefore
-    # the lack of a close() method on this otherwise filehandle-like object
-    # is a part of the test.
+    def unregisterProducer(self):
+        self.done = True