
.NET event stream projection and scheduling platform with EventStore, CosmosDb, Equinox and Kafka integrations


Propulsion

While the bulk of this code is in production across various Walmart systems, the documentation is very much a work in progress. Ideally there'd be a nice summary of the various projection patterns, along with much broader information discussing the tradeoffs implied in an event-centric system as a whole.

If you're looking for a good discussion forum on these kinds of topics, look no further than the DDD-CQRS-ES Slack's #equinox channel (invite link).

Components

The components within this repository are delivered as multi-targeted NuGet packages targeting net461 (F# 3.1+) and netstandard2.0 (F# 4.5+) profiles:

  • Propulsion: Implements core functionality in a channel-independent fashion, including ParallelProjector and StreamsProjector. Depends on MathNet.Numerics, Serilog.
  • Propulsion.Cosmos: Provides bindings to Azure CosmosDb: a) writing to Equinox.Cosmos via CosmosSink; b) reading from CosmosDb's changefeed, by wrapping the dotnet-changefeedprocessor library, via CosmosSource. Depends on Equinox.Cosmos, Microsoft.Azure.DocumentDB.ChangeFeedProcessor, Serilog.
  • Propulsion.EventStore: Provides bindings to EventStore, writing via Propulsion.EventStore.EventStoreSink. Depends on Equinox.EventStore, Serilog.
  • Propulsion.Kafka: Provides bindings for producing and consuming both streamwise and in parallel. Includes a standard codec for use with streamwise projection and consumption, Propulsion.Kafka.Codec.NewtonsoftJson.RenderedSpan. Depends on FsKafka v 1.4.3, Serilog.
  • Propulsion.Kafka0: Same functionality/purpose as Propulsion.Kafka, but uses FsKafka0 instead of FsKafka in order to target an older Confluent.Kafka/librdkafka version pairing, for interoperability with systems that have a hard dependency on that. Depends on FsKafka0 v 1.4.3 (which depends on Confluent.Kafka [0.11.3], librdkafka.redist [0.11.4]), Serilog.
  • Propulsion.SqlStreamStore: Provides bindings to SqlStreamStore, maintaining checkpoints in a SQL table using Dapper. Depends on SqlStreamStore, Dapper v 2.0, Microsoft.Data.SqlClient v 1.1.3, Serilog.

The ubiquitous Serilog dependency is solely on the core module, not on any sinks; i.e. you can configure it to emit to NLog etc.

dotnet tool provisioning / projections test tool

  • Propulsion.Tool: Tool used to initialize a Change Feed Processor aux container for Propulsion.Cosmos and demonstrate basic projection, including to Kafka. (Install via: dotnet tool install Propulsion.Tool -g)

Related repos

  • See the Equinox QuickStart for examples of using this library to project to Kafka from Equinox.Cosmos and/or Equinox.EventStore.

  • See the dotnet new templates repo for examples using the packages herein:

    • Propulsion-specific templates:

      • proProjector template for CosmosSource+StreamsProjector logic consuming from a CosmosDb ChangeFeedProcessor.
      • proProjector template (in --kafka mode) for producer logic using StreamsProducerSink or ParallelProducerSink.
      • proConsumer template for example consumer logic using ParallelConsumer and StreamsConsumer etc.
    • Propulsion+Equinox templates:

      • proReactor template, which includes multiple sources and multiple processing modes
      • summaryConsumer template, which consumes from the output of a proReactor --kafka, saving the consumed messages in an Equinox.Cosmos store
      • trackingConsumer template, which consumes from Kafka, feeding into example Ingester logic
      • proSync template is a fully fledged store <-> store synchronization tool syncing from a CosmosSource or EventStoreSource to a CosmosSink or EventStoreSink
  • See the FsKafka repo for BatchedProducer and BatchedConsumer implementations (together with the KafkaConsumerConfig and KafkaProducerConfig used in the Parallel and Streams wrappers in Propulsion.Kafka)

Overview

The Equinox Perspective

Propulsion and Equinox have a Yin and yang relationship; the use cases for both naturally interlock and overlap.

It can be useful to peruse the Equinox Documentation's Overview Diagrams for the perspective from the other side (TL;DR it's largely the same topology, with elements that are de-emphasized here being central over there, and vice versa).

C4 Context diagram

While Equinox focuses on the Consistent Processing element of building an event-sourced decision processing system, offering tailored components that interact with a specific Consistent Event Store, Propulsion elements support the building of complementary facilities as part of an overall Application:

  • Ingesters: read data from outside the Bounded Context of the System. This kind of service covers aspects such as feeding reference data into Read Models and ingesting changes into a consistent model via Consistent Processing. These services are not acting in reaction to events emanating from the Consistent Event Store, as opposed to...
  • Publishers: react to events as they arrive from the Consistent Event Store by filtering, rendering and producing to feeds for downstreams. While these services may in some cases rely on synchronous queries via Consistent Processing, they never transact or drive follow-on work; which brings us to...
  • Reactors: drive reactive actions triggered by either upstream feeds, or events observed in the Consistent Event Store. These services handle anything beyond the duties of Ingesters or Publishers, and will often drive follow-on processing via Process Managers and/or transacting via Consistent Processing. In some cases, a reactor app's function may be to progressively compose a notification for a Publisher to eventually publish.

The overall territory is laid out here in this C4 System Context Diagram:

Propulsion c4model.com Context Diagram

See Overview section in DOCUMENTATION.md for further drill down

QuickStart

1. Use propulsion tool to run a CosmosDb ChangeFeedProcessor

dotnet tool uninstall Propulsion.Tool -g
dotnet tool install Propulsion.Tool -g

propulsion init -ru 400 cosmos # generates a -aux container for the ChangeFeedProcessor to maintain consumer group progress within
# `-V` for verbose ChangeFeedProcessor logging
# `-g projector1` represents the consumer group; >=1 are allowed, allowing multiple independent projections to run concurrently
# `stats` specifies one only wants stats regarding items (other options include `kafka` to project to Kafka)
# `cosmos` specifies source overrides (using defaults in step 1 in this instance)
propulsion -V project -g projector1 stats cosmos

2. Use propulsion tool to run a CosmosDb ChangeFeedProcessor, emitting to a Kafka topic

$env:PROPULSION_KAFKA_BROKER="instance.kafka.mysite.com:9092" # or use -b

# `-V` for verbose logging
# `-g projector3` represents the consumer group; >=1 are allowed, allowing multiple independent projections to run concurrently
# `-l 5` to report ChangeFeed lags every 5 minutes
# `kafka` specifies one wants to emit to Kafka
# `temp-topic` is the topic to emit to
# `cosmos` specifies source overrides (using defaults in step 1 in this instance)
propulsion -V project -g projector3 -l 5 kafka temp-topic cosmos
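The `$env:` assignment in step 2 is PowerShell syntax. On bash or another POSIX-style shell, the equivalent would be along these lines (a minimal sketch; the variable name and placeholder broker address are the ones used in step 2, and nothing else is assumed):

```shell
# Same variable and placeholder broker address as in step 2, in POSIX shell syntax
export PROPULSION_KAFKA_BROKER="instance.kafka.mysite.com:9092"

# Show the value that child processes such as the propulsion tool will inherit
echo "PROPULSION_KAFKA_BROKER=$PROPULSION_KAFKA_BROKER"
```

With the variable exported, the `propulsion -V project ...` command from step 2 can be run unchanged; alternatively, pass the broker via `-b`.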

CONTRIBUTING

See CONTRIBUTING.md

TEMPLATES

The best place to start, sample-wise, is with the QuickStart, which walks you through sample code, tuned for approachability, from dotnet new templates stored in a dedicated repo.

BUILDING

Please note the QuickStart is probably the best way to gain an overview, and the templates are the best way to see how to consume the libraries; these instructions are intended mainly for people looking to make changes.

NB The Propulsion.Kafka.Integration tests are reliant on a TEST_KAFKA_BROKER environment variable pointing to a Broker that has been configured to auto-create ephemeral Kafka Topics as required by the tests (each test run blindly writes to a guid-named topic and trusts the broker will accept the write without any initialization step)

build, including tests on net461 and netcoreapp3.1

dotnet build build.proj -v n
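As a rough sketch of preparing a POSIX-style shell for the Kafka integration tests: the broker address below is a hypothetical placeholder, and the broker is assumed to permit topic auto-creation (in Apache Kafka this is typically governed by the broker setting `auto.create.topics.enable`).

```shell
# Hypothetical broker address: substitute one that auto-creates topics
export TEST_KAFKA_BROKER="localhost:9092"

# Fail fast with a clear message if the variable is unset or empty
: "${TEST_KAFKA_BROKER:?must point to a broker that auto-creates topics}"

echo "Integration tests will target: $TEST_KAFKA_BROKER"
# Then run the build as shown above: dotnet build build.proj -v n
```

The `:?` expansion aborts the script before the build starts if the variable is missing, which is cheaper than waiting for the integration tests to fail on their first write.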

FAQ

Why do you employ Kafka as an additional layer, when downstream processes could simply subscribe directly and individually to the relevant CosmosDb change feed(s)? Is it to accommodate other messages besides those emitted from events and snapshot updates? 🙏 @Roland Andrag

Well, Kafka is definitely not a critical component or a panacea.

You're correct that the bulk of things that can be achieved using Kafka can be accomplished by using the changefeed directly. One thing to point out is that, in the context of enterprise systems, having a well maintained Kafka cluster already in place means using it has less incremental cost than it might if you're building a smaller system from nothing.

Some of the negatives of consuming from the changefeed (CF) directly:

  • each CFP reader imposes RU charges (it's a set of continuous queries against each and every physical range of which the Cosmos store is composed)
  • you can't apply a server-side filter, so you pay for everything you see
  • you're very prone to falling into coupling issues
  • (as you alluded to), if there's some logic or work involved in the production of events you'd emit to Kafka, each consumer would need to duplicate that

While many of these concerns can be alleviated to varying degrees by splitting the storage up into multiple containers in order that each consumer will intrinsically be interested in a large proportion of the data it will observe (potentially using database level RU allocations), the write amplification effects of having multiple consumers will always be more significant when reading directly than when using Kafka, the design of which is well suited to running lots of concurrent readers.
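To make the scaling concern concrete, here is a back-of-the-envelope sketch in shell arithmetic. The figures are purely hypothetical placeholders, not measurements; the only point illustrated is that the RU charge for reading the change feed is paid once per direct reader, but only once overall when a single projector feeds Kafka.

```shell
# Hypothetical figures for illustration only
ru_per_reader=50    # assumed RU/s consumed by one change feed reader
consumers=6         # assumed number of downstream consumers

# Every consumer reads the change feed itself
direct_ru=$(( ru_per_reader * consumers ))

# One projector reads the change feed and emits to Kafka; consumers read from Kafka
via_kafka_ru=$(( ru_per_reader * 1 ))

echo "direct: ${direct_ru} RU/s; via Kafka: ${via_kafka_ru} RU/s"
```

The model ignores the cost of running Kafka itself, which, as noted above, is lower in incremental terms where a well maintained cluster already exists.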

Splitting event categories into containers solely to optimize these effects can also make the management of the transactional workload more complex; the ideal for any given container is to balance the concerns of:

  • ensuring that datasets for which you want to ringfence availability / RU allocations don't share with containers/databases for which running hot is acceptable (i.e. potentially significant levels of rate limiting, but overall high throughput in aggregate as a result of using a high percentage of the allocated capacity)
  • avoiding prematurely splitting data prior to it being required by the constraints of CosmosDB (i.e. you want to let splitting primarily be driven by reaching the [10GB] physical partition range)
  • not having logical partition hotspots that lead to a small number of physical partitions having significantly above average RU consumption
  • having relatively consistent document sizes
  • economies of scale: if each container (or database, if you provision at that level) needs to be individually managed (with a degree of headroom to ensure availability for load spikes etc), you'll tend to require a higher aggregate RU assignment for a given overall workload on a topology that has more containers

What's the deal with the history of this repo?

This repo is derived from FsKafka; the history has been edited to focus only on edits to the Propulsion libraries.

Your question here

  • Please feel free to log question-issues; they'll get answered here

FURTHER READING

See DOCUMENTATION.md and Equinox's DOCUMENTATION.md