"""functools.py - Tools for working with functions and callable objects"""
# Python module wrapper for _functools C module
# to allow utilities written in Python to be added
# to the functools module.
# Written by Nick Coghlan <ncoghlan at gmail.com>,
# Raymond Hettinger <python at rcn.com>,
# and Łukasz Langa <lukasz at langa.pl>.
#   Copyright (C) 2006-2013 Python Software Foundation.
# See C source code for _functools credits/copyright

__all__ = [
    'update_wrapper', 'wraps', 'WRAPPER_ASSIGNMENTS', 'WRAPPER_UPDATES',
    'total_ordering', 'cache', 'cmp_to_key', 'lru_cache', 'reduce',
    'partial', 'partialmethod', 'singledispatch', 'singledispatchmethod',
    'cached_property',
]

from abc import get_cache_token
from collections import namedtuple
# import types, weakref  # Deferred to single_dispatch()
from reprlib import recursive_repr
from _thread import RLock
from types import GenericAlias


################################################################################
### update_wrapper() and wraps() decorator
################################################################################

# update_wrapper() and wraps() are tools to help write
# wrapper functions that can handle naive introspection

WRAPPER_ASSIGNMENTS = ('__module__', '__name__', '__qualname__', '__doc__',
                       '__annotations__')
WRAPPER_UPDATES = ('__dict__',)


def update_wrapper(wrapper,
                   wrapped,
                   assigned=WRAPPER_ASSIGNMENTS,
                   updated=WRAPPER_UPDATES):
    """Make *wrapper* look like *wrapped* for introspection purposes.

    wrapper  -- the function being updated; it is also the return value
    wrapped  -- the original function whose metadata is copied
    assigned -- names of attributes copied as-is from *wrapped* onto
                *wrapper*; names missing on *wrapped* are silently skipped
                (defaults to functools.WRAPPER_ASSIGNMENTS)
    updated  -- names of *wrapper* attributes that are merged, through
                their ``update()`` method, with the same-named attribute
                of *wrapped* (defaults to functools.WRAPPER_UPDATES)
    """
    for name in assigned:
        try:
            value = getattr(wrapped, name)
        except AttributeError:
            # Nothing to copy for this name.
            continue
        setattr(wrapper, name, value)
    for name in updated:
        target = getattr(wrapper, name)
        target.update(getattr(wrapped, name, {}))
    # Issue #17482: set __wrapped__ last so we don't inadvertently copy it
    # from the wrapped function when updating __dict__
    wrapper.__wrapped__ = wrapped
    # Handing the wrapper back lets partial(update_wrapper, ...) act as a
    # decorator.
    return wrapper


def wraps(wrapped,
          assigned=WRAPPER_ASSIGNMENTS,
          updated=WRAPPER_UPDATES):
    """Build a decorator that applies update_wrapper() to its target.

    The returned decorator passes the decorated function as the *wrapper*
    argument of update_wrapper() and forwards *wrapped*, *assigned* and
    *updated* unchanged.  Defaults are the same as for update_wrapper().
    It is shorthand for partial(update_wrapper, wrapped=..., ...).
    """
    decorator = partial(update_wrapper, wrapped=wrapped,
                        assigned=assigned, updated=updated)
    return decorator


################################################################################
### total_ordering class decorator
################################################################################

# The total ordering functions all invoke the root magic method directly
# rather than using the corresponding operator.  This avoids possible
# infinite recursion that could occur when the operator dispatch logic
# detects a NotImplemented result and then calls a reflected method.

def _gt_from_lt(self, other, NotImplemented=NotImplemented):
    """Derive ``a > b`` for @total_ordering as ``not (a < b) and a != b``."""
    outcome = type(self).__lt__(self, other)
    if outcome is not NotImplemented:
        return not outcome and self != other
    return outcome


def _le_from_lt(self, other, NotImplemented=NotImplemented):
    """Derive ``a <= b`` for @total_ordering as ``a < b or a == b``."""
    outcome = type(self).__lt__(self, other)
    if outcome is not NotImplemented:
        return outcome or self == other
    return outcome


def _ge_from_lt(self, other, NotImplemented=NotImplemented):
    """Derive ``a >= b`` for @total_ordering as ``not (a < b)``."""
    outcome = type(self).__lt__(self, other)
    if outcome is not NotImplemented:
        return not outcome
    return outcome


def _ge_from_le(self, other, NotImplemented=NotImplemented):
    """Derive ``a >= b`` for @total_ordering as ``not (a <= b) or a == b``."""
    outcome = type(self).__le__(self, other)
    if outcome is not NotImplemented:
        return not outcome or self == other
    return outcome


def _lt_from_le(self, other, NotImplemented=NotImplemented):
    """Derive ``a < b`` for @total_ordering as ``a <= b and a != b``."""
    outcome = type(self).__le__(self, other)
    if outcome is not NotImplemented:
        return outcome and self != other
    return outcome


