]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/control.py
8c12b034a0e853a45b3035d01d195cbc67f83cd7
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / control.py
1
2 import os, time
3 from zope.interface import implements
4 from twisted.application import service
5 from twisted.internet import defer
6 from twisted.internet.interfaces import IConsumer
7 from foolscap.api import Referenceable
8 from allmydata.interfaces import RIControlClient
9 from allmydata.util import fileutil, mathutil
10 from allmydata.immutable import upload
11 from twisted.python import log
12
13 def get_memory_usage():
14     # this is obviously linux-specific
15     stat_names = ("VmPeak",
16                   "VmSize",
17                   #"VmHWM",
18                   "VmData")
19     stats = {}
20     try:
21         for line in open("/proc/self/status", "r").readlines():
22             name, right = line.split(":",2)
23             if name in stat_names:
24                 assert right.endswith(" kB\n")
25                 right = right[:-4]
26                 stats[name] = int(right) * 1024
27     except:
28         # Probably not on (a compatible version of) Linux
29         stats['VmSize'] = 0
30         stats['VmPeak'] = 0
31     return stats
32
33 def log_memory_usage(where=""):
34     stats = get_memory_usage()
35     log.msg("VmSize: %9d  VmPeak: %9d  %s" % (stats["VmSize"],
36                                               stats["VmPeak"],
37                                               where))
38
39 class FileWritingConsumer:
40     implements(IConsumer)
41     def __init__(self, filename):
42         self.done = False
43         self.f = open(filename, "wb")
44     def registerProducer(self, p, streaming):
45         if streaming:
46             p.resumeProducing()
47         else:
48             while not self.done:
49                 p.resumeProducing()
50     def write(self, data):
51         self.f.write(data)
52     def unregisterProducer(self):
53         self.done = True
54         self.f.close()
55
56 class ControlServer(Referenceable, service.Service):
57     implements(RIControlClient)
58
59     def remote_wait_for_client_connections(self, num_clients):
60         return self.parent.debug_wait_for_client_connections(num_clients)
61
62     def remote_upload_from_file_to_uri(self, filename, convergence):
63         uploader = self.parent.getServiceNamed("uploader")
64         u = upload.FileName(filename, convergence=convergence)
65         d = uploader.upload(u)
66         d.addCallback(lambda results: results.uri)
67         return d
68
69     def remote_download_from_uri_to_file(self, uri, filename):
70         filenode = self.parent.create_node_from_uri(uri)
71         c = FileWritingConsumer(filename)
72         d = filenode.read(c)
73         d.addCallback(lambda res: filename)
74         return d
75
76     def remote_speed_test(self, count, size, mutable):
77         assert size > 8
78         log.msg("speed_test: count=%d, size=%d, mutable=%s" % (count, size,
79                                                                mutable))
80         st = SpeedTest(self.parent, count, size, mutable)
81         return st.run()
82
83     def remote_get_memory_usage(self):
84         return get_memory_usage()
85
86     def remote_measure_peer_response_time(self):
87         # I'd like to average together several pings, but I don't want this
88         # phase to take more than 10 seconds. Expect worst-case latency to be
89         # 300ms.
90         results = {}
91         sb = self.parent.get_storage_broker()
92         everyone = sb.get_all_servers()
93         num_pings = int(mathutil.div_ceil(10, (len(everyone) * 0.3)))
94         everyone = list(everyone) * num_pings
95         d = self._do_one_ping(None, everyone, results)
96         return d
97     def _do_one_ping(self, res, everyone_left, results):
98         if not everyone_left:
99             return results
100         peerid, connection = everyone_left.pop(0)
101         start = time.time()
102         d = connection.callRemote("get_buckets", "\x00"*16)
103         def _done(ignored):
104             stop = time.time()
105             elapsed = stop - start
106             if peerid in results:
107                 results[peerid].append(elapsed)
108             else:
109                 results[peerid] = [elapsed]
110         d.addCallback(_done)
111         d.addCallback(self._do_one_ping, everyone_left, results)
112         def _average(res):
113             averaged = {}
114             for peerid,times in results.iteritems():
115                 averaged[peerid] = sum(times) / len(times)
116             return averaged
117         d.addCallback(_average)
118         return d
119
120 class SpeedTest:
121     def __init__(self, parent, count, size, mutable):
122         self.parent = parent
123         self.count = count
124         self.size = size
125         self.mutable_mode = mutable
126         self.uris = {}
127         self.basedir = os.path.join(self.parent.basedir, "_speed_test_data")
128
129     def run(self):
130         self.create_data()
131         d = self.do_upload()
132         d.addCallback(lambda res: self.do_download())
133         d.addBoth(self.do_cleanup)
134         d.addCallback(lambda res: (self.upload_time, self.download_time))
135         return d
136
137     def create_data(self):
138         fileutil.make_dirs(self.basedir)
139         for i in range(self.count):
140             s = self.size
141             fn = os.path.join(self.basedir, str(i))
142             if os.path.exists(fn):
143                 os.unlink(fn)
144             f = open(fn, "w")
145             f.write(os.urandom(8))
146             s -= 8
147             while s > 0:
148                 chunk = min(s, 4096)
149                 f.write("\x00" * chunk)
150                 s -= chunk
151             f.close()
152
153     def do_upload(self):
154         d = defer.succeed(None)
155         def _create_slot(res):
156             d1 = self.parent.create_mutable_file("")
157             def _created(n):
158                 self._n = n
159             d1.addCallback(_created)
160             return d1
161         if self.mutable_mode == "upload":
162             d.addCallback(_create_slot)
163         def _start(res):
164             self._start = time.time()
165         d.addCallback(_start)
166
167         def _record_uri(uri, i):
168             self.uris[i] = uri
169         def _upload_one_file(ignored, i):
170             if i >= self.count:
171                 return
172             fn = os.path.join(self.basedir, str(i))
173             if self.mutable_mode == "create":
174                 data = open(fn,"rb").read()
175                 d1 = self.parent.create_mutable_file(data)
176                 d1.addCallback(lambda n: n.get_uri())
177             elif self.mutable_mode == "upload":
178                 data = open(fn,"rb").read()
179                 d1 = self._n.overwrite(data)
180                 d1.addCallback(lambda res: self._n.get_uri())
181             else:
182                 up = upload.FileName(fn, convergence=None)
183                 d1 = self.parent.upload(up)
184                 d1.addCallback(lambda results: results.uri)
185             d1.addCallback(_record_uri, i)
186             d1.addCallback(_upload_one_file, i+1)
187             return d1
188         d.addCallback(_upload_one_file, 0)
189         def _upload_done(ignored):
190             stop = time.time()
191             self.upload_time = stop - self._start
192         d.addCallback(_upload_done)
193         return d
194
195     def do_download(self):
196         start = time.time()
197         d = defer.succeed(None)
198         def _download_one_file(ignored, i):
199             if i >= self.count:
200                 return
201             n = self.parent.create_node_from_uri(self.uris[i])
202             if n.is_mutable():
203                 d1 = n.download_best_version()
204             else:
205                 d1 = n.read(DiscardingConsumer())
206             d1.addCallback(_download_one_file, i+1)
207             return d1
208         d.addCallback(_download_one_file, 0)
209         def _download_done(ignored):
210             stop = time.time()
211             self.download_time = stop - start
212         d.addCallback(_download_done)
213         return d
214
215     def do_cleanup(self, res):
216         for i in range(self.count):
217             fn = os.path.join(self.basedir, str(i))
218             os.unlink(fn)
219         return res
220
221 class DiscardingConsumer:
222     implements(IConsumer)
223     def __init__(self):
224         self.done = False
225     def registerProducer(self, p, streaming):
226         if streaming:
227             p.resumeProducing()
228         else:
229             while not self.done:
230                 p.resumeProducing()
231     def write(self, data):
232         pass
233     def unregisterProducer(self):
234         self.done = True