Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 17 additions & 14 deletions cassandra/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,13 @@ def _try_libev_import():
except DependencyException as e:
return (None, e)

def _try_asyncio_import():
try:
from cassandra.io.asyncioreactor import AsyncioConnection
return (AsyncioConnection, None)
except (ImportError, DependencyException) as e:
return (None, e)

def _try_asyncore_import():
try:
from cassandra.io.asyncorereactor import AsyncoreConnection
Expand All @@ -168,7 +175,7 @@ def _connection_reduce_fn(val,import_fn):

log = logging.getLogger(__name__)

conn_fns = (_try_gevent_import, _try_eventlet_import, _try_libev_import, _try_asyncore_import)
conn_fns = (_try_gevent_import, _try_eventlet_import, _try_libev_import, _try_asyncio_import, _try_asyncore_import)
(conn_class, excs) = reduce(_connection_reduce_fn, conn_fns, (None,[]))
if not conn_class:
raise DependencyException("Unable to load a default connection class", excs)
Expand Down Expand Up @@ -876,25 +883,21 @@ def default_retry_policy(self, policy):
This determines what event loop system will be used for managing
I/O with Cassandra. These are the current options:

* :class:`cassandra.io.asyncorereactor.AsyncoreConnection`
* :class:`cassandra.io.libevreactor.LibevConnection`
* :class:`cassandra.io.asyncioreactor.AsyncioConnection`
* :class:`cassandra.io.asyncorereactor.AsyncoreConnection` (Python < 3.12 only)
* :class:`cassandra.io.eventletreactor.EventletConnection` (requires monkey-patching - see doc for details)
* :class:`cassandra.io.geventreactor.GeventConnection` (requires monkey-patching - see doc for details)
* :class:`cassandra.io.twistedreactor.TwistedConnection`
* EXPERIMENTAL: :class:`cassandra.io.asyncioreactor.AsyncioConnection`

By default, ``AsyncoreConnection`` will be used, which uses
the ``asyncore`` module in the Python standard library.

If ``libev`` is installed, ``LibevConnection`` will be used instead.

If ``gevent`` or ``eventlet`` monkey-patching is detected, the corresponding
connection class will be used automatically.
The default is selected automatically using the following priority:

``AsyncioConnection``, which uses the ``asyncio`` module in the Python
standard library, is also available, but currently experimental. Note that
it requires ``asyncio`` features that were only introduced in the 3.4 line
in 3.4.6, and in the 3.5 line in 3.5.1.
1. If ``gevent`` or ``eventlet`` monkey-patching is detected, the
corresponding connection class will be used.
2. If the ``libev`` C extension is available, ``LibevConnection`` is used.
3. ``AsyncioConnection`` is used as the standard-library fallback. This is
the preferred default on Python 3.12+ where ``asyncore`` was removed.
4. On Python < 3.12, ``AsyncoreConnection`` is used as a last resort.
"""

control_connection_timeout = 2.0
Expand Down
66 changes: 47 additions & 19 deletions cassandra/io/asyncioreactor.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from cassandra.connection import Connection, ConnectionShutdown

import atexit
import asyncio
import logging
import os
Expand All @@ -11,19 +12,30 @@
log = logging.getLogger(__name__)


# This module uses ``yield from`` and ``@asyncio.coroutine`` over ``await`` and
# ``async def`` for pre-Python-3.5 compatibility, so keep in mind that the
# managed coroutines are generator-based, not native coroutines. See PEP 492:
# https://www.python.org/dev/peps/pep-0492/#coroutine-objects
def _cleanup():
"""
Module-level cleanup called at interpreter shutdown via atexit.
Stops the asyncio event loop and joins the loop thread.
"""
loop = AsyncioConnection._loop
thread = AsyncioConnection._loop_thread
if loop is not None:
try:
loop.call_soon_threadsafe(loop.stop)
except RuntimeError:
# loop may already be closed post-fork or during shutdown
pass
if thread is not None:
thread.join(timeout=1.0)
if thread.is_alive():
log.warning(
"Event loop thread could not be joined, so shutdown may not be clean. "
"Please call Cluster.shutdown() to avoid this.")
else:
log.debug("Event loop thread was joined")


try:
asyncio.run_coroutine_threadsafe
except AttributeError:
raise ImportError(
'Cannot use asyncioreactor without access to '
'asyncio.run_coroutine_threadsafe (added in 3.4.6 and 3.5.1)'
)
atexit.register(_cleanup)


class AsyncioTimer(object):
Expand Down Expand Up @@ -67,11 +79,12 @@ def finish(self):

class AsyncioConnection(Connection):
"""
An experimental implementation of :class:`.Connection` that uses the
``asyncio`` module in the Python standard library for its event loop.
An implementation of :class:`.Connection` that uses the ``asyncio``
module in the Python standard library for its event loop.

Note that it requires ``asyncio`` features that were only introduced in the
3.4 line in 3.4.6, and in the 3.5 line in 3.5.1.
This is the preferred connection class on Python 3.12+ where the
``asyncore`` module has been removed. It is also used as a fallback
when the libev C extension is not available.
"""

_loop = None
Expand Down Expand Up @@ -106,18 +119,33 @@ def __init__(self, *args, **kwargs):
def initialize_reactor(cls):
with cls._lock:
if cls._pid != os.getpid():
cls._loop = None
log.debug("Detected fork, clearing and reinitializing reactor state")
cls.handle_fork()
if cls._loop is None:
cls._loop = asyncio.new_event_loop()
asyncio.set_event_loop(cls._loop)

if not cls._loop_thread:
if not cls._loop_thread or not cls._loop_thread.is_alive():
# daemonize so the loop will be shut down on interpreter
# shutdown
cls._loop_thread = Thread(target=cls._loop.run_forever,
daemon=True, name="asyncio_thread")
cls._loop_thread.start()

@classmethod
def handle_fork(cls):
"""
Called after a fork. Cleans up any reactor state from the parent
process so that a fresh event loop can be started in the child.
"""
if cls._loop is not None:
try:
cls._loop.call_soon_threadsafe(cls._loop.stop)
except RuntimeError:
pass
cls._loop = None
cls._loop_thread = None
cls._pid = os.getpid()

@classmethod
def create_timer(cls, timeout, callback):
return AsyncioTimer(timeout, callback, loop=cls._loop)
Expand Down Expand Up @@ -173,7 +201,7 @@ def push(self, data):

async def _push_msg(self, chunks):
# This lock ensures all chunks of a message are sequential in the Queue
with await self._write_queue_lock:
async with self._write_queue_lock:
for chunk in chunks:
self._write_queue.put_nowait(chunk)

Expand Down
2 changes: 1 addition & 1 deletion tests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def is_monkey_patched():
return is_gevent_monkey_patched() or is_eventlet_monkey_patched()

MONKEY_PATCH_LOOP = bool(os.getenv('MONKEY_PATCH_LOOP', False))
EVENT_LOOP_MANAGER = os.getenv('EVENT_LOOP_MANAGER', "libev")
EVENT_LOOP_MANAGER = os.getenv('EVENT_LOOP_MANAGER', "asyncio")


# If set to to true this will force the Cython tests to run regardless of whether they are installed
Expand Down
Loading
Loading