def _gt_from_le(self, other, NotImplemented=NotImplemented):
    """Derive ``a > b`` for @total_ordering as ``not (a <= b)``."""
    outcome = type(self).__le__(self, other)
    if outcome is not NotImplemented:
        return not outcome
    return outcome


def _lt_from_gt(self, other, NotImplemented=NotImplemented):
    """Derive ``a < b`` for @total_ordering as ``not (a > b) and a != b``."""
    outcome = type(self).__gt__(self, other)
    if outcome is not NotImplemented:
        return not outcome and self != other
    return outcome


def _ge_from_gt(self, other, NotImplemented=NotImplemented):
    """Derive ``a >= b`` for @total_ordering as ``a > b or a == b``."""
    outcome = type(self).__gt__(self, other)
    if outcome is not NotImplemented:
        return outcome or self == other
    return outcome


def _le_from_gt(self, other, NotImplemented=NotImplemented):
    """Derive ``a <= b`` for @total_ordering as ``not (a > b)``."""
    outcome = type(self).__gt__(self, other)
    if outcome is not NotImplemented:
        return not outcome
    return outcome


def _le_from_ge(self, other, NotImplemented=NotImplemented):
    """Derive ``a <= b`` for @total_ordering as ``not (a >= b) or a == b``."""
    outcome = type(self).__ge__(self, other)
    if outcome is not NotImplemented:
        return not outcome or self == other
    return outcome


def _gt_from_ge(self, other, NotImplemented=NotImplemented):
    """Derive ``a > b`` for @total_ordering as ``a >= b and a != b``."""
    outcome = type(self).__ge__(self, other)
    if outcome is not NotImplemented:
        return outcome and self != other
    return outcome


def _lt_from_ge(self, other, NotImplemented=NotImplemented):
    """Derive ``a < b`` for @total_ordering as ``not (a >= b)``."""
    outcome = type(self).__ge__(self, other)
    if outcome is not NotImplemented:
        return not outcome
    return outcome


# For each possible root comparison, the (name, implementation) pairs that
# total_ordering() may install, in installation order.
_convert = {
    '__lt__': [
        ('__gt__', _gt_from_lt),
        ('__le__', _le_from_lt),
        ('__ge__', _ge_from_lt),
    ],
    '__le__': [
        ('__ge__', _ge_from_le),
        ('__lt__', _lt_from_le),
        ('__gt__', _gt_from_le),
    ],
    '__gt__': [
        ('__lt__', _lt_from_gt),
        ('__ge__', _ge_from_gt),
        ('__le__', _le_from_gt),
    ],
    '__ge__': [
        ('__le__', _le_from_ge),
        ('__gt__', _gt_from_ge),
        ('__lt__', _lt_from_ge),
    ],
}


def total_ordering(cls):
    """Class decorator that supplies the ordering methods *cls* lacks.

    At least one of ``__lt__``, ``__le__``, ``__gt__``, ``__ge__`` must be
    defined by *cls* (not merely inherited from object); otherwise
    ValueError is raised.  The class is modified in place and returned.
    """
    # Collect the comparisons the user actually defined, ignoring the
    # defaults that every class inherits from object.
    defined = set()
    for op in _convert:
        if getattr(cls, op, None) is not getattr(object, op, None):
            defined.add(op)
    if not defined:
        raise ValueError('must define at least one ordering operation: < > <= >=')
    # max() picks by name, which prefers __lt__ to __le__ to __gt__ to __ge__
    preferred = max(defined)
    for name, implementation in _convert[preferred]:
        if name in defined:
            continue
        implementation.__name__ = name
        setattr(cls, name, implementation)
    return cls


################################################################################
### cmp_to_key() function converter
################################################################################

def cmp_to_key(mycmp):
    """Turn an old-style ``cmp=`` function into a ``key=`` function.

    Returns a class whose instances wrap one object each and compare with
    one another by calling ``mycmp(a, b)`` and testing the sign of the
    result.  Instances are unhashable.
    """
    class K(object):
        __slots__ = ['obj']

        def __init__(self, obj):
            self.obj = obj

        def __lt__(self, other):
            outcome = mycmp(self.obj, other.obj)
            return outcome < 0

        def __gt__(self, other):
            outcome = mycmp(self.obj, other.obj)
            return outcome > 0

        def __eq__(self, other):
            outcome = mycmp(self.obj, other.obj)
            return outcome == 0

        def __le__(self, other):
            outcome = mycmp(self.obj, other.obj)
            return outcome <= 0

        def __ge__(self, other):
            outcome = mycmp(self.obj, other.obj)
            return outcome >= 0

        __hash__ = None

    return K


# Prefer the implementation from the _functools extension when available.
try:
    from _functools import cmp_to_key
