]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/frontends/magic_folder.py
Debugging WIP.
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / frontends / magic_folder.py
1
2 import sys, os
3 import os.path
4 from collections import deque
5 import time
6
7 from twisted.internet import defer, reactor, task
8 from twisted.python.failure import Failure
9 from twisted.python import runtime
10 from twisted.application import service
11
12 from allmydata.util import fileutil
13 from allmydata.interfaces import IDirectoryNode
14 from allmydata.util import log
15 from allmydata.util.fileutil import precondition_abspath, get_pathinfo, ConflictError
16 from allmydata.util.assertutil import precondition, _assert
17 from allmydata.util.deferredutil import HookMixin
18 from allmydata.util.encodingutil import listdir_filepath, to_filepath, \
19      extend_filepath, unicode_from_filepath, unicode_segments_from, \
20      quote_filepath, quote_local_unicode_path, quote_output, FilenameEncodingError
21 from allmydata.immutable.upload import FileName, Data
22 from allmydata import magicfolderdb, magicpath
23
24
# Linux inotify flag IN_EXCL_UNLINK (0x04000000): stop reporting events for
# children after they have been unlinked from the watched directory.  Defined
# as a literal here, presumably because the inotify module in use does not
# export it -- TODO confirm against twisted.internet.inotify.
IN_EXCL_UNLINK = 0x04000000L
26
def get_inotify_module():
    """Return the filesystem-notification module for this platform.

    On Windows this is allmydata.windows.inotify; on platforms where Twisted
    supports inotify it is twisted.internet.inotify.  Raises
    NotImplementedError (with a platform-specific message) when no
    implementation is available, after logging the underlying import error.
    """
    try:
        if sys.platform == "win32":
            from allmydata.windows import inotify
            return inotify
        if runtime.platform.supportsINotify():
            from twisted.internet import inotify
            return inotify
        # Raised from inside the try:, but not caught below -- the except
        # clause only traps ImportError/AttributeError.
        raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
                                  "This currently requires Linux or Windows.")
    except (ImportError, AttributeError) as e:
        log.msg(e)
        if sys.platform != "win32":
            raise
        raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
                                  "Windows support requires at least Vista, and has only been tested on Windows 7.")
