# Source code for afew.Database

# SPDX-License-Identifier: ISC
# Copyright (c) Justus Winter <4winter@informatik.uni-hamburg.de>

import os
import time
import logging

import notmuch

from afew.NotmuchSettings import notmuch_settings, get_notmuch_new_tags


class Database:
    """
    Convenience wrapper around `notmuch`.

    The underlying notmuch database is opened lazily by :meth:`open` and
    cached in :attr:`handle`. Instances can be used as context managers;
    leaving the ``with`` block closes the database.
    """

    def __init__(self):
        # Path of the notmuch database, resolved once at construction time.
        self.db_path = self._calculate_db_path()
        # Cached `notmuch.Database` handle; `None` while the database is closed.
        self.handle = None

    def _calculate_db_path(self):
        """
        Calculates the path to use for the database. Supports notmuch's
        methodology including falling back to $MAILDIR or $HOME/mail if a
        path is not specified and using $HOME/<path> if path is relative.

        :returns: the path of the notmuch database
        :rtype: str
        """
        # Fall back to the platform's notion of the home directory so that an
        # unset $HOME does not produce a literal "None/..." path.
        home = os.environ.get('HOME') or os.path.expanduser('~')

        default_path = os.environ.get('MAILDIR', os.path.join(home, 'mail'))
        db_path = notmuch_settings.get('database', 'path', fallback=default_path)

        # If path is relative, notmuch prepends $HOME in front
        if not os.path.isabs(db_path):
            db_path = os.path.join(home, db_path)

        return db_path

    def __enter__(self):
        """
        Implements the context manager protocol.

        :returns: this :class:`Database` instance
        """
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """
        Implements the context manager protocol.

        Closes the database; exceptions raised inside the ``with`` block
        are not suppressed.
        """
        self.close()

    def open(self, rw=False, retry_for=180, retry_delay=1, create=False):
        """
        Opens the notmuch database, reusing the cached handle if possible.

        In read-only mode any already open handle is returned as is. In
        read-write mode the cached handle is only reused if it was opened
        read-write; otherwise a new read-write handle is opened, retrying
        on :class:`notmuch.NotmuchError` until `retry_for` seconds elapsed.

        :param rw: open the database in read-write mode
        :type rw: bool
        :param retry_for: how many seconds to keep retrying to open the
                          database in read-write mode
        :type retry_for: int or float
        :param retry_delay: seconds to sleep between two attempts
        :type retry_delay: int or float
        :param create: passed on to :class:`notmuch.Database` as `create`
        :type create: bool
        :raises: :class:`notmuch.NotmuchError` if the database still cannot
                 be opened read-write once the retry window is exhausted
        :returns: the database handle
        :rtype: :class:`notmuch.Database`
        """
        if not rw:
            if not self.handle:
                self.handle = notmuch.Database(self.db_path, create=create)
            return self.handle

        read_write = notmuch.Database.MODE.READ_WRITE
        if self.handle and self.handle.mode == read_write:
            return self.handle

        # NOTE(review): a cached read-only handle is replaced below without
        # being closed; objects obtained from it may still be in use, so it
        # is deliberately left alone here -- confirm this is intended.

        # A monotonic clock keeps the retry window immune to wall-clock jumps.
        start_time = time.monotonic()
        while True:
            try:
                self.handle = notmuch.Database(self.db_path,
                                               mode=read_write,
                                               create=create)
                break
            except notmuch.NotmuchError:
                time_left = int(retry_for - (time.monotonic() - start_time))

                if time_left <= 0:
                    raise

                # Only report when the remaining whole seconds are a multiple
                # of 15, presumably to keep the debug log readable.
                if time_left % 15 == 0:
                    logging.debug(
                        'Opening the database failed. '
                        'Will keep trying for another %s seconds',
                        time_left)

                time.sleep(retry_delay)

        return self.handle

    def close(self):
        """
        Closes the notmuch database if it has been opened.

        Calling this on an already closed database is a no-op.
        """
        if self.handle:
            self.handle.close()
            self.handle = None

    def do_query(self, query):
        """
        Executes a notmuch query.

        :param query: the query to execute
        :type  query: str
        :returns: the query result
        :rtype:   :class:`notmuch.Query`
        """
        # Lazy logging arguments: no eager %-formatting of `query`.
        logging.debug('Executing query %r', query)
        return notmuch.Query(self.open(), query)

    def get_messages(self, query, full_thread=False):
        """
        Get all messages matching the given query.

        :param query: the query to execute using :func:`Database.do_query`
        :type  query: str
        :param full_thread: return all messages from matching threads
        :type  full_thread: bool
        :returns: an iterator over :class:`notmuch.Message` objects
        """
        if not full_thread:
            yield from self.do_query(query).search_messages()
        else:
            for thread in self.do_query(query).search_threads():
                yield from self.walk_thread(thread)

    def walk_replies(self, message):
        """
        Returns the given message followed by all replies to it, depth first.

        :param message: the message to start from
        :type  message: :class:`notmuch.Message`
        :returns: an iterator over :class:`notmuch.Message` objects
        """
        yield message

        # The bindings return either an iterator *or* None here, so an
        # identity check against None is required before iterating.
        replies = message.get_replies()
        if replies is not None:
            for reply in replies:
                yield from self.walk_replies(reply)

    def walk_thread(self, thread):
        """
        Returns all messages in the given thread.

        :param thread: the thread you are interested in
        :type  thread: :class:`notmuch.Thread`
        :returns: an iterator over :class:`notmuch.Message` objects
        """
        for toplevel in thread.get_toplevel_messages():
            yield from self.walk_replies(toplevel)

    def add_message(self, path, sync_maildir_flags=False, new_mail_handler=None):
        """
        Adds the given message to the notmuch index.

        :param path: path to the message
        :type  path: str
        :param sync_maildir_flags: if `True` notmuch converts the
                                   standard maildir flags to tags
        :type  sync_maildir_flags: bool
        :param new_mail_handler: callback for new messages
        :type  new_mail_handler: a function that is called with a
                                 :class:`notmuch.Message` object as
                                 its only argument
        :raises: :class:`notmuch.NotmuchError` if adding the message fails
        :returns: a :class:`notmuch.Message` object
        """
        # TODO: it would be nice to update notmuchs directory index here
        handle = self.open(rw=True)

        # Use `index_file` when the bindings provide it, otherwise fall back
        # to `add_message`.
        if hasattr(notmuch.Database, 'index_file'):
            message, status = handle.index_file(path, sync_maildir_flags=sync_maildir_flags)
        else:
            message, status = handle.add_message(path, sync_maildir_flags=sync_maildir_flags)

        # Only messages that are not duplicates of an already indexed
        # message id get the "new" tags and are passed to the handler.
        if status != notmuch.STATUS.DUPLICATE_MESSAGE_ID:
            logging.info('Found new mail in %s', path)

            for tag in get_notmuch_new_tags():
                message.add_tag(tag)

            if new_mail_handler:
                new_mail_handler(message)

        return message

    def remove_message(self, path):
        """
        Remove the given message from the notmuch index.

        :param path: path to the message
        :type  path: str
        """
        self.open(rw=True).remove_message(path)