Tales From A Lazy Fat DBA

Its all about Databases, their performance, troubleshooting & much more …. ¯\_(ツ)_/¯

Archive for June, 2025

Demo on how to use Oracle XStream Replication Setup Using Java Client

Posted by FatDBA on June 15, 2025

Oracle XStream is a feature of Oracle Database that allows for real-time data replication between databases. Unlike Oracle GoldenGate, which is more GUI-driven and feature-rich, XStream is more code-centric. This makes it highly customizable, especially suitable when you want a Java or C-based client to control how data flows.

XStream vs GoldenGate

  • XStream is older and less frequently updated compared to Oracle GoldenGate.
  • XStream is limited to Oracle-to-Oracle replication and does not support heterogeneous environments.
  • GoldenGate supports a wide range of sources and targets, is easier to manage with its GUI-based tools, and has better support for DDL replication, integrated capture, and coordinated replication.

When and Why to Use XStream

Use XStream when:

  • You need fine-grained control using custom applications written in Java or C.
  • Your environment is Oracle-to-Oracle only.
  • You want a lightweight, programmatic replication tool that can be embedded in your application logic.
  • Licensing or infrastructure limitations do not permit GoldenGate and licen sing fees is a concern.

Do not use XStream if:

  • You require GUI-driven replication monitoring and setup.
  • You need heterogeneous replication (e.g., Oracle to PostgreSQL, MySQL, etc.).
  • Your use case demands continuous support and new feature releases.

Lets do a quick demo on how to use it, I have setup a lab environment for the test.

  • Source Database: dixitdb on 192.168.68.79:1521
  • Target Database: targetdb on 192.168.68.85:1521

Java Application (xio.java) will:

  • Connect to XStream Outbound on the source DB
  • Connect to XStream Inbound on the target DB
  • Transfer changes using Logical Change Records (LCRs)

Source Database Setup (dixitdb)

Step 1: Enable Replication Features

ALTER SYSTEM SET enable_goldengate_replication = TRUE SCOPE=BOTH;

Step 2: Ensure Archive Logging is Enabled

archive log list;

Must show: Database log mode              Archive Mode

Step 3: Create and Grant User for XStream

CREATE USER xstream_admin IDENTIFIED BY Welcome123;
GRANT CONNECT, RESOURCE, DBA TO xstream_admin;

Step 4: Enable Supplemental Logging

ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

Step 5: Create Demo Table

CREATE TABLE xstream_admin.demo_table (
  id    NUMBER PRIMARY KEY,
  name  VARCHAR2(100),
  value NUMBER
);

Step 6: Grant XStream Admin Privilege

BEGIN
  DBMS_XSTREAM_AUTH.GRANT_ADMIN_PRIVILEGE(
    grantee => 'XSTREAM_ADMIN',
    privilege_type => 'CAPTURE',
    grant_select_privileges => TRUE
  );
END;
/

Step 7: Create Outbound Server

BEGIN
  DBMS_XSTREAM_ADM.CREATE_OUTBOUND(
    server_name => 'XOUT_SRV',
    connect_user => 'XSTREAM_ADMIN'
  );
END;
/

Check status:

SELECT capture_name, status FROM dba_capture;


Target Database Setup (targetdb)

Step 1: Create the Same User

CREATE USER xstream_admin IDENTIFIED BY Welcome123;
GRANT CONNECT, RESOURCE, UNLIMITED TABLESPACE TO xstream_admin;

Step 2: Setup Queue

BEGIN
  DBMS_XSTREAM_ADM.SET_UP_QUEUE(
    queue_table => 'xstream_admin.XIN_SRV',
    queue_name  => 'xstream_admin.XIN_SRV'
  );
END;
/

Step 3: Create Inbound Server

BEGIN
  DBMS_XSTREAM_ADM.CREATE_INBOUND(
    server_name =>'XIN_SRV',
    queue_name  =>'xstream_admin.XIN_SRV',
    apply_user  =>'xstream_admin',
    comment     =>'xstream_admin in'
  );