except ImportError:
    pass


################################################################################
### reduce() sequence to a single
### item
################################################################################

# Sentinel distinguishing "no initial value supplied" from initial=None.
_initial_missing = object()


def reduce(function, sequence, initial=_initial_missing):
    """
    reduce(function, sequence[, initial]) -> value

    Apply a function of two arguments cumulatively to the items of a sequence,
    from left to right, so as to reduce the sequence to a single value.
    For example, reduce(lambda x, y: x+y, [1, 2, 3, 4, 5]) calculates
    ((((1+2)+3)+4)+5).  If initial is present, it is placed before the items
    of the sequence in the calculation, and serves as a default when the
    sequence is empty.

    Raises TypeError when the sequence is empty and no initial value is given.
    """
    it = iter(sequence)

    if initial is _initial_missing:
        try:
            value = next(it)
        except StopIteration:
            raise TypeError(
                "reduce() of empty sequence with no initial value") from None
    else:
        value = initial

    for element in it:
        value = function(value, element)

    return value


# Prefer the implementation from the _functools extension when available.
try:
    from _functools import reduce
except ImportError:
    pass


################################################################################
### partial() argument application
################################################################################

# Purely functional, no descriptor behaviour
class partial:
    """New function with partial application of the given arguments
    and keywords.

    Attributes:
        func     -- the wrapped callable
        args     -- tuple of positional arguments placed before call-time ones
        keywords -- dict of keyword arguments; call-time keywords override them
    """

    __slots__ = "func", "args", "keywords", "__dict__", "__weakref__"

    def __new__(cls, func, /, *args, **keywords):
        """Create the partial; raise TypeError if *func* is not callable."""
        if not callable(func):
            raise TypeError("the first argument must be callable")

        # Flatten nested partials only.  Testing for a mere "func" attribute
        # would misfire on unrelated callables that happen to expose one
        # (e.g. mock objects) and then fail on the missing args/keywords.
        if isinstance(func, partial):
            args = func.args + args
            keywords = {**func.keywords, **keywords}
            func = func.func

        self = super(partial, cls).__new__(cls)

        self.func = func
        self.args = args
        self.keywords = keywords
        return self

    def __call__(self, /, *args, **keywords):
        """Call func with the stored arguments followed by the given ones."""
        keywords = {**self.keywords, **keywords}
        return self.func(*self.args, *args, **keywords)

    @recursive_repr()
    def __repr__(self):
        qualname = type(self).__qualname__
        args = [repr(self.func)]
        args.extend(repr(x) for x in self.args)
        args.extend(f"{k}={v!r}" for (k, v) in self.keywords.items())
        if type(self).__module__ == "functools":
            return f"functools.{qualname}({', '.join(args)})"
        return f"{qualname}({', '.join(args)})"

    def __reduce__(self):
        # Pickle as (callable, constructor args, state); the 4-item state is
        # what __setstate__ expects.  Empty keywords/__dict__ become None.
        return type(self), (self.func,), (self.func, self.args,
               self.keywords or None, self.__dict__ or None)

    def __setstate__(self, state):
        """Restore from the 4-tuple (func, args, keywords, namespace).

        Raises TypeError when *state* is not a 4-tuple or its items have
        the wrong types.
        """
        if not isinstance(state, tuple):
            raise TypeError("argument to __setstate__ must be a tuple")
        if len(state) != 4:
            raise TypeError(f"expected 4 items in state, got {len(state)}")
        func, args, kwds, namespace = state
        if (not callable(func) or not isinstance(args, tuple) or
           (kwds is not None and not isinstance(kwds, dict)) or
           (namespace is not None and not isinstance(namespace, dict))):
            raise TypeError("invalid partial state")

        args = tuple(args)  # just in case it's a subclass
        if kwds is None:
            kwds = {}
        elif type(kwds) is not dict:  # XXX does it need to be *exactly* dict?
            kwds = dict(kwds)
        if namespace is None:
            namespace = {}

        self.__dict__ = namespace
        self.func = func
        self.args = args
        self.keywords = kwds


# Prefer the implementation from the _functools extension when available.
try:
    from _functools import partial
except ImportError:
    pass


