add typing to imap_recent_messages
This commit is contained in:
parent
4b619b564c
commit
f9ca93bfe6
1 changed files with 28 additions and 5 deletions
|
|
@ -5,9 +5,11 @@ import os
|
||||||
import re
|
import re
|
||||||
from datetime import datetime, timedelta
|
from datetime import datetime, timedelta
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
from typing import Protocol, TypedDict
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
from dotenv import dotenv_values
|
from dotenv import dotenv_values
|
||||||
|
from icecream import ic
|
||||||
from imbox import Imbox # type: ignore
|
from imbox import Imbox # type: ignore
|
||||||
from playwright.sync_api import BrowserContext, expect
|
from playwright.sync_api import BrowserContext, expect
|
||||||
from pytest import Parser
|
from pytest import Parser
|
||||||
|
|
@ -116,16 +118,37 @@ def imap_client() -> None:
|
||||||
imbox.logout()
|
imbox.logout()
|
||||||
|
|
||||||
|
|
||||||
|
class Body(TypedDict):
|
||||||
|
plain: list
|
||||||
|
html: list
|
||||||
|
|
||||||
|
|
||||||
|
class Message(Protocol):
|
||||||
|
sent_from: list
|
||||||
|
sent_to: list
|
||||||
|
subject: str
|
||||||
|
headers: list
|
||||||
|
date: str
|
||||||
|
body: Body
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def imap_recent_messages(imap_client: Imbox, n_minutes: int):
|
def imap_recent_messages(imap_client: Imbox) -> list[Message]:
|
||||||
"""Get all messages from [n_minutes] ago till now.
|
"""Get all messages from [n_minutes] ago till now.
|
||||||
|
|
||||||
# iterate with
|
# iterate with
|
||||||
for uid, message in messages:
|
for uid, message in messages:
|
||||||
print(uid, message.subject, message.date)"""
|
print(uid, message.subject, message.date)"""
|
||||||
|
|
||||||
n_minutes_ago = datetime.now() - timedelta(minutes=n_minutes)
|
N_MINUTES = 30
|
||||||
uids_and_messages = imap_client.messages(date__gt=n_minutes_ago)
|
|
||||||
uids, messages = zip(*uids_and_messages)
|
n_minutes_ago = datetime.now() - timedelta(minutes=N_MINUTES)
|
||||||
print(type(messages))
|
uids: list[bytes] = []
|
||||||
|
messages: list[Message] = []
|
||||||
|
# for uid, message in imap_client.messages(date__gt=n_minutes_ago):
|
||||||
|
for uid, message in imap_client.messages():
|
||||||
|
ic("one time")
|
||||||
|
uids.append(uid)
|
||||||
|
messages.append(message)
|
||||||
|
|
||||||
return messages
|
return messages
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue