]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/frontends/magic_folder.py
WIP: Refactoring to get db fields in one query.
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / frontends / magic_folder.py
1
2 import sys, os
3 import os.path
4 from collections import deque
5 import time
6
7 from twisted.internet import defer, reactor, task
8 from twisted.python.failure import Failure
9 from twisted.python import runtime
10 from twisted.application import service
11
12 from allmydata.util import fileutil
13 from allmydata.interfaces import IDirectoryNode
14 from allmydata.util import log
15 from allmydata.util.fileutil import precondition_abspath, get_pathinfo, ConflictError
16 from allmydata.util.assertutil import precondition, _assert
17 from allmydata.util.deferredutil import HookMixin
18 from allmydata.util.encodingutil import listdir_filepath, to_filepath, \
19      extend_filepath, unicode_from_filepath, unicode_segments_from, \
20      quote_filepath, quote_local_unicode_path, quote_output, FilenameEncodingError
21 from allmydata.immutable.upload import FileName, Data
22 from allmydata import magicfolderdb, magicpath
23
24
# Linux inotify flag IN_EXCL_UNLINK (see inotify(7)): stop reporting events
# for watched-directory children after they have been unlinked.  It is not
# exported by the inotify modules we use, so it is defined here by value.
IN_EXCL_UNLINK = 0x04000000L
26
def get_inotify_module():
    """
    Return the filesystem-notification module appropriate for this platform:
    the allmydata Windows shim on win32, or Twisted's inotify where the
    runtime supports it.  Raises NotImplementedError on unsupported
    platforms, or re-raises the underlying import failure.
    """
    try:
        if sys.platform == "win32":
            from allmydata.windows import inotify
            return inotify
        if runtime.platform.supportsINotify():
            from twisted.internet import inotify
            return inotify
        raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
                                  "This currently requires Linux or Windows.")
    except (ImportError, AttributeError) as e:
        # Import failed (or supportsINotify was unavailable): log it, and on
        # Windows translate to a clearer platform-requirement message.
        log.msg(e)
        if sys.platform == "win32":
            raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
                                      "Windows support requires at least Vista, and has only been tested on Windows 7.")
        raise