END;
/

Step 4: Create Target Table

CREATE TABLE xstream_admin.demo_table (
  id    NUMBER PRIMARY KEY,
  name  VARCHAR2(100),
  value NUMBER
);

Step 5: Add Table Rules

VAR dml_rule VARCHAR2(30);
VAR ddl_rule VARCHAR2(30);
BEGIN
  DBMS_XSTREAM_ADM.ADD_TABLE_RULES(
    table_name => 'xstream_admin.demo_table',
    streams_type => 'APPLY',
    streams_name => 'XIN_SRV',
    queue_name => 'xstream_admin.XIN_SRV',
    source_database => 'dixitdb',
    dml_rule_name => :dml_rule,
    ddl_rule_name => :ddl_rule
  );
END;
/

Step 6: Start Apply Process

BEGIN
  DBMS_APPLY_ADM.START_APPLY(apply_name => 'XIN_SRV');
END;
/

Check status:
SELECT apply_name, status FROM dba_apply;

Java Application: xio.java

The xio.java program acts as a custom replication engine. It:

  • Connects to XStream Inbound and Outbound
  • Receives LCRs (Logical Change Records) from the source
  • Sends them to the target

Pre-Requisites:

  • ojdbc8-19.15.0.0.jar (Oracle JDBC driver)
  • xstreams.jar (from $ORACLE_HOME/rdbms/jlib/)
import oracle.streams.*;
import oracle.jdbc.internal.OracleConnection;
import oracle.jdbc.*;
import oracle.sql.*;
import java.sql.*;
import java.util.*;