# Descriptor version
class partialmethod(object):
    """Method descriptor with partial application of the given arguments
    and keywords.

    Supports wrapping existing descriptors and handles non-descriptor
    callables as instance methods.
    """

    def __init__(self, func, /, *args, **keywords):
        """Store *func* and the pre-applied arguments.

        Raises TypeError when *func* is neither callable nor a descriptor.
        """
        if not callable(func) and not hasattr(func, "__get__"):
            raise TypeError("{!r} is not callable or a descriptor"
                            .format(func))

        # func could be a descriptor like classmethod which isn't callable,
        # so we can't inherit from partial (it verifies func is callable)
        if isinstance(func, partialmethod):
            # flattening is mandatory in order to place cls/self before all
            # other arguments
            # it's also more efficient since only one function will be called
            self.func = func.func
            self.args = func.args + args
            self.keywords = {**func.keywords, **keywords}
        else:
            self.func = func
            self.args = args
            self.keywords = keywords

    def __repr__(self):
        # Build the argument list piecewise so that empty args/keywords do
        # not leave dangling ", " separators, and show func via repr() the
        # same way partial.__repr__ does.
        cls = type(self)
        parts = [repr(self.func)]
        parts.extend(map(repr, self.args))
        parts.extend(f"{k}={v!r}" for k, v in self.keywords.items())
        return f"{cls.__module__}.{cls.__qualname__}({', '.join(parts)})"

    def _make_unbound_method(self):
        """Return a plain function that inserts self/cls before stored args."""
        def _method(cls_or_self, /, *args, **keywords):
            keywords = {**self.keywords, **keywords}
            return self.func(cls_or_self, *self.args, *args, **keywords)
        _method.__isabstractmethod__ = self.__isabstractmethod__
        _method._partialmethod = self
        return _method

    def __get__(self, obj, cls=None):
        """Bind to *obj*/*cls*, delegating to func's own __get__ if it has one."""
        get = getattr(self.func, "__get__", None)
        result = None
        if get is not None:
            new_func = get(obj, cls)
            if new_func is not self.func:
                # Assume __get__ returning something new indicates the
                # creation of an appropriate callable
                result = partial(new_func, *self.args, **self.keywords)
                try:
                    result.__self__ = new_func.__self__
                except AttributeError:
                    pass
        if result is None:
            # If the underlying descriptor didn't do anything, treat this
            # like an instance method
            result = self._make_unbound_method().__get__(obj, cls)
        return result

    @property
    def __isabstractmethod__(self):
        return getattr(self.func, "__isabstractmethod__", False)

    __class_getitem__ = classmethod(GenericAlias)


# Helper
# functions

def _unwrap_partial(func):
    """Strip every layer of partial from *func* and return the innermost callable."""
    while isinstance(func, partial):
        func = func.func
    return func

################################################################################
### LRU Cache function decorator
################################################################################

# Result type of cache_info().
_CacheInfo = namedtuple("CacheInfo", ["hits", "misses", "maxsize", "currsize"])

class _HashedSeq(list):
    """ This class guarantees that hash() will be called no more than once
        per element.  This is important because the lru_cache() will hash
        the key multiple times on a cache miss.

    """

    __slots__ = 'hashvalue'

    def __init__(self, tup, hash=hash):
        # Copy the items into the list and remember the hash of the original
        # tuple so that __hash__ never has to recompute it.
        self[:] = tup
        self.hashvalue = hash(tup)

    def __hash__(self):
        return self.hashvalue

def _make_key(args, kwds, typed,
             kwd_mark = (object(),),
             fasttypes = {int, str},
             tuple=tuple, type=type, len=len):
    """Make a cache key from optionally typed positional and keyword arguments

    The key is constructed in a way that is flat as possible rather than
    as a nested structure that would take more memory.

    If there is only a single argument and its data type is known to cache
    its hash value, then that argument is returned without a wrapper.  This
    saves space and improves lookup speed.

    Layout of the flat key: the positional arguments; then, if there are
    keyword arguments, *kwd_mark* followed by each keyword name and value;
    then, if *typed* is true, the types of the positional and keyword values.
    """
    # All of code below relies on kwds preserving the order input by the user.
    # Formerly, we sorted() the kwds before looping.  The new way is *much*
    # faster; however, it means that f(x=1, y=2) will now be treated as a
    # distinct call from f(y=2, x=1) which will be cached separately.
    key = args
    if kwds:
        key += kwd_mark
        for item in kwds.items():
            key += item
    if typed:
        key += tuple(type(v) for v in args)
        if kwds:
            key += tuple(type(v) for v in kwds.values())
    elif len(key) == 1 and type(key[0]) in fasttypes:
        return key[0]
    return _HashedSeq(key)

