Universal Messaging Concepts : Architecture : umTransport API
umTransport API
Universal Messaging offers, in addition to its standard full-featured client-server API, an extremely lightweight client-client communication API known as the umTransport API.
Broker-based Model
Historically, messaging architecture has predominantly been based on a 'broker in the middle' approach. This is often referred to as 'hub and spoke'. The broker acts as the communications hub, routing messages between logically decoupled peers:
The pub-sub model is a common paradigm for broker based architecture, where one or more publishers send messages to the broker, which then distributes the messages to interested consumers.
umTransport Model
The umTransport model is a peer to peer model that allows peers to be aware of how to communicate directly with one another rather than through a broker. In effect, each publisher peer acts like a server, and each consumer can communicate directly with the publishers:
While this model bypasses broker messaging functionality such as persistence or transactional semantics, it results in a considerably lower latency delivery of information from a publisher to a consumer. By halving of the number of "hops" between client and publisher, latency too is effectively halved. This is especially useful when ultra-low latency message delivery is paramount (in, for example, the links between pricing, quant and risk engines in FX trading platforms).
The umTransport API is currently available for Java and C++. For Java, it is located in the com.softwareag.um.io package. For C++, it is located in com::softwareag::umtransport.
The Java API
The Java API is very simple, allowing each client to accept connections from other clients, and to receive arbitrary binary data from these clients synchronously or asynchronously. In many ways the API is similar to a standard TCP socket API, but offers the additional benefit of being able to use not just TCP sockets as a communication transport, but any of the following Universal Messaging communication technologies:
*TCP Sockets: data is transmitted directly over TCP Sockets
*SSL: data is SSL encrypted then transmitted over TCP Sockets
*SHM: data is transmitted via Shared Memory (for near-instant access by processes on the same machine)
*RDMA: data is transmitted via Remote Direct Memory Access (for access by processes on a remote machine; requires network adapters that support RDMA)
The C++ API
The C++ API provides a subset of the functionality available in the Java API, with the following restrictions:
*The C++ API does not support asynchronous communication between clients.
*The C++ API does not support RDMA as a communication transport.
Using the Java API
Let's take a quick look at how to use this API. Here is an example "echo" Java client and server; the EchoClient will write a string to the EchoServer; the EchoServer will respond to the EchoClient.
Here's the Java EchoClient:
package com.softwareag.um.io.samples.echo;
import com.softwareag.um.io.ClientContextBuilderFactory;
import com.softwareag.um.io.ClientTransportContext;
import com.softwareag.um.io.SynchronousTransport;
import com.softwareag.um.io.TransportFactory;
import com.softwareag.um.io.samples.SimpleMessage;
import com.softwareag.um.io.samples.SynchronousClient;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
/**
* This sample app simply writes a string entered into the console to an EchoServer
* The EchoServer will respond and this response will be output on the console.
*/
public class EchoClient {
public EchoClient(String url) throws IOException {
//Use the factory to generate the required builder based on the protocol
// in the url string
ClientTransportContext context =
ClientContextBuilderFactory.getBuilder(url).build();
//We do not pass any handlers to the connect method because we want a
// synchronous transport
SynchronousTransport transport = TransportFactory.connect(context);
//This is just a basic wrapper for the client transport so it is easier to
// read/write messages
SynchronousClient<SimpleMessage> client = new SynchronousClient<>(transport);
BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
//Start a new thread to read from the client transport because read is a
// blocking call
new ReadThread(client);
//Now continue to write messages to the EchoServer until the user enter 'quit'
while(true){
System.out.println("Enter a message or type 'quit' to exit >");
String line = br.readLine();
if(line.equalsIgnoreCase("quit")){
break;
}
else{
client.write(new SimpleMessage(line));
}
}
}
private static class ReadThread extends Thread{
SynchronousClient<SimpleMessage> client;
public ReadThread(SynchronousClient<SimpleMessage> _client){
client = _client;
start();
}
@Override
public void run(){
try{
while(true){
SimpleMessage mess = client.read(new SimpleMessage());
System.out.println(mess.toString());
}
}
catch(Exception e){
e.printStackTrace();
}
}
}
public static void main(String[] args) throws IOException {
if(args.length == 0){
usage();
System.exit(1);
}
new EchoClient(args[0]);
}
public static void usage(){
System.out.println("EchoClient <URL>");
System.out.println("<Required parameters>");
System.out.println(
"\tURL - protocol://host:port for the server to connect to e.g. "
+TransportFactory.SOCKET+"://localhost:9000");
}
}
And, the EchoServer:
package com.softwareag.um.io.samples.echo;
import com.softwareag.um.io.ServerContextBuilderFactory;
import com.softwareag.um.io.ServerTransportContext;
import com.softwareag.um.io.SynchronousServerTransport;
import com.softwareag.um.io.SynchronousTransport;
import com.softwareag.um.io.TransportFactory;
import com.softwareag.um.io.samples.SimpleMessage;
import com.softwareag.um.io.samples.SynchronousClient;
import java.io.IOException;
/**
* This sample will only handle one client connection at a time. When a client
* connects, the EchoServer will immediately respond to any messages with exactly
* the same message.
*/
public class EchoServer implements Runnable{
private volatile SynchronousClient<SimpleMessage> client;
private final SynchronousServerTransport transport;
private volatile boolean stopped = false;
public EchoServer(String url) throws IOException {
//The factory will create the correct context based on the protocol in the url
ServerTransportContext context =
ServerContextBuilderFactory.getBuilder(url).build();
//Because we have not passed an AcceptHandler into the bind method, we are returned
//a SynchronousServerTransport. This means we have to call accept on the transport
//to accept new client transports.
transport = TransportFactory.bind(context);
}
public static void main(String[] args) throws IOException {
if(args.length == 0){
usage();
System.exit(1);
}
EchoServer echoServer = new EchoServer(args[0]);
Thread t = new Thread(echoServer);
t.start();
System.out.println("Press enter to quit.");
System.in.read();
echoServer.close();
}
public static void usage(){
System.out.println("EchoServer <URL>");
System.out.println("<Required parameters>");
System.out.println(
"\tURL - protocol://host:port to bind the server transport to e.g. "
+TransportFactory.SOCKET+"://localhost:9000");
}
protected void close(){
stopped = true;
client.close();
transport.close();
}
@Override
public void run() {
try{
while(true){
System.out.println("Waiting for client");
//accept() will block until a client makes a connection to our server
SynchronousTransport clientTransport = transport.accept();
System.out.println("Client connected. Echo service started.");
//The SyncronousClient is simply a wrapper to make reading/writing easier
client = new SynchronousClient<>(clientTransport);
try{
while(!stopped){
client.write(client.read(new SimpleMessage()));
}
}
catch (IOException e){
System.out.println("Connection closed");
}
}
}
catch(IOException e){
e.printStackTrace();
}
}
}
Using the C++ API
Here's the C++ EchoClient:
#include "EchoClient.h"

