# src/allmydata/frontends/magic_folder.py

import sys, os
import os.path
from collections import deque
import time

from twisted.internet import defer, reactor, task
from twisted.python.failure import Failure
from twisted.python import runtime
from twisted.application import service

from allmydata.util import fileutil
from allmydata.interfaces import IDirectoryNode
from allmydata.util import log
from allmydata.util.fileutil import precondition_abspath, get_pathinfo, ConflictError
from allmydata.util.assertutil import precondition, _assert
from allmydata.util.deferredutil import HookMixin
from allmydata.util.encodingutil import listdir_filepath, to_filepath, \
     extend_filepath, unicode_from_filepath, unicode_segments_from, \
     quote_filepath, quote_local_unicode_path, quote_output, FilenameEncodingError
from allmydata.immutable.upload import FileName, Data
from allmydata import magicfolderdb, magicpath


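# Linux inotify flag (0x04000000 in <sys/inotify.h>): don't generate events
# for members of a watched directory after they have been unlinked.  Defined
# here because Twisted's inotify module does not expose it.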
IN_EXCL_UNLINK = 0x04000000L

def get_inotify_module():
    try:
        if sys.platform == "win32":
            from allmydata.windows import inotify
        elif runtime.platform.supportsINotify():
            from twisted.internet import inotify
        else:
            raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
                                      "This currently requires Linux or Windows.")
        return inotify
    except (ImportError, AttributeError) as e:
        log.msg(e)
        if sys.platform == "win32":
            raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
                                      "Windows support requires at least Vista, and has only been tested on Windows 7.")
        raise

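# A minimal usage sketch of the notifier API this module relies on; the
# watched path and the on_change callback are hypothetical:
#
#     inotify = get_inotify_module()
#     notifier = inotify.INotify()
#     notifier.watch(to_filepath(u"/some/magic/folder"), mask=inotify.IN_CLOSE_WRITE,
#                    callbacks=[on_change], recursive=True)
#     notifier.startReading()
#
# Each callback is invoked as on_change(opaque, filepath, events_mask), as in
# Uploader._notify below.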

class MagicFolder(service.MultiService):
    name = 'magic-folder'

    def __init__(self, client, upload_dircap, collective_dircap, local_path_u, dbfile,
                 pending_delay=1.0, clock=reactor):
        precondition_abspath(local_path_u)

        service.MultiService.__init__(self)

        db = magicfolderdb.get_magicfolderdb(dbfile, create_version=(magicfolderdb.SCHEMA_v1, 1))
        if db is None:
            # returning a Failure from __init__ would be ignored (and a
            # non-None return from __init__ raises TypeError), so raise.
            raise Exception('ERROR: Unable to load magic folder db.')

        # for tests
        self._client = client
        self._db = db

        upload_dirnode = self._client.create_node_from_uri(upload_dircap)
        collective_dirnode = self._client.create_node_from_uri(collective_dircap)

        self.uploader = Uploader(client, local_path_u, db, upload_dirnode, pending_delay, clock)
        self.downloader = Downloader(client, local_path_u, db, collective_dirnode, upload_dirnode.get_readonly_uri(), clock)

    def startService(self):
        # TODO: why is this being called more than once?
        if self.running:
            return defer.succeed(None)
        print "%r.startService" % (self,)
        service.MultiService.startService(self)
        return self.uploader.start_monitoring()

    def ready(self):
        """ready is used to signal us to start
        processing the upload and download items...
        """
        d = self.uploader.start_scanning()
        d2 = self.downloader.start_scanning()
        d.addCallback(lambda ign: d2)
        return d

    def finish(self):
        print "finish"
        d = self.uploader.stop()
        d2 = self.downloader.stop()
        d.addCallback(lambda ign: d2)
        return d

    def remove_service(self):
        return service.MultiService.disownServiceParent(self)

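# A hedged construction sketch; `client`, the dircaps, the local path and
# `dbfile` below are placeholders, not real values:
#
#     mf = MagicFolder(client, upload_dircap, collective_dircap,
#                      u"/home/alice/magic-folder", dbfile, pending_delay=1.0)
#     mf.setServiceParent(client)   # invokes startService() if client is running
#     d = mf.ready()                # begin scanning and processing both queues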

class QueueMixin(HookMixin):
    def __init__(self, client, local_path_u, db, name, clock):
        self._client = client
        self._local_path_u = local_path_u
        self._local_filepath = to_filepath(local_path_u)
        self._db = db
        self._name = name
        self._clock = clock
        self._hooks = {'processed': None, 'started': None}
        self.started_d = self.set_hook('started')

        if not self._local_filepath.exists():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but there is no directory at that location."
                                 % quote_local_unicode_path(self._local_path_u))
        if not self._local_filepath.isdir():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but the thing at that location is not a directory."
                                 % quote_local_unicode_path(self._local_path_u))

        self._deque = deque()
        self._lazy_tail = defer.succeed(None)
        self._pending = set()
        self._stopped = False
        self._turn_delay = 0

    def _get_filepath(self, relpath_u):
        self._log("_get_filepath(%r)" % (relpath_u,))
        return extend_filepath(self._local_filepath, relpath_u.split(u"/"))

    def _get_relpath(self, filepath):
        self._log("_get_relpath(%r)" % (filepath,))
        segments = unicode_segments_from(filepath, self._local_filepath)
        self._log("segments = %r" % (segments,))
        return u"/".join(segments)

    def _count(self, counter_name, delta=1):
        ctr = 'magic_folder.%s.%s' % (self._name, counter_name)
        self._log("%s += %r" % (counter_name, delta))
        self._client.stats_provider.count(ctr, delta)

    def _logcb(self, res, msg):
        self._log("%s: %r" % (msg, res))
        return res

    def _log(self, msg):
        s = "Magic Folder %s %s: %s" % (quote_output(self._client.nickname), self._name, msg)
        self._client.log(s)
        print s
        #open("events", "ab+").write(msg)

    def _append_to_deque(self, relpath_u):
        self._log("_append_to_deque(%r)" % (relpath_u,))
        if relpath_u in self._pending or magicpath.should_ignore_file(relpath_u):
            return
        self._deque.append(relpath_u)
        self._pending.add(relpath_u)
        self._count('objects_queued')
        if self.is_ready:
            self._clock.callLater(0, self._turn_deque)

    def _turn_deque(self):
        self._log("_turn_deque")
        if self._stopped:
            self._log("stopped")
            return
        try:
            item = self._deque.pop()
            self._log("popped %r" % (item,))
            self._count('objects_queued', -1)
        except IndexError:
            self._log("deque is now empty")
            self._lazy_tail.addCallback(lambda ign: self._when_queue_is_empty())
        else:
            self._lazy_tail.addCallback(lambda ign: self._process(item))
            self._lazy_tail.addBoth(self._call_hook, 'processed')
            self._lazy_tail.addErrback(log.err)
            self._lazy_tail.addCallback(lambda ign: task.deferLater(self._clock, self._turn_delay, self._turn_deque))

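# QueueMixin serializes work through self._lazy_tail: each item's processing
# is chained onto the tail Deferred, so items are handled strictly one at a
# time.  The same pattern in isolation (illustrative only):
#
#     tail = defer.succeed(None)
#     def enqueue(work):           # `work` is a 0-argument callable
#         global tail
#         tail = tail.addCallback(lambda ign: work())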

class Uploader(QueueMixin):
    def __init__(self, client, local_path_u, db, upload_dirnode, pending_delay, clock):
        QueueMixin.__init__(self, client, local_path_u, db, 'uploader', clock)

        self.is_ready = False

        if not IDirectoryNode.providedBy(upload_dirnode):
            raise AssertionError("The URI in '%s' does not refer to a directory."
                                 % os.path.join('private', 'magic_folder_dircap'))
        if upload_dirnode.is_unknown() or upload_dirnode.is_readonly():
            raise AssertionError("The URI in '%s' is not a writecap to a directory."
                                 % os.path.join('private', 'magic_folder_dircap'))

        self._upload_dirnode = upload_dirnode
        self._inotify = get_inotify_module()
        self._notifier = self._inotify.INotify()

        if hasattr(self._notifier, 'set_pending_delay'):
            self._notifier.set_pending_delay(pending_delay)

        # TODO: what about IN_MOVE_SELF or IN_UNMOUNT?
        # (IN_MOVED_FROM is already included in the mask below.)
        #
        self.mask = ( self._inotify.IN_CREATE
                    | self._inotify.IN_CLOSE_WRITE
                    | self._inotify.IN_MOVED_TO
                    | self._inotify.IN_MOVED_FROM
                    | self._inotify.IN_DELETE
                    | self._inotify.IN_ONLYDIR
                    | IN_EXCL_UNLINK
                    )
        self._notifier.watch(self._local_filepath, mask=self.mask, callbacks=[self._notify],
                             recursive=True)

    def start_monitoring(self):
        self._log("start_monitoring")
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._notifier.startReading())
        d.addCallback(lambda ign: self._count('dirs_monitored'))
        d.addBoth(self._call_hook, 'started')
        return d

    def stop(self):
        self._log("stop")
        self._notifier.stopReading()
        self._count('dirs_monitored', -1)
        if hasattr(self._notifier, 'wait_until_stopped'):
            d = self._notifier.wait_until_stopped()
        else:
            d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def start_scanning(self):
        self._log("start_scanning")
        self.is_ready = True
        self._pending = self._db.get_all_relpaths()
        self._log("all_files %r" % (self._pending))
        d = self._scan(u"")
        def _add_pending(ign):
            # This adds all of the files that were in the db but not already processed
            # (normally because they have been deleted on disk).
            self._log("adding %r" % (self._pending))
            self._deque.extend(self._pending)
        d.addCallback(_add_pending)
        d.addCallback(lambda ign: self._turn_deque())
        return d

    def _scan(self, reldir_u):
        self._log("scan %r" % (reldir_u,))
        fp = self._get_filepath(reldir_u)
        try:
            children = listdir_filepath(fp)
        except EnvironmentError:
            raise Exception("WARNING: magic folder: permission denied on directory %s"
                            % quote_filepath(fp))
        except FilenameEncodingError:
            raise Exception("WARNING: magic folder: could not list directory %s due to a filename encoding error"
                            % quote_filepath(fp))

        d = defer.succeed(None)
        for child in children:
            _assert(isinstance(child, unicode), child=child)
            d.addCallback(lambda ign, child=child:
                          ("%s/%s" % (reldir_u, child) if reldir_u else child))
            def _add_pending(relpath_u):
                if magicpath.should_ignore_file(relpath_u):
                    return None

                self._pending.add(relpath_u)
                return relpath_u
            d.addCallback(_add_pending)
            # This call to _process doesn't go through the deque, and probably should.
            d.addCallback(self._process)
            d.addBoth(self._call_hook, 'processed')
            d.addErrback(log.err)

        return d

    def _notify(self, opaque, path, events_mask):
        self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask))))

        # We filter out IN_CREATE events not associated with a directory.
        # Acting on IN_CREATE for files could cause us to read and upload
        # a possibly-incomplete file before the application has closed it.
        # There should always be an IN_CLOSE_WRITE after an IN_CREATE, I think.
        # It isn't possible to avoid watching for IN_CREATE at all, because
        # it is the only event notified for a directory creation.

        if ((events_mask & self._inotify.IN_CREATE) != 0 and
            (events_mask & self._inotify.IN_ISDIR) == 0):
            self._log("ignoring inotify event for creation of file %r\n" % (path,))
            return

        relpath_u = self._get_relpath(path)
        self._append_to_deque(relpath_u)

    def _when_queue_is_empty(self):
        return defer.succeed(None)

    def _process(self, relpath_u):
        self._log("_process(%r)" % (relpath_u,))
        if relpath_u is None:
            return
        precondition(isinstance(relpath_u, unicode), relpath_u)
        precondition(not relpath_u.endswith(u'/'), relpath_u)

        d = defer.succeed(None)

        def _maybe_upload(val, now=None):
            if now is None:
                now = time.time()
            fp = self._get_filepath(relpath_u)
            pathinfo = get_pathinfo(unicode_from_filepath(fp))

            self._log("pending = %r, about to remove %r" % (self._pending, relpath_u))
            self._pending.remove(relpath_u)
            encoded_path_u = magicpath.path2magic(relpath_u)

            if not pathinfo.exists:
                # FIXME merge this with the 'isfile' case.
                self._log("notified object %s disappeared (this is normal)" % quote_filepath(fp))
                self._count('objects_disappeared')
                if not self._db.check_file_db_exists(relpath_u):
                    return None

                last_downloaded_timestamp = now
                last_downloaded_uri = self._db.get_last_downloaded_uri(relpath_u)

                current_version = self._db.get_local_file_version(relpath_u)
                if current_version is None:
                    new_version = 0
                elif self._db.is_new_file(pathinfo, relpath_u):
                    new_version = current_version + 1
                else:
                    self._log("Not uploading %r" % (relpath_u,))
                    self._count('objects_not_uploaded')
                    return

                metadata = { 'version': new_version,
                             'deleted': True,
                             'last_downloaded_timestamp': last_downloaded_timestamp }
                if last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = last_downloaded_uri

                empty_uploadable = Data("", self._client.convergence)
                d2 = self._upload_dirnode.add_file(encoded_path_u, empty_uploadable,
                                                   metadata=metadata, overwrite=True)

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    self._db.did_upload_version(relpath_u, new_version, filecap,
                                                last_downloaded_uri, last_downloaded_timestamp, pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)
                return d2
            elif pathinfo.islink:
                # no 'warn' method is defined on this class; log the warning
                self._log("WARNING: cannot upload symlink %s" % quote_filepath(fp))
                return None
            elif pathinfo.isdir:
                if not getattr(self._notifier, 'recursive_includes_new_subdirectories', False):
                    self._notifier.watch(fp, mask=self.mask, callbacks=[self._notify], recursive=True)

                uploadable = Data("", self._client.convergence)
                encoded_path_u += magicpath.path2magic(u"/")
                self._log("encoded_path_u = %r" % (encoded_path_u,))
                upload_d = self._upload_dirnode.add_file(encoded_path_u, uploadable, metadata={"version":0}, overwrite=True)
                def _succeeded(ign):
                    self._log("created subdirectory %r" % (relpath_u,))
                    self._count('directories_created')
                def _failed(f):
                    self._log("failed to create subdirectory %r" % (relpath_u,))
                    return f
                upload_d.addCallbacks(_succeeded, _failed)
                upload_d.addCallback(lambda ign: self._scan(relpath_u))
                return upload_d
            elif pathinfo.isfile:
                last_downloaded_uri = self._db.get_last_downloaded_uri(relpath_u)
                last_downloaded_timestamp = now

                current_version = self._db.get_local_file_version(relpath_u)
                if current_version is None:
                    new_version = 0
                elif self._db.is_new_file(pathinfo, relpath_u):
                    new_version = current_version + 1
                else:
                    self._log("Not uploading %r" % (relpath_u,))
                    self._count('objects_not_uploaded')
                    return None

                metadata = { 'version': new_version,
                             'last_downloaded_timestamp': last_downloaded_timestamp }
                if last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = last_downloaded_uri

                uploadable = FileName(unicode_from_filepath(fp), self._client.convergence)
                d2 = self._upload_dirnode.add_file(encoded_path_u, uploadable,
                                                   metadata=metadata, overwrite=True)

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    last_downloaded_uri = metadata.get('last_downloaded_uri', None)
                    self._db.did_upload_version(relpath_u, new_version, filecap,
                                                last_downloaded_uri, last_downloaded_timestamp, pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)
                return d2
            else:
                self._log("WARNING: cannot process special file %s" % quote_filepath(fp))
                return None

        d.addCallback(_maybe_upload)

        def _succeeded(res):
            self._count('objects_succeeded')
            return res
        def _failed(f):
            self._count('objects_failed')
            self._log("%s while processing %r" % (f, relpath_u))
            return f
        d.addCallbacks(_succeeded, _failed)
        return d

    def _get_metadata(self, encoded_path_u):
        try:
            d = self._upload_dirnode.get_metadata_for(encoded_path_u)
        except KeyError:
            return Failure()
        return d

    def _get_filenode(self, encoded_path_u):
        try:
            d = self._upload_dirnode.get(encoded_path_u)
        except KeyError:
            return Failure()
        return d


class WriteFileMixin(object):
    FUDGE_SECONDS = 10.0

    def _get_conflicted_filename(self, abspath_u):
        return abspath_u + u".conflict"

    def _write_downloaded_file(self, abspath_u, file_contents, is_conflict=False, now=None):
        self._log("_write_downloaded_file(%r, <%d bytes>, is_conflict=%r, now=%r)"
                  % (abspath_u, len(file_contents), is_conflict, now))
        # 1. Write a temporary file, say foo.tmp.
        # 2. is_conflict determines whether this is an overwrite or a conflict.
        # 3. Set the mtime of the replacement file to be T seconds before the
        #    current local time.
        # 4. Perform a file replacement with backup filename foo.backup,
        #    replaced file foo, and replacement file foo.tmp. If any step of
        #    this operation fails, reclassify as a conflict and stop.
        #
        # Returns the path of the destination file.

        precondition_abspath(abspath_u)
        replacement_path_u = abspath_u + u".tmp"  # FIXME more unique
        backup_path_u = abspath_u + u".backup"
        if now is None:
            now = time.time()

        # ensure parent directory exists
        head, tail = os.path.split(abspath_u)
        mode = 0777 # XXX
        fileutil.make_dirs(head, mode)

        fileutil.write(replacement_path_u, file_contents)
        os.utime(replacement_path_u, (now, now - self.FUDGE_SECONDS))
        if is_conflict:
            print "0x00 ------------ <><> is conflict; calling _rename_conflicted_file... %r %r" % (abspath_u, replacement_path_u)
            return self._rename_conflicted_file(abspath_u, replacement_path_u)
        else:
            try:
                fileutil.replace_file(abspath_u, replacement_path_u, backup_path_u)
                return abspath_u
            except fileutil.ConflictError:
                return self._rename_conflicted_file(abspath_u, replacement_path_u)
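
    # Illustration of the replacement semantics above (paths hypothetical):
    # downloading new contents for u".../foo" first writes u".../foo.tmp"; a
    # clean replace moves any existing u".../foo" to u".../foo.backup" and
    # renames u".../foo.tmp" into place; if the replace raises ConflictError,
    # the new contents are renamed to u".../foo.conflict" instead and the
    # local u".../foo" is left untouched.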

    def _rename_conflicted_file(self, abspath_u, replacement_path_u):
        self._log("_rename_conflicted_file(%r, %r)" % (abspath_u, replacement_path_u))

        conflict_path_u = self._get_conflicted_filename(abspath_u)
        print "XXX rename %r %r" % (replacement_path_u, conflict_path_u)
        if os.path.isfile(replacement_path_u):
            print "%r exists" % (replacement_path_u,)
        if os.path.isfile(conflict_path_u):
            print "%r exists" % (conflict_path_u,)

        fileutil.rename_no_overwrite(replacement_path_u, conflict_path_u)
        return conflict_path_u

    def _rename_deleted_file(self, abspath_u):
        self._log('renaming deleted file to backup: %s' % (abspath_u,))
        try:
            fileutil.rename_no_overwrite(abspath_u, abspath_u + u'.backup')
        except IOError:
            # XXX is this the correct error?
            self._log("Already gone: '%s'" % (abspath_u,))
        return abspath_u


class Downloader(QueueMixin, WriteFileMixin):
    REMOTE_SCAN_INTERVAL = 3  # facilitates tests
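    # When its queue drains, the downloader does not stop: _when_queue_is_empty
    # (below) schedules another _scan_remote_collective after REMOTE_SCAN_INTERVAL
    # seconds, so the collective is polled continuously while the service runs.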

    def __init__(self, client, local_path_u, db, collective_dirnode, upload_readonly_dircap, clock):
        QueueMixin.__init__(self, client, local_path_u, db, 'downloader', clock)

        if not IDirectoryNode.providedBy(collective_dirnode):
            raise AssertionError("The URI in '%s' does not refer to a directory."
                                 % os.path.join('private', 'collective_dircap'))
        if collective_dirnode.is_unknown() or not collective_dirnode.is_readonly():
            raise AssertionError("The URI in '%s' is not a readonly cap to a directory."
                                 % os.path.join('private', 'collective_dircap'))

        self._collective_dirnode = collective_dirnode
        self._upload_readonly_dircap = upload_readonly_dircap

        self._turn_delay = self.REMOTE_SCAN_INTERVAL
        self._download_scan_batch = {} # path -> [(filenode, metadata)]

    def start_scanning(self):
        self._log("start_scanning")
        files = self._db.get_all_relpaths()
        self._log("all files %s" % files)

        d = self._scan_remote_collective()
        d.addBoth(self._logcb, "after _scan_remote_collective 0")
        self._turn_deque()
        return d

    def stop(self):
        self._stopped = True
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def _should_download(self, relpath_u, remote_version):
        """
        _should_download returns a bool indicating whether or not a remote object should be downloaded.
        We check the remote metadata version against our magic-folder db version number;
        latest version wins.
        """
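        # For example: local version 2 vs. remote version 3 => download;
        # local 3 vs. remote 3 => skip; no local db entry (v is None) => download.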
        self._log("_should_download(%r, %r)" % (relpath_u, remote_version))
        if magicpath.should_ignore_file(relpath_u):
            self._log("nope")
            return False
        self._log("yep")
        v = self._db.get_local_file_version(relpath_u)
        self._log("v = %r" % (v,))
        return (v is None or v < remote_version)

    def _get_local_latest(self, relpath_u):
        """
        _get_local_latest takes a unicode relative path, checks whether the
        corresponding local file exists, and returns None if it does not;
        otherwise it returns the version number recorded for that file in
        our magic-folder db.
        """
        if not self._get_filepath(relpath_u).exists():
            return None
        return self._db.get_local_file_version(relpath_u)

    def _get_collective_latest_file(self, filename):
        """
        _get_collective_latest_file takes a file path pointing to a file managed by
        magic-folder and returns a deferred that fires with a two-tuple containing
        the file node and metadata for the latest version of that file located in
        the magic-folder collective directory.
        """
        collective_dirmap_d = self._collective_dirnode.list()
        def scan_collective(result):
            list_of_deferreds = []
            for dir_name in result.keys():
                # XXX make sure it's a directory
                d = defer.succeed(None)
                d.addCallback(lambda x, dir_name=dir_name: result[dir_name][0].get_child_and_metadata(filename))
                list_of_deferreds.append(d)
            deferList = defer.DeferredList(list_of_deferreds, consumeErrors=True)
            return deferList
        collective_dirmap_d.addCallback(scan_collective)
        def highest_version(deferredList):
            max_version = 0
            metadata = None
            node = None
            for success, result in deferredList:
                if success:
                    if result[1]['version'] > max_version:
                        node, metadata = result
                        max_version = result[1]['version']
            return node, metadata
        collective_dirmap_d.addCallback(highest_version)
        return collective_dirmap_d

    def _append_to_batch(self, name, file_node, metadata):
        if name in self._download_scan_batch:
            self._download_scan_batch[name] += [(file_node, metadata)]
        else:
            self._download_scan_batch[name] = [(file_node, metadata)]

    def _scan_remote(self, nickname, dirnode):
        self._log("_scan_remote nickname %r" % (nickname,))
        d = dirnode.list()
        def scan_listing(listing_map):
            for encoded_relpath_u in listing_map.keys():
                relpath_u = magicpath.magic2path(encoded_relpath_u)
                self._log("found %r" % (relpath_u,))

                file_node, metadata = listing_map[encoded_relpath_u]
                local_version = self._get_local_latest(relpath_u)
                remote_version = metadata.get('version', None)
                self._log("%r has local version %r, remote version %r" % (relpath_u, local_version, remote_version))
                if local_version is None or remote_version is None or local_version < remote_version:
                    self._log("%r added to download queue" % (relpath_u,))
                    self._append_to_batch(relpath_u, file_node, metadata)
        d.addCallback(scan_listing)
        d.addBoth(self._logcb, "end of _scan_remote")
        return d

    def _scan_remote_collective(self):
        self._log("_scan_remote_collective")
        self._download_scan_batch = {} # XXX

        d = self._collective_dirnode.list()
        def scan_collective(dirmap):
            d2 = defer.succeed(None)
            for dir_name in dirmap:
                (dirnode, metadata) = dirmap[dir_name]
                if dirnode.get_readonly_uri() != self._upload_readonly_dircap:
                    # bind dir_name and dirnode as default arguments: otherwise
                    # the lambda and _err would capture the loop variables late
                    # and every callback would see the last iteration's values.
                    d2.addCallback(lambda ign, dir_name=dir_name, dirnode=dirnode:
                                   self._scan_remote(dir_name, dirnode))
                    def _err(f, dir_name=dir_name):
                        self._log("failed to scan DMD for client %r: %s" % (dir_name, f))
                        # XXX what should we do to make this failure more visible to users?
                    d2.addErrback(_err)
            return d2
        d.addCallback(scan_collective)
        d.addCallback(self._filter_scan_batch)
        d.addCallback(self._add_batch_to_download_queue)
        return d

    def _add_batch_to_download_queue(self, result):
        self._log("result = %r" % (result,))
        self._log("deque = %r" % (self._deque,))
        self._deque.extend(result)
        self._log("deque after = %r" % (self._deque,))
        self._count('objects_queued', len(result))
        self._log("pending = %r" % (self._pending,))
        self._pending.update(map(lambda x: x[0], result))
        self._log("pending after = %r" % (self._pending,))

    def _filter_scan_batch(self, result):
        self._log("_filter_scan_batch")
        extension = [] # consider whether this should be a dict
        for relpath_u in self._download_scan_batch.keys():
            if relpath_u in self._pending:
                continue
            file_node, metadata = max(self._download_scan_batch[relpath_u], key=lambda x: x[1]['version'])
            if self._should_download(relpath_u, metadata['version']):
                extension += [(relpath_u, file_node, metadata)]
            else:
                self._log("Excluding %r" % (relpath_u,))
                self._count('objects_excluded')
                self._call_hook(None, 'processed')
        return extension

    def _when_queue_is_empty(self):
        d = task.deferLater(self._clock, self._turn_delay, self._scan_remote_collective)
        d.addBoth(self._logcb, "after _scan_remote_collective 1")
        d.addCallback(lambda ign: self._turn_deque())
        return d

    def _process(self, item, now=None):
        self._log("_process(%r)" % (item,))
        if now is None:
            now = time.time()
        (relpath_u, file_node, metadata) = item
        fp = self._get_filepath(relpath_u)
        abspath_u = unicode_from_filepath(fp)
        conflict_path_u = self._get_conflicted_filename(abspath_u)
        d = defer.succeed(None)

        def do_update_db(written_abspath_u):
            filecap = file_node.get_uri()
            last_uploaded_uri = metadata.get('last_uploaded_uri', None)
            last_downloaded_uri = filecap
            last_downloaded_timestamp = now
            written_pathinfo = get_pathinfo(written_abspath_u)

            if not written_pathinfo.exists and not metadata.get('deleted', False):
                raise Exception("downloaded object %s disappeared" % quote_local_unicode_path(written_abspath_u))

            self._db.did_upload_version(relpath_u, metadata['version'], last_uploaded_uri,
                                        last_downloaded_uri, last_downloaded_timestamp, written_pathinfo)
            self._count('objects_downloaded')
        def failed(f):
            self._log("download failed: %s" % (str(f),))
            self._count('objects_failed')
            return f

        if os.path.isfile(conflict_path_u):
            def fail(res):
                raise ConflictError("download failed: already conflicted: %r" % (relpath_u,))
            d.addCallback(fail)
        else:
            is_conflict = False
            if self._db.check_file_db_exists(relpath_u):
                dmd_last_downloaded_uri = metadata.get('last_downloaded_uri', None)
                local_last_downloaded_uri = self._db.get_last_downloaded_uri(relpath_u)
                print "metadata %r" % (metadata,)
                print "<<<<--- if %r != %r" % (dmd_last_downloaded_uri, local_last_downloaded_uri)
                if dmd_last_downloaded_uri is not None and local_last_downloaded_uri is not None:
                    if dmd_last_downloaded_uri != local_last_downloaded_uri:
                        is_conflict = True
                        self._count('objects_conflicted')

                #dmd_last_uploaded_uri = metadata.get('last_uploaded_uri', None)
                #local_last_uploaded_uri = ...

            if relpath_u.endswith(u"/"):
                if metadata.get('deleted', False):
                    self._log("rmdir(%r) ignored" % (abspath_u,))
                else:
                    self._log("mkdir(%r)" % (abspath_u,))
                    d.addCallback(lambda ign: fileutil.make_dirs(abspath_u))
                    d.addCallback(lambda ign: abspath_u)
            else:
                if metadata.get('deleted', False):
                    d.addCallback(lambda ign: self._rename_deleted_file(abspath_u))
                else:
                    d.addCallback(lambda ign: file_node.download_best_version())
                    d.addCallback(lambda contents: self._write_downloaded_file(abspath_u, contents,
                                                                               is_conflict=is_conflict))

        d.addCallbacks(do_update_db, failed)

        def remove_from_pending(res):
            self._pending.remove(relpath_u)
            return res
        d.addBoth(remove_from_pending)
        def trap_conflicts(f):
            f.trap(ConflictError)
            return None
        d.addErrback(trap_conflicts)
        return d