def lru_cache(maxsize=128, typed=False):
    """Least-recently-used cache decorator.

    If *maxsize* is set to None, the LRU features are disabled and the cache
    can grow without bound.

    If *typed* is True, arguments of different types will be cached separately.
    For example, f(3.0) and f(3) will be treated as distinct calls with
    distinct results.

    Arguments to the cached function must be hashable.

    View the cache statistics named tuple (hits, misses, maxsize, currsize)
    with f.cache_info().  Clear the cache and statistics with f.cache_clear().
    Access the underlying function with f.__wrapped__.

    The decorator may also be applied directly (``@lru_cache`` without
    parentheses), in which case maxsize is 128.  A negative integer
    *maxsize* is treated as 0.  TypeError is raised when *maxsize* is not
    an integer, a callable, or None.

    See:  https://en.wikipedia.org/wiki/Cache_replacement_policies#Least_recently_used_(LRU)

    """

    # Users should only access the lru_cache through its public API:
    #       cache_info, cache_clear, and f.__wrapped__
    # The internals of the lru_cache are encapsulated for thread safety and
    # to allow the implementation to change (including a possible C version).

    if isinstance(maxsize, int):
        # Negative maxsize is treated as 0
        if maxsize < 0:
            maxsize = 0
    elif callable(maxsize) and isinstance(typed, bool):
        # The user_function was passed in directly via the maxsize argument
        user_function, maxsize = maxsize, 128
        wrapper = _lru_cache_wrapper(user_function, maxsize, typed, _CacheInfo)
        wrapper.cache_parameters = lambda : {'maxsize': maxsize, 'typed': typed}
        return update_wrapper(wrapper, user_function)
    elif maxsize is not None:
        raise TypeError(
            'Expected first argument to be an integer, a callable, or None')

    def decorating_function(user_function):
        wrapper = _lru_cache_wrapper(user_function, maxsize, typed, _CacheInfo)
        wrapper.cache_parameters = lambda : {'maxsize': maxsize, 'typed': typed}
        return update_wrapper(wrapper, user_function)

    return decorating_function

def _lru_cache_wrapper(user_function, maxsize, typed, _CacheInfo):
    """Build and return the caching wrapper around *user_function*.

    One of three wrappers is chosen by *maxsize*:
      0     -- no caching, only the miss counter is updated
      None  -- unbounded dict cache without ordering
      other -- bounded cache; entries are links [PREV, NEXT, KEY, RESULT]
               of a circular doubly linked list anchored at ``root``

    The returned wrapper gets ``cache_info`` and ``cache_clear`` attributes.
    """
    # Constants shared by all lru cache instances:
    sentinel = object()          # unique object used to signal cache misses
    make_key = _make_key         # build a key from the function arguments
    PREV, NEXT, KEY, RESULT = 0, 1, 2, 3   # names for the link fields

    cache = {}
    hits = misses = 0
    full = False
    cache_get = cache.get    # bound method to lookup a key or return None
    cache_len = cache.__len__  # get cache size without calling len()
    lock = RLock()           # because linkedlist updates aren't threadsafe
    root = []                # root of the circular doubly linked list
    root[:] = [root, root, None, None]     # initialize by pointing to self

    if maxsize == 0:

        def wrapper(*args, **kwds):
            # No caching -- just a statistics update
            nonlocal misses
            misses += 1
            result = user_function(*args, **kwds)
            return result

    elif maxsize is None:

        def wrapper(*args, **kwds):
            # Simple caching without ordering or size limit
            nonlocal hits, misses
            key = make_key(args, kwds, typed)
            result = cache_get(key, sentinel)
            if result is not sentinel:
                hits += 1
                return result
            misses += 1
            result = user_function(*args, **kwds)
            cache[key] = result
            return result

    else:

        def wrapper(*args, **kwds):
            # Size limited caching that tracks accesses by recency
            nonlocal root, hits, misses, full
            key = make_key(args, kwds, typed)
            with lock:
                link = cache_get(key)
                if link is not None:
                    # Move the link to the front of the circular queue
                    link_prev, link_next, _key, result = link
                    link_prev[NEXT] = link_next
                    link_next[PREV] = link_prev
                    last = root[PREV]
                    last[NEXT] = root[PREV] = link
                    link[PREV] = last
                    link[NEXT] = root
                    hits += 1
                    return result
                misses += 1
            # The user function runs with the lock released.
            result = user_function(*args, **kwds)
            with lock:
                if key in cache:
                    # Getting here means that this same key was added to the
                    # cache while the lock was released.  Since the link
                    # update is already done, we need only return the
                    # computed result and update the count of misses.
                    pass
                elif full:
                    # Use the old root to store the new key and result.
                    oldroot = root
                    oldroot[KEY] = key
                    oldroot[RESULT] = result
                    # Empty the oldest link and make it the new root.
                    # Keep a reference to the old key and old result to
                    # prevent their ref counts from going to zero during the
                    # update. That will prevent potentially arbitrary object
                    # clean-up code (i.e. __del__) from running while we're
                    # still adjusting the links.
                    root = oldroot[NEXT]
                    oldkey = root[KEY]
                    oldresult = root[RESULT]
                    root[KEY] = root[RESULT] = None
                    # Now update the cache dictionary.
                    del cache[oldkey]
                    # Save the potentially reentrant cache[key] assignment
                    # for last, after the root and links have been put in
                    # a consistent state.
                    cache[key] = oldroot
                else:
                    # Put result in a new link at the front of the queue.
                    last = root[PREV]
                    link = [last, root, key, result]
                    last[NEXT] = root[PREV] = cache[key] = link
                    # Use the cache_len bound method instead of the len() function
                    # which could potentially be wrapped in an lru_cache itself.
                    full = (cache_len() >= maxsize)
            return result

    def cache_info():
        """Report cache statistics"""
        with lock:
            return _CacheInfo(hits, misses, maxsize, cache_len())

    def cache_clear():
        """Clear the cache and cache statistics"""
        nonlocal hits, misses, full
        with lock:
            cache.clear()
            root[:] = [root, root, None, None]
            hits = misses = 0
            full = False

    wrapper.cache_info = cache_info
    wrapper.cache_clear = cache_clear
    return wrapper