43
44
45 class MagicFolder(service.MultiService):
46     name = 'magic-folder'
47
48     def __init__(self, client, upload_dircap, collective_dircap, local_path_u, dbfile,
49                  pending_delay=1.0, clock=None):
50         precondition_abspath(local_path_u)
51
52         service.MultiService.__init__(self)
53
54         immediate = clock is not None
55         clock = clock or reactor
56         db = magicfolderdb.get_magicfolderdb(dbfile, create_version=(magicfolderdb.SCHEMA_v1, 1))
57         if db is None:
58             return Failure(Exception('ERROR: Unable to load magic folder db.'))
59
60         # for tests
61         self._client = client
62         self._db = db
63
64         upload_dirnode = self._client.create_node_from_uri(upload_dircap)
65         collective_dirnode = self._client.create_node_from_uri(collective_dircap)
66
67         self.uploader = Uploader(client, local_path_u, db, upload_dirnode, pending_delay, clock, immediate)
68         self.downloader = Downloader(client, local_path_u, db, collective_dirnode,
69                                      upload_dirnode.get_readonly_uri(), clock, self.uploader.is_pending)
70
71     def startService(self):
72         # TODO: why is this being called more than once?
73         if self.running:
74             return defer.succeed(None)
75         print "%r.startService" % (self,)
76         service.MultiService.startService(self)
77         return self.uploader.start_monitoring()
78
79     def ready(self):
80         """ready is used to signal us to start
81         processing the upload and download items...
82         """
83         d = self.uploader.start_scanning()
84         d2 = self.downloader.start_scanning()
85         d.addCallback(lambda ign: d2)
86         return d
87
88     def finish(self):
89         print "finish"
90         d = self.uploader.stop()
91         d2 = self.downloader.stop()
92         d.addCallback(lambda ign: d2)
93         return d
94
95     def remove_service(self):
96         return service.MultiService.disownServiceParent(self)
97
98
class QueueMixin(HookMixin):
    """
    Shared machinery for Uploader and Downloader: a deque of work items
    drained one at a time through a chain of Deferreds (self._lazy_tail).

    Subclasses must provide _process(item) and _when_queue_is_empty().
    """
    def __init__(self, client, local_path_u, db, name, clock):
        self._client = client
        self._local_path_u = local_path_u
        self._local_filepath = to_filepath(local_path_u)
        self._db = db
        self._name = name
        self._clock = clock
        # test hooks: 'processed' fires after each item, 'started' when
        # monitoring begins (see HookMixin).
        self._hooks = {'processed': None, 'started': None}
        self.started_d = self.set_hook('started')

        if not self._local_filepath.exists():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but there is no directory at that location."
                                 % quote_local_unicode_path(self._local_path_u))
        if not self._local_filepath.isdir():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but the thing at that location is not a directory."
                                 % quote_local_unicode_path(self._local_path_u))

        self._deque = deque()
        # tail of the Deferred chain through which all queue processing is
        # serialized; every turn appends callbacks to it.
        self._lazy_tail = defer.succeed(None)
        self._stopped = False
        self._turn_delay = 0

    def _get_filepath(self, relpath_u):
        # Map a u"/"-separated relative path to a FilePath under the folder.
        self._log("_get_filepath(%r)" % (relpath_u,))
        return extend_filepath(self._local_filepath, relpath_u.split(u"/"))

    def _get_relpath(self, filepath):
        # Inverse of _get_filepath: FilePath -> u"/"-joined relative path.
        self._log("_get_relpath(%r)" % (filepath,))
        segments = unicode_segments_from(filepath, self._local_filepath)
        self._log("segments = %r" % (segments,))
        return u"/".join(segments)

    def _count(self, counter_name, delta=1):
        # Bump a stats counter namespaced by this queue's name.
        ctr = 'magic_folder.%s.%s' % (self._name, counter_name)
        self._log("%s += %r" % (counter_name, delta))
        self._client.stats_provider.count(ctr, delta)

    def _logcb(self, res, msg):
        # Log-and-pass-through callback, convenient in Deferred chains.
        self._log("%s: %r" % (msg, res))
        return res

    def _log(self, msg):
        s = "Magic Folder %s %s: %s" % (quote_output(self._client.nickname), self._name, msg)
        self._client.log(s)
        print s
        #open("events", "ab+").write(msg)

    def _turn_deque(self):
        # Take one item off the deque and chain its processing onto
        # self._lazy_tail; when empty, defer to _when_queue_is_empty().
        # NOTE(review): pop() takes the most-recently-appended item, so the
        # queue is drained LIFO — confirm this is intended.
        self._log("_turn_deque")
        if self._stopped:
            self._log("stopped")
            return
        try:
            item = self._deque.pop()
            self._log("popped %r" % (item,))
            self._count('objects_queued', -1)
        except IndexError:
            self._log("deque is now empty")
            self._lazy_tail.addCallback(lambda ign: self._when_queue_is_empty())
        else:
            self._lazy_tail.addCallback(lambda ign: self._process(item))
            self._lazy_tail.addBoth(self._call_hook, 'processed')
            self._lazy_tail.addErrback(log.err)
            # schedule the next turn after _turn_delay seconds
            self._lazy_tail.addCallback(lambda ign: task.deferLater(self._clock, self._turn_delay, self._turn_deque))
