Skip to content

Commit 2c61d5d

Browse files
committed
Implement Server and create_server
1 parent 17afc93 commit 2c61d5d

File tree

11 files changed

+202
-88
lines changed

11 files changed

+202
-88
lines changed

uvloop/handles/handle.pyx

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,13 +38,25 @@ cdef class UVHandle:
3838
if self._handle is NULL:
3939
return
4040

41+
if self._handle.loop is NULL:
42+
# The handle wasn't initialized with "uv_{handle}_init"
43+
self._closed = 1
44+
PyMem_Free(self._handle)
45+
raise RuntimeError(
46+
'{} is open in __dealloc__ with loop set to NULL'
47+
.format(self.__class__.__name__))
48+
49+
# When we're at this point, something went wrong.
50+
4151
if self._closed == 1:
4252
# So _handle is not NULL and self._closed == 1?
4353
raise RuntimeError(
4454
'{}.__dealloc__: _handle is NULL, _closed == 1'.format(
4555
self.__class__.__name__))
4656

47-
# Let's close the handle.
57+
# The handle is dealloced while open. Let's try to close it.
58+
# Situations when this is possible include unhandled exceptions,
59+
# errors during Handle.__cinit__/__init__ etc.
4860
self._handle.data = <void*> __NOHANDLE__
4961
uv.uv_close(self._handle, __uv_close_handle_cb) # void; no errors
5062
self._handle = NULL

uvloop/handles/stream.pxd

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@ cdef class UVStream(UVHandle):
22
cdef:
33
uv.uv_shutdown_t _shutdown_req
44
bint __reading
5+
object __cached_socket
56

67
cdef _fileno(self)
8+
cdef _get_socket(self)
79

810
cdef _shutdown(self)
911

uvloop/handles/stream.pyx

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ cdef class _StreamWriteContext:
6969
cdef class UVStream(UVHandle):
7070
def __cinit__(self):
7171
self.__reading = 0
72+
self.__cached_socket = None
7273

7374
cdef _shutdown(self):
7475
cdef int err
@@ -186,9 +187,33 @@ cdef class UVStream(UVHandle):
186187

187188
return fd
188189

190+
cdef _get_socket(self):
191+
cdef:
192+
int buf_len = sizeof(system.sockaddr_storage)
193+
int err
194+
system.sockaddr_storage buf
195+
196+
if self.__cached_socket is not None:
197+
return self.__cached_socket
198+
199+
err = uv.uv_tcp_getsockname(<uv.uv_tcp_t*>self._handle,
200+
<system.sockaddr*>&buf,
201+
&buf_len)
202+
if err < 0:
203+
raise convert_error(err)
204+
205+
self.__cached_socket = socket_fromfd(self._fileno(),
206+
buf.ss_family,
207+
uv.SOCK_STREAM)
208+
209+
return self.__cached_socket
210+
189211
cdef _close(self):
190212
self.__reading_stopped()
191213
UVHandle._close(self)
214+
if self.__cached_socket is not None:
215+
self.__cached_socket.close()
216+
self.__cached_socket = None
192217

193218
# Methods to override.
194219

uvloop/handles/tcp.pxd

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ cdef class UVTcpStream(UVStream):
1212
cdef class UVTCPServer(UVTcpStream):
1313
cdef:
1414
object protocol_factory
15+
Server host_server
1516

1617
cdef _init(self)
1718
cdef _set_protocol_factory(self, object protocol_factory)
@@ -23,11 +24,13 @@ cdef class UVTCPServer(UVTcpStream):
2324
cdef _on_listen(self)
2425

2526
@staticmethod
26-
cdef UVTCPServer new(Loop loop, object protocol_factory)
27+
cdef UVTCPServer new(Loop loop, object protocol_factory, Server server)
2728

2829

2930
cdef class UVServerTransport(UVTcpStream):
3031
cdef:
32+
Server host_server
33+
3134
bint eof
3235
bint reading
3336

@@ -58,5 +61,7 @@ cdef class UVServerTransport(UVTcpStream):
5861
cdef _maybe_pause_protocol(self)
5962
cdef _maybe_resume_protocol(self)
6063

64+
cdef _call_connection_lost(self, exc)
65+
6166
@staticmethod
62-
cdef UVServerTransport new(Loop loop, object protocol)
67+
cdef UVServerTransport new(Loop loop, object protocol, Server server)

uvloop/handles/tcp.pyx

