]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/frontends/magic_folder.py
WIP: exclude own dirnode from scan. This is not quite right; we shouldn't exclude...
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / frontends / magic_folder.py
1
2 import sys, os
3 import os.path
4 from collections import deque
5 import time
6
7 from twisted.internet import defer, reactor, task
8 from twisted.python.failure import Failure
9 from twisted.python import runtime
10 from twisted.application import service
11
12 from allmydata.util import fileutil
13 from allmydata.interfaces import IDirectoryNode
14 from allmydata.util import log
15 from allmydata.util.fileutil import precondition_abspath, get_pathinfo, ConflictError
16 from allmydata.util.assertutil import precondition, _assert
17 from allmydata.util.deferredutil import HookMixin
18 from allmydata.util.encodingutil import listdir_filepath, to_filepath, \
19      extend_filepath, unicode_from_filepath, unicode_segments_from, \
20      quote_filepath, quote_local_unicode_path, quote_output, FilenameEncodingError
21 from allmydata.immutable.upload import FileName, Data
22 from allmydata import magicfolderdb, magicpath
23
24
# Linux inotify flag not exposed by Twisted's inotify wrapper: when set on a
# watch, events are not generated for children after they have been unlinked
# from the watched directory.  The value 0x04000000 presumably matches the
# kernel's IN_EXCL_UNLINK -- TODO confirm against <sys/inotify.h>.
IN_EXCL_UNLINK = 0x04000000L
26
def get_inotify_module():
    """
    Return the filesystem-notification module appropriate for this platform.

    On Windows this is allmydata.windows.inotify; on platforms where Twisted
    reports inotify support it is twisted.internet.inotify.  Raises
    NotImplementedError when no supported backend is available, logging any
    ImportError/AttributeError encountered along the way.
    """
    try:
        if sys.platform == "win32":
            from allmydata.windows import inotify
            return inotify
        if runtime.platform.supportsINotify():
            from twisted.internet import inotify
            return inotify
        raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
                                  "This currently requires Linux or Windows.")
    except (ImportError, AttributeError) as e:
        log.msg(e)
        if sys.platform != "win32":
            # Non-Windows import failures are unexpected; propagate as-is.
            raise
        raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
                                  "Windows support requires at least Vista, and has only been tested on Windows 7.")
