import os
import stat

from cStringIO import StringIO
from zope.interface import implements
from twisted.internet import defer
from twisted.internet.interfaces import IPushProducer, IConsumer
from twisted.protocols import basic
from foolscap.eventual import eventually
from allmydata.interfaces import IFileNode, IFileURI, ICheckable, \
     IDownloadTarget
from allmydata.util import log, base32
from allmydata import uri as urimodule
from allmydata.immutable.checker import Checker
from allmydata.check_results import CheckAndRepairResults
from allmydata.immutable.repairer import Repairer
from allmydata.immutable import download
class _ImmutableFileNodeBase(object):
    """Common base for immutable file nodes: holds the parsed file URI
    and defines identity/equality in terms of that URI."""
    implements(IFileNode, ICheckable)

    def __init__(self, uri, client):
        self.u = IFileURI(uri)
        self._client = client

    def get_readonly_uri(self):
        # immutable files are already read-only, so the read-only URI is
        # just the regular URI
        return self.get_uri()

    def is_mutable(self):
        return False

    def is_readonly(self):
        return True

    def __hash__(self):
        return self.u.__hash__()

    def __eq__(self, other):
        # two file nodes are equal iff their URIs are equal
        if IFileNode.providedBy(other):
            return self.u.__eq__(other.u)
        else:
            return False

    def __ne__(self, other):
        # BUG FIX: this previously returned self.u.__eq__(other.u) -- the
        # same value as __eq__ -- so "a != b" was true exactly when
        # "a == b" was. __ne__ must be the negation of __eq__.
        if IFileNode.providedBy(other):
            return not self.u.__eq__(other.u)
        else:
            return True
class PortionOfFile:
    """A read-only file-like view of a byte range of a file on disk --
    like a list slice (things[2:14]), but for a file."""

    def __init__(self, fn, offset=0, size=None):
        # size=None means "no limit": read through to end-of-file
        self.f = open(fn, "rb")
        self.f.seek(offset)
        self.bytes_left = size

    def read(self, size=None):
        """Read up to 'size' bytes, never reading past the end of the
        portion. With size=None, read whatever remains of the portion."""
        # limit = min(size, self.bytes_left), treating None as infinity
        if size is None:
            limit = self.bytes_left
        elif self.bytes_left is None:
            limit = size
        else:
            limit = min(size, self.bytes_left)
        data = self.f.read(limit)
        if self.bytes_left is not None:
            self.bytes_left -= len(data)
        return data
