Author Topic: OTN Headlines Tech Article: Joining Oracle Complex Event Processing and J2ME to React to Location an  (Read 1552 times)

Mike

  • Administrator
  • Hero Member
  • *****
  • Posts: 1949
    • View Profile
Tech Article: Joining Oracle Complex Event Processing and J2ME to React to Location and Positioning Events
   


How to join the power of the Location API for J2ME (JSR 179) with Oracle Complex Event Processing to deliver business applications that can't be built without an event processing tool.
   

http://www.oracle.com/technology/pub/articles/amadei-cep.html

Joining Oracle Complex Event Processing and J2ME to React to Location and Positioning Events         
How to join the power of the Location API for J2ME (JSR 179) with Oracle Complex Event Processing to deliver business applications that can't be built without an event processing tool.
      
By Daniel Amadei
         
Published March 2010
         
Real-time processing of data is becoming more important every day. The speed of changes in all kinds of market segments is increasing more than ever. Conversely, the time for reaction is getting shorter. This article shows how the concept of complex event processing (CEP) can help us address these challenges—CEP helps analyze events and recognize patterns in the cloud of information, making it possible to react to events as soon as they happen.
This article illustrates what can be achieved with CEP and event-driven architecture. Familiarity with Oracle Complex Event Processing is not assumed or required.
Use Cases
JSR 179 is the Location API for J2ME. From its name, you can figure out that this JSR deals with location and positioning of devices by use of their embedded GPS.
Knowing what can be achieved with JSR 179, I present two very similar use cases based on interesting business scenarios that can be delivered with CEP:
  • Average Speed. This CEP application shows a business use case with aggregation of data. This aggregation is the average speed of a user sent by the mobile device every two minutes. If the user is stuck in traffic, a new event will be generated, which might result in a Short Message Service (SMS) message telling that person how to get out of the traffic or what radio station to tune in for traffic reports.
  • AreaOfInterest. This CEP application is very similar to the preceding one. It gets events from the equipment (a mobile phone with GPS, in my case—but it could be anything compatible with the JSR) and analyzes whether the location sent is next to or in an area of interest. If it is, another event will be generated that can send an SMS message informing the customer about some kind of event in that vicinity. For this example, the location is Ibirapuera Park in SÃo Paulo, Brazil.
Technical Overview
For the example, we have a Java 2 Platform, Micro Edition (J2ME) application based on the official JSR 179 example . The mobile application collects the location information (latitude, longitude, and speed) and the phone's International Mobile Equipment Identity (IMEI) to be able to send an SMS message to the phone later and post this data to a socket, using a DataOutputStream.
Both CEP applications receive the location from the mobile device as a socket message. In the example, receive is achieved by a single thread. That would not be the case for production applications, for which you should consider better approaches such as using pooled threads or nonblocking I/O (NIO) to receive several messages at the same time. Also, the event generated when the desired behavior is recognized by CEP is simply printed to the console to keep the example simple. In a real-world application, you would probably send an SMS message to the user or do something similar.
Deep Dive: CEPMobileApp
The CEP mobile application (CEPMobileApp) receives notifications about changes in the location of the device and uses a wait/notify mechanism to update the device screen and send events to Oracle Complex Event Processing about the location and speed of the device.
Location information is obtained as an instance of javax.microedition.location.Location. This class has a getSpeed() method, which returns the current device speed in meters per second. Another method of interest is getQualifiedCoordinates(), which returns an object of type javax.microedition.location.QualifiedCoordinates, which is responsible for providing the current latitude and longitude. The code is similar to the following (assume that l is an instance of location):
QualifiedCoordinates qc = l.getQualifiedCoordinates();
double latitude = qc.getLatitude();
double longitude = qc.getLongitude();
double speed = l.getSpeed();
The phone's IMEI is obtained through a System.getProperty() call. Because J2ME works on other types of devices besides mobile phones, there is no standardized way to get this info. For my Nokia e71 device, the following works fine:String imei = System.getProperty("com.nokia.mid.imei");
Finally, we create a user for the Oracle Data Integrator work schema. For some of the data validation and transformation operations, Oracle Data Integrator needs to create (temporary) work objects. It is best practice to have these objects in a separate schema to avoid confusion between data warehouse objects and the temporary work objects.String imei = System.getProperty("com.nokia.mid.imei");The final step is to send the event to Oracle Complex Event Processing, which is done with a socket. The code below shows how this is accomplished: private void sendLocationToCep(double latitude, double longitude,
   double speed, String imei) throws Exception {

   StreamConnection connection = null;
   DataOutputStream dos = null;

   try {
   connection =
       (StreamConnection) Connector.open(
       "socket://" + SOCKET_ADDRESS);
   dos = connection.openDataOutputStream();

   dos.writeDouble(latitude);
   dos.writeDouble(longitude);
   dos.writeDouble(speed);
   dos.write(imei.getBytes());
   dos.flush();
   } finally {
   close(dos);
   close(connection);
   }
}
If you look carefully at the code, you'll see some interesting things:
  • The socket address is specified externally. Actually, it's part of the Java Application Descriptor (JAD) deployment descriptor and read by the MIDlet class (com.oracle.otn.cep.mobile.Main) with the getAppProperty() method.
  • Data is written to the server as raw bytes. This minimizes the bandwidth consumed by the application, because few bytes are written for each event. For a real-world application, the best you can do is create some kind of protocol to avoid receiving inconsistent data.
  • The socket is closed after every transmission. I've kept it this way to make things simple. If you are developing for a real-world system, consider some kind of keep-alive to avoid the overhead of opening and closing the connection, especially in a resource-constrained device.
