mirror of
https://github.com/pulb/mailnag.git
synced 2026-05-07 12:35:44 +02:00
Merge pull request #137 from tikank/maildir
Maildir backend implementation
This commit is contained in:
@@ -28,7 +28,7 @@ import re
|
||||
|
||||
from Mailnag.backends.imap import IMAPMailboxBackend
|
||||
from Mailnag.backends.pop3 import POP3MailboxBackend
|
||||
from Mailnag.backends.local import MBoxBackend
|
||||
from Mailnag.backends.local import MBoxBackend, MaildirBackend
|
||||
from Mailnag.common.utils import splitstr
|
||||
|
||||
|
||||
@@ -80,6 +80,10 @@ _backends = {
|
||||
'mbox' : Backend(MBoxBackend, [
|
||||
Param('path', 'path', str, str, ''),
|
||||
]),
|
||||
'maildir' : Backend(MaildirBackend, [
|
||||
Param('path', 'path', str, str, ''),
|
||||
Param('folders', 'folder', _str_to_folders, _folders_to_str, []),
|
||||
]),
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -33,7 +33,7 @@ class MBoxBackend(MailboxBackend):
|
||||
"""Implementation of mbox mail boxes."""
|
||||
|
||||
def __init__(self, name = '', path=None, **kw):
|
||||
"""Initialize mbox mailbox backens with a name and path."""
|
||||
"""Initialize mbox mailbox backend with a name and path."""
|
||||
self.name = name
|
||||
self.path = path
|
||||
self.opened = False
|
||||
@@ -82,3 +82,82 @@ class MBoxBackend(MailboxBackend):
|
||||
def cancel_notifications(self):
|
||||
raise NotImplementedError("mbox does not support notifications")
|
||||
|
||||
|
||||
class MaildirBackend(MailboxBackend):
|
||||
"""Implementation of maildir mail boxes."""
|
||||
|
||||
def __init__(self, name = '', path=None, folders=[], **kw):
|
||||
"""Initialize maildir mailbox backend with a name, path and folders."""
|
||||
self.name = name
|
||||
self.path = path
|
||||
self.folders = folders
|
||||
self.opened = False
|
||||
|
||||
|
||||
def open(self, reopen=False):
|
||||
"""'Open' mailbox. (Actually just checks that maildir directory exists.)"""
|
||||
if not os.path.isdir(self.path):
|
||||
raise IOError('Mailbox {} does not exist.'.format(self.path))
|
||||
self.opened = True
|
||||
|
||||
|
||||
def close(self):
|
||||
"""Close mailbox."""
|
||||
self.opened = False
|
||||
|
||||
|
||||
def is_open(self):
|
||||
"""Return True if mailbox is opened."""
|
||||
return self.opened
|
||||
|
||||
|
||||
def list_messages(self):
|
||||
"""List unread messages from the mailbox.
|
||||
Yields pairs (folder, message).
|
||||
"""
|
||||
folders = self.folders if len(self.folders) != 0 else ['']
|
||||
root_maildir = mailbox.Maildir(self.path, factory=None, create=False)
|
||||
try:
|
||||
for folder in folders:
|
||||
if isinstance(folder, unicode):
|
||||
# Python2 maildir folders must be str not unicode.
|
||||
# TODO: Python3 probably does not need this.
|
||||
folder = folder.encode('utf-8')
|
||||
maildir = self._get_folder(root_maildir, folder)
|
||||
for msg in maildir:
|
||||
if 'S' not in msg.get_flags():
|
||||
yield folder, msg
|
||||
finally:
|
||||
root_maildir.close()
|
||||
|
||||
|
||||
def request_folders(self):
|
||||
"""Lists folder from maildir recursively."""
|
||||
|
||||
def list_folders(maildir, parent):
|
||||
for folder in maildir.list_folders():
|
||||
this_folder = parent + folder
|
||||
yield this_folder
|
||||
for subfolder in list_folders(maildir.get_folder(folder), this_folder + '/'):
|
||||
yield subfolder
|
||||
|
||||
maildir = mailbox.Maildir(self.path, factory=None, create=False)
|
||||
return [''] + list(list_folders(maildir, ''))
|
||||
|
||||
|
||||
def notify_next_change(self, callback=None, timeout=None):
|
||||
raise NotImplementedError("mbox does not support notifications")
|
||||
|
||||
|
||||
def cancel_notifications(self):
|
||||
raise NotImplementedError("mbox does not support notifications")
|
||||
|
||||
|
||||
def _get_folder(self, maildir, folder):
|
||||
"""Returns folder instance of the given maildir."""
|
||||
f = maildir
|
||||
for subfolder in folder.split('/'):
|
||||
if subfolder != '':
|
||||
f = f.get_folder(subfolder)
|
||||
return f
|
||||
|
||||
|
||||
@@ -72,7 +72,7 @@ folder = folderA, folderB, folderC
|
||||
[account6]
|
||||
enabled = 1
|
||||
name = Imap config with json folder option
|
||||
folder = folderA, folderB, folderC
|
||||
folder = ["folderA", "folderB", "folderC"]
|
||||
"""
|
||||
|
||||
@pytest.fixture
|
||||
|
||||
@@ -28,108 +28,233 @@ import pytest
|
||||
from Mailnag.backends import create_backend
|
||||
|
||||
|
||||
def test_create_mbox_backend():
|
||||
be = create_backend('mbox')
|
||||
assert be is not None
|
||||
@pytest.fixture
|
||||
def sample_path(tmpdir):
|
||||
"""Temporary path for mailbox used in tests."""
|
||||
return str(tmpdir.join('sample'))
|
||||
|
||||
|
||||
def test_initially_mailbox_should_be_closed():
|
||||
be = create_backend('mbox')
|
||||
assert not be.is_open()
|
||||
class TestMBox:
|
||||
"""Tests for mbox backend."""
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def sample_mbox(self, sample_path):
|
||||
"""Temporary mbox instance for tests."""
|
||||
return mailbox.mbox(sample_path, create=True)
|
||||
|
||||
def test_when_opened_mailbox_should_be_open(tmpdir):
|
||||
tmpdir.join('sample').write('')
|
||||
path = str(tmpdir.join('sample'))
|
||||
be = create_backend('mbox', path=path)
|
||||
be.open()
|
||||
assert be.is_open()
|
||||
def test_create_mbox_backend(self):
|
||||
be = create_backend('mbox')
|
||||
assert be is not None
|
||||
|
||||
def test_initially_mailbox_should_be_closed(self):
|
||||
be = create_backend('mbox')
|
||||
assert not be.is_open()
|
||||
|
||||
def test_closed_mailbox_should_be_closed(tmpdir):
|
||||
tmpdir.join('sample').write('')
|
||||
path = str(tmpdir.join('sample'))
|
||||
be = create_backend('mbox', path=path)
|
||||
be.open()
|
||||
be.close()
|
||||
assert not be.is_open()
|
||||
|
||||
|
||||
def test_mbox_lists_no_messages_from_empty_mailbox(tmpdir):
|
||||
path = str(tmpdir.join('sample'))
|
||||
sample_mbox = mailbox.mbox(path, create=True)
|
||||
|
||||
be = create_backend('mbox', name='sample', path=path)
|
||||
be.open()
|
||||
try:
|
||||
msgs = list(be.list_messages())
|
||||
assert len(msgs) == 0
|
||||
finally:
|
||||
be.close()
|
||||
|
||||
|
||||
def test_mbox_lists_two_messages_from_mailbox(tmpdir):
|
||||
path = str(tmpdir.join('sample'))
|
||||
sample_mbox = mailbox.mbox(path, create=True)
|
||||
add_mbox_message(sample_mbox, 'blaa-blaa-1', '')
|
||||
add_mbox_message(sample_mbox, 'blaa-blaa-2', 'O')
|
||||
add_mbox_message(sample_mbox, 'blaa-blaa-3', 'RO')
|
||||
sample_mbox.close()
|
||||
|
||||
be = create_backend('mbox', name='sample', path=path)
|
||||
be.open()
|
||||
try:
|
||||
msgs = list(be.list_messages())
|
||||
folders = [folder for folder, msg in msgs]
|
||||
msg_ids = set(msg.get('message-id') for folder, msg in msgs)
|
||||
finally:
|
||||
be.close()
|
||||
assert len(msgs) == 2
|
||||
assert all(folder == '' for folder in folders)
|
||||
assert msg_ids == set(['blaa-blaa-1', 'blaa-blaa-2'])
|
||||
|
||||
|
||||
def test_mbox_should_not_have_folders(tmpdir):
|
||||
tmpdir.join('sample').write('')
|
||||
path = str(tmpdir.join('sample'))
|
||||
be = create_backend('mbox', path=path)
|
||||
be.open()
|
||||
with pytest.raises(NotImplementedError):
|
||||
be.request_folders()
|
||||
|
||||
|
||||
def test_mbox_does_not_support_notifications(tmpdir): # for now
|
||||
tmpdir.join('sample').write('')
|
||||
path = str(tmpdir.join('sample'))
|
||||
be = create_backend('mbox', path=path)
|
||||
be.open()
|
||||
with pytest.raises(NotImplementedError):
|
||||
be.notify_next_change()
|
||||
with pytest.raises(NotImplementedError):
|
||||
be.cancel_notifications()
|
||||
|
||||
|
||||
def test_mbox_open_should_fail_if_mailbox_does_not_exist(tmpdir):
|
||||
path = str(tmpdir.join('not-exist'))
|
||||
|
||||
be = create_backend('mbox', path=path)
|
||||
with pytest.raises(IOError):
|
||||
def test_mailbox_should_be_open_when_opened(self, sample_path):
|
||||
be = create_backend('mbox', path=sample_path)
|
||||
be.open()
|
||||
assert be.is_open()
|
||||
|
||||
def test_closed_mailbox_should_be_closed(self, sample_path):
|
||||
be = create_backend('mbox', path=sample_path)
|
||||
be.open()
|
||||
be.close()
|
||||
assert not be.is_open()
|
||||
|
||||
def test_lists_no_messages_from_empty_mailbox(self, sample_path):
|
||||
be = create_backend('mbox', name='sample', path=sample_path)
|
||||
be.open()
|
||||
try:
|
||||
msgs = list(be.list_messages())
|
||||
assert len(msgs) == 0
|
||||
finally:
|
||||
be.close()
|
||||
|
||||
def test_lists_unread_messages_from_mailbox(self, sample_path, sample_mbox):
|
||||
self._add_message(sample_mbox, 'blaa-blaa-1', '')
|
||||
self._add_message(sample_mbox, 'blaa-blaa-2', 'O')
|
||||
self._add_message(sample_mbox, 'blaa-blaa-3', 'RO')
|
||||
sample_mbox.close()
|
||||
|
||||
be = create_backend('mbox', name='sample', path=sample_path)
|
||||
be.open()
|
||||
try:
|
||||
msgs = list(be.list_messages())
|
||||
folders = [folder for folder, msg in msgs]
|
||||
msg_ids = set(msg.get('message-id') for folder, msg in msgs)
|
||||
finally:
|
||||
be.close()
|
||||
assert len(msgs) == 2
|
||||
assert all(folder == '' for folder in folders)
|
||||
assert msg_ids == set(['blaa-blaa-1', 'blaa-blaa-2'])
|
||||
|
||||
def test_mbox_should_not_have_folders(self, sample_path):
|
||||
be = create_backend('mbox', path=sample_path)
|
||||
be.open()
|
||||
with pytest.raises(NotImplementedError):
|
||||
be.request_folders()
|
||||
|
||||
def test_mbox_does_not_support_notifications(self, sample_path):
|
||||
be = create_backend('mbox', path=sample_path)
|
||||
be.open()
|
||||
with pytest.raises(NotImplementedError):
|
||||
be.notify_next_change()
|
||||
with pytest.raises(NotImplementedError):
|
||||
be.cancel_notifications()
|
||||
|
||||
def test_open_should_fail_if_mailbox_does_not_exist(self, tmpdir):
|
||||
path = str(tmpdir.join('not-exist'))
|
||||
|
||||
be = create_backend('mbox', path=path)
|
||||
with pytest.raises(IOError):
|
||||
be.open()
|
||||
|
||||
def _add_message(self, mbox, msg_id, flags):
|
||||
m = mailbox.mboxMessage()
|
||||
m.set_payload('Hello world!', 'ascii')
|
||||
m.add_header('from', 'me@example.org')
|
||||
m.add_header('to', 'you@example.org')
|
||||
m.add_header('subject', 'Hi!')
|
||||
m.add_header('message-id', msg_id)
|
||||
m.set_flags(flags)
|
||||
mbox.lock()
|
||||
try:
|
||||
mbox.add(m)
|
||||
finally:
|
||||
mbox.unlock()
|
||||
|
||||
|
||||
# Helper fuctions
|
||||
class TestMaildir:
|
||||
"""Tests for maildir backend."""
|
||||
|
||||
def add_mbox_message(mbox, msg_id, flags):
|
||||
m = mailbox.mboxMessage()
|
||||
m.set_payload('Hello world!', 'ascii')
|
||||
m.add_header('from', 'me@example.org')
|
||||
m.add_header('to', 'you@example.org')
|
||||
m.add_header('subject', 'Hi!')
|
||||
m.add_header('message-id', msg_id)
|
||||
m.set_flags(flags)
|
||||
mbox.lock()
|
||||
try:
|
||||
mbox.add(m)
|
||||
finally:
|
||||
mbox.unlock()
|
||||
@pytest.fixture(autouse=True)
|
||||
def sample_maildir(self, sample_path):
|
||||
"""Temporary maildir instance for tests."""
|
||||
return mailbox.Maildir(sample_path, create=True)
|
||||
|
||||
def test_create_maildir_backend(self):
|
||||
be = create_backend('maildir')
|
||||
assert be is not None
|
||||
|
||||
def test_initially_mailbox_should_be_closed(self):
|
||||
be = create_backend('maildir')
|
||||
assert not be.is_open()
|
||||
|
||||
def test_mailbox_should_be_open_when_opened(self, sample_path):
|
||||
be = create_backend('maildir', path=sample_path)
|
||||
be.open()
|
||||
assert be.is_open()
|
||||
|
||||
def test_mailbox_should_be_closed(self, sample_path):
|
||||
be = create_backend('maildir', path=sample_path)
|
||||
be.open()
|
||||
be.close()
|
||||
assert not be.is_open()
|
||||
|
||||
def test_lists_no_messages_from_empty_mailbox(self, sample_path):
|
||||
be = create_backend('maildir', name='sample', path=sample_path)
|
||||
be.open()
|
||||
try:
|
||||
msgs = list(be.list_messages())
|
||||
assert len(msgs) == 0
|
||||
finally:
|
||||
be.close()
|
||||
|
||||
def test_lists_unread_messages_from_mailbox(self, sample_path, sample_maildir):
|
||||
self._add_message(sample_maildir, 'blaa-blaa-1', None, 'new')
|
||||
self._add_message(sample_maildir, 'blaa-blaa-2', None, 'cur')
|
||||
self._add_message(sample_maildir, 'blaa-blaa-3', 'S', 'cur')
|
||||
sample_maildir.close()
|
||||
|
||||
be = create_backend('maildir', name='sample', path=sample_path)
|
||||
be.open()
|
||||
try:
|
||||
msgs = list(be.list_messages())
|
||||
folders = [folder for folder, msg in msgs]
|
||||
msg_ids = set(msg.get('message-id') for folder, msg in msgs)
|
||||
finally:
|
||||
be.close()
|
||||
assert len(msgs) == 2
|
||||
assert all(folder == '' for folder in folders)
|
||||
assert msg_ids == set(['blaa-blaa-1', 'blaa-blaa-2'])
|
||||
|
||||
def test_lists_unread_messages_from_selected_folders(self, sample_path, sample_maildir):
|
||||
f = sample_maildir.add_folder('folder1')
|
||||
sample_maildir.add_folder('folder2')
|
||||
s = f.add_folder('subfolder')
|
||||
self._add_message(sample_maildir, 'blaa-blaa-1', None, 'new')
|
||||
self._add_message(f, 'blaa-blaa-2', None, 'cur')
|
||||
self._add_message(s, 'blaa-blaa-3', None, 'cur')
|
||||
sample_maildir.close()
|
||||
|
||||
be = create_backend('maildir', name='sample', path=sample_path, folders=['', 'folder1/subfolder'])
|
||||
be.open()
|
||||
try:
|
||||
msgs = list(be.list_messages())
|
||||
folders = [folder for folder, msg in msgs]
|
||||
msg_ids = set(msg.get('message-id') for folder, msg in msgs)
|
||||
finally:
|
||||
be.close()
|
||||
assert len(msgs) == 2
|
||||
assert set(['', 'folder1/subfolder']) == set(folders)
|
||||
assert msg_ids == set(['blaa-blaa-1', 'blaa-blaa-3'])
|
||||
|
||||
def test_should_support_unicode_folder_names(self, sample_path, sample_maildir):
|
||||
"""Python2 maildir folders must be str, not unicode.
|
||||
However folder in Mailnag configuration is represented as a json
|
||||
list, and json.loads converts strings to unicode. This test is here
|
||||
to ensure that unicode folder names work.
|
||||
Note: This is probably not needed with Python3.
|
||||
"""
|
||||
f = sample_maildir.add_folder('folder1')
|
||||
self._add_message(f, 'blaa-blaa-2', 'S', 'cur')
|
||||
sample_maildir.close()
|
||||
|
||||
be = create_backend('maildir', name='sample', path=sample_path, folders=[u'', u'folder1'])
|
||||
be.open()
|
||||
try:
|
||||
msgs = list(be.list_messages())
|
||||
finally:
|
||||
be.close()
|
||||
|
||||
def test_folders_should_be_listed(self, sample_path, sample_maildir):
|
||||
f = sample_maildir.add_folder('folder1')
|
||||
sample_maildir.add_folder('folder2')
|
||||
f.add_folder('subfolder')
|
||||
|
||||
be = create_backend('maildir', path=sample_path)
|
||||
be.open()
|
||||
folders = be.request_folders()
|
||||
assert set(['', 'folder1', 'folder2', 'folder1/subfolder']) == set(folders)
|
||||
|
||||
def test_maildir_does_not_support_notifications(self, sample_path):
|
||||
be = create_backend('maildir', path=sample_path)
|
||||
be.open()
|
||||
with pytest.raises(NotImplementedError):
|
||||
be.notify_next_change()
|
||||
with pytest.raises(NotImplementedError):
|
||||
be.cancel_notifications()
|
||||
|
||||
def test_open_should_fail_if_mailbox_does_not_exist(self, tmpdir):
|
||||
path = str(tmpdir.join('not-exist'))
|
||||
|
||||
be = create_backend('maildir', path=path)
|
||||
with pytest.raises(IOError):
|
||||
be.open()
|
||||
|
||||
def _add_message(self, maildir, msg_id, flags, subdir):
|
||||
m = mailbox.MaildirMessage()
|
||||
m.set_payload('Hello world!', 'ascii')
|
||||
m.add_header('from', 'me@example.org')
|
||||
m.add_header('to', 'you@example.org')
|
||||
m.add_header('subject', 'Hi!')
|
||||
m.add_header('message-id', msg_id)
|
||||
if flags:
|
||||
m.set_flags(flags)
|
||||
m.set_subdir(subdir)
|
||||
maildir.lock()
|
||||
try:
|
||||
maildir.add(m)
|
||||
finally:
|
||||
maildir.unlock()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user