1 import copy, os.path, stat
2 from cStringIO import StringIO
3 from zope.interface import implements
4 from twisted.internet import defer
5 from twisted.internet.interfaces import IPushProducer
6 from twisted.protocols import basic
7 from foolscap.api import eventually
8 from allmydata.interfaces import IImmutableFileNode, ICheckable, \
9 IDownloadTarget, IUploadResults
10 from allmydata.util import dictutil, log, base32
11 from allmydata.uri import CHKFileURI, LiteralFileURI
12 from allmydata.immutable.checker import Checker
13 from allmydata.check_results import CheckResults, CheckAndRepairResults
14 from allmydata.immutable.repairer import Repairer
15 from allmydata.immutable import download
class _ImmutableFileNodeBase(object):
    """Shared behavior for immutable filenodes (CHK and LIT).

    Subclasses must set self.u (the URI/cap object) and define get_uri().
    Equality and hashing delegate to the cap, so two nodes for the same
    cap compare equal.
    """
    implements(IImmutableFileNode, ICheckable)

    def get_write_uri(self):
        # immutable files have no write-cap
        return None

    def get_readonly_uri(self):
        return self.get_uri()

    def is_readonly(self):
        return True

    def is_allowed_in_immutable_directory(self):
        return True

    def raise_error(self):
        pass

    def __hash__(self):
        return self.u.__hash__()
    def __eq__(self, other):
        if isinstance(other, _ImmutableFileNodeBase):
            return self.u.__eq__(other.u)
        else:
            return False
    def __ne__(self, other):
        # BUG FIX: this previously returned self.u.__eq__(other.u), so
        # __ne__ agreed with __eq__ instead of negating it, making both
        # a == b and a != b true for equal caps.
        if isinstance(other, _ImmutableFileNodeBase):
            return not self.u.__eq__(other.u)
        else:
            return True
# like a list slice (things[2:14]), but for a file on disk
def __init__(self, fn, offset=0, size=None):
    """Open *fn* and expose the byte range [offset, offset+size).

    size=None means 'through EOF'.
    """
    self.f = open(fn, "rb")
    # BUG FIX: 'offset' was accepted but never applied; seek so that
    # read() starts at the requested position.
    self.f.seek(offset)
    self.bytes_left = size
def read(self, size=None):
    """Read up to *size* bytes, but never past the end of the slice.

    Returns the bytes read (possibly short, empty at end-of-slice).
    """
    # bytes_to_read = min(size, self.bytes_left), but None > anything:
    # the visible fragment had a dangling 'elif', so the ladder is
    # restored here.
    if size is None:
        bytes_to_read = self.bytes_left
    elif self.bytes_left is None:
        bytes_to_read = size
    else:
        bytes_to_read = min(size, self.bytes_left)
    data = self.f.read(bytes_to_read)
    if self.bytes_left is not None:
        self.bytes_left -= len(data)
    # BUG FIX: the data must be returned to the caller (FileSender
    # pulls chunks via this method).
    return data
75 implements(IDownloadTarget)
def __init__(self, filecap, storage_index, downloader,
             cachedirectorymanager):
    self._downloader = downloader
    # BUG FIX: filecap was accepted but never stored, although
    # when_range_available() reads self._uri to start a download.
    self._uri = filecap
    self._storage_index = storage_index
    self.milestones = set() # of (offset,size,Deferred)
    self.cachedirectorymanager = cachedirectorymanager
    # BUG FIX: read()/get_filesize()/open() test self.cachefile before
    # any download has run; initialize it to avoid AttributeError.
    self.cachefile = None
    self.download_in_progress = False
    # states the cache file can be in:
    # new ImmutableFileNode, no downloads ever performed
    # new ImmutableFileNode, leftover file (partial)
    # new ImmutableFileNode, leftover file (whole)
    # download in progress, not yet complete
