/ravendb-python-client

This is the official Python client for the RavenDB document database.


Official Python client for RavenDB NoSQL Database 🐍

Installation

Install from PyPI as ravendb.

pip install ravendb

Introduction and changelog

Python client API (v5.2) for RavenDB, a NoSQL document database.

Although the new API isn't compatible with the previous one, it comes with many improvements and new features.

The package has been reworked to match the Java client and other RavenDB clients.

The entire project and the API results are type-hinted, which makes the API much more comfortable to use with IntelliSense.

Releases

  • All client versions 5.2.x are fully compatible with and support RavenDB server releases 5.4 and 6.0.

  • See the Releases page of the repository for all releases and the changelog.



What's new?

5.2.5+
  • Changes available in the releases section.
5.2.4
  • Bulk insert dependencies bugfix
5.2.3
  • Counters
  • Counters indexes
5.2.2
5.2.1
  • Bugfixes - Serialization while loading/querying
5.2.0
  • Subscriptions

    • Document streams
    • Secured subscriptions
  • Querying

    • Major bugfixes
    • Spatial querying and indexing
    • Highlighting fixes
    • Custom document parsers & loaders
5.2.0b3
  • New features

    • Conditional Load
    • SelectFields & Facets
    • Projections
    • MoreLikeThis
    • Suggestions
  • Improvements

    • Compare exchange
    • Querying
    • DocumentConventions
    • Patching
    • Spatial queries
    • Aggregations
5.2.0b2
  • Lazy Operations

    • Lazy loading
    • Lazy querying
    • Lazy compare exchange operations
  • Structure

    • Important classes are now available to import from the top level ravendb module

...and many bugfixes


5.2.0b1
  • Querying

    • Simpler, well type hinted querying
    • Group by, aggregations
    • Spatial querying
    • Boost, fuzzy, proximity
    • Subclauses support
  • Static Indexes

    • Store fields, index fields, pick analyzers & more using AbstractIndexCreationTask
    • Full indexes CRUD
    • Index related commands (priority, errors, start/stop, pause, lock)
    • Additional assemblies, map-reduce, index query with results "of_type"
  • CRUD

    • Type hints for results and includes
    • Support for dataclasses

Querying features


  • Attachments

    • New attachments API
    • Better type hints
  • HTTPS

    • Support for https connection
    • Certificates CRUD operations
  • Lazy load

    • New feature
  • Cluster Transactions, Compare Exchange

    • New feature

Coming soon, work in progress

  • Time Series
  • Replication & ETL Commands
  • Streaming (ready, will be merged on v5.4 - #168)

Documentation

Getting started

  1. Import the DocumentStore class from the ravendb module
from ravendb import DocumentStore
  2. Initialize the document store (you should have a single DocumentStore instance per application)
store = DocumentStore('http://live-test.ravendb.net', 'databaseName')
store.initialize()
  3. Open a session
with store.open_session() as session:
  4. Call save_changes() when you're done
    user = session.load('users/1-A') 
    user.name = "Gracjan"
    session.save_changes()
    
# Data is now persisted
# You can proceed e.g. finish web request
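
The four steps above can be wrapped in one small helper. This is a minimal sketch, not part of the client: rename_user is a hypothetical name, and it relies only on the open_session(), load() and save_changes() calls shown above. The store argument is assumed to be an already initialized DocumentStore.

```python
def rename_user(store, user_id, new_name):
    """Load one document, change its name and persist the change.

    `store` is assumed to be an initialized DocumentStore (hypothetical helper,
    not part of the ravendb package). Returns False when the document is missing.
    """
    with store.open_session() as session:
        user = session.load(user_id)
        if user is None:  # load() yields None for a missing document
            return False
        user.name = new_name
        session.save_changes()  # the loaded entity is tracked by the session
        return True
```

A call such as rename_user(store, 'users/1-A', 'Gracjan') would then cover steps 3 and 4 in one place.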

CRUD example

Store documents

product = Product(
    Id=None,
    title='iPhone 14 Pro Max',
    price=1199.99,
    currency='USD',
    storage=256,
    manufacturer='Apple',
    in_stock=True,
    last_update=datetime.datetime.now(),
)

session.store(product, 'products/1-A')
print(product.Id) # products/1-A
session.save_changes()
Some related tests:

store()
ID generation - session.store()
store document with @metadata
storing docs with same ID in same session should throw

Load documents

product = session.load('products/1-A')
print(product.title) # iPhone 14 Pro Max
print(product.Id)    # products/1-A
Related tests:

load()

Load documents with include

# users/1
# {
#      "name": "John",
#      "kids": ["users/2", "users/3"]
# }

session = store.open_session()
user1 = session.include("kids").load("users/1")
  # Document users/1 and all docs referenced in "kids"
  # will be fetched from the server in a single request.

user2 = session.load("users/2") 
# this won't call server again

assert(user1 is not None)
assert(user2 is not None)
assert(session.advanced.number_of_requests == 1)
Related tests:

can load with includes

Update documents

import datetime

product = session.load('products/1-A')
product.in_stock = False
product.last_update = datetime.datetime.now()
session.save_changes()
# ...
product = session.load('products/1-A')
print(product.in_stock) # False
print(product.last_update) # the current date
Related tests:

update document
update document metadata

Delete documents

  1. Using entity
product =  session.load('products/1-A')
session.delete(product)
session.save_changes()

product = session.load('products/1-A')
print(product) # None
  2. Using document ID
session.delete('products/1-A')
Related tests:

delete doc by entity
delete doc by ID
cannot delete after change
loading deleted doc returns null

Query documents

  1. Use query() session method:

Query by collection:

import datetime
from ravendb import DocumentStore, QueryOperator

store = DocumentStore('http://live-test.ravendb.net', 'databaseName')
store.initialize()

session = store.open_session()
query = session.query_collection("Products")

Query by index name:

query = session.query_index("productsByCategory")

Query by index:

query = session.query_index_type(Product_ByName, Product) # the second argument (object_type) is optional, as always

Query by entity type:

query = session.query(object_type=User) # object_type is an optional argument, but we can call this method only with it 
  2. Build up the query - apply search conditions, set ordering, etc.
    Query supports chaining calls (wrap the chain in parentheses so it can span several lines):
query = (
    session.query_collection("Products")
    .wait_for_non_stale_results()
    .using_default_operator(QueryOperator.AND)
    .where_equals("manufacturer", "Apple")
    .where_equals("in_stock", True)
    .where_between("last_update", datetime.datetime(2022, 11, 1), datetime.datetime.now())
    .order_by("price")
)
  3. Execute the query to get results:
results = list(query) # get all results
# ...
first_result = query.first() # gets first result
# ...
single = query.single()  # gets single result
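
The three steps can also be combined into a reusable function. The sketch below is only an illustration: in_stock_products is a hypothetical helper name, the session is passed in by the caller, and the field names follow the Product example used in this README.

```python
import datetime


def in_stock_products(session, manufacturer, since):
    """Return in-stock products of `manufacturer` updated since `since`, cheapest first.

    Hypothetical helper: `session` is assumed to be an open RavenDB session.
    """
    query = (
        session.query_collection("Products")
        .where_equals("manufacturer", manufacturer)  # conditions are AND-ed by default
        .where_equals("in_stock", True)
        .where_between("last_update", since, datetime.datetime.now())
        .order_by("price")
    )
    return list(query)  # iterating the query executes it
```

Keeping the chain inside parentheses avoids line-continuation backslashes and makes it easy to add or remove a condition.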

Query methods overview

select_fields() - projections using a single field

# RQL
# from users select name

# Query
class UserProj:
    def __init__(self, name: str = None, age: int = None):
        self.name = name
        self.age = age


user_names = [user_proj.name for user_proj in session.query_collection("Users").select_fields(UserProj, "name")]

# Sample results
# John, Stefanie, Thomas

select_fields() - projections using multiple fields

# RQL
# from users select name, age

# Query
results = list(session.query_collection("Users").select_fields(UserProj, "name", "age"))


# Sample results
# [ { name: 'John', age: 30 },
#   { name: 'Stefanie', age: 25 },
#   { name: 'Thomas', age: 25 } ]
Related tests:

query with projections (query only two fields)
can_project_id_field

distinct()

# RQL
# from users select distinct age

# Query
[user_proj.age for user_proj in session.query_collection("Users").select_fields(UserProj, "age").distinct()]


# Sample results
# [ 30, 25 ]

where_equals() / where_not_equals()

# RQL
# from users where age = 30 

# Query
list(session.query_collection("Users").where_equals("age", 30))

# Sample results
# [ User {
#    name: 'John',
#    age: 30,
#    kids: [...],
#    registered_at: 2017-11-10T23:00:00.000Z } ]
Related tests:

where equals
where not equals

where_in()

# RQL
# from users where name in ("John", "Thomas")

# Query
list(session.query_collection("Users").where_in("name", ["John", "Thomas"]))

# Sample results
# [ User {
#     name: 'John',
#     age: 30,
#     registered_at: 2017-11-10T23:00:00.000Z,
#     kids: [...],
#     id: 'users/1-A' },
#   User {
#     name: 'Thomas',
#     age: 25,
#     registered_at: 2016-04-24T22:00:00.000Z,
#     id: 'users/3-A' } ]
Related tests:

where in
query with where in

where_starts_with() / where_ends_with()

# RQL
# from users where startsWith(name, 'J')

# Query
list(session.query_collection("Users").where_starts_with("name", "J"))

# Sample results
# [ User {
#    name: 'John',
#    age: 30,
#    kids: [...],
#    registered_at: 2017-11-10T23:00:00.000Z } ]
Related tests:

query with where clause

where_between()

# RQL
# from users where registered_at between '2016-01-01' and '2017-01-01'

# Query
import datetime

list(session.query_collection("Users").where_between("registered_at", datetime.datetime(2016, 1, 1), datetime.datetime(2017,1,1)))

# Sample results
# [ User {
#     name: 'Thomas',
#     age: 25,
#     registered_at: 2016-04-24T22:00:00.000Z,
#     id: 'users/3-A' } ]
Related tests:

where between
query with where between

where_greater_than() / where_greater_than_or_equal() / where_less_than() / where_less_than_or_equal()

# RQL
# from users where age > 29

# Query
list(session.query_collection("Users").where_greater_than("age", 29))

# Sample results
# [ User {
#   name: 'John',
#   age: 30,
#   registered_at: 2017-11-10T23:00:00.000Z,
#   kids: [...],
#   id: 'users/1-A' } ]
Related tests:

query with where less than
query with where less than or equal
query with where greater than
query with where greater than or equal

where_exists()

Checks if the field exists.

# RQL
# from users where exists(kids)

# Query
session.query_collection("Users").where_exists("kids")

# Sample results
# [ User {
#   name: 'John',
#   age: 30,
#   registered_at: 2017-11-10T23:00:00.000Z,
#   kids: [...],
#   id: 'users/1-A' } ]
Related tests:

query where exists

contains_any() / contains_all()

# RQL
# from users where kids all in ('Mara', 'Dmitri')

# Query
list(session.query_collection("Users").contains_all("kids", ["Mara", "Dmitri"]))

# Sample results
# [ User {
#   name: 'John',
#   age: 30,
#   registered_at: 2017-11-10T23:00:00.000Z,
#   kids: ["Dmitri", "Mara"]
#   id: 'users/1-A' } ]
Related tests:

queries with contains any/all

search()

Perform full-text search.

# RQL
# from users where search(kids, 'Mara Dmitri')

# Query
list(session.query_collection("Users").search("kids", "Mara Dmitri"))

# Sample results
# [ User {
#   name: 'John',
#   age: 30,
#   registered_at: 2017-11-10T23:00:00.000Z,
#   kids: ["Dmitri", "Mara"]
#   id: 'users/1-A' } ]
Related tests:

search()
query search with or

open_subclause() / close_subclause()

# RQL
# from users where exists(kids) or (age = 25 and name != 'Thomas')

# Query
list(session.query_collection("Users").where_exists("kids").or_else()
    .open_subclause()
    .where_equals("age", 25)
    .where_not_equals("name", "Thomas")
    .close_subclause())

# Sample results
# [ User {
#     name: 'John',
#     age: 30,
#     registered_at: 2017-11-10T23:00:00.000Z,
#     kids: ["Dmitri", "Mara"]
#     id: 'users/1-A' },
#   User {
#     name: 'Stefanie',
#     age: 25,
#     registered_at: 2015-07-29T22:00:00.000Z,
#     id: 'users/2-A' } ]
Related tests:

working with subclause

not_()

# RQL
# from users where age != 25

# Query
list(session.query_collection("Users").not_().where_equals("age", 25))


# Sample results
# [ User {
#   name: 'John',
#   age: 30,
#   registered_at: 2017-11-10T23:00:00.000Z,
#   kids: ["Dmitri", "Mara"]
#   id: 'users/1-A' } ]
Related tests:

query where not

or_else() / and_also()

# RQL
# from users where exists(kids) or age < 30

# Query
list(session.query_collection("Users")
    .where_exists("kids")
    .or_else()
    .where_less_than("age", 30))

# Sample results
#  [ User {
#     name: 'John',
#     age: 30,
#     registered_at: 2017-11-10T23:00:00.000Z,
#     kids: [ 'Dmitri', 'Mara' ],
#     id: 'users/1-A' },
#   User {
#     name: 'Thomas',
#     age: 25,
#     registered_at: 2016-04-24T22:00:00.000Z,
#     id: 'users/3-A' },
#   User {
#     name: 'Stefanie',
#     age: 25,
#     registered_at: 2015-07-29T22:00:00.000Z,
#     id: 'users/2-A' } ]
Related tests:

working with subclause

using_default_operator()

If neither and_also() nor or_else() is called, the default operator between the query filtering conditions is AND.
You can override that with using_default_operator(), which must be called before any where condition.

# RQL
# from users where exists(kids) or age < 29
# Query
from ravendb import QueryOperator

list(session.query_collection("Users")
    .using_default_operator(QueryOperator.OR) # override the default 'AND' operator
    .where_exists("kids")
    .where_less_than("age", 29))

# Sample results
#  [ User {
#     name: 'John',
#     age: 30,
#     registered_at: 2017-11-10T23:00:00.000Z,
#     kids: [ 'Dmitri', 'Mara' ],
#     id: 'users/1-A' },
#   User {
#     name: 'Thomas',
#     age: 25,
#     registered_at: 2016-04-24T22:00:00.000Z,
#     id: 'users/3-A' },
#   User {
#     name: 'Stefanie',
#     age: 25,
#     registered_at: 2015-07-29T22:00:00.000Z,
#     id: 'users/2-A' } ]
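
To see why the operator matters, the two conditions can be evaluated by hand. The snippet below is a plain-Python analogue that uses the sample documents from this README; it does not call the client or the server, and matching_names is a hypothetical helper written only for this illustration.

```python
users = [
    {"name": "John", "age": 30, "kids": ["Dmitri", "Mara"]},
    {"name": "Stefanie", "age": 25},
    {"name": "Thomas", "age": 25},
]

conditions = [
    lambda user: "kids" in user,    # mirrors where_exists("kids")
    lambda user: user["age"] < 29,  # mirrors where_less_than("age", 29)
]


def matching_names(users, conditions, combine):
    """Names of users accepted when conditions are joined with all (AND) or any (OR)."""
    return [u["name"] for u in users if combine(cond(u) for cond in conditions)]


and_names = matching_names(users, conditions, all)
or_names = matching_names(users, conditions, any)
print(and_names)  # []
print(or_names)   # ['John', 'Stefanie', 'Thomas']
```

With AND no sample user qualifies, because the only user with kids is 30; with OR every user satisfies at least one condition.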

order_by() / order_by_desc() / order_by_score() / random_ordering()

# RQL
# from users order by age

# Query
list(session.query_collection("Users").order_by("age"))

# Sample results
# [ User {
#     name: 'Stefanie',
#     age: 25,
#     registered_at: 2015-07-29T22:00:00.000Z,
#     id: 'users/2-A' },
#   User {
#     name: 'Thomas',
#     age: 25,
#     registered_at: 2016-04-24T22:00:00.000Z,
#     id: 'users/3-A' },
#   User {
#     name: 'John',
#     age: 30,
#     registered_at: 2017-11-10T23:00:00.000Z,
#     kids: [ 'Dmitri', 'Mara' ],
#     id: 'users/1-A' } ]
Related tests:

order_by()
order_by_desc()
query random order
order by AlphaNumeric
query with boost - order by score

take()

Limit the number of query results.

# RQL
# from users order by age

# Query
list(session.query_collection("Users")
    .order_by("age")
    .take(2)) # only the first 2 entries will be returned

# Sample results
# [ User {
#     name: 'Stefanie',
#     age: 25,
#     registered_at: 2015-07-29T22:00:00.000Z,
#     id: 'users/2-A' },
#   User {
#     name: 'Thomas',
#     age: 25,
#     registered_at: 2016-04-24T22:00:00.000Z,
#     id: 'users/3-A' } ]
Related tests:

query skip take

skip()

Skip a specified number of results from the start.

# RQL
# from users order by age

# Query
list(session.query_collection("Users").order_by("age") 
    .take(1) # return only 1 result
    .skip(1)) # skip the first result, return the second result

# Sample results
# [ User {
#     name: 'Thomas',
#     age: 25,
#     registered_at: 2016-04-24T22:00:00.000Z,
#     id: 'users/3-A' } ]
Related tests:

raw query skip take
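
skip() and take() are typically combined for paging. A minimal sketch, assuming a zero-based page index; fetch_page is a hypothetical helper, not a client method, and it uses only the order_by(), skip() and take() calls shown above.

```python
def fetch_page(session, collection, order_field, page, page_size):
    """Return one page of documents from `collection`; `page` is zero-based.

    Hypothetical helper: `session` is assumed to be an open RavenDB session.
    """
    query = (
        session.query_collection(collection)
        .order_by(order_field)   # a stable ordering keeps pages consistent
        .skip(page * page_size)  # drop the results of all previous pages
        .take(page_size)         # limit the result to a single page
    )
    return list(query)
```

For example, fetch_page(session, "Users", "age", 1, 1) corresponds to the take(1) / skip(1) query above.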

Getting query statistics

Use the statistics() method to obtain query statistics.

# Query
statistics: QueryStatistics = None
def __statistics_callback(stats: QueryStatistics):
    # "global" fits this module-level snippet; inside a function, use "nonlocal" instead
    global statistics
    statistics = stats  # plug-in the reference, value will be changed later

results = list(session.query_collection("Users")
               .where_greater_than("age", 29)
               .statistics(__statistics_callback))

# Sample results
# QueryStatistics {
#   is_stale: false,
#   duration_in_ms: 744,
#   total_results: 1,
#   skipped_results: 0,
#   timestamp: 2018-09-24T05:34:15.260Z,
#   index_name: 'Auto/users/Byage',
#   index_timestamp: 2018-09-24T05:34:15.260Z,
#   last_query_time: 2018-09-24T05:34:15.260Z,
#   result_etag: 8426908718162809000 }
Related tests:

can get stats in aggregation query
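
The nonlocal statement only works inside an enclosing function, so one way to use the callback is to wrap the query in a helper that returns both the results and the statistics. This is a sketch under that assumption; users_older_than is a hypothetical name and the session is supplied by the caller.

```python
def users_older_than(session, min_age):
    """Run the query and return a (results, statistics) tuple.

    Hypothetical helper: `session` is assumed to be an open RavenDB session.
    """
    statistics = None

    def on_statistics(stats):
        nonlocal statistics  # rebind the variable of the enclosing function
        statistics = stats

    results = list(
        session.query_collection("Users")
        .where_greater_than("age", min_age)
        .statistics(on_statistics)
    )
    return results, statistics
```

Returning the statistics object next to the results keeps the callback an implementation detail of the helper.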

list() / first() / single() / count()

list(query) - returns all results

first() - first result only

single() - the only result, raises an error if there is more than one entry

count() - returns the number of entries in the results (not affected by take())

Related tests:

query first and single

Attachments

Store attachments

doc = User(name="John")

# Store a document, the entity will be tracked.
session.store(doc)

with open("photo.png", "rb") as file:
    content = file.read()  # read the bytes while the file is still open

session.advanced.attachments.store(doc, "photo.png", content, "image/png")

# OR store attachment using document ID
session.advanced.attachments.store(doc.Id, "photo.png", content, "image/png")

# Persist all changes
session.save_changes()
Related tests:

can put attachments
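
Reading the file and storing the attachment can be bundled into one function. The following is a minimal sketch; attach_file is a hypothetical helper that uses only the attachments.store() call shown above and leaves save_changes() to the caller.

```python
import os


def attach_file(session, entity, path, content_type):
    """Store the file at `path` as an attachment of `entity`, named after the file.

    Hypothetical helper: `session` is assumed to be an open RavenDB session
    that already tracks `entity`. The caller still has to call save_changes().
    """
    name = os.path.basename(path)
    with open(path, "rb") as file:
        data = file.read()  # read the bytes before the file is closed
    session.advanced.attachments.store(entity, name, data, content_type)
    return name
```

Usage would look like attach_file(session, doc, "photo.png", "image/png") followed by session.save_changes().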

Get attachments

# Get an attachment
attachment = session.advanced.attachments.get(document_id, "photo.png")

# Attachment.details contains information about the attachment:
#     { 
#       name: 'photo.png',
#       document_id: 'users/1-A',
#       content_type: 'image/png',
#       hash: 'MvUEcrFHSVDts5ZQv2bQ3r9RwtynqnyJzIbNYzu1ZXk=',
#       change_vector: '"A:3-K5TR36dafUC98AItzIa6ow"',
#       size: 4579 
#     }
Related tests:

can get & delete attachments

Check if attachment exists

session.advanced.attachments.exists(doc.Id, "photo.png")
# True

session.advanced.attachments.exists(doc.Id, "not_there.avi")
# False
Related tests:

attachment exists

Get attachment names

# Use a loaded entity to determine attachments' names
session.advanced.attachments.get_names(doc)

# Sample results:
# [ { name: 'photo.png',
#     hash: 'MvUEcrFHSVDts5ZQv2bQ3r9RwtynqnyJzIbNYzu1ZXk=',
#     content_type: 'image/png',
#     size: 4579 } ]
Related tests:

get attachment names

Changes API

Listen for database changes e.g. document changes.

from ravendb.changes.observers import ActionObserver  # import path is an assumption, verify against your installed version

# Subscribe to change notifications
changes = store.changes()

all_documents_changes = []

# Subscribe for all documents, or for specific collection (or other database items)
all_observer = changes.for_all_documents()

close_method = all_observer.subscribe_with_observer(ActionObserver(on_next=all_documents_changes.append))
all_observer.ensure_subscribe_now()

session = store.open_session()
session.store(User("Starlord"))
session.save_changes()

# ...
# Dispose the changes instance when you're done
close_method()
Related tests:

can obtain single document changes
can obtain all documents changes
can obtain notification about documents starting with

Suggestions

Suggest options for similar/misspelled terms

from ravendb.documents.indexes.definitions import FieldIndexing
from ravendb import AbstractIndexCreationTask
# Some documents in users collection with misspelled name term
# [ User {
#     name: 'Johne',
#     age: 30,
#     ...
#     id: 'users/1-A' },
#   User {
#     name: 'Johm',
#     age: 31,
#     ...
#     id: 'users/2-A' },
#   User {
#     name: 'Jon',
#     age: 32,
#     ...
#     id: 'users/3-A' },
# ]

# Static index definition
class UsersIndex(AbstractIndexCreationTask):
    def __init__(self):
        super().__init__()
        self.map = "from u in docs.Users select new { u.name }"
        # Enable the suggestion feature on index-field 'name'
        self._index("name", FieldIndexing.SEARCH)
        self._index_suggestions.add("name")

# ...
session = store.open_session()

# Query for similar terms to 'John'
# Note: the term 'John' itself will not be part of the results

suggested_name_terms = (
    session.query_index_type(UsersIndex, User)
    .suggest_using(lambda x: x.by_field("name", "John"))
    .execute()
)

# Sample results:
# { name: { name: 'name', suggestions: [ 'johne', 'johm', 'jon' ] } }
Related tests:

can suggest
canChainSuggestions
canUseAliasInSuggestions
canUseSuggestionsWithAutoIndex
can suggest using linq
can suggest using multiple words
can get suggestions with options

Advanced patching

# Increment 'age' field by 1
session.advanced.increment("users/1", "age", 1)

# Set 'underAge' field to false
session.advanced.patch("users/1", "underAge", False)

session.save_changes()
Related tests:

can patch
can patch complex
can add to array
can increment
patchWillUpdateTrackedDocumentAfterSaveChanges
can patch multiple documents

Subscriptions

# Create a subscription task on the server
# Documents that match the query will be sent to the client worker upon opening a connection
from ravendb import DocumentStore
from ravendb.documents.subscriptions.worker import SubscriptionBatch
from ravendb.documents.subscriptions.options import SubscriptionCreationOptions, SubscriptionWorkerOptions

store = DocumentStore("http://live-test.ravendb.net", "TestDatabase")
store.initialize()

subscription_name = store.subscriptions.create_for_options(SubscriptionCreationOptions(query="from users where age >= 30"))

# Open a connection
# Create a subscription worker that will consume document batches sent from the server
# Documents are sent from the last document that was processed for this subscription

with store.subscriptions.get_subscription_worker(SubscriptionWorkerOptions(subscription_name)) as subscription_worker:
    def __callback(batch: SubscriptionBatch):
        # Process the incoming batch items
        # Sample batch.items:
        # [ Item {
        #     change_vector: 'A:2-r6nkF5nZtUKhcPEk6/LL+Q',
        #     id: 'users/1-A',
        #     raw_result:
        #      { name: 'John',
        #        age: 30,
        #        registered_at: '2017-11-11T00:00:00.0000000',
        #        kids: [Array],
        #        '@metadata': [Object],
        #        id: 'users/1-A' },
        #     rawMetadata:
        #      { '@collection': 'Users',
        #        '@nested-object-types': [Object],
        #        'Raven-Node-Type': 'User',
        #        '@change-vector': 'A:2-r6nkF5nZtUKhcPEk6/LL+Q',
        #        '@id': 'users/1-A',
        #        '@last-modified': '2018-10-18T11:15:51.4882011Z' },
        #     exception_message: undefined } ]
        # ...
        pass  # placeholder so the function body is valid Python
    
    
    def __exception_callback(ex: Exception):
        # Handle exceptions here
        pass  # placeholder so the function body is valid Python
    
    subscription_worker.add_on_unexpected_subscription_error(__exception_callback)
    subscription_worker.run(__callback)
Related tests:

can subscribe to index and document
should stream all documents
should send all new and modified docs
should respect max doc count in batch
can disable subscription
can delete subscription
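
A batch callback usually just walks over batch.items. The sketch below assumes each item exposes the id and raw_result attributes shown in the sample batch above, and that raw_result behaves like a dictionary; summarize_batch is a hypothetical helper, not part of the client.

```python
def summarize_batch(batch):
    """Return (document id, name) pairs for every item in a subscription batch.

    Assumes items carry `id` and a dict-like `raw_result`, as in the sample above.
    """
    return [(item.id, item.raw_result.get("name")) for item in batch.items]
```

It could be plugged into the worker with subscription_worker.run(lambda batch: print(summarize_batch(batch))).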

Counters

There are many ways to play with counters. The most common path is to use session API (session.counters_for()).

from ravendb.documents.operations.counters import GetCountersOperation  # import path is an assumption

with store.open_session() as session:
    user1 = User("Aviv1")
    user2 = User("Aviv2")
    session.store(user1, "users/1-A")
    session.store(user2, "users/2-A")
    session.save_changes()

# storing counters via session API
with store.open_session() as session:
    session.counters_for("users/1-A").increment("likes", 100)
    session.counters_for("users/1-A").increment("downloads", 500)
    session.counters_for("users/2-A").increment("votes", 1000)

    session.save_changes()

# alternatively, loading counters via GetCountersOperation
counters = store.operations.send(GetCountersOperation("users/1-A", ["likes", "downloads"])).counters

# loading counters via session API
with store.open_session() as session:
    user1_likes = session.counters_for("users/1-A").get("likes")

# deleting counters via session API
with store.open_session() as session:
    session.counters_for("users/1-A").delete("likes")
    session.counters_for("users/1-A").delete("downloads")
    session.counters_for("users/2-A").delete("votes")

    session.save_changes()
Playing with counters using CounterBatchOperation
counter_operation = DocumentCountersOperation(document_id="users/1-A", operations=[])
counter_operation.add_operations(
    CounterOperation("Likes", counter_operation_type=CounterOperationType.INCREMENT, delta=4)
)
counter_operation.add_operations(
    CounterOperation(
        "Shares",
        counter_operation_type=CounterOperationType.INCREMENT,
        delta=422,
    )
)
counter_operation.add_operations(CounterOperation("Likes", counter_operation_type=CounterOperationType.DELETE))

counter_batch = CounterBatch(documents=[counter_operation])
results = store.operations.send(CounterBatchOperation(counter_batch))
Related tests:

incrementing counters
document counters operation
including counters
counters indexes
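
When several counters of one document change together, the session calls can be driven from a dictionary. This is a minimal sketch using only counters_for(), increment() and save_changes() as shown above; increment_counters is a hypothetical helper name.

```python
def increment_counters(session, document_id, deltas):
    """Increment several counters of one document and persist them with one save.

    Hypothetical helper: `deltas` maps counter names to increments,
    e.g. {"likes": 100, "downloads": 500}.
    """
    for name, delta in deltas.items():
        session.counters_for(document_id).increment(name, delta)
    session.save_changes()
```

For example, increment_counters(session, "users/1-A", {"likes": 100, "downloads": 500}) mirrors the first two increments in the session example.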

Bulk insert

Bulk insert is an efficient way to store a large number of documents.

For details, see https://ravendb.net/docs/article-page/5.4/csharp/client-api/bulk-insert/how-to-work-with-bulk-insert-operation

For example:

foo_bar1 = FooBar("John Doe")
foo_bar2 = FooBar("Jane Doe")
foo_bar3 = FooBar("John")
foo_bar4 = FooBar("Jane")

with store.bulk_insert() as bulk_insert:
    bulk_insert.store(foo_bar1)
    bulk_insert.store(foo_bar2)
    bulk_insert.store_as(foo_bar3, "foobars/66")
    bulk_insert.store_as(foo_bar4, "foobars/99", MetadataAsDictionary({"some_metadata_value": 75}))

The 3rd insert stores a document with the ID "foobars/66".

The 4th insert stores a document with a custom ID and extra metadata.

Related tests:

simple bulk insert
modifying metadata on bulk insert
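
The same pattern can be applied to whole collections of entities. The sketch below is an illustration only; bulk_store is a hypothetical helper built on the bulk_insert(), store() and store_as() calls from the example above.

```python
def bulk_store(store, entities_by_id, auto_id_entities=()):
    """Bulk-insert entities and return how many were sent.

    Hypothetical helper: `entities_by_id` maps explicit document IDs to entities,
    `auto_id_entities` are stored with server-generated IDs.
    """
    with store.bulk_insert() as bulk_insert:
        for entity in auto_id_entities:
            bulk_insert.store(entity)
        for document_id, entity in entities_by_id.items():
            bulk_insert.store_as(entity, document_id)
    return len(auto_id_entities) + len(entities_by_id)
```

With the objects from the example, bulk_store(store, {"foobars/66": foo_bar3}, [foo_bar1, foo_bar2]) would insert three documents.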

Using classes for entities

  1. Define your model as class.
import datetime


class Product:
    def __init__(self, Id: str = None, title: str = None, price: float = None, currency: str = None, storage: int = None,
                 manufacturer: str = None, in_stock: bool = False, last_update: datetime.datetime = None):
        self.Id = Id
        self.title = title
        self.price = price
        self.currency = currency
        self.storage = storage
        self.manufacturer = manufacturer
        self.in_stock = in_stock
        self.last_update = last_update
  2. To store a document, pass its instance to store().
    The collection name will automatically be detected from the entity's class name.
import datetime
from models import Product

product = Product(None, 'iPhone X', 999.99, 'USD', 64, 'Apple', True, datetime.datetime(2017,10,1))
session.store(product)  # store() fills in product.Id; do not reassign its return value
print(isinstance(product, Product))  # True
print('products/' in product.Id)  # True
session.save_changes()
  3. Loading a document
product = session.load('products/1-A')
print(isinstance(product, Product))     # True
print(product.Id)                       # products/1-A
  4. Querying for documents
products = list(session.query_collection("Products"))

for product in products:
    print(isinstance(product, Product))   # True
    print("products/" in product.Id)      # True

P.S. The Python client also supports dataclasses.

Related tests:

using dataclasses
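
As an illustration, the Product model above could be written as a dataclass. This is a sketch that mirrors the class-based model field by field; whether every field default suits your documents is an assumption to verify against the dataclasses test mentioned above.

```python
import datetime
from dataclasses import dataclass
from typing import Optional


@dataclass
class Product:
    # Same fields as the class-based model; Id stays None until the document is stored
    Id: Optional[str] = None
    title: Optional[str] = None
    price: Optional[float] = None
    currency: Optional[str] = None
    storage: Optional[int] = None
    manufacturer: Optional[str] = None
    in_stock: bool = False
    last_update: Optional[datetime.datetime] = None


product = Product(
    title="iPhone X",
    price=999.99,
    currency="USD",
    storage=64,
    manufacturer="Apple",
    in_stock=True,
    last_update=datetime.datetime(2017, 10, 1),
)
print(product.title)  # iPhone X
```

The decorator generates __init__, __repr__ and __eq__, so the model stays short while keeping the same attribute names.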

Working with secured server

from ravendb import DocumentStore

URLS = ["https://raven.server.url"]
DB_NAME = "SecuredDemo"
CERT_PATH = "path\\to\\cert.pem"


class User:
    def __init__(self, name: str, tag: str):
        self.name = name
        self.tag = tag


store = DocumentStore(URLS, DB_NAME)
store.certificate_pem_path = CERT_PATH
store.initialize()
user = User("Gracjan", "Admin")

with store.open_session() as session:
    session.store(user, "users/1")
    session.save_changes()

with store.open_session() as session:
    user = session.load("users/1", User)
    assert user.name == "Gracjan"
    assert user.tag == "Admin"

Running tests

# To run the suite, set the following environment variables:
# 
# - Location of RavenDB server binary:
# RAVENDB_TEST_SERVER_PATH="C:\\work\\test\\Server\\Raven.Server.exe" 
#
# - Certificates paths for tests requiring a secure server:
# RAVENDB_TEST_SERVER_CERTIFICATE_PATH="C:\\work\\test\\cluster.server.certificate.pfx"
# RAVENDB_TEST_CLIENT_CERTIFICATE_PATH="C:\\work\\test\\python.pem"
# RAVENDB_TEST_CA_PATH="C:\\work\\test\\ca.crt"
#
# - Certificate hostname: 
# RAVENDB_TEST_HTTPS_SERVER_URL="https://a.nodejstest.development.run:7326"
#

python -m unittest discover
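
Before starting the suite it can help to check that the variables listed above are set. The following is a minimal sketch using only the standard library; missing_variables is a hypothetical helper and is not part of the repository.

```python
import os

# Variable names taken from the list above
REQUIRED = ("RAVENDB_TEST_SERVER_PATH",)
SECURED = (
    "RAVENDB_TEST_SERVER_CERTIFICATE_PATH",
    "RAVENDB_TEST_CLIENT_CERTIFICATE_PATH",
    "RAVENDB_TEST_CA_PATH",
    "RAVENDB_TEST_HTTPS_SERVER_URL",
)


def missing_variables(environ=None, include_secured=True):
    """Return the names of test variables that are unset or empty."""
    environ = os.environ if environ is None else environ
    names = REQUIRED + (SECURED if include_secured else ())
    return [name for name in names if not environ.get(name)]
```

Calling missing_variables() with no arguments inspects the current process environment; an empty list means everything is configured.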

RavenDB Documentation

https://ravendb.net/docs/article-page/5.3/python


Bug Tracker

http://issues.hibernatingrhinos.com/issues/RDBC