1
- import win32pipe , win32file , win32api , winerror , win32con , pywintypes
1
+ import ctypes
2
+ from ctypes import wintypes
2
3
from os import path
3
4
import io
4
5
from typing import TypeVar , NewType , Literal , IO , Optional , Union
5
6
6
7
WritableBuffer = TypeVar ("WritableBuffer" )
7
8
PyHANDLE = NewType ("PyHANDLE" , int )
8
9
10
+ # Define global constants
11
+ PIPE_ACCESS_DUPLEX = 0x00000003
12
+ PIPE_ACCESS_INBOUND = 0x00000001
13
+ PIPE_ACCESS_OUTBOUND = 0x00000002
14
+ PIPE_TYPE_BYTE = 0x00000000
15
+ PIPE_READMODE_BYTE = 0x00000000
16
+ PIPE_WAIT = 0x00000000
17
+ PIPE_UNLIMITED_INSTANCES = 0xFFFFFFFF
18
+ ERROR_PIPE_CONNECTED = 535
19
+ ERROR_BROKEN_PIPE = 109
20
+ ERROR_MORE_DATA = 234
21
+ ERROR_IO_PENDING = 997
22
+ FORMAT_MESSAGE_FROM_SYSTEM = 0x00001000
23
+ INVALID_HANDLE_VALUE = - 1
24
+
9
25
id = 0
10
26
27
+ def _wt (value : int ) -> wintypes .DWORD :
28
+ return wintypes .DWORD (value )
11
29
12
30
def _name_pipe ():
13
31
global id
@@ -23,11 +41,8 @@ def _name_pipe():
23
41
24
42
def _win_error (code = None ):
25
43
if not code :
26
- code = win32api .GetLastError ()
27
- return OSError (
28
- f"[OS Error { code } ] { win32api .FormatMessage (win32con .FORMAT_MESSAGE_FROM_SYSTEM ,0 ,code ,0 ,None )} "
29
- )
30
-
44
+ code = ctypes .get_last_error ()
45
+ return ctypes .WinError (code )
31
46
32
47
class NPopen :
33
48
def __init__ (
@@ -44,6 +59,7 @@ def __init__(
44
59
if not isinstance (bufsize , int ):
45
60
raise TypeError ("bufsize must be an integer" )
46
61
62
+ self .kernel32 = ctypes .WinDLL ('kernel32' , use_last_error = True )
47
63
self .stream : Union [IO , None ] = None # I/O stream of the pipe
48
64
self ._path = _name_pipe () if name is None else rf"\\.\pipe\{ name } "
49
65
self ._rd = any (c in mode for c in "r+" )
@@ -72,30 +88,25 @@ def __init__(
72
88
)
73
89
74
90
self ._bufsize = - 1 if txt_mode and bufsize == 1 else bufsize
75
-
76
91
if self ._rd and self ._wr :
77
- access = win32pipe . PIPE_ACCESS_DUPLEX
92
+ access = _wt ( PIPE_ACCESS_DUPLEX )
78
93
elif self ._rd :
79
- access = win32pipe . PIPE_ACCESS_INBOUND
94
+ access = _wt ( PIPE_ACCESS_INBOUND )
80
95
elif self ._wr :
81
- access = win32pipe . PIPE_ACCESS_OUTBOUND
96
+ access = _wt ( PIPE_ACCESS_OUTBOUND )
82
97
else :
83
98
raise ValueError ("Invalid mode" )
84
99
# TODO: assess options: FILE_FLAG_WRITE_THROUGH, FILE_FLAG_OVERLAPPED, FILE_FLAG_FIRST_PIPE_INSTANCE
100
+ pipe_mode = _wt (PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT )
85
101
86
- pipe_mode = (
87
- win32pipe .PIPE_TYPE_BYTE
88
- | win32pipe .PIPE_READMODE_BYTE
89
- | win32pipe .PIPE_WAIT
90
- )
91
102
# TODO: assess options: PIPE_WAIT, PIPE_NOWAIT, PIPE_ACCEPT_REMOTE_CLIENTS, PIPE_REJECT_REMOTE_CLIENTS
92
103
93
- max_instances = win32pipe . PIPE_UNLIMITED_INSTANCES # 1
94
- buffer_size = 0
95
- timeout = 0
104
+ max_instances = _wt ( 1 ) # PIPE_UNLIMITED_INSTANCES returns 'invalid params'. Pipes are point-to-point anyway
105
+ buffer_size = _wt ( 0 )
106
+ timeout = _wt ( 0 )
96
107
97
108
# "open" named pipe
98
- self . _pipe = win32pipe . CreateNamedPipe (
109
+ h = self . kernel32 . CreateNamedPipeW (
99
110
self ._path ,
100
111
access ,
101
112
pipe_mode ,
@@ -105,37 +116,37 @@ def __init__(
105
116
timeout ,
106
117
None ,
107
118
)
108
- if self . _pipe == win32file . INVALID_HANDLE_VALUE :
119
+ if h == INVALID_HANDLE_VALUE :
109
120
raise _win_error ()
121
+ self ._pipe = h
110
122
111
123
@property
112
124
def path (self ):
113
125
"""str: path of the pipe in the file system"""
114
126
return self ._path
115
127
116
128
def __str__ (self ):
117
- # return the path
118
129
return self ._path
119
130
120
131
def close (self ):
121
- # close named pipe
132
+ """Close the named pipe.
133
+ A closed pipe cannot be used for further I/O operations. `close()` may
134
+ be called more than once without error.
135
+ """
122
136
if self .stream is not None :
123
137
self .stream .close ()
124
138
self .stream = None
125
139
if self ._pipe is not None :
126
- if win32file .CloseHandle (self ._pipe ):
127
- raise _win_error ()
140
+ self .kernel32 .CloseHandle (self ._pipe )
128
141
self ._pipe = None
129
142
130
143
def wait (self ):
131
-
144
+ """Wait for the pipe to open (the other end to be opened) and return file object to read/write."""
132
145
if not self ._pipe :
133
146
raise RuntimeError ("pipe has already been closed." )
134
-
135
- # wait for the pipe to open (the other end to be opened) and return fileobj to read/write
136
- if win32pipe .ConnectNamedPipe (self ._pipe , None ):
137
- code = win32api .GetLastError ()
138
- if code != 535 : # ERROR_PIPE_CONNECTED (ok, just indicating that the client has already connected)(Issue#3)
147
+ if not self .kernel32 .ConnectNamedPipe (self ._pipe , None ):
148
+ code = ctypes .get_last_error ()
149
+ if code != ERROR_PIPE_CONNECTED : # (ok, just indicating that the client has already connected)(Issue#3)
139
150
raise _win_error (code )
140
151
141
152
# create new io stream object
@@ -179,15 +190,16 @@ def writable(self) -> bool:
179
190
class Win32RawIO (io .RawIOBase ):
180
191
"""Raw I/O stream layer over open Windows pipe handle.
181
192
182
- `handle` is an open ``pywintypes.PyHANDLE `` object (from ``pywin32 `` package) to
193
+ `handle` is an open Windows ``HANDLE `` object (from ``ctype `` package) to
183
194
be wrapped by this class.
184
195
185
196
Specify the read and write modes by boolean flags: ``rd`` and ``wr``.
186
197
"""
187
198
188
199
def __init__ (self , handle : PyHANDLE , rd : bool , wr : bool ) -> None :
189
200
super ().__init__ ()
190
- self .handle = handle # PyHANDLE: Underlying Windows handle.
201
+ self .kernel32 = ctypes .WinDLL ('kernel32' , use_last_error = True )
202
+ self .handle = handle # Underlying Windows handle.
191
203
self ._readable : bool = rd
192
204
self ._writable : bool = wr
193
205
@@ -211,7 +223,7 @@ def close(self) -> None:
211
223
if self .closed :
212
224
return
213
225
if self .handle is not None :
214
- win32file .CloseHandle (self .handle )
226
+ self . kernel32 .CloseHandle (self .handle )
215
227
self .handle = None
216
228
217
229
super ().close ()
@@ -224,23 +236,19 @@ def readinto(self, b: WritableBuffer) -> Union[int, None]:
224
236
assert self ._readable
225
237
226
238
size = len (b )
227
- nread = 0
228
- while nread < size :
229
- try :
230
- hr , res = win32file .ReadFile (self .handle , size - nread )
231
- if hr in (winerror .ERROR_MORE_DATA , winerror .ERROR_IO_PENDING ):
232
- raise _win_error (hr )
233
- except pywintypes .error as e :
234
- if e .args [0 ] == 109 : # broken pipe error
235
- break
236
- else :
237
- raise e
238
- if not len (res ):
239
- break
240
- nnext = nread + len (res )
241
- b [nread :nnext ] = res
242
- nread = nnext
243
- return nread
239
+ nread = _wt (0 )
240
+ buf = (ctypes .c_char * size ).from_buffer (b )
241
+
242
+ success = self .kernel32 .ReadFile (self .handle , buf , size , ctypes .byref (nread ), None )
243
+ if not success :
244
+ code = ctypes .get_last_error ()
245
+ # ERROR_MORE_DATA - not big deal, will read next time
246
+ # ERROR_IO_PENDING - should not happen, unless use OVERLAPPING, which we don't so far
247
+ # ERROR_BROKEN_PIPE - pipe was closed from other end. While it is an error, test seemingly expects to receive 0 instead of exception
248
+ if code not in (ERROR_MORE_DATA , ERROR_IO_PENDING , ERROR_BROKEN_PIPE ):
249
+ raise _win_error (code )
250
+
251
+ return nread .value
244
252
245
253
def write (self , b ):
246
254
"""Write buffer ``b`` to file, return number of bytes written.
@@ -250,7 +258,10 @@ def write(self, b):
250
258
251
259
assert self .handle is not None # show type checkers we already checked
252
260
assert self ._writable
253
- hr , n_written = win32file .WriteFile (self .handle , b )
254
- if hr :
255
- raise _win_error (hr )
256
- return n_written
261
+ size = len (b )
262
+ nwritten = _wt (0 )
263
+ buf = (ctypes .c_char * size ).from_buffer_copy (b )
264
+ if not self .kernel32 .WriteFile (self .handle , buf , _wt (size ), ctypes .byref (nwritten ), None ):
265
+ raise _win_error ()
266
+
267
+ return nwritten .value
0 commit comments