def when_range_available(self, offset, size):
    """Return a Deferred that fires (with None) once the byte range
    [offset, offset+size) is present in the cache file, starting a
    download if one is not already in progress.
    """
    assert isinstance(offset, (int,long))
    assert isinstance(size, (int,long))

    # BUG FIX: 'd' was used below but never created, and nothing was
    # returned to the caller (ImmutableFileNode.read chains on it).
    d = defer.Deferred()
    self.milestones.add( (offset,size,d) )
    self._check_milestones()
    if self.milestones and not self.download_in_progress:
        self.download_in_progress = True
        log.msg(format=("immutable filenode read [%(si)s]: " +
                        "starting download"),
                si=base32.b2a(self._storage_index),
                umid="h26Heg", level=log.OPERATIONAL)
        d2 = self._downloader.download(self._uri, self)
        d2.addBoth(self._download_done)
        d2.addErrback(self._download_failed)
        d2.addErrback(log.err, umid="cQaM9g")
    return d
def read(self, consumer, offset, size):
    """Stream cached bytes [offset, offset+size) to *consumer*.

    Only call this after when_range_available() has confirmed the range
    is present. Returns a Deferred that fires with the consumer when the
    transfer completes.
    """
    assert offset+size <= self.get_filesize()
    if not self.cachefile:
        self.cachefile = self.cachedirectorymanager.get_file(base32.b2a(self._storage_index))
    f = PortionOfFile(self.cachefile.get_filename(), offset, size)
    d = basic.FileSender().beginFileTransfer(f, consumer)
    d.addCallback(lambda lastSent: consumer)
    # BUG FIX: the Deferred was built but never returned to the caller.
    return d
def _download_done(self, res):
    # clear download_in_progress, so failed downloads can be re-tried
    self.download_in_progress = False
    # BUG FIX: this is installed with addBoth(); it must pass the
    # result (or Failure) through, otherwise a download Failure would
    # be swallowed here and never reach _download_failed below.
    return res
def _download_failed(self, f):
    # tell anyone who's waiting that we failed
    for m in self.milestones:
        # BUG FIX: 'd' was unbound without unpacking the milestone tuple
        (offset, size, d) = m
        eventually(d.errback, f)
    self.milestones.clear()
def _check_milestones(self):
    """Fire (and discard) every milestone Deferred whose byte range is
    now fully present in the cache file."""
    current_size = self.get_filesize()
    for m in list(self.milestones):
        # BUG FIX: the (offset,size,Deferred) tuple was never unpacked,
        # leaving offset/size/d unbound below.
        (offset, size, d) = m
        if offset+size <= current_size:
            # NOTE(review): the trailing "done"/"not yet" words of these
            # format strings were reconstructed; confirm against history.
            log.msg(format=("immutable filenode read [%(si)s] " +
                            "%(offset)d+%(size)d vs %(filesize)d: " +
                            "done"),
                    si=base32.b2a(self._storage_index),
                    offset=offset, size=size, filesize=current_size,
                    umid="nuedUg", level=log.NOISY)
            self.milestones.discard(m)
            eventually(d.callback, None)
        else:
            log.msg(format=("immutable filenode read [%(si)s] " +
                            "%(offset)d+%(size)d vs %(filesize)d: " +
                            "not yet"),
                    si=base32.b2a(self._storage_index),
                    offset=offset, size=size, filesize=current_size,
                    umid="8PKOhg", level=log.NOISY)
def get_filesize(self):
    """Return the current size in bytes of the cache file, or 0 if it
    does not exist yet."""
    if not self.cachefile:
        self.cachefile = self.cachedirectorymanager.get_file(base32.b2a(self._storage_index))
    try:
        filesize = os.stat(self.cachefile.get_filename())[stat.ST_SIZE]
    except OSError:
        # BUG FIX: before the first download the cache file may not
        # exist; treat that as an empty file instead of blowing up.
        filesize = 0
    return filesize
def open(self, size):
    """IDownloadTarget API: prepare the cache file for writing.

    The *size* argument is part of the interface but is not needed here.
    """
    if not self.cachefile:
        si_s = base32.b2a(self._storage_index)
        self.cachefile = self.cachedirectorymanager.get_file(si_s)
    filename = self.cachefile.get_filename()
    self.f = open(filename, "wb")
