import sys, os
import os.path
from collections import deque
import time

from twisted.internet import defer, reactor, task
from twisted.python.failure import Failure
from twisted.python import runtime
from twisted.application import service

from allmydata.util import fileutil
from allmydata.interfaces import IDirectoryNode
from allmydata.util import log
from allmydata.util.fileutil import precondition_abspath, get_pathinfo, ConflictError
from allmydata.util.assertutil import precondition, _assert
from allmydata.util.deferredutil import HookMixin
from allmydata.util.encodingutil import listdir_filepath, to_filepath, \
     extend_filepath, unicode_from_filepath, unicode_segments_from, \
     quote_filepath, quote_local_unicode_path, quote_output, FilenameEncodingError
from allmydata.immutable.upload import FileName, Data
from allmydata import magicfolderdb, magicpath

defer.setDebugging(True)
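# Linux inotify's IN_EXCL_UNLINK flag (0x04000000 in <sys/inotify.h>) stops a
# watch from reporting events for files after they have been unlinked;
# Twisted's inotify module does not expose a constant for it, so the value is
# hard-coded here.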
IN_EXCL_UNLINK = 0x04000000L

def get_inotify_module():
    try:
        if sys.platform == "win32":
            from allmydata.windows import inotify
        elif runtime.platform.supportsINotify():
            from twisted.internet import inotify
        else:
            raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
                                      "This currently requires Linux or Windows.")
        return inotify
    except (ImportError, AttributeError) as e:
        log.msg(e)
        if sys.platform == "win32":
            raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
                                      "Windows support requires at least Vista, and has only been tested on Windows 7.")
        raise


def is_new_file(pathinfo, db_entry):
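    """
    Return True if the local file at pathinfo should be treated as changed
    relative to its magic-folder db entry: anything without a db entry is
    new; a path that no longer exists and whose db entry already records a
    deletion (size is None) is not; otherwise the current (size, ctime,
    mtime) triple is compared against the one recorded at the last upload.
    """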
    if db_entry is None:
        return True

    if not pathinfo.exists and db_entry.size is None:
        return False

    return ((pathinfo.size, pathinfo.ctime, pathinfo.mtime) !=
            (db_entry.size, db_entry.ctime, db_entry.mtime))


class MagicFolder(service.MultiService):
    name = 'magic-folder'

    def __init__(self, client, upload_dircap, collective_dircap, local_path_u, dbfile, umask,
                 pending_delay=1.0, clock=None):
        precondition_abspath(local_path_u)

        service.MultiService.__init__(self)

        immediate = clock is not None
        clock = clock or reactor
        db = magicfolderdb.get_magicfolderdb(dbfile, create_version=(magicfolderdb.SCHEMA_v1, 1))
        if db is None:
            # __init__ must not return a value; report the failure by raising.
            raise Exception("Unable to load magic folder db.")

        # for tests
        self._client = client
        self._db = db

        upload_dirnode = self._client.create_node_from_uri(upload_dircap)
        collective_dirnode = self._client.create_node_from_uri(collective_dircap)

        self.uploader = Uploader(client, local_path_u, db, upload_dirnode, pending_delay, clock, immediate)
        self.downloader = Downloader(client, local_path_u, db, collective_dirnode,
                                     upload_dirnode.get_readonly_uri(), clock, self.uploader.is_pending, umask)

    def startService(self):
        # TODO: why is this being called more than once?
        if self.running:
            return defer.succeed(None)
        print "%r.startService" % (self,)
        service.MultiService.startService(self)
        return self.uploader.start_monitoring()

    def ready(self):
        """ready is used to signal us to start processing the upload and
        download items.
        """
        self.uploader.start_uploading()  # synchronous
        return self.downloader.start_downloading()

    def finish(self):
        print "finish"
        d = self.uploader.stop()
        d2 = self.downloader.stop()
        d.addCallback(lambda ign: d2)
        return d

    def remove_service(self):
        return service.MultiService.disownServiceParent(self)


class QueueMixin(HookMixin):
    def __init__(self, client, local_path_u, db, name, clock):
        self._client = client
        self._local_path_u = local_path_u
        self._local_filepath = to_filepath(local_path_u)
        self._db = db
        self._name = name
        self._clock = clock
        self._hooks = {'processed': None, 'started': None}
        self.started_d = self.set_hook('started')

        if not self._local_filepath.exists():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but there is no directory at that location."
                                 % quote_local_unicode_path(self._local_path_u))
        if not self._local_filepath.isdir():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but the thing at that location is not a directory."
                                 % quote_local_unicode_path(self._local_path_u))

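        # Work items accumulate in self._deque; _turn_deque pops them one at
        # a time and chains their processing onto self._lazy_tail, so items
        # are always handled strictly sequentially.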
        self._deque = deque()
        self._lazy_tail = defer.succeed(None)
        self._stopped = False
        self._turn_delay = 0

    def _get_filepath(self, relpath_u):
        self._log("_get_filepath(%r)" % (relpath_u,))
        return extend_filepath(self._local_filepath, relpath_u.split(u"/"))

    def _get_relpath(self, filepath):
        self._log("_get_relpath(%r)" % (filepath,))
        segments = unicode_segments_from(filepath, self._local_filepath)
        self._log("segments = %r" % (segments,))
        return u"/".join(segments)

    def _count(self, counter_name, delta=1):
        ctr = 'magic_folder.%s.%s' % (self._name, counter_name)
        self._log("%s += %r" % (counter_name, delta))
        self._client.stats_provider.count(ctr, delta)

    def _logcb(self, res, msg):
        self._log("%s: %r" % (msg, res))
        return res

    def _log(self, msg):
        s = "Magic Folder %s %s: %s" % (quote_output(self._client.nickname), self._name, msg)
        self._client.log(s)
        print s
        #open("events", "ab+").write(msg)

    def _turn_deque(self):
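        """
        Pop one item off the deque and chain its processing onto
        self._lazy_tail; when the deque is empty, hand off to
        _when_queue_is_empty() (the Downloader uses this to schedule the
        next remote scan).
        """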
        try:
            self._log("_turn_deque")
            if self._stopped:
                self._log("stopped")
                return
            try:
                item = self._deque.pop()
                self._log("popped %r" % (item,))
                self._count('objects_queued', -1)
            except IndexError:
                self._log("deque is now empty")
                self._lazy_tail.addBoth(self._logcb, "queue empty")
                self._lazy_tail.addCallback(lambda ign: self._when_queue_is_empty())
                self._lazy_tail.addBoth(self._logcb, "got past _when_queue_is_empty")
            else:
                self._log("_turn_deque else clause")
                self._lazy_tail.addBoth(self._logcb, "processing %r" % (item,))
                self._lazy_tail.addCallback(lambda ign: self._process(item))
                self._lazy_tail.addBoth(self._logcb, "got past _process")
                self._lazy_tail.addBoth(self._call_hook, 'processed')
                self._lazy_tail.addBoth(self._logcb, "got past _call_hook (turn_delay = %r)" % (self._turn_delay,))
                self._lazy_tail.addCallback(lambda ign: task.deferLater(self._clock, self._turn_delay, self._turn_deque))
                self._lazy_tail.addBoth(self._logcb, "got past deferLater")
        except Exception as e:
            self._log("---- turn deque exception %s" % (e,))
            raise


class Uploader(QueueMixin):
    def __init__(self, client, local_path_u, db, upload_dirnode, pending_delay, clock,
                 immediate=False):
        QueueMixin.__init__(self, client, local_path_u, db, 'uploader', clock)

        self.is_ready = False
        self._immediate = immediate

        if not IDirectoryNode.providedBy(upload_dirnode):
            raise AssertionError("The URI in '%s' does not refer to a directory."
                                 % os.path.join('private', 'magic_folder_dircap'))
        if upload_dirnode.is_unknown() or upload_dirnode.is_readonly():
            raise AssertionError("The URI in '%s' is not a writecap to a directory."
                                 % os.path.join('private', 'magic_folder_dircap'))

        self._upload_dirnode = upload_dirnode
        self._inotify = get_inotify_module()
        self._notifier = self._inotify.INotify()
        self._pending = set()  # of unicode relpaths

        self._periodic_full_scan_duration = 10 * 60  # perform a full scan every 10 minutes

        if hasattr(self._notifier, 'set_pending_delay'):
            self._notifier.set_pending_delay(pending_delay)

        # TODO: what about IN_MOVE_SELF and IN_UNMOUNT?
        #
        self.mask = ( self._inotify.IN_CREATE
                    | self._inotify.IN_CLOSE_WRITE
                    | self._inotify.IN_MOVED_TO
                    | self._inotify.IN_MOVED_FROM
                    | self._inotify.IN_DELETE
                    | self._inotify.IN_ONLYDIR
                    | IN_EXCL_UNLINK
                    )
        self._notifier.watch(self._local_filepath, mask=self.mask, callbacks=[self._notify],
                             recursive=True)

    def start_monitoring(self):
        self._log("start_monitoring")
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._notifier.startReading())
        d.addCallback(lambda ign: self._count('dirs_monitored'))
        d.addBoth(self._call_hook, 'started')
        return d

    def stop(self):
        self._log("stop")
        self._notifier.stopReading()
        self._count('dirs_monitored', -1)
        # The periodic-scan timer only exists once start_uploading() has run.
        if hasattr(self, 'periodic_callid') and self.periodic_callid.active():
            self.periodic_callid.cancel()
        if hasattr(self._notifier, 'wait_until_stopped'):
            d = self._notifier.wait_until_stopped()
        else:
            d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def start_uploading(self):
        self._log("start_uploading")
        self.is_ready = True

        all_relpaths = self._db.get_all_relpaths()
        self._log("all relpaths: %r" % (all_relpaths,))

        for relpath_u in all_relpaths:
            self._add_pending(relpath_u)

        self._full_scan()

    def _extend_queue_and_keep_going(self, relpaths_u):
        self._log("_extend_queue_and_keep_going %r" % (relpaths_u,))
        self._deque.extend(relpaths_u)
        self._count('objects_queued', len(relpaths_u))

        if self.is_ready:
            if self._immediate:  # for tests
                self._turn_deque()
            else:
                self._clock.callLater(0, self._turn_deque)

    def _full_scan(self):
        self.periodic_callid = self._clock.callLater(self._periodic_full_scan_duration, self._full_scan)
        print "FULL SCAN"
        self._log("_pending %r" % (self._pending,))
        self._scan(u"")
        self._extend_queue_and_keep_going(self._pending)

    def _add_pending(self, relpath_u):
        self._log("add pending %r" % (relpath_u,))
        if not magicpath.should_ignore_file(relpath_u):
            self._pending.add(relpath_u)

    def _scan(self, reldir_u):
        # Scan a directory by (synchronously) adding the paths of all its children to self._pending.
        # Note that this doesn't add them to the deque -- that is done
        # afterwards by _extend_queue_and_keep_going (see _full_scan and the
        # directory-upload case in _process).

        self._log("scan %r" % (reldir_u,))
        fp = self._get_filepath(reldir_u)
        try:
            children = listdir_filepath(fp)
        except EnvironmentError:
            raise Exception("WARNING: magic folder: permission denied on directory %s"
                            % quote_filepath(fp))
        except FilenameEncodingError:
            raise Exception("WARNING: magic folder: could not list directory %s due to a filename encoding error"
                            % quote_filepath(fp))

        for child in children:
            _assert(isinstance(child, unicode), child=child)
            self._add_pending("%s/%s" % (reldir_u, child) if reldir_u != u"" else child)

    def is_pending(self, relpath_u):
        return relpath_u in self._pending

    def _notify(self, opaque, path, events_mask):
        self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask))))
        relpath_u = self._get_relpath(path)

        # We filter out IN_CREATE events not associated with a directory.
        # Acting on IN_CREATE for files could cause us to read and upload
        # a possibly-incomplete file before the application has closed it.
        # There should always be an IN_CLOSE_WRITE after an IN_CREATE, I think.
        # It isn't possible to avoid watching for IN_CREATE at all, because
        # it is the only event notified for a directory creation.

        if ((events_mask & self._inotify.IN_CREATE) != 0 and
            (events_mask & self._inotify.IN_ISDIR) == 0):
            self._log("ignoring event for %r (creation of non-directory)\n" % (relpath_u,))
            return
        if relpath_u in self._pending:
            self._log("not queueing %r because it is already pending" % (relpath_u,))
            return
        if magicpath.should_ignore_file(relpath_u):
            self._log("ignoring event for %r (ignorable path)" % (relpath_u,))
            return

        self._pending.add(relpath_u)
        self._extend_queue_and_keep_going([relpath_u])

    def _when_queue_is_empty(self):
        return defer.succeed(None)

    def _process(self, relpath_u):
        # Uploader
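        """
        Handle one queued relpath: upload a deletion record if the local file
        has disappeared, skip symlinks and special files, create and scan a
        remote subdirectory for a local directory, or upload a new version of
        a regular file.
        """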
        self._log("_process(%r)" % (relpath_u,))
        if relpath_u is None:
            return
        precondition(isinstance(relpath_u, unicode), relpath_u)
        precondition(not relpath_u.endswith(u'/'), relpath_u)

        d = defer.succeed(None)

        def _maybe_upload(ign, now=None):
            self._log("_maybe_upload: relpath_u=%r, now=%r" % (relpath_u, now))
            if now is None:
                now = time.time()
            fp = self._get_filepath(relpath_u)
            pathinfo = get_pathinfo(unicode_from_filepath(fp))

            self._log("about to remove %r from pending set %r" %
                      (relpath_u, self._pending))
            self._pending.remove(relpath_u)
            encoded_path_u = magicpath.path2magic(relpath_u)

            if not pathinfo.exists:
                # FIXME merge this with the 'isfile' case.
                self._log("notified object %s disappeared (this is normal)" % quote_filepath(fp))
                self._count('objects_disappeared')

                db_entry = self._db.get_db_entry(relpath_u)
                if db_entry is None:
                    return None

                last_downloaded_timestamp = now  # is this correct?

                if is_new_file(pathinfo, db_entry):
                    new_version = db_entry.version + 1
                else:
                    self._log("Not uploading %r" % (relpath_u,))
                    self._count('objects_not_uploaded')
                    return

                metadata = { 'version': new_version,
                             'deleted': True,
                             'last_downloaded_timestamp': last_downloaded_timestamp }
                if db_entry.last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = db_entry.last_downloaded_uri

                empty_uploadable = Data("", self._client.convergence)
                d2 = self._upload_dirnode.add_file(encoded_path_u, empty_uploadable,
                                                   metadata=metadata, overwrite=True)

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    last_downloaded_uri = metadata.get('last_downloaded_uri', None)
                    self._db.did_upload_version(relpath_u, new_version, filecap,
                                                last_downloaded_uri, last_downloaded_timestamp,
                                                pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)
                return d2
            elif pathinfo.islink:
                self._log("WARNING: cannot upload symlink %s" % quote_filepath(fp))
                return None
            elif pathinfo.isdir:
                print "ISDIR"
                if not getattr(self._notifier, 'recursive_includes_new_subdirectories', False):
                    self._notifier.watch(fp, mask=self.mask, callbacks=[self._notify], recursive=True)

                uploadable = Data("", self._client.convergence)
                encoded_path_u += magicpath.path2magic(u"/")
                self._log("encoded_path_u = %r" % (encoded_path_u,))
                upload_d = self._upload_dirnode.add_file(encoded_path_u, uploadable, metadata={"version": 0}, overwrite=True)
                def _dir_succeeded(ign):
                    self._log("created subdirectory %r" % (relpath_u,))
                    self._count('directories_created')
                def _dir_failed(f):
                    self._log("failed to create subdirectory %r" % (relpath_u,))
                    return f
                upload_d.addCallbacks(_dir_succeeded, _dir_failed)
                upload_d.addCallback(lambda ign: self._scan(relpath_u))
                upload_d.addCallback(lambda ign: self._extend_queue_and_keep_going(self._pending))
                return upload_d
            elif pathinfo.isfile:
                db_entry = self._db.get_db_entry(relpath_u)

                last_downloaded_timestamp = now

                if db_entry is None:
                    new_version = 0
                elif is_new_file(pathinfo, db_entry):
                    new_version = db_entry.version + 1
                else:
                    self._log("Not uploading %r" % (relpath_u,))
                    self._count('objects_not_uploaded')
                    return None

                metadata = { 'version': new_version,
                             'last_downloaded_timestamp': last_downloaded_timestamp }
                if db_entry is not None and db_entry.last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = db_entry.last_downloaded_uri

                uploadable = FileName(unicode_from_filepath(fp), self._client.convergence)
                d2 = self._upload_dirnode.add_file(encoded_path_u, uploadable,
                                                   metadata=metadata, overwrite=True)

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    last_downloaded_uri = metadata.get('last_downloaded_uri', None)
                    self._db.did_upload_version(relpath_u, new_version, filecap,
                                                last_downloaded_uri, last_downloaded_timestamp,
                                                pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)
                return d2
            else:
                self._log("WARNING: cannot process special file %s" % quote_filepath(fp))
                return None

        d.addCallback(_maybe_upload)

        def _succeeded(res):
            self._count('objects_succeeded')
            return res
        def _failed(f):
            self._count('objects_failed')
            self._log("%s while processing %r" % (f, relpath_u))
            return f
        d.addCallbacks(_succeeded, _failed)
        return d

    def _get_metadata(self, encoded_path_u):
        try:
            d = self._upload_dirnode.get_metadata_for(encoded_path_u)
        except KeyError:
            return Failure()
        return d

    def _get_filenode(self, encoded_path_u):
        try:
            d = self._upload_dirnode.get(encoded_path_u)
        except KeyError:
            return Failure()
        return d


class WriteFileMixin(object):
    FUDGE_SECONDS = 10.0

    def _get_conflicted_filename(self, abspath_u):
        return abspath_u + u".conflict"

    def _write_downloaded_file(self, abspath_u, file_contents, is_conflict=False, now=None):
        self._log("_write_downloaded_file(%r, <%d bytes>, is_conflict=%r, now=%r)"
                  % (abspath_u, len(file_contents), is_conflict, now))

        # 1. Write a temporary file, say .foo.tmp.
        # 2. is_conflict determines whether this is an overwrite or a conflict.
        # 3. Set the mtime of the replacement file to be T seconds before the
        #    current local time.
        # 4. Perform a file replacement with backup filename foo.backup,
        #    replaced file foo, and replacement file .foo.tmp. If any step of
        #    this operation fails, reclassify as a conflict and stop.
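        #
        # (Step 3's backdated mtime ensures that a local modification made
        # just after the write still produces an mtime newer than the one we
        # record in our db, even on filesystems with coarse timestamp
        # granularity, so is_new_file can detect it.)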
        #
        # Returns the path of the destination file.

        precondition_abspath(abspath_u)
        replacement_path_u = abspath_u + u".tmp"  # FIXME more unique
        backup_path_u = abspath_u + u".backup"
        if now is None:
            now = time.time()

        # ensure parent directory exists
        head, tail = os.path.split(abspath_u)

        old_mask = os.umask(self._umask)
        try:
            fileutil.make_dirs(head, (~ self._umask) & 0777)
            fileutil.write(replacement_path_u, file_contents)
        finally:
            os.umask(old_mask)

        os.utime(replacement_path_u, (now, now - self.FUDGE_SECONDS))
        if is_conflict:
            print "is conflict; calling _rename_conflicted_file... %r %r" % (abspath_u, replacement_path_u)
            return self._rename_conflicted_file(abspath_u, replacement_path_u)
        else:
            try:
                fileutil.replace_file(abspath_u, replacement_path_u, backup_path_u)
                return abspath_u
            except fileutil.ConflictError:
                return self._rename_conflicted_file(abspath_u, replacement_path_u)

    def _rename_conflicted_file(self, abspath_u, replacement_path_u):
        self._log("_rename_conflicted_file(%r, %r)" % (abspath_u, replacement_path_u))

        conflict_path_u = self._get_conflicted_filename(abspath_u)
        print "XXX rename %r %r" % (replacement_path_u, conflict_path_u)
        if os.path.isfile(replacement_path_u):
            print "%r exists" % (replacement_path_u,)
        if os.path.isfile(conflict_path_u):
            print "%r exists" % (conflict_path_u,)

        fileutil.rename_no_overwrite(replacement_path_u, conflict_path_u)
        return conflict_path_u

    def _rename_deleted_file(self, abspath_u):
        self._log('renaming deleted file to backup: %s' % (abspath_u,))
        try:
            fileutil.rename_no_overwrite(abspath_u, abspath_u + u'.backup')
        except OSError:
            self._log("Already gone: '%s'" % (abspath_u,))
        return abspath_u


class Downloader(QueueMixin, WriteFileMixin):
    REMOTE_SCAN_INTERVAL = 3  # facilitates tests

    def __init__(self, client, local_path_u, db, collective_dirnode,
                 upload_readonly_dircap, clock, is_upload_pending, umask):
        QueueMixin.__init__(self, client, local_path_u, db, 'downloader', clock)

        if not IDirectoryNode.providedBy(collective_dirnode):
            raise AssertionError("The URI in '%s' does not refer to a directory."
                                 % os.path.join('private', 'collective_dircap'))
        if collective_dirnode.is_unknown() or not collective_dirnode.is_readonly():
            raise AssertionError("The URI in '%s' is not a readonly cap to a directory."
                                 % os.path.join('private', 'collective_dircap'))

        self._collective_dirnode = collective_dirnode
        self._upload_readonly_dircap = upload_readonly_dircap
        self._is_upload_pending = is_upload_pending
        self._umask = umask

    def start_downloading(self):
        self._log("start_downloading")
        files = self._db.get_all_relpaths()
        self._log("all files %s" % files)

        d = self._scan_remote_collective(scan_self=True)
        d.addBoth(self._logcb, "after _scan_remote_collective 0")
        self._turn_deque()
        return d

    def stop(self):
        self._stopped = True
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def _should_download(self, relpath_u, remote_version):
        """
        _should_download returns a bool indicating whether or not a remote object should be downloaded.
        We check the remote metadata version against our magic-folder db version number;
        latest version wins.
        """
        self._log("_should_download(%r, %r)" % (relpath_u, remote_version))
        if magicpath.should_ignore_file(relpath_u):
            self._log("nope")
            return False
        self._log("yep")
        db_entry = self._db.get_db_entry(relpath_u)
        if db_entry is None:
            return True
        self._log("version %r" % (db_entry.version,))
        return (db_entry.version < remote_version)

    def _get_local_latest(self, relpath_u):
        """
        _get_local_latest takes a unicode relative path and returns the version
        number from our magic-folder db entry for that file, or None if the
        file does not exist locally or has no entry in the db.
        """
        if not self._get_filepath(relpath_u).exists():
            return None
        db_entry = self._db.get_db_entry(relpath_u)
        return None if db_entry is None else db_entry.version

    def _get_collective_latest_file(self, filename):
        """
        _get_collective_latest_file takes a file path pointing to a file managed by
        magic-folder and returns a deferred that fires with a 2-tuple containing a
        file node and metadata for the latest version of the file located in the
        magic-folder collective directory, or (None, None) if no version was found.
        """
        collective_dirmap_d = self._collective_dirnode.list()
        def scan_collective(result):
            list_of_deferreds = []
            for dir_name in result.keys():
                # XXX make sure it's a directory
                d = defer.succeed(None)
                d.addCallback(lambda x, dir_name=dir_name: result[dir_name][0].get_child_and_metadata(filename))
                list_of_deferreds.append(d)
            deferList = defer.DeferredList(list_of_deferreds, consumeErrors=True)
            return deferList
        collective_dirmap_d.addCallback(scan_collective)
        def highest_version(deferredList):
            max_version = -1  # start below zero so that a version-0 entry can win
            metadata = None
            node = None
            for success, result in deferredList:
                if success:
                    if result[1]['version'] > max_version:
                        node, metadata = result
                        max_version = result[1]['version']
            return node, metadata
        collective_dirmap_d.addCallback(highest_version)
        return collective_dirmap_d

    def _scan_remote_dmd(self, nickname, dirnode, scan_batch):
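        """
        List one remote DMD (a single participant's upload directory) and,
        for every entry whose remote version is newer than, or incomparable
        with, our local version, append its (filenode, metadata) pair to
        scan_batch under the decoded relpath.
        """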
        self._log("_scan_remote_dmd nickname %r" % (nickname,))
        d = dirnode.list()
        def scan_listing(listing_map):
            for encoded_relpath_u in listing_map.keys():
                relpath_u = magicpath.magic2path(encoded_relpath_u)
                self._log("found %r" % (relpath_u,))

                file_node, metadata = listing_map[encoded_relpath_u]
                local_version = self._get_local_latest(relpath_u)
                remote_version = metadata.get('version', None)
                self._log("%r has local version %r, remote version %r" % (relpath_u, local_version, remote_version))

                if local_version is None or remote_version is None or local_version < remote_version:
                    self._log("%r added to download queue" % (relpath_u,))
                    if relpath_u in scan_batch:
                        scan_batch[relpath_u] += [(file_node, metadata)]
                    else:
                        scan_batch[relpath_u] = [(file_node, metadata)]

        d.addCallback(scan_listing)
        d.addBoth(self._logcb, "end of _scan_remote_dmd")
        return d

    def _scan_remote_collective(self, scan_self=False):
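        """
        Scan every participant's DMD in the collective (skipping our own
        unless scan_self is True), batch the results by relpath, and queue
        the highest-versioned (filenode, metadata) for each path that
        _should_download approves.
        """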
        self._log("_scan_remote_collective")
        scan_batch = {}  # path -> [(filenode, metadata)]

        d = self._collective_dirnode.list()
        def scan_collective(dirmap):
            d2 = defer.succeed(None)
            for dir_name in dirmap:
                (dirnode, metadata) = dirmap[dir_name]
                if scan_self or dirnode.get_readonly_uri() != self._upload_readonly_dircap:
                    d2.addCallback(lambda ign, dir_name=dir_name, dirnode=dirnode:
                                   self._scan_remote_dmd(dir_name, dirnode, scan_batch))
                    def _err(f, dir_name=dir_name):
                        self._log("failed to scan DMD for client %r: %s" % (dir_name, f))
                        # XXX what should we do to make this failure more visible to users?
                    d2.addErrback(_err)

            return d2
        d.addCallback(scan_collective)

        def _filter_batch_to_deque(ign):
            self._log("deque = %r, scan_batch = %r" % (self._deque, scan_batch))
            for relpath_u in scan_batch.keys():
                file_node, metadata = max(scan_batch[relpath_u], key=lambda x: x[1]['version'])

                if self._should_download(relpath_u, metadata['version']):
                    self._deque.append((relpath_u, file_node, metadata))
                else:
                    self._log("Excluding %r" % (relpath_u,))
                    self._call_hook(None, 'processed')

            self._log("deque after = %r" % (self._deque,))
        d.addCallback(_filter_batch_to_deque)
        return d

    def _when_queue_is_empty(self):
        d = task.deferLater(self._clock, self.REMOTE_SCAN_INTERVAL, self._scan_remote_collective)
        d.addBoth(self._logcb, "after _scan_remote_collective 1")
        d.addCallback(lambda ign: self._turn_deque())
        return d

    def _process(self, item, now=None):
        # Downloader
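        """
        Handle one queued (relpath_u, file_node, metadata) item: detect
        conflicts against our db, then apply the remote change locally as a
        mkdir, a rename-to-backup for a deletion, or a downloaded-file write,
        and finally record the new version in the db.
        """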
        self._log("_process(%r)" % (item,))
        if now is None:
            now = time.time()
        (relpath_u, file_node, metadata) = item
        fp = self._get_filepath(relpath_u)
        abspath_u = unicode_from_filepath(fp)
        conflict_path_u = self._get_conflicted_filename(abspath_u)

        d = defer.succeed(None)

        def do_update_db(written_abspath_u):
            filecap = file_node.get_uri()
            last_uploaded_uri = metadata.get('last_uploaded_uri', None)
            last_downloaded_uri = filecap
            last_downloaded_timestamp = now
            written_pathinfo = get_pathinfo(written_abspath_u)

            if not written_pathinfo.exists and not metadata.get('deleted', False):
                raise Exception("downloaded object %s disappeared" % quote_local_unicode_path(written_abspath_u))

            self._db.did_upload_version(relpath_u, metadata['version'], last_uploaded_uri,
                                        last_downloaded_uri, last_downloaded_timestamp, written_pathinfo)
            self._count('objects_downloaded')
        def failed(f):
            self._log("download failed: %s" % (str(f),))
            self._count('objects_failed')
            return f

        if os.path.isfile(conflict_path_u):
            def fail(res):
                raise ConflictError("download failed: already conflicted: %r" % (relpath_u,))
            d.addCallback(fail)
        else:
            is_conflict = False
            db_entry = self._db.get_db_entry(relpath_u)
            dmd_last_downloaded_uri = metadata.get('last_downloaded_uri', None)
            dmd_last_uploaded_uri = metadata.get('last_uploaded_uri', None)
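            # We declare a conflict when the remote change does not descend
            # from what we last saw for this path: the DMD's
            # last_downloaded_uri no longer matches ours, its
            # last_uploaded_uri changed behind our back, or we ourselves
            # still have an upload pending for the same path.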
            if db_entry:
                if dmd_last_downloaded_uri is not None and db_entry.last_downloaded_uri is not None:
                    if dmd_last_downloaded_uri != db_entry.last_downloaded_uri:
                        is_conflict = True
                        self._count('objects_conflicted')
                elif dmd_last_uploaded_uri is not None and dmd_last_uploaded_uri != db_entry.last_uploaded_uri:
                    is_conflict = True
                    self._count('objects_conflicted')
                elif self._is_upload_pending(relpath_u):
                    is_conflict = True
                    self._count('objects_conflicted')

            if relpath_u.endswith(u"/"):
                if metadata.get('deleted', False):
                    self._log("rmdir(%r) ignored" % (abspath_u,))
                else:
                    self._log("mkdir(%r)" % (abspath_u,))
                    d.addCallback(lambda ign: fileutil.make_dirs(abspath_u))
                    d.addCallback(lambda ign: abspath_u)
            else:
                if metadata.get('deleted', False):
                    d.addCallback(lambda ign: self._rename_deleted_file(abspath_u))
                else:
                    d.addCallback(lambda ign: file_node.download_best_version())
                    d.addCallback(lambda contents: self._write_downloaded_file(abspath_u, contents,
                                                                               is_conflict=is_conflict))

        d.addCallbacks(do_update_db, failed)

        def trap_conflicts(f):
            f.trap(ConflictError)
            return None
        d.addErrback(trap_conflicts)
        return d