# Prefer the implementation from the _functools extension when available.
try:
    from _functools import _lru_cache_wrapper
except ImportError:
    pass


################################################################################
### cache -- simplified access to the infinity cache
################################################################################

def cache(user_function, /):
    'Simple lightweight unbounded cache.  Sometimes called "memoize".'
    # Equivalent to decorating with lru_cache(maxsize=None).
    return lru_cache(maxsize=None)(user_function)


################################################################################
### singledispatch() - single-dispatch generic function decorator
################################################################################

def _c3_merge(sequences):
    """Merges MROs in *sequences* to a single MRO using the C3 algorithm.

    Adapted from https://www.python.org/download/releases/2.3/mro/.

    The inner lists of *sequences* are consumed (items are deleted from
    them) as the merge proceeds.  Raises RuntimeError("Inconsistent
    hierarchy") when no head can be chosen.

    """
    result = []
    while True:
        sequences = [s for s in sequences if s]   # purge empty sequences
        if not sequences:
            return result
        for s1 in sequences:   # find merge candidates among seq heads
            candidate = s1[0]
            for s2 in sequences:
                if candidate in s2[1:]:
                    candidate = None
                    break      # reject the current head, it appears later
            else:
                # No sequence has the candidate in its tail: accept it.
                break
        if candidate is None:
            raise RuntimeError("Inconsistent hierarchy")
        result.append(candidate)
        # remove the chosen candidate
        for seq in sequences:
            if seq[0] == candidate:
                del seq[0]

def _c3_mro(cls, abcs=None):
    """Computes the method resolution order using extended C3 linearization.

    If no *abcs* are given, the algorithm works exactly like the built-in C3
    linearization used for method resolution.

    If given, *abcs* is a list of abstract base classes that should be inserted
    into the resulting MRO. Unrelated ABCs are ignored and don't end up in the
    result. The algorithm inserts ABCs where their functionality is introduced,
    i.e. issubclass(cls, abc) returns True for the class itself but returns
    False for all its direct base classes. Implicit ABCs for a given class
    (either registered or inferred from the presence of a special method like
    __len__) are inserted directly after the last ABC explicitly listed in the
    MRO of said class. If two implicit ABCs end up next to each other in the
    resulting MRO, their ordering depends on the order of types in *abcs*.

    """
    # Scan the direct bases from the right for the last one that has
    # __abstractmethods__; everything up to and including it is "explicit".
    for i, base in enumerate(reversed(cls.__bases__)):
        if hasattr(base, '__abstractmethods__'):
            boundary = len(cls.__bases__) - i
            break   # Bases up to the last explicit ABC are considered first.
    else:
        boundary = 0
    abcs = list(abcs) if abcs else []
    explicit_bases = list(cls.__bases__[:boundary])
    abstract_bases = []
    other_bases = list(cls.__bases__[boundary:])
    for base in abcs:
        if issubclass(cls, base) and not any(
                issubclass(b, base) for b in cls.__bases__
            ):
            # If *cls* is the class that introduces behaviour described by
            # an ABC *base*, insert said ABC to its MRO.
            abstract_bases.append(base)
    # ABCs placed at this level are not offered to the recursive calls.
    for base in abstract_bases:
        abcs.remove(base)
    explicit_c3_mros = [_c3_mro(base, abcs=abcs) for base in explicit_bases]
    abstract_c3_mros = [_c3_mro(base, abcs=abcs) for base in abstract_bases]
    other_c3_mros = [_c3_mro(base, abcs=abcs) for base in other_bases]
    return _c3_merge(
        [[cls]] +
        explicit_c3_mros + abstract_c3_mros + other_c3_mros +
        [explicit_bases] + [abstract_bases] + [other_bases]
    )