166
167
class Uploader(QueueMixin):
    """
    Watches the local magic-folder directory with inotify and uploads
    created/changed/deleted objects to the upload DMD, recording each
    uploaded version in the magic-folder db.
    """
    def __init__(self, client, local_path_u, db, upload_dirnode, pending_delay, clock,
                 immediate=False):
        QueueMixin.__init__(self, client, local_path_u, db, 'uploader', clock)

        # set True by start_scanning(); until then _notify queues items but
        # does not turn the deque.
        self.is_ready = False
        # immediate=True (tests) turns the deque synchronously from _notify.
        self._immediate = immediate

        if not IDirectoryNode.providedBy(upload_dirnode):
            raise AssertionError("The URI in '%s' does not refer to a directory."
                                 % os.path.join('private', 'magic_folder_dircap'))
        if upload_dirnode.is_unknown() or upload_dirnode.is_readonly():
            raise AssertionError("The URI in '%s' is not a writecap to a directory."
                                 % os.path.join('private', 'magic_folder_dircap'))

        self._upload_dirnode = upload_dirnode
        self._inotify = get_inotify_module()
        self._notifier = self._inotify.INotify()
        # relpaths queued (or being processed) for upload; also consulted by
        # the Downloader via is_pending() for conflict detection.
        self._pending = set()

        # not all notifier implementations have set_pending_delay
        if hasattr(self._notifier, 'set_pending_delay'):
            self._notifier.set_pending_delay(pending_delay)

        # TODO: what about IN_MOVE_SELF and IN_UNMOUNT?
        #
        self.mask = ( self._inotify.IN_CREATE
                    | self._inotify.IN_CLOSE_WRITE
                    | self._inotify.IN_MOVED_TO
                    | self._inotify.IN_MOVED_FROM
                    | self._inotify.IN_DELETE
                    | self._inotify.IN_ONLYDIR
                    | IN_EXCL_UNLINK
                    )
        self._notifier.watch(self._local_filepath, mask=self.mask, callbacks=[self._notify],
                             recursive=True)

    def start_monitoring(self):
        """Start the inotify watcher; fires the 'started' hook when done."""
        self._log("start_monitoring")
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._notifier.startReading())
        d.addCallback(lambda ign: self._count('dirs_monitored'))
        d.addBoth(self._call_hook, 'started')
        return d

    def stop(self):
        """Stop watching; fires when in-flight queue processing has drained."""
        self._log("stop")
        self._notifier.stopReading()
        self._count('dirs_monitored', -1)
        if hasattr(self._notifier, 'wait_until_stopped'):
            d = self._notifier.wait_until_stopped()
        else:
            d = defer.succeed(None)
        # wait for whatever is currently chained on the queue tail
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def start_scanning(self):
        """Scan the local directory, then start draining the queue.  Files
        recorded in the db but missing on disk remain in self._pending and
        are queued so their deletion gets uploaded."""
        self._log("start_scanning")
        self.is_ready = True
        self._pending = self._db.get_all_relpaths()
        self._log("all_files %r" % (self._pending))
        d = self._scan(u"")
        def _add_pending(ign):
            # This adds all of the files that were in the db but not already processed
            # (normally because they have been deleted on disk).
            self._log("adding %r" % (self._pending))
            self._deque.extend(self._pending)
        d.addCallback(_add_pending)
        d.addCallback(lambda ign: self._turn_deque())
        return d

    def _scan(self, reldir_u):
        # Process every child of reldir_u; recursion into subdirectories
        # happens via _process() calling back into _scan().
        self._log("scan %r" % (reldir_u,))
        fp = self._get_filepath(reldir_u)
        try:
            children = listdir_filepath(fp)
        except EnvironmentError:
            raise Exception("WARNING: magic folder: permission denied on directory %s"
                            % quote_filepath(fp))
        except FilenameEncodingError:
            raise Exception("WARNING: magic folder: could not list directory %s due to a filename encoding error"
                            % quote_filepath(fp))

        d = defer.succeed(None)
        for child in children:
            _assert(isinstance(child, unicode), child=child)
            # child=child binds the loop variable per-iteration (late-binding fix)
            d.addCallback(lambda ign, child=child:
                          ("%s/%s" % (reldir_u, child) if reldir_u else child))
            def _add_pending(relpath_u):
                if magicpath.should_ignore_file(relpath_u):
                    return None

                self._pending.add(relpath_u)
                return relpath_u
            d.addCallback(_add_pending)
            # This call to _process doesn't go through the deque, and probably should.
            d.addCallback(self._process)
            d.addBoth(self._call_hook, 'processed')
            d.addErrback(log.err)

        return d

    def is_pending(self, relpath_u):
        """Return True if relpath_u is queued for (or undergoing) upload."""
        return relpath_u in self._pending

    def _notify(self, opaque, path, events_mask):
        # inotify callback: filter and enqueue one event.
        self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask))))
        relpath_u = self._get_relpath(path)

        # We filter out IN_CREATE events not associated with a directory.
        # Acting on IN_CREATE for files could cause us to read and upload
        # a possibly-incomplete file before the application has closed it.
        # There should always be an IN_CLOSE_WRITE after an IN_CREATE, I think.
        # It isn't possible to avoid watching for IN_CREATE at all, because
        # it is the only event notified for a directory creation.

        if ((events_mask & self._inotify.IN_CREATE) != 0 and
            (events_mask & self._inotify.IN_ISDIR) == 0):
            self._log("ignoring event for %r (creation of non-directory)\n" % (relpath_u,))
            return
        if relpath_u in self._pending:
            self._log("ignoring event for %r (already pending)" % (relpath_u,))
            return
        if magicpath.should_ignore_file(relpath_u):
            self._log("ignoring event for %r (ignorable path)" % (relpath_u,))
            return

        self._log("appending %r to deque" % (relpath_u,))
        self._deque.append(relpath_u)
        self._pending.add(relpath_u)
        self._count('objects_queued')
        if self.is_ready:
            if self._immediate:  # for tests
                self._turn_deque()
            else:
                self._clock.callLater(0, self._turn_deque)

    def _when_queue_is_empty(self):
        # The Uploader is purely event-driven: nothing to do until inotify
        # enqueues more work.
        return defer.succeed(None)

    def _process(self, relpath_u):
        """Uploader: upload one object (file, directory, or deletion) named
        by relpath_u, then record the new version in the db."""
        # Uploader
        self._log("_process(%r)" % (relpath_u,))
        if relpath_u is None:
            return
        precondition(isinstance(relpath_u, unicode), relpath_u)
        precondition(not relpath_u.endswith(u'/'), relpath_u)

        d = defer.succeed(None)

        def _maybe_upload(val, now=None):
            if now is None:
                now = time.time()
            fp = self._get_filepath(relpath_u)
            pathinfo = get_pathinfo(unicode_from_filepath(fp))

            self._log("about to remove %r from pending set %r" %
                      (relpath_u, self._pending))
            self._pending.remove(relpath_u)
            encoded_path_u = magicpath.path2magic(relpath_u)

            if not pathinfo.exists:
                # The local object is gone: upload a deletion marker (an
                # empty file with 'deleted' metadata) so other clients see it.
                # FIXME merge this with the 'isfile' case.
                self._log("notified object %s disappeared (this is normal)" % quote_filepath(fp))
                self._count('objects_disappeared')

                db_entry = self._db.get_db_entry(relpath_u)
                if db_entry is None:
                    # never uploaded; nothing to delete remotely
                    return None

                last_downloaded_timestamp = now  # is this correct?

                if self._db.is_new_file(pathinfo, relpath_u):
                    new_version = db_entry.version + 1
                else:
                    self._log("Not uploading %r" % (relpath_u,))
                    self._count('objects_not_uploaded')
                    return

                metadata = { 'version': new_version,
                             'deleted': True,
                             'last_downloaded_timestamp': last_downloaded_timestamp }
                if db_entry.last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = db_entry.last_downloaded_uri

                empty_uploadable = Data("", self._client.convergence)
                d2 = self._upload_dirnode.add_file(encoded_path_u, empty_uploadable,
                                                   metadata=metadata, overwrite=True)

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    last_downloaded_uri = metadata.get('last_downloaded_uri', None)
                    self._db.did_upload_version(relpath_u, new_version, filecap,
                                                last_downloaded_uri, last_downloaded_timestamp,
                                                pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)
                return d2
            elif pathinfo.islink:
                self.warn("WARNING: cannot upload symlink %s" % quote_filepath(fp))
                return None
            elif pathinfo.isdir:
                # Watch the new subdirectory (unless the notifier already
                # picks up new subdirectories recursively) and create the
                # corresponding remote directory marker.
                if not getattr(self._notifier, 'recursive_includes_new_subdirectories', False):
                    self._notifier.watch(fp, mask=self.mask, callbacks=[self._notify], recursive=True)

                uploadable = Data("", self._client.convergence)
                encoded_path_u += magicpath.path2magic(u"/")
                self._log("encoded_path_u =  %r" % (encoded_path_u,))
                upload_d = self._upload_dirnode.add_file(encoded_path_u, uploadable, metadata={"version":0}, overwrite=True)
                def _succeeded(ign):
                    self._log("created subdirectory %r" % (relpath_u,))
                    self._count('directories_created')
                def _failed(f):
                    self._log("failed to create subdirectory %r" % (relpath_u,))
                    return f
                upload_d.addCallbacks(_succeeded, _failed)
                # recurse into the new directory
                upload_d.addCallback(lambda ign: self._scan(relpath_u))
                return upload_d
            elif pathinfo.isfile:
                db_entry = self._db.get_db_entry(relpath_u)

                last_downloaded_timestamp = now

                if db_entry is None:
                    new_version = 0
                elif self._db.is_new_file(pathinfo, relpath_u):
                    new_version = db_entry.version + 1
                else:
                    self._log("Not uploading %r" % (relpath_u,))
                    self._count('objects_not_uploaded')
                    return None

                metadata = { 'version': new_version,
                             'last_downloaded_timestamp': last_downloaded_timestamp }
                if db_entry is not None and db_entry.last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = db_entry.last_downloaded_uri

                uploadable = FileName(unicode_from_filepath(fp), self._client.convergence)
                d2 = self._upload_dirnode.add_file(encoded_path_u, uploadable,
                                                   metadata=metadata, overwrite=True)

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    last_downloaded_uri = metadata.get('last_downloaded_uri', None)
                    self._db.did_upload_version(relpath_u, new_version, filecap,
                                                last_downloaded_uri, last_downloaded_timestamp,
                                                pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)
                return d2
            else:
                # special file (fifo, device, ...) — skip it
                self.warn("WARNING: cannot process special file %s" % quote_filepath(fp))
                return None

        d.addCallback(_maybe_upload)

        def _succeeded(res):
            self._count('objects_succeeded')
            return res
        def _failed(f):
            self._count('objects_failed')
            self._log("%s while processing %r" % (f, relpath_u))
            return f
        d.addCallbacks(_succeeded, _failed)
        return d

    def _get_metadata(self, encoded_path_u):
        # Return a Deferred for the metadata of encoded_path_u in the upload
        # DMD, or a synchronous Failure if the entry is absent.
        try:
            d = self._upload_dirnode.get_metadata_for(encoded_path_u)
        except KeyError:
            return Failure()
        return d

    def _get_filenode(self, encoded_path_u):
        # Return a Deferred for the filenode of encoded_path_u in the upload
        # DMD, or a synchronous Failure if the entry is absent.
        try:
            d = self._upload_dirnode.get(encoded_path_u)
        except KeyError:
            return Failure()
        return d