class DownloadCache:
    """IDownloadTarget that writes downloaded plaintext into a cache file
    on disk and fires Deferreds as the byte ranges readers are waiting
    for become available."""
    implements(IDownloadTarget)

    def __init__(self, node, cachefile):
        self._downloader = node._client.getServiceNamed("downloader")
        self._uri = node.get_uri()
        self._storage_index = node.get_storage_index()
        self.milestones = set() # of (offset,size,Deferred)
        self.cachefile = cachefile
        self.download_in_progress = False
        # possible states:
        #  new FileNode, no downloads ever performed
        #  new FileNode, leftover file (partial)
        #  new FileNode, leftover file (whole)
        #  download in progress, not yet complete
        #  download complete

    def when_range_available(self, offset, size):
        """Return a Deferred that fires (with None) once bytes
        [offset:offset+size] are present in the cache file, kicking off a
        download if one is not already running."""
        assert isinstance(offset, (int,long))
        assert isinstance(size, (int,long))

        d = defer.Deferred()
        self.milestones.add( (offset,size,d) )
        # maybe the leftover cache file already covers this range
        self._check_milestones()
        if self.milestones and not self.download_in_progress:
            self.download_in_progress = True
            log.msg(format=("immutable filenode read [%(si)s]: " +
                            "starting download"),
                    si=base32.b2a(self._storage_index),
                    umid="h26Heg", level=log.OPERATIONAL)
            d2 = self._downloader.download(self._uri, self)
            d2.addBoth(self._download_done)
            d2.addErrback(self._download_failed)
            d2.addErrback(log.err, umid="cQaM9g")
        return d

    def read(self, consumer, offset, size):
        """Push bytes [offset:offset+size] of the cache file at the
        consumer. The range must already be available."""
        assert offset+size <= self.get_filesize()
        f = PortionOfFile(self.cachefile.get_filename(), offset, size)
        d = basic.FileSender().beginFileTransfer(f, consumer)
        d.addCallback(lambda lastSent: consumer)
        return d

    def _download_done(self, res):
        # clear download_in_progress, so failed downloads can be re-tried
        self.download_in_progress = False
        return res

    def _download_failed(self, f):
        # tell anyone who's waiting that we failed
        for m in self.milestones:
            (offset,size,d) = m
            eventually(d.errback, f)
        self.milestones.clear()

    def _check_milestones(self):
        # fire (and discard) every milestone Deferred whose range now lies
        # entirely inside the cache file
        current_size = self.get_filesize()
        for m in list(self.milestones):
            (offset,size,d) = m
            if offset+size <= current_size:
                log.msg(format=("immutable filenode read [%(si)s] " +
                                "%(offset)d+%(size)d vs %(filesize)d: " +
                                "done"),
                        si=base32.b2a(self._storage_index),
                        offset=offset, size=size, filesize=current_size,
                        umid="nuedUg", level=log.NOISY)
                self.milestones.discard(m)
                eventually(d.callback, None)
            else:
                log.msg(format=("immutable filenode read [%(si)s] " +
                                "%(offset)d+%(size)d vs %(filesize)d: " +
                                "still waiting"),
                        si=base32.b2a(self._storage_index),
                        offset=offset, size=size, filesize=current_size,
                        umid="8PKOhg", level=log.NOISY)

    def get_filesize(self):
        # a missing cache file counts as zero bytes downloaded so far
        try:
            filesize = os.stat(self.cachefile.get_filename())[stat.ST_SIZE]
        except OSError:
            filesize = 0
        return filesize

    # IDownloadTarget methods, driven by the downloader as segments arrive

    def open(self, size):
        self.f = open(self.cachefile.get_filename(), "wb")

    def write(self, data):
        self.f.write(data)
        self._check_milestones()

    def close(self):
        self.f.close()
        self._check_milestones()

    def fail(self, why):
        pass
    def register_canceller(self, cb):
        pass
    def finish(self):
        pass
class FileNode(_ImmutableFileNodeBase, log.PrefixingLogMixin):
    """An immutable file reachable through the grid, with a local disk
    cache to serve partial-range reads."""

    def __init__(self, uri, client, cachefile):
        _ImmutableFileNodeBase.__init__(self, uri, client)
        self.download_cache = DownloadCache(self, cachefile)
        # log under the (shareable) verify-cap, never the read-cap
        prefix = urimodule.from_string(uri).get_verify_cap().to_string()
        log.PrefixingLogMixin.__init__(self, "allmydata.immutable.filenode", prefix=prefix)
        self.log("starting", level=log.OPERATIONAL)

    def get_uri(self):
        return self.u.to_string()

    def get_size(self):
        return self.u.get_size()

    def get_verify_cap(self):
        return self.u.get_verify_cap()

    def get_storage_index(self):
        return self.u.storage_index

    def check_and_repair(self, monitor, verify=False):
        """Check this file's health and attempt a repair if it is found
        to be unhealthy. Returns a Deferred firing with a
        CheckAndRepairResults instance."""
        verifycap = self.get_verify_cap()
        servers = self._client.get_servers("storage")

        c = Checker(client=self._client, verifycap=verifycap, servers=servers, verify=verify, monitor=monitor)
        d = c.start()
        def _maybe_repair(cr):
            crr = CheckAndRepairResults(self.u.storage_index)
            crr.pre_repair_results = cr
            if cr.is_healthy():
                # healthy: nothing to repair, post-repair state is the same
                crr.post_repair_results = cr
                return defer.succeed(crr)
            else:
                def _gather_repair_results(rr):
                    crr.post_repair_results = rr
                    return crr
                r = Repairer(client=self._client, verifycap=verifycap, servers=servers, monitor=monitor)
                d = r.start()
                d.addCallback(_gather_repair_results)
                return d
        d.addCallback(_maybe_repair)
        return d

    def check(self, monitor, verify=False):
        v = Checker(client=self._client, verifycap=self.get_verify_cap(), servers=self._client.get_servers("storage"), verify=verify, monitor=monitor)
        return v.start()

    def read(self, consumer, offset=0, size=None):
        """Push bytes [offset:offset+size] of the file at the consumer.
        Whole-file reads stream straight from the downloader; partial
        reads wait for the range to appear in the download cache."""
        if size is None:
            size = self.get_size() - offset
        size = min(size, self.get_size() - offset)

        if offset == 0 and size == self.get_size():
            # don't use the cache, just do a normal streaming download
            self.log("doing normal full download", umid="VRSBwg", level=log.OPERATIONAL)
            return self.download(download.ConsumerAdapter(consumer))

        d = self.download_cache.when_range_available(offset, size)
        d.addCallback(lambda res:
                      self.download_cache.read(consumer, offset, size))
        return d

    def download(self, target):
        downloader = self._client.getServiceNamed("downloader")
        return downloader.download(self.get_uri(), target, self._parentmsgid)

    def download_to_data(self):
        downloader = self._client.getServiceNamed("downloader")
        return downloader.download_to_data(self.get_uri())