Lines changed: 34 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -42,14 +42,16 @@ cdef class UVTCPServer(UVTcpStream):
4242
cdef _init(self):
4343
UVTcpStream._init(<UVTcpStream>self)
4444
self.protocol_factory = None
45+
self.host_server = None
4546

4647
@staticmethod
47-
cdef UVTCPServer new(Loop loop, object protocol_factory):
48+
cdef UVTCPServer new(Loop loop, object protocol_factory, Server server):
4849
cdef UVTCPServer handle
4950
handle = UVTCPServer.__new__(UVTCPServer)
5051
handle._set_loop(loop)
5152
handle._init()
5253
handle._set_protocol_factory(protocol_factory)
54+
handle.host_server = server
5355
return handle
5456

5557
cdef _set_protocol_factory(self, object protocol_factory):
@@ -88,7 +90,7 @@ cdef class UVTCPServer(UVTcpStream):
8890

8991
cdef _on_listen(self):
9092
protocol = self.protocol_factory()
91-
client = UVServerTransport.new(self._loop, protocol)
93+
client = UVServerTransport.new(self._loop, protocol, self.host_server)
9294
client._accept(<UVStream>self)
9395

9496

@@ -112,12 +114,15 @@ cdef class UVServerTransport(UVTcpStream):
112114
self.low_water = FLOW_CONTROL_LOW_WATER
113115

114116
@staticmethod
115-
cdef UVServerTransport new(Loop loop, object protocol):
117+
cdef UVServerTransport new(Loop loop, object protocol, Server server):
116118
cdef UVServerTransport handle
117119
handle = UVServerTransport.__new__(UVServerTransport)
118120
handle._set_loop(loop)
119121
handle._init()
120122
handle._set_protocol(protocol)
123+
handle.host_server = server
124+
if server is not None:
125+
(<Server>server)._attach()
121126
return handle
122127

123128
cdef _set_protocol(self, object protocol):
@@ -210,15 +215,37 @@ cdef class UVServerTransport(UVTcpStream):
210215
'protocol': self.protocol,
211216
})
212217

218+
cdef _call_connection_lost(self, exc):
219+
try:
220+
if self.protocol is not None:
221+
self.protocol.connection_lost(exc)
222+
finally:
223+
self.protocol = None
224+
self.protocol_data_received = None
225+
226+
self._close()
227+
228+
server = self.host_server
229+
if server is not None:
230+
server._detach()
231+
self.host_server = None
232+
233+
cdef _report_callback_error(self, exc):
234+
self._call_connection_lost(exc)
235+
super()._report_callback_error(exc)
236+
237+
cdef _raise_fatal_error(self, exc):
238+
self._call_connection_lost(exc)
239+
super()._raise_fatal_error(exc)
240+
213241
# Public API
214242

215243
property _closing:
216244
def __get__(self):
217-
# "self._closing" refers to "UVHandle._closing" cdef
218-
return (<UVHandle>self)._closing or (<UVHandle>self)._closed
245+
return (<UVHandle>self)._closed
219246

220247
def is_closing(self):
221-
return (<UVHandle>self)._closing or (<UVHandle>self)._closed
248+
return (<UVHandle>self)._closed
222249

223250
def write(self, object buf):
224251
self._write(buf, None)
@@ -279,23 +306,8 @@ cdef class UVServerTransport(UVTcpStream):
279306
return (self._low_water, self._high_water)
280307

281308
def get_extra_info(self, name, default=None):
282-
cdef:
283-
int buf_len = sizeof(system.sockaddr_storage)
284-
int err
285-
system.sockaddr_storage buf
286-
287309
if name == 'socket':
288-
err = uv.uv_tcp_getsockname(<uv.uv_tcp_t*>self._handle,
289-
<system.sockaddr*>&buf,
290-
&buf_len)
291-
if err < 0:
292-
self._close()
293-
raise convert_error(err)
294-
295-
return socket_fromfd(self._fileno(),
296-
buf.ss_family,
297-
uv.SOCK_STREAM)
298-
310+
return self._get_socket()
299311
return default
300312

301313
def abort(self):

uvloop/includes/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import collections
44
import concurrent.futures
55
import functools
6+
import itertools
67
import os
78
import socket
89
import sys

uvloop/includes/stdlib.pxi

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import asyncio, asyncio.log, asyncio.base_events
22
import collections
33
import concurrent.futures
44
import functools
5+
import itertools
56
import os
67
import socket
78
import sys
@@ -29,6 +30,8 @@ cdef cc_ThreadPoolExecutor = concurrent.futures.ThreadPoolExecutor
2930

