-import os, time
+import os, time, tempfile
from zope.interface import implements
from twisted.application import service
-from foolscap import Referenceable
-from allmydata.interfaces import RIControlClient
-from allmydata.util import testutil, idlib
+from twisted.internet import defer
+from twisted.internet.interfaces import IConsumer
+from foolscap.api import Referenceable
+from allmydata.interfaces import RIControlClient, IFileNode
+from allmydata.util import fileutil, mathutil
+from allmydata.immutable import upload
+from allmydata.mutable.publish import MutableData
from twisted.python import log
def get_memory_usage():
#"VmHWM",
"VmData")
stats = {}
- for line in open("/proc/self/status", "r").readlines():
- name, right = line.split(":",2)
- if name in stat_names:
- assert right.endswith(" kB\n")
- right = right[:-4]
- stats[name] = int(right) * 1024
+ try:
+ for line in open("/proc/self/status", "r").readlines():
+ name, right = line.split(":",2)
+ if name in stat_names:
+ assert right.endswith(" kB\n")
+ right = right[:-4]
+ stats[name] = int(right) * 1024
+ except:
+ # Probably not on (a compatible version of) Linux
+ stats['VmSize'] = 0
+ stats['VmPeak'] = 0
return stats
def log_memory_usage(where=""):
stats["VmPeak"],
where))
+class FileWritingConsumer:
+ implements(IConsumer)
+ def __init__(self, filename):
+ self.done = False
+ self.f = open(filename, "wb")
+ def registerProducer(self, p, streaming):
+ if streaming:
+ p.resumeProducing()
+ else:
+ while not self.done:
+ p.resumeProducing()
+ def write(self, data):
+ self.f.write(data)
+ def unregisterProducer(self):
+ self.done = True
+ self.f.close()
-class ControlServer(Referenceable, service.Service, testutil.PollMixin):
+class ControlServer(Referenceable, service.Service):
implements(RIControlClient)
def remote_wait_for_client_connections(self, num_clients):
return self.parent.debug_wait_for_client_connections(num_clients)
- def remote_upload_from_file_to_uri(self, filename):
+ def remote_upload_random_data_from_file(self, size, convergence):
+ filename = tempfile.NamedTemporaryFile(delete=False).name
+ f = open(filename, "wb")
+ block = "a" * 8192
+ while size > 0:
+ l = min(size, 8192)
+ f.write(block[:l])
+ size -= l
+ f.close()
uploader = self.parent.getServiceNamed("uploader")
- d = uploader.upload_filename(filename)
+ u = upload.FileName(filename, convergence=convergence)
+ d = uploader.upload(u)
+ d.addCallback(lambda results: results.get_uri())
+ def _done(uri):
+ os.remove(filename)
+ return uri
+ d.addCallback(_done)
return d
- def remote_download_from_uri_to_file(self, uri, filename):
- downloader = self.parent.getServiceNamed("downloader")
- d = downloader.download_to_filename(uri, filename)
- d.addCallback(lambda res: filename)
+ def remote_download_to_tempfile_and_delete(self, uri):
+ tempdir = tempfile.mkdtemp()
+ filename = os.path.join(tempdir, "data")
+ filenode = self.parent.create_node_from_uri(uri, name=filename)
+ if not IFileNode.providedBy(filenode):
+ raise AssertionError("The URI does not reference a file.")
+ c = FileWritingConsumer(filename)
+ d = filenode.read(c)
+ def _done(res):
+ os.remove(filename)
+ os.rmdir(tempdir)
+ return None
+ d.addCallback(_done)
return d
- def remote_upload_speed_test(self, size):
+ def remote_speed_test(self, count, size, mutable):
assert size > 8
- fn = os.path.join(self.parent.basedir, idlib.b2a(os.urandom(8)))
- f = open(fn, "w")
- f.write(os.urandom(8))
- size -= 8
- while size > 0:
- chunk = min(size, 4096)
- f.write("\x00" * chunk)
- size -= chunk
- f.close()
- uploader = self.parent.getServiceNamed("uploader")
+ log.msg("speed_test: count=%d, size=%d, mutable=%s" % (count, size,
+ mutable))
+ st = SpeedTest(self.parent, count, size, mutable)
+ return st.run()
+
+ def remote_get_memory_usage(self):
+ return get_memory_usage()
+
+ def remote_measure_peer_response_time(self):
+ # I'd like to average together several pings, but I don't want this
+ # phase to take more than 10 seconds. Expect worst-case latency to be
+ # 300ms.
+ results = {}
+ sb = self.parent.get_storage_broker()
+ everyone = sb.get_connected_servers()
+ num_pings = int(mathutil.div_ceil(10, (len(everyone) * 0.3)))
+ everyone = list(everyone) * num_pings
+ d = self._do_one_ping(None, everyone, results)
+ return d
+ def _do_one_ping(self, res, everyone_left, results):
+ if not everyone_left:
+ return results
+ server = everyone_left.pop(0)
+ server_name = server.get_longname()
+ connection = server.get_rref()
start = time.time()
- d = uploader.upload_filename(fn)
- def _done(uri):
+ d = connection.callRemote("get_buckets", "\x00"*16)
+ def _done(ignored):
stop = time.time()
- return stop - start
+ elapsed = stop - start
+ if server_name in results:
+ results[server_name].append(elapsed)
+ else:
+ results[server_name] = [elapsed]
d.addCallback(_done)
- def _cleanup(res):
- os.unlink(fn)
- return res
- d.addBoth(_cleanup)
+ d.addCallback(self._do_one_ping, everyone_left, results)
+ def _average(res):
+ averaged = {}
+ for server_name,times in results.iteritems():
+ averaged[server_name] = sum(times) / len(times)
+ return averaged
+ d.addCallback(_average)
return d
- def remote_get_memory_usage(self):
- return get_memory_usage()
+class SpeedTest:
+ def __init__(self, parent, count, size, mutable):
+ self.parent = parent
+ self.count = count
+ self.size = size
+ self.mutable_mode = mutable
+ self.uris = {}
+ self.basedir = os.path.join(self.parent.basedir, "_speed_test_data")
+
+ def run(self):
+ self.create_data()
+ d = self.do_upload()
+ d.addCallback(lambda res: self.do_download())
+ d.addBoth(self.do_cleanup)
+ d.addCallback(lambda res: (self.upload_time, self.download_time))
+ return d
+
+ def create_data(self):
+ fileutil.make_dirs(self.basedir)
+ for i in range(self.count):
+ s = self.size
+ fn = os.path.join(self.basedir, str(i))
+ if os.path.exists(fn):
+ os.unlink(fn)
+ f = open(fn, "w")
+ f.write(os.urandom(8))
+ s -= 8
+ while s > 0:
+ chunk = min(s, 4096)
+ f.write("\x00" * chunk)
+ s -= chunk
+ f.close()
+
+ def do_upload(self):
+ d = defer.succeed(None)
+ def _create_slot(res):
+ d1 = self.parent.create_mutable_file("")
+ def _created(n):
+ self._n = n
+ d1.addCallback(_created)
+ return d1
+ if self.mutable_mode == "upload":
+ d.addCallback(_create_slot)
+ def _start(res):
+ self._start = time.time()
+ d.addCallback(_start)
+
+ def _record_uri(uri, i):
+ self.uris[i] = uri
+ def _upload_one_file(ignored, i):
+ if i >= self.count:
+ return
+ fn = os.path.join(self.basedir, str(i))
+ if self.mutable_mode == "create":
+ data = open(fn,"rb").read()
+ d1 = self.parent.create_mutable_file(data)
+ d1.addCallback(lambda n: n.get_uri())
+ elif self.mutable_mode == "upload":
+ data = open(fn,"rb").read()
+ d1 = self._n.overwrite(MutableData(data))
+ d1.addCallback(lambda res: self._n.get_uri())
+ else:
+ up = upload.FileName(fn, convergence=None)
+ d1 = self.parent.upload(up)
+ d1.addCallback(lambda results: results.get_uri())
+ d1.addCallback(_record_uri, i)
+ d1.addCallback(_upload_one_file, i+1)
+ return d1
+ d.addCallback(_upload_one_file, 0)
+ def _upload_done(ignored):
+ stop = time.time()
+ self.upload_time = stop - self._start
+ d.addCallback(_upload_done)
+ return d
+
+ def do_download(self):
+ start = time.time()
+ d = defer.succeed(None)
+ def _download_one_file(ignored, i):
+ if i >= self.count:
+ return
+ n = self.parent.create_node_from_uri(self.uris[i])
+ if not IFileNode.providedBy(n):
+ raise AssertionError("The URI does not reference a file.")
+ if n.is_mutable():
+ d1 = n.download_best_version()
+ else:
+ d1 = n.read(DiscardingConsumer())
+ d1.addCallback(_download_one_file, i+1)
+ return d1
+ d.addCallback(_download_one_file, 0)
+ def _download_done(ignored):
+ stop = time.time()
+ self.download_time = stop - start
+ d.addCallback(_download_done)
+ return d
+
+ def do_cleanup(self, res):
+ for i in range(self.count):
+ fn = os.path.join(self.basedir, str(i))
+ os.unlink(fn)
+ return res
+
+class DiscardingConsumer:
+ implements(IConsumer)
+ def __init__(self):
+ self.done = False
+ def registerProducer(self, p, streaming):
+ if streaming:
+ p.resumeProducing()
+ else:
+ while not self.done:
+ p.resumeProducing()
+ def write(self, data):
+ pass
+ def unregisterProducer(self):
+ self.done = True