Average Speed
As already mentioned, the Average Speed application recognizes patterns that may indicate that the user is stuck in traffic and send an SMS message to that person, suggesting alternative routes, a radio station with traffic reports, or something similar. Actually, as already mentioned, the event sink in the example is simply a mock implementation that prints the IMEI and the average speed to the standard out, but you are free to implement your own logic.
Figure 1 shows the event processing network (EPN) of the Average Speed application.

Figure 1: Average Speed event processing network

Analyzing the EPN, we see what happens when an event gets inside Oracle Complex Event Processing. The first node of the EPN, called locationReceiverAdapter and represented by com.oracle.otn.cep.LocationReceiverAdapter, is the one responsible for receiving the event. [[Stet original sentence if the part between the commas refers to the event rather than the node.]] This class is registered inside the application as a thread, as you can see here:
public class LocationReceiverAdapter
implements RunnableBean, StreamSource {
    //source code omitted
}
The StreamSource is an interface that forces this class to have a setEventSender() method to receive an instance of com.bea.wlevs.ede.api.StreamSender. This object received is later used to send an event through the EPN. The RunnableBean is an Oracle Complex Event Processing interface that extends java.lang.Runnable, the traditional threading interface, and the SuspendableBean interface is another one from the CEP API, forcing the class to implement a method that enables it to be notified when the bean should be suspended by the CEP server infrastructure. We use this feature to stop the thread and close the server socket, so that when the application is started again, the TCP/IP address and port will be available.
As an ordinary thread, this class has a run() method implemented, and the thread is started automatically by the server infrastructure—you do not need to call start() on this thread. Oracle Complex Event Processing will do that for you. The following is the run() method source code:
1.public void run() {
2. try {
3.  serverSocket = new ServerSocket(5555, 0, InetAddress
4.   .getByName("localhost"));

5.  suspended = false;

6.  while (!isSuspended()) {
7.   Socket socket = null;
8.   DataInputStream dis = null;

9.    try {
10.   socket = serverSocket.accept();
11.   dis = new DataInputStream(socket.getInputStream());

12.   double latitude = dis.readDouble();
13.   double longitude = dis.readDouble();
14.   double speed = dis.readDouble();

15.   byte[] imeiBytes = new byte[15];
16.   dis.read(imeiBytes);

17.   String imei = new String(imeiBytes);

18.   System.out.println("Received: " + latitude + ", " + longitude + ", "
19.     + speed + ", " + imei);

20.   generateEvent(latitude, longitude, speed, new String(imeiBytes));

21.    } catch (Exception e) {
22.   System.out.println("Problems reading socket: " + e);

23.    } finally {
24.   close(dis);
25.   close(socket);
26.    }
27.   }
28.  } catch (Exception e) {
29.   e.printStackTrace();
30.  } finally {
31.   close();
32.  }
33. }
Let's analyze it line by line. On lines 3 and 4, the server socket is created, and on line 10, this socket keeps waiting to receive a connection. As already mentioned, this is not the best option for receiving many simultaneous connections and was used just to keep the example simple. On line 6, a loop is started that checks to see if the bean was suspended or not. If not, a new iteration of the loop will be executed. On line 11, a java.io.DataInputStream is created, so the socket bytes can be read as datatypes. From line 12 to line 17, data is read, and on line 20, we use the received data to call the generateEvent method, which is responsible for sending the event. The source of the generateEvent method is shown here:1. private void generateEvent(double latitude, double longitude, double speed,
2.   String imei) {
3.  LocationEvent event = new LocationEvent(latitude, longitude,
4.    getSpeedInKmPerHour(speed), imei);
5.  eventSender.sendInsertEvent(event);
6. }
You can see that the code is extremely simple. A JavaBean is created, represented by the class LocationEvent, and the event is sent by the eventSender object, which was injected by the CEP infrastructure.
After raw data is received and the event is generated, the event is sent to a channel. The channel's responsibility is to receive and normalize the event. The channel is materialized as an XML declaration inside the application configuration file, as shown here:
<wlevs:channel id="locationInputChannel" event-type="LocationEvent">
<wlevs:listener ref="speedingProcessor" />
<wlevs:source ref="locationReceiverAdapter" />
</wlevs:channel>
The channel is connected to a processor, the speedingProcessor, which is responsible for processing the event and recognizing patterns within the cloud of events. The processor is declared as a simple XML element inside the configuration file:<wlevs:processor id="speedingProcessor" />
Besides that, the processor executes queries against the events, so it's capable of recognizing the patterns, based on what is specified by the queries. These queries obey the Continuous Query Language (CQL) syntax and are declared in the processor configuration file:<n1:config xmlns:n1="http://www.bea.com/ns/wlevs/config/application"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<processor>
<name>speedingProcessor</name>
<rules>
  <query id="averageSpeedingRule">
   <![CDATA[
      select imei, avg(speed) as averageSpeed
      from locationInputChannel [range 2 minutes slide 2 minutes]
      group by imei
      having avg(speed) <= 20
   
  </query>
</rules>
</processor>
</n1:config>
The CQL is very powerful. You can execute complex queries against the events, using SQL-like syntax along with native support for XML, regular expressions, aggregation, subqueries (views), and definition of custom functions written in Java—among a lot of other features. If you intend to learn Oracle Complex Event Processing for real, get a deep understanding of CQL!
Analyzing the CQL query of our example,
select imei, avg(speed) as averageSpeed
from locationInputChannel [range 2 minutes slide 2 minutes]
group by imei
having avg(speed) <= 20
we see that it's a SELECT clause that groups events by IMEI and select those with an average less than or equal to 20 kilometers per hour. The last two minutes of data is considered, using the range operator. Along with that, we use the slide operator to slide the data window analyzed and get results output every 2 minutes.
If CEP recognizes an IMEI with an average speed of less than 20 kilometers per hour in the last two minutes, an event will be generated by the processor. The generated event is a MessageEvent, represented by the com.oracle.otn.cep.MessageEvent class. The channel configured to be the output channel for the processor defines the event type generated. The data used to populate such an event is defined by the SELECT clause. In our case, this is the messageOutputChannel, whose XML declaration is shown here:
<wlevs:channel id="messageOutputChannel" event-type="MessageEvent"
advertise="true">
<wlevs:listener>
  <bean class="com.oracle.otn.cep.MessageSenderBean" />
</wlevs:listener>
<wlevs:source ref="speedingProcessor" />
</wlevs:channel>
After passing through the channel, the event is sent to the configured event sink, which is a simple JavaBean where you can place any logic you want. For the sake of the example's simplicity, I just print a message to the console, but you could change this code to send an SMS message. The event sink for this example is the MessageSender component and is implemented by the com.oracle.otn.cep.MessageSenderBean class, shown here:import com.bea.wlevs.ede.api.StreamSink;

public class MessageSenderBean implements StreamSink {

public void onInsertEvent(Object event) {
if (event instanceof MessageEvent) {
  MessageEvent messageEvent = (MessageEvent) event;
  System.out.println("=====> Found a Customer stuck in traffic. IMEI: "
    + messageEvent.getImei() + " - Average Speed: "
    + messageEvent.getAverageSpeed());
}
}
}
As you can see in this example, Oracle Complex Event Processing delivers a lot of power that enables you to analyze incoming events and search for patterns. It also enables you to think of new business paradigms that previously seemed impossible. Along with that, Oracle Complex Event Processing enables you to analyze thousands of events per second with minimal overhead by using the Oracle JRockit Real Time Java Virtual Machine (JVM), which delivers predictable garbage collection pause times, enabling you to consider very small time frames for the patterns without losing events due to GC execution.
The next example illustrates a similar use case, leveraging the same mobile application but delivering more business features. From a technical perspective, you will see how to define a custom Java function to be used inside your CQL query.
AreaOfInterest
The AreaOfInterest application is very similar to Average Speed. The major difference is that we will not work with aggregation of event data; instead, we will inspect each incoming event to see if the user is near or in an area of interest and, if so, we will react to this scenario.
Accomplishing the verification of proximity to an area of interest involves calculating the distance between the coordinate points of the user, sent by the GPS device, and the coordinates of interest. This example takes into consideration the distance as the crow flies from point A to point B. This distance is calculated with a custom function registered from the com.oracle.otn.cep.Distance Java class. I found the code for the distance calculation on the zips.sourceforge.net Web site and just changed it to return the distance in kilometers, not miles, per hour.
The coordinates for the area of interest are hard-coded into the Distance class. For a real-world application, you'd probably get a list of coordinates from a cache provider (Coherence, for example) or from the database. I decided to keep things this way to reduce the example's complexity.
Let's take a look at the AreaOfInterest application EPN in Figure 2.

Figure 2: reaOfInterest event processing network
As you can see, the EPN is very similar to the one you saw before, except that it has an extra bean declaration, represented by the Distance node.
Because of the similarity of the two examples, I'll focus on the only difference: the areaOfInterestProcessor processor. The declaration of this processor is as follows:
<wlevs:processor id="areaOfInterestProcessor">
<wlevs:function function-name="distanceToAreaOfInterestInKm"
  exec-method="distanceToAreaOfInterestInKm">
  <bean class="com.oracle.otn.cep.Distance" />
</wlevs:function>
</wlevs:processor>
The function element declares a function named by the function-name attribute. The method executed is specified by the exec-method, which must be a method of the class specified in the nested bean declaration. In our case, we declare a CQL function, distanceToAreaOfInterestInKm, which internally executes the com.oracle.otn.cep.Distance.distanceToAreaOfInterestInKm() method.
The Distance class is shown here:
public class Distance {
private static final double LATITUDE_OF_INTEREST = -23.589823955777362;
private static final double LONGITUDE_OF_INTEREST = -46.662063002586365;

public static double distanceToAreaOfInterestInKm(double latitude,
  double longitude) {
return distanceInKm(latitude, longitude, LATITUDE_OF_INTEREST,
   LONGITUDE_OF_INTEREST);
}

public static double distanceInKm(double latA, double longA, double latB,
  double longB) {

double theDistance = (Math.sin(Math.toRadians(latA))
   * Math.sin(Math.toRadians(latB)) + Math.cos(Math.toRadians(latA))
   * Math.cos(Math.toRadians(latB))
   * Math.cos(Math.toRadians(longA - longB)));

return (((Math.toDegrees(Math.acos(theDistance))) * 69.09) * 1.609344);
}
}
The next point is the CQL query we execute:  <processor>
<name>areaOfInterestProcessor</name>
<rules>
  <view id="DistanceCalculatedView" schema="imei distance">
   <![CDATA[
    select imei, distanceToAreaOfInterestInKm(latitude, longitude) as distance
    from positioningInputChannel [Now]
   
  </view>

  <query id="ProximityRule">
   <![CDATA[
      select * from
      DistanceCalculatedView
     where distance <= 1.5
   
  </query>
</rules>
</processor>
The first interesting aspect you may have noticed is the absence of aggregation. Another nice thing is that a view is being used. Views act as subqueries for CQL clauses. The use of DistanceCalculatedView helps us avoid calling the distanceToAreaOfInterestInKm function twice; it would have to be specified in the SELECT and WHERE clauses if no view were used. The schema element specifies which elements are exposed by the view.
The ProximityRule query acts over the DistanceCalculatedView view to select only the events whose distance is less than 1.5 kilometers from Ibirapuera Park. Because the view does the distance calculation, the query just selects the view records whose distance attribute is less than 1.5 kilometers.
After that, a new event is generated, getting to the MessageSenderBean , and is printed to the standard output, as shown below:
System.out
.println("=====> Found a Customer next to an Area Of Interest. IMEI: "
   + messageEvent.getImei()
   + " - Distance: "
   + messageEvent.getDistance());
Installing and Running the Sample Applications
The two Oracle Complex Event Processing projects and the J2ME application are available for download here. After setting up your local environment with an Oracle Complex Event Processing server, Eclipse plus Oracle Complex Event Processing plug-in, and NetBeans, you can import the projects into each IDE and deploy them . to the server.
After importing the CEPMobileApp project into NetBeans, right-click the project -> Properties -> Application Descriptor, which gets you to the screen shown in Figure 3.

Figure 3: J2ME application descriptor properties
On this screen, you can specify a custom property value for Socket-Address property. This will be used by the J2ME application to know where to connect and send data. This address will probably be the address of your IP exposed to the internet. If you are inside some kind of LAN, it may be necessary to use port forwarding or something similar to forward requests from the Web to your local computer where Oracle Complex Event Processing is running.
After that, build the project and install CEPMobileApp.jar in your cell phone or run your application by using the J2ME emulator.

Conclusion
You may have liked the examples from a technical perspective, but if you look at the business appeal of both examples, you will realize that the use of events enables you to deliver solutions that seemed impossible before. Given that Oracle Complex Event Processing is made to scale, you will be able to use them to handle thousands of concurrent requests if you make some small adjustments to the examples.
Going further, think about other use cases in which this kind of solution can be applied, and you will see where the concept of complex event processing can take you.

   
Daniel Amadei is a principal consultant for Oracle Consulting in Brazil, specializing in SOA and integration technologies. He has been working with SOA for the last years and Java technologies since 1999. Among other certifications, he is certified as an Oracle SOA Architect Certified Expert and Sun Certified Enterprise Architect (SCEA).
         
« Last Edit: March 29, 2010, 08:25:29 PM by Mike »