43
44
45 class MagicFolder(service.MultiService):
46     name = 'magic-folder'
47
48     def __init__(self, client, upload_dircap, collective_dircap, local_path_u, dbfile,
49                  pending_delay=1.0, clock=reactor):
50         precondition_abspath(local_path_u)
51
52         service.MultiService.__init__(self)
53
54         db = magicfolderdb.get_magicfolderdb(dbfile, create_version=(magicfolderdb.SCHEMA_v1, 1))
55         if db is None:
56             return Failure(Exception('ERROR: Unable to load magic folder db.'))
57
58         # for tests
59         self._client = client
60         self._db = db
61
62         self.is_ready = False
63
64         upload_dirnode = self._client.create_node_from_uri(upload_dircap)
65         collective_dirnode = self._client.create_node_from_uri(collective_dircap)
66
67         self.uploader = Uploader(client, local_path_u, db, upload_dirnode, pending_delay, clock)
68         self.downloader = Downloader(client, local_path_u, db, collective_dirnode, upload_dirnode.get_readonly_uri(), clock)
69
70     def startService(self):
71         # TODO: why is this being called more than once?
72         if self.running:
73             return defer.succeed(None)
74         print "%r.startService" % (self,)
75         service.MultiService.startService(self)
76         return self.uploader.start_monitoring()
77
78     def ready(self):
79         """ready is used to signal us to start
80         processing the upload and download items...
81         """
82         self.is_ready = True
83         d = self.uploader.start_scanning()
84         d2 = self.downloader.start_scanning()
85         d.addCallback(lambda ign: d2)
86         return d
87
88     def finish(self):
89         print "finish"
90         d = self.uploader.stop()
91         d2 = self.downloader.stop()
92         d.addCallback(lambda ign: d2)
93         return d
94
95     def remove_service(self):
96         return service.MultiService.disownServiceParent(self)
97
98
99 class QueueMixin(HookMixin):
100     def __init__(self, client, local_path_u, db, name, clock):
101         self._client = client
102         self._local_path_u = local_path_u
103         self._local_filepath = to_filepath(local_path_u)
104         self._db = db
105         self._name = name
106         self._clock = clock
107         self._hooks = {'processed': None, 'started': None}
108         self.started_d = self.set_hook('started')
109
110         if not self._local_filepath.exists():
111             raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
112                                  "but there is no directory at that location."
113                                  % quote_local_unicode_path(self._local_path_u))
114         if not self._local_filepath.isdir():
115             raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
116                                  "but the thing at that location is not a directory."
117                                  % quote_local_unicode_path(self._local_path_u))
118
119         self._deque = deque()
120         self._lazy_tail = defer.succeed(None)
121         self._pending = set()
122         self._stopped = False
123         self._turn_delay = 0
124
125     def _get_filepath(self, relpath_u):
126         self._log("_get_filepath(%r)" % (relpath_u,))
127         return extend_filepath(self._local_filepath, relpath_u.split(u"/"))
128
129     def _get_relpath(self, filepath):
130         self._log("_get_relpath(%r)" % (filepath,))
131         segments = unicode_segments_from(filepath, self._local_filepath)
132         self._log("segments = %r" % (segments,))
133         return u"/".join(segments)
134
135     def _count(self, counter_name, delta=1):
136         ctr = 'magic_folder.%s.%s' % (self._name, counter_name)
137         self._log("%s += %r" % (counter_name, delta))
138         self._client.stats_provider.count(ctr, delta)
139
140     def _logcb(self, res, msg):
141         self._log("%s: %r" % (msg, res))
142         return res
143
144     def _log(self, msg):
145         s = "Magic Folder %s %s: %s" % (quote_output(self._client.nickname), self._name, msg)
146         self._client.log(s)
147         print s
148         #open("events", "ab+").write(msg)
149
150     def _append_to_deque(self, relpath_u):
151         self._log("_append_to_deque(%r)" % (relpath_u,))
152         if relpath_u in self._pending or magicpath.should_ignore_file(relpath_u):
153             return
154         self._deque.append(relpath_u)
155         self._pending.add(relpath_u)
156         self._count('objects_queued')
157         if self.is_ready:
158             self._clock.callLater(0, self._turn_deque)
159
160     def _turn_deque(self):
161         self._log("_turn_deque")
162         if self._stopped:
163             self._log("stopped")
164             return
165         try:
166             item = self._deque.pop()
167             self._log("popped %r" % (item,))
168             self._count('objects_queued', -1)
169         except IndexError:
170             self._log("deque is now empty")
171             self._lazy_tail.addCallback(lambda ign: self._when_queue_is_empty())
172         else:
173             self._lazy_tail.addCallback(lambda ign: self._process(item))
174             self._lazy_tail.addBoth(self._call_hook, 'processed')
175             self._lazy_tail.addErrback(log.err)
176             self._lazy_tail.addCallback(lambda ign: task.deferLater(self._clock, self._turn_delay, self._turn_deque))
177
178
class Uploader(QueueMixin):
    """
    Watches the local magic folder with inotify and mirrors file creations,
    modifications and deletions into the upload DMD (a writeable Tahoe
    directory node), recording each uploaded version in the magic-folder db.
    """

    def __init__(self, client, local_path_u, db, upload_dirnode, pending_delay, clock):
        QueueMixin.__init__(self, client, local_path_u, db, 'uploader', clock)

        # queued items are not processed until start_scanning() is called
        self.is_ready = False

        if not IDirectoryNode.providedBy(upload_dirnode):
            raise AssertionError("The URI in '%s' does not refer to a directory."
                                 % os.path.join('private', 'magic_folder_dircap'))
        if upload_dirnode.is_unknown() or upload_dirnode.is_readonly():
            raise AssertionError("The URI in '%s' is not a writecap to a directory."
                                 % os.path.join('private', 'magic_folder_dircap'))

        self._upload_dirnode = upload_dirnode
        self._inotify = get_inotify_module()
        self._notifier = self._inotify.INotify()

        # not every inotify implementation exposes this knob
        if hasattr(self._notifier, 'set_pending_delay'):
            self._notifier.set_pending_delay(pending_delay)

        # TODO: what about IN_MOVE_SELF, IN_MOVED_FROM, or IN_UNMOUNT?
        #
        self.mask = ( self._inotify.IN_CREATE
                    | self._inotify.IN_CLOSE_WRITE
                    | self._inotify.IN_MOVED_TO
                    | self._inotify.IN_MOVED_FROM
                    | self._inotify.IN_DELETE
                    | self._inotify.IN_ONLYDIR
                    | IN_EXCL_UNLINK
                    )
        self._notifier.watch(self._local_filepath, mask=self.mask, callbacks=[self._notify],
                             recursive=True)

    def start_monitoring(self):
        """Start the inotify watcher; fires the 'started' hook when done."""
        self._log("start_monitoring")
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._notifier.startReading())
        d.addCallback(lambda ign: self._count('dirs_monitored'))
        d.addBoth(self._call_hook, 'started')
        return d

    def stop(self):
        """Stop watching; the returned Deferred fires after in-flight
        processing chained on _lazy_tail has completed."""
        self._log("stop")
        self._notifier.stopReading()
        self._count('dirs_monitored', -1)
        if hasattr(self._notifier, 'wait_until_stopped'):
            d = self._notifier.wait_until_stopped()
        else:
            d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def start_scanning(self):
        """Scan the whole local directory, then queue any db entries the scan
        did not encounter (normally files deleted while we were not running)."""
        self._log("start_scanning")
        self.is_ready = True
        self._pending = self._db.get_all_relpaths()
        self._log("all_files %r" % (self._pending))
        d = self._scan(u"")
        def _add_pending(ign):
            # This adds all of the files that were in the db but not already processed
            # (normally because they have been deleted on disk).
            self._log("adding %r" % (self._pending))
            self._deque.extend(self._pending)
        d.addCallback(_add_pending)
        d.addCallback(lambda ign: self._turn_deque())
        return d

    def _scan(self, reldir_u):
        """Process every child of the given relative directory (recursing into
        subdirectories via _process's isdir branch)."""
        self._log("scan %r" % (reldir_u,))
        fp = self._get_filepath(reldir_u)
        try:
            children = listdir_filepath(fp)
        except EnvironmentError:
            raise Exception("WARNING: magic folder: permission denied on directory %s"
                            % quote_filepath(fp))
        except FilenameEncodingError:
            raise Exception("WARNING: magic folder: could not list directory %s due to a filename encoding error"
                            % quote_filepath(fp))

        d = defer.succeed(None)
        for child in children:
            _assert(isinstance(child, unicode), child=child)
            # 'child' is bound as a default argument because the callback
            # runs after the loop has moved on
            d.addCallback(lambda ign, child=child:
                          ("%s/%s" % (reldir_u, child) if reldir_u else child))
            def _add_pending(relpath_u):
                if magicpath.should_ignore_file(relpath_u):
                    return None

                self._pending.add(relpath_u)
                return relpath_u
            d.addCallback(_add_pending)
            # This call to _process doesn't go through the deque, and probably should.
            d.addCallback(self._process)
            d.addBoth(self._call_hook, 'processed')
            d.addErrback(log.err)

        return d

    def _notify(self, opaque, path, events_mask):
        """inotify callback: translate a filesystem event into a queued relpath."""
        self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask))))

        # We filter out IN_CREATE events not associated with a directory.
        # Acting on IN_CREATE for files could cause us to read and upload
        # a possibly-incomplete file before the application has closed it.
        # There should always be an IN_CLOSE_WRITE after an IN_CREATE, I think.
        # It isn't possible to avoid watching for IN_CREATE at all, because
        # it is the only event notified for a directory creation.

        if ((events_mask & self._inotify.IN_CREATE) != 0 and
            (events_mask & self._inotify.IN_ISDIR) == 0):
            self._log("ignoring inotify event for creation of file %r\n" % (path,))
            return

        relpath_u = self._get_relpath(path)
        self._append_to_deque(relpath_u)

    def _when_queue_is_empty(self):
        # the uploader is purely event-driven; nothing to do while idle
        return defer.succeed(None)

    def _process(self, relpath_u):
        """Upload one queued relative path.

        Depending on the current state of the local filesystem this records a
        deletion marker, creates a subdirectory entry (then scans it), or
        uploads the file contents, bumping the db version number.
        """
        self._log("_process(%r)" % (relpath_u,))
        if relpath_u is None:
            return
        precondition(isinstance(relpath_u, unicode), relpath_u)

        d = defer.succeed(None)

        def _maybe_upload(val, now=None):
            if now is None:
                now = time.time()
            fp = self._get_filepath(relpath_u)
            pathinfo = get_pathinfo(unicode_from_filepath(fp))

            self._log("pending = %r, about to remove %r" % (self._pending, relpath_u))
            self._pending.remove(relpath_u)
            encoded_path_u = magicpath.path2magic(relpath_u)

            if not pathinfo.exists:
                # FIXME merge this with the 'isfile' case.
                self._log("notified object %s disappeared (this is normal)" % quote_filepath(fp))
                self._count('objects_disappeared')
                if not self._db.check_file_db_exists(relpath_u):
                    return None

                last_downloaded_timestamp = now
                last_downloaded_uri = self._db.get_last_downloaded_uri(relpath_u)

                current_version = self._db.get_local_file_version(relpath_u)
                if current_version is None:
                    new_version = 0
                elif self._db.is_new_file(pathinfo, relpath_u):
                    new_version = current_version + 1
                else:
                    self._log("Not uploading %r" % (relpath_u,))
                    self._count('objects_not_uploaded')
                    return

                # upload an empty file marked 'deleted' so other clients
                # learn about the deletion
                metadata = { 'version': new_version,
                             'deleted': True,
                             'last_downloaded_timestamp': last_downloaded_timestamp }
                if last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = last_downloaded_uri

                empty_uploadable = Data("", self._client.convergence)
                d2 = self._upload_dirnode.add_file(encoded_path_u, empty_uploadable,
                                                   metadata=metadata, overwrite=True)

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    self._db.did_upload_version(relpath_u, new_version, filecap,
                                                last_downloaded_uri, last_downloaded_timestamp, pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)
                return d2
            elif pathinfo.islink:
                # FIX: was self.warn(), which is not defined on this class (or
                # the visible QueueMixin) and raised AttributeError; log instead.
                self._log("WARNING: cannot upload symlink %s" % quote_filepath(fp))
                return None
            elif pathinfo.isdir:
                if not getattr(self._notifier, 'recursive_includes_new_subdirectories', False):
                    self._notifier.watch(fp, mask=self.mask, callbacks=[self._notify], recursive=True)

                uploadable = Data("", self._client.convergence)
                encoded_path_u += magicpath.path2magic(u"/")
                self._log("encoded_path_u =  %r" % (encoded_path_u,))
                upload_d = self._upload_dirnode.add_file(encoded_path_u, uploadable, metadata={"version":0}, overwrite=True)
                def _succeeded(ign):
                    self._log("created subdirectory %r" % (relpath_u,))
                    self._count('directories_created')
                def _failed(f):
                    self._log("failed to create subdirectory %r" % (relpath_u,))
                    return f
                upload_d.addCallbacks(_succeeded, _failed)
                upload_d.addCallback(lambda ign: self._scan(relpath_u))
                return upload_d
            elif pathinfo.isfile:
                last_downloaded_uri = self._db.get_last_downloaded_uri(relpath_u)
                last_downloaded_timestamp = now

                current_version = self._db.get_local_file_version(relpath_u)
                if current_version is None:
                    new_version = 0
                elif self._db.is_new_file(pathinfo, relpath_u):
                    new_version = current_version + 1
                else:
                    self._log("Not uploading %r" % (relpath_u,))
                    self._count('objects_not_uploaded')
                    return None

                metadata = { 'version': new_version,
                             'last_downloaded_timestamp': last_downloaded_timestamp }
                if last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = last_downloaded_uri

                uploadable = FileName(unicode_from_filepath(fp), self._client.convergence)
                d2 = self._upload_dirnode.add_file(encoded_path_u, uploadable,
                                                   metadata=metadata, overwrite=True)

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    last_downloaded_uri = metadata.get('last_downloaded_uri', None)
                    self._db.did_upload_version(relpath_u, new_version, filecap,
                                                last_downloaded_uri, last_downloaded_timestamp, pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)
                return d2
            else:
                # FIX: was self.warn() (undefined, AttributeError); log instead.
                self._log("WARNING: cannot process special file %s" % quote_filepath(fp))
                return None

        d.addCallback(_maybe_upload)

        def _succeeded(res):
            self._count('objects_succeeded')
            return res
        def _failed(f):
            self._count('objects_failed')
            self._log("%s while processing %r" % (f, relpath_u))
            return f
        d.addCallbacks(_succeeded, _failed)
        return d

    def _get_metadata(self, encoded_path_u):
        """Return the entry's metadata Deferred, converting a synchronous
        KeyError into a returned Failure."""
        try:
            d = self._upload_dirnode.get_metadata_for(encoded_path_u)
        except KeyError:
            return Failure()
        return d

    def _get_filenode(self, encoded_path_u):
        """Return the entry's filenode Deferred, converting a synchronous
        KeyError into a returned Failure."""
        try:
            d = self._upload_dirnode.get(encoded_path_u)
        except KeyError:
            return Failure()
        return d
