Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions Lib/imaplib.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@
__version__ = "2.58"

import binascii, errno, random, re, socket, subprocess, sys, time, calendar
from collections import namedtuple
from datetime import datetime, timezone, timedelta
from io import DEFAULT_BUFFER_SIZE
from typing import Tuple

try:
import ssl
Expand Down Expand Up @@ -131,6 +133,10 @@
_Untagged_status = br'\* (?P<data>\d+) (?P<type>[A-Z-]+)( (?P<data2>.*))?'


# Store capabilities present in the current server
# Add additional ones as needed
AvailableCapabilities = namedtuple('Capas', ['MOVE', 'UIDPLUS'])


class IMAP4:

Expand Down Expand Up @@ -194,6 +200,7 @@ def __init__(self, host='', port=IMAP4_PORT, timeout=None):
self.continuation_response = '' # Last continuation response
self.is_readonly = False # READ-ONLY desired state
self.tagnum = 0
self._features_available = AvailableCapabilities(False, False)
self._tls_established = False
self._mode_ascii()

Expand Down Expand Up @@ -611,6 +618,9 @@ def login(self, user, password):
if typ != 'OK':
raise self.error(dat[-1])
self.state = 'AUTH'

self._get_capabilities()

return typ, dat


Expand Down Expand Up @@ -655,6 +665,44 @@ def lsub(self, directory='""', pattern='*'):
typ, dat = self._simple_command(name, directory, pattern)
return self._untagged_response(typ, dat, name)

def move_messages(self, target: str, msgs: str) -> Tuple[str, str]:
"""
Higher level command to move message inside of one account.
Following RFC-6851.

:param str target: name of the folder to move messages into
:param str messages: UID sequence set (according to
https://tools.ietf.org/html/rfc3501#section-9)
:return: type, data tuple; type is 'OK' or something else in
case of failure.
:rtype: tuple [str, str]
"""
if self._features_available.MOVE:
typ, dat = self._simple_command('UID', 'MOVE',
msgs, target)
return self._untagged_response(typ, dat, 'MOVE')
elif self._features_available.UIDPLUS:
ok, data = self.uid('COPY', f'{msgs} {target}')
if ok != 'OK':
return ok, f'Cannot copy messages {msgs} to folder {target}'
ok, data = self.uid('STORE',
f'+FLAGS.SILENT (\\DELETED) {msgs}')
if ok != 'OK':
return ok, f'Cannot delete messages {msgs}.'
ok, data = self.uid('EXPUNGE', msgs)
if ok != 'OK':
return ok, f'Cannot expunge messages {msgs}.'
return ok, f'Messages {msgs} moved to {target}.'
else:
ok, data = self.uid('COPY', f'{msgs} {target}')
if ok != 'OK':
return ok, f'Cannot copy messages {msgs} to folder {target}'
ok, data = self.uid('STORE',
f'+FLAGS.SILENT (\\DELETED) {msgs}')
if ok != 'OK':
return ok, f'Cannot delete messages {msgs}.'
return ok, f'Messages {msgs} moved to {target}.'

def myrights(self, mailbox):
"""Show my ACLs for a mailbox (i.e. the rights that I have on mailbox).

Expand Down Expand Up @@ -1058,11 +1106,15 @@ def _command_complete(self, name, tag):

def _get_capabilities(self):
typ, dat = self.capability()
if typ != 'OK':
raise self.error(dat[-1])
if dat == [None]:
raise self.error('no CAPABILITY response from server')
dat = str(dat[-1], self._encoding)
dat = dat.upper()
self.capabilities = tuple(dat.split())
self._features_available = AvailableCapabilities._make(
['MOVE' in dat, 'UIDPLUS' in dat])


def _get_response(self):
Expand Down
53 changes: 53 additions & 0 deletions Lib/test/test_imaplib.py
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,59 @@ def test_unselect(self):
self.assertEqual(data[0], b'Returned to authenticated state. (Success)')
self.assertEqual(client.state, 'AUTH')

def test_move_messages(self):
from email.message import EmailMessage

class MoveServer(SimpleIMAPHandler):
capabilities = 'ENABLE UTF8=ACCEPT'

def cmd_ENABLE(self, tag, args):
self._send_tagged(tag, 'OK', 'ENABLE successful')

def cmd_AUTHENTICATE(self, tag, args):
self._send_textline('+')
self.server.response = yield
self._send_tagged(tag, 'OK', 'FAKEAUTH successful')

def cmd_APPEND(self, tag, args):
self._send_textline('+')
self.server.response = yield
self._send_tagged(tag, 'OK', 'okay')

def cmd_CREATE(self, tag, args):
self._send_textline('* CREATE ' + args[0])
self._send_tagged(tag, 'OK', 'okay')

def cmd_MOVE(self, tag, args):
self._send_textline('* MOVE ' + args[0])
self._send_tagged(tag, 'OK', 'okay')

def cmd_SELECT(self, tag, args):
self.server.is_selected = True
self._send_textline('* 2 EXISTS')
self._send_tagged(tag, 'OK', '[READ-WRITE] SELECT completed.')

client, _ = self._setup(MoveServer)
typ, data = client.login('user', 'pass')
typ, data = client.create('source')
typ, data = client.create('target')
typ, data = client.select('source')

msg = EmailMessage()
msg['Subject'] = 'Test message'
msg['From'] = '[email protected]'
msg['To'] = '[email protected]'
msg.set_content('This is a testing message')
print(f'=========================\nmsg:\n{msg}=============')
typ, data = client.append('source', None, None, msg.as_bytes())

typ, data = client.search(None, 'ALL')
typ, data = client.move_messages('target', data[0])
self.assertEqual(typ, 'OK')
typ, data = client.search(None, 'target', 'ALL')
self.assertEqual(typ, 'OK')
self.assertEqual(int(data[0]), 1)


class NewIMAPTests(NewIMAPTestsMixin, unittest.TestCase):
imap_class = imaplib.IMAP4
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Provides method :meth:`.move_messages()` of IMAP4 object, which
moves messages among two IMAP folders. This change introduces and
uses caching of the server’s capabilities in
:attr:`__features_available` internal attribute.