first commit

This commit is contained in:
Ayxan
2022-05-23 00:16:32 +04:00
commit d660f2a4ca
24786 changed files with 4428337 additions and 0 deletions

View File

@ -0,0 +1,2 @@
# -*- coding: utf-8 -*-
"""winpty module tests."""

View File

@ -0,0 +1,159 @@
# -*- coding: utf-8 -*-
"""winpty wrapper tests."""
# Standard library imports
import os
import time
# Third party imports
from winpty import PTY, WinptyError
from winpty.enums import Backend
from winpty.ptyprocess import which
import pytest
CMD = which('cmd').lower()
def pty_factory(backend):
if os.environ.get('CI_RUNNING', None) == '1':
if backend == Backend.ConPTY:
os.environ['CONPTY_CI'] = '1'
elif backend == Backend.WinPTY:
os.environ.pop('CONPTY_CI', None)
@pytest.fixture(scope='function')
def pty_fixture():
pty = PTY(80, 20, backend=backend)
# loc = bytes(os.getcwd(), 'utf8')
assert pty.spawn(CMD)
time.sleep(0.3)
return pty
return pty_fixture
conpty_provider = pty_factory(Backend.ConPTY)
winpty_provider = pty_factory(Backend.WinPTY)
@pytest.fixture(scope='module', params=['WinPTY', 'ConPTY'])
def pty_fixture(request):
backend = request.param
if os.environ.get('CI_RUNNING', None) == '1':
if backend == 'ConPTY':
os.environ['CI'] = '1'
os.environ['CONPTY_CI'] = '1'
if backend == 'WinPTY':
os.environ.pop('CI', None)
os.environ.pop('CONPTY_CI', None)
backend = getattr(Backend, backend)
def _pty_factory():
pty = PTY(80, 25, backend=backend)
assert pty.spawn(CMD)
time.sleep(0.3)
return pty
return _pty_factory
# @pytest.fixture(scope='function', params=[
# pytest.lazy_fixture('conpty_provider'),
# pytest.lazy_fixture('winpty_provider')])
# def pty_fixture(request):
# pty = request.param
# return pty
def test_read(pty_fixture, capsys):
pty = pty_fixture()
loc = os.getcwd()
readline = ''
with capsys.disabled():
start_time = time.time()
while loc not in readline:
if time.time() - start_time > 5:
break
readline += pty.read()
assert loc in readline or 'cmd' in readline
del pty
def test_write(pty_fixture):
pty = pty_fixture()
line = pty.read()
str_text = 'Eggs, ham and spam ünicode'
# text = bytes(str_text, 'utf-8')
num_bytes = pty.write(str_text)
line = ''
start_time = time.time()
while str_text not in line:
if time.time() - start_time > 5:
break
line += pty.read()
assert str_text in line
del pty
def test_isalive(pty_fixture):
pty = pty_fixture()
pty.write('exit\r\n')
text = 'exit'
line = ''
while text not in line:
try:
line += pty.read()
except Exception:
break
while pty.isalive():
try:
pty.read()
# continue
except Exception:
break
assert not pty.isalive()
del pty
# def test_agent_spawn_fail(pty_fixture):
# pty = pty_fixture
# try:
# pty.spawn(CMD)
# assert False
# except WinptyError:
# pass
@pytest.mark.parametrize(
'backend_name,backend',
[("ConPTY", Backend.ConPTY), ('WinPTY', Backend.WinPTY)])
def test_pty_create_size_fail(backend_name, backend):
try:
PTY(80, -25, backend=backend)
assert False
except WinptyError:
pass
def test_agent_resize_fail(pty_fixture):
pty = pty_fixture()
try:
pty.set_size(-80, 70)
assert False
except WinptyError:
pass
finally:
del pty
def test_agent_resize(pty_fixture):
pty = pty_fixture()
pty.set_size(80, 70)
del pty

View File

