from cStringIO import StringIO

from zope.interface import implements
from twisted.internet import defer
from twisted.internet.interfaces import IPushProducer, IConsumer
from twisted.protocols import basic
from foolscap.eventual import eventually

from allmydata.interfaces import IFileNode, IFileURI, ICheckable, \
     IDownloadTarget
from allmydata.util import log, base32
from allmydata.uri import from_string as uri_from_string
from allmydata.immutable.checker import SimpleCHKFileChecker, \
     SimpleCHKFileVerifier
from allmydata.immutable import download
class _ImmutableFileNodeBase(object):
    """Common behavior for immutable file nodes (CHK and literal files).

    Subclasses must provide get_uri(). Hashing and equality are delegated
    to the underlying IFileURI instance stored in self.u.
    """
    implements(IFileNode, ICheckable)

    def __init__(self, uri, client):
        # adapt whatever URI object/string we were given to IFileURI
        self.u = IFileURI(uri)
        self._client = client

    def get_readonly_uri(self):
        # an immutable file is its own read-only form
        return self.get_uri()

    def is_mutable(self):
        return False

    def is_readonly(self):
        return True

    def __hash__(self):
        return self.u.__hash__()

    def __eq__(self, other):
        if IFileNode.providedBy(other):
            return self.u.__eq__(other.u)
        else:
            return False

    def __ne__(self, other):
        if IFileNode.providedBy(other):
            # fixed: __ne__ must be the negation of __eq__; the previous
            # code returned self.u.__eq__(other.u), i.e. the same truth
            # value as __eq__, so two equal nodes compared both == and !=
            return not self.u.__eq__(other.u)
        else:
            return True
class PortionOfFile:
    """A read-only file-like object that exposes a slice of a disk file.

    Like a list slice (things[2:14]), but for a file on disk: reads begin
    at 'offset' and at most 'size' bytes are returned in total (size=None
    means "through the end of the file").
    """
    def __init__(self, fn, offset=0, size=None):
        # NOTE: the file handle is closed only when this object is
        # garbage-collected; callers treat these as short-lived.
        self.f = open(fn, "rb")
        self.f.seek(offset)  # position at the start of the slice
        self.bytes_left = size  # None means "no limit"

    def read(self, size=None):
        # bytes_to_read = min(size, self.bytes_left), but None>anything
        if size is None:
            bytes_to_read = self.bytes_left
        elif self.bytes_left is None:
            bytes_to_read = size
        else:
            bytes_to_read = min(size, self.bytes_left)
        data = self.f.read(bytes_to_read)
        if self.bytes_left is not None:
            # account for what we actually got, which may be short at EOF
            self.bytes_left -= len(data)
        return data