446
447
class WriteFileMixin(object):
    """
    Helpers for writing downloaded content to the local filesystem: write to
    a temp file, then replace the target (keeping a .backup), diverting to a
    .conflict file when a conflict is detected.
    """
    # written files get their mtime backdated by this many seconds —
    # presumably so a subsequent local edit registers as newer; confirm.
    FUDGE_SECONDS = 10.0

    def _get_conflicted_filename(self, abspath_u):
        return abspath_u + u".conflict"

    def _write_downloaded_file(self, abspath_u, file_contents, is_conflict=False, now=None):
        self._log("_write_downloaded_file(%r, <%d bytes>, is_conflict=%r, now=%r)"
                  % (abspath_u, len(file_contents), is_conflict, now))

        # 1. Write a temporary file, say .foo.tmp.
        # 2. is_conflict determines whether this is an overwrite or a conflict.
        # 3. Set the mtime of the replacement file to be T seconds before the
        #    current local time.
        # 4. Perform a file replacement with backup filename foo.backup,
        #    replaced file foo, and replacement file .foo.tmp. If any step of
        #    this operation fails, reclassify as a conflict and stop.
        #
        # Returns the path of the destination file.

        precondition_abspath(abspath_u)
        replacement_path_u = abspath_u + u".tmp"  # FIXME more unique
        backup_path_u = abspath_u + u".backup"
        if now is None:
            now = time.time()

        # ensure parent directory exists
        head, tail = os.path.split(abspath_u)
        mode = 0777 # XXX
        fileutil.make_dirs(head, mode)

        fileutil.write(replacement_path_u, file_contents)
        os.utime(replacement_path_u, (now, now - self.FUDGE_SECONDS))
        if is_conflict:
            print "0x00 ------------ <><> is conflict; calling _rename_conflicted_file... %r %r" % (abspath_u, replacement_path_u)
            return self._rename_conflicted_file(abspath_u, replacement_path_u)
        else:
            try:
                fileutil.replace_file(abspath_u, replacement_path_u, backup_path_u)
                return abspath_u
            except fileutil.ConflictError:
                # replacement raced with a local change: treat as a conflict
                return self._rename_conflicted_file(abspath_u, replacement_path_u)

    def _rename_conflicted_file(self, abspath_u, replacement_path_u):
        # Move the downloaded replacement aside to foo.conflict instead of
        # overwriting the local file; returns the conflict path.
        self._log("_rename_conflicted_file(%r, %r)" % (abspath_u, replacement_path_u))

        conflict_path_u = self._get_conflicted_filename(abspath_u)
        print "XXX rename %r %r" % (replacement_path_u, conflict_path_u)
        if os.path.isfile(replacement_path_u):
            print "%r exists" % (replacement_path_u,)
        if os.path.isfile(conflict_path_u):
            print "%r exists" % (conflict_path_u,)

        fileutil.rename_no_overwrite(replacement_path_u, conflict_path_u)
        return conflict_path_u

    def _rename_deleted_file(self, abspath_u):
        # A remotely-deleted file is preserved locally as foo.backup.
        self._log('renaming deleted file to backup: %s' % (abspath_u,))
        try:
            fileutil.rename_no_overwrite(abspath_u, abspath_u + u'.backup')
        except IOError:
            # XXX is this the correct error?
            self._log("Already gone: '%s'" % (abspath_u,))
        return abspath_u
