Skip to content

Commit

Permalink
Fixes compatibility with Python 2.7.6 (Issue #179)
Browse files Browse the repository at this point in the history
  • Loading branch information
olsonse authored and ask committed Jul 12, 2016
1 parent 3ee7a5c commit 45aaa54
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 21 deletions.
13 changes: 11 additions & 2 deletions billiard/compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,9 +203,18 @@ def get_errno(exc):
import _posixsubprocess
except ImportError:
def spawnv_passfds(path, args, passfds):
if not os.fork():
close_open_fds(keep=sorted(passfds))
if sys.platform != 'win32':
# when not using _posixsubprocess (on earlier python) and not on
# windows, we want to keep stdout/stderr open...
passfds = passfds + [
maybe_fileno(sys.stdout),
maybe_fileno(sys.stderr),
]
pid = os.fork()
if not pid:
close_open_fds(keep=sorted(f for f in passfds if f))
os.execv(fsencode(path), args)
return pid
else:
def spawnv_passfds(path, args, passfds):
passfds = sorted(passfds)
Expand Down
39 changes: 27 additions & 12 deletions billiard/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,19 @@ def address_type(address):
#


class _SocketContainer(object):

def __init__(self, sock):
self.sock = sock


class _ConnectionBase(object):
_handle = None

def __init__(self, handle, readable=True, writable=True):
if isinstance(handle, _SocketContainer):
self._socket = handle.sock # keep ref so not collected
handle = handle.sock.fileno()
handle = handle.__index__()
if handle < 0:
raise ValueError("invalid handle")
Expand Down Expand Up @@ -536,6 +545,14 @@ def Client(address, family=None, authkey=None):
return c


def detach(sock):
if hasattr(sock, 'detach'):
return sock.detach()
# older socket lib does not have detach. We'll keep a reference around
# so that it does not get garbage collected.
return _SocketContainer(sock)


if sys.platform != 'win32':

def Pipe(duplex=True, rnonblock=False, wnonblock=False):
Expand All @@ -546,8 +563,8 @@ def Pipe(duplex=True, rnonblock=False, wnonblock=False):
s1, s2 = socket.socketpair()
s1.setblocking(not rnonblock)
s2.setblocking(not wnonblock)
c1 = Connection(s1.detach())
c2 = Connection(s2.detach())
c1 = Connection(detach(s1))
c2 = Connection(detach(s2))
else:
fd1, fd2 = os.pipe()
if rnonblock:
Expand Down Expand Up @@ -646,7 +663,7 @@ def accept(self):
else:
break
s.setblocking(True)
return Connection(s.detach())
return Connection(detach(s))

def close(self):
try:
Expand All @@ -663,10 +680,10 @@ def SocketClient(address):
Return a connection object connected to the socket given by `address`
'''
family = address_type(address)
with socket.socket(getattr(socket, family)) as s:
s.setblocking(True)
s.connect(address)
return Connection(s.detach())
s = socket.socket(getattr(socket, family))
s.setblocking(True)
s.connect(address)
return Connection(detach(s))

#
# Definitions for connections based on named pipes
Expand Down Expand Up @@ -997,7 +1014,7 @@ def reduce_connection(conn):

def rebuild_connection(ds, readable, writable):
sock = ds.detach()
return Connection(sock.detach(), readable, writable)
return Connection(detach(sock), readable, writable)
reduction.register(Connection, reduce_connection)

def reduce_pipe_connection(conn):
Expand All @@ -1007,8 +1024,7 @@ def reduce_pipe_connection(conn):
return rebuild_pipe_connection, (dh, conn.readable, conn.writable)

def rebuild_pipe_connection(dh, readable, writable):
handle = dh.detach()
return PipeConnection(handle, readable, writable)
return PipeConnection(detach(dh), readable, writable)
reduction.register(PipeConnection, reduce_pipe_connection)

else:
Expand All @@ -1017,6 +1033,5 @@ def reduce_connection(conn):
return rebuild_connection, (df, conn.readable, conn.writable)

def rebuild_connection(df, readable, writable):
fd = df.detach()
return Connection(fd, readable, writable)
return Connection(detach(df), readable, writable)
reduction.register(Connection, reduce_connection)
8 changes: 4 additions & 4 deletions billiard/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,16 @@
AuthenticationError,
)
except ImportError:
class ProcessError(Exception): # noqa
class ProcessError(Exception): # noqa
pass

class BufferTooShort(Exception): # noqa
class BufferTooShort(ProcessError): # noqa
pass

class TimeoutError(Exception): # noqa
class TimeoutError(ProcessError): # noqa
pass

class AuthenticationError(Exception): # noqa
class AuthenticationError(ProcessError): # noqa
pass


Expand Down
2 changes: 1 addition & 1 deletion billiard/reduction.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def loadbuf(cls, buf, protocol=None):

@classmethod
def loads(self, buf, loads=pickle.loads):
return loads(buf.getvalue())
return loads(buf)
register = ForkingPickler.register


Expand Down
3 changes: 2 additions & 1 deletion billiard/semaphore_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#
from __future__ import absolute_import

import io
import os
import signal
import sys
Expand Down Expand Up @@ -108,7 +109,7 @@ def main(fd):
cache = set()
try:
# keep track of registered/unregistered semaphores
with open(fd, 'rb') as f:
with io.open(fd, 'rb') as f:
for line in f:
try:
cmd, name = line.strip().split(b':')
Expand Down
3 changes: 2 additions & 1 deletion billiard/spawn.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#
from __future__ import absolute_import

import io
import os
import pickle
import sys
Expand Down Expand Up @@ -197,7 +198,7 @@ def _setup_logging_in_child_hack():

def _main(fd):
_Django_old_layout_hack__load()
with os.fdopen(fd, 'rb', closefd=True) as from_parent:
with io.open(fd, 'rb', closefd=True) as from_parent:
process.current_process()._inheriting = True
try:
preparation_data = pickle.load(from_parent)
Expand Down

0 comments on commit 45aaa54

Please sign in to comment.