class DownloadCache:
    """An IDownloadTarget that spools a download into a cache file and
    fires Deferreds as the byte ranges readers are waiting for arrive."""
    implements(IDownloadTarget)

    def __init__(self, node, cachefile):
        self._downloader = node._client.getServiceNamed("downloader")
        self._uri = node.get_uri()
        self._storage_index = node.get_storage_index()
        self.milestones = set() # of (offset,size,Deferred)
        self.cachefile = cachefile
        self.download_in_progress = False
        # five states the cache can be in:
        #  new FileNode, no downloads ever performed
        #  new FileNode, leftover file (partial)
        #  new FileNode, leftover file (whole)
        #  download in progress, not yet complete
        #  download complete

    def when_range_available(self, offset, size):
        """Return a Deferred that fires (with None) once the bytes in
        [offset:offset+size] are present in the cache file."""
        assert isinstance(offset, (int,long))
        assert isinstance(size, (int,long))

        d = defer.Deferred()
        self.milestones.add( (offset,size,d) )
        # a leftover cache file may already satisfy this milestone
        self._check_milestones()
        # if anything is still unsatisfied, make sure a download is running
        if self.milestones and not self.download_in_progress:
            self.download_in_progress = True
            log.msg(format=("immutable filenode read [%(si)s]: " +
                            "starting download"),
                    si=base32.b2a(self._storage_index),
                    umid="h26Heg", level=log.OPERATIONAL)
            d2 = self._downloader.download(self._uri, self)
            d2.addBoth(self._download_done)
            d2.addErrback(self._download_failed)
            d2.addErrback(log.err, umid="cQaM9g")
        return d

    def read(self, consumer, offset, size):
        """Push bytes [offset:offset+size] from the cache file into
        'consumer'; the range must already be available."""
        assert offset+size <= self.get_filesize()
        f = PortionOfFile(self.cachefile.get_filename(), offset, size)
        d = basic.FileSender().beginFileTransfer(f, consumer)
        d.addCallback(lambda lastSent: consumer)
        return d

    def _download_done(self, res):
        # clear download_in_progress, so failed downloads can be re-tried
        self.download_in_progress = False
        return res

    def _download_failed(self, f):
        # tell anyone who's waiting that we failed
        for m in self.milestones:
            (offset,size,d) = m
            eventually(d.errback, f)
        self.milestones.clear()

    def _check_milestones(self):
        # fire (and discard) every milestone that the current cache-file
        # size satisfies; leave the rest pending
        current_size = self.get_filesize()
        for m in list(self.milestones):
            (offset,size,d) = m
            if offset+size <= current_size:
                log.msg(format=("immutable filenode read [%(si)s] " +
                                "%(offset)d+%(size)d vs %(filesize)d: " +
                                "done"),
                        si=base32.b2a(self._storage_index),
                        offset=offset, size=size, filesize=current_size,
                        umid="nuedUg", level=log.NOISY)
                self.milestones.discard(m)
                eventually(d.callback, None)
            else:
                log.msg(format=("immutable filenode read [%(si)s] " +
                                "%(offset)d+%(size)d vs %(filesize)d: " +
                                "still waiting"),
                        si=base32.b2a(self._storage_index),
                        offset=offset, size=size, filesize=current_size,
                        umid="8PKOhg", level=log.NOISY)

    def get_filesize(self):
        """Return the current size of the cache file, 0 if it is absent."""
        try:
            filesize = os.stat(self.cachefile.get_filename())[stat.ST_SIZE]
        except OSError:
            # no cache file yet: nothing is available
            filesize = 0
        return filesize

    # IDownloadTarget methods, invoked by the downloader as data arrives

    def open(self, size):
        self.f = open(self.cachefile.get_filename(), "wb")

    def write(self, data):
        self.f.write(data)
        # newly-written bytes may satisfy pending milestones
        self._check_milestones()

    def close(self):
        self.f.close()
        self._check_milestones()

    def fail(self, why):
        # errors are delivered through _download_failed instead
        pass
    def register_canceller(self, cb):
        # cache downloads are never cancelled
        pass
    def finish(self):
        pass
