zcox (5)
#1
Since Samza runs one task instance per Kafka topic partition [1], should the examples in Ch 4 include some tasks that partition the messages?

For example, the Hourly Sales Calculator in 4.4 takes an order event, gets the sales value for the event's hour out of local storage, increments it by the order's value, then writes the new value for that hour back to local storage. What if the Kafka topic had 8 partitions? It seems like there would then be 8 instances of HourlySalesStreamTask, each with its own local storage, storing separate sales totals for each hour. In the window callback, each task instance would output a different sales total for the same hour.
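
To make that concrete, here's roughly how I picture the chapter 4 task against the 0.7.0 API (my own sketch with made-up store/field names and a simple Map message format, not the book's actual code):

import java.util.Map;

import org.apache.samza.config.Config;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.task.InitableTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
import org.apache.samza.task.WindowableTask;

// Sketch only: Samza runs one instance of this task per input partition, and
// each instance gets its own "hourly-sales" store. With 8 partitions that's
// 8 separate stores, each holding only a partial total for any given hour.
public class HourlySalesStreamTask implements StreamTask, InitableTask, WindowableTask {

  private KeyValueStore<String, Double> store;

  @SuppressWarnings("unchecked")
  public void init(Config config, TaskContext context) {
    store = (KeyValueStore<String, Double>) context.getStore("hourly-sales");
  }

  @SuppressWarnings("unchecked")
  public void process(IncomingMessageEnvelope envelope,
      MessageCollector collector, TaskCoordinator coordinator) {
    Map<String, Object> order = (Map<String, Object>) envelope.getMessage();
    String hour = (String) order.get("hour");    // e.g. "2015-01-01T13"
    Double value = (Double) order.get("value");
    Double current = store.get(hour);
    store.put(hour, (current == null ? 0.0 : current) + value);
  }

  public void window(MessageCollector collector, TaskCoordinator coordinator) {
    // Would emit this instance's totals -- which is only this partition's
    // share of each hour, hence my question about differing totals above.
  }
}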

The Samza state docs [2] talk a lot about re-partitioning messages by different keys (like userId) so that messages for the same entity end up at the same task instance.

Or are partitions and tasks a more advanced concept to be discussed in a later chapter?

Thanks,
Zach

[1] http://samza.incubator.apache.org/learn/documentation/0.7.0/container/samza-container.html
[2] http://samza.incubator.apache.org/learn/documentation/0.7.0/container/state-management.html
alexander.dean (12)
#2
Re: Kafka topic partitions and Samza task instances
Hey Zach!

You are right - chapter 4 skips over working with partitions completely. (It also glosses over the ideas of batching/checkpointing and at-least-once processing.)

These are really important topics - the plan was to tackle them in a later chapter, but I think it would make sense to go straight into them in chapter 5.

As a note: for stream processing on web analytics-style data, it's fairly standard to partition based on the user's IP address. Everybody has an IP address, and if you're lucky (no users changing IP address in-session), then all events in a user's session will be sticky to the same task instance. So the abandoned-cart example most likely wouldn't change in a non-toy version, whereas hourly sales and product look-to-book would.

In fact both hourly sales and product look-to-book have some complexities in the re-sharding: if you re-shard hourly sales based on timestamp, then all of a given hour's checkout events end up on a single partition. In practice you could probably get away with that because checkout events are low in volume, but the more generally applicable pattern would be a downstream aggregation step.
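
To sketch what I mean by a downstream aggregation step (hypothetical names throughout, and assuming each upstream task instance's window() emits its per-hour delta since the last window, keyed by the hour, to an intermediate stream):

import org.apache.samza.config.Config;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.task.InitableTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;

// Hypothetical second job: because the intermediate stream is keyed by hour,
// all partial totals for a given hour arrive at one task instance, which can
// sum them into a single figure in its local store.
public class HourlySalesAggregatorTask implements StreamTask, InitableTask {

  private KeyValueStore<String, Double> totals;

  @SuppressWarnings("unchecked")
  public void init(Config config, TaskContext context) {
    totals = (KeyValueStore<String, Double>) context.getStore("hourly-totals");
  }

  public void process(IncomingMessageEnvelope envelope,
      MessageCollector collector, TaskCoordinator coordinator) {
    String hour = (String) envelope.getKey();         // e.g. "2015-01-01T13"
    Double partial = (Double) envelope.getMessage();  // one upstream instance's delta
    Double current = totals.get(hour);
    totals.put(hour, (current == null ? 0.0 : current) + partial);
  }
}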

With product look-to-book, it would make sense to re-shard upfront using the product SKU. The complexity here is that a checkout event contains N product SKUs. It should be fine to duplicate the event across multiple partitions, but you wouldn't want it duplicated on the same partition. Hmm, need to think about this some more.

In any case, I've created some tickets to capture all this:

https://github.com/alexanderdean/Unified-Log-Processing/issues/13
https://github.com/alexanderdean/Unified-Log-Processing/issues/14

Look forward to your thoughts!

Alex

zcox (5)
#3
Re: Kafka topic partitions and Samza task instances
Thanks for the details, Alex! The book has been great so far, and based on the Table of Contents I am really excited about the upcoming chapters.

For product look-to-book, should there be an upstream job that takes each order event and outputs one event for each item in the order? These "single item purchased" events could be keyed by itemId and contain the quantity purchased. Then the LookToBookStreamTask consumes from "item viewed" and "item purchased" streams, so that those events for the same itemId end up at the same task instance. That seems like it might solve the "order event contains N product SKUs" problem that you pointed out.
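
Something like this is what I have in mind for the upstream fan-out job (stream/field names and the Map/List message format are made up, just to illustrate the idea):

import java.util.List;
import java.util.Map;

import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;

// Sketch: one "order placed" event becomes N "item purchased" events, each
// keyed by itemId, so they co-locate with "item viewed" events for that item.
public class OrderItemFanOutTask implements StreamTask {

  private static final SystemStream ITEM_PURCHASED =
      new SystemStream("kafka", "item-purchased");

  @SuppressWarnings("unchecked")
  public void process(IncomingMessageEnvelope envelope,
      MessageCollector collector, TaskCoordinator coordinator) {
    Map<String, Object> order = (Map<String, Object>) envelope.getMessage();
    List<Map<String, Object>> items = (List<Map<String, Object>>) order.get("items");
    for (Map<String, Object> item : items) {
      String itemId = (String) item.get("itemId");
      // Keying by itemId routes all events for one item to the same partition,
      // hence the same LookToBookStreamTask instance downstream.
      collector.send(new OutgoingMessageEnvelope(ITEM_PURCHASED, itemId, item));
    }
  }
}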

Re-partitioning streams by timestamp sounds like the well-known "monotonically increasing keys" problem, like section 6.3.2 here: http://hbase.apache.org/book/rowkey.design.html. But like you said, one could probably get away with this using Samza. Seems like adding some other id to the key, like userId or productId, would help with partitioning, if you're doing hourly counts for separate users/products anyway. Also if you're replaying a lot of historical events, presumably those could be spread across multiple task instances if timestamp is in the key.
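
For example, a re-keying step could prefix the productId so one hour's events don't all land on a single partition (hypothetical stream and field names again):

import java.util.Map;

import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;

// Sketch: a composite key keeps the partition key from being purely
// time-ordered, which avoids the HBase-style "hotspotting" and also lets a
// replay of historical events spread across task instances.
public class CompositeKeyRepartitionTask implements StreamTask {

  private static final SystemStream OUT =
      new SystemStream("kafka", "sales-by-product-and-hour");

  @SuppressWarnings("unchecked")
  public void process(IncomingMessageEnvelope envelope,
      MessageCollector collector, TaskCoordinator coordinator) {
    Map<String, Object> event = (Map<String, Object>) envelope.getMessage();
    String key = event.get("productId") + "|" + event.get("hour");  // e.g. "SKU-42|2015-01-01T13"
    collector.send(new OutgoingMessageEnvelope(OUT, key, event));
  }
}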

It's starting to seem like if your task uses local key-value storage, then the streams it consumes from need to use the same keys. Like in the abandoned-cart task, local storage keys are based on shopperId, so since you need all events for the same shopperId to be processed by the same task instance, the input stream also must be keyed by shopperId.

-Zach
alexander.dean (12)
#4
Re: Kafka topic partitions and Samza task instances
Really glad you're enjoying the book so far, Zach!

> For product look-to-book, should there be an upstream
> job that takes each order event and outputs one event
> for each item in the order? These "single item
> purchased" events could be keyed by itemId and
> contain the quantity purchased. Then the
> LookToBookStreamTask consumes from "item viewed" and
> "item purchased" streams, so that those events for
> the same itemId end up at the same task instance.
> That seems like it might solve the "order event
> contains N product SKUs" problem that you pointed
> out.

I think that's a great design.

> Re-partitioning streams by timestamp sounds like the
> well-known "monotonically increasing keys" problem,
> like section 6.3.2 here:
> http://hbase.apache.org/book/rowkey.design.html. But
> like you said, one could probably get away with this
> using Samza.

Ah yes - I'd forgotten HBase's term "hotspotting". Seems like something I should warn people about in the book when discussing key choices.

> Also if you're replaying a lot
> of historical events, presumably those could be
> spread across multiple task instances if timestamp is
> in the key.

Really good point - interesting how batch size will influence key choice there :-)

> It's starting to seem like if your task uses local
> key-value storage, then the streams it consumes from
> need to use the same keys. Like in the abandoned-cart
> task, local storage keys are based on shopperId, so
> since you need all events for the same shopperId to
> be processed by the same task instance, the input
> stream also must be keyed by shopperId.

Yep, I agree. I imagine in a year or two there will be SQL-ish abstraction layers for Samza that handle these intermediate re-keying steps, but in the meantime we will have to do it ourselves :-)

- Alex