public class xio
{
public static String xsinusername = null;
public static String xsinpasswd = null;
public static String xsinName = null;
public static String xsoutusername = null;
public static String xsoutpasswd = null;
public static String xsoutName = null;
public static String in_url = null;
public static String out_url = null;
public static Connection in_conn = null;
public static Connection out_conn = null;
public static XStreamIn xsIn = null;
public static XStreamOut xsOut = null;
public static byte[] lastPosition = null;
public static byte[] processedLowPosition = null;

public static void main(String args[])
{
// get connection url to inbound and outbound server
in_url = parseXSInArguments(args);
out_url = parseXSOutArguments(args);

// create connection to inbound and outbound server
in_conn = createConnection(in_url, xsinusername, xsinpasswd);
out_conn = createConnection(out_url, xsoutusername, xsoutpasswd);

// attach to inbound and outbound server
xsIn = attachInbound(in_conn);
xsOut = attachOutbound(out_conn);

// main loop to get lcrs
get_lcrs(xsIn, xsOut);

// detach from inbound and outbound server
detachInbound(xsIn);
detachOutbound(xsOut);
}

// parse the arguments to get the conncetion url to inbound db
public static String parseXSInArguments(String args[])
{
String trace, pref;
String orasid, host, port;

if (args.length != 12)
{
printUsage();
System.exit(0);
}

orasid = args[0];
host = args[1];
port = args[2];
xsinusername = args[3];
xsinpasswd = args[4];
xsinName = args[5];

System.out.println("xsin_host = "+host);
System.out.println("xsin_port = "+port);
System.out.println("xsin_ora_sid = "+orasid);

String in_url = "jdbc:oracle:oci:@"+host+":"+port+":"+orasid;
System.out.println("xsin connection url: "+ in_url);

return in_url;
}

// parse the arguments to get the conncetion url to outbound db
public static String parseXSOutArguments(String args[])
{
String trace, pref;
String orasid, host, port;

if (args.length != 12)
{
printUsage();
System.exit(0);
}

orasid = args[6];
host = args[7];
port = args[8];
xsoutusername = args[9];
xsoutpasswd = args[10];
xsoutName = args[11];


System.out.println("xsout_host = "+host);
System.out.println("xsout_port = "+port);
System.out.println("xsout_ora_sid = "+orasid);

String out_url = "jdbc:oracle:oci:@"+host+":"+port+":"+orasid;
System.out.println("xsout connection url: "+ out_url);

return out_url;
}

// print out sample program usage message
public static void printUsage()
{
System.out.println("");
System.out.println("Usage: java xio "+"<xsin_oraclesid> " + "<xsin_host> "
+ "<xsin_port> ");
System.out.println(" "+"<xsin_username> " + "<xsin_passwd> "
+ "<xsin_servername> ");
System.out.println(" "+"<xsout_oraclesid> " + "<xsout_host> "
+ "<xsout_port> ");
System.out.println(" "+"<xsout_username> " + "<xsout_passwd> "
+ "<xsout_servername> ");
}

// create a connection to an Oracle Database
public static Connection createConnection(String url,
String username,
String passwd)
{
try
{
DriverManager.registerDriver(new oracle.jdbc.OracleDriver());
return DriverManager.getConnection(url, username, passwd);
}
catch(Exception e)
{
System.out.println("fail to establish DB connection to: " +url);
e.printStackTrace();
return null;
}
}

// attach to the XStream Inbound Server
public static XStreamIn attachInbound(Connection in_conn)
{
XStreamIn xsIn = null;
try
{
xsIn = XStreamIn.attach((OracleConnection)in_conn, xsinName,
"XSDEMOINCLIENT" , XStreamIn.DEFAULT_MODE);

// use last position to decide where should we start sending LCRs
lastPosition = xsIn.getLastPosition();
System.out.println("Attached to inbound server:"+xsinName);
System.out.print("Inbound Server Last Position is: ");
if (null == lastPosition)
{
System.out.println("null");
}
else
{
printHex(lastPosition);
}
return xsIn;
}
catch(Exception e)
{
System.out.println("cannot attach to inbound server: "+xsinName);
System.out.println(e.getMessage());
e.printStackTrace();
return null;
}
}

// attach to the XStream Outbound Server
public static XStreamOut attachOutbound(Connection out_conn)
{
XStreamOut xsOut = null;

try
{
// when attach to an outbound server, client needs to tell outbound
// server the last position.
xsOut = XStreamOut.attach((OracleConnection)out_conn, xsoutName,
lastPosition, XStreamOut.DEFAULT_MODE);
System.out.println("Attached to outbound server:"+xsoutName);
System.out.print("Last Position is: ");
if (lastPosition != null)
{
printHex(lastPosition);
}
else
{
System.out.println("NULL");
}
return xsOut;
}
catch(Exception e)
{
System.out.println("cannot attach to outbound server: "+xsoutName);
System.out.println(e.getMessage());
e.printStackTrace();
return null;
}
}

// detach from the XStream Inbound Server
public static void detachInbound(XStreamIn xsIn)
{
byte[] processedLowPosition = null;
try
{
processedLowPosition = xsIn.detach(XStreamIn.DEFAULT_MODE);
System.out.print("Inbound server processed low Position is: ");
if (processedLowPosition != null)
{
printHex(processedLowPosition);
}
else
{
System.out.println("NULL");
}
}
catch(Exception e)
{
System.out.println("cannot detach from the inbound server: "+xsinName);
System.out.println(e.getMessage());
e.printStackTrace();
}
}

// detach from the XStream Outbound Server
public static void detachOutbound(XStreamOut xsOut)
{
try
{
xsOut.detach(XStreamOut.DEFAULT_MODE);
}
catch(Exception e)
{
System.out.println("cannot detach from the outbound server: "+xsoutName);
System.out.println(e.getMessage());
e.printStackTrace();
}
}

public static void get_lcrs(XStreamIn xsIn, XStreamOut xsOut)
{
if (null == xsIn)
{
System.out.println("xstreamIn is null");
System.exit(0);
}

if (null == xsOut)
{
System.out.println("xstreamOut is null");
System.exit(0);
}

try
{
while(true)
{
// receive an LCR from outbound server
LCR alcr = xsOut.receiveLCR(XStreamOut.DEFAULT_MODE);

if (xsOut.getBatchStatus() == XStreamOut.EXECUTING) // batch is active
{
assert alcr != null;
// send the LCR to the inbound server
xsIn.sendLCR(alcr, XStreamIn.DEFAULT_MODE);

// also get chunk data for this LCR if any
if (alcr instanceof RowLCR)
{
// receive chunk from outbound then send to inbound
if (((RowLCR)alcr).hasChunkData())
{
ChunkColumnValue chunk = null;
do
{
chunk = xsOut.receiveChunk(XStreamOut.DEFAULT_MODE);
xsIn.sendChunk(chunk, XStreamIn.DEFAULT_MODE);
} while (!chunk.isEndOfRow());
}
}
processedLowPosition = alcr.getPosition();
}
else // batch is end
{
assert alcr == null;
// flush the network
xsIn.flush(XStreamIn.DEFAULT_MODE);
// get the processed_low_position from inbound server
processedLowPosition =
xsIn.getProcessedLowWatermark();
// update the processed_low_position at oubound server
if (null != processedLowPosition)
xsOut.setProcessedLowWatermark(processedLowPosition,
XStreamOut.DEFAULT_MODE);
}
}
}
catch(Exception e)
{
System.out.println("exception when processing LCRs");
System.out.println(e.getMessage());
e.printStackTrace();
}
}

public static void printHex(byte[] b)
{
for (int i = 0; i < b.length; ++i)
{
System.out.print(
Integer.toHexString((b[i]&0xFF) | 0x100).substring(1,3));
}
System.out.println("");
}
}