43
44
45 class MagicFolder(service.MultiService):
46     name = 'magic-folder'
47
48     def __init__(self, client, upload_dircap, collective_dircap, local_path_u, dbfile,
49                  pending_delay=1.0, clock=reactor):
50         precondition_abspath(local_path_u)
51
52         service.MultiService.__init__(self)
53
54         db = magicfolderdb.get_magicfolderdb(dbfile, create_version=(magicfolderdb.SCHEMA_v1, 1))
55         if db is None:
56             return Failure(Exception('ERROR: Unable to load magic folder db.'))
57
58         # for tests
59         self._client = client
60         self._db = db
61
62         self.is_ready = False
63
64         upload_dirnode = self._client.create_node_from_uri(upload_dircap)
65         collective_dirnode = self._client.create_node_from_uri(collective_dircap)
66
67         self.uploader = Uploader(client, local_path_u, db, upload_dirnode, pending_delay, clock)
68         self.downloader = Downloader(client, local_path_u, db, collective_dirnode, upload_dirnode.get_readonly_uri(), clock)
69
70     def startService(self):
71         # TODO: why is this being called more than once?
72         if self.running:
73             return defer.succeed(None)
74         print "%r.startService" % (self,)
75         service.MultiService.startService(self)
76         return self.uploader.start_monitoring()
77
78     def ready(self):
79         """ready is used to signal us to start
80         processing the upload and download items...
81         """
82         self.is_ready = True
83         d = self.uploader.start_scanning()
84         d2 = self.downloader.start_scanning()
85         d.addCallback(lambda ign: d2)
86         return d
87
88     def finish(self):
89         print "finish"
90         d = self.uploader.stop()
91         d2 = self.downloader.stop()
92         d.addCallback(lambda ign: d2)
93         return d
94
95     def remove_service(self):
96         return service.MultiService.disownServiceParent(self)
97
98
class QueueMixin(HookMixin):
    """
    Shared machinery for Uploader and Downloader: a work deque of items,
    a lazily-chained Deferred tail (_lazy_tail) that serializes processing,
    stats counters, and path <-> relpath helpers.

    Subclasses must provide _process(item) and _when_queue_is_empty(), and
    (for _append_to_deque) an 'is_ready' attribute -- note that this mixin
    itself never sets 'is_ready'; it is set by the subclass or owner.
    """
    def __init__(self, client, local_path_u, db, name, clock):
        # client: Tahoe client node, used for stats and logging.
        # local_path_u: absolute unicode path of the local magic folder.
        # db: magic-folder db wrapper.
        # name: 'uploader' or 'downloader', used in counter/log names.
        # clock: reactor substitute, for tests.
        self._client = client
        self._local_path_u = local_path_u
        self._local_filepath = to_filepath(local_path_u)
        self._db = db
        self._name = name
        self._clock = clock
        self._hooks = {'processed': None, 'started': None}
        self.started_d = self.set_hook('started')

        if not self._local_filepath.exists():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but there is no directory at that location."
                                 % quote_local_unicode_path(self._local_path_u))
        if not self._local_filepath.isdir():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but the thing at that location is not a directory."
                                 % quote_local_unicode_path(self._local_path_u))

        self._deque = deque()
        # Serializes all item processing: callbacks are chained onto this.
        self._lazy_tail = defer.succeed(None)
        # relpaths queued or in flight, to suppress duplicate enqueues.
        self._pending = set()
        self._stopped = False
        # Delay between turns; Downloader overrides with its scan interval.
        self._turn_delay = 0

    def _get_filepath(self, relpath_u):
        """Map a u'/'-separated relpath to a FilePath under the local folder."""
        return extend_filepath(self._local_filepath, relpath_u.split(u"/"))

    def _get_relpath(self, filepath):
        """Inverse of _get_filepath: FilePath -> u'/'-joined relative path."""
        self._log("_get_relpath(%r)" % (filepath,))
        segments = unicode_segments_from(filepath, self._local_filepath)
        self._log("segments = %r" % (segments,))
        return u"/".join(segments)

    def _count(self, counter_name, delta=1):
        """Bump a stats counter namespaced as magic_folder.<name>.<counter>."""
        ctr = 'magic_folder.%s.%s' % (self._name, counter_name)
        self._log("%s += %r" % (counter_name, delta))
        self._client.stats_provider.count(ctr, delta)

    def _logcb(self, res, msg):
        # Pass-through callback: log msg with the result, return it unchanged.
        self._log("%s: %r" % (msg, res))
        return res

    def _log(self, msg):
        s = "Magic Folder %s %s: %s" % (quote_output(self._client.nickname), self._name, msg)
        self._client.log(s)
        print s
        #open("events", "ab+").write(msg)

    def _append_to_deque(self, relpath_u):
        """
        Queue relpath_u for processing unless it is already pending or an
        ignorable path; schedules a turn if 'is_ready' (subclass-set) is true.
        """
        self._log("_append_to_deque(%r)" % (relpath_u,))
        if relpath_u in self._pending or magicpath.should_ignore_file(relpath_u):
            return
        self._deque.append(relpath_u)
        self._pending.add(relpath_u)
        self._count('objects_queued')
        if self.is_ready:
            self._clock.callLater(0, self._turn_deque)

    def _turn_deque(self):
        """
        Take one item off the queue and chain its processing (or the
        queue-empty handler) onto _lazy_tail, then schedule the next turn.

        NOTE(review): append() plus pop() operate on the same (right) end of
        the deque, so items are processed LIFO, not FIFO -- confirm whether
        that ordering is intended.
        """
        self._log("_turn_deque")
        if self._stopped:
            self._log("stopped")
            return
        try:
            item = self._deque.pop()
            self._log("popped %r" % (item,))
            self._count('objects_queued', -1)
        except IndexError:
            self._log("deque is now empty")
            self._lazy_tail.addCallback(lambda ign: self._when_queue_is_empty())
        else:
            self._lazy_tail.addCallback(lambda ign: self._process(item))
            self._lazy_tail.addBoth(self._call_hook, 'processed')
            self._lazy_tail.addErrback(log.err)
            self._lazy_tail.addCallback(lambda ign: task.deferLater(self._clock, self._turn_delay, self._turn_deque))
