diff --git a/Lib/asyncio/__init__.py b/Lib/asyncio/__init__.py index 32a5dbae03af21..f4736acd41eafd 100644 --- a/Lib/asyncio/__init__.py +++ b/Lib/asyncio/__init__.py @@ -22,6 +22,7 @@ from .timeouts import * from .threads import * from .transports import * +from .fileio import * __all__ = (base_events.__all__ + coroutines.__all__ + @@ -39,7 +40,8 @@ taskgroups.__all__ + threads.__all__ + timeouts.__all__ + - transports.__all__) + transports.__all__ + + fileio.__all__) if sys.platform == 'win32': # pragma: no cover from .windows_events import * diff --git a/Lib/asyncio/fileio.py b/Lib/asyncio/fileio.py new file mode 100644 index 00000000000000..72df7112a1294b --- /dev/null +++ b/Lib/asyncio/fileio.py @@ -0,0 +1,478 @@ +"""Async file I/O for asyncio.""" + +__all__ = ('open_file', 'wrap_file', 'AsyncFile', 'Path') + +import os +import pathlib + +from . import mixins +from .threads import to_thread + + +async def open_file(file, mode='r', buffering=-1, encoding=None, + errors=None, newline=None, closefd=True, opener=None): + """Asynchronously open a file, returning an AsyncFile. + + This is the async equivalent of the builtin open(). The file is opened + in a separate thread via asyncio.to_thread(). + """ + fp = await to_thread(open, file, mode, buffering, encoding, + errors, newline, closefd, opener) + return AsyncFile(fp) + + +def wrap_file(file): + """Wrap an already-open file-like object as an AsyncFile. + + The file object must have a close() method and at least one of + read() or write(). + """ + if not hasattr(file, 'close'): + raise TypeError( + f"Expected a file-like object with a close() method, " + f"got {type(file).__name__}") + if not (hasattr(file, 'read') or hasattr(file, 'write')): + raise TypeError( + f"Expected a file-like object with read() or write(), " + f"got {type(file).__name__}") + return AsyncFile(file) + + +class AsyncFile(mixins._LoopBoundMixin): + """Async wrapper around a synchronous file object. + + All I/O methods are delegated to a thread via asyncio.to_thread(). + Sync attributes (name, mode, closed, etc.) are passed through directly. + """ + + def __init__(self, fp): + self._fp = fp + + @property + def wrapped(self): + """The underlying synchronous file object.""" + return self._fp + + # --- Async I/O methods --- + + async def read(self, size=-1): + return await to_thread(self._fp.read, size) + + async def read1(self, size=-1): + if not hasattr(self._fp, 'read1'): + raise AttributeError( + f"'{type(self._fp).__name__}' object has no attribute 'read1'") + return await to_thread(self._fp.read1, size) + + async def readinto(self, b): + if not hasattr(self._fp, 'readinto'): + raise AttributeError( + f"'{type(self._fp).__name__}' object has no attribute 'readinto'") + return await to_thread(self._fp.readinto, b) + + async def readinto1(self, b): + if not hasattr(self._fp, 'readinto1'): + raise AttributeError( + f"'{type(self._fp).__name__}' object has no attribute 'readinto1'") + return await to_thread(self._fp.readinto1, b) + + async def readline(self): + return await to_thread(self._fp.readline) + + async def readlines(self): + return await to_thread(self._fp.readlines) + + async def write(self, data): + return await to_thread(self._fp.write, data) + + async def writelines(self, lines): + return await to_thread(self._fp.writelines, lines) + + async def truncate(self, size=None): + return await to_thread(self._fp.truncate, size) + + async def seek(self, offset, whence=os.SEEK_SET): + return await to_thread(self._fp.seek, offset, whence) + + async def tell(self): + return await to_thread(self._fp.tell) + + async def flush(self): + return await to_thread(self._fp.flush) + + async def peek(self, size=0): + if not hasattr(self._fp, 'peek'): + raise AttributeError( + f"'{type(self._fp).__name__}' object has no attribute 'peek'") + return await to_thread(self._fp.peek, size) + + async def aclose(self): + return await to_thread(self._fp.close) + + # --- Sync attribute passthrough --- + + def __getattr__(self, name): + return getattr(self._fp, name) + + # --- Context manager --- + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + await self.aclose() + + # --- Async iteration (line by line) --- + + def __aiter__(self): + return self + + async def __anext__(self): + line = await to_thread(self._fp.readline) + if line: + return line + raise StopAsyncIteration + + def __repr__(self): + return f"" + + +# --- Path helpers --- + +def _async_method(fn): + """Wrap a pathlib.Path method as an async method on asyncio.Path.""" + name = fn.__name__ + + async def wrapper(self, *args, **kwargs): + return await to_thread(getattr(self._path, name), *args, **kwargs) + + wrapper.__name__ = wrapper.__qualname__ = name + wrapper.__doc__ = fn.__doc__ + return wrapper + + +def _async_method_path(fn): + """Like _async_method but re-wraps pathlib.Path result as asyncio.Path.""" + name = fn.__name__ + + async def wrapper(self, *args, **kwargs): + result = await to_thread(getattr(self._path, name), *args, **kwargs) + return Path(result) + + wrapper.__name__ = wrapper.__qualname__ = name + wrapper.__doc__ = fn.__doc__ + return wrapper + + +_EXHAUSTED = object() + + +def _next_sync(it): + """Call next(it), returning _EXHAUSTED instead of raising StopIteration. + + StopIteration cannot be raised into a Future, so we use a sentinel. + """ + return next(it, _EXHAUSTED) + + +class _AsyncPathIterator: + """Async iterator that lazily pumps a sync path iterator in a thread.""" + + __slots__ = ('_it',) + + def __init__(self, sync_iterator): + self._it = sync_iterator + + def __aiter__(self): + return self + + async def __anext__(self): + result = await to_thread(_next_sync, self._it) + if result is _EXHAUSTED: + raise StopAsyncIteration + return Path(result) + + +class _AsyncWalkIterator: + """Async iterator over walk() results, wrapping dirpath as asyncio.Path.""" + + __slots__ = ('_it',) + + def __init__(self, sync_iterator): + self._it = sync_iterator + + def __aiter__(self): + return self + + async def __anext__(self): + result = await to_thread(_next_sync, self._it) + if result is _EXHAUSTED: + raise StopAsyncIteration + dirpath, dirnames, filenames = result + return Path(dirpath), dirnames, filenames + + +class Path: + """Async wrapper around pathlib.Path. + + All I/O methods are delegated to a thread via asyncio.to_thread(). + Non-I/O properties and methods are passed through directly. + """ + + __slots__ = ('_path',) + + def __init__(self, *args): + if args and isinstance(args[0], pathlib.PurePath): + self._path = pathlib.Path(args[0]) + else: + self._path = pathlib.Path(*args) + + # --- Sync properties (no I/O, direct passthrough) --- + + @property + def parts(self): + return self._path.parts + + @property + def drive(self): + return self._path.drive + + @property + def root(self): + return self._path.root + + @property + def anchor(self): + return self._path.anchor + + @property + def parent(self): + return Path(self._path.parent) + + @property + def parents(self): + return tuple(Path(p) for p in self._path.parents) + + @property + def name(self): + return self._path.name + + @property + def stem(self): + return self._path.stem + + @property + def suffix(self): + return self._path.suffix + + @property + def suffixes(self): + return self._path.suffixes + + @property + def parser(self): + return self._path.parser + + # --- Sync methods (no I/O) --- + + def as_posix(self): + return self._path.as_posix() + + def as_uri(self): + return self._path.as_uri() + + def is_absolute(self): + return self._path.is_absolute() + + def is_relative_to(self, other): + return self._path.is_relative_to(other) + + def joinpath(self, *pathsegments): + return Path(self._path.joinpath(*pathsegments)) + + def match(self, pattern, **kwargs): + return self._path.match(pattern, **kwargs) + + def full_match(self, pattern, **kwargs): + return self._path.full_match(pattern, **kwargs) + + def relative_to(self, other, **kwargs): + return Path(self._path.relative_to(other, **kwargs)) + + def with_name(self, name): + return Path(self._path.with_name(name)) + + def with_stem(self, stem): + return Path(self._path.with_stem(stem)) + + def with_suffix(self, suffix): + return Path(self._path.with_suffix(suffix)) + + def with_segments(self, *pathsegments): + return Path(self._path.with_segments(*pathsegments)) + + def __truediv__(self, key): + return Path(self._path / key) + + def __rtruediv__(self, key): + return Path(key / self._path) + + def __str__(self): + return str(self._path) + + def __repr__(self): + return f"asyncio.Path({str(self._path)!r})" + + def __fspath__(self): + return os.fspath(self._path) + + def __hash__(self): + return hash(self._path) + + def __eq__(self, other): + if isinstance(other, Path): + return self._path == other._path + return NotImplemented + + def __lt__(self, other): + if isinstance(other, Path): + return self._path < other._path + return NotImplemented + + def __le__(self, other): + if isinstance(other, Path): + return self._path <= other._path + return NotImplemented + + def __gt__(self, other): + if isinstance(other, Path): + return self._path > other._path + return NotImplemented + + def __ge__(self, other): + if isinstance(other, Path): + return self._path >= other._path + return NotImplemented + + # --- Async methods (I/O, delegated to thread) --- + + # File tests + exists = _async_method(pathlib.Path.exists) + is_file = _async_method(pathlib.Path.is_file) + is_dir = _async_method(pathlib.Path.is_dir) + is_symlink = _async_method(pathlib.Path.is_symlink) + is_socket = _async_method(pathlib.Path.is_socket) + is_fifo = _async_method(pathlib.Path.is_fifo) + is_block_device = _async_method(pathlib.Path.is_block_device) + is_char_device = _async_method(pathlib.Path.is_char_device) + is_junction = _async_method(pathlib.Path.is_junction) + is_mount = _async_method(pathlib.Path.is_mount) + + # Stat + stat = _async_method(pathlib.Path.stat) + lstat = _async_method(pathlib.Path.lstat) + samefile = _async_method(pathlib.Path.samefile) + + # Permissions + chmod = _async_method(pathlib.Path.chmod) + lchmod = _async_method(pathlib.Path.lchmod) + if hasattr(pathlib.Path, 'owner'): + owner = _async_method(pathlib.Path.owner) + if hasattr(pathlib.Path, 'group'): + group = _async_method(pathlib.Path.group) + + # CRUD + touch = _async_method(pathlib.Path.touch) + mkdir = _async_method(pathlib.Path.mkdir) + rmdir = _async_method(pathlib.Path.rmdir) + unlink = _async_method(pathlib.Path.unlink) + + # Read/write shortcuts + read_bytes = _async_method(pathlib.Path.read_bytes) + read_text = _async_method(pathlib.Path.read_text) + write_bytes = _async_method(pathlib.Path.write_bytes) + write_text = _async_method(pathlib.Path.write_text) + + # Links + if hasattr(pathlib.Path, 'symlink_to'): + symlink_to = _async_method(pathlib.Path.symlink_to) + if hasattr(pathlib.Path, 'hardlink_to'): + hardlink_to = _async_method(pathlib.Path.hardlink_to) + + # Copy/move (3.14+) + if hasattr(pathlib.Path, 'copy'): + async def copy(self, target, **kwargs): + result = await to_thread(self._path.copy, target, **kwargs) + return Path(result) + + copy.__doc__ = pathlib.Path.copy.__doc__ + + if hasattr(pathlib.Path, 'copy_into'): + async def copy_into(self, target_dir, **kwargs): + result = await to_thread( + self._path.copy_into, target_dir, **kwargs) + return Path(result) + + copy_into.__doc__ = pathlib.Path.copy_into.__doc__ + + if hasattr(pathlib.Path, 'move'): + async def move(self, target): + result = await to_thread(self._path.move, target) + return Path(result) + + move.__doc__ = pathlib.Path.move.__doc__ + + if hasattr(pathlib.Path, 'move_into'): + async def move_into(self, target_dir): + result = await to_thread(self._path.move_into, target_dir) + return Path(result) + + move_into.__doc__ = pathlib.Path.move_into.__doc__ + + # Path-returning async methods + rename = _async_method_path(pathlib.Path.rename) + replace = _async_method_path(pathlib.Path.replace) + resolve = _async_method_path(pathlib.Path.resolve) + absolute = _async_method_path(pathlib.Path.absolute) + expanduser = _async_method_path(pathlib.Path.expanduser) + if hasattr(pathlib.Path, 'readlink'): + readlink = _async_method_path(pathlib.Path.readlink) + + # Open + async def open(self, mode='r', buffering=-1, encoding=None, + errors=None, newline=None): + """Open the file pointed to by this path, returning an AsyncFile.""" + fp = await to_thread( + self._path.open, mode, buffering, encoding, errors, newline) + return AsyncFile(fp) + + # Class methods + @classmethod + async def cwd(cls): + """Return a new async path pointing to the current working directory.""" + result = await to_thread(pathlib.Path.cwd) + return cls(result) + + @classmethod + async def home(cls): + """Return a new async path pointing to the user's home directory.""" + result = await to_thread(pathlib.Path.home) + return cls(result) + + # Async generators (return async iterators) + def iterdir(self): + """Return an async iterator of path objects of the directory contents.""" + return _AsyncPathIterator(self._path.iterdir()) + + def glob(self, pattern, **kwargs): + """Return an async iterator of paths matching the pattern.""" + return _AsyncPathIterator(self._path.glob(pattern, **kwargs)) + + def rglob(self, pattern, **kwargs): + """Return an async iterator of paths matching the pattern recursively.""" + return _AsyncPathIterator(self._path.rglob(pattern, **kwargs)) + + def walk(self, top_down=True, on_error=None, follow_symlinks=False): + """Walk the directory tree, returning an async iterator.""" + return _AsyncWalkIterator( + self._path.walk(top_down, on_error, follow_symlinks)) diff --git a/Lib/test/test_asyncio/test_fileio.py b/Lib/test/test_asyncio/test_fileio.py new file mode 100644 index 00000000000000..e917fa0e9ae8cb --- /dev/null +++ b/Lib/test/test_asyncio/test_fileio.py @@ -0,0 +1,537 @@ +"""Tests for asyncio.fileio (async file I/O).""" + +import asyncio +import os +import pathlib +import tempfile +import unittest + + +class TestOpenFile(unittest.IsolatedAsyncioTestCase): + + async def test_open_file_text_read(self): + with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', + delete=False) as tmp: + tmp.write("hello world\n") + path = tmp.name + try: + async with await asyncio.open_file(path, 'r') as f: + content = await f.read() + self.assertEqual(content, "hello world\n") + finally: + os.unlink(path) + + async def test_open_file_text_write(self): + with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', + delete=False) as tmp: + path = tmp.name + try: + async with await asyncio.open_file(path, 'w') as f: + await f.write("async write\n") + with open(path, 'r') as f: + self.assertEqual(f.read(), "async write\n") + finally: + os.unlink(path) + + async def test_open_file_binary(self): + with tempfile.NamedTemporaryFile(mode='wb', suffix='.bin', + delete=False) as tmp: + tmp.write(b'\x00\x01\x02\x03') + path = tmp.name + try: + async with await asyncio.open_file(path, 'rb') as f: + data = await f.read() + self.assertEqual(data, b'\x00\x01\x02\x03') + finally: + os.unlink(path) + + async def test_async_context_manager(self): + with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', + delete=False) as tmp: + tmp.write("ctx test\n") + path = tmp.name + try: + f = await asyncio.open_file(path, 'r') + async with f: + content = await f.read() + self.assertEqual(content, "ctx test\n") + self.assertTrue(f.closed) + finally: + os.unlink(path) + + async def test_async_iteration(self): + with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', + delete=False) as tmp: + tmp.write("line1\nline2\nline3\n") + path = tmp.name + try: + lines = [] + async with await asyncio.open_file(path, 'r') as f: + async for line in f: + lines.append(line) + self.assertEqual(lines, ["line1\n", "line2\n", "line3\n"]) + finally: + os.unlink(path) + + +class TestWrapFile(unittest.IsolatedAsyncioTestCase): + + async def test_wrap_file(self): + with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', + delete=False) as tmp: + tmp.write("wrap test\n") + path = tmp.name + try: + raw = open(path, 'r') + f = asyncio.wrap_file(raw) + content = await f.read() + self.assertEqual(content, "wrap test\n") + await f.aclose() + self.assertTrue(raw.closed) + finally: + os.unlink(path) + + async def test_wrap_file_invalid_no_close(self): + with self.assertRaises(TypeError): + asyncio.wrap_file(42) + + async def test_wrap_file_invalid_no_read_write(self): + class FakeFile: + def close(self): pass + with self.assertRaises(TypeError): + asyncio.wrap_file(FakeFile()) + + +class TestAsyncFile(unittest.IsolatedAsyncioTestCase): + + async def test_read_write_seek_tell(self): + with tempfile.NamedTemporaryFile(mode='w+b', suffix='.bin', + delete=False) as tmp: + path = tmp.name + try: + async with await asyncio.open_file(path, 'w+b') as f: + await f.write(b'hello') + pos = await f.tell() + self.assertEqual(pos, 5) + await f.seek(0) + data = await f.read() + self.assertEqual(data, b'hello') + finally: + os.unlink(path) + + async def test_readline_readlines(self): + with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', + delete=False) as tmp: + tmp.write("aaa\nbbb\nccc\n") + path = tmp.name + try: + async with await asyncio.open_file(path, 'r') as f: + line1 = await f.readline() + self.assertEqual(line1, "aaa\n") + rest = await f.readlines() + self.assertEqual(rest, ["bbb\n", "ccc\n"]) + finally: + os.unlink(path) + + async def test_flush_truncate(self): + with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', + delete=False) as tmp: + path = tmp.name + try: + async with await asyncio.open_file(path, 'w+') as f: + await f.write("hello world") + await f.flush() + await f.seek(5) + await f.truncate() + await f.seek(0) + data = await f.read() + self.assertEqual(data, "hello") + finally: + os.unlink(path) + + async def test_sync_attributes(self): + with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', + delete=False) as tmp: + path = tmp.name + try: + async with await asyncio.open_file(path, 'r') as f: + self.assertEqual(f.name, path) + self.assertEqual(f.mode, 'r') + self.assertFalse(f.closed) + self.assertTrue(f.closed) + finally: + os.unlink(path) + + async def test_wrapped_property(self): + with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', + delete=False) as tmp: + path = tmp.name + try: + raw = open(path, 'r') + f = asyncio.wrap_file(raw) + self.assertIs(f.wrapped, raw) + await f.aclose() + finally: + os.unlink(path) + + async def test_aclose(self): + with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', + delete=False) as tmp: + path = tmp.name + try: + f = await asyncio.open_file(path, 'r') + self.assertFalse(f.closed) + await f.aclose() + self.assertTrue(f.closed) + finally: + os.unlink(path) + + async def test_repr(self): + with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', + delete=False) as tmp: + path = tmp.name + try: + async with await asyncio.open_file(path, 'r') as f: + r = repr(f) + self.assertIn('AsyncFile', r) + self.assertIn('wrapped=', r) + finally: + os.unlink(path) + + async def test_read1_buffered(self): + with tempfile.NamedTemporaryFile(mode='wb', suffix='.bin', + delete=False) as tmp: + tmp.write(b'data') + path = tmp.name + try: + async with await asyncio.open_file(path, 'rb') as f: + data = await f.read1() + self.assertIsInstance(data, bytes) + self.assertIn(b'data', data) + finally: + os.unlink(path) + + async def test_read1_text_raises(self): + with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', + delete=False) as tmp: + tmp.write('data') + path = tmp.name + try: + async with await asyncio.open_file(path, 'r') as f: + with self.assertRaises(AttributeError): + await f.read1() + finally: + os.unlink(path) + + async def test_peek_buffered(self): + with tempfile.NamedTemporaryFile(mode='wb', suffix='.bin', + delete=False) as tmp: + tmp.write(b'peek data') + path = tmp.name + try: + async with await asyncio.open_file(path, 'rb') as f: + data = await f.peek(4) + self.assertIsInstance(data, bytes) + # peek doesn't advance position + full = await f.read() + self.assertEqual(full, b'peek data') + finally: + os.unlink(path) + + async def test_writelines(self): + with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', + delete=False) as tmp: + path = tmp.name + try: + async with await asyncio.open_file(path, 'w') as f: + await f.writelines(["a\n", "b\n", "c\n"]) + with open(path, 'r') as f: + self.assertEqual(f.read(), "a\nb\nc\n") + finally: + os.unlink(path) + + +class TestPath(unittest.IsolatedAsyncioTestCase): + + async def test_path_construction(self): + p = asyncio.Path("/tmp") + self.assertEqual(str(p), "/tmp") + + async def test_path_from_pathlib(self): + pp = pathlib.Path("/tmp") + p = asyncio.Path(pp) + self.assertEqual(str(p), "/tmp") + + async def test_sync_properties(self): + p = asyncio.Path("/tmp/foo/bar.txt") + self.assertEqual(p.name, "bar.txt") + self.assertEqual(p.stem, "bar") + self.assertEqual(p.suffix, ".txt") + self.assertEqual(p.parent, asyncio.Path("/tmp/foo")) + self.assertEqual(p.parts, ("/", "tmp", "foo", "bar.txt")) + + async def test_truediv(self): + p = asyncio.Path("/tmp") / "foo" + self.assertIsInstance(p, asyncio.Path) + self.assertEqual(str(p), "/tmp/foo") + + async def test_rtruediv(self): + p = "/tmp" / asyncio.Path("foo") + self.assertIsInstance(p, asyncio.Path) + self.assertEqual(str(p), "/tmp/foo") + + async def test_fspath(self): + p = asyncio.Path("/tmp/foo") + self.assertEqual(os.fspath(p), "/tmp/foo") + + async def test_exists(self): + with tempfile.TemporaryDirectory() as tmpdir: + p = asyncio.Path(tmpdir) + self.assertTrue(await p.exists()) + p_gone = asyncio.Path(tmpdir) + self.assertFalse(await p_gone.exists()) + + async def test_stat(self): + with tempfile.TemporaryDirectory() as tmpdir: + p = asyncio.Path(tmpdir) + st = await p.stat() + self.assertTrue(hasattr(st, 'st_mode')) + + async def test_mkdir_rmdir(self): + with tempfile.TemporaryDirectory() as tmpdir: + d = asyncio.Path(tmpdir) / "newdir" + await d.mkdir() + self.assertTrue(await d.is_dir()) + await d.rmdir() + self.assertFalse(await d.exists()) + + async def test_touch_unlink(self): + with tempfile.TemporaryDirectory() as tmpdir: + f = asyncio.Path(tmpdir) / "newfile.txt" + await f.touch() + self.assertTrue(await f.is_file()) + await f.unlink() + self.assertFalse(await f.exists()) + + async def test_read_write_text(self): + with tempfile.TemporaryDirectory() as tmpdir: + f = asyncio.Path(tmpdir) / "test.txt" + await f.write_text("hello async") + content = await f.read_text() + self.assertEqual(content, "hello async") + + async def test_read_write_bytes(self): + with tempfile.TemporaryDirectory() as tmpdir: + f = asyncio.Path(tmpdir) / "test.bin" + await f.write_bytes(b'\xde\xad\xbe\xef') + data = await f.read_bytes() + self.assertEqual(data, b'\xde\xad\xbe\xef') + + async def test_open_returns_async_file(self): + with tempfile.TemporaryDirectory() as tmpdir: + f = asyncio.Path(tmpdir) / "test.txt" + await f.write_text("open test") + async with await f.open('r') as af: + self.assertIsInstance(af, asyncio.AsyncFile) + content = await af.read() + self.assertEqual(content, "open test") + + async def test_rename(self): + with tempfile.TemporaryDirectory() as tmpdir: + src = asyncio.Path(tmpdir) / "src.txt" + dst_path = os.path.join(tmpdir, "dst.txt") + await src.write_text("rename me") + result = await src.rename(dst_path) + self.assertIsInstance(result, asyncio.Path) + self.assertFalse(await src.exists()) + dst = asyncio.Path(dst_path) + self.assertEqual(await dst.read_text(), "rename me") + + async def test_replace(self): + with tempfile.TemporaryDirectory() as tmpdir: + src = asyncio.Path(tmpdir) / "src.txt" + dst = asyncio.Path(tmpdir) / "dst.txt" + await src.write_text("source") + await dst.write_text("target") + result = await src.replace(str(dst)) + self.assertIsInstance(result, asyncio.Path) + content = await dst.read_text() + self.assertEqual(content, "source") + + async def test_resolve(self): + with tempfile.TemporaryDirectory() as tmpdir: + p = asyncio.Path(tmpdir) + resolved = await p.resolve() + self.assertIsInstance(resolved, asyncio.Path) + self.assertTrue(resolved.is_absolute()) + + async def test_absolute(self): + p = asyncio.Path("relative") + result = await p.absolute() + self.assertIsInstance(result, asyncio.Path) + self.assertTrue(result.is_absolute()) + + async def test_iterdir(self): + with tempfile.TemporaryDirectory() as tmpdir: + for name in ["a.txt", "b.txt", "c.txt"]: + pathlib.Path(tmpdir, name).touch() + p = asyncio.Path(tmpdir) + names = [] + async for child in p.iterdir(): + self.assertIsInstance(child, asyncio.Path) + names.append(child.name) + self.assertEqual(sorted(names), ["a.txt", "b.txt", "c.txt"]) + + async def test_glob(self): + with tempfile.TemporaryDirectory() as tmpdir: + for name in ["a.txt", "b.py", "c.txt"]: + pathlib.Path(tmpdir, name).touch() + p = asyncio.Path(tmpdir) + names = [] + async for child in p.glob("*.txt"): + self.assertIsInstance(child, asyncio.Path) + names.append(child.name) + self.assertEqual(sorted(names), ["a.txt", "c.txt"]) + + async def test_rglob(self): + with tempfile.TemporaryDirectory() as tmpdir: + sub = pathlib.Path(tmpdir, "sub") + sub.mkdir() + pathlib.Path(tmpdir, "a.txt").touch() + pathlib.Path(sub, "b.txt").touch() + p = asyncio.Path(tmpdir) + names = [] + async for child in p.rglob("*.txt"): + names.append(child.name) + self.assertEqual(sorted(names), ["a.txt", "b.txt"]) + + async def test_walk(self): + with tempfile.TemporaryDirectory() as tmpdir: + sub = pathlib.Path(tmpdir, "sub") + sub.mkdir() + pathlib.Path(tmpdir, "a.txt").touch() + pathlib.Path(sub, "b.txt").touch() + p = asyncio.Path(tmpdir) + entries = [] + async for dirpath, dirnames, filenames in p.walk(): + self.assertIsInstance(dirpath, asyncio.Path) + entries.append((str(dirpath), sorted(dirnames), + sorted(filenames))) + # Should have root and sub directory + self.assertEqual(len(entries), 2) + + async def test_cwd(self): + p = await asyncio.Path.cwd() + self.assertIsInstance(p, asyncio.Path) + self.assertTrue(p.is_absolute()) + + async def test_home(self): + p = await asyncio.Path.home() + self.assertIsInstance(p, asyncio.Path) + self.assertTrue(p.is_absolute()) + + @unittest.skipUnless(hasattr(os, 'symlink'), 'requires symlink support') + async def test_symlink(self): + with tempfile.TemporaryDirectory() as tmpdir: + target = asyncio.Path(tmpdir) / "target.txt" + await target.write_text("symlink target") + link = asyncio.Path(tmpdir) / "link.txt" + await link.symlink_to(str(target)) + self.assertTrue(await link.is_symlink()) + resolved = await link.readlink() + self.assertIsInstance(resolved, asyncio.Path) + content = await link.read_text() + self.assertEqual(content, "symlink target") + + async def test_repr(self): + p = asyncio.Path("/tmp/foo") + self.assertEqual(repr(p), "asyncio.Path('/tmp/foo')") + + async def test_comparison(self): + a = asyncio.Path("/a") + b = asyncio.Path("/b") + a2 = asyncio.Path("/a") + self.assertEqual(a, a2) + self.assertNotEqual(a, b) + self.assertLess(a, b) + self.assertLessEqual(a, a2) + self.assertGreater(b, a) + self.assertGreaterEqual(b, a) + + async def test_hash(self): + a = asyncio.Path("/tmp/foo") + b = asyncio.Path("/tmp/foo") + self.assertEqual(hash(a), hash(b)) + s = {a, b} + self.assertEqual(len(s), 1) + + async def test_joinpath(self): + p = asyncio.Path("/tmp") + joined = p.joinpath("foo", "bar") + self.assertIsInstance(joined, asyncio.Path) + self.assertEqual(str(joined), "/tmp/foo/bar") + + async def test_with_name_stem_suffix(self): + p = asyncio.Path("/tmp/foo.txt") + self.assertEqual(str(p.with_name("bar.txt")), "/tmp/bar.txt") + self.assertEqual(str(p.with_stem("bar")), "/tmp/bar.txt") + self.assertEqual(str(p.with_suffix(".py")), "/tmp/foo.py") + + async def test_is_absolute(self): + self.assertTrue(asyncio.Path("/tmp").is_absolute()) + self.assertFalse(asyncio.Path("relative").is_absolute()) + + @unittest.skipUnless(hasattr(pathlib.Path, 'copy'), + 'requires pathlib.Path.copy (3.14+)') + async def test_copy(self): + with tempfile.TemporaryDirectory() as tmpdir: + src = asyncio.Path(tmpdir) / "src.txt" + dst = asyncio.Path(tmpdir) / "dst.txt" + await src.write_text("copy me") + result = await src.copy(str(dst)) + self.assertIsInstance(result, asyncio.Path) + self.assertEqual(await dst.read_text(), "copy me") + + @unittest.skipUnless(hasattr(pathlib.Path, 'move'), + 'requires pathlib.Path.move (3.14+)') + async def test_move(self): + with tempfile.TemporaryDirectory() as tmpdir: + src = asyncio.Path(tmpdir) / "src.txt" + dst_path = os.path.join(tmpdir, "moved.txt") + await src.write_text("move me") + result = await src.move(dst_path) + self.assertIsInstance(result, asyncio.Path) + self.assertFalse(await src.exists()) + dst = asyncio.Path(dst_path) + self.assertEqual(await dst.read_text(), "move me") + + async def test_chmod(self): + with tempfile.TemporaryDirectory() as tmpdir: + f = asyncio.Path(tmpdir) / "test.txt" + await f.touch() + await f.chmod(0o644) + st = await f.stat() + self.assertEqual(st.st_mode & 0o777, 0o644) + + async def test_expanduser(self): + p = asyncio.Path("~") + expanded = await p.expanduser() + self.assertIsInstance(expanded, asyncio.Path) + self.assertTrue(expanded.is_absolute()) + + async def test_parents_property(self): + p = asyncio.Path("/a/b/c") + parents = p.parents + self.assertIsInstance(parents, tuple) + self.assertTrue(all(isinstance(x, asyncio.Path) for x in parents)) + self.assertEqual(str(parents[0]), "/a/b") + + async def test_samefile(self): + with tempfile.TemporaryDirectory() as tmpdir: + f = asyncio.Path(tmpdir) / "test.txt" + await f.touch() + self.assertTrue(await f.samefile(str(f))) + + +if __name__ == '__main__': + unittest.main()