def _compose_mro(cls, types):
    """Calculates the method resolution order for a given class *cls*.

    Includes relevant abstract base classes (with their respective bases) from
    the *types* iterable. Uses a modified C3 linearization algorithm.

    """
    bases = set(cls.__mro__)
    # Remove entries which are already present in the __mro__ or unrelated.
    def is_related(typ):
        return (typ not in bases and hasattr(typ, '__mro__')
                                 and not isinstance(typ, GenericAlias)
                                 and issubclass(cls, typ))
    types = [n for n in types if is_related(n)]
    # Remove entries which are strict bases of other entries (they will end up
    # in the MRO anyway.
    def is_strict_base(typ):
        for other in types:
            if typ != other and typ in other.__mro__:
                return True
        return False
    types = [n for n in types if not is_strict_base(n)]
    # Subclasses of the ABCs in *types* which are also implemented by
    # *cls* can be used to stabilize ABC ordering.
    type_set = set(types)
    mro = []
    for typ in types:
        found = []
        for sub in typ.__subclasses__():
            if sub not in bases and issubclass(cls, sub):
                found.append([s for s in sub.__mro__ if s in type_set])
        if not found:
            mro.append(typ)
            continue
        # Favor subclasses with the biggest number of useful bases
        found.sort(key=len, reverse=True)
        for sub in found:
            for subcls in sub:
                if subcls not in mro:
                    mro.append(subcls)
    return _c3_mro(cls, abcs=mro)

def _find_impl(cls, registry):
    """Returns the best matching implementation from *registry* for type *cls*.

    Where there is no registered implementation for a specific type, its method
    resolution order is used to find a more generic implementation.

    Note: if *registry* does not contain an implementation for the base
    *object* type, this function may return None.

    Raises RuntimeError("Ambiguous dispatch: ...") when two unrelated
    implicit ABCs match equally well.

    """
    mro = _compose_mro(cls, registry.keys())
    match = None
    for t in mro:
        if match is not None:
            # If *match* is an implicit ABC but there is another unrelated,
            # equally matching implicit ABC, refuse the temptation to guess.
            if (t in registry and t not in cls.__mro__
                              and match not in cls.__mro__
                              and not issubclass(match, t)):
                raise RuntimeError("Ambiguous dispatch: {} or {}".format(
                    match, t))
            break
        if t in registry:
            match = t
    return registry.get(match)

def singledispatch(func):
    """Single-dispatch generic function decorator.

    Transforms a function into a generic function, which can have different
    behaviours depending upon the type of its first argument. The decorated
    function acts as the default implementation, and additional
    implementations can be registered using the register() attribute of the
    generic function.
    """
    # There are many programs that use functools without singledispatch, so we
    # trade-off making singledispatch marginally slower for the benefit of
    # making start-up of such applications slightly faster.
    import types, weakref

    registry = {}
    dispatch_cache = weakref.WeakKeyDictionary()
    # Stays None until a class with __abstractmethods__ is registered.
    cache_token = None

    def dispatch(cls):
        """generic_func.dispatch(cls) -> <function implementation>

        Runs the dispatch algorithm to return the best available implementation
        for the given *cls* registered on *generic_func*.

        """
        nonlocal cache_token
        if cache_token is not None:
            # Drop cached lookups when the ABC cache token has changed.
            current_token = get_cache_token()
            if cache_token != current_token:
                dispatch_cache.clear()
                cache_token = current_token
        try:
            impl = dispatch_cache[cls]
        except KeyError:
            try:
                impl = registry[cls]
            except KeyError:
                impl = _find_impl(cls, registry)
            dispatch_cache[cls] = impl
        return impl

    def _is_valid_dispatch_type(cls):
        # A dispatch type must be a class and must not be a GenericAlias.
        return isinstance(cls, type) and not isinstance(cls, GenericAlias)

    def register(cls, func=None):
        """generic_func.register(cls, func) -> func

        Registers a new implementation for the given *cls* on a *generic_func*.

        When called with only a class, returns a decorator.  When called with
        only an annotated function, the dispatch type is taken from the first
        entry of its type hints.  Raises TypeError for an invalid first
        argument or annotation.

        """
        nonlocal cache_token
        if _is_valid_dispatch_type(cls):
            if func is None:
                return lambda f: register(cls, f)
        else:
            if func is not None:
                raise TypeError(
                    f"Invalid first argument to `register()`. "
                    f"{cls!r} is not a class."
                )
            ann = getattr(cls, '__annotations__', {})
            if not ann:
                raise TypeError(
                    f"Invalid first argument to `register()`: {cls!r}. "
                    f"Use either `@register(some_class)` or plain `@register` "
                    f"on an annotated function."
                )
            # The sole argument is the implementation itself.
            func = cls

            # only import typing if annotation parsing is necessary
            from typing import get_type_hints
            argname, cls = next(iter(get_type_hints(func).items()))
            if not _is_valid_dispatch_type(cls):
                raise TypeError(
                    f"Invalid annotation for {argname!r}. "
                    f"{cls!r} is not a class."
                )

        registry[cls] = func
        if cache_token is None and hasattr(cls, '__abstractmethods__'):
            cache_token = get_cache_token()
        dispatch_cache.clear()
        return func

    def wrapper(*args, **kw):
        # Dispatch on the class of the first positional argument.
        if not args:
            raise TypeError(f'{funcname} requires at least '
                            '1 positional argument')

        return dispatch(args[0].__class__)(*args, **kw)

    funcname = getattr(func, '__name__', 'singledispatch function')
    # The decorated function is the fallback implementation.
    registry[object] = func
    wrapper.register = register
    wrapper.dispatch = dispatch
    wrapper.registry = types.MappingProxyType(registry)
    wrapper._clear_cache = dispatch_cache.clear
    update_wrapper(wrapper, func)
    return wrapper