512
513
class Downloader(QueueMixin, WriteFileMixin):
    """
    Periodically scans the collective directory for versions newer than
    those recorded in the magic-folder db, downloads them, and writes them
    into the local directory (with conflict detection).
    """
    REMOTE_SCAN_INTERVAL = 3  # facilitates tests

    def __init__(self, client, local_path_u, db, collective_dirnode,
                 upload_readonly_dircap, clock, is_upload_pending):
        QueueMixin.__init__(self, client, local_path_u, db, 'downloader', clock)

        if not IDirectoryNode.providedBy(collective_dirnode):
            raise AssertionError("The URI in '%s' does not refer to a directory."
                                 % os.path.join('private', 'collective_dircap'))
        if collective_dirnode.is_unknown() or not collective_dirnode.is_readonly():
            raise AssertionError("The URI in '%s' is not a readonly cap to a directory."
                                 % os.path.join('private', 'collective_dircap'))

        self._collective_dirnode = collective_dirnode
        # our own DMD's readonly URI, used to skip our own directory when
        # scanning the collective
        self._upload_readonly_dircap = upload_readonly_dircap
        # callable (Uploader.is_pending) used for conflict detection
        self._is_upload_pending = is_upload_pending

        self._turn_delay = self.REMOTE_SCAN_INTERVAL

    def start_scanning(self):
        """Kick off the first remote scan and start turning the queue."""
        self._log("start_scanning")
        files = self._db.get_all_relpaths()
        self._log("all files %s" % files)

        d = self._scan_remote_collective()
        d.addBoth(self._logcb, "after _scan_remote_collective 0")
        self._turn_deque()
        return d

    def stop(self):
        """Stop turning the queue; fires when in-flight work has drained."""
        self._stopped = True
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def _should_download(self, relpath_u, remote_version):
        """
        _should_download returns a bool indicating whether or not a remote object should be downloaded.
        We check the remote metadata version against our magic-folder db version number;
        latest version wins.
        """
        self._log("_should_download(%r, %r)" % (relpath_u, remote_version))
        if magicpath.should_ignore_file(relpath_u):
            self._log("nope")
            return False
        self._log("yep")
        db_entry = self._db.get_db_entry(relpath_u)
        if db_entry is None:
            return True
        self._log("version %r" % (db_entry.version,))
        return (db_entry.version < remote_version)

    def _get_local_latest(self, relpath_u):
        """
        _get_local_latest takes a unicode path string checks to see if this file object
        exists in our magic-folder db; if not then return None
        else check for an entry in our magic-folder db and return the version number.
        """
        if not self._get_filepath(relpath_u).exists():
            return None
        db_entry = self._db.get_db_entry(relpath_u)
        return None if db_entry is None else db_entry.version

    def _get_collective_latest_file(self, filename):
        """
        _get_collective_latest_file takes a file path pointing to a file managed by
        magic-folder and returns a deferred that fires with the two tuple containing a
        file node and metadata for the latest version of the file located in the
        magic-folder collective directory.
        """
        collective_dirmap_d = self._collective_dirnode.list()
        def scan_collective(result):
            # query every participant's DMD for this filename
            list_of_deferreds = []
            for dir_name in result.keys():
                # XXX make sure it's a directory
                d = defer.succeed(None)
                # dir_name=dir_name binds the loop variable per-iteration
                d.addCallback(lambda x, dir_name=dir_name: result[dir_name][0].get_child_and_metadata(filename))
                list_of_deferreds.append(d)
            deferList = defer.DeferredList(list_of_deferreds, consumeErrors=True)
            return deferList
        collective_dirmap_d.addCallback(scan_collective)
        def highest_version(deferredList):
            # pick the (node, metadata) pair with the highest 'version';
            # (None, None) if no participant has the file
            max_version = 0
            metadata = None
            node = None
            for success, result in deferredList:
                if success:
                    if result[1]['version'] > max_version:
                        node, metadata = result
                        max_version = result[1]['version']
            return node, metadata
        collective_dirmap_d.addCallback(highest_version)
        return collective_dirmap_d

    def _scan_remote_dmd(self, nickname, dirnode, scan_batch):
        # List one participant's DMD and collect entries newer than our
        # local version into scan_batch (relpath -> [(filenode, metadata)]).
        self._log("_scan_remote_dmd nickname %r" % (nickname,))
        d = dirnode.list()
        def scan_listing(listing_map):
            for encoded_relpath_u in listing_map.keys():
                relpath_u = magicpath.magic2path(encoded_relpath_u)
                self._log("found %r" % (relpath_u,))

                file_node, metadata = listing_map[encoded_relpath_u]
                local_version = self._get_local_latest(relpath_u)
                remote_version = metadata.get('version', None)
                self._log("%r has local version %r, remote version %r" % (relpath_u, local_version, remote_version))

                if local_version is None or remote_version is None or local_version < remote_version:
                    self._log("%r added to download queue" % (relpath_u,))
                    if scan_batch.has_key(relpath_u):
                        scan_batch[relpath_u] += [(file_node, metadata)]
                    else:
                        scan_batch[relpath_u] = [(file_node, metadata)]

        d.addCallback(scan_listing)
        d.addBoth(self._logcb, "end of _scan_remote_dmd")
        return d

    def _scan_remote_collective(self):
        # Scan every other participant's DMD and queue the best candidate
        # version of each changed file for download.
        self._log("_scan_remote_collective")
        scan_batch = {}  # path -> [(filenode, metadata)]

        d = self._collective_dirnode.list()
        def scan_collective(dirmap):
            d2 = defer.succeed(None)
            for dir_name in dirmap:
                (dirnode, metadata) = dirmap[dir_name]
                # skip our own DMD
                if dirnode.get_readonly_uri() != self._upload_readonly_dircap:
                    d2.addCallback(lambda ign, dir_name=dir_name, dirnode=dirnode:
                                   self._scan_remote_dmd(dir_name, dirnode, scan_batch))
                    def _err(f, dir_name=dir_name):
                        self._log("failed to scan DMD for client %r: %s" % (dir_name, f))
                        # XXX what should we do to make this failure more visible to users?
                    d2.addErrback(_err)

            return d2
        d.addCallback(scan_collective)

        def _filter_batch_to_deque(ign):
            # For each path, keep only the highest-version candidate, then
            # apply _should_download before queueing.
            self._log("deque = %r, scan_batch = %r" % (self._deque, scan_batch))
            for relpath_u in scan_batch.keys():
                file_node, metadata = max(scan_batch[relpath_u], key=lambda x: x[1]['version'])

                if self._should_download(relpath_u, metadata['version']):
                    self._deque.append( (relpath_u, file_node, metadata) )
                else:
                    self._log("Excluding %r" % (relpath_u,))
                    self._count('objects_excluded')
                    self._call_hook(None, 'processed')

            self._log("deque after = %r" % (self._deque,))
        d.addCallback(_filter_batch_to_deque)
        return d

    def _when_queue_is_empty(self):
        # When there is nothing to download, rescan the collective after
        # _turn_delay seconds, then resume turning the queue.
        d = task.deferLater(self._clock, self._turn_delay, self._scan_remote_collective)
        d.addBoth(self._logcb, "after _scan_remote_collective 1")
        d.addCallback(lambda ign: self._turn_deque())
        return d

    def _process(self, item, now=None):
        """Downloader: fetch one queued (relpath, filenode, metadata) item,
        write it locally (or apply its deletion), and update the db."""
        # Downloader
        self._log("_process(%r)" % (item,))
        if now is None:
            now = time.time()
        (relpath_u, file_node, metadata) = item
        fp = self._get_filepath(relpath_u)
        abspath_u = unicode_from_filepath(fp)
        conflict_path_u = self._get_conflicted_filename(abspath_u)

        d = defer.succeed(None)

        def do_update_db(written_abspath_u):
            filecap = file_node.get_uri()
            last_uploaded_uri = metadata.get('last_uploaded_uri', None)
            last_downloaded_uri = filecap
            last_downloaded_timestamp = now
            written_pathinfo = get_pathinfo(written_abspath_u)

            if not written_pathinfo.exists and not metadata.get('deleted', False):
                raise Exception("downloaded object %s disappeared" % quote_local_unicode_path(written_abspath_u))

            # NOTE(review): the third argument here is last_uploaded_uri,
            # while the Uploader passes a filecap in that position — confirm
            # against magicfolderdb.did_upload_version's signature.
            self._db.did_upload_version(relpath_u, metadata['version'], last_uploaded_uri,
                                        last_downloaded_uri, last_downloaded_timestamp, written_pathinfo)
            self._count('objects_downloaded')
        def failed(f):
            self._log("download failed: %s" % (str(f),))
            self._count('objects_failed')
            return f

        if os.path.isfile(conflict_path_u):
            # an unresolved earlier conflict blocks further downloads
            def fail(res):
                raise ConflictError("download failed: already conflicted: %r" % (relpath_u,))
            d.addCallback(fail)
        else:
            # conflict detection: compare the remote entry's provenance
            # metadata against what our db last saw.
            is_conflict = False
            db_entry = self._db.get_db_entry(relpath_u)
            if db_entry:
                dmd_last_downloaded_uri = metadata.get('last_downloaded_uri', None)
                print "metadata %r" % (metadata,)
                print "<<<<--- if %r != %r" % (dmd_last_downloaded_uri, db_entry.last_downloaded_uri)
                if dmd_last_downloaded_uri is not None and db_entry.last_downloaded_uri is not None:
                    if dmd_last_downloaded_uri != db_entry.last_downloaded_uri:
                        is_conflict = True
                        self._count('objects_conflicted')
                else:
                    dmd_last_uploaded_uri = metadata.get('last_uploaded_uri', None)
                    print ">>>>  if %r != %r" % (dmd_last_uploaded_uri, db_entry.last_uploaded_uri)
                    if dmd_last_uploaded_uri is not None and dmd_last_uploaded_uri != db_entry.last_uploaded_uri:
                        is_conflict = True
                        self._count('objects_conflicted')
                    else:
                        # XXX todo: mark as conflict if file is in pending upload set
                        if self._is_upload_pending(relpath_u):
                            is_conflict = True
                            self._count('objects_conflicted')

            if relpath_u.endswith(u"/"):
                # directory entry
                if metadata.get('deleted', False):
                    self._log("rmdir(%r) ignored" % (abspath_u,))
                else:
                    self._log("mkdir(%r)" % (abspath_u,))
                    d.addCallback(lambda ign: fileutil.make_dirs(abspath_u))
                    d.addCallback(lambda ign: abspath_u)
            else:
                if metadata.get('deleted', False):
                    d.addCallback(lambda ign: self._rename_deleted_file(abspath_u))
                else:
                    d.addCallback(lambda ign: file_node.download_best_version())
                    d.addCallback(lambda contents: self._write_downloaded_file(abspath_u, contents,
                                                                               is_conflict=is_conflict))

        d.addCallbacks(do_update_db, failed)

        def trap_conflicts(f):
            # a ConflictError is handled (file diverted aside), not fatal
            f.trap(ConflictError)
            return None
        d.addErrback(trap_conflicts)
        return d