#include "ClientTransportContextFactory.h"
#include "TransportFactory.h"
#include <utility>
#include <iostream>


com::softwareag::umtransport::samples::echo::EchoClient::EchoClient(std::string url)
{
m_url = url;
}

com::softwareag::umtransport::samples::echo::EchoClient::~EchoClient()
{
}

void com::softwareag::umtransport::samples::echo::EchoClient::run(){
try{
//Use the factory to generate the required builder based on the protocol in
// the url string
auto context = ClientTransportContextFactory::build(m_url);

//We do not pass any handlers to the connect method because we want a
// synchronous transport
auto transport = TransportFactory::connect(std::move(context));

//This is just a basic wrapper for the client transport so it is easier to
// read/write messages
SynchronousClient<SimpleMessage> client(std::move(transport));

//Start a new thread to read from the client transport because read is a
// blocking call
ReadThread readThread(client);
Poco::Thread th;
th.start(readThread);

bool canRun = true;
//Now continue to write messages to the EchoServer until the user enter 'quit'
while (canRun){
std::cout << "Enter a message or type 'quit' to exit >" << std::endl;
std::string input;
std::getline(std::cin, input);
if (input == "quit"){
canRun = false;
}
else{
client.write(SimpleMessage(input));
}
}
readThread.shutdown();
client.close();
th.tryJoin(10000);
}
catch (Poco::Exception &ex){
std::cout << ex.displayText();
}
}