—-> Downloaded compatibile version of ojdbc driver ojdbc8-19.15.0.0.jar from oracle’s website
—- $ORACLE_HOME/rdbms/jlib/xstreams.jar
—-> copy ‘xstreams.jar’ from the ORACLE_HOME
—-> Compile the java code to create classes.
javac -cp “ojdbc8-19.15.0.0.jar:xstreams.jar:.” xio.java

— Once complied, call the java program now.
— Run Java Replication App
— This does: Connect to outbound server, Read LCRs (Logical Change Record), Send LCRs to inbound server and Confirm position/flush status

[oracle@oracleontario lib]$ java -cp "ojdbc8-19.15.0.0.jar:xstreams.jar:." xio targetdb
192.168.68.85 1521 XSTREAM_ADMIN Welcome123 XIN_SRV dixitdb 192.168.68.79 1521
XSTREAM_ADMIN Welcome123 XOUT_SRV
xsin_host = 192.168.68.85
xsin_port = 1521
xsin_ora_sid = targetdb
xsin connection url: jdbc:oracle:oci:@192.168.68.85:1521:targetdb
xsout_host = 192.168.68.79
xsout_port = 1521
xsout_ora_sid = dixitdb
xsout connection url: jdbc:oracle:oci:@192.168.68.79:1521:dixitdb
Attached to inbound server:XIN_SRV
Inbound Server Last Position is: null
Attached to outbound server:XOUT_SRV
Last Position is: NULL

Now when the code is running and source and target is ready. Lets try to do an insert and see if we gets it on the target.

-- on source database
SQL> select * from xstream_admin.demo_table;
ID
NAME
VALUE
----------
-----------------------------------------------------------------------------------------
----------- ----------
 10
Maine


SQL> insert into xstream_admin.demo_table (ID, NAME, VALUE) values (101, 'Calgary', 100);

1 row created.

SQL> commit;

Commit complete.

SQL> /

ID
NAME
VALUE
650 ----------
-----------------------------------------------------------------------------------------
----------- ----------
10
Maine
800
101
Calgary
100



--- on target database
SQL>
SQL> select * from xstream_admin.demo_table;
ID
NAME
VALUE
----------
-----------------------------------------------------------------------------------------
----------- ----------
10
Maine
800
101
Calgary
100

Hope It Helped!
Prashant Dixit

Posted in Uncategorized | Tagged: , , , , , , , , | Leave a Comment »