def write(self, data):
    # append the downloaded bytes to the cache file, then see whether
    # any waiting read() milestones have become satisfiable.
    # BUG FIX: the visible fragment never actually wrote 'data'.
    self.f.write(data)
    self._check_milestones()

def close(self):
    # NOTE(review): the fragment contained a second bare
    # _check_milestones() call with no enclosing def; restored here as
    # the IDownloadTarget close() — confirm against history.
    self.f.close()
    self._check_milestones()
179 def register_canceller(self, cb):
# The following methods are just because the target might be a
# repairer.DownUpConnector, and just because the current CHKUpload object
# expects to find the storage index and encoding parameters in its
# target.
def set_storageindex(self, storageindex):
    # the cache already knows its storage index; accept and ignore
    pass
def set_encodingparams(self, encodingparams):
    pass
class ImmutableFileNode(_ImmutableFileNodeBase, log.PrefixingLogMixin):
    def __init__(self, filecap, storage_broker, secret_holder,
                 downloader, history, cachedirectorymanager):
        assert isinstance(filecap, CHKFileURI)
        # BUG FIX: self.u is read below (and by every accessor on this
        # class and its base) but was never assigned.
        self.u = filecap
        self._storage_broker = storage_broker
        self._secret_holder = secret_holder
        self._downloader = downloader
        self._history = history
        storage_index = self.get_storage_index()
        self.download_cache = DownloadCache(filecap, storage_index, downloader,
                                            cachedirectorymanager)
        prefix = self.u.get_verify_cap().to_string()
        log.PrefixingLogMixin.__init__(self, "allmydata.immutable.filenode", prefix=prefix)
        self.log("starting", level=log.OPERATIONAL)