int com::softwareag::umtransport::samples::echo::EchoClient::main(int argc,
char** argv){
if (argc < 2){
std::cout <<
"EchoClient <URL>\n<Required parameters>\n\tURL -
protocol://host:port for the server to connect to e.g. tcp://localhost:9000" <<
std::endl;
}
EchoClient client(argv[1]);
client.run();
return 0;
}

com::softwareag::umtransport::samples::echo::EchoClient::ReadThread::ReadThread(
SynchronousClient<SimpleMessage>
&client) : m_client(client){
}

com::softwareag::umtransport::samples::echo::EchoClient::ReadThread::~ReadThread(){
}

void com::softwareag::umtransport::samples::echo::EchoClient::ReadThread::shutdown(){
canRun = false;
}

void com::softwareag::umtransport::samples::echo::EchoClient::ReadThread::run(){
try {
while (canRun){
SimpleMessage message;
m_client.read(message);
std::cout << "Message Content: " << message << std::endl;
}
}
catch (Poco::Exception& e) {
std::cout << "Connection Closed " << e.displayText();
}
}
And here's the EchoServer:
#include "EchoServer.h"
#include "ServerTransportContextFactory.h"
#include "TransportFactory.h"
#include "SynchronousClient.h"
#include "SimpleMessage.h"

using namespace com::softwareag::umtransport;


com::softwareag::umtransport::samples::echo::EchoServer::EchoServer(std::string url){
//The factory will create the correct context based on the protocol in the url
//Because we have not passed an AcceptHandler into the bind method, we are returned
//a SynchronousServerTransport. This means we have to call accept on the transport
//to accept new client transports.
m_transport = TransportFactory::bind(ServerTransportContextFactory::build(url));
}

com::softwareag::umtransport::samples::echo::EchoServer::~EchoServer(){

}

int com::softwareag::umtransport::samples::echo::EchoServer::main(int argc,
char** argv){
if (argc < 2){
std::cout << "EchoServer <URL>" << std::endl << "EchoServer <URL>" << std::endl <<
"\tURL - protocol://host:port to bind the server transport to
e.g. tcp://localhost:9000" <<
std::endl;
exit(1);
}
EchoServer echoServer(argv[1]);
Poco::Thread th;
th.start(echoServer);

std::cout << "Press any key to finish" << std::endl;
std::cin.ignore();

echoServer.close();
th.tryJoin(10000);
return 0;
}

void com::softwareag::umtransport::samples::echo::EchoServer::close(){
stopped = true;
m_transport->close();
}

void com::softwareag::umtransport::samples::echo::EchoServer::run(){
try{
while (!stopped){
std::cout << "Waiting for a client" << std::endl;
//accept() will block until a client makes a connection to our server
SynchronousClient<SimpleMessage> client(m_transport->accept());
//Client connected echo servcie started
try{
while (!stopped){
SimpleMessage msg;
client.read(msg);
client.write(msg);
}
}
catch (Poco::Exception & ex){
std::cout << "Connection Closed" << std::endl;
}
}
}
catch (Poco::Exception &ex){
std::cout << ex.displayText() << std::endl;
}
}
Copyright © 2013-2016 Software AG, Darmstadt, Germany.

Product LogoContact Support   |   Community   |   Feedback