3 from zope.interface import implements
4 from twisted.application import service
5 from twisted.internet import defer
6 from twisted.internet.interfaces import IConsumer
7 from foolscap.api import Referenceable
8 from allmydata.interfaces import RIControlClient
9 from allmydata.util import fileutil, mathutil
10 from allmydata.immutable import upload
11 from twisted.python import log
# Collect selected "Vm*" memory counters for the current process by parsing
# /proc/self/status, keyed by field name in `stats`.
# NOTE(review): this listing omits several lines of the function (the rest of
# the stat_names tuple, the creation of `stats`, the handling between the
# assert and the int() conversion, and the fallback/return path), so the
# notes below cover only what is visible -- confirm against the full file.
13 def get_memory_usage():
14 # this is obviously linux-specific
15 stat_names = ("VmPeak",
21 for line in open("/proc/self/status", "r").readlines():
# Split each status line at ":" into the field name and the remainder.
22 name, right = line.split(":",2)
23 if name in stat_names:
# Fields of interest are expected to end in " kB\n".
24 assert right.endswith(" kB\n")
# The value is scaled by 1024 before being stored; presumably the " kB\n"
# suffix is stripped on a line not shown here -- TODO confirm.
26 stats[name] = int(right) * 1024
28 # Probably not on (a compatible version of) Linux
# Fetch the current memory counters via get_memory_usage() and emit them
# through twisted's log.msg as a "VmSize: ... VmPeak: ... <text>" line.
# `where` defaults to "".
# NOTE(review): the remaining format arguments are on lines not shown in
# this listing; presumably stats["VmPeak"] and `where` -- confirm.
33 def log_memory_usage(where=""):
34 stats = get_memory_usage()
35 log.msg("VmSize: %9d VmPeak: %9d %s" % (stats["VmSize"],
# Consumer object backed by a local file: the constructor opens `filename`
# for binary writing and keeps the handle on the instance as self.f.
# It exposes registerProducer / write / unregisterProducer, the same method
# names as DiscardingConsumer (which declares IConsumer) later in this file.
# NOTE(review): the method bodies are not visible in this listing;
# presumably write() forwards `data` to self.f and unregisterProducer()
# closes it -- confirm against the full file.
39 class FileWritingConsumer:
41 def __init__(self, filename):
43 self.f = open(filename, "wb")
44 def registerProducer(self, p, streaming):
50 def write(self, data):
52 def unregisterProducer(self):
# Referenceable service that declares the RIControlClient interface. Its
# remote_* methods mostly delegate to the parent service (self.parent).
# NOTE(review): this listing omits a number of lines inside the class
# (most `return` statements, and the setup of `results`, `start` and `stop`
# in the ping code), so the notes below describe only the visible
# statements -- confirm against the full file.
56 class ControlServer(Referenceable, service.Service):
57 implements(RIControlClient)
# Thin delegate: returns whatever the parent's
# debug_wait_for_client_connections(num_clients) returns.
59 def remote_wait_for_client_connections(self, num_clients):
60 return self.parent.debug_wait_for_client_connections(num_clients)
# Upload the local file `filename` through the parent's "uploader" service,
# passing `convergence` to upload.FileName; the Deferred's result is mapped
# to the `uri` attribute of the upload results.
62 def remote_upload_from_file_to_uri(self, filename, convergence):
63 uploader = self.parent.getServiceNamed("uploader")
64 u = upload.FileName(filename, convergence=convergence)
65 d = uploader.upload(u)
66 d.addCallback(lambda results: results.uri)
# Build a node from `uri` and a FileWritingConsumer for `filename`. The
# Deferred `d` is created on a line not shown (presumably by reading the node
# into the consumer -- confirm); its result is replaced by `filename`.
69 def remote_download_from_uri_to_file(self, uri, filename):
70 filenode = self.parent.create_node_from_uri(uri)
71 c = FileWritingConsumer(filename)
73 d.addCallback(lambda res: filename)
# Log the requested parameters and construct a SpeedTest bound to the parent.
# How the test is started/returned is on lines not shown here.
76 def remote_speed_test(self, count, size, mutable):
78 log.msg("speed_test: count=%d, size=%d, mutable=%s" % (count, size,
80 st = SpeedTest(self.parent, count, size, mutable)
# Expose the module-level get_memory_usage() result to remote callers.
83 def remote_get_memory_usage(self):
84 return get_memory_usage()
# Time get_buckets round-trips to every server known to the storage broker.
86 def remote_measure_peer_response_time(self):
87 # I'd like to average together several pings, but I don't want this
88 # phase to take more than 10 seconds. Expect worst-case latency to be
91 sb = self.parent.get_storage_broker()
92 everyone = sb.get_all_servers()
# Number of ping rounds: ceil(10 / (0.3 * number_of_servers)). The 0.3 is
# presumably the expected worst-case latency (in seconds) referred to above.
# NOTE(review): with zero servers the divisor is 0.0 -- confirm that
# mathutil.div_ceil / the caller handles that case.
93 num_pings = int(mathutil.div_ceil(10, (len(everyone) * 0.3)))
# Repeat the server list so each server appears num_pings times.
94 everyone = list(everyone) * num_pings
95 d = self._do_one_ping(None, everyone, results)
# Ping the first entry of `everyone_left` and record the elapsed time under
# its peerid in `results`, then chain another call to this method with the
# remaining entries. `res` is the previous callback's result.
# NOTE(review): the termination check for an empty list, the timing
# capture and the helper `def` lines are not visible here.
97 def _do_one_ping(self, res, everyone_left, results):
100 peerid, connection = everyone_left.pop(0)
# The "ping" is a remote get_buckets call with a 16-character all-NUL argument.
102 d = connection.callRemote("get_buckets", "\x00"*16)
105 elapsed = stop - start
# Accumulate per-peer samples as a list of elapsed times.
106 if peerid in results:
107 results[peerid].append(elapsed)
109 results[peerid] = [elapsed]
111 d.addCallback(self._do_one_ping, everyone_left, results)
# Collapse each peer's list of samples into its arithmetic mean.
114 for peerid,times in results.iteritems():
115 averaged[peerid] = sum(times) / len(times)
117 d.addCallback(_average)
# Constructor taking (parent, count, size, mutable), the same argument shape
# as the SpeedTest(...) call in ControlServer.remote_speed_test, so this is
# presumably SpeedTest.__init__. The class header is not visible in this
# listing, and neither are some of the attribute assignments.
121 def __init__(self, parent, count, size, mutable):
# `mutable` is later compared against the strings "create" and "upload".
125 self.mutable_mode = mutable
# Scratch directory for the test files, located under the parent's basedir.
127 self.basedir = os.path.join(self.parent.basedir, "_speed_test_data")
# NOTE(review): the statements below extend a Deferred `d` that is created on
# a line not shown; they presumably belong to a separate method whose `def`
# line is omitted from this listing -- confirm. Visible behavior: run
# do_download, then always run do_cleanup (addBoth fires on success and on
# failure), and finally produce the tuple (upload_time, download_time).
132 d.addCallback(lambda res: self.do_download())
133 d.addBoth(self.do_cleanup)
134 d.addCallback(lambda res: (self.upload_time, self.download_time))
# Prepare the test input: ensure self.basedir exists, then for each index in
# range(self.count) work on the file named str(i) inside that directory.
# NOTE(review): the handling of an already-existing file, the open() call
# and the computation of `chunk` are on lines not shown in this listing.
137 def create_data(self):
138 fileutil.make_dirs(self.basedir)
139 for i in range(self.count):
141 fn = os.path.join(self.basedir, str(i))
142 if os.path.exists(fn):
# Each file receives 8 random bytes (presumably so that every file's
# contents are distinct -- confirm) ...
145 f.write(os.urandom(8))
# ... plus runs of NUL characters, `chunk` at a time.
149 f.write("\x00" * chunk)
# NOTE(review): the `def` line of the enclosing method is not visible in this
# listing. Since self.upload_time is set at the end, this is presumably the
# upload phase of the speed test -- confirm against the full file.
# The work is built as a chain of callbacks on an already-fired Deferred.
154 d = defer.succeed(None)
# Helper that creates an empty mutable file via the parent; `_created` is
# defined on lines not shown (presumably it stores the node as self._n).
155 def _create_slot(res):
156 d1 = self.parent.create_mutable_file("")
159 d1.addCallback(_created)
# Only the "upload" mutable mode needs the pre-created slot.
161 if self.mutable_mode == "upload":
162 d.addCallback(_create_slot)
# The start timestamp is recorded inside the chain, i.e. after any slot
# creation queued above has completed.
164 self._start = time.time()
165 d.addCallback(_start)
167 def _record_uri(uri, i):
# Upload file number `i`, record its URI, then chain to file i+1. The check
# that stops the recursion is on a line not shown.
169 def _upload_one_file(ignored, i):
172 fn = os.path.join(self.basedir, str(i))
# "create" mode: make a new mutable file from the file's contents.
# NOTE(review): open(fn,"rb").read() never closes the handle explicitly.
173 if self.mutable_mode == "create":
174 data = open(fn,"rb").read()
175 d1 = self.parent.create_mutable_file(data)
176 d1.addCallback(lambda n: n.get_uri())
# "upload" mode: overwrite the existing mutable node self._n each time.
177 elif self.mutable_mode == "upload":
178 data = open(fn,"rb").read()
179 d1 = self._n.overwrite(data)
180 d1.addCallback(lambda res: self._n.get_uri())
# Remaining case (branch header not shown): upload via upload.FileName with
# convergence=None.
182 up = upload.FileName(fn, convergence=None)
183 d1 = self.parent.upload(up)
184 d1.addCallback(lambda results: results.uri)
185 d1.addCallback(_record_uri, i)
186 d1.addCallback(_upload_one_file, i+1)
# Kick off the sequential uploads at index 0.
188 d.addCallback(_upload_one_file, 0)
# Elapsed upload time is measured from self._start; `stop` is set on a line
# not shown.
189 def _upload_done(ignored):
191 self.upload_time = stop - self._start
192 d.addCallback(_upload_done)
# Download phase: read each entry of self.uris in turn into a
# DiscardingConsumer, then store the elapsed time in self.download_time.
# NOTE(review): the capture of `start`/`stop`, the check that ends the
# recursion and the return statement are on lines not shown in this listing.
195 def do_download(self):
197 d = defer.succeed(None)
# Read file number `i`, then chain to file i+1.
198 def _download_one_file(ignored, i):
201 n = self.parent.create_node_from_uri(self.uris[i])
202 d1 = n.read(DiscardingConsumer())
203 d1.addCallback(_download_one_file, i+1)
# Kick off the sequential downloads at index 0.
205 d.addCallback(_download_one_file, 0)
206 def _download_done(ignored):
208 self.download_time = stop - start
209 d.addCallback(_download_done)
# Visit each of the self.count test files under self.basedir. `res` is the
# value handed over by the Deferred chain (this method is attached with
# addBoth elsewhere in the file, so it may be a result or a failure).
# NOTE(review): the per-file action (presumably removing the file) and the
# return value are on lines not shown in this listing -- confirm.
212 def do_cleanup(self, res):
213 for i in range(self.count):
214 fn = os.path.join(self.basedir, str(i))
# Consumer declared to implement IConsumer; an instance is passed to
# node.read() in do_download.
# NOTE(review): the method bodies are not visible in this listing; judging by
# the class name, write() presumably throws the data away -- confirm.
218 class DiscardingConsumer:
219 implements(IConsumer)
222 def registerProducer(self, p, streaming):
228 def write(self, data):
230 def unregisterProducer(self):