433
434
435 class WriteFileMixin(object):
436     FUDGE_SECONDS = 10.0
437
438     def _get_conflicted_filename(self, abspath_u):
439         return abspath_u + u".conflict"
440
441     def _write_downloaded_file(self, abspath_u, file_contents, is_conflict=False, now=None):
442         self._log("_write_downloaded_file(%r, <%d bytes>, is_conflict=%r, now=%r)"
443                   % (abspath_u, len(file_contents), is_conflict, now))
444
445         # 1. Write a temporary file, say .foo.tmp.
446         # 2. is_conflict determines whether this is an overwrite or a conflict.
447         # 3. Set the mtime of the replacement file to be T seconds before the
448         #    current local time.
449         # 4. Perform a file replacement with backup filename foo.backup,
450         #    replaced file foo, and replacement file .foo.tmp. If any step of
451         #    this operation fails, reclassify as a conflict and stop.
452         #
453         # Returns the path of the destination file.
454
455         precondition_abspath(abspath_u)
456         replacement_path_u = abspath_u + u".tmp"  # FIXME more unique
457         backup_path_u = abspath_u + u".backup"
458         if now is None:
459             now = time.time()
460
461         # ensure parent directory exists
462         head, tail = os.path.split(abspath_u)
463         mode = 0777 # XXX
464         fileutil.make_dirs(head, mode)
465
466         fileutil.write(replacement_path_u, file_contents)
467         os.utime(replacement_path_u, (now, now - self.FUDGE_SECONDS))
468         if is_conflict:
469             print "0x00 ------------ <><> is conflict; calling _rename_conflicted_file... %r %r" % (abspath_u, replacement_path_u)
470             return self._rename_conflicted_file(abspath_u, replacement_path_u)
471         else:
472             try:
473                 fileutil.replace_file(abspath_u, replacement_path_u, backup_path_u)
474                 return abspath_u
475             except fileutil.ConflictError:
476                 return self._rename_conflicted_file(abspath_u, replacement_path_u)
477
478     def _rename_conflicted_file(self, abspath_u, replacement_path_u):
479         self._log("_rename_conflicted_file(%r, %r)" % (abspath_u, replacement_path_u))
480
481         conflict_path_u = self._get_conflicted_filename(abspath_u)
482         print "XXX rename %r %r" % (replacement_path_u, conflict_path_u)
483         if os.path.isfile(replacement_path_u):
484             print "%r exists" % (replacement_path_u,)
485         if os.path.isfile(conflict_path_u):
486             print "%r exists" % (conflict_path_u,)
487
488         fileutil.rename_no_overwrite(replacement_path_u, conflict_path_u)
489         return conflict_path_u
490
491     def _rename_deleted_file(self, abspath_u):
492         self._log('renaming deleted file to backup: %s' % (abspath_u,))
493         try:
494             fileutil.rename_no_overwrite(abspath_u, abspath_u + u'.backup')
495         except IOError:
496             # XXX is this the correct error?
497             self._log("Already gone: '%s'" % (abspath_u,))
498         return abspath_u
499
500
501 class Downloader(QueueMixin, WriteFileMixin):
502     REMOTE_SCAN_INTERVAL = 3  # facilitates tests
503
504     def __init__(self, client, local_path_u, db, collective_dirnode, upload_readonly_dircap, clock):
505         QueueMixin.__init__(self, client, local_path_u, db, 'downloader', clock)
506
507         if not IDirectoryNode.providedBy(collective_dirnode):
508             raise AssertionError("The URI in '%s' does not refer to a directory."
509                                  % os.path.join('private', 'collective_dircap'))
510         if collective_dirnode.is_unknown() or not collective_dirnode.is_readonly():
511             raise AssertionError("The URI in '%s' is not a readonly cap to a directory."
512                                  % os.path.join('private', 'collective_dircap'))
513
514         self._collective_dirnode = collective_dirnode
515         self._upload_readonly_dircap = upload_readonly_dircap
516
517         self._turn_delay = self.REMOTE_SCAN_INTERVAL
518         self._download_scan_batch = {} # path -> [(filenode, metadata)]
519
520     def start_scanning(self):
521         self._log("start_scanning")
522         files = self._db.get_all_relpaths()
523         self._log("all files %s" % files)
524
525         d = self._scan_remote_collective()
526         d.addBoth(self._logcb, "after _scan_remote_collective 0")
527         self._turn_deque()
528         return d
529
530     def stop(self):
531         self._stopped = True
532         d = defer.succeed(None)
533         d.addCallback(lambda ign: self._lazy_tail)
534         return d
535
536     def _should_download(self, relpath_u, remote_version):
537         """
538         _should_download returns a bool indicating whether or not a remote object should be downloaded.
539         We check the remote metadata version against our magic-folder db version number;
540         latest version wins.
541         """
542         self._log("_should_download(%r, %r)" % (relpath_u, remote_version))
543         if magicpath.should_ignore_file(relpath_u):
544             self._log("nope")
545             return False
546         self._log("yep")
547         v = self._db.get_local_file_version(relpath_u)
548         self._log("v = %r" % (v,))
549         return (v is None or v < remote_version)
550
551     def _get_local_latest(self, relpath_u):
552         """
553         _get_local_latest takes a unicode path string checks to see if this file object
554         exists in our magic-folder db; if not then return None
555         else check for an entry in our magic-folder db and return the version number.
556         """
557         if not self._get_filepath(relpath_u).exists():
558             return None
559         return self._db.get_local_file_version(relpath_u)
560
561     def _get_collective_latest_file(self, filename):
562         """
563         _get_collective_latest_file takes a file path pointing to a file managed by
564         magic-folder and returns a deferred that fires with the two tuple containing a
565         file node and metadata for the latest version of the file located in the
566         magic-folder collective directory.
567         """
568         collective_dirmap_d = self._collective_dirnode.list()
569         def scan_collective(result):
570             list_of_deferreds = []
571             for dir_name in result.keys():
572                 # XXX make sure it's a directory
573                 d = defer.succeed(None)
574                 d.addCallback(lambda x, dir_name=dir_name: result[dir_name][0].get_child_and_metadata(filename))
575                 list_of_deferreds.append(d)
576             deferList = defer.DeferredList(list_of_deferreds, consumeErrors=True)
577             return deferList
578         collective_dirmap_d.addCallback(scan_collective)
579         def highest_version(deferredList):
580             max_version = 0
581             metadata = None
582             node = None
583             for success, result in deferredList:
584                 if success:
585                     if result[1]['version'] > max_version:
586                         node, metadata = result
587                         max_version = result[1]['version']
588             return node, metadata
589         collective_dirmap_d.addCallback(highest_version)
590         return collective_dirmap_d
591
592     def _append_to_batch(self, name, file_node, metadata):
593         if self._download_scan_batch.has_key(name):
594             self._download_scan_batch[name] += [(file_node, metadata)]
595         else:
596             self._download_scan_batch[name] = [(file_node, metadata)]
597
598     def _scan_remote(self, nickname, dirnode):
599         self._log("_scan_remote nickname %r" % (nickname,))
600         d = dirnode.list()
601         def scan_listing(listing_map):
602             for encoded_relpath_u in listing_map.keys():
603                 relpath_u = magicpath.magic2path(encoded_relpath_u)
604                 self._log("found %r" % (relpath_u,))
605
606                 file_node, metadata = listing_map[encoded_relpath_u]
607                 local_version = self._get_local_latest(relpath_u)
608                 remote_version = metadata.get('version', None)
609                 self._log("%r has local version %r, remote version %r" % (relpath_u, local_version, remote_version))
610                 if local_version is None or remote_version is None or local_version < remote_version:
611                     self._log("%r added to download queue" % (relpath_u,))
612                     self._append_to_batch(relpath_u, file_node, metadata)
613         d.addCallback(scan_listing)
614         d.addBoth(self._logcb, "end of _scan_remote")
615         return d
616
617     def _scan_remote_collective(self):
618         self._log("_scan_remote_collective")
619         self._download_scan_batch = {} # XXX
620
621         d = self._collective_dirnode.list()
622         def scan_collective(dirmap):
623             d2 = defer.succeed(None)
624             for dir_name in dirmap:
625                 (dirnode, metadata) = dirmap[dir_name]
626                 if dirnode.get_readonly_uri() != self._upload_readonly_dircap:
627                     d2.addCallback(lambda ign, dir_name=dir_name: self._scan_remote(dir_name, dirnode))
628                     def _err(f):
629                         self._log("failed to scan DMD for client %r: %s" % (dir_name, f))
630                         # XXX what should we do to make this failure more visible to users?
631                     d2.addErrback(_err)
632             return d2
633         d.addCallback(scan_collective)
634         d.addCallback(self._filter_scan_batch)
635         d.addCallback(self._add_batch_to_download_queue)
636         return d
637
638     def _add_batch_to_download_queue(self, result):
639         self._log("result = %r" % (result,))
640         self._log("deque = %r" % (self._deque,))
641         self._deque.extend(result)
642         self._log("deque after = %r" % (self._deque,))
643         self._count('objects_queued', len(result))
644         self._log("pending = %r" % (self._pending,))
645         self._pending.update(map(lambda x: x[0], result))
646         self._log("pending after = %r" % (self._pending,))
647
648     def _filter_scan_batch(self, result):
649         self._log("_filter_scan_batch")
650         extension = [] # consider whether this should be a dict
651         for relpath_u in self._download_scan_batch.keys():
652             if relpath_u in self._pending:
653                 continue
654             file_node, metadata = max(self._download_scan_batch[relpath_u], key=lambda x: x[1]['version'])
655             if self._should_download(relpath_u, metadata['version']):
656                 extension += [(relpath_u, file_node, metadata)]
657             else:
658                 self._log("Excluding %r" % (relpath_u,))
659                 self._count('objects_excluded')
660                 self._call_hook(None, 'processed')
661         return extension
662
663     def _when_queue_is_empty(self):
664         d = task.deferLater(self._clock, self._turn_delay, self._scan_remote_collective)
665         d.addBoth(self._logcb, "after _scan_remote_collective 1")
666         d.addCallback(lambda ign: self._turn_deque())
667         return d
668
669     def _process(self, item, now=None):
670         self._log("_process(%r)" % (item,))
671         if now is None:
672             now = time.time()
673         (relpath_u, file_node, metadata) = item
674         fp = self._get_filepath(relpath_u)
675         abspath_u = unicode_from_filepath(fp)
676         conflict_path_u = self._get_conflicted_filename(abspath_u)
677         d = defer.succeed(None)
678
679         def do_update_db(written_abspath_u):
680             filecap = file_node.get_uri()
681             last_uploaded_uri = metadata.get('last_uploaded_uri', None)
682             last_downloaded_uri = filecap
683             last_downloaded_timestamp = now
684             written_pathinfo = get_pathinfo(written_abspath_u)
685
686             if not written_pathinfo.exists and not metadata.get('deleted', False):
687                 raise Exception("downloaded object %s disappeared" % quote_local_unicode_path(written_abspath_u))
688
689             self._db.did_upload_version(relpath_u, metadata['version'], last_uploaded_uri,
690                                         last_downloaded_uri, last_downloaded_timestamp, written_pathinfo)
691             self._count('objects_downloaded')
692         def failed(f):
693             self._log("download failed: %s" % (str(f),))
694             self._count('objects_failed')
695             return f
696
697         if os.path.isfile(conflict_path_u):
698             def fail(res):
699                 raise ConflictError("download failed: already conflicted: %r" % (relpath_u,))
700             d.addCallback(fail)
701         else:
702             is_conflict = False
703             if self._db.check_file_db_exists(relpath_u):
704                 dmd_last_downloaded_uri = metadata.get('last_downloaded_uri', None)
705                 local_last_downloaded_uri = self._db.get_last_downloaded_uri(relpath_u)
706                 print "metadata %r" % (metadata,)
707                 print "<<<<--- if %r != %r" % (dmd_last_downloaded_uri, local_last_downloaded_uri)
708                 if dmd_last_downloaded_uri is not None and local_last_downloaded_uri is not None:
709                     if dmd_last_downloaded_uri != local_last_downloaded_uri:
710                         is_conflict = True
711                         self._count('objects_conflicted')
712
713                 #dmd_last_uploaded_uri = metadata.get('last_uploaded_uri', None)
714                 #local_last_uploaded_uri = ...
715
716             if relpath_u.endswith(u"/"):
717                 if metadata.get('deleted', False):
718                     self._log("rmdir(%r) ignored" % (abspath_u,))
719                 else:
720                     self._log("mkdir(%r)" % (abspath_u,))
721                     d.addCallback(lambda ign: fileutil.make_dirs(abspath_u))
722                     d.addCallback(lambda ign: abspath_u)
723             else:
724                 if metadata.get('deleted', False):
725                     d.addCallback(lambda ign: self._rename_deleted_file(abspath_u))
726                 else:
727                     d.addCallback(lambda ign: file_node.download_best_version())
728                     d.addCallback(lambda contents: self._write_downloaded_file(abspath_u, contents,
729                                                                                is_conflict=is_conflict))
730
731         d.addCallbacks(do_update_db, failed)
732
733         def remove_from_pending(res):
734             self._pending.remove(relpath_u)
735             return res
736         d.addBoth(remove_from_pending)
737         def trap_conflicts(f):
738             f.trap(ConflictError)
739             return None
740         d.addErrback(trap_conflicts)
741         return d