# Descriptor version
class singledispatchmethod:
    """Single-dispatch generic method descriptor.

    Supports wrapping existing descriptors and handles non-descriptor
    callables as instance methods.
    """

    def __init__(self, func):
        if not callable(func) and not hasattr(func, "__get__"):
            raise TypeError(f"{func!r} is not callable or a descriptor")

        self.dispatcher = singledispatch(func)
        self.func = func

        # bpo-45678: special-casing for classmethod/staticmethod in Python <=3.9,
        # as functools.update_wrapper doesn't work properly in singledispatchmethod.__get__
        # if it is applied to an unbound classmethod/staticmethod
        if isinstance(func, (staticmethod, classmethod)):
            self._wrapped_func = func.__func__
        else:
            self._wrapped_func = func

    def register(self, cls, method=None):
        """generic_method.register(cls, func) -> func

        Registers a new implementation for the given *cls* on a *generic_method*.
        """
        # bpo-39679: in Python <= 3.9, classmethods and staticmethods don't
        # inherit __annotations__ of the wrapped function (fixed in 3.10+ as
        # a side-effect of bpo-43682) but we need that for annotation-derived
        # singledispatches. So we add that just-in-time here.
        if isinstance(cls, (staticmethod, classmethod)):
            cls.__annotations__ = getattr(cls.__func__, '__annotations__', {})
        return self.dispatcher.register(cls, func=method)

    def __get__(self, obj, cls=None):
        # A new _method function is built on every attribute access.
        def _method(*args, **kwargs):
            # Dispatch on the first call argument, then bind the chosen
            # implementation to obj/cls through its own __get__.
            method = self.dispatcher.dispatch(args[0].__class__)
            return method.__get__(obj, cls)(*args, **kwargs)

        _method.__isabstractmethod__ = self.__isabstractmethod__
        _method.register = self.register
        update_wrapper(_method, self._wrapped_func)
        return _method

    @property
    def __isabstractmethod__(self):
        return getattr(self.func, '__isabstractmethod__', False)


################################################################################
### cached_property() - computed once per instance, cached as attribute
################################################################################

# Sentinel for "no cached value yet"; None may be a legitimate cached value.
_NOT_FOUND = object()


class cached_property:
    """Descriptor that caches the result of *func* in the instance's __dict__.

    On first access through an instance, ``func(instance)`` is called and the
    result is stored in ``instance.__dict__`` under the attribute name given
    by __set_name__.  Each cached_property object owns one RLock, which is
    held while the value is computed.
    """

    def __init__(self, func):
        self.func = func
        self.attrname = None
        self.__doc__ = func.__doc__
        self.lock = RLock()

    def __set_name__(self, owner, name):
        # Record the attribute name; refuse a second, different name.
        if self.attrname is None:
            self.attrname = name
        elif name != self.attrname:
            raise TypeError(
                "Cannot assign the same cached_property to two different names "
                f"({self.attrname!r} and {name!r})."
            )

    def __get__(self, instance, owner=None):
        # Access through the class returns the descriptor itself.
        if instance is None:
            return self
        if self.attrname is None:
            raise TypeError(
                "Cannot use cached_property instance without calling __set_name__ on it.")
        try:
            cache = instance.__dict__
        except AttributeError:  # not all objects have __dict__ (e.g. class defines slots)
            msg = (
                f"No '__dict__' attribute on {type(instance).__name__!r} "
                f"instance to cache {self.attrname!r} property."
            )
            raise TypeError(msg) from None
        val = cache.get(self.attrname, _NOT_FOUND)
        if val is _NOT_FOUND:
            with self.lock:
                # check if another thread filled cache while we awaited lock
                val = cache.get(self.attrname, _NOT_FOUND)
                if val is _NOT_FOUND:
                    val = self.func(instance)
                    try:
                        cache[self.attrname] = val
                    except TypeError:
                        msg = (
                            f"The '__dict__' attribute on {type(instance).__name__!r} instance "
                            f"does not support item assignment for caching {self.attrname!r} property."
                        )
                        raise TypeError(msg) from None
        return val

    __class_getitem__ = classmethod(GenericAlias)