import sys, os
import os.path
from collections import deque
import time

from zope.interface import Interface, implementer

from twisted.internet import defer, reactor, task
from twisted.python.failure import Failure
from twisted.python import runtime
from twisted.application import service

from allmydata.util import fileutil
from allmydata.interfaces import IDirectoryNode
from allmydata.util import log
from allmydata.util.fileutil import precondition_abspath, get_pathinfo, ConflictError
from allmydata.util.assertutil import precondition, _assert
from allmydata.util.deferredutil import HookMixin
from allmydata.util.progress import PercentProgress
from allmydata.util.encodingutil import listdir_filepath, to_filepath, \
     extend_filepath, unicode_from_filepath, unicode_segments_from, \
     quote_filepath, quote_local_unicode_path, quote_output, FilenameEncodingError
from allmydata.immutable.upload import FileName, Data
from allmydata import magicfolderdb, magicpath

defer.setDebugging(True)
IN_EXCL_UNLINK = 0x04000000L
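# This is the value of IN_EXCL_UNLINK from Linux's inotify API
# (linux/inotify.h); it is hardcoded here because Twisted's inotify wrapper
# does not expose this flag.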

def get_inotify_module():
    try:
        if sys.platform == "win32":
            from allmydata.windows import inotify
        elif runtime.platform.supportsINotify():
            from twisted.internet import inotify
        else:
            raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
                                      "This currently requires Linux or Windows.")
        return inotify
    except (ImportError, AttributeError) as e:
        log.msg(e)
        if sys.platform == "win32":
            raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
                                      "Windows support requires at least Vista, and has only been tested on Windows 7.")
        raise
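
# A minimal usage sketch (illustrative only; see Uploader below for the real
# wiring). The returned module is duck-typed to twisted.internet.inotify:
#
#   inotify = get_inotify_module()
#   notifier = inotify.INotify()
#   notifier.watch(to_filepath(u"/some/dir"), callbacks=[on_event])
#   notifier.startReading()
#
# where on_event(ignored, filepath, events_mask) is supplied by the caller.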


def is_new_file(pathinfo, db_entry):
    if db_entry is None:
        return True

    if not pathinfo.exists and db_entry.size is None:
        return False

    return ((pathinfo.size, pathinfo.ctime, pathinfo.mtime) !=
            (db_entry.size, db_entry.ctime, db_entry.mtime))
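
# For example, a file whose (size, ctime, mtime) differs from what the db
# recorded is considered new; a file that is absent on disk and also has no
# recorded size in the db is not.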


class MagicFolder(service.MultiService):
    name = 'magic-folder'

    def __init__(self, client, upload_dircap, collective_dircap, local_path_u, dbfile, umask,
                 pending_delay=1.0, clock=None):
        precondition_abspath(local_path_u)

        service.MultiService.__init__(self)

        immediate = clock is not None
        clock = clock or reactor
        db = magicfolderdb.get_magicfolderdb(dbfile, create_version=(magicfolderdb.SCHEMA_v1, 1))
        if db is None:
            raise Exception("ERROR: Unable to load magic folder db.")

        # for tests
        self._client = client
        self._db = db

        upload_dirnode = self._client.create_node_from_uri(upload_dircap)
        collective_dirnode = self._client.create_node_from_uri(collective_dircap)

        self.uploader = Uploader(client, local_path_u, db, upload_dirnode, pending_delay, clock, immediate)
        self.downloader = Downloader(client, local_path_u, db, collective_dirnode,
                                     upload_dirnode.get_readonly_uri(), clock, self.uploader.is_pending, umask)

    def startService(self):
        # TODO: why is this being called more than once?
        if self.running:
            return defer.succeed(None)
        print "%r.startService" % (self,)
        service.MultiService.startService(self)
        return self.uploader.start_monitoring()

    def ready(self):
        """
        ready signals us to start processing the upload and download queues.
        """
        self.uploader.start_uploading()  # synchronous
        return self.downloader.start_downloading()
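
    # Typical startup sequence (a sketch, following the Twisted service
    # conventions used here):
    #
    #   mf = MagicFolder(client, upload_dircap, collective_dircap,
    #                    local_path_u, dbfile, umask)
    #   mf.setServiceParent(client)   # startService() begins monitoring
    #   mf.ready()                    # start draining the queues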

    def finish(self):
        print "finish"
        d = self.uploader.stop()
        d2 = self.downloader.stop()
        d.addCallback(lambda ign: d2)
        return d

    def remove_service(self):
        return service.MultiService.disownServiceParent(self)


class QueueMixin(HookMixin):
    def __init__(self, client, local_path_u, db, name, clock):
        self._client = client
        self._local_path_u = local_path_u
        self._local_filepath = to_filepath(local_path_u)
        self._db = db
        self._name = name
        self._clock = clock
        self._hooks = {'processed': None, 'started': None}
        self.started_d = self.set_hook('started')

        if not self._local_filepath.exists():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but there is no directory at that location."
                                 % quote_local_unicode_path(self._local_path_u))
        if not self._local_filepath.isdir():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but the thing at that location is not a directory."
                                 % quote_local_unicode_path(self._local_path_u))

        self._deque = deque()
        # do we also want to bound on "maximum age"?
        self._process_history = deque(maxlen=10)
        self._lazy_tail = defer.succeed(None)
        self._stopped = False
        self._turn_delay = 0

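    # Note on queue mechanics: _deque holds pending IQueuedItems, and
    # _turn_deque() consumes one item per turn, chaining each _process() call
    # onto self._lazy_tail so that items are handled strictly one at a time.
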
    def get_status(self):
        """
        Returns an iterable of instances that implement IQueuedItem
        """
        for item in self._deque:
            yield item
        for item in self._process_history:
            yield item

    def _get_filepath(self, relpath_u):
        self._log("_get_filepath(%r)" % (relpath_u,))
        return extend_filepath(self._local_filepath, relpath_u.split(u"/"))

    def _get_relpath(self, filepath):
        self._log("_get_relpath(%r)" % (filepath,))
        segments = unicode_segments_from(filepath, self._local_filepath)
        self._log("segments = %r" % (segments,))
        return u"/".join(segments)

    def _count(self, counter_name, delta=1):
        ctr = 'magic_folder.%s.%s' % (self._name, counter_name)
        self._log("%s += %r" % (counter_name, delta))
        self._client.stats_provider.count(ctr, delta)

    def _logcb(self, res, msg):
        self._log("%s: %r" % (msg, res))
        return res

    def _log(self, msg):
        s = "Magic Folder %s %s: %s" % (quote_output(self._client.nickname), self._name, msg)
        self._client.log(s)
        print s
        #open("events", "ab+").write(msg)

    def _turn_deque(self):
        try:
            self._log("_turn_deque")
            if self._stopped:
                self._log("stopped")
                return
            try:
                item = IQueuedItem(self._deque.pop())
                self._process_history.append(item)

                self._log("popped %r, now have %d" % (item, len(self._deque)))
                self._count('objects_queued', -1)
            except IndexError:
                self._log("deque is now empty")
                self._lazy_tail.addBoth(self._logcb, "deque empty")
                self._lazy_tail.addCallback(lambda ign: self._when_queue_is_empty())
                self._lazy_tail.addBoth(self._logcb, "got past _when_queue_is_empty")
            else:
                self._log("_turn_deque else clause")
                self._lazy_tail.addBoth(self._logcb, "about to process %r" % (item,))
                self._lazy_tail.addCallback(lambda ign: self._process(item))
                self._lazy_tail.addBoth(self._logcb, "got past _process")
                self._lazy_tail.addBoth(self._call_hook, 'processed', async=True)
                self._lazy_tail.addBoth(self._logcb, "got past _call_hook (turn_delay = %r)" % (self._turn_delay,))
                self._lazy_tail.addErrback(log.err)
                self._lazy_tail.addCallback(lambda ign: task.deferLater(self._clock, self._turn_delay, self._turn_deque))
                self._lazy_tail.addBoth(self._logcb, "got past deferLater")
        except Exception as e:
            self._log("---- turn deque exception %s" % (e,))
            raise

class IQueuedItem(Interface):
    pass


@implementer(IQueuedItem)
class QueuedItem(object):
    def __init__(self, relpath_u, progress):
        self.relpath_u = relpath_u
        self.progress = progress
        self._status_history = dict()

    def set_status(self, status, current_time=None):
        if current_time is None:
            current_time = time.time()
        self._status_history[status] = current_time

    def status_time(self, state):
        """
        Returns None if there's no status-update for 'state', else returns
        the timestamp when that state was reached.
        """
        return self._status_history.get(state, None)

    def status_history(self):
        """
        Returns a list of 2-tuples of (state, timestamp) sorted by timestamp
        """
        hist = self._status_history.items()
        hist.sort(key=lambda x: x[1])
        return hist
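
        # e.g. after set_status('queued', 1.0) and set_status('started', 2.5),
        # status_history() returns [('queued', 1.0), ('started', 2.5)].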


class UploadItem(QueuedItem):
    pass


class Uploader(QueueMixin):
    def __init__(self, client, local_path_u, db, upload_dirnode, pending_delay, clock,
                 immediate=False):
        QueueMixin.__init__(self, client, local_path_u, db, 'uploader', clock)

        self.is_ready = False
        self._immediate = immediate

        if not IDirectoryNode.providedBy(upload_dirnode):
            raise AssertionError("The URI in '%s' does not refer to a directory."
                                 % os.path.join('private', 'magic_folder_dircap'))
        if upload_dirnode.is_unknown() or upload_dirnode.is_readonly():
            raise AssertionError("The URI in '%s' is not a writecap to a directory."
                                 % os.path.join('private', 'magic_folder_dircap'))

        self._upload_dirnode = upload_dirnode
        self._inotify = get_inotify_module()
        self._notifier = self._inotify.INotify()
        self._pending = set()  # of unicode relpaths

        self._periodic_full_scan_duration = 10 * 60  # perform a full scan every 10 minutes

        if hasattr(self._notifier, 'set_pending_delay'):
            self._notifier.set_pending_delay(pending_delay)

        # TODO: what about IN_MOVE_SELF and IN_UNMOUNT?
        #
        self.mask = ( self._inotify.IN_CREATE
                    | self._inotify.IN_CLOSE_WRITE
                    | self._inotify.IN_MOVED_TO
                    | self._inotify.IN_MOVED_FROM
                    | self._inotify.IN_DELETE
                    | self._inotify.IN_ONLYDIR
                    | IN_EXCL_UNLINK
                    )
        self._notifier.watch(self._local_filepath, mask=self.mask, callbacks=[self._notify],
                             recursive=True)

    def start_monitoring(self):
        self._log("start_monitoring")
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._notifier.startReading())
        d.addCallback(lambda ign: self._count('dirs_monitored'))
        d.addBoth(self._call_hook, 'started')
        return d

    def stop(self):
        self._log("stop")
        self._notifier.stopReading()
        self._count('dirs_monitored', -1)
        # periodic_callid is only set once start_uploading() has run
        if hasattr(self, 'periodic_callid'):
            self.periodic_callid.cancel()
        if hasattr(self._notifier, 'wait_until_stopped'):
            d = self._notifier.wait_until_stopped()
        else:
            d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def start_uploading(self):
        self._log("start_uploading")
        self.is_ready = True

        all_relpaths = self._db.get_all_relpaths()
        self._log("all relpaths: %r" % (all_relpaths,))

        for relpath_u in all_relpaths:
            self._add_pending(relpath_u)

        self._full_scan()

    def _extend_queue_and_keep_going(self, relpaths_u):
        self._log("_extend_queue_and_keep_going %r" % (relpaths_u,))
        for relpath_u in relpaths_u:
            progress = PercentProgress()
            item = UploadItem(relpath_u, progress)
            item.set_status('queued', self._clock.seconds())
            self._deque.append(item)

        self._count('objects_queued', len(relpaths_u))

        if self.is_ready:
            if self._immediate:  # for tests
                self._turn_deque()
            else:
                self._clock.callLater(0, self._turn_deque)

    def _full_scan(self):
        self.periodic_callid = self._clock.callLater(self._periodic_full_scan_duration, self._full_scan)
        print "FULL SCAN"
        self._log("_pending %r" % (self._pending))
        self._scan(u"")
        self._extend_queue_and_keep_going(self._pending)

    def _add_pending(self, relpath_u):
        self._log("add pending %r" % (relpath_u,))
        if not magicpath.should_ignore_file(relpath_u):
            self._pending.add(relpath_u)

    def _scan(self, reldir_u):
        # Scan a directory by (synchronously) adding the paths of all its children to self._pending.
        # Note that this doesn't add them to the deque -- the caller does that
        # afterwards, by passing self._pending to _extend_queue_and_keep_going.

        self._log("scan %r" % (reldir_u,))
        fp = self._get_filepath(reldir_u)
        try:
            children = listdir_filepath(fp)
        except EnvironmentError:
            raise Exception("WARNING: magic folder: permission denied on directory %s"
                            % quote_filepath(fp))
        except FilenameEncodingError:
            raise Exception("WARNING: magic folder: could not list directory %s due to a filename encoding error"
                            % quote_filepath(fp))

        for child in children:
            _assert(isinstance(child, unicode), child=child)
            self._add_pending("%s/%s" % (reldir_u, child) if reldir_u != u"" else child)

    def is_pending(self, relpath_u):
        return relpath_u in self._pending

    def _notify(self, opaque, path, events_mask):
        self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask))))
        relpath_u = self._get_relpath(path)

        # We filter out IN_CREATE events not associated with a directory.
        # Acting on IN_CREATE for files could cause us to read and upload
        # a possibly-incomplete file before the application has closed it.
        # There should always be an IN_CLOSE_WRITE after an IN_CREATE, I think.
        # It isn't possible to avoid watching for IN_CREATE at all, because
        # it is the only event notified for a directory creation.
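        #
        # For example, "mkdir sub" arrives as IN_CREATE|IN_ISDIR and is
        # handled here, while copying a file in arrives as IN_CREATE followed
        # by IN_CLOSE_WRITE, and only the IN_CLOSE_WRITE enqueues the upload.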

        if ((events_mask & self._inotify.IN_CREATE) != 0 and
            (events_mask & self._inotify.IN_ISDIR) == 0):
            self._log("ignoring event for %r (creation of non-directory)\n" % (relpath_u,))
            return
        if relpath_u in self._pending:
            self._log("not queueing %r because it is already pending" % (relpath_u,))
            return
        if magicpath.should_ignore_file(relpath_u):
            self._log("ignoring event for %r (ignorable path)" % (relpath_u,))
            return

        self._pending.add(relpath_u)
        self._extend_queue_and_keep_going([relpath_u])

    def _when_queue_is_empty(self):
        return defer.succeed(None)

    def _process(self, item):
        # Uploader
        relpath_u = item.relpath_u
        self._log("_process(%r)" % (relpath_u,))
        item.set_status('started', self._clock.seconds())

        if relpath_u is None:
            item.set_status('invalid_path', self._clock.seconds())
            return
        precondition(isinstance(relpath_u, unicode), relpath_u)
        precondition(not relpath_u.endswith(u'/'), relpath_u)

        d = defer.succeed(None)

        def _maybe_upload(ign, now=None):
            self._log("_maybe_upload: relpath_u=%r, now=%r" % (relpath_u, now))
            if now is None:
                now = time.time()
            fp = self._get_filepath(relpath_u)
            pathinfo = get_pathinfo(unicode_from_filepath(fp))

            self._log("about to remove %r from pending set %r" %
                      (relpath_u, self._pending))
            self._pending.remove(relpath_u)
            encoded_path_u = magicpath.path2magic(relpath_u)

            if not pathinfo.exists:
                # FIXME merge this with the 'isfile' case.
                self._log("notified object %s disappeared (this is normal)" % quote_filepath(fp))
                self._count('objects_disappeared')

                db_entry = self._db.get_db_entry(relpath_u)
                if db_entry is None:
                    return None

                last_downloaded_timestamp = now  # is this correct?

                if is_new_file(pathinfo, db_entry):
                    new_version = db_entry.version + 1
                else:
                    self._log("Not uploading %r" % (relpath_u,))
                    self._count('objects_not_uploaded')
                    return

                metadata = { 'version': new_version,
                             'deleted': True,
                             'last_downloaded_timestamp': last_downloaded_timestamp }
                if db_entry.last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = db_entry.last_downloaded_uri

                empty_uploadable = Data("", self._client.convergence)
                d2 = self._upload_dirnode.add_file(
                    encoded_path_u, empty_uploadable,
                    metadata=metadata,
                    overwrite=True,
                    progress=item.progress,
                )

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    last_downloaded_uri = metadata.get('last_downloaded_uri', None)
                    self._db.did_upload_version(relpath_u, new_version, filecap,
                                                last_downloaded_uri, last_downloaded_timestamp,
                                                pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)
                return d2
            elif pathinfo.islink:
                self._log("WARNING: cannot upload symlink %s" % quote_filepath(fp))
                return None
            elif pathinfo.isdir:
                print "ISDIR "
                if not getattr(self._notifier, 'recursive_includes_new_subdirectories', False):
                    self._notifier.watch(fp, mask=self.mask, callbacks=[self._notify], recursive=True)

                uploadable = Data("", self._client.convergence)
                encoded_path_u += magicpath.path2magic(u"/")
                self._log("encoded_path_u = %r" % (encoded_path_u,))
                upload_d = self._upload_dirnode.add_file(
                    encoded_path_u, uploadable,
                    metadata={"version": 0},
                    overwrite=True,
                    progress=item.progress,
                )
                def _dir_succeeded(ign):
                    self._log("created subdirectory %r" % (relpath_u,))
                    self._count('directories_created')
                def _dir_failed(f):
                    self._log("failed to create subdirectory %r" % (relpath_u,))
                    return f
                upload_d.addCallbacks(_dir_succeeded, _dir_failed)
                upload_d.addCallback(lambda ign: self._scan(relpath_u))
                upload_d.addCallback(lambda ign: self._extend_queue_and_keep_going(self._pending))
                return upload_d
            elif pathinfo.isfile:
                db_entry = self._db.get_db_entry(relpath_u)

                last_downloaded_timestamp = now

                if db_entry is None:
                    new_version = 0
                elif is_new_file(pathinfo, db_entry):
                    new_version = db_entry.version + 1
                else:
                    self._log("Not uploading %r" % (relpath_u,))
                    self._count('objects_not_uploaded')
                    return None

                metadata = { 'version': new_version,
                             'last_downloaded_timestamp': last_downloaded_timestamp }
                if db_entry is not None and db_entry.last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = db_entry.last_downloaded_uri
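                # e.g. metadata == {'version': 2,
                #                   'last_downloaded_timestamp': 1466368942.0,
                #                   'last_downloaded_uri': <URI of last download>}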

                uploadable = FileName(unicode_from_filepath(fp), self._client.convergence)
                d2 = self._upload_dirnode.add_file(
                    encoded_path_u, uploadable,
                    metadata=metadata,
                    overwrite=True,
                    progress=item.progress,
                )

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    last_downloaded_uri = metadata.get('last_downloaded_uri', None)
                    self._db.did_upload_version(relpath_u, new_version, filecap,
                                                last_downloaded_uri, last_downloaded_timestamp,
                                                pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)
                return d2
            else:
                self._log("WARNING: cannot process special file %s" % quote_filepath(fp))
                return None

        d.addCallback(_maybe_upload)

        def _succeeded(res):
            self._count('objects_succeeded')
            item.set_status('success', self._clock.seconds())
            return res
        def _failed(f):
            self._count('objects_failed')
            self._log("%s while processing %r" % (f, relpath_u))
            item.set_status('failure', self._clock.seconds())
            return f
        d.addCallbacks(_succeeded, _failed)
        return d

    def _get_metadata(self, encoded_path_u):
        try:
            d = self._upload_dirnode.get_metadata_for(encoded_path_u)
        except KeyError:
            return Failure()
        return d

    def _get_filenode(self, encoded_path_u):
        try:
            d = self._upload_dirnode.get(encoded_path_u)
        except KeyError:
            return Failure()
        return d


class WriteFileMixin(object):
    FUDGE_SECONDS = 10.0

    def _get_conflicted_filename(self, abspath_u):
        return abspath_u + u".conflict"

    def _write_downloaded_file(self, abspath_u, file_contents, is_conflict=False, now=None):
        self._log("_write_downloaded_file(%r, <%d bytes>, is_conflict=%r, now=%r)"
                  % (abspath_u, len(file_contents), is_conflict, now))

        # 1. Write a temporary file, say foo.tmp.
        # 2. is_conflict determines whether this is an overwrite or a conflict.
        # 3. Set the mtime of the replacement file to be T seconds before the
        #    current local time (T = FUDGE_SECONDS).
        # 4. Perform a file replacement with backup filename foo.backup,
        #    replaced file foo, and replacement file foo.tmp. If any step of
        #    this operation fails, reclassify as a conflict and stop.
        #
        # Returns the path of the destination file.
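        #
        # For example, downloading "foo" first writes "foo.tmp"; on overwrite,
        # "foo" is atomically replaced (its old contents saved as "foo.backup");
        # on conflict, the new contents are moved to "foo.conflict" instead.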

        precondition_abspath(abspath_u)
        replacement_path_u = abspath_u + u".tmp"  # FIXME more unique
        backup_path_u = abspath_u + u".backup"
        if now is None:
            now = time.time()

        # ensure parent directory exists
        head, tail = os.path.split(abspath_u)

        old_mask = os.umask(self._umask)
        try:
            fileutil.make_dirs(head, (~ self._umask) & 0777)
            fileutil.write(replacement_path_u, file_contents)
        finally:
            os.umask(old_mask)

        os.utime(replacement_path_u, (now, now - self.FUDGE_SECONDS))
        if is_conflict:
            self._log("is_conflict; calling _rename_conflicted_file(%r, %r)" % (abspath_u, replacement_path_u))
            return self._rename_conflicted_file(abspath_u, replacement_path_u)
        else:
            try:
                fileutil.replace_file(abspath_u, replacement_path_u, backup_path_u)
                return abspath_u
            except fileutil.ConflictError:
                return self._rename_conflicted_file(abspath_u, replacement_path_u)

    def _rename_conflicted_file(self, abspath_u, replacement_path_u):
        self._log("_rename_conflicted_file(%r, %r)" % (abspath_u, replacement_path_u))

        conflict_path_u = self._get_conflicted_filename(abspath_u)
        print "XXX rename %r %r" % (replacement_path_u, conflict_path_u)
        if os.path.isfile(replacement_path_u):
            print "%r exists" % (replacement_path_u,)
        if os.path.isfile(conflict_path_u):
            print "%r exists" % (conflict_path_u,)

        fileutil.rename_no_overwrite(replacement_path_u, conflict_path_u)
        return conflict_path_u

    def _rename_deleted_file(self, abspath_u):
        self._log('renaming deleted file to backup: %s' % (abspath_u,))
        try:
            fileutil.rename_no_overwrite(abspath_u, abspath_u + u'.backup')
        except OSError:
            self._log("Already gone: '%s'" % (abspath_u,))
        return abspath_u


class DownloadItem(QueuedItem):
    def __init__(self, relpath_u, progress, filenode, metadata):
        super(DownloadItem, self).__init__(relpath_u, progress)
        self.file_node = filenode
        self.metadata = metadata


class Downloader(QueueMixin, WriteFileMixin):
    REMOTE_SCAN_INTERVAL = 3  # facilitates tests

    def __init__(self, client, local_path_u, db, collective_dirnode,
                 upload_readonly_dircap, clock, is_upload_pending, umask):
        QueueMixin.__init__(self, client, local_path_u, db, 'downloader', clock)

        if not IDirectoryNode.providedBy(collective_dirnode):
            raise AssertionError("The URI in '%s' does not refer to a directory."
                                 % os.path.join('private', 'collective_dircap'))
        if collective_dirnode.is_unknown() or not collective_dirnode.is_readonly():
            raise AssertionError("The URI in '%s' is not a readonly cap to a directory."
                                 % os.path.join('private', 'collective_dircap'))

        self._collective_dirnode = collective_dirnode
        self._upload_readonly_dircap = upload_readonly_dircap
        self._is_upload_pending = is_upload_pending
        self._umask = umask

    def start_downloading(self):
        self._log("start_downloading")
        self._turn_delay = self.REMOTE_SCAN_INTERVAL
        files = self._db.get_all_relpaths()
        self._log("all files %s" % files)

        d = self._scan_remote_collective(scan_self=True)
        d.addBoth(self._logcb, "after _scan_remote_collective 0")
        self._turn_deque()
        return d

    def stop(self):
        self._log("stop")
        self._stopped = True
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def _should_download(self, relpath_u, remote_version):
        """
        _should_download returns a bool indicating whether or not a remote object should be downloaded.
        We check the remote metadata version against our magic-folder db version number;
        latest version wins.
        """
        self._log("_should_download(%r, %r)" % (relpath_u, remote_version))
        if magicpath.should_ignore_file(relpath_u):
            self._log("not downloading %r (ignorable path)" % (relpath_u,))
            return False
        self._log("%r is a candidate for download" % (relpath_u,))
        db_entry = self._db.get_db_entry(relpath_u)
        if db_entry is None:
            return True
        self._log("version %r" % (db_entry.version,))
        return (db_entry.version < remote_version)
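
        # e.g. a remote entry at version 3 is downloaded if our db records
        # version 2 (or no entry at all) for that path, and skipped if our
        # db already records version 3 or higher.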

    def _get_local_latest(self, relpath_u):
        """
        _get_local_latest takes a unicode relative path, and returns the
        version number from our magic-folder db if the file exists locally
        and has a db entry; otherwise it returns None.
        """
        if not self._get_filepath(relpath_u).exists():
            return None
        db_entry = self._db.get_db_entry(relpath_u)
        return None if db_entry is None else db_entry.version

    def _get_collective_latest_file(self, filename):
        """
        _get_collective_latest_file takes a filename (relative to the
        magic-folder collective directory) and returns a Deferred that fires
        with a 2-tuple of (file node, metadata) for the latest version of
        that file found in the collective directory.
        """
        collective_dirmap_d = self._collective_dirnode.list()
        def scan_collective(result):
            list_of_deferreds = []
            for dir_name in result.keys():
                # XXX make sure it's a directory
                d = defer.succeed(None)
                d.addCallback(lambda x, dir_name=dir_name: result[dir_name][0].get_child_and_metadata(filename))
                list_of_deferreds.append(d)
            deferList = defer.DeferredList(list_of_deferreds, consumeErrors=True)
            return deferList
        collective_dirmap_d.addCallback(scan_collective)
        def highest_version(deferredList):
            max_version = None
            metadata = None
            node = None
            for success, result in deferredList:
                if success:
                    if max_version is None or result[1]['version'] > max_version:
                        node, metadata = result
                        max_version = result[1]['version']
            return node, metadata
        collective_dirmap_d.addCallback(highest_version)
        return collective_dirmap_d
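
    # Usage sketch (illustrative): _get_collective_latest_file(u"foo") returns
    # a Deferred firing with (filenode, metadata) for the highest-versioned
    # copy of "foo" across all participants' DMDs, or (None, None) if no
    # participant has it.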

    def _scan_remote_dmd(self, nickname, dirnode, scan_batch):
        self._log("_scan_remote_dmd nickname %r" % (nickname,))
        d = dirnode.list()
        def scan_listing(listing_map):
            for encoded_relpath_u in listing_map.keys():
                relpath_u = magicpath.magic2path(encoded_relpath_u)
                self._log("found %r" % (relpath_u,))

                file_node, metadata = listing_map[encoded_relpath_u]
                local_version = self._get_local_latest(relpath_u)
                remote_version = metadata.get('version', None)
                self._log("%r has local version %r, remote version %r" % (relpath_u, local_version, remote_version))

                if local_version is None or remote_version is None or local_version < remote_version:
                    self._log("%r added to download queue" % (relpath_u,))
                    if relpath_u in scan_batch:
                        scan_batch[relpath_u] += [(file_node, metadata)]
                    else:
                        scan_batch[relpath_u] = [(file_node, metadata)]

        d.addCallback(scan_listing)
        d.addBoth(self._logcb, "end of _scan_remote_dmd")
        return d

    def _scan_remote_collective(self, scan_self=False):
        self._log("_scan_remote_collective")
        scan_batch = {}  # path -> [(filenode, metadata)]

        d = self._collective_dirnode.list()
        def scan_collective(dirmap):
            d2 = defer.succeed(None)
            for dir_name in dirmap:
                (dirnode, metadata) = dirmap[dir_name]
                if scan_self or dirnode.get_readonly_uri() != self._upload_readonly_dircap:
                    d2.addCallback(lambda ign, dir_name=dir_name, dirnode=dirnode:
                                   self._scan_remote_dmd(dir_name, dirnode, scan_batch))
                    def _err(f, dir_name=dir_name):
                        self._log("failed to scan DMD for client %r: %s" % (dir_name, f))
                        # XXX what should we do to make this failure more visible to users?
                    d2.addErrback(_err)

            return d2
        d.addCallback(scan_collective)

        def _filter_batch_to_deque(ign):
            self._log("deque = %r, scan_batch = %r" % (self._deque, scan_batch))
            for relpath_u in scan_batch.keys():
                file_node, metadata = max(scan_batch[relpath_u], key=lambda x: x[1]['version'])

                if self._should_download(relpath_u, metadata['version']):
                    to_dl = DownloadItem(
                        relpath_u,
                        PercentProgress(file_node.get_size()),
                        file_node,
                        metadata,
                    )
                    to_dl.set_status('queued', self._clock.seconds())
                    self._deque.append(to_dl)
                else:
                    self._log("Excluding %r" % (relpath_u,))
                    self._call_hook(None, 'processed', async=True)

            self._log("deque after = %r" % (self._deque,))
        d.addCallback(_filter_batch_to_deque)
        return d

    def _when_queue_is_empty(self):
        d = task.deferLater(self._clock, self.REMOTE_SCAN_INTERVAL, self._scan_remote_collective)
        d.addBoth(self._logcb, "after _scan_remote_collective 1")
        d.addCallback(lambda ign: self._turn_deque())
        return d

    def _process(self, item, now=None):
        # Downloader
        self._log("_process(%r)" % (item,))
        if now is None:  # XXX why can we pass in now?
            now = time.time()  # self._clock.seconds()

        self._log("started! %s" % (now,))
        item.set_status('started', now)
        fp = self._get_filepath(item.relpath_u)
        abspath_u = unicode_from_filepath(fp)
        conflict_path_u = self._get_conflicted_filename(abspath_u)

        d = defer.succeed(None)

        def do_update_db(written_abspath_u):
            filecap = item.file_node.get_uri()
            last_uploaded_uri = item.metadata.get('last_uploaded_uri', None)
            last_downloaded_uri = filecap
            last_downloaded_timestamp = now
            written_pathinfo = get_pathinfo(written_abspath_u)

            if not written_pathinfo.exists and not item.metadata.get('deleted', False):
                raise Exception("downloaded object %s disappeared" % quote_local_unicode_path(written_abspath_u))

            self._db.did_upload_version(
                item.relpath_u, item.metadata['version'], last_uploaded_uri,
                last_downloaded_uri, last_downloaded_timestamp, written_pathinfo,
            )
            self._count('objects_downloaded')
            item.set_status('success', self._clock.seconds())

        def failed(f):
            item.set_status('failure', self._clock.seconds())
            self._log("download failed: %s" % (str(f),))
            self._count('objects_failed')
            return f

        if os.path.isfile(conflict_path_u):
            def fail(res):
                raise ConflictError("download failed: already conflicted: %r" % (item.relpath_u,))
            d.addCallback(fail)
        else:
            is_conflict = False
            db_entry = self._db.get_db_entry(item.relpath_u)
            dmd_last_downloaded_uri = item.metadata.get('last_downloaded_uri', None)
            dmd_last_uploaded_uri = item.metadata.get('last_uploaded_uri', None)
            if db_entry:
                if dmd_last_downloaded_uri is not None and db_entry.last_downloaded_uri is not None:
                    if dmd_last_downloaded_uri != db_entry.last_downloaded_uri:
                        is_conflict = True
                        self._count('objects_conflicted')
                elif dmd_last_uploaded_uri is not None and dmd_last_uploaded_uri != db_entry.last_uploaded_uri:
                    is_conflict = True
                    self._count('objects_conflicted')
                elif self._is_upload_pending(item.relpath_u):
                    is_conflict = True
                    self._count('objects_conflicted')
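            # Summary of the checks above: the download is flagged as a
            # conflict when the remote DMD's last_downloaded_uri no longer
            # matches ours, when its last_uploaded_uri differs from ours, or
            # when we have an upload of our own still pending for this path.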

            if item.relpath_u.endswith(u"/"):
                if item.metadata.get('deleted', False):
                    self._log("rmdir(%r) ignored" % (abspath_u,))
                else:
                    self._log("mkdir(%r)" % (abspath_u,))
                    d.addCallback(lambda ign: fileutil.make_dirs(abspath_u))
                    d.addCallback(lambda ign: abspath_u)
            else:
                if item.metadata.get('deleted', False):
                    d.addCallback(lambda ign: self._rename_deleted_file(abspath_u))
                else:
                    d.addCallback(lambda ign: item.file_node.download_best_version(progress=item.progress))
                    d.addCallback(lambda contents: self._write_downloaded_file(abspath_u, contents,
                                                                               is_conflict=is_conflict))

        d.addCallbacks(do_update_db, failed)

        def trap_conflicts(f):
            f.trap(ConflictError)
            return None
        d.addErrback(trap_conflicts)
        return d