1 import copy, os.path, stat
2 from cStringIO import StringIO
3 from zope.interface import implements
4 from twisted.internet import defer
5 from twisted.internet.interfaces import IPushProducer, IConsumer
6 from twisted.protocols import basic
7 from foolscap.api import eventually
8 from allmydata.interfaces import IFileNode, ICheckable, \
9 IDownloadTarget, IUploadResults
10 from allmydata.util import dictutil, log, base32
11 from allmydata.uri import CHKFileURI, LiteralFileURI
12 from allmydata.immutable.checker import Checker
13 from allmydata.check_results import CheckResults, CheckAndRepairResults
14 from allmydata.immutable.repairer import Repairer
15 from allmydata.immutable import download
class _ImmutableFileNodeBase(object):
    # Common base for immutable file nodes (CHK and literal).  Identity and
    # hashing are delegated to the underlying URI object self.u.
    # NOTE(review): this excerpt omits several original lines (method bodies
    # around original lines 19-29, 34-35, 39-40); do not assume the gaps are
    # empty -- confirm against the full file.
    implements(IFileNode, ICheckable)

    def get_readonly_uri(self):
        # body not visible in this excerpt
    def is_readonly(self):
        # body not visible in this excerpt
        # NOTE(review): the next statement appears to be the body of a
        # missing "def __hash__(self):" (original ~29) -- confirm.
        return self.u.__hash__()
    def __eq__(self, other):
        if IFileNode.providedBy(other):
            return self.u.__eq__(other.u)
    def __ne__(self, other):
        if IFileNode.providedBy(other):
            # NOTE(review): BUG -- this returns __eq__, so `x != y` is True
            # exactly when the URIs are EQUAL.  Should negate the result (or
            # call __ne__).  Flagged rather than fixed because the
            # surrounding lines (original 34-35, 39-40) are missing.
            return self.u.__eq__(other.u)
43 # like a list slice (things[2:14]), but for a file on disk
    def __init__(self, fn, offset=0, size=None):
        # Open the cache file for reading; bytes_left of None means
        # "no limit" (read to EOF).
        # NOTE(review): the line that positions the file at `offset`
        # (original line 46, presumably self.f.seek(offset)) is missing
        # from this excerpt.
        self.f = open(fn, "rb")
        self.bytes_left = size
    def read(self, size=None):
        # Read up to `size` bytes from the slice, never past the slice end;
        # decrements bytes_left by the amount actually read.
        # bytes_to_read = min(size, self.bytes_left), but None>anything
        # NOTE(review): the "if" line (original 51), the elif body and
        # "else:" (original 54-55), and the final "return data"
        # (original 60) are missing from this excerpt; the control flow
        # below is incomplete as shown.
            bytes_to_read = self.bytes_left
        elif self.bytes_left is None:
            bytes_to_read = min(size, self.bytes_left)
        data = self.f.read(bytes_to_read)
        if self.bytes_left is not None:
            self.bytes_left -= len(data)
    # NOTE(review): the enclosing "class DownloadCache:" line (original 62)
    # is not visible in this excerpt.
    implements(IDownloadTarget)

    def __init__(self, filecap, storage_index, downloader,
                 cachedirectorymanager):
        # Caches downloaded file data on disk so partial reads can be
        # served without re-downloading.
        self._downloader = downloader
        # NOTE(review): original lines 68 and 72 are missing here; later
        # methods read self._uri and self.cachefile, which are presumably
        # initialized in those missing lines -- confirm against the full
        # file.
        self._storage_index = storage_index
        self.milestones = set() # of (offset,size,Deferred)
        self.cachedirectorymanager = cachedirectorymanager
        self.download_in_progress = False
