1 import copy, os.path, stat
2 from cStringIO import StringIO
3 from zope.interface import implements
4 from twisted.internet import defer
5 from twisted.internet.interfaces import IPushProducer, IConsumer
6 from twisted.protocols import basic
7 from foolscap.api import eventually
8 from allmydata.interfaces import IFileNode, IFileURI, ICheckable, \
9 IDownloadTarget, IUploadResults
10 from allmydata.util import dictutil, log, base32
11 from allmydata.util.assertutil import precondition
12 from allmydata import uri as urimodule
13 from allmydata.immutable.checker import Checker
14 from allmydata.check_results import CheckResults, CheckAndRepairResults
15 from allmydata.immutable.repairer import Repairer
16 from allmydata.immutable import download
class _ImmutableFileNodeBase(object):
    """Common behavior for immutable file nodes (CHK and LIT).

    Identity, hashing, and equality all delegate to the node's URI
    object (self.u); subclasses provide download/check behavior.
    """
    implements(IFileNode, ICheckable)

    def __init__(self, uri, client):
        precondition(urimodule.IImmutableFileURI.providedBy(uri), uri)
        self.u = IFileURI(uri)
        self._client = client

    def get_readonly_uri(self):
        # immutable files are intrinsically read-only: the read-cap is
        # already the most restricted cap we have
        return self.get_uri()

    def is_mutable(self):
        return False

    def is_readonly(self):
        return True

    def __hash__(self):
        return self.u.__hash__()

    def __eq__(self, other):
        if IFileNode.providedBy(other):
            return self.u.__eq__(other.u)
        else:
            return False

    def __ne__(self, other):
        # BUG FIX: this previously returned self.u.__eq__(other.u), the
        # same value as __eq__, so "a != b" behaved like "a == b" for
        # two file nodes. Negate the equality test.
        if IFileNode.providedBy(other):
            return not self.u.__eq__(other.u)
        else:
            return True
class PortionOfFile:
    """A read-only file-like object that exposes a slice of a file on
    disk: like a list slice (things[2:14]), but for a file.

    @param fn: filename to open (read in binary mode)
    @param offset: byte position at which reads start
    @param size: maximum number of bytes readable, or None for no limit

    NOTE(review): the handle opened here is never explicitly closed; it
    is released only when this object is garbage-collected.
    """
    def __init__(self, fn, offset=0, size=None):
        self.f = open(fn, "rb")
        self.f.seek(offset)
        self.bytes_left = size  # None means "unlimited"

    def read(self, size=None):
        # bytes_to_read = min(size, self.bytes_left), but None>anything
        if size is None:
            bytes_to_read = self.bytes_left
        elif self.bytes_left is None:
            bytes_to_read = size
        else:
            bytes_to_read = min(size, self.bytes_left)
        data = self.f.read(bytes_to_read)
        if self.bytes_left is not None:
            self.bytes_left -= len(data)
        return data
