
How does the Scheduled Failover Message Forwarding Processor help with guaranteed delivery?

Before we talk about the failover message forwarding processor, it helps to understand the concepts and use cases around it. The Scheduled Failover Message Forwarding Processor is one part of the broader message store and message processor (MSMP) architecture.

Message Store and Message Processor (MSMP).
WSO2 ESB’s message stores and message processors are used to store incoming messages and then deliver them to a particular backend with added Quality of Service (QoS) features, such as throttling and guaranteed delivery. The basic advantage of MSMP is that it allows you to send messages reliably to a backend service. The messages can be kept in different kinds of reliable storage, such as JMS or JDBC message stores. MSMP is built from three basic components:



1. Store Mediator.
The Store mediator is a Synapse mediator that stores messages in a message store.

2. Message Store.
A message store is the storage the ESB uses for messages. WSO2 ESB comes with four message store implementations: In-Memory, JMS, JDBC and RabbitMQ. Users can also create a custom message store with their own implementation. For more information, refer to the Message Stores section of the documentation.

3. Message Processor.
The message processor consumes messages from the message store and sends them to the defined endpoint. Currently there are Forward and Sample message processor implementations, and the ESB lets you add custom message processors as well. Besides forwarding messages to the endpoint, the message processor provides other features that help with guaranteed delivery, such as throttling and message retries when the endpoint is not available.
For more information, refer to the Message Processor section of the documentation.


How to achieve guaranteed delivery?
Guaranteed message delivery means that once a message enters the ecosystem, it must be delivered to the defined endpoint without being lost during processing. There are a few areas to consider when we talk about guaranteed delivery in the message processor scenario:

1. Store message.
Message store unavailability is the only scenario we can identify in which a message is lost while the store mediator is trying to store it in the message store. This can happen because of network failures, a message store crash or a system shutdown for maintenance. There are different approaches to overcoming this kind of situation.


  • Configure a message store cluster.
We can configure a message store cluster as one solution. This lets us avoid a single point of failure.
  • Define a failover message store.
This lets the store mediator store messages in a failover message store if the original message store is not available. ESB 4.9.0 introduced this feature, which allows you to define a failover message store. The failover message store is discussed in detail in a later section.
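
As a rough illustration of the clustered-store option, a JMS message store backed by ActiveMQ could point at more than one broker through ActiveMQ's failover transport. This is only a sketch: the store name ClusteredStore and the hosts broker1 and broker2 are placeholders, and it assumes the brokers themselves are already set up to share or replicate their message data.

```xml
<!-- Sketch only: "ClusteredStore", broker1 and broker2 are hypothetical names -->
<messageStore
    class="org.apache.synapse.message.store.impl.jms.JmsStore" name="ClusteredStore">
    <parameter name="java.naming.factory.initial">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
    <!-- ActiveMQ failover transport: the client reconnects to whichever listed broker is reachable -->
    <parameter name="java.naming.provider.url">failover:(tcp://broker1:61616,tcp://broker2:61616)</parameter>
    <parameter name="store.jms.JMSSpecVersion">1.1</parameter>
</messageStore>
```

The rest of this post uses the second option, a failover message store, which does not depend on how the broker is deployed.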

2. Retrieve message from message store.
We need to decide when to remove a message from the store after the message processor has retrieved it, because removing it too early can lose the message, for example if the send to the endpoint fails afterwards. The ESB has a mechanism that signals the message store to remove a message only after it has been successfully sent to the endpoint. If the message cannot be sent successfully, the ESB does not remove it from the message store.



3. Send message to the defined endpoint.
This step is the actual forwarding of the message to the endpoint. As discussed under “Retrieve message from message store”, a message is removed from the message store only after it has been sent to the endpoint successfully. Message processors also provide a retry mechanism that resends a message if the endpoint is not available. Retrying alone does not guarantee delivery, but it improves the chance that a message reaches the endpoint.

The message processor has more functionality that can be used to tune it for guaranteed delivery; however, that is outside the scope of this blog post. You can find more information in the message processor documentation.

Failover Message Store and Scheduled Failover Message Processor scenario.

As discussed in the earlier section, the failover message store is used as a solution for message store failures. The store mediator forwards messages to the failover message store if the original message store fails.

First we need to define the failover message store. It can be any type of message store available in the ESB. No special configuration is needed to mark a store as a failover message store - when you define the original message store, you simply select the failover message store to use in failure situations. When a failure happens, the store mediator forwards all incoming messages to the failover message store.

The next problem is how to move the messages that were forwarded to the failover message store back to the original message store when it becomes available again. This is where the Failover Message Processor comes in. It is the same as the Scheduled Message Forwarding Processor, except that where the Message Forwarding Processor sends messages to an endpoint, this one forwards messages to a message store.

The following example explains how to set up a complete message processor scenario with the failover configurations.

1. Create the failover message store.
You don't need to specify any special configuration here. An in-memory message store is used for this example, but keep in mind that in-memory message stores cannot be used in cluster setups, since an in-memory store cannot be shared among the cluster nodes.

  <messageStore name="failover"/>  
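
For a clustered deployment, where the in-memory store above is not an option, the failover store could itself be a JMS store on a separate broker. The sketch below assumes a second ActiveMQ broker listening on tcp://localhost:61617 and a queue named failover_queue; both are placeholders and not part of the original example. It would replace the in-memory definition rather than sit next to it, since both use the name failover.

```xml
<!-- Sketch only: the broker URL and the queue name are assumptions -->
<messageStore
    class="org.apache.synapse.message.store.impl.jms.JmsStore" name="failover">
    <parameter name="java.naming.factory.initial">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
    <!-- A different broker from the original store, so one outage does not take down both stores -->
    <parameter name="java.naming.provider.url">tcp://localhost:61617</parameter>
    <parameter name="store.jms.destination">failover_queue</parameter>
    <parameter name="store.jms.JMSSpecVersion">1.1</parameter>
</messageStore>
```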

2. Create the original message store for storing messages.
A JMS message store is used for this example. To enable guaranteed delivery on the producer side (that is, to configure the failover message store), enable the “Producer Guaranteed Delivery” property and specify the failover message store; both settings are under the “Show Guaranteed Delivery Parameters” section.

 <messageStore  
     class="org.apache.synapse.message.store.impl.jms.JmsStore" name="Orginal">  
     <parameter name="store.failover.message.store.name">failover</parameter>  
     <parameter name="store.producer.guaranteed.delivery.enable">true</parameter>  
     <parameter name="java.naming.factory.initial">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>  
     <parameter name="java.naming.provider.url">tcp://localhost:61616</parameter>  
     <parameter name="store.jms.JMSSpecVersion">1.1</parameter>  
   </messageStore>  


3. Create a proxy service that sends messages to the original message store using the store mediator.

 <proxy name="Proxy1" transports="https http" startOnLoad="true" trace="disable">    
  <target>  
    <inSequence>  
     <property name="FORCE_SC_ACCEPTED" value="true" scope="axis2"/>  
     <property name="OUT_ONLY" value="true"/>  
     <log level="full"/>  
     <store messageStore="Orginal"/>  
    </inSequence>  
  </target>  
 </proxy>  

4. Define the endpoint for the scheduled forward message processor.
SimpleStockQuoteService is used as the backend service.

 <endpoint name="SimpleStockQuoteService">  
  <address uri="http://127.0.0.1:9000/services/SimpleStockQuoteService"/>  
 </endpoint>  

5. Add a scheduled forward message processor to forward messages to the previously defined endpoint.
The message processor documentation contains more information about the scheduled forward message processor.

 <messageProcessor  
     class="org.apache.synapse.message.processor.impl.forwarder.ScheduledMessageForwardingProcessor"  
     messageStore="Orginal" name="ForwardMessageProcessor" targetEndpoint="SimpleStockQuoteService">  
     <parameter name="client.retry.interval">1000</parameter>  
     <parameter name="throttle">false</parameter>  
     <parameter name="max.delivery.attempts">4</parameter>  
     <parameter name="member.count">1</parameter>  
     <parameter name="max.delivery.drop">Disabled</parameter>  
     <parameter name="interval">1000</parameter>  
     <parameter name="is.active">true</parameter>  
     <parameter name="target.endpoint">SimpleStockQuoteService</parameter>  
   </messageProcessor>  

6. Add a scheduled failover message processor to forward messages from the failover message store to the original message store.
When you define a failover message processor, you need to fill in two mandatory parameters that are essential to the failover scenario:
  • Source message store.
  • Target message store.
The failover message processor sends messages from the failover store to the original store once the original store is available again. In this configuration, the source message store should be the failover message store and the target message store should be the original message store.


 <messageProcessor  
     class="org.apache.synapse.message.processor.impl.failover.FailoverScheduledMessageForwardingProcessor"  
     messageStore="failover" name="FailoverMessageProcessor">  
     <parameter name="client.retry.interval">60000</parameter>  
     <parameter name="throttle">false</parameter>  
     <parameter name="max.delivery.attempts">1000</parameter>  
     <parameter name="member.count">1</parameter>  
     <parameter name="max.delivery.drop">Disabled</parameter>  
     <parameter name="interval">1000</parameter>  
     <parameter name="is.active">true</parameter>  
     <parameter name="message.target.store.name">Orginal</parameter>  
   </messageProcessor>  

Besides these mandatory parameters, there are a few optional parameters under the “Additional Parameter” section; they are described in the parameter list further below.

7. Send Request to the Proxy Service
Navigate to the /samples/axis2client directory and execute the following command to invoke the proxy service.
 ant stockquote -Daddurl=http://localhost:8280/services/Proxy1 -Dmode=placeorder  
A message similar to the following example is printed in the Axis2 Server console.
 SimpleStockQuoteService :: Accepted order for : 7482 stocks of IBM at $ 169.27205579038733  


The optional parameters are as follows (parameter name, then description):
  • Forwarding interval (interval): Interval in milliseconds at which the processor consumes messages.
  • Retry interval (client.retry.interval): Message retry interval in milliseconds.
  • Maximum delivery attempts (max.delivery.attempts): Maximum number of redelivery attempts before the processor is deactivated. This is used when the backend server is inactive and the ESB tries to resend the message.
  • Drop message after maximum delivery attempts (max.delivery.drop): If this parameter is set to Enabled, the message is dropped from the message store after the maximum number of delivery attempts has been made, and the message processor remains active. This parameter has no effect when no value is specified for the Maximum delivery attempts parameter. If this parameter is set to Disabled, the undeliverable message is not dropped and the message processor is deactivated.
  • Fault sequence name (message.processor.fault.sequence): The name of the sequence to which the fault message is sent in case of a SOAP fault.
  • Deactivate sequence name (message.processor.deactivate.sequence): The sequence that is executed when the processor is deactivated automatically. Automatic deactivation occurs when the maximum number of delivery attempts is exceeded and the Drop message after maximum delivery attempts parameter is disabled.
  • Quartz configuration file path (quartz.conf): The path to the Quartz configuration file. This properties file contains the Quartz configuration parameters for fine-tuning the Quartz engine. More details of the configuration can be found at http://quartz-scheduler.org/documentation/quartz-2.x/configuration/ConfigMain.
  • Cron expression (cronExpression): The cron expression used to configure the retry pattern.
  • Task count, cluster mode (member.count): The required number of worker nodes when you need to run the processor on more than one worker node. Specifying this does not guarantee that the processor will run on each worker node; there can be instances where the processor does not run on some worker nodes.
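
To make the optional parameters more concrete, here is a minimal sketch of the failover message processor from step 6 with a deactivate sequence attached. The sequence name FailoverDeactivated and its log text are made up for illustration; the parameter names are the ones listed above, and the store names are the ones used earlier in this post.

```xml
<!-- Hypothetical sequence: logs a warning when the processor deactivates itself -->
<sequence name="FailoverDeactivated">
    <log level="custom">
        <property name="WARNING" value="FailoverMessageProcessor was deactivated; check the original message store"/>
    </log>
</sequence>

<messageProcessor
    class="org.apache.synapse.message.processor.impl.failover.FailoverScheduledMessageForwardingProcessor"
    messageStore="failover" name="FailoverMessageProcessor">
    <parameter name="interval">1000</parameter>
    <parameter name="client.retry.interval">60000</parameter>
    <parameter name="max.delivery.attempts">1000</parameter>
    <!-- Disabled: keep undelivered messages and deactivate the processor instead of dropping them -->
    <parameter name="max.delivery.drop">Disabled</parameter>
    <!-- Runs when the processor is deactivated after exhausting its delivery attempts -->
    <parameter name="message.processor.deactivate.sequence">FailoverDeactivated</parameter>
    <parameter name="is.active">true</parameter>
    <parameter name="message.target.store.name">Orginal</parameter>
</messageProcessor>
```

Keeping max.delivery.drop set to Disabled fits the guaranteed delivery goal: the messages stay in the failover store, and the deactivate sequence gives you a hook for noticing that the processor has stopped.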


To test the failover scenario, shut down the JMS broker (the original message store) and send a few messages to the proxy service. These messages are not sent to the backend because the original message store is not available; instead, you can see them stored in the failover message store. If you check the ESB log, you can see the failover message processor periodically trying to forward the messages to the original message store. Once the original message store is available again, the failover processor sends those messages to the original store, and the forward message processor sends them on to the backend service.
