3 from zope.interface import implements
4 from twisted.application import service
5 from twisted.internet import defer
6 from foolscap import Referenceable
7 from allmydata.interfaces import RIControlClient
8 from allmydata.util import fileutil, mathutil
9 from allmydata.immutable import upload, download
10 from twisted.python import log
12 def get_memory_usage():
13 # this is obviously linux-specific
14 stat_names = ("VmPeak",
20 for line in open("/proc/self/status", "r").readlines():
21 name, right = line.split(":",2)
22 if name in stat_names:
23 assert right.endswith(" kB\n")
25 stats[name] = int(right) * 1024
27 # Probably not on (a compatible version of) Linux
32 def log_memory_usage(where=""):
33 stats = get_memory_usage()
34 log.msg("VmSize: %9d VmPeak: %9d %s" % (stats["VmSize"],
39 class ControlServer(Referenceable, service.Service):
40 implements(RIControlClient)
42 def remote_wait_for_client_connections(self, num_clients):
43 return self.parent.debug_wait_for_client_connections(num_clients)
45 def remote_upload_from_file_to_uri(self, filename, convergence):
46 uploader = self.parent.getServiceNamed("uploader")
47 u = upload.FileName(filename, convergence=convergence)
48 d = uploader.upload(u)
49 d.addCallback(lambda results: results.uri)
52 def remote_download_from_uri_to_file(self, uri, filename):
53 downloader = self.parent.getServiceNamed("downloader")
54 d = downloader.download_to_filename(uri, filename)
55 d.addCallback(lambda res: filename)
58 def remote_speed_test(self, count, size, mutable):
60 log.msg("speed_test: count=%d, size=%d, mutable=%s" % (count, size,
62 st = SpeedTest(self.parent, count, size, mutable)
65 def remote_get_memory_usage(self):
66 return get_memory_usage()
68 def remote_measure_peer_response_time(self):
69 # I'd like to average together several pings, but I don't want this
70 # phase to take more than 10 seconds. Expect worst-case latency to be
73 conns = self.parent.introducer_client.get_all_connections_for("storage")
74 everyone = [(peerid,rref) for (peerid, service_name, rref) in conns]
75 num_pings = int(mathutil.div_ceil(10, (len(everyone) * 0.3)))
76 everyone = everyone * num_pings
77 d = self._do_one_ping(None, everyone, results)
79 def _do_one_ping(self, res, everyone_left, results):
82 peerid, connection = everyone_left.pop(0)
84 d = connection.callRemote("get_version")
87 elapsed = stop - start
89 results[peerid].append(elapsed)
91 results[peerid] = [elapsed]
93 d.addCallback(self._do_one_ping, everyone_left, results)
96 for peerid,times in results.iteritems():
97 averaged[peerid] = sum(times) / len(times)
99 d.addCallback(_average)
103 def __init__(self, parent, count, size, mutable):
107 self.mutable_mode = mutable
109 self.basedir = os.path.join(self.parent.basedir, "_speed_test_data")
114 d.addCallback(lambda res: self.do_download())
115 d.addBoth(self.do_cleanup)
116 d.addCallback(lambda res: (self.upload_time, self.download_time))
119 def create_data(self):
120 fileutil.make_dirs(self.basedir)
121 for i in range(self.count):
123 fn = os.path.join(self.basedir, str(i))
124 if os.path.exists(fn):
127 f.write(os.urandom(8))
131 f.write("\x00" * chunk)
136 d = defer.succeed(None)
137 def _create_slot(res):
138 d1 = self.parent.create_mutable_file("")
141 d1.addCallback(_created)
143 if self.mutable_mode == "upload":
144 d.addCallback(_create_slot)
146 self._start = time.time()
147 d.addCallback(_start)
149 def _record_uri(uri, i):
151 def _upload_one_file(ignored, i):
154 fn = os.path.join(self.basedir, str(i))
155 if self.mutable_mode == "create":
156 data = open(fn,"rb").read()
157 d1 = self.parent.create_mutable_file(data)
158 d1.addCallback(lambda n: n.get_uri())
159 elif self.mutable_mode == "upload":
160 data = open(fn,"rb").read()
161 d1 = self._n.overwrite(data)
162 d1.addCallback(lambda res: self._n.get_uri())
164 up = upload.FileName(fn, convergence=None)
165 d1 = self.parent.upload(up)
166 d1.addCallback(lambda results: results.uri)
167 d1.addCallback(_record_uri, i)
168 d1.addCallback(_upload_one_file, i+1)
170 d.addCallback(_upload_one_file, 0)
171 def _upload_done(ignored):
173 self.upload_time = stop - self._start
174 d.addCallback(_upload_done)
177 def do_download(self):
179 d = defer.succeed(None)
180 def _download_one_file(ignored, i):
183 n = self.parent.create_node_from_uri(self.uris[i])
184 d1 = n.download(download.FileHandle(Discard()))
185 d1.addCallback(_download_one_file, i+1)
187 d.addCallback(_download_one_file, 0)
188 def _download_done(ignored):
190 self.download_time = stop - start
191 d.addCallback(_download_done)
194 def do_cleanup(self, res):
195 for i in range(self.count):
196 fn = os.path.join(self.basedir, str(i))
201 def write(self, data):
203 # download_to_filehandle explicitly does not close the filehandle it was
204 # given: that is reserved for the provider of the filehandle. Therefore
205 # the lack of a close() method on this otherwise filehandle-like object
206 # is a part of the test.