75 # new FileNode, no downloads ever performed
76 # new FileNode, leftover file (partial)
77 # new FileNode, leftover file (whole)
78 # download in progress, not yet complete
    def when_range_available(self, offset, size):
        # Return a Deferred that fires once bytes [offset, offset+size) are
        # present in the cache file, kicking off a download if one is not
        # already in progress.
        assert isinstance(offset, (int,long))
        assert isinstance(size, (int,long))
        # NOTE(review): the lines that create the Deferred bound to `d`
        # (original 84-85) are missing from this excerpt.
        self.milestones.add( (offset,size,d) )
        self._check_milestones()
        if self.milestones and not self.download_in_progress:
            self.download_in_progress = True
            log.msg(format=("immutable filenode read [%(si)s]: " +
                    # NOTE(review): the format-string continuation
                    # (original line 91) is missing from this excerpt.
                    si=base32.b2a(self._storage_index),
                    umid="h26Heg", level=log.OPERATIONAL)
            d2 = self._downloader.download(self._uri, self)
            d2.addBoth(self._download_done)
            d2.addErrback(self._download_failed)
            d2.addErrback(log.err, umid="cQaM9g")
        # NOTE(review): the return statement (original 98, presumably
        # "return d") is missing from this excerpt.
    def read(self, consumer, offset, size):
        # Stream the already-cached byte range [offset, offset+size) to
        # `consumer` via a non-streaming FileSender; fires with `consumer`.
        assert offset+size <= self.get_filesize()
        if not self.cachefile:
            self.cachefile = self.cachedirectorymanager.get_file(base32.b2a(self._storage_index))
        f = PortionOfFile(self.cachefile.get_filename(), offset, size)
        d = basic.FileSender().beginFileTransfer(f, consumer)
        d.addCallback(lambda lastSent: consumer)
        # NOTE(review): the return statement (original 107, presumably
        # "return d") is missing from this excerpt.
    def _download_done(self, res):
        # clear download_in_progress, so failed downloads can be re-tried
        self.download_in_progress = False
        # NOTE(review): original lines 112-113 (presumably a milestone check
        # and/or "return res") are missing from this excerpt.
    def _download_failed(self, f):
        # tell anyone who's waiting that we failed
        for m in self.milestones:
            # NOTE(review): the unpacking of m that binds `d`
            # (original line 117, presumably "(offset,size,d) = m") is
            # missing from this excerpt.
            eventually(d.errback, f)
        self.milestones.clear()
    def _check_milestones(self):
        # Fire (and discard) every milestone whose byte range is now fully
        # covered by the cache file; log the ones that are not yet ready.
        current_size = self.get_filesize()
        for m in list(self.milestones):
            # NOTE(review): the unpacking of m into (offset,size,d)
            # (original line 124) is missing from this excerpt.
            if offset+size <= current_size:
                log.msg(format=("immutable filenode read [%(si)s] " +
                                "%(offset)d+%(size)d vs %(filesize)d: " +
                        # NOTE(review): a format-string continuation
                        # (original line 128) is missing here.
                        si=base32.b2a(self._storage_index),
                        offset=offset, size=size, filesize=current_size,
                        umid="nuedUg", level=log.NOISY)
                self.milestones.discard(m)
                eventually(d.callback, None)
                # NOTE(review): the "else:" line (original 134) is missing;
                # the following log call presumably reports ranges that are
                # not yet available -- confirm against the full file.
                log.msg(format=("immutable filenode read [%(si)s] " +
                                "%(offset)d+%(size)d vs %(filesize)d: " +
                        # NOTE(review): a format-string continuation
                        # (original line 137) is missing here.
                        si=base32.b2a(self._storage_index),
                        offset=offset, size=size, filesize=current_size,
                        umid="8PKOhg", level=log.NOISY)
    def get_filesize(self):
        # Current size of the (possibly partial) cache file on disk.
        if not self.cachefile:
            self.cachefile = self.cachedirectorymanager.get_file(base32.b2a(self._storage_index))
        # NOTE(review): original line 145 (possibly a "try:") and lines
        # 147-151 (error handling and "return filesize") are missing from
        # this excerpt.
        filesize = os.stat(self.cachefile.get_filename())[stat.ST_SIZE]
    def open(self, size):
        # IDownloadTarget hook: (re)create the cache file for writing; the
        # `size` argument is not used here.
        if not self.cachefile:
            self.cachefile = self.cachedirectorymanager.get_file(base32.b2a(self._storage_index))
        self.f = open(self.cachefile.get_filename(), "wb")
    def write(self, data):
        # IDownloadTarget hook: append downloaded data, then see whether any
        # waiting milestones can now be satisfied.
        # NOTE(review): original line 158 (presumably self.f.write(data))
        # is missing from this excerpt.
        self._check_milestones()
        # NOTE(review): the following call appears to belong to a separate
        # method (likely close(), original lines 161-162) whose "def" line
        # is missing from this excerpt.
        self._check_milestones()
    def register_canceller(self, cb):
        # IDownloadTarget hook; body not visible in this excerpt
        # (original lines 168-170).

    # The following methods are just because the target might be a
    # repairer.DownUpConnector, and just because the current CHKUpload object
    # expects to find the storage index and encoding parameters in its
    # NOTE(review): the rest of this comment (original line 174) is missing
    # from this excerpt.
    def set_storageindex(self, storageindex):
        # body not visible in this excerpt (original line 176)
    def set_encodingparams(self, encodingparams):
        # body not visible in this excerpt (original line 178)
class FileNode(_ImmutableFileNodeBase, log.PrefixingLogMixin):
    # An immutable CHK file node: checking/repair via Checker/Repairer, and
    # reads served either by a full streaming download or via the
    # DownloadCache for partial ranges.
    # NOTE(review): this excerpt omits many original source lines (the
    # embedded line numbers jump); gaps are flagged below only where they
    # affect reading.  Verify every hedged note against the full file.

    def __init__(self, filecap, storage_broker, secret_holder,
                 downloader, history, cachedirectorymanager):
        assert isinstance(filecap, CHKFileURI)
        # NOTE(review): an assignment is missing here (original line 185);
        # later code reads self.u, presumably set from filecap -- confirm.
        self._storage_broker = storage_broker
        self._secret_holder = secret_holder
        self._downloader = downloader
        self._history = history
        storage_index = self.get_storage_index()
        self.download_cache = DownloadCache(filecap, storage_index, downloader,
                                            cachedirectorymanager)
        prefix = self.u.get_verify_cap().to_string()
        log.PrefixingLogMixin.__init__(self, "allmydata.immutable.filenode", prefix=prefix)
        self.log("starting", level=log.OPERATIONAL)

        # NOTE(review): the "def" line owning the next statement (likely
        # get_size, original ~197) is missing from this excerpt.
        return self.u.get_size()
    def get_current_size(self):
        # size of an immutable file is fixed, so succeed immediately
        return defer.succeed(self.get_size())

    # NOTE(review): original lines 201-203 (likely get_cap) are missing here.
    def get_readcap(self):
        return self.u.get_readonly()
    def get_verify_cap(self):
        return self.u.get_verify_cap()
    def get_repair_cap(self):
        # CHK files can be repaired with just the verifycap
        return self.u.get_verify_cap()

        # NOTE(review): the "def" line owning the next statement (likely
        # get_uri, original ~212) is missing from this excerpt.
        return self.u.to_string()

    def get_storage_index(self):
        return self.u.storage_index

    def check_and_repair(self, monitor, verify=False, add_lease=False):
        # Check this file's health and, if unhealthy but recoverable,
        # attempt a repair; fires with a CheckAndRepairResults.
        verifycap = self.get_verify_cap()
        sb = self._storage_broker
        servers = sb.get_all_servers()
        sh = self._secret_holder

        c = Checker(verifycap=verifycap, servers=servers,
                    verify=verify, add_lease=add_lease, secret_holder=sh,
        # NOTE(review): the remainder of the Checker(...) call and the line
        # that starts the check and binds `d` (original 226-227) are missing
        # from this excerpt.
        def _maybe_repair(cr):
            crr = CheckAndRepairResults(self.u.storage_index)
            crr.pre_repair_results = cr
            # NOTE(review): the condition guarding the healthy/unhealthy
            # branches (original lines 231 and 234) is missing from this
            # excerpt.
                crr.post_repair_results = cr
                return defer.succeed(crr)
                crr.repair_attempted = True
                crr.repair_successful = False # until proven successful
                def _gather_repair_results(ur):
                    assert IUploadResults.providedBy(ur), ur
                    # clone the cr -- check results to form the basic of the prr -- post-repair results
                    prr = CheckResults(cr.uri, cr.storage_index)
                    prr.data = copy.deepcopy(cr.data)
                    sm = prr.data['sharemap']
                    assert isinstance(sm, dictutil.DictOfSets), sm
                    sm.update(ur.sharemap)
                    servers_responding = set(prr.data['servers-responding'])
                    # NOTE(review): BUG -- set.union() returns a NEW set and
                    # does not mutate servers_responding, so this line has no
                    # effect; it almost certainly should be
                    # servers_responding.update(...).  Flagged rather than
                    # fixed because surrounding lines are missing from this
                    # excerpt.
                    servers_responding.union(ur.sharemap.iterkeys())
                    prr.data['servers-responding'] = list(servers_responding)
                    prr.data['count-shares-good'] = len(sm)
                    prr.data['count-good-share-hosts'] = len(sm)
                    is_healthy = bool(len(sm) >= self.u.total_shares)
                    is_recoverable = bool(len(sm) >= self.u.needed_shares)
                    prr.set_healthy(is_healthy)
                    prr.set_recoverable(is_recoverable)
                    crr.repair_successful = is_healthy
                    prr.set_needs_rebalancing(len(sm) >= self.u.total_shares)
                    crr.post_repair_results = prr
                    # NOTE(review): the return statement (original 259) is
                    # missing from this excerpt.
                def _repair_error(f):
                    # as with mutable repair, I'm not sure if I want to pass
                    # through a failure or not. TODO
                    crr.repair_successful = False
                    crr.repair_failure = f
                    # NOTE(review): the return statement (original 265) is
                    # missing from this excerpt.
                r = Repairer(storage_broker=sb, secret_holder=sh,
                             verifycap=verifycap, monitor=monitor)
                # NOTE(review): the line starting the repair and binding `d`
                # (original 268) is missing from this excerpt.
                d.addCallbacks(_gather_repair_results, _repair_error)
                # NOTE(review): original lines 270-271 (presumably the
                # return) are missing from this excerpt.
        d.addCallback(_maybe_repair)
        # NOTE(review): original lines 273-274 (presumably "return d") are
        # missing from this excerpt.

    def check(self, monitor, verify=False, add_lease=False):
        # Check-only variant; fires with CheckResults.
        verifycap = self.get_verify_cap()
        sb = self._storage_broker
        servers = sb.get_all_servers()
        sh = self._secret_holder

        v = Checker(verifycap=verifycap, servers=servers,
                    verify=verify, add_lease=add_lease, secret_holder=sh,
        # NOTE(review): the remainder of this call and the start/return
        # (original 283-285) are missing from this excerpt.

    def read(self, consumer, offset=0, size=None):
        # Read [offset, offset+size) into `consumer`; a whole-file read
        # bypasses the cache and streams directly.
        # NOTE(review): the "if size is None:" guard (original 287) is
        # missing from this excerpt.
            size = self.get_size() - offset
        size = min(size, self.get_size() - offset)

        if offset == 0 and size == self.get_size():
            # don't use the cache, just do a normal streaming download
            self.log("doing normal full download", umid="VRSBwg", level=log.OPERATIONAL)
            return self.download(download.ConsumerAdapter(consumer))

        d = self.download_cache.when_range_available(offset, size)
        d.addCallback(lambda res:
                      self.download_cache.read(consumer, offset, size))
        # NOTE(review): the return statement (original 299, presumably
        # "return d") is missing from this excerpt.

    def download(self, target):
        return self._downloader.download(self.get_cap(), target,
        # NOTE(review): a continuation argument line (original 303) is
        # missing from this excerpt.
                                         history=self._history)

    def download_to_data(self):
        return self._downloader.download_to_data(self.get_cap(),
                                                 history=self._history)
    def download_to_filename(self, filename):
        return self._downloader.download_to_filename(self.get_cap(), filename)
class LiteralProducer:
    # Trivial IPushProducer for literal data that is already fully in
    # memory: there is nothing to pause, resume, or stop.
    implements(IPushProducer)
    def resumeProducing(self):
        # body not visible in this excerpt (presumably pass)
    def stopProducing(self):
        # body not visible in this excerpt (presumably pass)
class LiteralFileNode(_ImmutableFileNodeBase):
    # File node for LIT caps: the entire file content is embedded in the
    # URI itself (self.u.data), so no servers are involved and check/repair
    # are no-ops.
    # NOTE(review): this excerpt omits a number of original lines; gaps are
    # flagged where they affect reading.

    def __init__(self, filecap):
        assert isinstance(filecap, LiteralFileURI)
        # NOTE(review): the assignment of self.u (original ~324) is missing
        # from this excerpt.

        # NOTE(review): the "def" line owning the next statement (likely
        # get_size, original ~326) is missing from this excerpt.
        return len(self.u.data)
    def get_current_size(self):
        # all data is embedded in the cap, so size is known locally
        return defer.succeed(self.get_size())

    def get_readcap(self):
        # body not visible in this excerpt (original 334)
    def get_verify_cap(self):
        # body not visible in this excerpt (original 336)
    def get_repair_cap(self):
        # NOTE(review): the body of get_repair_cap and the "def" line owning
        # the next statement (likely get_uri, original 338-340) are missing
        # from this excerpt.
        return self.u.to_string()

    def get_storage_index(self):
        # body not visible in this excerpt (original 344)

    def check(self, monitor, verify=False, add_lease=False):
        # literal files live entirely in the cap: nothing to check
        return defer.succeed(None)

    def check_and_repair(self, monitor, verify=False, add_lease=False):
        # literal files live entirely in the cap: nothing to repair
        return defer.succeed(None)

    def read(self, consumer, offset=0, size=None):
        # Send [offset, offset+size) of the embedded data to `consumer`.
        # NOTE(review): the if/else lines around the next two assignments
        # (original 353, 355) are missing from this excerpt.
            data = self.u.data[offset:]
            data = self.u.data[offset:offset+size]

        # We use twisted.protocols.basic.FileSender, which only does
        # non-streaming, i.e. PullProducer, where the receiver/consumer must
        # ask explicitly for each chunk of data. There are only two places in
        # the Twisted codebase that can't handle streaming=False, both of
        # which are in the upload path for an FTP/SFTP server
        # (protocols.ftp.FileConsumer and
        # vfs.adapters.ftp._FileToConsumerAdapter), neither of which is
        # likely to be used as the target for a Tahoe download.
        d = basic.FileSender().beginFileTransfer(StringIO(data), consumer)
        d.addCallback(lambda lastSent: consumer)
        # NOTE(review): the return statement (original ~369, presumably
        # "return d") is missing from this excerpt.

    def download(self, target):
        # Push the embedded data through the IDownloadTarget protocol.
        # note that this does not update the stats_provider
        # NOTE(review): the line binding `data` (original ~373, presumably
        # "data = self.u.data") is missing from this excerpt.
        if IConsumer.providedBy(target):
            target.registerProducer(LiteralProducer(), True)
        target.open(len(data))
        # NOTE(review): original line 377 (presumably target.write(data))
        # is missing from this excerpt.
        if IConsumer.providedBy(target):
            target.unregisterProducer()
        # NOTE(review): original line 380 (presumably target.close()) is
        # missing from this excerpt.
        return defer.maybeDeferred(target.finish)

    def download_to_data(self):
        # NOTE(review): the line binding `data` (original 384) is missing
        # from this excerpt.
        return defer.succeed(data)