69 implements(IDownloadTarget)
71 def __init__(self, node, cachedirectorymanager):
72 self._downloader = node._client.getServiceNamed("downloader")
73 self._uri = node.get_uri()
74 self._storage_index = node.get_storage_index()
75 self.milestones = set() # of (offset,size,Deferred)
76 self.cachedirectorymanager = cachedirectorymanager
78 self.download_in_progress = False
80 # new FileNode, no downloads ever performed
81 # new FileNode, leftover file (partial)
82 # new FileNode, leftover file (whole)
83 # download in progress, not yet complete
86 def when_range_available(self, offset, size):
87 assert isinstance(offset, (int,long))
88 assert isinstance(size, (int,long))
91 self.milestones.add( (offset,size,d) )
92 self._check_milestones()
93 if self.milestones and not self.download_in_progress:
94 self.download_in_progress = True
95 log.msg(format=("immutable filenode read [%(si)s]: " +
97 si=base32.b2a(self._storage_index),
98 umid="h26Heg", level=log.OPERATIONAL)
99 d2 = self._downloader.download(self._uri, self)
100 d2.addBoth(self._download_done)
101 d2.addErrback(self._download_failed)
102 d2.addErrback(log.err, umid="cQaM9g")
105 def read(self, consumer, offset, size):
106 assert offset+size <= self.get_filesize()
107 if not self.cachefile:
108 self.cachefile = self.cachedirectorymanager.get_file(base32.b2a(self._storage_index))
109 f = PortionOfFile(self.cachefile.get_filename(), offset, size)
110 d = basic.FileSender().beginFileTransfer(f, consumer)
111 d.addCallback(lambda lastSent: consumer)
114 def _download_done(self, res):
115 # clear download_in_progress, so failed downloads can be re-tried
116 self.download_in_progress = False
119 def _download_failed(self, f):
120 # tell anyone who's waiting that we failed
121 for m in self.milestones:
123 eventually(d.errback, f)
124 self.milestones.clear()
126 def _check_milestones(self):
127 current_size = self.get_filesize()
128 for m in list(self.milestones):
130 if offset+size <= current_size:
131 log.msg(format=("immutable filenode read [%(si)s] " +
132 "%(offset)d+%(size)d vs %(filesize)d: " +
134 si=base32.b2a(self._storage_index),
135 offset=offset, size=size, filesize=current_size,
136 umid="nuedUg", level=log.NOISY)
137 self.milestones.discard(m)
138 eventually(d.callback, None)
140 log.msg(format=("immutable filenode read [%(si)s] " +
141 "%(offset)d+%(size)d vs %(filesize)d: " +
143 si=base32.b2a(self._storage_index),
144 offset=offset, size=size, filesize=current_size,
145 umid="8PKOhg", level=log.NOISY)
147 def get_filesize(self):
148 if not self.cachefile:
149 self.cachefile = self.cachedirectorymanager.get_file(base32.b2a(self._storage_index))
151 filesize = os.stat(self.cachefile.get_filename())[stat.ST_SIZE]
157 def open(self, size):
158 if not self.cachefile:
159 self.cachefile = self.cachedirectorymanager.get_file(base32.b2a(self._storage_index))
160 self.f = open(self.cachefile.get_filename(), "wb")
162 def write(self, data):
164 self._check_milestones()
168 self._check_milestones()
172 def register_canceller(self, cb):
176 # The following methods are just because the target might be a repairer.DownUpConnector,
177 # and just because the current CHKUpload object expects to find the storage index and
178 # encoding parameters in its Uploadable.
179 def set_storageindex(self, storageindex):
181 def set_encodingparams(self, encodingparams):
class FileNode(_ImmutableFileNodeBase, log.PrefixingLogMixin):
    """An immutable CHK file node: supports full streaming download,
    ranged reads through an on-disk DownloadCache, and check/repair."""

    def __init__(self, uri, client, cachedirectorymanager):
        _ImmutableFileNodeBase.__init__(self, uri, client)
        self.download_cache = DownloadCache(self, cachedirectorymanager)
        # log under the verify-cap so lines can be correlated without
        # leaking the read-cap
        prefix = uri.get_verify_cap().to_string()
        log.PrefixingLogMixin.__init__(self, "allmydata.immutable.filenode", prefix=prefix)
        self.log("starting", level=log.OPERATIONAL)
194 return self.u.to_string()
197 return self.u.get_size()
199 def get_verify_cap(self):
200 return self.u.get_verify_cap()
202 def get_repair_cap(self):
203 # CHK files can be repaired with just the verifycap
204 return self.u.get_verify_cap()
206 def get_storage_index(self):
207 return self.u.storage_index
209 def check_and_repair(self, monitor, verify=False, add_lease=False):
210 verifycap = self.get_verify_cap()
211 sb = self._client.get_storage_broker()
212 servers = sb.get_all_servers()
214 c = Checker(client=self._client, verifycap=verifycap, servers=servers,
215 verify=verify, add_lease=add_lease, monitor=monitor)
217 def _maybe_repair(cr):
218 crr = CheckAndRepairResults(self.u.storage_index)
219 crr.pre_repair_results = cr
221 crr.post_repair_results = cr
222 return defer.succeed(crr)
224 crr.repair_attempted = True
225 crr.repair_successful = False # until proven successful
226 def _gather_repair_results(ur):
227 assert IUploadResults.providedBy(ur), ur
228 # clone the cr -- check results to form the basic of the prr -- post-repair results
229 prr = CheckResults(cr.uri, cr.storage_index)
230 prr.data = copy.deepcopy(cr.data)
232 sm = prr.data['sharemap']
233 assert isinstance(sm, dictutil.DictOfSets), sm
234 sm.update(ur.sharemap)
235 servers_responding = set(prr.data['servers-responding'])
236 servers_responding.union(ur.sharemap.iterkeys())
237 prr.data['servers-responding'] = list(servers_responding)
238 prr.data['count-shares-good'] = len(sm)
239 prr.data['count-good-share-hosts'] = len(sm)
240 is_healthy = bool(len(sm) >= self.u.total_shares)
241 is_recoverable = bool(len(sm) >= self.u.needed_shares)
242 prr.set_healthy(is_healthy)
243 prr.set_recoverable(is_recoverable)
244 crr.repair_successful = is_healthy
245 prr.set_needs_rebalancing(len(sm) >= self.u.total_shares)
247 crr.post_repair_results = prr
249 def _repair_error(f):
250 # as with mutable repair, I'm not sure if I want to pass
251 # through a failure or not. TODO
252 crr.repair_successful = False
253 crr.repair_failure = f
255 r = Repairer(client=self._client, verifycap=verifycap, monitor=monitor)
257 d.addCallbacks(_gather_repair_results, _repair_error)
260 d.addCallback(_maybe_repair)
263 def check(self, monitor, verify=False, add_lease=False):
264 verifycap = self.get_verify_cap()
265 sb = self._client.get_storage_broker()
266 servers = sb.get_all_servers()
268 v = Checker(client=self._client, verifycap=verifycap, servers=servers,
269 verify=verify, add_lease=add_lease, monitor=monitor)
272 def read(self, consumer, offset=0, size=None):
274 size = self.get_size() - offset
275 size = min(size, self.get_size() - offset)
277 if offset == 0 and size == self.get_size():
278 # don't use the cache, just do a normal streaming download
279 self.log("doing normal full download", umid="VRSBwg", level=log.OPERATIONAL)
280 return self.download(download.ConsumerAdapter(consumer))
282 d = self.download_cache.when_range_available(offset, size)
283 d.addCallback(lambda res:
284 self.download_cache.read(consumer, offset, size))
287 def download(self, target):
288 downloader = self._client.getServiceNamed("downloader")
289 history = self._client.get_history()
290 return downloader.download(self.get_uri(), target, self._parentmsgid,
293 def download_to_data(self):
294 downloader = self._client.getServiceNamed("downloader")
295 history = self._client.get_history()
296 return downloader.download_to_data(self.get_uri(), history=history)
class LiteralProducer:
    """A do-nothing producer: literal-file data is pushed in a single
    write, so there is nothing to resume or stop."""
    implements(IPushProducer)

    def resumeProducing(self):
        pass

    def stopProducing(self):
        pass
class LiteralFileNode(_ImmutableFileNodeBase):
    """A file node whose entire contents are embedded in the URI itself
    (a LIT cap): no servers are involved, so checking and repair are
    trivially no-ops."""

    def __init__(self, uri, client):
        precondition(urimodule.IImmutableFileURI.providedBy(uri), uri)
        _ImmutableFileNodeBase.__init__(self, uri, client)

    def get_uri(self):
        return self.u.to_string()

    def get_size(self):
        return len(self.u.data)

    def get_verify_cap(self):
        # LIT files have no shares to verify
        return None

    def get_repair_cap(self):
        return None

    def get_storage_index(self):
        # there is no share data on any server
        return None

    def check(self, monitor, verify=False, add_lease=False):
        return defer.succeed(None)

    def check_and_repair(self, monitor, verify=False, add_lease=False):
        return defer.succeed(None)

    def read(self, consumer, offset=0, size=None):
        if size is None:
            data = self.u.data[offset:]
        else:
            data = self.u.data[offset:offset+size]

        # We use twisted.protocols.basic.FileSender, which only does
        # non-streaming, i.e. PullProducer, where the receiver/consumer must
        # ask explicitly for each chunk of data. There are only two places in
        # the Twisted codebase that can't handle streaming=False, both of
        # which are in the upload path for an FTP/SFTP server
        # (protocols.ftp.FileConsumer and
        # vfs.adapters.ftp._FileToConsumerAdapter), neither of which is
        # likely to be used as the target for a Tahoe download.

        d = basic.FileSender().beginFileTransfer(StringIO(data), consumer)
        d.addCallback(lambda lastSent: consumer)
        return d

    def download(self, target):
        # note that this does not update the stats_provider
        data = self.u.data
        if IConsumer.providedBy(target):
            target.registerProducer(LiteralProducer(), True)
        target.open(len(data))
        target.write(data)
        if IConsumer.providedBy(target):
            target.unregisterProducer()
        target.close()
        return defer.maybeDeferred(target.finish)

    def download_to_data(self):
        data = self.u.data
        return defer.succeed(data)