Interfaces
bunnet.odm.interfaces.getters
bunnet.odm.interfaces.detector
bunnet.odm.interfaces.update
UpdateMethods
class UpdateMethods()
Update methods
UpdateMethods.set
def set(expression: Dict[Union[ExpressionField, str], Any],
session: Optional[ClientSession] = None,
bulk_writer: Optional[BulkWriter] = None,
**kwargs)
Set values
Example:
class Sample(Document): one: int Document.find(Sample.one == 1).set({Sample.one: 100})
Uses Set operator
Arguments:
expression
: Dict[Union[ExpressionField, str], Any] - keys and values to setsession
: Optional[ClientSession] - pymongo sessionbulk_writer
: Optional[BulkWriter] - bulk writerReturns:
self
UpdateMethods.current_date
def current_date(expression: Dict[Union[ExpressionField, str], Any],
session: Optional[ClientSession] = None,
bulk_writer: Optional[BulkWriter] = None,
**kwargs)
Set current date
Uses CurrentDate operator
Arguments:
expression
: Dict[Union[ExpressionField, str], Any]session
: Optional[ClientSession] - pymongo sessionbulk_writer
: Optional[BulkWriter] - bulk writerReturns:
self
UpdateMethods.inc
def inc(expression: Dict[Union[ExpressionField, str], Any],
session: Optional[ClientSession] = None,
bulk_writer: Optional[BulkWriter] = None,
**kwargs)
Increment
Example:
class Sample(Document): one: int Document.find(Sample.one == 1).inc({Sample.one: 100})
Uses Inc operator
Arguments:
expression
: Dict[Union[ExpressionField, str], Any]session
: Optional[ClientSession] - pymongo sessionbulk_writer
: Optional[BulkWriter] - bulk writerReturns:
self
bunnet.odm.interfaces.find
FindInterface
class FindInterface()
FindInterface.find_one
@classmethod
def find_one(
cls: Type[FindType],
*args: Union[Mapping[str, Any], bool],
projection_model: Optional[Type["DocumentProjectionType"]] = None,
session: Optional[ClientSession] = None,
ignore_cache: bool = False,
fetch_links: bool = False,
with_children: bool = False,
nesting_depth: Optional[int] = None,
nesting_depths_per_field: Optional[Dict[str, int]] = None,
**pymongo_kwargs
) -> Union[FindOne[FindType], FindOne["DocumentProjectionType"]]
Find one document by criteria.
Returns FindOne query object. When awaited this will either return a document or None if no document exists for the search criteria.
Arguments:
args
: *Mapping[str, Any] - search criteriaprojection_model
: Optional[Type[BaseModel]] - projection modelsession
: Optional[ClientSession] - pymongo session instanceignore_cache
: bool**pymongo_kwargs
: pymongo native parameters for find operation (if Document class contains links, this parameter must fit the respective parameter of the aggregate MongoDB function)Returns:
FindOne - find query instance
FindInterface.find_many
@classmethod
def find_many(
cls: Type[FindType],
*args: Union[Mapping[str, Any], bool],
projection_model: Optional[Type["DocumentProjectionType"]] = None,
skip: Optional[int] = None,
limit: Optional[int] = None,
sort: Union[None, str, List[Tuple[str, SortDirection]]] = None,
session: Optional[ClientSession] = None,
ignore_cache: bool = False,
fetch_links: bool = False,
with_children: bool = False,
lazy_parse: bool = False,
nesting_depth: Optional[int] = None,
nesting_depths_per_field: Optional[Dict[str, int]] = None,
**pymongo_kwargs
) -> Union[FindMany[FindType], FindMany["DocumentProjectionType"]]
Find many documents by criteria.
Returns FindMany query object
Arguments:
args
: *Mapping[str, Any] - search criteriaskip
: Optional[int] - The number of documents to omit.limit
: Optional[int] - The maximum number of results to return.sort
: Union[None, str, List[Tuple[str, SortDirection]]] - A key or a list of (key, direction) pairs specifying the sort order for this query.projection_model
: Optional[Type[BaseModel]] - projection modelsession
: Optional[ClientSession] - pymongo sessionignore_cache
: boollazy_parse
: bool**pymongo_kwargs
: pymongo native parameters for find operation (if Document class contains links, this parameter must fit the respective parameter of the aggregate MongoDB function)Returns:
FindMany - query instance
FindInterface.find
@classmethod
def find(
cls: Type[FindType],
*args: Union[Mapping[str, Any], bool],
projection_model: Optional[Type["DocumentProjectionType"]] = None,
skip: Optional[int] = None,
limit: Optional[int] = None,
sort: Union[None, str, List[Tuple[str, SortDirection]]] = None,
session: Optional[ClientSession] = None,
ignore_cache: bool = False,
fetch_links: bool = False,
with_children: bool = False,
lazy_parse: bool = False,
nesting_depth: Optional[int] = None,
nesting_depths_per_field: Optional[Dict[str, int]] = None,
**pymongo_kwargs
) -> Union[FindMany[FindType], FindMany["DocumentProjectionType"]]
The same as find_many
FindInterface.find_all
@classmethod
def find_all(
cls: Type[FindType],
skip: Optional[int] = None,
limit: Optional[int] = None,
sort: Union[None, str, List[Tuple[str, SortDirection]]] = None,
projection_model: Optional[Type["DocumentProjectionType"]] = None,
session: Optional[ClientSession] = None,
ignore_cache: bool = False,
with_children: bool = False,
lazy_parse: bool = False,
nesting_depth: Optional[int] = None,
nesting_depths_per_field: Optional[Dict[str, int]] = None,
**pymongo_kwargs
) -> Union[FindMany[FindType], FindMany["DocumentProjectionType"]]
Get all the documents
Arguments:
skip
: Optional[int] - The number of documents to omit.limit
: Optional[int] - The maximum number of results to return.sort
: Union[None, str, List[Tuple[str, SortDirection]]] - A key or a list of (key, direction) pairs specifying the sort order for this query.projection_model
: Optional[Type[BaseModel]] - projection modelsession
: Optional[ClientSession] - pymongo session**pymongo_kwargs
: pymongo native parameters for find operation (if Document class contains links, this parameter must fit the respective parameter of the aggregate MongoDB function)Returns:
FindMany - query instance
FindInterface.all
@classmethod
def all(
cls: Type[FindType],
projection_model: Optional[Type["DocumentProjectionType"]] = None,
skip: Optional[int] = None,
limit: Optional[int] = None,
sort: Union[None, str, List[Tuple[str, SortDirection]]] = None,
session: Optional[ClientSession] = None,
ignore_cache: bool = False,
with_children: bool = False,
lazy_parse: bool = False,
nesting_depth: Optional[int] = None,
nesting_depths_per_field: Optional[Dict[str, int]] = None,
**pymongo_kwargs
) -> Union[FindMany[FindType], FindMany["DocumentProjectionType"]]
the same as find_all
FindInterface.count
@classmethod
def count(cls) -> int
Number of documents in the collections
The same as find_all().count()
Returns:
int
bunnet.odm.interfaces.setters
SettersInterface
class SettersInterface()
SettersInterface.set_collection
@classmethod
def set_collection(cls, collection)
Collection setter
SettersInterface.set_database
@classmethod
def set_database(cls, database)
Database setter
SettersInterface.set_collection_name
@classmethod
def set_collection_name(cls, name: str)
Collection name setter
bunnet.odm.interfaces.run
bunnet.odm.interfaces.session
SessionMethods
class SessionMethods()
Session methods
SessionMethods.set_session
def set_session(session: Optional[ClientSession] = None)
Set pymongo session
Arguments:
session
: Optional[ClientSession] - pymongo session
bunnet.odm.interfaces.aggregate
AggregateInterface
class AggregateInterface()
AggregateInterface.aggregate
@classmethod
def aggregate(
cls: Type[DocType],
aggregation_pipeline: list,
projection_model: Optional[Type[DocumentProjectionType]] = None,
session: Optional[ClientSession] = None,
ignore_cache: bool = False,
**pymongo_kwargs
) -> Union[
AggregationQuery[Dict[str, Any]],
AggregationQuery[DocumentProjectionType],
]
Aggregate over collection.
Returns AggregationQuery query object
Arguments:
aggregation_pipeline
: list - aggregation pipelineprojection_model
: Type[BaseModel]session
: Optional[ClientSession]ignore_cache
: bool**pymongo_kwargs
: pymongo native parameters for aggregate operationReturns:
bunnet.odm.interfaces.inheritance
bunnet.odm.interfaces.aggregation_methods
AggregateMethods
class AggregateMethods()
Aggregate methods
AggregateMethods.sum
def sum(field: Union[str, ExpressionField],
session: Optional[ClientSession] = None,
ignore_cache: bool = False) -> Optional[float]
Sum of values of the given field
Example:
class Sample(Document): price: int count: int sum_count = Document.find(Sample.price <= 100).sum(Sample.count)
Arguments:
field
: Union[str, ExpressionField]session
: Optional[ClientSession] - pymongo sessionignore_cache
: boolReturns:
float - sum. None if there are no items.
AggregateMethods.avg
def avg(field,
session: Optional[ClientSession] = None,
ignore_cache: bool = False) -> Optional[float]
Average of values of the given field
Example:
class Sample(Document): price: int count: int avg_count = Document.find(Sample.price <= 100).avg(Sample.count)
Arguments:
field
: Union[str, ExpressionField]session
: Optional[ClientSession] - pymongo sessionignore_cache
: boolReturns:
Optional[float] - avg. None if there are no items.
AggregateMethods.max
def max(field: Union[str, ExpressionField],
session: Optional[ClientSession] = None,
ignore_cache: bool = False) -> Optional[float]
Max of the values of the given field
Example:
class Sample(Document): price: int count: int max_count = Document.find(Sample.price <= 100).max(Sample.count)
Arguments:
field
: Union[str, ExpressionField]session
: Optional[ClientSession] - pymongo sessionReturns:
float - max. None if there are no items.
AggregateMethods.min
def min(field: Union[str, ExpressionField],
session: Optional[ClientSession] = None,
ignore_cache: bool = False) -> Optional[float]
Min of the values of the given field
Example:
class Sample(Document): price: int count: int min_count = Document.find(Sample.price <= 100).min(Sample.count)
Arguments:
field
: Union[str, ExpressionField]session
: Optional[ClientSession] - pymongo sessionReturns:
float - min. None if there are no items.