Skip to content

Commit 4fbc2dc

Browse files
committed
WIP
1 parent 2ff9049 commit 4fbc2dc

File tree

3 files changed

+90
-16
lines changed

3 files changed

+90
-16
lines changed

mitogen/core.py

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2224,7 +2224,19 @@ def read(self, n=CHUNK_SIZE):
22242224
# Refuse to touch the handle after closed, it may have been reused
22252225
# by another thread. TODO: synchronize read()/write()/close().
22262226
return b('')
2227-
s, disconnected = io_op(os.read, self.fd, n)
2227+
2228+
# FIXME What should this do with BlockingIOError?
2229+
try:
2230+
s, disconnected = io_op(os.read, self.fd, n)
2231+
except IOError:
2232+
exc = sys.exc_info()[1]
2233+
if exc.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
2234+
LOG.warning(
2235+
'%r.read() BlockingIOError %r fp=%r\n%s',
2236+
self, exc, self.fp, traceback.format_exc(),
2237+
)
2238+
raise
2239+
22282240
if disconnected:
22292241
LOG.debug('%r: disconnected during read: %s', self, disconnected)
22302242
return b('')
@@ -2244,7 +2256,19 @@ def write(self, s):
22442256
# Don't touch the handle after close, it may be reused elsewhere.
22452257
return None
22462258

2247-
written, disconnected = io_op(os.write, self.fd, s)
2259+
# FIXME Does BlockingIOError *always* mean 0 bytes written?
2260+
# FIXME Are callers handling a return value of 0?
2261+
try:
2262+
written, disconnected = io_op(os.write, self.fd, s)
2263+
except IOError:
2264+
exc = sys.exc_info()[1]
2265+
if exc.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
2266+
LOG.warning(
2267+
'%r.write() BlockingIOError %r fp=%r len(s)=%d\n%s',
2268+
self, exc, self.fp, len(s), traceback.format_exc(),
2269+
)
2270+
return 0
2271+
22482272
if disconnected:
22492273
LOG.debug('%r: disconnected during write: %s', self, disconnected)
22502274
return None
@@ -2933,7 +2957,16 @@ def put(self, obj=None):
29332957
self._wake(wsock, cookie)
29342958

29352959
def _wake(self, wsock, cookie):
2936-
written, disconnected = io_op(os.write, wsock.fileno(), cookie)
2960+
try:
2961+
written, disconnected = io_op(os.write, wsock.fileno(), cookie)
2962+
except IOError:
2963+
exc = sys.exc_info()[1]
2964+
if exc.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
2965+
LOG.warning(
2966+
'%r._wake() BlockingIOError %r wsock=%r len(cookie)=%d\n%s',
2967+
self, exc, wsock, len(cookie), traceback.format_exc(),
2968+
)
2969+
raise
29372970
assert written == len(cookie) and not disconnected
29382971

29392972
def __repr__(self):
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
# 288230376151711744 mitogen-test-file ABCDEFGHIJKLMNOPQRSTUVWXYZ
2+
3+
import sys
4+
5+
PADDING_POOL = b'ABCDEFGHIJKLMNOPQRSTUVWXYZ' * 2
6+
PADDING_SIZE = len(PADDING_POOL) // 2
7+
FILE_SIZE = 512 * 2**20
8+
LINE_SIZE = 64
9+
10+
try:
11+
xrange = range
12+
except NameError:
13+
pass
14+
15+
def format_line(lineno):
16+
# type: (int) -> bytes
17+
line_offset = lineno * LINE_SIZE
18+
padding_offset = lineno % PADDING_SIZE
19+
padding = PADDING_POOL[padding_offset:padding_offset + PADDING_SIZE]
20+
return b'%018d mitogen-test-file %s\x0a' % (line_offset, padding)
21+
22+
23+
def main():
24+
output_file = sys.argv[1]
25+
written_bytes_count = 0
26+
with open(output_file, 'wb') as f:
27+
for lineno in xrange(0, FILE_SIZE // LINE_SIZE):
28+
line = format_line(lineno)
29+
f.write(line)
30+
written_bytes_count += len(line)
31+
32+
print('Wrote %d bytes to %s' % (written_bytes_count, output_file))
33+
34+
35+
if __name__ == '__main__':
36+
main()

tests/ansible/regression/issue_615__streaming_transfer.yml

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,30 +7,35 @@
77
become: true
88
vars:
99
mitogen_ssh_compression: false
10+
controller_file_path: "/tmp/fetch-{{ inventory_hostname }}-512mb.txt"
11+
target_file_path: /tmp/mitogen-test-512mb.txt
1012
tasks:
1113
- include_tasks: _mitogen_only.yml
1214
- block:
13-
- name: Create /tmp/512mb.zero
14-
shell: |
15-
dd if=/dev/zero of=/tmp/512mb.zero bs=1048576 count=512;
16-
chmod go= /tmp/512mb.zero
17-
args:
18-
creates: /tmp/512mb.zero
15+
- name: Create test file on target
16+
script:
17+
cmd: write_512mb_test_file "{{ target_file_path }}"
18+
executable: "{{ ansible_python_interpreter | default(ansible_facts.discovered_interpreter_python) }}"
19+
creates: "{{ target_file_path }}"
20+
register: target_file_task
1921

20-
- name: Fetch /tmp/512mb.zero
22+
- debug:
23+
var: target_file_task
24+
25+
- name: Fetch test file
2126
fetch:
22-
src: /tmp/512mb.zero
23-
dest: /tmp/fetch-{{ inventory_hostname }}-512mb.zero
27+
src: "{{ target_file_path }}"
28+
dest: "{{ controller_file_path }}"
2429
flat: true
2530

26-
- name: Cleanup /tmp/512mb.zero
31+
- name: Cleanup target
2732
file:
28-
path: /tmp/512mb.zero
33+
path: "{{ target_file_path }}"
2934
state: absent
3035

31-
- name: Cleanup fetched file
36+
- name: Cleanup controller
3237
file:
33-
path: /tmp/fetch-{{ inventory_hostname }}-512mb.zero
38+
path: "{{ controller_file_path }}"
3439
state: absent
3540
become: false
3641
delegate_to: localhost

0 commit comments

Comments
 (0)