176
177
class Uploader(QueueMixin):
    """
    Watches the local folder via inotify and uploads created/changed/deleted
    files (and created subdirectories) into our upload DMD, recording each
    uploaded version in the magic-folder db.
    """
    def __init__(self, client, local_path_u, db, upload_dirnode, pending_delay, clock):
        # upload_dirnode must be a writeable IDirectoryNode (our own DMD);
        # pending_delay is the inotify event-coalescing delay in seconds.
        QueueMixin.__init__(self, client, local_path_u, db, 'uploader', clock)

        self.is_ready = False

        if not IDirectoryNode.providedBy(upload_dirnode):
            raise AssertionError("The URI in '%s' does not refer to a directory."
                                 % os.path.join('private', 'magic_folder_dircap'))
        if upload_dirnode.is_unknown() or upload_dirnode.is_readonly():
            raise AssertionError("The URI in '%s' is not a writecap to a directory."
                                 % os.path.join('private', 'magic_folder_dircap'))

        self._upload_dirnode = upload_dirnode
        self._inotify = get_inotify_module()
        self._notifier = self._inotify.INotify()

        # Only the platform backends that support coalescing expose this.
        if hasattr(self._notifier, 'set_pending_delay'):
            self._notifier.set_pending_delay(pending_delay)

        # TODO: what about IN_MOVE_SELF, IN_MOVED_FROM, or IN_UNMOUNT?
        #
        self.mask = ( self._inotify.IN_CREATE
                    | self._inotify.IN_CLOSE_WRITE
                    | self._inotify.IN_MOVED_TO
                    | self._inotify.IN_MOVED_FROM
                    | self._inotify.IN_DELETE
                    | self._inotify.IN_ONLYDIR
                    | IN_EXCL_UNLINK
                    )
        self._notifier.watch(self._local_filepath, mask=self.mask, callbacks=[self._notify],
                             recursive=True)

    def start_monitoring(self):
        """Begin reading inotify events; fires the 'started' hook."""
        self._log("start_monitoring")
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._notifier.startReading())
        d.addCallback(lambda ign: self._count('dirs_monitored'))
        d.addBoth(self._call_hook, 'started')
        return d

    def stop(self):
        """Stop inotify and fire when the notifier and queue have drained."""
        self._log("stop")
        self._notifier.stopReading()
        self._count('dirs_monitored', -1)
        if hasattr(self._notifier, 'wait_until_stopped'):
            d = self._notifier.wait_until_stopped()
        else:
            d = defer.succeed(None)
        # Wait for any in-flight processing chained on _lazy_tail.
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def start_scanning(self):
        """
        Do the initial full scan: walk the local tree uploading as we go,
        then queue any db-known paths that the scan did not process
        (normally files deleted on disk while we were not running).
        """
        self._log("start_scanning")
        self.is_ready = True
        self._pending = self._db.get_all_relpaths()
        self._log("all_files %r" % (self._pending))
        d = self._scan(u"")
        def _add_pending(ign):
            # This adds all of the files that were in the db but not already processed
            # (normally because they have been deleted on disk).
            self._log("adding %r" % (self._pending))
            self._deque.extend(self._pending)
        d.addCallback(_add_pending)
        d.addCallback(lambda ign: self._turn_deque())
        return d

    def _scan(self, reldir_u):
        """
        Recursively process every child of local directory reldir_u
        (u"" means the folder root).  Raises on unlistable directories.
        """
        self._log("scan %r" % (reldir_u,))
        fp = self._get_filepath(reldir_u)
        try:
            children = listdir_filepath(fp)
        except EnvironmentError:
            raise Exception("WARNING: magic folder: permission denied on directory %s"
                            % quote_filepath(fp))
        except FilenameEncodingError:
            raise Exception("WARNING: magic folder: could not list directory %s due to a filename encoding error"
                            % quote_filepath(fp))

        d = defer.succeed(None)
        for child in children:
            _assert(isinstance(child, unicode), child=child)
            # child=child binds the loop variable per-iteration.
            d.addCallback(lambda ign, child=child:
                          ("%s/%s" % (reldir_u, child) if reldir_u else child))
            def _add_pending(relpath_u):
                if magicpath.should_ignore_file(relpath_u):
                    return None

                self._pending.add(relpath_u)
                return relpath_u
            d.addCallback(_add_pending)
            # This call to _process doesn't go through the deque, and probably should.
            d.addCallback(self._process)
            d.addBoth(self._call_hook, 'processed')
            d.addErrback(log.err)

        return d

    def _notify(self, opaque, path, events_mask):
        """inotify callback: queue the changed path for upload processing."""
        self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask))))

        # We filter out IN_CREATE events not associated with a directory.
        # Acting on IN_CREATE for files could cause us to read and upload
        # a possibly-incomplete file before the application has closed it.
        # There should always be an IN_CLOSE_WRITE after an IN_CREATE, I think.
        # It isn't possible to avoid watching for IN_CREATE at all, because
        # it is the only event notified for a directory creation.

        if ((events_mask & self._inotify.IN_CREATE) != 0 and
            (events_mask & self._inotify.IN_ISDIR) == 0):
            self._log("ignoring inotify event for creation of file %r\n" % (path,))
            return

        relpath_u = self._get_relpath(path)
        self._append_to_deque(relpath_u)

    def _when_queue_is_empty(self):
        # Uploader is purely event-driven; nothing to do on an empty queue.
        return defer.succeed(None)

    def _process(self, relpath_u):
        """
        Upload one relpath: an empty marker with 'deleted' metadata if the
        local object disappeared, a subdirectory entry (then recursive scan)
        for a directory, or the file contents for a regular file.  Version
        numbers are advanced via the magic-folder db.

        NOTE(review): the islink and 'special file' branches call self.warn,
        which is not defined on this class or QueueMixin as far as this file
        shows -- confirm it exists on HookMixin or fix to self._log.
        """
        self._log("_process(%r)" % (relpath_u,))
        if relpath_u is None:
            return
        precondition(isinstance(relpath_u, unicode), relpath_u)

        d = defer.succeed(None)

        def _maybe_upload(val, now=None):
            if now is None:
                now = time.time()
            fp = self._get_filepath(relpath_u)
            pathinfo = get_pathinfo(unicode_from_filepath(fp))

            self._log("pending = %r, about to remove %r" % (self._pending, relpath_u))
            self._pending.remove(relpath_u)
            encoded_path_u = magicpath.path2magic(relpath_u)

            if not pathinfo.exists:
                # FIXME merge this with the 'isfile' case.
                self._log("notified object %s disappeared (this is normal)" % quote_filepath(fp))
                self._count('objects_disappeared')
                if not self._db.check_file_db_exists(relpath_u):
                    return None

                last_downloaded_timestamp = now
                last_downloaded_uri = self._db.get_last_downloaded_uri(relpath_u)

                current_version = self._db.get_local_file_version(relpath_u)
                if current_version is None:
                    new_version = 0
                elif self._db.is_new_file(pathinfo, relpath_u):
                    new_version = current_version + 1
                else:
                    self._log("Not uploading %r" % (relpath_u,))
                    self._count('objects_not_uploaded')
                    return

                # Record the deletion remotely as an empty file with
                # 'deleted': True metadata.
                metadata = { 'version': new_version,
                             'deleted': True,
                             'last_downloaded_timestamp': last_downloaded_timestamp }
                if last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = last_downloaded_uri

                empty_uploadable = Data("", self._client.convergence)
                d2 = self._upload_dirnode.add_file(encoded_path_u, empty_uploadable,
                                                   metadata=metadata, overwrite=True)

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    self._db.did_upload_version(relpath_u, new_version, filecap,
                                                last_downloaded_uri, last_downloaded_timestamp, pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)
                return d2
            elif pathinfo.islink:
                self.warn("WARNING: cannot upload symlink %s" % quote_filepath(fp))
                return None
            elif pathinfo.isdir:
                # Watch the new subdirectory unless the backend already
                # includes new subdirectories in a recursive watch.
                if not getattr(self._notifier, 'recursive_includes_new_subdirectories', False):
                    self._notifier.watch(fp, mask=self.mask, callbacks=[self._notify], recursive=True)

                uploadable = Data("", self._client.convergence)
                # A trailing (encoded) slash marks directory entries remotely.
                encoded_path_u += magicpath.path2magic(u"/")
                upload_d = self._upload_dirnode.add_file(encoded_path_u, uploadable, metadata={"version":0}, overwrite=True)
                def _succeeded(ign):
                    self._log("created subdirectory %r" % (relpath_u,))
                    self._count('directories_created')
                def _failed(f):
                    self._log("failed to create subdirectory %r" % (relpath_u,))
                    return f
                upload_d.addCallbacks(_succeeded, _failed)
                upload_d.addCallback(lambda ign: self._scan(relpath_u))
                return upload_d
            elif pathinfo.isfile:
                last_downloaded_uri = self._db.get_last_downloaded_uri(relpath_u)
                last_downloaded_timestamp = now

                current_version = self._db.get_local_file_version(relpath_u)
                if current_version is None:
                    new_version = 0
                elif self._db.is_new_file(pathinfo, relpath_u):
                    new_version = current_version + 1
                else:
                    self._log("Not uploading %r" % (relpath_u,))
                    self._count('objects_not_uploaded')
                    return None

                metadata = { 'version': new_version,
                             'last_downloaded_timestamp': last_downloaded_timestamp }
                if last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = last_downloaded_uri

                uploadable = FileName(unicode_from_filepath(fp), self._client.convergence)
                d2 = self._upload_dirnode.add_file(encoded_path_u, uploadable,
                                                   metadata=metadata, overwrite=True)

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    last_downloaded_uri = metadata.get('last_downloaded_uri', None)
                    self._db.did_upload_version(relpath_u, new_version, filecap,
                                                last_downloaded_uri, last_downloaded_timestamp, pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)
                return d2
            else:
                self.warn("WARNING: cannot process special file %s" % quote_filepath(fp))
                return None

        d.addCallback(_maybe_upload)

        def _succeeded(res):
            self._count('objects_succeeded')
            return res
        def _failed(f):
            self._count('objects_failed')
            self._log("%s while processing %r" % (f, relpath_u))
            return f
        d.addCallbacks(_succeeded, _failed)
        return d

    def _get_metadata(self, encoded_path_u):
        # Returns a Deferred for the entry's metadata, or a Failure if the
        # entry is absent (synchronous KeyError converted to Failure).
        try:
            d = self._upload_dirnode.get_metadata_for(encoded_path_u)
        except KeyError:
            return Failure()
        return d

    def _get_filenode(self, encoded_path_u):
        # Returns a Deferred for the entry's filenode, or a Failure if absent.
        try:
            d = self._upload_dirnode.get(encoded_path_u)
        except KeyError:
            return Failure()
        return d