@ -0,0 +1,257 @@
# -*- coding: utf-8 -*-
"""winpty wrapper tests."""
# Standard library imports
import asyncio
import os
import signal
import time
import sys
import re
# Third party imports
import pytest
from flaky import flaky
# Local imports
from winpty.enums import Backend
from winpty.ptyprocess import PtyProcess, which
@pytest.fixture(scope='module', params=['WinPTY', 'ConPTY'])
def pty_fixture(request):
backend = request.param
if os.environ.get('CI_RUNNING', None) == '1':
if backend == 'ConPTY':
os.environ['CI'] = '1'
os.environ['CONPTY_CI'] = '1'
if backend == 'WinPTY':
os.environ.pop('CI', None)
os.environ.pop('CONPTY_CI', None)
backend = getattr(Backend, backend)
def _pty_factory(cmd=None, env=None):
cmd = cmd or 'cmd'
return PtyProcess.spawn(cmd, env=env, backend=backend)
_pty_factory.backend = request.param
return _pty_factory
@flaky(max_runs=40, min_passes=1)
def test_read(pty_fixture):
pty = pty_fixture()
loc = os.getcwd()
data = ''
tries = 0
while loc not in data and tries < 10:
try:
data += pty.read()
except EOFError:
pass
tries += 1
assert loc in data
pty.terminate()
time.sleep(2)
@flaky(max_runs=40, min_passes=1)
def test_write(pty_fixture):
pty = pty_fixture()
text = 'Eggs, ham and spam ünicode'
pty.write(text)
data = ''
tries = 0
while text not in data and tries < 10:
try:
data += pty.read()
except EOFError:
pass
tries += 1
assert text in data
pty.terminate()
@pytest.mark.xfail(reason="It fails sometimes due to long strings")
@flaky(max_runs=40, min_passes=1)
def test_isalive(pty_fixture):
pty = pty_fixture()
pty.write('echo \"foo\"\r\nexit\r\n')
data = ''
while True:
try:
print('Stuck')
data += pty.read()
except EOFError:
break
regex = re.compile(".*foo.*")
assert regex.findall(data)
assert not pty.isalive()
pty.terminate()
@pytest.mark.xfail(reason="It fails sometimes due to long strings")
@flaky(max_runs=40, min_passes=1)
def test_readline(pty_fixture):
env = os.environ.copy()
env['foo'] = 'bar'
pty = pty_fixture(env=env)
# Ensure that the echo print has its own CRLF
pty.write('cls\r\n')
pty.write('echo %foo%\r\n')
data = ''
tries = 0
while 'bar' not in data and tries < 10:
data = pty.readline()
tries += 1
assert 'bar' in data
pty.terminate()
def test_close(pty_fixture):
pty = pty_fixture()
pty.close()
assert not pty.isalive()
def test_flush(pty_fixture):
pty = pty_fixture()
pty.flush()
pty.terminate()
def test_intr(pty_fixture):
pty = pty_fixture(cmd=[sys.executable, 'import time; time.sleep(10)'])
pty.sendintr()
assert pty.wait() != 0
def test_send_control(pty_fixture):
pty = pty_fixture(cmd=[sys.executable, 'import time; time.sleep(10)'])
pty.sendcontrol('d')
assert pty.wait() != 0
@pytest.mark.skipif(which('cat') is None, reason="Requires cat on the PATH")
def test_send_eof(pty_fixture):
cat = pty_fixture('cat')
cat.sendeof()
assert cat.wait() == 0
def test_isatty(pty_fixture):
pty = pty_fixture()
assert pty.isatty()
pty.terminate()
assert not pty.isatty()
def test_wait(pty_fixture):
pty = pty_fixture(cmd=[sys.executable, '--version'])
assert pty.wait() == 0
def test_exit_status(pty_fixture):
pty = pty_fixture(cmd=[sys.executable])
pty.write('import sys;sys.exit(1)\r\n')
pty.wait()
assert pty.exitstatus == 1
@pytest.mark.timeout(30)
def test_kill_sigterm(pty_fixture):
pty = pty_fixture()
pty.write('echo \"foo\"\r\nsleep 1000\r\n')
pty.read()
pty.kill(signal.SIGTERM)
while True:
try:
pty.read()
except EOFError:
break
assert not pty.isalive()
assert pty.exitstatus == signal.SIGTERM
@pytest.mark.timeout(30)
def test_terminate(pty_fixture):
pty = pty_fixture()
pty.write('echo \"foo\"\r\nsleep 1000\r\n')
pty.read()
pty.terminate()
while True:
try:
pty.read()
except EOFError:
break
assert not pty.isalive()
@pytest.mark.timeout(30)
def test_terminate_force(pty_fixture):
pty = pty_fixture()
pty.write('echo \"foo\"\r\nsleep 1000\r\n')
pty.read()
pty.terminate(force=True)
while True:
try:
pty.read()
except EOFError:
break
assert not pty.isalive()
def test_terminate_loop(pty_fixture):
pty = pty_fixture()
loop = asyncio.SelectorEventLoop()
asyncio.set_event_loop(loop)
def reader():
try:
data = pty.read()
except EOFError:
loop.remove_reader(pty.fd)
loop.stop()
loop.add_reader(pty.fd, reader)
loop.call_soon(pty.write, 'echo \"foo\"\r\nsleep 1000\r\n')
loop.call_soon(pty.terminate, True)
try:
loop.run_forever()
finally:
loop.close()
assert not pty.isalive()
def test_getwinsize(pty_fixture):
pty = pty_fixture()
assert pty.getwinsize() == (24, 80)
pty.terminate()
def test_setwinsize(pty_fixture):
pty = pty_fixture()
pty.setwinsize(50, 110)
assert pty.getwinsize() == (50, 110)
pty.terminate()
pty = PtyProcess.spawn('cmd', dimensions=(60, 120))
assert pty.getwinsize() == (60, 120)
pty.terminate()