210 return self.u.get_size()
211 def get_current_size(self):
212 return defer.succeed(self.get_size())
216 def get_readcap(self):
217 return self.u.get_readonly()
218 def get_verify_cap(self):
219 return self.u.get_verify_cap()
220 def get_repair_cap(self):
221 # CHK files can be repaired with just the verifycap
222 return self.u.get_verify_cap()
225 return self.u.to_string()
227 def get_storage_index(self):
228 return self.u.storage_index
230 def check_and_repair(self, monitor, verify=False, add_lease=False):
231 verifycap = self.get_verify_cap()
232 sb = self._storage_broker
233 servers = sb.get_all_servers()
234 sh = self._secret_holder
236 c = Checker(verifycap=verifycap, servers=servers,
237 verify=verify, add_lease=add_lease, secret_holder=sh,
240 def _maybe_repair(cr):
241 crr = CheckAndRepairResults(self.u.storage_index)
242 crr.pre_repair_results = cr
244 crr.post_repair_results = cr
245 return defer.succeed(crr)
247 crr.repair_attempted = True
248 crr.repair_successful = False # until proven successful
249 def _gather_repair_results(ur):
250 assert IUploadResults.providedBy(ur), ur
251 # clone the cr -- check results to form the basic of the prr -- post-repair results
252 prr = CheckResults(cr.uri, cr.storage_index)
253 prr.data = copy.deepcopy(cr.data)
255 sm = prr.data['sharemap']
256 assert isinstance(sm, dictutil.DictOfSets), sm
257 sm.update(ur.sharemap)
258 servers_responding = set(prr.data['servers-responding'])
259 servers_responding.union(ur.sharemap.iterkeys())
260 prr.data['servers-responding'] = list(servers_responding)
261 prr.data['count-shares-good'] = len(sm)
262 prr.data['count-good-share-hosts'] = len(sm)
263 is_healthy = bool(len(sm) >= self.u.total_shares)
264 is_recoverable = bool(len(sm) >= self.u.needed_shares)
265 prr.set_healthy(is_healthy)
266 prr.set_recoverable(is_recoverable)
267 crr.repair_successful = is_healthy
268 prr.set_needs_rebalancing(len(sm) >= self.u.total_shares)
270 crr.post_repair_results = prr
272 def _repair_error(f):
273 # as with mutable repair, I'm not sure if I want to pass
274 # through a failure or not. TODO
275 crr.repair_successful = False
276 crr.repair_failure = f
278 r = Repairer(storage_broker=sb, secret_holder=sh,
279 verifycap=verifycap, monitor=monitor)
281 d.addCallbacks(_gather_repair_results, _repair_error)
284 d.addCallback(_maybe_repair)
287 def check(self, monitor, verify=False, add_lease=False):
288 verifycap = self.get_verify_cap()
289 sb = self._storage_broker
290 servers = sb.get_all_servers()
291 sh = self._secret_holder
293 v = Checker(verifycap=verifycap, servers=servers,
294 verify=verify, add_lease=add_lease, secret_holder=sh,
298 def read(self, consumer, offset=0, size=None):
299 self.log("read", offset=offset, size=size,
300 umid="UPP8FA", level=log.OPERATIONAL)
302 size = self.get_size() - offset
303 size = min(size, self.get_size() - offset)
305 if offset == 0 and size == self.get_size():
306 # don't use the cache, just do a normal streaming download
307 self.log("doing normal full download", umid="VRSBwg", level=log.OPERATIONAL)
308 target = download.ConsumerAdapter(consumer)
309 return self._downloader.download(self.get_cap(), target,
311 history=self._history)
313 d = self.download_cache.when_range_available(offset, size)
314 d.addCallback(lambda res:
315 self.download_cache.read(consumer, offset, size))
class LiteralProducer:
    """A push producer that never actually pauses or produces: literal
    file data is delivered synchronously, so these are no-ops."""
    implements(IPushProducer)
    def resumeProducing(self):
        # BUG FIX: def had no body in the visible fragment
        pass
    def stopProducing(self):
        pass
class LiteralFileNode(_ImmutableFileNodeBase):
    """Filenode for a LIT cap: the entire file contents are embedded in
    the URI itself, so there is nothing on the grid to download, check,
    or repair."""

    def __init__(self, filecap):
        assert isinstance(filecap, LiteralFileURI)
        # BUG FIX: the cap was never stored, although every method below
        # reads self.u.
        self.u = filecap

    def get_size(self):
        # BUG FIX: this 'return' had no enclosing def in the fragment
        return len(self.u.data)
    def get_current_size(self):
        return defer.succeed(self.get_size())

    def get_cap(self):
        return self.u
    def get_readcap(self):
        # a LIT cap is already read-only
        return self.u
    def get_verify_cap(self):
        # nothing on the grid to verify
        return None
    def get_repair_cap(self):
        return None

    def get_uri(self):
        # BUG FIX: this 'return' had no enclosing def in the fragment
        return self.u.to_string()

    def get_storage_index(self):
        # LIT data lives in the cap, not on any server
        return None

    def check(self, monitor, verify=False, add_lease=False):
        return defer.succeed(None)

    def check_and_repair(self, monitor, verify=False, add_lease=False):
        return defer.succeed(None)

    def read(self, consumer, offset=0, size=None):
        # BUG FIX: the None/else branching around the two slices was
        # missing, and the Deferred was never returned.
        if size is None:
            data = self.u.data[offset:]
        else:
            data = self.u.data[offset:offset+size]

        # We use twisted.protocols.basic.FileSender, which only does
        # non-streaming, i.e. PullProducer, where the receiver/consumer must
        # ask explicitly for each chunk of data. There are only two places in
        # the Twisted codebase that can't handle streaming=False, both of
        # which are in the upload path for an FTP/SFTP server
        # (protocols.ftp.FileConsumer and
        # vfs.adapters.ftp._FileToConsumerAdapter), neither of which is
        # likely to be used as the target for a Tahoe download.

        d = basic.FileSender().beginFileTransfer(StringIO(data), consumer)
        d.addCallback(lambda lastSent: consumer)
        return d