431
432
433 class WriteFileMixin(object):
434     FUDGE_SECONDS = 10.0
435
436     def _get_conflicted_filename(self, abspath_u):
437         return abspath_u + u".conflict"
438
439     def _write_downloaded_file(self, abspath_u, file_contents, is_conflict=False, now=None):
440         self._log("_write_downloaded_file(%r, <%d bytes>, is_conflict=%r, now=%r)"
441                   % (abspath_u, len(file_contents), is_conflict, now))
442
443         # 1. Write a temporary file, say .foo.tmp.
444         # 2. is_conflict determines whether this is an overwrite or a conflict.
445         # 3. Set the mtime of the replacement file to be T seconds before the
446         #    current local time.
447         # 4. Perform a file replacement with backup filename foo.backup,
448         #    replaced file foo, and replacement file .foo.tmp. If any step of
449         #    this operation fails, reclassify as a conflict and stop.
450         #
451         # Returns the path of the destination file.
452
453         precondition_abspath(abspath_u)
454         replacement_path_u = abspath_u + u".tmp"  # FIXME more unique
455         backup_path_u = abspath_u + u".backup"
456         if now is None:
457             now = time.time()
458
459         # ensure parent directory exists
460         head, tail = os.path.split(abspath_u)
461         mode = 0777 # XXX
462         fileutil.make_dirs(head, mode)
463
464         fileutil.write(replacement_path_u, file_contents)
465         os.utime(replacement_path_u, (now, now - self.FUDGE_SECONDS))
466         if is_conflict:
467             print "0x00 ------------ <><> is conflict; calling _rename_conflicted_file... %r %r" % (abspath_u, replacement_path_u)
468             return self._rename_conflicted_file(abspath_u, replacement_path_u)
469         else:
470             try:
471                 fileutil.replace_file(abspath_u, replacement_path_u, backup_path_u)
472                 return abspath_u
473             except fileutil.ConflictError:
474                 return self._rename_conflicted_file(abspath_u, replacement_path_u)
475
476     def _rename_conflicted_file(self, abspath_u, replacement_path_u):
477         self._log("_rename_conflicted_file(%r, %r)" % (abspath_u, replacement_path_u))
478
479         conflict_path_u = self._get_conflicted_filename(abspath_u)
480         print "XXX rename %r %r" % (replacement_path_u, conflict_path_u)
481         if os.path.isfile(replacement_path_u):
482             print "%r exists" % (replacement_path_u,)
483         if os.path.isfile(conflict_path_u):
484             print "%r exists" % (conflict_path_u,)
485
486         fileutil.rename_no_overwrite(replacement_path_u, conflict_path_u)
487         return conflict_path_u
488
489     def _rename_deleted_file(self, abspath_u):
490         self._log('renaming deleted file to backup: %s' % (abspath_u,))
491         try:
492             fileutil.rename_no_overwrite(abspath_u, abspath_u + u'.backup')
493         except IOError:
494             # XXX is this the correct error?
495             self._log("Already gone: '%s'" % (abspath_u,))
496         return abspath_u
497
498
499 class Downloader(QueueMixin, WriteFileMixin):
500     REMOTE_SCAN_INTERVAL = 3  # facilitates tests
501
502     def __init__(self, client, local_path_u, db, collective_dirnode, upload_readonly_dircap, clock):
503         QueueMixin.__init__(self, client, local_path_u, db, 'downloader', clock)
504
505         if not IDirectoryNode.providedBy(collective_dirnode):
506             raise AssertionError("The URI in '%s' does not refer to a directory."
507                                  % os.path.join('private', 'collective_dircap'))
508         if collective_dirnode.is_unknown() or not collective_dirnode.is_readonly():
509             raise AssertionError("The URI in '%s' is not a readonly cap to a directory."
510                                  % os.path.join('private', 'collective_dircap'))
511
512         self._collective_dirnode = collective_dirnode
513         self._upload_readonly_dircap = upload_readonly_dircap
514
515         self._turn_delay = self.REMOTE_SCAN_INTERVAL
516         self._download_scan_batch = {} # path -> [(filenode, metadata)]
517
518     def start_scanning(self):
519         self._log("start_scanning")
520         files = self._db.get_all_relpaths()
521         self._log("all files %s" % files)
522
523         d = self._scan_remote_collective()
524         self._turn_deque()
525         return d
526
527     def stop(self):
528         self._stopped = True
529         d = defer.succeed(None)
530         d.addCallback(lambda ign: self._lazy_tail)
531         return d
532
533     def _should_download(self, relpath_u, remote_version):
534         """
535         _should_download returns a bool indicating whether or not a remote object should be downloaded.
536         We check the remote metadata version against our magic-folder db version number;
537         latest version wins.
538         """
539         self._log("_should_download(%r, %r)" % (relpath_u, remote_version))
540         if magicpath.should_ignore_file(relpath_u):
541             self._log("nope")
542             return False
543         self._log("yep")
544         v = self._db.get_local_file_version(relpath_u)
545         self._log("v = %r" % (v,))
546         return (v is None or v < remote_version)
547
548     def _get_local_latest(self, relpath_u):
549         """
550         _get_local_latest takes a unicode path string checks to see if this file object
551         exists in our magic-folder db; if not then return None
552         else check for an entry in our magic-folder db and return the version number.
553         """
554         if not self._get_filepath(relpath_u).exists():
555             return None
556         return self._db.get_local_file_version(relpath_u)
557
558     def _get_collective_latest_file(self, filename):
559         """
560         _get_collective_latest_file takes a file path pointing to a file managed by
561         magic-folder and returns a deferred that fires with the two tuple containing a
562         file node and metadata for the latest version of the file located in the
563         magic-folder collective directory.
564         """
565         collective_dirmap_d = self._collective_dirnode.list()
566         def scan_collective(result):
567             list_of_deferreds = []
568             for dir_name in result.keys():
569                 # XXX make sure it's a directory
570                 d = defer.succeed(None)
571                 d.addCallback(lambda x, dir_name=dir_name: result[dir_name][0].get_child_and_metadata(filename))
572                 list_of_deferreds.append(d)
573             deferList = defer.DeferredList(list_of_deferreds, consumeErrors=True)
574             return deferList
575         collective_dirmap_d.addCallback(scan_collective)
576         def highest_version(deferredList):
577             max_version = 0
578             metadata = None
579             node = None
580             for success, result in deferredList:
581                 if success:
582                     if result[1]['version'] > max_version:
583                         node, metadata = result
584                         max_version = result[1]['version']
585             return node, metadata
586         collective_dirmap_d.addCallback(highest_version)
587         return collective_dirmap_d
588
589     def _append_to_batch(self, name, file_node, metadata):
590         if self._download_scan_batch.has_key(name):
591             self._download_scan_batch[name] += [(file_node, metadata)]
592         else:
593             self._download_scan_batch[name] = [(file_node, metadata)]
594
595     def _scan_remote(self, nickname, dirnode):
596         self._log("_scan_remote nickname %r" % (nickname,))
597         d = dirnode.list()
598         def scan_listing(listing_map):
599             for encoded_relpath_u in listing_map.keys():
600                 relpath_u = magicpath.magic2path(encoded_relpath_u)
601                 self._log("found %r" % (relpath_u,))
602
603                 file_node, metadata = listing_map[encoded_relpath_u]
604                 local_version = self._get_local_latest(relpath_u)
605                 remote_version = metadata.get('version', None)
606                 self._log("%r has local version %r, remote version %r" % (relpath_u, local_version, remote_version))
607                 if local_version is None or remote_version is None or local_version < remote_version:
608                     self._log("%r added to download queue" % (relpath_u,))
609                     self._append_to_batch(relpath_u, file_node, metadata)
610         d.addCallback(scan_listing)
611         d.addBoth(self._logcb, "end of _scan_remote")
612         return d
613
614     def _scan_remote_collective(self):
615         self._log("_scan_remote_collective")
616         self._download_scan_batch = {} # XXX
617
618         d = self._collective_dirnode.list()
619         def scan_collective(dirmap):
620             d2 = defer.succeed(None)
621             for dir_name in dirmap:
622                 (dirnode, metadata) = dirmap[dir_name]
623                 if dirnode.get_readonly_uri() != self._upload_readonly_dircap:
624                     d2.addCallback(lambda ign, dir_name=dir_name: self._scan_remote(dir_name, dirnode))
625                     def _err(f):
626                         self._log("failed to scan DMD for client %r: %s" % (dir_name, f))
627                         # XXX what should we do to make this failure more visible to users?
628                     d2.addErrback(_err)
629             return d2
630         d.addCallback(scan_collective)
631         d.addCallback(self._filter_scan_batch)
632         d.addCallback(self._add_batch_to_download_queue)
633         return d
634
635     def _add_batch_to_download_queue(self, result):
636         self._log("result = %r" % (result,))
637         self._log("deque = %r" % (self._deque,))
638         self._deque.extend(result)
639         self._log("deque after = %r" % (self._deque,))
640         self._count('objects_queued', len(result))
641         self._log("pending = %r" % (self._pending,))
642         self._pending.update(map(lambda x: x[0], result))
643         self._log("pending after = %r" % (self._pending,))
644
645     def _filter_scan_batch(self, result):
646         self._log("_filter_scan_batch")
647         extension = [] # consider whether this should be a dict
648         for relpath_u in self._download_scan_batch.keys():
649             if relpath_u in self._pending:
650                 continue
651             file_node, metadata = max(self._download_scan_batch[relpath_u], key=lambda x: x[1]['version'])
652             if self._should_download(relpath_u, metadata['version']):
653                 extension += [(relpath_u, file_node, metadata)]
654             else:
655                 self._log("Excluding %r" % (relpath_u,))
656                 self._count('objects_excluded')
657                 self._call_hook(None, 'processed')
658         return extension
659
660     def _when_queue_is_empty(self):
661         d = task.deferLater(self._clock, self._turn_delay, self._scan_remote_collective)
662         d.addBoth(self._logcb, "after _scan_remote_collective")
663         d.addCallback(lambda ign: self._turn_deque())
664         return d
665
666     def _process(self, item, now=None):
667         self._log("_process(%r)" % (item,))
668         if now is None:
669             now = time.time()
670         (relpath_u, file_node, metadata) = item
671         fp = self._get_filepath(relpath_u)
672         abspath_u = unicode_from_filepath(fp)
673         conflict_path_u = self._get_conflicted_filename(abspath_u)
674         d = defer.succeed(None)
675
676         def do_update_db(written_abspath_u):
677             filecap = file_node.get_uri()
678             last_uploaded_uri = metadata.get('last_uploaded_uri', None)
679             last_downloaded_uri = filecap
680             last_downloaded_timestamp = now
681             written_pathinfo = get_pathinfo(written_abspath_u)
682
683             if not written_pathinfo.exists and not metadata.get('deleted', False):
684                 raise Exception("downloaded object %s disappeared" % quote_local_unicode_path(written_abspath_u))
685
686             self._db.did_upload_version(relpath_u, metadata['version'], last_uploaded_uri,
687                                         last_downloaded_uri, last_downloaded_timestamp, written_pathinfo)
688             self._count('objects_downloaded')
689         def failed(f):
690             self._log("download failed: %s" % (str(f),))
691             self._count('objects_failed')
692             return f
693
694         if os.path.isfile(conflict_path_u):
695             def fail(res):
696                 raise ConflictError("download failed: already conflicted: %r" % (relpath_u,))
697             d.addCallback(fail)
698         else:
699             is_conflict = False
700             if self._db.check_file_db_exists(relpath_u):
701                 dmd_last_downloaded_uri = metadata.get('last_downloaded_uri', None)
702                 local_last_downloaded_uri = self._db.get_last_downloaded_uri(relpath_u)
703                 print "metadata %r" % (metadata,)
704                 print "<<<<--- if %r != %r" % (dmd_last_downloaded_uri, local_last_downloaded_uri)
705                 if dmd_last_downloaded_uri is not None and local_last_downloaded_uri is not None:
706                     if dmd_last_downloaded_uri != local_last_downloaded_uri:
707                         is_conflict = True
708                         self._count('objects_conflicted')
709
710                 #dmd_last_uploaded_uri = metadata.get('last_uploaded_uri', None)
711                 #local_last_uploaded_uri = ...
712
713             if relpath_u.endswith(u"/"):
714                 if metadata.get('deleted', False):
715                     self._log("rmdir(%r) ignored" % (abspath_u,))
716                 else:
717                     self._log("mkdir(%r)" % (abspath_u,))
718                     d.addCallback(lambda ign: fileutil.make_dirs(abspath_u))
719                     d.addCallback(lambda ign: abspath_u)
720             else:
721                 if metadata.get('deleted', False):
722                     d.addCallback(lambda ign: self._rename_deleted_file(abspath_u))
723                 else:
724                     d.addCallback(lambda ign: file_node.download_best_version())
725                     d.addCallback(lambda contents: self._write_downloaded_file(abspath_u, contents,
726                                                                                is_conflict=is_conflict))
727
728         d.addCallbacks(do_update_db, failed)
729
730         def remove_from_pending(res):
731             self._pending.remove(relpath_u)
732             return res
733         d.addBoth(remove_from_pending)
734         def trap_conflicts(f):
735             f.trap(ConflictError)
736             return None
737         d.addErrback(trap_conflicts)
738         return d