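"""
Magic Folder frontend: watch a local directory and keep it synchronized
with a Tahoe-LAFS grid via an upload DMD (a writeable directory) and a
read-only "collective" directory listing every participant's DMD.
"""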

import sys, os
import os.path
from collections import deque
import time

from twisted.internet import defer, reactor, task
from twisted.python.failure import Failure
from twisted.python import runtime
from twisted.application import service

from zope.interface import Interface, Attribute, implementer

from allmydata.util import fileutil
from allmydata.interfaces import IDirectoryNode
from allmydata.util import log
from allmydata.util.fileutil import precondition_abspath, get_pathinfo, ConflictError
from allmydata.util.assertutil import precondition, _assert
from allmydata.util.deferredutil import HookMixin
from allmydata.util.progress import PercentProgress
from allmydata.util.encodingutil import listdir_filepath, to_filepath, \
     extend_filepath, unicode_from_filepath, unicode_segments_from, \
     quote_filepath, quote_local_unicode_path, quote_output, FilenameEncodingError
from allmydata.immutable.upload import FileName, Data
from allmydata import magicfolderdb, magicpath

defer.setDebugging(True)
IN_EXCL_UNLINK = 0x04000000L

def get_inotify_module():
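    """
    Select the platform-appropriate filesystem-notification implementation:
    the allmydata Windows shim on win32, Twisted's inotify support where the
    platform provides inotify, and NotImplementedError elsewhere.
    """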
    try:
        if sys.platform == "win32":
            from allmydata.windows import inotify
        elif runtime.platform.supportsINotify():
            from twisted.internet import inotify
        else:
            raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
                                      "This currently requires Linux or Windows.")
        return inotify
    except (ImportError, AttributeError) as e:
        log.msg(e)
        if sys.platform == "win32":
            raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
                                      "Windows support requires at least Vista, and has only been tested on Windows 7.")
        raise


def is_new_file(pathinfo, db_entry):
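    """
    Return True if the file at pathinfo should be treated as changed
    relative to db_entry: either we have never recorded it, or its
    (size, ctime, mtime) no longer match. A nonexistent file with no
    recorded size is not considered new.
    """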
    if db_entry is None:
        return True

    if not pathinfo.exists and db_entry.size is None:
        return False

    return ((pathinfo.size, pathinfo.ctime, pathinfo.mtime) !=
            (db_entry.size, db_entry.ctime, db_entry.mtime))


class MagicFolder(service.MultiService):
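    """
    A MultiService tying a local directory to a magic folder: it owns one
    Uploader (local changes -> our upload DMD) and one Downloader (other
    participants' DMDs -> local directory).

    A minimal usage sketch (assuming a configured, running client and
    already-provisioned caps; the values here are illustrative):

        mf = MagicFolder(client, upload_dircap, collective_dircap,
                         local_path_u=u'/home/alice/magic',
                         dbfile='magicfolderdb.sqlite', umask=0077)
        mf.setServiceParent(client)   # starts the service
        mf.ready()   # begin processing the upload/download queues
    """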
    name = 'magic-folder'

    def __init__(self, client, upload_dircap, collective_dircap, local_path_u, dbfile, umask,
                 pending_delay=1.0, clock=None):
        precondition_abspath(local_path_u)

        service.MultiService.__init__(self)

        immediate = clock is not None
        clock = clock or reactor
        db = magicfolderdb.get_magicfolderdb(dbfile, create_version=(magicfolderdb.SCHEMA_v1, 1))
        if db is None:
            raise Exception('ERROR: Unable to load magic folder db.')

        # for tests
        self._client = client
        self._db = db

        upload_dirnode = self._client.create_node_from_uri(upload_dircap)
        collective_dirnode = self._client.create_node_from_uri(collective_dircap)

        self.uploader = Uploader(client, local_path_u, db, upload_dirnode, pending_delay, clock, immediate)
        self.downloader = Downloader(client, local_path_u, db, collective_dirnode,
                                     upload_dirnode.get_readonly_uri(), clock, self.uploader.is_pending, umask)

    def startService(self):
        # TODO: why is this being called more than once?
        if self.running:
            return defer.succeed(None)
        print "%r.startService" % (self,)
        service.MultiService.startService(self)
        return self.uploader.start_monitoring()

    def ready(self):
        """
        ready is used to signal us to start processing the upload and
        download items.
        """
        self.uploader.start_uploading()  # synchronous
        return self.downloader.start_downloading()

    def finish(self):
        print "finish"
        d = self.uploader.stop()
        d2 = self.downloader.stop()
        d.addCallback(lambda ign: d2)
        return d

    def remove_service(self):
        return service.MultiService.disownServiceParent(self)


class QueueMixin(HookMixin):
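    """
    Shared machinery for the Uploader and Downloader: a deque of pending
    IQueuedItem work, a bounded history of processed items, and a
    _lazy_tail Deferred chain that serializes calls to _process().
    """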
    def __init__(self, client, local_path_u, db, name, clock):
        self._client = client
        self._local_path_u = local_path_u
        self._local_filepath = to_filepath(local_path_u)
        self._db = db
        self._name = name
        self._clock = clock
        self._hooks = {'processed': None, 'started': None}
        self.started_d = self.set_hook('started')

        if not self._local_filepath.exists():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but there is no directory at that location."
                                 % quote_local_unicode_path(self._local_path_u))
        if not self._local_filepath.isdir():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but the thing at that location is not a directory."
                                 % quote_local_unicode_path(self._local_path_u))

        self._deque = deque()
        # do we also want to bound on "maximum age"?
        self._process_history = deque(maxlen=20)
        self._lazy_tail = defer.succeed(None)
        self._stopped = False
        self._turn_delay = 0

    def get_status(self):
        """
        Returns an iterable of instances that implement IQueuedItem
        """
        for item in self._deque:
            yield item
        for item in self._process_history:
            yield item

    def _get_filepath(self, relpath_u):
        self._log("_get_filepath(%r)" % (relpath_u,))
        return extend_filepath(self._local_filepath, relpath_u.split(u"/"))

    def _get_relpath(self, filepath):
        self._log("_get_relpath(%r)" % (filepath,))
        segments = unicode_segments_from(filepath, self._local_filepath)
        self._log("segments = %r" % (segments,))
        return u"/".join(segments)

    def _count(self, counter_name, delta=1):
        ctr = 'magic_folder.%s.%s' % (self._name, counter_name)
        self._log("%s += %r" % (counter_name, delta))
        self._client.stats_provider.count(ctr, delta)

    def _logcb(self, res, msg):
        self._log("%s: %r" % (msg, res))
        return res

    def _log(self, msg):
        s = "Magic Folder %s %s: %s" % (quote_output(self._client.nickname), self._name, msg)
        self._client.log(s)
        print s
        #open("events", "ab+").write(msg)

    def _turn_deque(self):
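        """
        Pop one item off the deque and chain its processing onto
        _lazy_tail, rescheduling another turn afterwards; when the deque
        is empty, hand control to _when_queue_is_empty() instead.
        """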
        try:
            self._log("_turn_deque")
            if self._stopped:
                self._log("stopped")
                return
            try:
                item = IQueuedItem(self._deque.pop())
                self._process_history.append(item)

                self._log("popped %r, now have %d" % (item, len(self._deque)))
                self._count('objects_queued', -1)
            except IndexError:
                self._log("deque is now empty")
                self._lazy_tail.addBoth(self._logcb, "whawhat empty")
                self._lazy_tail.addCallback(lambda ign: self._when_queue_is_empty())
                self._lazy_tail.addBoth(self._logcb, "got past _when_queue_is_empty")
            else:
                self._log("_turn_deque else clause")
                self._lazy_tail.addBoth(self._logcb, "whawhat else %r" % (item,))
                self._lazy_tail.addCallback(lambda ign: self._process(item))
                self._lazy_tail.addBoth(self._logcb, "got past _process")
                self._lazy_tail.addBoth(self._call_hook, 'processed', async=True)
                self._lazy_tail.addBoth(self._logcb, "got past _call_hook (turn_delay = %r)" % (self._turn_delay,))
                self._lazy_tail.addErrback(log.err)
                self._lazy_tail.addCallback(lambda ign: task.deferLater(self._clock, self._turn_delay, self._turn_deque))
                self._lazy_tail.addBoth(self._logcb, "got past deferLater")
        except Exception as e:
            self._log("---- turn deque exception %s" % (e,))
            raise


# this isn't in interfaces.py because it's very specific to QueueMixin
class IQueuedItem(Interface):
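    """
    A single queued upload or download, carrying its path, its progress,
    and a history of status changes.
    """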
    relpath_u = Attribute("The path this item represents")
    progress = Attribute("A PercentProgress instance")

    def set_status(self, status, current_time=None):
        """
        Record that this item entered the given status at current_time
        (or the present time).
        """

    def status_time(self, state):
        """
        Get the time of a particular state change, or None
        """

    def status_history(self):
        """
        All status changes, as (state, timestamp) 2-tuples sorted
        oldest -> latest
        """


@implementer(IQueuedItem)
class QueuedItem(object):
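    """
    An in-memory IQueuedItem implementation: records each status change
    as a status -> timestamp mapping.
    """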
    def __init__(self, relpath_u, progress):
        self.relpath_u = relpath_u
        self.progress = progress
        self._status_history = dict()

    def set_status(self, status, current_time=None):
        if current_time is None:
            current_time = time.time()
        self._status_history[status] = current_time

    def status_time(self, state):
        """
        Returns None if there's no status-update for 'state', else returns
        the timestamp when that state was reached.
        """
        return self._status_history.get(state, None)

    def status_history(self):
        """
        Returns a list of 2-tuples of (state, timestamp) sorted by timestamp
        """
        hist = self._status_history.items()
        hist.sort(key=lambda x: x[1])
        return hist


class UploadItem(QueuedItem):
    """
    Represents a single item in the _deque of the Uploader
    """
    pass


class Uploader(QueueMixin):
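    """
    Watches the local directory via the platform's notification API and
    uploads new or changed files (and directory markers) into the
    client's upload DMD.
    """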
    def __init__(self, client, local_path_u, db, upload_dirnode, pending_delay, clock,
                 immediate=False):
        QueueMixin.__init__(self, client, local_path_u, db, 'uploader', clock)

        self.is_ready = False
        self._immediate = immediate

        if not IDirectoryNode.providedBy(upload_dirnode):
            raise AssertionError("The URI in '%s' does not refer to a directory."
                                 % os.path.join('private', 'magic_folder_dircap'))
        if upload_dirnode.is_unknown() or upload_dirnode.is_readonly():
            raise AssertionError("The URI in '%s' is not a writecap to a directory."
                                 % os.path.join('private', 'magic_folder_dircap'))

        self._upload_dirnode = upload_dirnode
        self._inotify = get_inotify_module()
        self._notifier = self._inotify.INotify()
        self._pending = set()  # of unicode relpaths

        self._periodic_full_scan_duration = 10 * 60 # perform a full scan every 10 minutes

        if hasattr(self._notifier, 'set_pending_delay'):
            self._notifier.set_pending_delay(pending_delay)

        # TODO: what about IN_MOVE_SELF and IN_UNMOUNT?
        #
        self.mask = ( self._inotify.IN_CREATE
                    | self._inotify.IN_CLOSE_WRITE
                    | self._inotify.IN_MOVED_TO
                    | self._inotify.IN_MOVED_FROM
                    | self._inotify.IN_DELETE
                    | self._inotify.IN_ONLYDIR
                    | IN_EXCL_UNLINK
                    )
        self._notifier.watch(self._local_filepath, mask=self.mask, callbacks=[self._notify],
                             recursive=True)

    def start_monitoring(self):
        self._log("start_monitoring")
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._notifier.startReading())
        d.addCallback(lambda ign: self._count('dirs_monitored'))
        d.addBoth(self._call_hook, 'started')
        return d

    def stop(self):
        self._log("stop")
        self._notifier.stopReading()
        self._count('dirs_monitored', -1)
        self.periodic_callid.cancel()
        if hasattr(self._notifier, 'wait_until_stopped'):
            d = self._notifier.wait_until_stopped()
        else:
            d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def start_uploading(self):
        self._log("start_uploading")
        self.is_ready = True

        all_relpaths = self._db.get_all_relpaths()
        self._log("all relpaths: %r" % (all_relpaths,))

        for relpath_u in all_relpaths:
            self._add_pending(relpath_u)

        self._full_scan()

    def _extend_queue_and_keep_going(self, relpaths_u):
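        """
        Wrap each relpath in an UploadItem, append it to the deque, and
        (if we are ready) kick off another turn of the deque --
        immediately under test, otherwise on the next reactor turn.
        """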
        self._log("_extend_queue_and_keep_going %r" % (relpaths_u,))
        for relpath_u in relpaths_u:
            progress = PercentProgress()
            item = UploadItem(relpath_u, progress)
            item.set_status('queued', self._clock.seconds())
            self._deque.append(item)

        self._count('objects_queued', len(relpaths_u))

        if self.is_ready:
            if self._immediate:  # for tests
                self._turn_deque()
            else:
                self._clock.callLater(0, self._turn_deque)

    def _full_scan(self):
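        """
        Re-scan the entire local directory, queue everything pending, and
        reschedule the next full scan _periodic_full_scan_duration seconds
        from now.
        """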
        self.periodic_callid = self._clock.callLater(self._periodic_full_scan_duration, self._full_scan)
        print "FULL SCAN"
        self._log("_pending %r" % (self._pending))
        self._scan(u"")
        self._extend_queue_and_keep_going(self._pending)

    def _add_pending(self, relpath_u):
        self._log("add pending %r" % (relpath_u,))
        if not magicpath.should_ignore_file(relpath_u):
            self._pending.add(relpath_u)

    def _scan(self, reldir_u):
        # Scan a directory by (synchronously) adding the paths of all its children to self._pending.
        # Note that this doesn't add them to the deque -- that is done by
        # _extend_queue_and_keep_going(), which callers invoke afterwards.

        self._log("scan %r" % (reldir_u,))
        fp = self._get_filepath(reldir_u)
        try:
            children = listdir_filepath(fp)
        except EnvironmentError:
            raise Exception("WARNING: magic folder: permission denied on directory %s"
                            % quote_filepath(fp))
        except FilenameEncodingError:
            raise Exception("WARNING: magic folder: could not list directory %s due to a filename encoding error"
                            % quote_filepath(fp))

        for child in children:
            _assert(isinstance(child, unicode), child=child)
            self._add_pending("%s/%s" % (reldir_u, child) if reldir_u != u"" else child)

    def is_pending(self, relpath_u):
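        """
        Return True if an upload of relpath_u has been noticed but not yet
        processed; the Downloader uses this for conflict detection.
        """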
        return relpath_u in self._pending

    def _notify(self, opaque, path, events_mask):
        self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask))))
        relpath_u = self._get_relpath(path)

        # We filter out IN_CREATE events not associated with a directory.
        # Acting on IN_CREATE for files could cause us to read and upload
        # a possibly-incomplete file before the application has closed it.
        # There should always be an IN_CLOSE_WRITE after an IN_CREATE, I think.
        # It isn't possible to avoid watching for IN_CREATE at all, because
        # it is the only event notified for a directory creation.

        if ((events_mask & self._inotify.IN_CREATE) != 0 and
            (events_mask & self._inotify.IN_ISDIR) == 0):
            self._log("ignoring event for %r (creation of non-directory)\n" % (relpath_u,))
            return
        if relpath_u in self._pending:
            self._log("not queueing %r because it is already pending" % (relpath_u,))
            return
        if magicpath.should_ignore_file(relpath_u):
            self._log("ignoring event for %r (ignorable path)" % (relpath_u,))
            return

        self._pending.add(relpath_u)
        self._extend_queue_and_keep_going([relpath_u])

    def _when_queue_is_empty(self):
        return defer.succeed(None)

    def _process(self, item):
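        """
        Upload a single UploadItem: handle the disappeared, symlink,
        directory, and regular-file cases, then record the uploaded
        version in our magic-folder db.
        """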
        # Uploader
        relpath_u = item.relpath_u
        self._log("_process(%r)" % (relpath_u,))
        item.set_status('started', self._clock.seconds())

        if relpath_u is None:
            item.set_status('invalid_path', self._clock.seconds())
            return
        precondition(isinstance(relpath_u, unicode), relpath_u)
        precondition(not relpath_u.endswith(u'/'), relpath_u)

        d = defer.succeed(None)

        def _maybe_upload(ign, now=None):
            self._log("_maybe_upload: relpath_u=%r, now=%r" % (relpath_u, now))
            if now is None:
                now = time.time()
            fp = self._get_filepath(relpath_u)
            pathinfo = get_pathinfo(unicode_from_filepath(fp))

            self._log("about to remove %r from pending set %r" %
                      (relpath_u, self._pending))
            self._pending.remove(relpath_u)
            encoded_path_u = magicpath.path2magic(relpath_u)

            if not pathinfo.exists:
                # FIXME merge this with the 'isfile' case.
                self._log("notified object %s disappeared (this is normal)" % quote_filepath(fp))
                self._count('objects_disappeared')

                db_entry = self._db.get_db_entry(relpath_u)
                if db_entry is None:
                    return None

                last_downloaded_timestamp = now  # is this correct?

                if is_new_file(pathinfo, db_entry):
                    new_version = db_entry.version + 1
                else:
                    self._log("Not uploading %r" % (relpath_u,))
                    self._count('objects_not_uploaded')
                    return

                metadata = { 'version': new_version,
                             'deleted': True,
                             'last_downloaded_timestamp': last_downloaded_timestamp }
                if db_entry.last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = db_entry.last_downloaded_uri

                empty_uploadable = Data("", self._client.convergence)
                d2 = self._upload_dirnode.add_file(
                    encoded_path_u, empty_uploadable,
                    metadata=metadata,
                    overwrite=True,
                    progress=item.progress,
                )

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    last_downloaded_uri = metadata.get('last_downloaded_uri', None)
                    self._db.did_upload_version(relpath_u, new_version, filecap,
                                                last_downloaded_uri, last_downloaded_timestamp,
                                                pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)
                return d2
            elif pathinfo.islink:
                self._log("WARNING: cannot upload symlink %s" % quote_filepath(fp))
                return None
            elif pathinfo.isdir:
                print "ISDIR "
                if not getattr(self._notifier, 'recursive_includes_new_subdirectories', False):
                    self._notifier.watch(fp, mask=self.mask, callbacks=[self._notify], recursive=True)

                uploadable = Data("", self._client.convergence)
                encoded_path_u += magicpath.path2magic(u"/")
                self._log("encoded_path_u =  %r" % (encoded_path_u,))
                upload_d = self._upload_dirnode.add_file(
                    encoded_path_u, uploadable,
                    metadata={"version": 0},
                    overwrite=True,
                    progress=item.progress,
                )
                def _dir_succeeded(ign):
                    self._log("created subdirectory %r" % (relpath_u,))
                    self._count('directories_created')
                def _dir_failed(f):
                    self._log("failed to create subdirectory %r" % (relpath_u,))
                    return f
                upload_d.addCallbacks(_dir_succeeded, _dir_failed)
                upload_d.addCallback(lambda ign: self._scan(relpath_u))
                upload_d.addCallback(lambda ign: self._extend_queue_and_keep_going(self._pending))
                return upload_d
            elif pathinfo.isfile:
                db_entry = self._db.get_db_entry(relpath_u)

                last_downloaded_timestamp = now

                if db_entry is None:
                    new_version = 0
                elif is_new_file(pathinfo, db_entry):
                    new_version = db_entry.version + 1
                else:
                    self._log("Not uploading %r" % (relpath_u,))
                    self._count('objects_not_uploaded')
                    return None

                metadata = { 'version': new_version,
                             'last_downloaded_timestamp': last_downloaded_timestamp }
                if db_entry is not None and db_entry.last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = db_entry.last_downloaded_uri

                uploadable = FileName(unicode_from_filepath(fp), self._client.convergence)
                d2 = self._upload_dirnode.add_file(
                    encoded_path_u, uploadable,
                    metadata=metadata,
                    overwrite=True,
                    progress=item.progress,
                )

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    last_downloaded_uri = metadata.get('last_downloaded_uri', None)
                    self._db.did_upload_version(relpath_u, new_version, filecap,
                                                last_downloaded_uri, last_downloaded_timestamp,
                                                pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)
                return d2
            else:
                self._log("WARNING: cannot process special file %s" % quote_filepath(fp))
                return None

        d.addCallback(_maybe_upload)

        def _succeeded(res):
            self._count('objects_succeeded')
            item.set_status('success', self._clock.seconds())
            return res
        def _failed(f):
            self._count('objects_failed')
            self._log("%s while processing %r" % (f, relpath_u))
            item.set_status('failure', self._clock.seconds())
            return f
        d.addCallbacks(_succeeded, _failed)
        return d

    def _get_metadata(self, encoded_path_u):
        try:
            d = self._upload_dirnode.get_metadata_for(encoded_path_u)
        except KeyError:
            return Failure()
        return d

    def _get_filenode(self, encoded_path_u):
        try:
            d = self._upload_dirnode.get(encoded_path_u)
        except KeyError:
            return Failure()
        return d


class WriteFileMixin(object):
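    """
    Helpers for writing downloaded content to disk: atomic-ish replacement
    via a temporary file, plus renaming files aside on conflict or
    deletion.
    """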
    FUDGE_SECONDS = 10.0

    def _get_conflicted_filename(self, abspath_u):
        return abspath_u + u".conflict"

    def _write_downloaded_file(self, abspath_u, file_contents, is_conflict=False, now=None):
        self._log("_write_downloaded_file(%r, <%d bytes>, is_conflict=%r, now=%r)"
                  % (abspath_u, len(file_contents), is_conflict, now))

        # 1. Write a temporary file, say .foo.tmp.
        # 2. is_conflict determines whether this is an overwrite or a conflict.
        # 3. Set the mtime of the replacement file to be T seconds before the
        #    current local time.
        # 4. Perform a file replacement with backup filename foo.backup,
        #    replaced file foo, and replacement file .foo.tmp. If any step of
        #    this operation fails, reclassify as a conflict and stop.
        #
        # Returns the path of the destination file.

        precondition_abspath(abspath_u)
        replacement_path_u = abspath_u + u".tmp"  # FIXME more unique
        backup_path_u = abspath_u + u".backup"
        if now is None:
            now = time.time()

        # ensure parent directory exists
        head, tail = os.path.split(abspath_u)

        old_mask = os.umask(self._umask)
        try:
            fileutil.make_dirs(head, (~ self._umask) & 0777)
            fileutil.write(replacement_path_u, file_contents)
        finally:
            os.umask(old_mask)

        os.utime(replacement_path_u, (now, now - self.FUDGE_SECONDS))
        if is_conflict:
            print "0x00 ------------ <><> is conflict; calling _rename_conflicted_file... %r %r" % (abspath_u, replacement_path_u)
            return self._rename_conflicted_file(abspath_u, replacement_path_u)
        else:
            try:
                fileutil.replace_file(abspath_u, replacement_path_u, backup_path_u)
                return abspath_u
            except fileutil.ConflictError:
                return self._rename_conflicted_file(abspath_u, replacement_path_u)

    def _rename_conflicted_file(self, abspath_u, replacement_path_u):
        self._log("_rename_conflicted_file(%r, %r)" % (abspath_u, replacement_path_u))

        conflict_path_u = self._get_conflicted_filename(abspath_u)
        print "XXX rename %r %r" % (replacement_path_u, conflict_path_u)
        if os.path.isfile(replacement_path_u):
            print "%r exists" % (replacement_path_u,)
        if os.path.isfile(conflict_path_u):
            print "%r exists" % (conflict_path_u,)

        fileutil.rename_no_overwrite(replacement_path_u, conflict_path_u)
        return conflict_path_u

    def _rename_deleted_file(self, abspath_u):
        self._log('renaming deleted file to backup: %s' % (abspath_u,))
        try:
            fileutil.rename_no_overwrite(abspath_u, abspath_u + u'.backup')
        except OSError:
            self._log("Already gone: '%s'" % (abspath_u,))
        return abspath_u


class DownloadItem(QueuedItem):
    """
    Represents a single item in the _deque of the Downloader
    """
    def __init__(self, relpath_u, progress, filenode, metadata):
        super(DownloadItem, self).__init__(relpath_u, progress)
        self.file_node = filenode
        self.metadata = metadata


class Downloader(QueueMixin, WriteFileMixin):
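    """
    Periodically scans the collective directory for versions of files
    newer than what our magic-folder db records, downloads them, and
    writes them into the local directory (detecting conflicts on the
    way).
    """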
    REMOTE_SCAN_INTERVAL = 3  # facilitates tests

    def __init__(self, client, local_path_u, db, collective_dirnode,
                 upload_readonly_dircap, clock, is_upload_pending, umask):
        QueueMixin.__init__(self, client, local_path_u, db, 'downloader', clock)

        if not IDirectoryNode.providedBy(collective_dirnode):
            raise AssertionError("The URI in '%s' does not refer to a directory."
                                 % os.path.join('private', 'collective_dircap'))
        if collective_dirnode.is_unknown() or not collective_dirnode.is_readonly():
            raise AssertionError("The URI in '%s' is not a readonly cap to a directory."
                                 % os.path.join('private', 'collective_dircap'))

        self._collective_dirnode = collective_dirnode
        self._upload_readonly_dircap = upload_readonly_dircap
        self._is_upload_pending = is_upload_pending
        self._umask = umask

    def start_downloading(self):
        self._log("start_downloading")
        self._turn_delay = self.REMOTE_SCAN_INTERVAL
        files = self._db.get_all_relpaths()
        self._log("all files %s" % files)

        d = self._scan_remote_collective(scan_self=True)
        d.addBoth(self._logcb, "after _scan_remote_collective 0")
        self._turn_deque()
        return d

    def stop(self):
        self._log("stop")
        self._stopped = True
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def _should_download(self, relpath_u, remote_version):
        """
        _should_download returns a bool indicating whether or not a remote object should be downloaded.
        We check the remote metadata version against our magic-folder db version number;
        latest version wins.
        """
        self._log("_should_download(%r, %r)" % (relpath_u, remote_version))
        if magicpath.should_ignore_file(relpath_u):
            self._log("nope")
            return False
        self._log("yep")
        db_entry = self._db.get_db_entry(relpath_u)
        if db_entry is None:
            return True
        self._log("version %r" % (db_entry.version,))
        return (db_entry.version < remote_version)

    def _get_local_latest(self, relpath_u):
        """
        _get_local_latest takes a unicode relative path, and checks whether
        the corresponding file exists locally; if not it returns None,
        otherwise it returns the version number from our magic-folder db
        entry (or None if there is no entry).
        """
        if not self._get_filepath(relpath_u).exists():
            return None
        db_entry = self._db.get_db_entry(relpath_u)
        return None if db_entry is None else db_entry.version

    def _get_collective_latest_file(self, filename):
        """
        _get_collective_latest_file takes a file path pointing to a file managed by
        magic-folder and returns a Deferred that fires with a 2-tuple containing the
        file node and metadata for the latest version of that file located in the
        magic-folder collective directory.
        """
        collective_dirmap_d = self._collective_dirnode.list()
        def scan_collective(result):
            list_of_deferreds = []
            for dir_name in result.keys():
                # XXX make sure it's a directory
                d = defer.succeed(None)
                d.addCallback(lambda x, dir_name=dir_name: result[dir_name][0].get_child_and_metadata(filename))
                list_of_deferreds.append(d)
            deferList = defer.DeferredList(list_of_deferreds, consumeErrors=True)
            return deferList
        collective_dirmap_d.addCallback(scan_collective)
        def highest_version(deferredList):
            max_version = 0
            metadata = None
            node = None
            for success, result in deferredList:
                if success:
                    if result[1]['version'] > max_version:
                        node, metadata = result
                        max_version = result[1]['version']
            return node, metadata
        collective_dirmap_d.addCallback(highest_version)
        return collective_dirmap_d

    def _scan_remote_dmd(self, nickname, dirnode, scan_batch):
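        """
        List one participant's DMD and, for each entry whose remote version
        looks newer than our local one, append its (file node, metadata)
        to scan_batch under the decoded relative path.
        """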
        self._log("_scan_remote_dmd nickname %r" % (nickname,))
        d = dirnode.list()
        def scan_listing(listing_map):
            for encoded_relpath_u in listing_map.keys():
                relpath_u = magicpath.magic2path(encoded_relpath_u)
                self._log("found %r" % (relpath_u,))

                file_node, metadata = listing_map[encoded_relpath_u]
                local_version = self._get_local_latest(relpath_u)
                remote_version = metadata.get('version', None)
                self._log("%r has local version %r, remote version %r" % (relpath_u, local_version, remote_version))

                if local_version is None or remote_version is None or local_version < remote_version:
                    self._log("%r added to download queue" % (relpath_u,))
                    if relpath_u in scan_batch:
                        scan_batch[relpath_u] += [(file_node, metadata)]
                    else:
                        scan_batch[relpath_u] = [(file_node, metadata)]

        d.addCallback(scan_listing)
        d.addBoth(self._logcb, "end of _scan_remote_dmd")
        return d

    def _scan_remote_collective(self, scan_self=False):
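        """
        Scan every participant's DMD (skipping our own unless scan_self is
        True), pick the highest-version entry for each path, and queue a
        DownloadItem for everything _should_download approves.
        """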
        self._log("_scan_remote_collective")
        scan_batch = {}  # path -> [(filenode, metadata)]

        d = self._collective_dirnode.list()
        def scan_collective(dirmap):
            d2 = defer.succeed(None)
            for dir_name in dirmap:
                (dirnode, metadata) = dirmap[dir_name]
                if scan_self or dirnode.get_readonly_uri() != self._upload_readonly_dircap:
                    d2.addCallback(lambda ign, dir_name=dir_name, dirnode=dirnode:
                                   self._scan_remote_dmd(dir_name, dirnode, scan_batch))
                    def _err(f, dir_name=dir_name):
                        self._log("failed to scan DMD for client %r: %s" % (dir_name, f))
                        # XXX what should we do to make this failure more visible to users?
                    d2.addErrback(_err)

            return d2
        d.addCallback(scan_collective)

        def _filter_batch_to_deque(ign):
            self._log("deque = %r, scan_batch = %r" % (self._deque, scan_batch))
            for relpath_u in scan_batch.keys():
                file_node, metadata = max(scan_batch[relpath_u], key=lambda x: x[1]['version'])

                if self._should_download(relpath_u, metadata['version']):
                    to_dl = DownloadItem(
                        relpath_u,
                        PercentProgress(file_node.get_size()),
                        file_node,
                        metadata,
                    )
                    to_dl.set_status('queued', self._clock.seconds())
                    self._deque.append(to_dl)
                else:
                    self._log("Excluding %r" % (relpath_u,))
                    self._call_hook(None, 'processed', async=True)

            self._log("deque after = %r" % (self._deque,))
        d.addCallback(_filter_batch_to_deque)
        return d

    def _when_queue_is_empty(self):
        d = task.deferLater(self._clock, self.REMOTE_SCAN_INTERVAL, self._scan_remote_collective)
        d.addBoth(self._logcb, "after _scan_remote_collective 1")
        d.addCallback(lambda ign: self._turn_deque())
        return d

    def _process(self, item, now=None):
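        """
        Process a single DownloadItem: decide whether it conflicts with
        local state, then write, replace, or rename the local file
        accordingly and record the new version in our magic-folder db.
        """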
        # Downloader
        self._log("_process(%r)" % (item,))
        if now is None:  # XXX why can we pass in now?
            now = time.time()  # self._clock.seconds()

        self._log("started! %s" % (now,))
        item.set_status('started', now)
        fp = self._get_filepath(item.relpath_u)
        abspath_u = unicode_from_filepath(fp)
        conflict_path_u = self._get_conflicted_filename(abspath_u)

        d = defer.succeed(None)

        def do_update_db(written_abspath_u):
            filecap = item.file_node.get_uri()
            last_uploaded_uri = item.metadata.get('last_uploaded_uri', None)
            last_downloaded_uri = filecap
            last_downloaded_timestamp = now
            written_pathinfo = get_pathinfo(written_abspath_u)

            if not written_pathinfo.exists and not item.metadata.get('deleted', False):
                raise Exception("downloaded object %s disappeared" % quote_local_unicode_path(written_abspath_u))

            self._db.did_upload_version(
                item.relpath_u, item.metadata['version'], last_uploaded_uri,
                last_downloaded_uri, last_downloaded_timestamp, written_pathinfo,
            )
            self._count('objects_downloaded')
            item.set_status('success', self._clock.seconds())

        def failed(f):
            item.set_status('failure', self._clock.seconds())
            self._log("download failed: %s" % (str(f),))
            self._count('objects_failed')
            return f

        if os.path.isfile(conflict_path_u):
            def fail(res):
                raise ConflictError("download failed: already conflicted: %r" % (item.relpath_u,))
            d.addCallback(fail)
        else:
            is_conflict = False
            db_entry = self._db.get_db_entry(item.relpath_u)
            dmd_last_downloaded_uri = item.metadata.get('last_downloaded_uri', None)
            dmd_last_uploaded_uri = item.metadata.get('last_uploaded_uri', None)
            if db_entry:
                if dmd_last_downloaded_uri is not None and db_entry.last_downloaded_uri is not None:
                    if dmd_last_downloaded_uri != db_entry.last_downloaded_uri:
                        is_conflict = True
                        self._count('objects_conflicted')
                elif dmd_last_uploaded_uri is not None and dmd_last_uploaded_uri != db_entry.last_uploaded_uri:
                    is_conflict = True
                    self._count('objects_conflicted')
                elif self._is_upload_pending(item.relpath_u):
                    is_conflict = True
                    self._count('objects_conflicted')

            if item.relpath_u.endswith(u"/"):
                if item.metadata.get('deleted', False):
                    self._log("rmdir(%r) ignored" % (abspath_u,))
                else:
                    self._log("mkdir(%r)" % (abspath_u,))
                    d.addCallback(lambda ign: fileutil.make_dirs(abspath_u))
                    d.addCallback(lambda ign: abspath_u)
            else:
                if item.metadata.get('deleted', False):
                    d.addCallback(lambda ign: self._rename_deleted_file(abspath_u))
                else:
                    d.addCallback(lambda ign: item.file_node.download_best_version(progress=item.progress))
                    d.addCallback(lambda contents: self._write_downloaded_file(abspath_u, contents,
                                                                               is_conflict=is_conflict))

        d.addCallbacks(do_update_db, failed)

        def trap_conflicts(f):
            f.trap(ConflictError)
            return None
        d.addErrback(trap_conflicts)
        return d