3031
cdef ft_partial = functools.partial
3132

33+
cdef iter_chain = itertools.chain
34+
3235
cdef int has_AF_INET6 = hasattr(socket, 'AF_INET6')
3336
cdef int has_SO_REUSEPORT = hasattr(socket, 'SO_REUSEPORT')
3437
cdef int has_IPPROTO_IPV6 = hasattr(socket, 'IPPROTO_IPV6')
@@ -66,5 +69,5 @@ cdef long MAIN_THREAD_ID = <long>threading.main_thread().ident
6669

6770

6871
# Cython doesn't clean-up imported objects properly in Py3 mode.
69-
del asyncio, concurrent, collections, \
70-
functools, socket, os, sys, threading
72+
del asyncio, concurrent, collections
73+
del functools, itertools, socket, os, sys, threading

uvloop/loop.pxd

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,4 +120,6 @@ include "handles/tcp.pxd"
120120

121121
include "request.pxd"
122122

123+
include "server.pxd"
124+
123125
include "os_signal.pxd"

uvloop/loop.pyx

Lines changed: 57 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -662,21 +662,66 @@ cdef class Loop:
662662
return self._getaddrinfo(host, port, family, type, proto, flags, 1)
663663

664664
@aio_coroutine
665-
async def create_server(self, protocol_factory, str host, int port):
666-
addrinfo = await self._getaddrinfo(host, port, 0, 0, 0, 0, 0)
667-
if not AddrInfo.isinstance(addrinfo):
668-
raise RuntimeError('unvalid loop._getaddeinfo() result')
665+
async def create_server(self, protocol_factory, str host, int port,
666+
*,
667+
int family=uv.AF_UNSPEC,
668+
int flags=uv.AI_PASSIVE,
669+
sock=None,
670+
int backlog=100,
671+
ssl=None, # not implemented
672+
reuse_address=None, # ignored, libuv sets it
673+
reuse_port=None): # ignored
669674

670-
cdef system.addrinfo *ai = (<AddrInfo>addrinfo).data
671-
if ai is NULL:
672-
raise RuntimeError('loop._getaddeinfo() result is NULL')
675+
cdef:
676+
UVTCPServer tcp
677+
system.addrinfo *addrinfo
678+
Server server = Server(self)
679+
680+
if ssl is not None:
681+
raise NotImplementedError('SSL is not yet supported')
682+
683+
if host is not None or port is not None:
684+
if sock is not None:
685+
raise ValueError(
686+
'host/port and sock can not be specified at the same time')
687+
688+
if host == '':
689+
hosts = [None]
690+
elif (isinstance(host, str) or not isinstance(host, col_Iterable)):
691+
hosts = [host]
692+
else:
693+
hosts = host
694+
695+
fs = [self._getaddrinfo(host, port, family,
696+
uv.SOCK_STREAM, 0, flags,
697+
0) for host in hosts]
673698

674-
cdef UVTCPServer srv = UVTCPServer.new(self, protocol_factory)
699+
infos = await aio_gather(*fs, loop=self)
700+
701+
completed = False
702+
try:
703+
for info in infos:
704+
addrinfo = (<AddrInfo>info).data
705+
while addrinfo != NULL:
706+
tcp = UVTCPServer.new(self, protocol_factory, server)
707+
tcp.bind(addrinfo.ai_addr)
708+
tcp.listen(backlog)
709+
710+
server._add_server(tcp)
711+
712+
addrinfo = addrinfo.ai_next
713+
714+
completed = True
715+
finally:
716+
if not completed:
717+
server.close()
718+
else:
719+
tcp = UVTCPServer.new(self, protocol_factory, server)
720+
tcp.open(sock.fileno())
721+
tcp.listen(backlog)
722+
server._add_server(tcp)
675723

676-
srv.bind(ai.ai_addr)
677-
srv.listen()
678-
self._servers.add(srv) # XXX
679-
return srv
724+
return server
680725

681726
def default_exception_handler(self, context):
682727
message = context.get('message')

uvloop/server.pxd

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
cdef class Server:
2+
cdef:
3+
list _servers
4+
list _waiters
5+
int _active_count
6+
Loop _loop
7+
8+
9+
cdef _add_server(self, UVTCPServer srv)
10+
cdef _wakeup(self)
11+
cdef _attach(self)
12+
cdef _detach(self)

0 commit comments

Comments
 (0)