I have a contrived example here, based on a shopping cart, that performs Event Sourcing and CQRS (for analytics):
https://github.com/calvinlfer/es-cqrs-shopping-cart
Chapter 4, section 4.3.3

The mergeComb function is incorrect; it reads:
def mergeComb:((Double,Double,Int,Double),(Double,Double,Int,Double)) =>  (Double,Double,Int,Double) = { 
   case((mn1,mx1,c1,tot1),(mn2,mx2,c2,tot2)) => (scala.math.min(mn1,mn1),scala.math.max(mx1,mx2),c1+c2,tot1+tot2) 
} 


Notice that mn1 is accidentally repeated when calculating the minimum; it should instead be scala.math.min(mn1, mn2).


Here's a cleaned-up version with the problem fixed. Note that def need not be used here; val can be used instead because this is a function literal. The same goes for the other functions defined alongside this one.
  val mergeComb: ((Double, Double, Int, Double), (Double, Double, Int, Double)) => (Double, Double, Int, Double) = {
    case ((min1, max1, count1, total1), (min2, max2, count2, total2)) =>
      (scala.math.min(min1, min2), scala.math.max(max1, max2), count1 + count2, total1 + total2)
  }
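To sanity-check the fix, here's a quick example (plain Scala, no framework needed; the sample numbers are made up) that merges two partial (min, max, count, total) aggregates:

  // partial aggregates from two partitions: (min, max, count, total)
  val left  = (2.0, 9.0, 3, 15.0)
  val right = (1.0, 7.0, 2, 8.0)
  val merged = mergeComb(left, right)
  // merged == (1.0, 9.0, 5, 23.0); with the original bug the minimum would
  // have stayed at 2.0 because mn2 was never consulted
  assert(merged == (1.0, 9.0, 5, 23.0))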
Take an inventory system where I want to make sure I have enough quantity on hand and don't oversell: if at-least-once delivery is OK there, why is it OK, and what fail-safes should I use?


Can you elaborate a bit more on what your command and query sides are for this use case? It seems to me that the command side is the inventory system itself and the query sides would be used for analytics.
I have done some more research into this topic and have also dived into Lagom's source to understand how they do CQRS.

At a high level, you have your command side persisting to the event journal, and that's all it does. A separate process reads the event journal (tracking offsets as well) via Akka Persistence Query and populates the read-side database. You can use an offset-tracking table to implement resumable projections, and if the offset-tracking table is updated atomically with the read-side table, you achieve exactly-once delivery. Introducing Kafka into the mix to alleviate pressure on the event journal adds some more complexity.
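To make that concrete, here's a minimal sketch of such a process, assuming a Cassandra journal, events tagged "cart", and Akka 2.5-style APIs; readSavedOffset and updateReadSideAndOffset are hypothetical helpers you would implement against your own read-side store (the read-side write and the offset write inside updateReadSideAndOffset must share one transaction to get exactly-once behavior):

  import akka.actor.ActorSystem
  import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal
  import akka.persistence.query.{EventEnvelope, Offset, PersistenceQuery}
  import akka.stream.ActorMaterializer
  import akka.stream.scaladsl.Sink
  import scala.concurrent.Future

  object CartProjection extends App {
    implicit val system: ActorSystem = ActorSystem("projection")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    import system.dispatcher

    val journal = PersistenceQuery(system)
      .readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)

    // hypothetical: read the last committed offset from the offset-tracking table
    def readSavedOffset(): Future[Offset] = ???

    // hypothetical: write the read-side row AND the new offset in one transaction
    def updateReadSideAndOffset(env: EventEnvelope): Future[Unit] = ???

    readSavedOffset().foreach { savedOffset =>
      journal
        .eventsByTag("cart", savedOffset)                   // resume where we left off
        .mapAsync(parallelism = 1)(updateReadSideAndOffset) // parallelism 1 keeps event order
        .runWith(Sink.ignore)
    }
  }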
Hey there,

I'm no expert on this, but exactly-once processing is very specific to your domain, so it's a bit difficult to talk about it as generally as the other two. My team is in the process of building a view/query side in a CQRS application that is used for billing, so we require this strong exactly-once/effectively-once guarantee. If you are using Akka Persistence and feeding Akka Persistence Query's eventsByTag off the journal, you can achieve exactly-once delivery by using an offset-tracking table in addition to your read-side table: you atomically persist the read-side update and the offset (which each event from the journal carries), so the two can never get out of sync.
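For illustration, here's a sketch of that atomic write, assuming a JDBC read side; the billing_totals and projection_offsets tables are made-up names. Both statements run in one transaction, so the projection either applies the event and advances the offset together, or does neither:

  import java.sql.Connection

  def applyEventAndOffset(conn: Connection, tag: String,
                          amount: BigDecimal, offsetUuid: String): Unit = {
    conn.setAutoCommit(false)
    try {
      // 1. apply the event's effect to the read-side table
      val upd = conn.prepareStatement(
        "UPDATE billing_totals SET total = total + ? WHERE tag = ?")
      upd.setBigDecimal(1, amount.bigDecimal)
      upd.setString(2, tag)
      upd.executeUpdate()
      upd.close()

      // 2. record the event's offset in the same transaction
      val off = conn.prepareStatement(
        "UPDATE projection_offsets SET time_uuid_offset = ? WHERE tag = ?")
      off.setString(1, offsetUuid)
      off.setString(2, tag)
      off.executeUpdate()
      off.close()

      conn.commit()
    } catch {
      case t: Throwable => conn.rollback(); throw t
    }
  }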

Like I said, this approach is very specific to the implementation and the use case. If you want a bit more information, I would be happy to dive into more detail about how we are doing it. I would also advise you to check out Lagom's source code because they do a lot of this for you.

The tricky part comes when you want to introduce something like Kafka, for example because you have a lot of query sides and are trying to build an event-driven backbone: it's very hard to achieve exactly-once with Kafka in the mix (at-least-once is easy to get). The recommendation is to take each event (or at least a subset) and migrate it out into the query-side databases in such a way that duplicates result in upserts and do no harm (effectively-once behavior). You will then have to perform further aggregation of the data on the query side.
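As an illustration of the upsert idea, here's a sketch assuming a PostgreSQL read side; the purchases table and the ItemPurchased event are made up. Because the row is keyed by (persistence_id, sequence_nr), a duplicate delivery just rewrites the same row with the same values:

  import java.sql.Connection

  final case class ItemPurchased(persistenceId: String, sequenceNr: Long,
                                 itemId: String, quantity: Int)

  def upsert(conn: Connection, e: ItemPurchased): Unit = {
    // PostgreSQL 9.5+ upsert: insert, or overwrite the existing row on conflict
    val ps = conn.prepareStatement(
      """INSERT INTO purchases (persistence_id, sequence_nr, item_id, quantity)
        |VALUES (?, ?, ?, ?)
        |ON CONFLICT (persistence_id, sequence_nr) DO UPDATE
        |SET item_id = EXCLUDED.item_id, quantity = EXCLUDED.quantity""".stripMargin)
    ps.setString(1, e.persistenceId)
    ps.setLong(2, e.sequenceNr)
    ps.setString(3, e.itemId)
    ps.setInt(4, e.quantity)
    ps.executeUpdate()
    ps.close()
  }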

Thanks
Cal
Hello there,

I took a look at Chapter 8 hoping to find some material on building out the Query side (for complex queries across entities that cannot be answered by the command side) with respect to Akka Persistence and the usage of Akka Persistence Query for Event Sourcing + CQRS but I didn't find any material on this. I noticed you briefly mention read projections with the example of the Employee but there's no real concrete implementation examples. Is this done on the database side? or is there some application code that's doing this process? What happens if that application goes down?

Also, I noticed that there's no mention of adding new read sides; how would this be done? I know Greg Young gave a talk in which he says that without a consumer-driven subscription model this is very difficult to pull off: if the read sides get corrupted, they need to be restored from scratch, and there's no way to demand data from inception from the command side; adding new read sides is equally difficult without that ability.

Is it recommended to use multiple Akka Persistence Queries with offsets to feed the different read sides? Would it be okay for the read sides to keep track of the offsets themselves, so that if they went down they could resume from the saved offset instead of starting from inception every time?

Are you planning to add any material on Persistence Query and an example of how to write a Query model?

Cheers
Cal
On Page 188,
The underlying idea of the CQRS model is to turn commands into events once they’ve been validated.


This isn't the CQRS model; this is Event Sourcing. CQRS is about segregating the command (write) side from the query (read) side, whereas turning validated commands into persisted events is the core idea of Event Sourcing.
Page 37:
It says:
The first thing you have to do in this setup is get an enumerator to work with. Iteratees are used to consume streams, whereas enumeratees produce them, and you need a producing pipe so you can add adapters to it. The Concurrent.joined method provides you with a connected pair of iteratee and enumerator: whatever data is consumed by the iteratee will be immediately available to the enumerator.

I think you meant:
The first thing you have to do in this setup is get an enumerator to work with. Iteratees are used to consume streams, whereas enumerators produce them, and you need a producing pipe so you can add adapters to it. The Concurrent.joined method provides you with a connected pair of iteratee and enumerator: whatever data is consumed by the iteratee will be immediately available to the enumerator.
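For what it's worth, here's a tiny sketch of the corrected idea using Play 2.x iteratees; the ordering (attaching a consumer to the enumerator before feeding the iteratee) is my assumption about how Concurrent.joined is meant to be driven:

  import play.api.libs.iteratee.{Concurrent, Enumerator, Iteratee}
  import scala.concurrent.ExecutionContext.Implicits.global

  // whatever the iteratee consumes becomes available through the enumerator
  val (iteratee, enumerator) = Concurrent.joined[String]

  enumerator.run(Iteratee.foreach[String](println)) // attach a consumer first
  Enumerator("hello", "world").run(iteratee)        // then feed the consuming side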
Thank you for posting this; it saved me some hair-pulling.
I think he plans to release it in March with the Akka Streams update, so there will be no more MEAP updates. Having gone through most of the book, I find some of the most important chapters, like clustering and sharding, severely lacking, in my humble opinion. All the material before those is quite good, so it was a bit strange to see such a big drop in quality.

I did see some activity on GitHub a few days ago.
Thanks a lot for the update, Raymond; looking forward to the new material. Please take your time :) After all, we all crave high-quality content, and it takes time to achieve that. I would really love to see Akka HTTP and Akka Streams included, but I understand that this is cutting-edge technology and API changes will always occur.
Thanks a lot! I like that approach and I trust you will do a great job. Looking forward to reading the book.
Please consider concentrating on more functional concepts like monads, pattern matching, and enforcing immutability.

Java 8 in Action was great for introducing lambdas and the Streams API, but it didn't really give the reader a deeper understanding of functional programming. It was like biting off a piece of a delicious cookie only to find that you couldn't have any more :)