This repository has been archived by the owner on Oct 21, 2019. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy path__init__.py
117 lines (91 loc) · 3.52 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
"""A connector to send messages using the command line."""
import logging
import os
import sys
import platform
import asyncio
from opsdroid.connector import Connector, register_event
from opsdroid.events import Message
# Module-level logger, named after this module via ``__name__``.
_LOGGER = logging.getLogger(__name__)
class ConnectorShell(Connector):
    """A connector to send messages using the command line.

    User input is read asynchronously from stdin and handed to opsdroid as
    ``Message`` events; responses are printed to stdout above a redrawn
    prompt.
    """

    def __init__(self, config, opsdroid=None):
        """Create the connector.

        Args:
            config (dict): Connector configuration. The optional
                ``bot-name`` key sets the prompt text (default
                ``"opsdroid"``).
            opsdroid: Passed through to the base ``Connector``.

        """
        # NOTE(review): ``_`` is not defined in this module; presumably a
        # gettext function installed into builtins elsewhere - confirm.
        _LOGGER.debug(_("Loaded shell connector"))
        super().__init__(config, opsdroid=opsdroid)
        self.name = "shell"
        self.config = config
        self.bot_name = config.get("bot-name", "opsdroid")
        self.prompt_length = None
        self.listening = True
        self.reader = None
        self._closing = asyncio.Event()
        self.loop = asyncio.get_event_loop()

        # Later names override earlier ones (existing precedence is kept).
        for name in ('LOGNAME', 'USER', 'LNAME', 'USERNAME'):
            user = os.environ.get(name)
            if user:
                self.user = user
        # Without any of these variables ``self.user`` used to stay
        # undefined, and ``_parse_message`` raised AttributeError.
        if not hasattr(self, "user"):
            self.user = None

    async def read_stdin(self):
        """Create a stream reader to read stdin asynchronously.

        Returns:
            asyncio.StreamReader: Reader attached to ``sys.stdin``; it is
            also stored on ``self.reader``.

        """
        self.reader = asyncio.StreamReader(loop=self.loop)
        # Bind the protocol to the same loop as the reader.
        reader_protocol = asyncio.StreamReaderProtocol(
            self.reader, loop=self.loop)

        await self.loop.connect_read_pipe(
            lambda: reader_protocol,
            sys.stdin)

        return self.reader

    async def async_input(self):
        """Read user input asynchronously from stdin.

        The stdin reader is created lazily on first use.

        Returns:
            str: One line of input decoded as UTF-8 with every ``\\r`` and
            ``\\n`` removed. An empty string is returned both for an empty
            line and at end of input.

        """
        if not self.reader:
            self.reader = await self.read_stdin()
        line = await self.reader.readline()

        return line.decode('utf8').replace('\r', '').replace('\n', '')

    def draw_prompt(self):
        """Draw the user input prompt and remember its length."""
        prompt = self.bot_name + '> '
        self.prompt_length = len(prompt)
        print(prompt, end="", flush=True)

    def clear_prompt(self):
        """Clear the prompt by overwriting it with spaces."""
        # ``prompt_length`` is None until the first ``draw_prompt`` call;
        # treat that as "nothing to erase" instead of raising TypeError.
        width = self.prompt_length or 0
        print("\r" + (" " * width) + "\r", end="", flush=True)

    async def _parse_message(self):
        """Prompt for input and pass each line to opsdroid for parsing.

        Runs until ``self.listening`` becomes false or stdin reaches end
        of input.
        """
        while self.listening:
            self.draw_prompt()
            user_input = await self.async_input()
            # At EOF ``readline`` keeps returning b"" immediately, which
            # would otherwise spin forever emitting empty messages.
            if (not user_input and self.reader is not None
                    and self.reader.at_eof()):
                self.listening = False
                break
            message = Message(user_input, self.user, None, self)
            await self.opsdroid.parse(message)

    async def connect(self):
        """Connect to the shell.

        There is nothing to do here since stdin is already available.

        Since this is the first method called when opsdroid starts, a
        logging message is shown if the user is using windows.

        """
        if platform.system() == "Windows":
            _LOGGER.warning("The shell connector does not work on windows."
                            " Please install the Opsdroid Desktop App.")

    async def listen(self):
        """Listen for and parse new user input.

        Starts the input loop as a task, waits until ``disconnect`` sets
        the closing event, then cancels that task.
        """
        _LOGGER.debug(_("Connecting to shell"))
        message_processor = self.loop.create_task(self._parse_message())
        await self._closing.wait()
        message_processor.cancel()

    @register_event(Message)
    async def respond(self, message):
        """Respond with a message.

        Args:
            message (object): An instance of Message; its ``text`` is
                printed in place of the prompt, which is then redrawn.

        """
        _LOGGER.debug(_("Responding with: %s"), message.text)
        self.clear_prompt()
        print(message.text)
        self.draw_prompt()

    async def disconnect(self):
        """Disconnect the connector by setting the closing event."""
        self._closing.set()