import sys, os
import os.path
from collections import deque
import time

from twisted.internet import defer, reactor, task
from twisted.python.failure import Failure
from twisted.python import runtime
from twisted.application import service

from allmydata.util import fileutil
from allmydata.interfaces import IDirectoryNode
from allmydata.util import log
from allmydata.util.fileutil import precondition_abspath, get_pathinfo, ConflictError
from allmydata.util.assertutil import precondition, _assert
from allmydata.util.deferredutil import HookMixin
from allmydata.util.encodingutil import listdir_filepath, to_filepath, \
     extend_filepath, unicode_from_filepath, unicode_segments_from, \
     quote_filepath, quote_local_unicode_path, quote_output, FilenameEncodingError
from allmydata.immutable.upload import FileName, Data
from allmydata import magicfolderdb, magicpath


IN_EXCL_UNLINK = 0x04000000L
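# IN_EXCL_UNLINK is the Linux inotify flag (value 0x04000000 in
# <sys/inotify.h>) that suppresses events for children after they have been
# unlinked; Twisted's inotify module does not define it, hence the
# hard-coded constant here.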

def get_inotify_module():
    try:
        if sys.platform == "win32":
            from allmydata.windows import inotify
        elif runtime.platform.supportsINotify():
            from twisted.internet import inotify
        else:
            raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
                                      "This currently requires Linux or Windows.")
        return inotify
    except (ImportError, AttributeError) as e:
        log.msg(e)
        if sys.platform == "win32":
            raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
                                      "Windows support requires at least Vista, and has only been tested on Windows 7.")
        raise


def is_new_file(pathinfo, db_entry):
    if db_entry is None:
        return True

    if not pathinfo.exists and db_entry.size is None:
        return False

    return ((pathinfo.size, pathinfo.ctime, pathinfo.mtime) !=
            (db_entry.size, db_entry.ctime, db_entry.mtime))
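
# For example (hypothetical values): if the db recorded
# (size=10, ctime=5.0, mtime=5.0) and get_pathinfo() still reports the same
# triple, is_new_file() returns False; any change to size, ctime or mtime
# makes it return True.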


class MagicFolder(service.MultiService):
    name = 'magic-folder'

    def __init__(self, client, upload_dircap, collective_dircap, local_path_u, dbfile, umask,
                 pending_delay=1.0, clock=None):
        precondition_abspath(local_path_u)

        service.MultiService.__init__(self)

        immediate = clock is not None
        clock = clock or reactor
        db = magicfolderdb.get_magicfolderdb(dbfile, create_version=(magicfolderdb.SCHEMA_v1, 1))
        if db is None:
            raise Exception('ERROR: Unable to load magic folder db.')

        # for tests
        self._client = client
        self._db = db

        upload_dirnode = self._client.create_node_from_uri(upload_dircap)
        collective_dirnode = self._client.create_node_from_uri(collective_dircap)

        self.uploader = Uploader(client, local_path_u, db, upload_dirnode, pending_delay, clock, immediate)
        self.downloader = Downloader(client, local_path_u, db, collective_dirnode,
                                     upload_dirnode.get_readonly_uri(), clock, self.uploader.is_pending, umask)

    def startService(self):
        # TODO: why is this being called more than once?
        if self.running:
            return defer.succeed(None)
        print "%r.startService" % (self,)
        service.MultiService.startService(self)
        return self.uploader.start_monitoring()

    def ready(self):
        """ready is used to signal us to start processing the upload and
        download items.
        """
        self.uploader.start_uploading()  # synchronous
        return self.downloader.start_downloading()

    def finish(self):
        print "finish"
        d = self.uploader.stop()
        d2 = self.downloader.stop()
        d.addCallback(lambda ign: d2)
        return d

    def remove_service(self):
        return service.MultiService.disownServiceParent(self)


class QueueMixin(HookMixin):
    def __init__(self, client, local_path_u, db, name, clock):
        self._client = client
        self._local_path_u = local_path_u
        self._local_filepath = to_filepath(local_path_u)
        self._db = db
        self._name = name
        self._clock = clock
        self._hooks = {'processed': None, 'started': None}
        self.started_d = self.set_hook('started')

        if not self._local_filepath.exists():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but there is no directory at that location."
                                 % quote_local_unicode_path(self._local_path_u))
        if not self._local_filepath.isdir():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but the thing at that location is not a directory."
                                 % quote_local_unicode_path(self._local_path_u))

        self._deque = deque()
        self._lazy_tail = defer.succeed(None)
        self._stopped = False
        self._turn_delay = 0

    def _get_filepath(self, relpath_u):
        self._log("_get_filepath(%r)" % (relpath_u,))
        return extend_filepath(self._local_filepath, relpath_u.split(u"/"))

    def _get_relpath(self, filepath):
        self._log("_get_relpath(%r)" % (filepath,))
        segments = unicode_segments_from(filepath, self._local_filepath)
        self._log("segments = %r" % (segments,))
        return u"/".join(segments)

    def _count(self, counter_name, delta=1):
        ctr = 'magic_folder.%s.%s' % (self._name, counter_name)
        self._log("%s += %r" % (counter_name, delta))
        self._client.stats_provider.count(ctr, delta)
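        # e.g. the uploader's self._count('objects_queued') bumps the
        # 'magic_folder.uploader.objects_queued' counter by 1 in the
        # client's stats provider.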

    def _logcb(self, res, msg):
        self._log("%s: %r" % (msg, res))
        return res

    def _log(self, msg):
        s = "Magic Folder %s %s: %s" % (quote_output(self._client.nickname), self._name, msg)
        self._client.log(s)
        print s
        #open("events", "ab+").write(msg)

    def _turn_deque(self):
        self._log("_turn_deque")
        if self._stopped:
            self._log("stopped")
            return
        try:
            item = self._deque.pop()
            self._log("popped %r" % (item,))
            self._count('objects_queued', -1)
        except IndexError:
            self._log("deque is now empty")
            self._lazy_tail.addCallback(lambda ign: self._when_queue_is_empty())
        else:
            self._lazy_tail.addCallback(lambda ign: self._process(item))
            self._lazy_tail.addBoth(self._call_hook, 'processed')
            self._lazy_tail.addErrback(log.err)
            self._lazy_tail.addCallback(lambda ign: task.deferLater(self._clock, self._turn_delay, self._turn_deque))
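
    # A sketch of the turn-based loop that _turn_deque implements: every
    # queued item is chained onto self._lazy_tail, so items are processed
    # strictly one at a time:
    #
    #   pop item -> _process(item) -> fire 'processed' hook
    #            -> deferLater(clock, turn_delay, _turn_deque)   # next turn
    #
    # When the deque is empty, _when_queue_is_empty() decides what happens
    # next: the Uploader just returns, while the Downloader schedules a
    # rescan of the remote collective.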


class Uploader(QueueMixin):
    def __init__(self, client, local_path_u, db, upload_dirnode, pending_delay, clock,
                 immediate=False):
        QueueMixin.__init__(self, client, local_path_u, db, 'uploader', clock)

        self.is_ready = False
        self._immediate = immediate

        if not IDirectoryNode.providedBy(upload_dirnode):
            raise AssertionError("The URI in '%s' does not refer to a directory."
                                 % os.path.join('private', 'magic_folder_dircap'))
        if upload_dirnode.is_unknown() or upload_dirnode.is_readonly():
            raise AssertionError("The URI in '%s' is not a writecap to a directory."
                                 % os.path.join('private', 'magic_folder_dircap'))

        self._upload_dirnode = upload_dirnode
        self._inotify = get_inotify_module()
        self._notifier = self._inotify.INotify()
        self._pending = set()  # of unicode relpaths

        if hasattr(self._notifier, 'set_pending_delay'):
            self._notifier.set_pending_delay(pending_delay)

        # TODO: what about IN_MOVE_SELF and IN_UNMOUNT?
        #
        self.mask = ( self._inotify.IN_CREATE
                    | self._inotify.IN_CLOSE_WRITE
                    | self._inotify.IN_MOVED_TO
                    | self._inotify.IN_MOVED_FROM
                    | self._inotify.IN_DELETE
                    | self._inotify.IN_ONLYDIR
                    | IN_EXCL_UNLINK
                    )
        self._notifier.watch(self._local_filepath, mask=self.mask, callbacks=[self._notify],
                             recursive=True)

    def start_monitoring(self):
        self._log("start_monitoring")
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._notifier.startReading())
        d.addCallback(lambda ign: self._count('dirs_monitored'))
        d.addBoth(self._call_hook, 'started')
        return d

    def stop(self):
        self._log("stop")
        self._notifier.stopReading()
        self._count('dirs_monitored', -1)
        if hasattr(self._notifier, 'wait_until_stopped'):
            d = self._notifier.wait_until_stopped()
        else:
            d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def start_uploading(self):
        self._log("start_uploading")
        self.is_ready = True

        all_relpaths = self._db.get_all_relpaths()
        self._log("all relpaths: %r" % (all_relpaths,))

        for relpath_u in all_relpaths:
            self._add_pending(relpath_u)

        self._full_scan()
        self._extend_queue_and_keep_going(self._pending)

    def _extend_queue_and_keep_going(self, relpaths_u):
        self._log("queueing %r" % (relpaths_u,))
        self._deque.extend(relpaths_u)
        self._count('objects_queued', len(relpaths_u))

        if self.is_ready:
            if self._immediate:  # for tests
                self._turn_deque()
            else:
                self._clock.callLater(0, self._turn_deque)

    def _full_scan(self):
        print "FULL SCAN"
        self._log("_pending %r" % (self._pending,))
        self._scan(u"")

    def _add_pending(self, relpath_u):
        if not magicpath.should_ignore_file(relpath_u):
            self._pending.add(relpath_u)

    def _scan(self, reldir_u):
        # Scan a directory by (synchronously) adding the paths of all its children to self._pending.
        # Note that this doesn't add them to the deque -- that is done by a
        # subsequent call to _extend_queue_and_keep_going.

        self._log("scan %r" % (reldir_u,))
        fp = self._get_filepath(reldir_u)
        try:
            children = listdir_filepath(fp)
        except EnvironmentError:
            raise Exception("WARNING: magic folder: permission denied on directory %s"
                            % quote_filepath(fp))
        except FilenameEncodingError:
            raise Exception("WARNING: magic folder: could not list directory %s due to a filename encoding error"
                            % quote_filepath(fp))

        for child in children:
            _assert(isinstance(child, unicode), child=child)
            self._add_pending("%s/%s" % (reldir_u, child) if reldir_u != u"" else child)
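
        # e.g. (hypothetical) scanning reldir_u == u"photos" with a child
        # u"cat.jpg" adds u"photos/cat.jpg" to self._pending; scanning the
        # root (reldir_u == u"") adds just u"cat.jpg".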

    def is_pending(self, relpath_u):
        return relpath_u in self._pending

    def _notify(self, opaque, path, events_mask):
        self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask))))
        relpath_u = self._get_relpath(path)

        # We filter out IN_CREATE events not associated with a directory.
        # Acting on IN_CREATE for files could cause us to read and upload
        # a possibly-incomplete file before the application has closed it.
        # There should always be an IN_CLOSE_WRITE after an IN_CREATE, I think.
        # It isn't possible to avoid watching for IN_CREATE at all, because
        # it is the only event notified for a directory creation.

        if ((events_mask & self._inotify.IN_CREATE) != 0 and
            (events_mask & self._inotify.IN_ISDIR) == 0):
            self._log("ignoring event for %r (creation of non-directory)\n" % (relpath_u,))
            return
        if relpath_u in self._pending:
            self._log("not queueing %r because it is already pending" % (relpath_u,))
            return
        if magicpath.should_ignore_file(relpath_u):
            self._log("ignoring event for %r (ignorable path)" % (relpath_u,))
            return

        self._pending.add(relpath_u)
        self._extend_queue_and_keep_going([relpath_u])
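
        # Sketch of the flow under Linux inotify semantics: creating file
        # "foo" produces IN_CREATE (filtered out above) followed by
        # IN_CLOSE_WRITE, which queues u"foo"; creating directory "bar"
        # produces only IN_CREATE|IN_ISDIR, which is why directory
        # creations must be queued from here.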

    def _when_queue_is_empty(self):
        return defer.succeed(None)

    def _process(self, relpath_u):
        # Uploader
        self._log("_process(%r)" % (relpath_u,))
        if relpath_u is None:
            return
        precondition(isinstance(relpath_u, unicode), relpath_u)
        precondition(not relpath_u.endswith(u'/'), relpath_u)

        d = defer.succeed(None)

        def _maybe_upload(val, now=None):
            if now is None:
                now = time.time()
            fp = self._get_filepath(relpath_u)
            pathinfo = get_pathinfo(unicode_from_filepath(fp))

            self._log("about to remove %r from pending set %r" %
                      (relpath_u, self._pending))
            self._pending.remove(relpath_u)
            encoded_path_u = magicpath.path2magic(relpath_u)

            if not pathinfo.exists:
                # FIXME merge this with the 'isfile' case.
                self._log("notified object %s disappeared (this is normal)" % quote_filepath(fp))
                self._count('objects_disappeared')

                db_entry = self._db.get_db_entry(relpath_u)
                if db_entry is None:
                    return None

                last_downloaded_timestamp = now  # is this correct?

                if is_new_file(pathinfo, db_entry):
                    new_version = db_entry.version + 1
                else:
                    self._log("Not uploading %r" % (relpath_u,))
                    self._count('objects_not_uploaded')
                    return

                metadata = { 'version': new_version,
                             'deleted': True,
                             'last_downloaded_timestamp': last_downloaded_timestamp }
                if db_entry.last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = db_entry.last_downloaded_uri

                empty_uploadable = Data("", self._client.convergence)
                d2 = self._upload_dirnode.add_file(encoded_path_u, empty_uploadable,
                                                   metadata=metadata, overwrite=True)

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    last_downloaded_uri = metadata.get('last_downloaded_uri', None)
                    self._db.did_upload_version(relpath_u, new_version, filecap,
                                                last_downloaded_uri, last_downloaded_timestamp,
                                                pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)
                return d2
            elif pathinfo.islink:
                self._log("WARNING: cannot upload symlink %s" % quote_filepath(fp))
                return None
            elif pathinfo.isdir:
                if not getattr(self._notifier, 'recursive_includes_new_subdirectories', False):
                    self._notifier.watch(fp, mask=self.mask, callbacks=[self._notify], recursive=True)

                uploadable = Data("", self._client.convergence)
                encoded_path_u += magicpath.path2magic(u"/")
                self._log("encoded_path_u = %r" % (encoded_path_u,))
                upload_d = self._upload_dirnode.add_file(encoded_path_u, uploadable, metadata={"version": 0}, overwrite=True)
                def _dir_succeeded(ign):
                    self._log("created subdirectory %r" % (relpath_u,))
                    self._count('directories_created')
                def _dir_failed(f):
                    self._log("failed to create subdirectory %r" % (relpath_u,))
                    return f
                upload_d.addCallbacks(_dir_succeeded, _dir_failed)
                upload_d.addCallback(lambda ign: self._scan(relpath_u))
                upload_d.addCallback(lambda ign: self._extend_queue_and_keep_going(self._pending))
                return upload_d
            elif pathinfo.isfile:
                db_entry = self._db.get_db_entry(relpath_u)

                last_downloaded_timestamp = now

                if db_entry is None:
                    new_version = 0
                elif is_new_file(pathinfo, db_entry):
                    new_version = db_entry.version + 1
                else:
                    self._log("Not uploading %r" % (relpath_u,))
                    self._count('objects_not_uploaded')
                    return None

                metadata = { 'version': new_version,
                             'last_downloaded_timestamp': last_downloaded_timestamp }
                if db_entry is not None and db_entry.last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = db_entry.last_downloaded_uri

                uploadable = FileName(unicode_from_filepath(fp), self._client.convergence)
                d2 = self._upload_dirnode.add_file(encoded_path_u, uploadable,
                                                   metadata=metadata, overwrite=True)

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    last_downloaded_uri = metadata.get('last_downloaded_uri', None)
                    self._db.did_upload_version(relpath_u, new_version, filecap,
                                                last_downloaded_uri, last_downloaded_timestamp,
                                                pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)
                return d2
            else:
                self._log("WARNING: cannot process special file %s" % quote_filepath(fp))
                return None

        d.addCallback(_maybe_upload)

        def _succeeded(res):
            self._count('objects_succeeded')
            return res
        def _failed(f):
            self._count('objects_failed')
            self._log("%s while processing %r" % (f, relpath_u))
            return f
        d.addCallbacks(_succeeded, _failed)
        return d
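
    # Note on the stats above: 'objects_succeeded' counts every dequeued
    # item whose _process ran to completion (including ignored and deleted
    # objects), whereas 'files_uploaded' only counts actual uploads to the
    # DMD, so the two counters are expected to diverge.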

    def _get_metadata(self, encoded_path_u):
        try:
            d = self._upload_dirnode.get_metadata_for(encoded_path_u)
        except KeyError:
            return Failure()
        return d

    def _get_filenode(self, encoded_path_u):
        try:
            d = self._upload_dirnode.get(encoded_path_u)
        except KeyError:
            return Failure()
        return d


class WriteFileMixin(object):
    FUDGE_SECONDS = 10.0

    def _get_conflicted_filename(self, abspath_u):
        return abspath_u + u".conflict"

    def _write_downloaded_file(self, abspath_u, file_contents, is_conflict=False, now=None):
        self._log("_write_downloaded_file(%r, <%d bytes>, is_conflict=%r, now=%r)"
                  % (abspath_u, len(file_contents), is_conflict, now))

        # 1. Write a temporary file, say .foo.tmp.
        # 2. is_conflict determines whether this is an overwrite or a conflict.
        # 3. Set the mtime of the replacement file to be T seconds before the
        #    current local time.
        # 4. Perform a file replacement with backup filename foo.backup,
        #    replaced file foo, and replacement file .foo.tmp. If any step of
        #    this operation fails, reclassify as a conflict and stop.
        #
        # Returns the path of the destination file.

        precondition_abspath(abspath_u)
        replacement_path_u = abspath_u + u".tmp"  # FIXME more unique
        backup_path_u = abspath_u + u".backup"
        if now is None:
            now = time.time()

        # ensure parent directory exists
        head, tail = os.path.split(abspath_u)

        old_mask = os.umask(self._umask)
        try:
            fileutil.make_dirs(head, (~ self._umask) & 0777)
            fileutil.write(replacement_path_u, file_contents)
        finally:
            os.umask(old_mask)

        os.utime(replacement_path_u, (now, now - self.FUDGE_SECONDS))
        if is_conflict:
            print "0x00 ------------ <><> is conflict; calling _rename_conflicted_file... %r %r" % (abspath_u, replacement_path_u)
            return self._rename_conflicted_file(abspath_u, replacement_path_u)
        else:
            try:
                fileutil.replace_file(abspath_u, replacement_path_u, backup_path_u)
                return abspath_u
            except fileutil.ConflictError:
                return self._rename_conflicted_file(abspath_u, replacement_path_u)
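
    # The utime() call above backdates the replacement file's mtime by
    # FUDGE_SECONDS, presumably so that a user edit made immediately after
    # the download (within filesystem timestamp granularity) still yields
    # an mtime that differs from the one recorded in the magic-folder db.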

    def _rename_conflicted_file(self, abspath_u, replacement_path_u):
        self._log("_rename_conflicted_file(%r, %r)" % (abspath_u, replacement_path_u))

        conflict_path_u = self._get_conflicted_filename(abspath_u)
        print "XXX rename %r %r" % (replacement_path_u, conflict_path_u)
        if os.path.isfile(replacement_path_u):
            print "%r exists" % (replacement_path_u,)
        if os.path.isfile(conflict_path_u):
            print "%r exists" % (conflict_path_u,)

        fileutil.rename_no_overwrite(replacement_path_u, conflict_path_u)
        return conflict_path_u

    def _rename_deleted_file(self, abspath_u):
        self._log('renaming deleted file to backup: %s' % (abspath_u,))
        try:
            fileutil.rename_no_overwrite(abspath_u, abspath_u + u'.backup')
        except OSError:
            self._log("Already gone: '%s'" % (abspath_u,))
        return abspath_u


class Downloader(QueueMixin, WriteFileMixin):
    REMOTE_SCAN_INTERVAL = 3  # facilitates tests

    def __init__(self, client, local_path_u, db, collective_dirnode,
                 upload_readonly_dircap, clock, is_upload_pending, umask):
        QueueMixin.__init__(self, client, local_path_u, db, 'downloader', clock)

        if not IDirectoryNode.providedBy(collective_dirnode):
            raise AssertionError("The URI in '%s' does not refer to a directory."
                                 % os.path.join('private', 'collective_dircap'))
        if collective_dirnode.is_unknown() or not collective_dirnode.is_readonly():
            raise AssertionError("The URI in '%s' is not a readonly cap to a directory."
                                 % os.path.join('private', 'collective_dircap'))

        self._collective_dirnode = collective_dirnode
        self._upload_readonly_dircap = upload_readonly_dircap
        self._is_upload_pending = is_upload_pending
        self._umask = umask

    def start_downloading(self):
        self._log("start_downloading")
        files = self._db.get_all_relpaths()
        self._log("all files %s" % files)

        d = self._scan_remote_collective(scan_self=True)
        d.addBoth(self._logcb, "after _scan_remote_collective 0")
        self._turn_deque()
        return d

    def stop(self):
        self._stopped = True
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def _should_download(self, relpath_u, remote_version):
        """
        _should_download returns a bool indicating whether or not a remote object should be downloaded.
        We check the remote metadata version against our magic-folder db version number;
        the latest version wins.
        """
        self._log("_should_download(%r, %r)" % (relpath_u, remote_version))
        if magicpath.should_ignore_file(relpath_u):
            self._log("nope")
            return False
        self._log("yep")
        db_entry = self._db.get_db_entry(relpath_u)
        if db_entry is None:
            return True
        self._log("version %r" % (db_entry.version,))
        return (db_entry.version < remote_version)
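
        # e.g. if our db has version 2 for u"foo" and the remote metadata
        # says version 3, we download; at remote version 2 or lower we skip.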

    def _get_local_latest(self, relpath_u):
        """
        _get_local_latest takes a unicode relative path and returns None unless
        the file exists locally and has an entry in our magic-folder db;
        otherwise it returns the version number from that entry.
        """
        if not self._get_filepath(relpath_u).exists():
            return None
        db_entry = self._db.get_db_entry(relpath_u)
        return None if db_entry is None else db_entry.version

    def _get_collective_latest_file(self, filename):
        """
        _get_collective_latest_file takes a file path pointing to a file managed by
        magic-folder and returns a deferred that fires with a two-tuple containing a
        file node and metadata for the latest version of the file located in the
        magic-folder collective directory.
        """
        collective_dirmap_d = self._collective_dirnode.list()
        def scan_collective(result):
            list_of_deferreds = []
            for dir_name in result.keys():
                # XXX make sure it's a directory
                d = defer.succeed(None)
                d.addCallback(lambda x, dir_name=dir_name: result[dir_name][0].get_child_and_metadata(filename))
                list_of_deferreds.append(d)
            deferList = defer.DeferredList(list_of_deferreds, consumeErrors=True)
            return deferList
        collective_dirmap_d.addCallback(scan_collective)
        def highest_version(deferredList):
            max_version = 0
            metadata = None
            node = None
            for success, result in deferredList:
                if success:
                    if result[1]['version'] > max_version:
                        node, metadata = result
                        max_version = result[1]['version']
            return node, metadata
        collective_dirmap_d.addCallback(highest_version)
        return collective_dirmap_d
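
    # highest_version relies on DeferredList's result shape: with
    # consumeErrors=True it fires with a list of (success, value) pairs,
    # where value is the (node, metadata) two-tuple on success and a
    # Failure otherwise, so failed lookups are simply skipped.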

    def _scan_remote_dmd(self, nickname, dirnode, scan_batch):
        self._log("_scan_remote_dmd nickname %r" % (nickname,))
        d = dirnode.list()
        def scan_listing(listing_map):
            for encoded_relpath_u in listing_map.keys():
                relpath_u = magicpath.magic2path(encoded_relpath_u)
                self._log("found %r" % (relpath_u,))

                file_node, metadata = listing_map[encoded_relpath_u]
                local_version = self._get_local_latest(relpath_u)
                remote_version = metadata.get('version', None)
                self._log("%r has local version %r, remote version %r" % (relpath_u, local_version, remote_version))

                if local_version is None or remote_version is None or local_version < remote_version:
                    self._log("%r added to download queue" % (relpath_u,))
                    if relpath_u in scan_batch:
                        scan_batch[relpath_u] += [(file_node, metadata)]
                    else:
                        scan_batch[relpath_u] = [(file_node, metadata)]

        d.addCallback(scan_listing)
        d.addBoth(self._logcb, "end of _scan_remote_dmd")
        return d

    def _scan_remote_collective(self, scan_self=False):
        self._log("_scan_remote_collective")
        scan_batch = {}  # path -> [(filenode, metadata)]

        d = self._collective_dirnode.list()
        def scan_collective(dirmap):
            d2 = defer.succeed(None)
            for dir_name in dirmap:
                (dirnode, metadata) = dirmap[dir_name]
                if scan_self or dirnode.get_readonly_uri() != self._upload_readonly_dircap:
                    d2.addCallback(lambda ign, dir_name=dir_name, dirnode=dirnode:
                                   self._scan_remote_dmd(dir_name, dirnode, scan_batch))
                    def _err(f, dir_name=dir_name):
                        self._log("failed to scan DMD for client %r: %s" % (dir_name, f))
                        # XXX what should we do to make this failure more visible to users?
                    d2.addErrback(_err)

            return d2
        d.addCallback(scan_collective)

        def _filter_batch_to_deque(ign):
            self._log("deque = %r, scan_batch = %r" % (self._deque, scan_batch))
            for relpath_u in scan_batch.keys():
                file_node, metadata = max(scan_batch[relpath_u], key=lambda x: x[1]['version'])

                if self._should_download(relpath_u, metadata['version']):
                    self._deque.append( (relpath_u, file_node, metadata) )
                else:
                    self._log("Excluding %r" % (relpath_u,))
                    self._call_hook(None, 'processed')

            self._log("deque after = %r" % (self._deque,))
        d.addCallback(_filter_batch_to_deque)
        return d

    def _when_queue_is_empty(self):
        d = task.deferLater(self._clock, self.REMOTE_SCAN_INTERVAL, self._scan_remote_collective)
        d.addBoth(self._logcb, "after _scan_remote_collective 1")
        d.addCallback(lambda ign: self._turn_deque())
        return d

    def _process(self, item, now=None):
        # Downloader
        self._log("_process(%r)" % (item,))
        if now is None:
            now = time.time()
        (relpath_u, file_node, metadata) = item
        fp = self._get_filepath(relpath_u)
        abspath_u = unicode_from_filepath(fp)
        conflict_path_u = self._get_conflicted_filename(abspath_u)

        d = defer.succeed(None)

        def do_update_db(written_abspath_u):
            filecap = file_node.get_uri()
            last_uploaded_uri = metadata.get('last_uploaded_uri', None)
            last_downloaded_uri = filecap
            last_downloaded_timestamp = now
            written_pathinfo = get_pathinfo(written_abspath_u)

            if not written_pathinfo.exists and not metadata.get('deleted', False):
                raise Exception("downloaded object %s disappeared" % quote_local_unicode_path(written_abspath_u))

            self._db.did_upload_version(relpath_u, metadata['version'], last_uploaded_uri,
                                        last_downloaded_uri, last_downloaded_timestamp, written_pathinfo)
            self._count('objects_downloaded')
        def failed(f):
            self._log("download failed: %s" % (str(f),))
            self._count('objects_failed')
            return f

        if os.path.isfile(conflict_path_u):
            def fail(res):
                raise ConflictError("download failed: already conflicted: %r" % (relpath_u,))
            d.addCallback(fail)
        else:
            is_conflict = False
            db_entry = self._db.get_db_entry(relpath_u)
            dmd_last_downloaded_uri = metadata.get('last_downloaded_uri', None)
            dmd_last_uploaded_uri = metadata.get('last_uploaded_uri', None)
            if db_entry:
                if dmd_last_downloaded_uri is not None and db_entry.last_downloaded_uri is not None:
                    if dmd_last_downloaded_uri != db_entry.last_downloaded_uri:
                        is_conflict = True
                        self._count('objects_conflicted')
                elif dmd_last_uploaded_uri is not None and dmd_last_uploaded_uri != db_entry.last_uploaded_uri:
                    is_conflict = True
                    self._count('objects_conflicted')
                elif self._is_upload_pending(relpath_u):
                    is_conflict = True
                    self._count('objects_conflicted')
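
            # In short: the download is classified as a conflict when the
            # remote DMD's record of the last downloaded/uploaded version
            # disagrees with our local db, or when we still have our own
            # upload pending for this path.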

            if relpath_u.endswith(u"/"):
                if metadata.get('deleted', False):
                    self._log("rmdir(%r) ignored" % (abspath_u,))
                else:
                    self._log("mkdir(%r)" % (abspath_u,))
                    d.addCallback(lambda ign: fileutil.make_dirs(abspath_u))
                    d.addCallback(lambda ign: abspath_u)
            else:
                if metadata.get('deleted', False):
                    d.addCallback(lambda ign: self._rename_deleted_file(abspath_u))
                else:
                    d.addCallback(lambda ign: file_node.download_best_version())
                    d.addCallback(lambda contents: self._write_downloaded_file(abspath_u, contents,
                                                                               is_conflict=is_conflict))

        d.addCallbacks(do_update_db, failed)

        def trap_conflicts(f):
            f.trap(ConflictError)
            return None
        d.addErrback(trap_conflicts)
        return d