class FileNode(_ImmutableFileNodeBase):
    """An immutable CHK file: contents are fetched from storage servers,
    optionally through a per-node DownloadCache for random-access reads."""
    checker_class = SimpleCHKFileChecker
    verifier_class = SimpleCHKFileVerifier

    def __init__(self, uri, client, cachefile):
        _ImmutableFileNodeBase.__init__(self, uri, client)
        self.download_cache = DownloadCache(self, cachefile)

    def get_uri(self):
        return self.u.to_string()

    def get_size(self):
        return self.u.get_size()

    def get_verifier(self):
        return self.u.get_verifier()

    def get_storage_index(self):
        return self.u.storage_index

    def check(self, monitor, verify=False):
        """Start a checker (or, with verify=True, a verifier) for this
        file and return its Deferred result."""
        # TODO: pass the Monitor to SimpleCHKFileChecker or
        # SimpleCHKFileVerifier, have it call monitor.raise_if_cancelled()
        # before sending each request.
        storage_index = self.u.storage_index
        assert IFileURI.providedBy(self.u), self.u
        k = self.u.needed_shares
        N = self.u.total_shares
        size = self.u.get_size()
        ueb_hash = self.u.uri_extension_hash
        if verify:
            v = self.verifier_class(self._client,
                                    uri_from_string(self.get_uri()),
                                    storage_index, k, N, size, ueb_hash)
        else:
            v = self.checker_class(self._client,
                                   uri_from_string(self.get_uri()),
                                   storage_index, k, N)
        return v.start()

    def check_and_repair(self, monitor, verify=False):
        # this is a stub, to allow the deep-check tests to pass.
        #raise NotImplementedError("not implemented yet")
        from allmydata.checker_results import CheckAndRepairResults
        cr = CheckAndRepairResults(self.u.storage_index)
        # fixed: forward both arguments; the old call self.check(verify)
        # passed 'verify' in the 'monitor' slot
        d = self.check(monitor, verify)
        def _done(r):
            # no repair is attempted, so pre- and post-repair results are
            # the same object
            cr.pre_repair_results = cr.post_repair_results = r
            cr.repair_attempted = False
            return cr
        d.addCallback(_done)
        return d

    def read(self, consumer, offset=0, size=None):
        """Push bytes [offset:offset+size] to 'consumer'; returns a
        Deferred that fires with 'consumer' when done."""
        if size is None:
            size = self.get_size() - offset
        # clamp the request to the actual file size
        size = min(size, self.get_size() - offset)

        if offset == 0 and size == self.get_size():
            # don't use the cache, just do a normal streaming download
            log.msg(format=("immutable filenode read [%(si)s]: " +
                            "doing normal full download"),
                    si=base32.b2a(self.u.storage_index),
                    umid="VRSBwg", level=log.OPERATIONAL)
            return self.download(download.ConsumerAdapter(consumer))

        d = self.download_cache.when_range_available(offset, size)
        d.addCallback(lambda res:
                      self.download_cache.read(consumer, offset, size))
        return d

    def download(self, target):
        downloader = self._client.getServiceNamed("downloader")
        return downloader.download(self.get_uri(), target)

    def download_to_data(self):
        downloader = self._client.getServiceNamed("downloader")
        return downloader.download_to_data(self.get_uri())
class LiteralProducer:
    """Dummy push producer for literal data that is already in memory:
    there is nothing to pause or stop, so both methods are no-ops."""
    implements(IPushProducer)

    def resumeProducing(self):
        pass

    def stopProducing(self):
        pass
class LiteralFileNode(_ImmutableFileNodeBase):
    """A file small enough that its entire contents are embedded in the
    URI itself (self.u.data): no servers are contacted to read it."""

    def __init__(self, uri, client):
        _ImmutableFileNodeBase.__init__(self, uri, client)

    def get_uri(self):
        return self.u.to_string()

    def get_size(self):
        return len(self.u.data)

    def get_verifier(self):
        # literal files have no shares, hence nothing to verify
        return None

    def get_storage_index(self):
        # literal files are not stored on any server
        return None

    def check(self, monitor, verify=False):
        return defer.succeed(None)

    def check_and_repair(self, monitor, verify=False):
        return defer.succeed(None)

    def read(self, consumer, offset=0, size=None):
        """Push bytes [offset:offset+size] to 'consumer'; returns a
        Deferred that fires with 'consumer' when done."""
        if size is None:
            data = self.u.data[offset:]
        else:
            data = self.u.data[offset:offset+size]

        # We use twisted.protocols.basic.FileSender, which only does
        # non-streaming, i.e. PullProducer, where the receiver/consumer must
        # ask explicitly for each chunk of data. There are only two places in
        # the Twisted codebase that can't handle streaming=False, both of
        # which are in the upload path for an FTP/SFTP server
        # (protocols.ftp.FileConsumer and
        # vfs.adapters.ftp._FileToConsumerAdapter), neither of which is
        # likely to be used as the target for a Tahoe download.

        d = basic.FileSender().beginFileTransfer(StringIO(data), consumer)
        d.addCallback(lambda lastSent: consumer)
        return d

    def download(self, target):
        # note that this does not update the stats_provider
        data = self.u.data
        if IConsumer.providedBy(target):
            target.registerProducer(LiteralProducer(), True)
        target.open(len(data))
        target.write(data)
        if IConsumer.providedBy(target):
            target.unregisterProducer()
        target.close()
        return defer.maybeDeferred(target.finish)

    def download_to_data(self):
        data = self.u.data
        return defer.succeed(data)