2 import os, time, tempfile
3 from zope.interface import implements
4 from twisted.application import service
5 from twisted.internet import defer
6 from twisted.internet.interfaces import IConsumer
7 from foolscap.api import Referenceable
8 from allmydata.interfaces import RIControlClient, IFileNode
9 from allmydata.util import fileutil, mathutil
10 from allmydata.immutable import upload
11 from allmydata.mutable.publish import MutableData
12 from twisted.python import log
# Collect memory figures for the current process by scanning
# /proc/self/status and keeping the fields whose names appear in stat_names.
# NOTE(review): the embedded numbering (16 -> 22, 25 -> 27 -> 29) shows that
# several lines of this function are absent from this excerpt: the rest of
# the stat_names tuple, the creation of `stats`, whatever precedes the int()
# conversion, and the code that the final comment introduces. Confirm the
# details against the complete file before relying on this description.
14 def get_memory_usage():
15 # this is obviously linux-specific
16 stat_names = ("VmPeak",
22 for line in open("/proc/self/status", "r").readlines():
# Split each status line at ":" into the field name and the remainder.
23 name, right = line.split(":",2)
24 if name in stat_names:
# Selected fields are expected to be expressed in kB.
25 assert right.endswith(" kB\n")
# Stored values are the kB figure multiplied by 1024, i.e. bytes.
# NOTE(review): int() would not accept the " kB\n" suffix, so it is
# presumably stripped on the line missing between 25 and 27 -- confirm.
27 stats[name] = int(right) * 1024
29 # Probably not on (a compatible version of) Linux
# Emit one log.msg line reporting the VmSize and VmPeak entries returned by
# get_memory_usage().
# `where` defaults to ""; the trailing %s in the format string suggests it is
# appended as a free-form label, but the remaining format arguments are on
# lines missing from this excerpt -- TODO confirm.
34 def log_memory_usage(where=""):
35 stats = get_memory_usage()
36 log.msg("VmSize: %9d VmPeak: %9d %s" % (stats["VmSize"],
# Object exposing registerProducer / write / unregisterProducer that opens
# `filename` for binary writing as soon as it is constructed.
# NOTE(review): the bodies of the three methods after __init__ are not part
# of this excerpt (numbering 45 -> 51 -> 53), so how data reaches self.f and
# when the file is closed cannot be stated from here.
40 class FileWritingConsumer:
42 def __init__(self, filename):
# "wb" creates the file or truncates an existing one; the handle is kept on
# the instance as self.f.
44 self.f = open(filename, "wb")
45 def registerProducer(self, p, streaming):
51 def write(self, data):
53 def unregisterProducer(self):
# Foolscap Referenceable that is also a Twisted Service and declares the
# RIControlClient interface. It defines a set of remote_* methods that work
# through self.parent (the object this service is attached to).
# NOTE(review): the embedded numbering shows lines missing inside most of the
# methods below; comments describe only what the visible statements do.
57 class ControlServer(Referenceable, service.Service):
58 implements(RIControlClient)
# Hand the request straight to the parent and return whatever it returns.
60 def remote_wait_for_client_connections(self, num_clients):
61 return self.parent.debug_wait_for_client_connections(num_clients)
# Upload a file named "data" from a freshly created temporary directory and
# turn the upload results into a URI.
# NOTE(review): the statements that fill the file using `size`, close it, and
# anything after the get_uri callback (numbering 66 -> 73, 76 -> 84) are not
# visible here -- confirm cleanup and the return value in the full file.
63 def remote_upload_random_data_from_file(self, size, convergence):
64 tempdir = tempfile.mkdtemp()
65 filename = os.path.join(tempdir, "data")
66 f = open(filename, "wb")
# The upload is performed by the parent's service registered as "uploader".
73 uploader = self.parent.getServiceNamed("uploader")
74 u = upload.FileName(filename, convergence=convergence)
75 d = uploader.upload(u)
# Replace the results object with the URI it reports.
76 d.addCallback(lambda results: results.get_uri())
# Resolve `uri` to a node through the parent, require that the node provides
# IFileNode, and prepare a FileWritingConsumer on a "data" path inside a
# fresh temporary directory.
# NOTE(review): the download and the deletion implied by the method name are
# on lines missing from this excerpt (numbering 90 -> 99).
84 def remote_download_to_tempfile_and_delete(self, uri):
85 tempdir = tempfile.mkdtemp()
86 filename = os.path.join(tempdir, "data")
87 filenode = self.parent.create_node_from_uri(uri, name=filename)
# Anything that is not a file node is rejected with an AssertionError.
88 if not IFileNode.providedBy(filenode):
89 raise AssertionError("The URI does not reference a file.")
90 c = FileWritingConsumer(filename)
# Log the requested parameters and build a SpeedTest from them.
# NOTE(review): the end of the log.msg call and the lines that start the test
# and return its result are not visible (numbering 101 -> 103 -> 106).
99 def remote_speed_test(self, count, size, mutable):
101 log.msg("speed_test: count=%d, size=%d, mutable=%s" % (count, size,
103 st = SpeedTest(self.parent, count, size, mutable)
# Return the result of the module-level get_memory_usage().
106 def remote_get_memory_usage(self):
107 return get_memory_usage()
# Ping the currently connected storage servers repeatedly, one ping at a
# time, by handing a repeated server list to _do_one_ping.
109 def remote_measure_peer_response_time(self):
110 # I'd like to average together several pings, but I don't want this
111 # phase to take more than 10 seconds. Expect worst-case latency to be
114 sb = self.parent.get_storage_broker()
115 everyone = sb.get_connected_servers()
# num_pings is ceil(10 / (0.3 * number_of_servers)); the 0.3 is presumably
# the per-ping latency budget mentioned in the truncated comment above.
# NOTE(review): with no connected servers the divisor is 0.0 -- what div_ceil
# does in that case is not visible here.
116 num_pings = int(mathutil.div_ceil(10, (len(everyone) * 0.3)))
# Repeat the whole server list num_pings times to form the work queue.
117 everyone = list(everyone) * num_pings
# NOTE(review): `results` is not assigned in the visible lines; it is
# presumably created on one of the missing lines 112-113.
118 d = self._do_one_ping(None, everyone, results)
# Process the first entry of `everyone_left`: call get_buckets on that
# server, record the elapsed time under the server's long name in `results`,
# then chain another _do_one_ping call for the remaining entries.
# `res` is the value delivered by the previous callback and is not used in
# the visible lines.
120 def _do_one_ping(self, res, everyone_left, results):
# An empty queue ends the chain (the statement executed here is not visible).
121 if not everyone_left:
# pop(0) removes the entry from the caller's list as well.
123 server = everyone_left.pop(0)
124 server_name = server.get_longname()
125 connection = server.get_rref()
# The remote call passes 16 NUL bytes as its argument.
127 d = connection.callRemote("get_buckets", "\x00"*16)
# NOTE(review): `start` and `stop` are bound on lines missing from this
# excerpt, and the numbering suggests these statements sit inside a nested
# callback whose def line is also missing -- confirm.
130 elapsed = stop - start
# Append to the server's list of timings, creating the list on first use.
131 if server_name in results:
132 results[server_name].append(elapsed)
134 results[server_name] = [elapsed]
# Chain the next ping; the remaining queue and the shared results dict are
# passed along as extra callback arguments.
136 d.addCallback(self._do_one_ping, everyone_left, results)
# Reduce each server's list of timings to its arithmetic mean.
# NOTE(review): `averaged` and the def line of `_average` are not visible.
139 for server_name,times in results.iteritems():
140 averaged[server_name] = sum(times) / len(times)
142 d.addCallback(_average)
# Constructor taking (parent, count, size, mutable). remote_speed_test in
# this file calls SpeedTest(self.parent, count, size, mutable), which matches
# this signature; the class header itself is not part of this excerpt.
# NOTE(review): the assignments between 146 and 150 are missing.
146 def __init__(self, parent, count, size, mutable):
# The `mutable` argument is kept as mutable_mode; other code in this file
# compares it against "create" and "upload".
150 self.mutable_mode = mutable
# Working directory for the generated test files, under the parent's basedir.
152 self.basedir = os.path.join(self.parent.basedir, "_speed_test_data")
# NOTE(review): `d` is not bound anywhere in the visible lines and the
# numbering jumps 152 -> 157, so the three statements below probably belong
# to a separate method whose def line is missing from this excerpt.
# Visible chain: run do_download after the preceding step, run do_cleanup
# whether the chain succeeded or failed (addBoth), then replace the result
# with the tuple (upload_time, download_time).
157 d.addCallback(lambda res: self.do_download())
158 d.addBoth(self.do_cleanup)
159 d.addCallback(lambda res: (self.upload_time, self.download_time))
# Make sure the working directory exists and prepare one data file per index
# in range(self.count), each named str(i) inside self.basedir.
162 def create_data(self):
163 fileutil.make_dirs(self.basedir)
164 for i in range(self.count):
166 fn = os.path.join(self.basedir, str(i))
# NOTE(review): what happens when the file already exists is on a line
# missing from this excerpt.
167 if os.path.exists(fn):
# The visible writes put 8 bytes from os.urandom and runs of NUL bytes of
# length `chunk` into the file; how `f` is opened and how `chunk` is derived
# (presumably from self.size) is not visible -- TODO confirm.
170 f.write(os.urandom(8))
174 f.write("\x00" * chunk)
# NOTE(review): the numbering jumps 174 -> 179 and everything below deals
# with uploading rather than creating files, so these statements probably
# belong to a separate upload method whose def line is missing here.
179 d = defer.succeed(None)
# _create_slot asks the parent for a new mutable file with empty contents;
# its _created callback is not visible.
180 def _create_slot(res):
181 d1 = self.parent.create_mutable_file("")
184 d1.addCallback(_created)
# The slot is created up front only in "upload" mode.
186 if self.mutable_mode == "upload":
187 d.addCallback(_create_slot)
# The upload timer starts when this point of the chain is reached (the def
# line of `_start` is not visible).
189 self._start = time.time()
190 d.addCallback(_start)
192 def _record_uri(uri, i):
# _upload_one_file handles the file with index i and then chains itself for
# i+1; the stopping condition is on lines missing from this excerpt.
194 def _upload_one_file(ignored, i):
197 fn = os.path.join(self.basedir, str(i))
# "create" mode: make a new mutable file holding the data file's contents.
198 if self.mutable_mode == "create":
199 data = open(fn,"rb").read()
200 d1 = self.parent.create_mutable_file(data)
201 d1.addCallback(lambda n: n.get_uri())
# "upload" mode: overwrite the node held in self._n with the file's contents.
202 elif self.mutable_mode == "upload":
203 data = open(fn,"rb").read()
204 d1 = self._n.overwrite(MutableData(data))
205 d1.addCallback(lambda res: self._n.get_uri())
# Presumably the else branch (its line is missing): upload from the file
# name through the parent with convergence=None.
207 up = upload.FileName(fn, convergence=None)
208 d1 = self.parent.upload(up)
209 d1.addCallback(lambda results: results.get_uri())
# Every mode ends with a URI, which is passed to _record_uri together with i.
210 d1.addCallback(_record_uri, i)
211 d1.addCallback(_upload_one_file, i+1)
# Start the sequential chain at index 0.
213 d.addCallback(_upload_one_file, 0)
214 def _upload_done(ignored):
# upload_time is the span from self._start to `stop` (bound on a missing
# line).
216 self.upload_time = stop - self._start
217 d.addCallback(_upload_done)
# Fetch the items recorded in self.uris one after another and store the
# elapsed time in self.download_time.
# NOTE(review): the lines that bind `start` and `stop`, the stopping
# condition of the chain and the return statement are missing from this
# excerpt.
220 def do_download(self):
222 d = defer.succeed(None)
# _download_one_file handles index i and then chains itself for i+1.
223 def _download_one_file(ignored, i):
# Turn the stored URI for index i into a node and insist it is a file node.
226 n = self.parent.create_node_from_uri(self.uris[i])
227 if not IFileNode.providedBy(n):
228 raise AssertionError("The URI does not reference a file.")
# Two alternative download calls are visible; the condition choosing between
# them is on lines missing from this excerpt (numbering 228 -> 230 -> 232).
230 d1 = n.download_best_version()
# The read() variant streams into a DiscardingConsumer.
232 d1 = n.read(DiscardingConsumer())
233 d1.addCallback(_download_one_file, i+1)
# Start the sequential chain at index 0.
235 d.addCallback(_download_one_file, 0)
236 def _download_done(ignored):
238 self.download_time = stop - start
239 d.addCallback(_download_done)
# Walk the per-index paths (str(i) under self.basedir) for i in
# range(self.count), the same naming that create_data uses.
# `res` is the value passed in by the Deferred chain; it is registered with
# addBoth elsewhere in this file, so it may be a result or a failure.
# NOTE(review): what is done with each path (presumably removal) and what is
# returned are on lines missing from this excerpt.
242 def do_cleanup(self, res):
243 for i in range(self.count):
244 fn = os.path.join(self.basedir, str(i))
# Class declaring Twisted's IConsumer interface; do_download in this file
# passes an instance of it to a node's read().
# NOTE(review): the method bodies are not part of this excerpt, so the
# discarding behaviour suggested by the class name cannot be confirmed here.
248 class DiscardingConsumer:
249 implements(IConsumer)
252 def registerProducer(self, p, streaming):
258 def write(self, data):
260 def unregisterProducer(self):