211339 (4) [Avatar] Offline
I tried to execute the example in ch7.streams without success.

First, the code in ch7.streams.FrontOffice has a hardcoded path to the csv file:

  val path = "/Users/debasishghosh/projects/frdomain/src/main/resources/transactions.csv"
  val getLines = () =>

I fixed this problem like this:

  val path = Option.apply(this.getClass.getResource("/transactions.csv")) match {
    case None =>
      throw new IllegalArgumentException("Cannot find the /transactions.csv file")
    case Some(url) =>
  val getLines = () =>
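
For readers who want a complete version of the classpath lookup above, here is a minimal sketch using only the Scala standard library. The object name `ResourceLines` and the method `load` are hypothetical, not part of the book's code, and the sketch returns an `Either` instead of throwing, which is a design choice of this example rather than what FrontOffice does.

```scala
import scala.io.Source

// Hypothetical helper, not from the frdomain repository.
object ResourceLines {
  // Returns the lines of a classpath resource, or an error message if it is missing.
  def load(name: String): Either[String, List[String]] =
    Option(getClass.getResource(name)) match {
      case None =>
        Left(s"Cannot find the $name file")
      case Some(url) =>
        val src = Source.fromURL(url, "UTF-8")
        // Read everything eagerly so the source can be closed before returning.
        try Right(src.getLines().toList) finally src.close()
    }
}
```

Calling `ResourceLines.load("/transactions.csv")` would then yield either the file's lines or a readable error, assuming the csv sits under src/main/resources.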

Second, it would be useful to give brief instructions on how to run these examples.

1) in a terminal window, start the transaction processor

$ sbt "runMain frdomain.ch7.streams.TransactionProcessor"

2) in another terminal window, start the front office

$ sbt "runMain frdomain.ch7.streams.FrontOffice"

After all of this, the FrontOffice doesn't connect to the TransactionProcessor.

Here's an example of the output I get from the TransactionProcessor:

00:08:08.347 [run-main-0] INFO  f.ch7.streams.TransactionProcessor - Receiver: binding to localhost:9982
00:08:08.517 [] INFO  frdomain.ch7.streams.Summarizer - Balance so far: Map()
00:08:09.517 [] INFO  frdomain.ch7.streams.Summarizer - Balance so far: Map()
00:08:10.526 [] INFO  frdomain.ch7.streams.Summarizer - Balance so far: Map()
00:08:11.527 [] INFO  frdomain.ch7.streams.Summarizer - Balance so far: Map()

This never changes until I kill the process.

Meanwhile, the FrontOffice does not reliably connect to the TransactionProcessor.
Often, the 1st run of the FrontOffice fails to send the information.
The result on logOnComplete is something like this:

Failure( The connection closed with error: Connection reset by peer)

The second run usually works.

In both runs, the TransactionProcessor doesn't receive anything. The output remains unchanged.

Can someone explain why the FrontOffice fails to connect reliably?

Stopping either the FrontOffice or the TransactionProcessor is messy.
I think the example would be much better if there were a way to exit these applications gracefully
by typing "q" on standard input, as shown in the Akka documentation.
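
A rough sketch of such a quit-on-"q" loop, using only the standard library, might look like this. The names `QuitOnQ` and `awaitQuit` are hypothetical, and the shutdown action is passed in as a callback so that the sketch does not depend on Akka; in the real programs the callback would presumably call `system.terminate()`.

```scala
import java.io.{BufferedReader, StringReader}

// Hypothetical helper, not from the frdomain repository.
object QuitOnQ {
  // Reads lines until "q" (or end of input), then runs `shutdown` exactly once.
  // Returns the number of lines consumed before quitting.
  def awaitQuit(in: BufferedReader)(shutdown: () => Unit): Int = {
    val ignored = Iterator
      .continually(in.readLine())
      .takeWhile(line => line != null && line.trim != "q")
      .size
    shutdown()
    ignored
  }
}
```

In an application this would be called with a reader wrapped around `System.in` after the stream has been started, so the main thread blocks on input while the stream runs.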

- Nicolas.
Debasish Ghosh (116) [Avatar] Offline
Thanks for reporting. I will have a look shortly. It may be something related to the updated version of Akka Streams. I will post my findings here.
211339 (4) [Avatar] Offline
If it helps, I added a log statement in the TransactionProcessor like this:

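Tcp().bind(host, port).runForeach { conn =>
  // The logging call was lost when this post was rendered; `logger` is an assumed name.
  logger.info(s"connection: local=${conn.localAddress}, remote=${conn.remoteAddress}")
  val receiveSink = ...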

This helps confirm that the FrontOffice client does connect; however nothing flows.
Often, the client fails to connect but I see the connection message from the processor.

It is really surprising that this doesn't work when other, similar approaches do.

- Nicolas.
Debasish Ghosh (116) [Avatar] Offline
Indeed, this code fails now that I have upgraded akka-streams to the latest version. It looks like some semantics have changed. Let me work through this. I will post updates here once I fix it.

Thanks for reporting.
Debasish Ghosh (116) [Avatar] Offline
Fixed! There was a difference in the way `Source.empty` works with streams. It used to work properly in 1.0, but I now had to change it to `Source.maybe`. Konrad pointed this out to me. The `master` branch is now updated. I have also added a note in the respective classes on how to run the two programs, `TransactionProcessor` and `FrontOffice`.
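
To illustrate the general difference between the two sources (this is a standalone sketch, not the actual fix committed to `master`): `Source.empty` completes as soon as it is run, whereas `Source.maybe` stays open until its materialized promise is completed. How exactly this interacts with the TCP connection in the example is not spelled out in this thread, so treat the link as an assumption. The object name `MaybeVsEmpty` is hypothetical, and the code assumes akka-stream is on the classpath.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object, not from the frdomain repository.
object MaybeVsEmpty {
  // Returns (empty stream completed, maybe stream completed before the promise,
  //          maybe stream completed after the promise).
  def demo(): (Boolean, Boolean, Boolean) = {
    implicit val system: ActorSystem = ActorSystem("maybe-vs-empty")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      // Source.empty emits nothing and completes right away.
      val emptyDone = Source.empty[Int].runWith(Sink.ignore)
      Await.ready(emptyDone, 3.seconds)

      // Source.maybe emits nothing and stays open until its promise is completed.
      val (promise, maybeDone) =
        Source.maybe[Int].toMat(Sink.ignore)(Keep.both).run()
      Thread.sleep(200)
      val completedBefore = maybeDone.isCompleted

      // Completing the promise with None ends the stream without emitting an element.
      promise.success(None)
      Await.ready(maybeDone, 3.seconds)

      (emptyDone.isCompleted, completedBefore, maybeDone.isCompleted)
    } finally Await.ready(system.terminate(), 10.seconds)
  }
}
```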

211339 (4) [Avatar] Offline
Thanks. Note that in order to make this example work, I had to change FrontOffice as I showed earlier, loading transactions.csv from the classpath instead of the hardcoded path.

Note also that FrontOffice sends the info to the TransactionProcessor but then hangs until either I abort the program or the TransactionProcessor quits, in which case I see the "Transfer complete" log message.

I found that this is basically the behavior of the optional halfClose argument of the Tcp().outgoingConnection method, which also takes localAddress, options, connectTimeout, and idleTimeout parameters and yields a flow from akka.util.ByteString to akka.util.ByteString.

I think that adding 'halfClose=false' and changing logWhenComplete to the following results in more sensible behavior: FrontOffice quits after sending all of its data; there's no need to abort it or kill the TransactionProcessor server.

// The logging call was lost when this post was rendered; `logger` is an assumed name.
val logWhenComplete = Sink.onComplete(r => { logger.info("Transfer complete: " + r); system.terminate() })

- Nicolas.