import weakref
from weakref import ref, _remove_dead_weakref
from _weakref import ref
import types
import asyncio
def get_method_vars(m):
f = m.__func__
obj = m.__self__
return f, obj
def isfunction(m):
return isinstance(m, types.FunctionType)
def iscoroutinefunction(obj):
return asyncio.iscoroutinefunction(obj)
[docs]class WeakMethodContainer(weakref.WeakValueDictionary):
"""Container to store weak references to callbacks
Instance methods are stored using the underlying :term:`function` object
and the instance id (using :func:`id(obj) <id>`) as the key (a two-tuple)
and the object itself as the value. This ensures proper weak referencing.
Functions are stored using the string "function" and the id of the function
as the key (a two-tuple).
"""
[docs] def add_method(self, m, **kwargs):
"""Add an instance method or function
Args:
m: The instance method or function to store
"""
if isfunction(m):
self['function', id(m)] = m
else:
f, obj = get_method_vars(m)
wrkey = (f, id(obj))
self[wrkey] = obj
[docs] def del_method(self, m):
"""Remove an instance method or function if it exists
Args:
m: The instance method or function to remove
"""
if isfunction(m):
wrkey = ('function', id(m))
else:
f, obj = get_method_vars(m)
wrkey = (f, id(obj))
if wrkey in self:
del self[wrkey]
[docs] def del_instance(self, obj):
"""Remove any stored instance methods that belong to an object
Args:
obj: The instance object to remove
"""
to_remove = set()
for wrkey, _obj in self.iter_instances():
if obj is _obj:
to_remove.add(wrkey)
for wrkey in to_remove:
del self[wrkey]
[docs] def iter_instances(self):
"""Iterate over the stored objects
Yields:
wrkey: The two-tuple key used to store the object
obj: The instance or function object
"""
for wrkey in set(self.keys()):
obj = self.get(wrkey)
if obj is None:
continue
yield wrkey, obj
[docs] def iter_methods(self):
"""Iterate over stored functions and instance methods
Yields:
Instance methods or function objects
"""
for wrkey, obj in self.iter_instances():
f, obj_id = wrkey
if f == 'function':
yield self[wrkey]
else:
yield getattr(obj, f.__name__)
class InformativeDict(dict):
def __delitem__(self, key):
super(InformativeDict, self).__delitem__(key)
self.del_callback(key)
[docs]class EmissionHoldLock:
"""Context manager used for :meth:`pydispatch.dispatch.Dispatcher.emission_lock`
Supports use as a :term:`context manager` used in :keyword:`with` statements
and an :term:`asynchronous context manager` when used in
:keyword:`async with` statements.
Args:
event_instance: The :class:`~pydispatch.dispatch.Event` instance
associated with the lock
Attributes:
event_instance: The :class:`~pydispatch.dispatch.Event` instance
associated with the lock
last_event: The positional and keyword arguments from the event's last
emission as a two-tuple. If no events were triggered while the lock
was held, :obj:`None`.
held (bool): The internal state of the lock
"""
def __init__(self, event_instance):
self.event_instance = event_instance
self.last_event = None
self.held = False
@property
def aio_locks(self):
d = getattr(self, '_aio_locks', None)
if d is None:
d = self._aio_locks = {}
return d
def acquire(self):
if self.held:
return
self.held = True
self.last_event = None
def release(self):
if not self.held:
return
if self.last_event is not None:
args, kwargs = self.last_event
self.last_event = None
self.held = False
self.event_instance(*args, **kwargs)
async def acquire_async(self):
self.acquire()
lock = await self._build_aio_lock()
if not lock.locked():
await lock.acquire()
async def release_async(self):
lock = await self._build_aio_lock()
if lock.locked:
lock.release()
self.release()
def __enter__(self):
self.acquire()
return self
def __exit__(self, *args):
self.release()
async def __aenter__(self):
await self.acquire_async()
return self
async def __aexit__(self, *args):
await self.release_async()
async def _build_aio_lock(self):
loop = asyncio.get_event_loop()
key = id(loop)
lock = self.aio_locks.get(key)
if lock is None:
lock = asyncio.Lock()
self.aio_locks[key] = lock
return lock