]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/control.py
control.py: minor improvements
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / control.py
1
2 import os, time, tempfile
3 from zope.interface import implements
4 from twisted.application import service
5 from twisted.internet import defer
6 from twisted.internet.interfaces import IConsumer
7 from foolscap.api import Referenceable
8 from allmydata.interfaces import RIControlClient, IFileNode
9 from allmydata.util import fileutil, mathutil
10 from allmydata.immutable import upload
11 from allmydata.mutable.publish import MutableData
12 from twisted.python import log
13
14 def get_memory_usage():
15     # this is obviously linux-specific
16     stat_names = ("VmPeak",
17                   "VmSize",
18                   #"VmHWM",
19                   "VmData")
20     stats = {}
21     try:
22         for line in open("/proc/self/status", "r").readlines():
23             name, right = line.split(":",2)
24             if name in stat_names:
25                 assert right.endswith(" kB\n")
26                 right = right[:-4]
27                 stats[name] = int(right) * 1024
28     except:
29         # Probably not on (a compatible version of) Linux
30         stats['VmSize'] = 0
31         stats['VmPeak'] = 0
32     return stats
33
34 def log_memory_usage(where=""):
35     stats = get_memory_usage()
36     log.msg("VmSize: %9d  VmPeak: %9d  %s" % (stats["VmSize"],
37                                               stats["VmPeak"],
38                                               where))
39
40 class FileWritingConsumer:
41     implements(IConsumer)
42     def __init__(self, filename):
43         self.done = False
44         self.f = open(filename, "wb")
45     def registerProducer(self, p, streaming):
46         if streaming:
47             p.resumeProducing()
48         else:
49             while not self.done:
50                 p.resumeProducing()
51     def write(self, data):
52         self.f.write(data)
53     def unregisterProducer(self):
54         self.done = True
55         self.f.close()
56
57 class ControlServer(Referenceable, service.Service):
58     implements(RIControlClient)
59
60     def remote_wait_for_client_connections(self, num_clients):
61         return self.parent.debug_wait_for_client_connections(num_clients)
62
63     def remote_upload_random_data_from_file(self, size, convergence):
64         tempdir = tempfile.mkdtemp()
65         filename = os.path.join(tempdir, "data")
66         f = open(filename, "wb")
67         block = "a" * 8192
68         while size > 0:
69             l = min(size, 8192)
70             f.write(block[:l])
71             size -= l
72         f.close()
73         uploader = self.parent.getServiceNamed("uploader")
74         u = upload.FileName(filename, convergence=convergence)
75         d = uploader.upload(u)
76         d.addCallback(lambda results: results.get_uri())
77         def _done(uri):
78             os.remove(filename)
79             os.rmdir(tempdir)
80             return uri
81         d.addCallback(_done)
82         return d
83
84     def remote_download_to_tempfile_and_delete(self, uri):
85         tempdir = tempfile.mkdtemp()
86         filename = os.path.join(tempdir, "data")
87         filenode = self.parent.create_node_from_uri(uri, name=filename)
88         if not IFileNode.providedBy(filenode):
89             raise AssertionError("The URI does not reference a file.")
90         c = FileWritingConsumer(filename)
91         d = filenode.read(c)
92         def _done(res):
93             os.remove(filename)
94             os.rmdir(tempdir)
95             return None
96         d.addCallback(_done)
97         return d
98
99     def remote_speed_test(self, count, size, mutable):
100         assert size > 8
101         log.msg("speed_test: count=%d, size=%d, mutable=%s" % (count, size,
102                                                                mutable))
103         st = SpeedTest(self.parent, count, size, mutable)
104         return st.run()
105
106     def remote_get_memory_usage(self):
107         return get_memory_usage()
108
109     def remote_measure_peer_response_time(self):
110         # I'd like to average together several pings, but I don't want this
111         # phase to take more than 10 seconds. Expect worst-case latency to be
112         # 300ms.
113         results = {}
114         sb = self.parent.get_storage_broker()
115         everyone = sb.get_connected_servers()
116         num_pings = int(mathutil.div_ceil(10, (len(everyone) * 0.3)))
117         everyone = list(everyone) * num_pings
118         d = self._do_one_ping(None, everyone, results)
119         return d
120     def _do_one_ping(self, res, everyone_left, results):
121         if not everyone_left:
122             return results
123         server = everyone_left.pop(0)
124         server_name = server.get_longname()
125         connection = server.get_rref()
126         start = time.time()
127         d = connection.callRemote("get_buckets", "\x00"*16)
128         def _done(ignored):
129             stop = time.time()
130             elapsed = stop - start
131             if server_name in results:
132                 results[server_name].append(elapsed)
133             else:
134                 results[server_name] = [elapsed]
135         d.addCallback(_done)
136         d.addCallback(self._do_one_ping, everyone_left, results)
137         def _average(res):
138             averaged = {}
139             for server_name,times in results.iteritems():
140                 averaged[server_name] = sum(times) / len(times)
141             return averaged
142         d.addCallback(_average)
143         return d
144
145 class SpeedTest:
146     def __init__(self, parent, count, size, mutable):
147         self.parent = parent
148         self.count = count
149         self.size = size
150         self.mutable_mode = mutable
151         self.uris = {}
152         self.basedir = os.path.join(self.parent.basedir, "_speed_test_data")
153
154     def run(self):
155         self.create_data()
156         d = self.do_upload()
157         d.addCallback(lambda res: self.do_download())
158         d.addBoth(self.do_cleanup)
159         d.addCallback(lambda res: (self.upload_time, self.download_time))
160         return d
161
162     def create_data(self):
163         fileutil.make_dirs(self.basedir)
164         for i in range(self.count):
165             s = self.size
166             fn = os.path.join(self.basedir, str(i))
167             if os.path.exists(fn):
168                 os.unlink(fn)
169             f = open(fn, "w")
170             f.write(os.urandom(8))
171             s -= 8
172             while s > 0:
173                 chunk = min(s, 4096)
174                 f.write("\x00" * chunk)
175                 s -= chunk
176             f.close()
177
178     def do_upload(self):
179         d = defer.succeed(None)
180         def _create_slot(res):
181             d1 = self.parent.create_mutable_file("")
182             def _created(n):
183                 self._n = n
184             d1.addCallback(_created)
185             return d1
186         if self.mutable_mode == "upload":
187             d.addCallback(_create_slot)
188         def _start(res):
189             self._start = time.time()
190         d.addCallback(_start)
191
192         def _record_uri(uri, i):
193             self.uris[i] = uri
194         def _upload_one_file(ignored, i):
195             if i >= self.count:
196                 return
197             fn = os.path.join(self.basedir, str(i))
198             if self.mutable_mode == "create":
199                 data = open(fn,"rb").read()
200                 d1 = self.parent.create_mutable_file(data)
201                 d1.addCallback(lambda n: n.get_uri())
202             elif self.mutable_mode == "upload":
203                 data = open(fn,"rb").read()
204                 d1 = self._n.overwrite(MutableData(data))
205                 d1.addCallback(lambda res: self._n.get_uri())
206             else:
207                 up = upload.FileName(fn, convergence=None)
208                 d1 = self.parent.upload(up)
209                 d1.addCallback(lambda results: results.get_uri())
210             d1.addCallback(_record_uri, i)
211             d1.addCallback(_upload_one_file, i+1)
212             return d1
213         d.addCallback(_upload_one_file, 0)
214         def _upload_done(ignored):
215             stop = time.time()
216             self.upload_time = stop - self._start
217         d.addCallback(_upload_done)
218         return d
219
220     def do_download(self):
221         start = time.time()
222         d = defer.succeed(None)
223         def _download_one_file(ignored, i):
224             if i >= self.count:
225                 return
226             n = self.parent.create_node_from_uri(self.uris[i])
227             if not IFileNode.providedBy(n):
228                 raise AssertionError("The URI does not reference a file.")
229             if n.is_mutable():
230                 d1 = n.download_best_version()
231             else:
232                 d1 = n.read(DiscardingConsumer())
233             d1.addCallback(_download_one_file, i+1)
234             return d1
235         d.addCallback(_download_one_file, 0)
236         def _download_done(ignored):
237             stop = time.time()
238             self.download_time = stop - start
239         d.addCallback(_download_done)
240         return d
241
242     def do_cleanup(self, res):
243         for i in range(self.count):
244             fn = os.path.join(self.basedir, str(i))
245             os.unlink(fn)
246         return res
247
248 class DiscardingConsumer:
249     implements(IConsumer)
250     def __init__(self):
251         self.done = False
252     def registerProducer(self, p, streaming):
253         if streaming:
254             p.resumeProducing()
255         else:
256             while not self.done:
257                 p.resumeProducing()
258     def write(self, data):
259         pass
260     def unregisterProducer(self):
261         self.done = True