class LiteralProducer:
    """Push producer used by LiteralFileNode.download: the entire file is
    delivered in one target.write() call, so flow-control requests are
    no-ops."""
    implements(IPushProducer)

    def pauseProducing(self):
        # BUG FIX: IPushProducer requires pauseProducing(), and this
        # producer is registered with streaming=True, so a consumer
        # applying backpressure previously hit an AttributeError.
        pass

    def resumeProducing(self):
        pass

    def stopProducing(self):
        pass
class LiteralFileNode(_ImmutableFileNodeBase):
    """A file so small that its entire contents are stored inside the URI
    itself: no network access is needed to read it."""

    def __init__(self, uri, client):
        _ImmutableFileNodeBase.__init__(self, uri, client)

    def get_uri(self):
        return self.u.to_string()

    def get_size(self):
        return len(self.u.data)

    def get_verify_cap(self):
        # there are no shares to verify
        return None

    def get_storage_index(self):
        # there are no shares to locate
        return None

    def check(self, monitor, verify=False):
        # literal files are always healthy: there is nothing to check
        return defer.succeed(None)

    def check_and_repair(self, monitor, verify=False):
        # literal files are always healthy: there is nothing to repair
        return defer.succeed(None)

    def read(self, consumer, offset=0, size=None):
        """Push bytes [offset:offset+size] (to EOF when size is None) of
        the embedded data at the consumer."""
        if size is None:
            data = self.u.data[offset:]
        else:
            data = self.u.data[offset:offset+size]

        # We use twisted.protocols.basic.FileSender, which only does
        # non-streaming, i.e. PullProducer, where the receiver/consumer must
        # ask explicitly for each chunk of data. There are only two places in
        # the Twisted codebase that can't handle streaming=False, both of
        # which are in the upload path for an FTP/SFTP server
        # (protocols.ftp.FileConsumer and
        # vfs.adapters.ftp._FileToConsumerAdapter), neither of which is
        # likely to be used as the target for a Tahoe download.

        d = basic.FileSender().beginFileTransfer(StringIO(data), consumer)
        d.addCallback(lambda lastSent: consumer)
        return d

    def download(self, target):
        # note that this does not update the stats_provider
        data = self.u.data
        if IConsumer.providedBy(target):
            target.registerProducer(LiteralProducer(), True)
        target.open(len(data))
        target.write(data)
        if IConsumer.providedBy(target):
            target.unregisterProducer()
        target.close()
        return defer.maybeDeferred(target.finish)

    def download_to_data(self):
        data = self.u.data
        return defer.succeed(data)