telnetlib3
参考:telnetlib3
# !pip install -q telnetlib3
使用流(stream)接口编写一个提供简易“战争游戏”的 Telnet 服务器:
import telnetlib3
async def shell(reader, writer):
    """Server-side session: ask one question, echo the answer, then hang up.

    Args:
        reader: Stream reader for the connected client; ``read(1)`` is awaited
            to obtain a single character of input.
        writer: Stream writer for the connected client; used to send the
            prompt, echo the input, send the reply and close the connection.
    """
    writer.write('\r\nWould you like to play a game? ')
    inp = await reader.read(1)
    if inp:
        # Only answer when the client actually sent something; an empty read
        # is treated as "no input" and the reply is skipped.
        writer.echo(inp)
        writer.write('\r\nThey say the only way to win '
                     'is to not play at all.\r\n')
        await writer.drain()
    # Close the connection whether or not the client answered.
    writer.close()
# Build the server awaitable: port 6023, with `shell` passed as the
# per-connection session callback.
coro = telnetlib3.create_server(port=6023, shell=shell)
# Event-loop variant, left disabled here (it would also need `import asyncio`):
# loop = asyncio.get_event_loop()
# server = loop.run_until_complete(coro)
# loop.run_until_complete(server.wait_closed())
---------------------------------------------------------------------------
ModuleNotFoundError Traceback (most recent call last)
Cell In[2], line 1
----> 1 import telnetlib3
3 async def shell(reader, writer):
4 writer.write('\r\nWould you like to play a game? ')
ModuleNotFoundError: No module named 'telnetlib3'
# Top-level await (notebook cell); the value shown below is the resulting
# Server object listening on port 6023.
await coro
<Server sockets=(<asyncio.TransportSocket fd=79, family=10, type=1, proto=6, laddr=('::', 6023, 0, 0)>, <asyncio.TransportSocket fd=82, family=2, type=1, proto=6, laddr=('0.0.0.0', 6023)>)>
编写与该服务器玩战争游戏的 Telnet 客户端:
import telnetlib3
async def shell(reader, writer):
    """Client-side session: print everything the server sends, answer 'y'.

    Reads the stream in chunks of up to 1024 characters until the server
    closes the connection. Any chunk containing a ``'?'`` is treated as a
    question and answered with ``'y'``.

    Args:
        reader: Stream reader for the server connection.
        writer: Stream writer for the server connection.
    """
    while True:
        # read the next chunk of server output
        outp = await reader.read(1024)
        if not outp:
            # End of File
            break
        elif '?' in outp:
            # reply all questions with 'y'.
            writer.write('y')

        # display all server output
        print(outp, flush=True)

    # EOF: finish with a trailing newline
    print()
# Connect to the server on localhost:6023, passing `shell` as the session
# callback.
coro = telnetlib3.open_connection('localhost', 6023, shell=shell)
# Awaiting the connection yields the (reader, writer) stream pair.
reader, writer = await coro
Would you like to play a game?
y
They say the only way to win is to not play at all.
测试
import getpass
import io
import logging
import os
from dataclasses import dataclass, field

import telnetlib3
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def _default_username() -> str:
    """Return the current user's login name, used as the default username."""
    try:
        return os.getlogin()
    except OSError:
        # os.getlogin() needs a controlling terminal, which notebook kernels,
        # services and CI jobs usually lack; fall back to the environment /
        # password-database lookup instead of failing.
        return getpass.getuser()


@dataclass
class Telnet:
    """One-shot Telnet command runner.

    Calling an instance opens a connection, logs in, runs a single command,
    sends ``exit`` and closes the connection.

    Attributes:
        host: Server host name or address.
        port: Server TCP port.
        username: Login name; defaults to the current user, looked up when
            the instance is created.
        password: Login password.
        prompt: Byte sequence that marks the end of the server's output. It
            is bytes because it is passed to ``reader.readuntil`` and the
            result is decoded from bytes.
    """

    host: str = "localhost"
    port: int = 23
    username: str = field(default_factory=_default_username)
    password: str = ""
    prompt: bytes = b"$ "  # default system shell prompt

    async def __call__(self, command, cols=800, rows=25, **kwargs):
        """Connect, log in, run ``command`` and return its output.

        Args:
            command: Command line to send (a newline is appended).
            cols: Terminal width passed to ``telnetlib3.open_connection``.
            rows: Terminal height passed to ``telnetlib3.open_connection``.
            **kwargs: Extra keyword arguments forwarded to
                ``telnetlib3.open_connection``.

        Returns:
            The decoded text received up to and including the prompt.
        """
        # Connect to the Telnet server.
        self.reader, self.writer = await telnetlib3.open_connection(
            self.host, self.port, cols=cols, rows=rows, **kwargs
        )
        try:
            with io.StringIO() as record:  # transcript of the login exchange
                await self.login(self.reader, self.writer, record)
            with io.StringIO() as record:  # transcript of the command output
                output = await self.execute(command, record)
                logger.info("执行命令:\n%s", record.getvalue())
            # Ask the remote shell to terminate.
            self.writer.write("exit\n")
        finally:
            # Close the connection even if login or the command failed.
            self.writer.close()
        return output

    @staticmethod
    async def _expect(reader, marker, record):
        """Read until ``marker``, append the decoded text to ``record``, return it."""
        raw = await reader.readuntil(marker)
        text = raw.decode("ascii", errors="ignore")
        record.write(text)
        return text

    async def execute(self, command, record):
        """Send ``command`` and read until the prompt.

        Uses ``self.reader`` / ``self.writer``, which ``__call__`` sets.

        Args:
            command: Command line to send (a newline is appended).
            record: Text buffer that receives the decoded output.

        Returns:
            The decoded text received up to and including the prompt.
        """
        self.writer.write(command + "\n")
        return await self._expect(self.reader, self.prompt, record)

    async def login(self, reader, writer, record):
        """Answer the login and password prompts.

        The prompt strings below must match what the server actually sends.

        Args:
            reader: Stream reader for the connection.
            writer: Stream writer for the connection.
            record: Text buffer that receives everything read during login.

        Returns:
            The full contents of ``record`` after login.
        """
        # Wait for the login prompt, then send the user name.
        await self._expect(reader, b"login: ", record)
        writer.write(self.username + "\n")
        # Wait for the password prompt, then send the password
        # (passwords are normally not echoed back).
        await self._expect(reader, b"Password: ", record)
        writer.write(self.password + "\n")
        # Wait for the shell prompt, which signals that login completed.
        await self._expect(reader, self.prompt, record)
        return record.getvalue()
# 测试
# Instance with all default field values (host "localhost", port 23).
telnet = Telnet()
# Example invocation, left disabled: runs "ls" on the remote host.
# await telnet("ls")