wjr (2) [Avatar] Offline
#1
Hi,
I run the sample code in section 5.3.1, but it turned out to be a little strange.

I run the code with a new APPLICATION ID each time

Sample data:
{symbol='MORI', shares=7,074, industry=Railroads}
{symbol='MORI', shares=31,196, industry=Railroads}
{symbol='MBER', shares=9,636, industry=Oil & Gas Production}
{symbol='GROU', shares=17,932, industry=Finance/Investors Services}
{symbol='UEIL', shares=10,462, industry=Finance/Investors Services}
{symbol='ANDC', shares=36,198, industry=Major Pharmaceuticals}
{symbol='AHNH', shares=23,079, industry=Major Pharmaceuticals}
{symbol='ANDC', shares=23,383, industry=Major Pharmaceuticals}
{symbol='ANDS', shares=20,959, industry=Coal Mining}
{symbol='DAUG', shares=42,002, industry=Coal Mining}

Result:
Railroads 1)MORI:38,270
Finance/Investors Services 1)DAUG:42,002 2)MORI:38,270 3)AHNH:23,079 4)GROU:17,932 5)UEIL:10,462
Oil & Gas Production 1)MORI:38,270 2)MBER:9,636
Coal Mining 1)DAUG:42,002 2)MORI:38,270 3)ANDS:20,959 4)MBER:9,636
Major Pharmaceuticals 1)ANDC:59,581 2)DAUG:42,002 3)MORI:38,270 4)AHNH:23,079 5)MBER:9,636

It seems that the data in the result are not grouped according to the enterprise
Any suggestions on how to explain this?

Thanks.
wjr (2) [Avatar] Offline
#2
There are some problems in the sample code

FixedSizePriorityQueue<ShareVolume> fixedQueue = new FixedSizePriorityQueue<>(comparator, 5);
...
aggregate(
() -> fixedQueue,
(k, v, agg) -> agg.add(v),
(k, v, agg) -> agg.remove(v),
Materialized.with(stringSerde, fixedSizePriorityQueueSerde))


I changed it to this:

Initializer<FixedSizePriorityQueue> initializer = () -> {
LOG.info("Initialize FixedSizePriorityQueue");
return new FixedSizePriorityQueue<>(comparator, 5);
};
...
aggregate(
initializer,
(k, v, agg) -> agg.add(v),
(k, v, agg) -> agg.remove(v),,
Materialized.with(stringSerde, fixedSizePriorityQueueSerde))