OrBee (57) [Avatar] Offline
#1
I am not quite clear how to use the until-successful scope as explained on page 137. I have a flow which should take a list of query strings, make a call to a long running and unreliable query and then either save the failed messages it the retry attempts are exceeded or save the successful results from the query. I want to use a collection-aggregator to get the list back but I am not sure whether what I attempted is the right way to do it as I could not find a similar example. Is this right attempt or is there a better way?

Here is what I tried:

<spring:beans>
<spring:bean id="objectStore" class="org.mule.util.store.SimpleMemoryObjectStore" />
</spring:beans>
<http:listener-config name="HTTP_Listener_Configuration" host="0.0.0.0" port="8081" doc:name="HTTP Listener Configuration"/>

<vm:endpoint name="retries-exceeded-queue" path="retries-exceeded-vm" doc:name="VM"/>

<flow name="UntilsuccessfultestsFlow">
<http:listener config-ref="HTTP_Listener_Configuration" path="/mytest" doc:name="HTTP" />
<vm:outbound-endpoint path="myprocess-vm" exchange-pattern="one-way"/>
</flow>


<flow name="processMyLoad">
<vm:inbound-endpoint path="myprocess-vm" exchange-pattern="one-way"/>
<splitter expression="#[xpath3('//Errors')]" doc:name="Splitter"/>
<until-successful maxRetries="5" millisBetweenRetries="5000" doc:name="Until Successful" objectStore-ref="objectStore"
   failureExpression="#[groovy:(exception
    && exception in java.lang.RuntimException) ||
   !(payload instanceof java.util.Map && payload['NonFamily'])]" deadLetterQueue-ref="retries-exceeded-queue">
<vm:outbound-endpoint path="process-vm" exchange-pattern="request-response" doc:name="VM"/>
</until-successful>
<collection-aggregator/>
<flow-ref name="SaveSuccessfulQueryOutcomes"/>

</flow>

<!-- Failed Responses are saved here. -->
<flow name="ReprocessingExceeded" processingStrategy="synchronous" tracking:enable-default-events="true">
<vm:inbound-endpoint path="retries-exceeded-vm" doc:name="VM"/>
<flow-ref name="SaveFailedResults"/>
</flow>



<!-- Long Running Query may not return a result until a number of attempts have been made -->
<flow name="QueryProcessorFlow">
<vm:inbound-endpoint path="process-vm" exchange-pattern="request-response" doc:name="VM"/>
<flow-ref name="LongRunningUnreliableQuery"
></flow>

David Dossot (232) [Avatar] Offline
#2
Recent versions of Mule offer a synchronous mode for until-successful: https://docs.mulesoft.com/mule-user-guide/v/3.6/until-successful-scope#synchronous-until-successful You need to use it if you want to execute code after until-successful is done.
OrBee (57) [Avatar] Offline
#3
I am aware of the synchronous mode of the until-successful scope. However, I want to be able to take advantage of using the dead letter queue and the object store which is only available in asynchronous mode. Otherwise, I need to figure out how to deal with failures which are very likely with the type of service. The code I presented works and detects responses of successful queries and failed queries. For the failed ones, they go the dead letter queue and the process for handling them works. I am unclear as to where the processing should occur for the successful queries returned by the long running and unreliable query. So are you saying there is no way to use the asynchronous mode for doing what I'm trying to accomplish? Does the processing only have to be following the until-successful scope? Surely, there has to be a way to use asynchronous mode to process results post the service call.
David Dossot (232) [Avatar] Offline
#4
You should be able to put the vm:outbound-endpoint, collection-aggregator and flow-ref into another private flow and call it with flow-ref from within the until-successful.
OrBee (57) [Avatar] Offline
#5
If the private flow has a vm inbound endpoint I cannot call it from a flow-ref within the until-successful scope. I would have to pass the payload via a vm outbound endpoint within the until-successful scope which leaves me with the same situation. There really aren't any examples showing a good use of the asynchronous scope for performing complex processing. If I were to change to using the synchronous alternative, I would have to manage saving and retrieving the payloads as well as managing all the processing in the dead letter queue, which I really want to avoid. My other attempt was like this, does this make sense to you?: Thank you

<flow name="ProcessQuery">
<vm:inbound-endpoint path="in-queue" exchange-pattern="request-response"/>
<until-successful faiurelExpression="....." objectStore-ref="objectStore" >
<vm:outbound-endpoint path="query.queue" exchange-pattern="request-response"/>
</until-successful>
</flow>

<flow name="QueryService">
<vm:inbound-endpoint path="query.queue" exchange-pattern="request-response"/>
<flow-ref name="PerformQuery">
<choice>
<when expression="#[groovy: payload && payload instanceof com.stnt.QueryResult]">
<!-- Check for Successful outcome then perform processing steps Followed by the transformer-->
<!-- The transformer is for the failureExpression check-->
<scripting-transformer>
<scripting:script engine="groovy">
return payload
</scripting:script>
</scripting-transformer>

</when>
<otherwise>
<!-- Query Failed, return data to test failureExpression, so retry will be carried out-->
<scripting-transformer>
<scripting:script engine="groovy">
return payloadFor Failure
</scripting:script>
</scripting-transformer>
</otherwise>
</flow>
David Dossot (232) [Avatar] Offline
#6
Sorry I meant a sub-flow, not a private flow. No need for a VM endpoint in this case, just flow-ref it.
OrBee (57) [Avatar] Offline
#7
Will there be threading issues? Will each call to the sub-flow use a separate thread from start to finish from the pool?
David Dossot (232) [Avatar] Offline
#8
No, flow-ref calls to sub-flows are in the same thread: BTW it's essential for exceptions to be propagated back to the until-successful scope. Using a private flow instead of a sub-flow would defeat this: exceptions would not bubble up to the until-successful.
OrBee (57) [Avatar] Offline
#9
Thank you, very much appreciated.