Sunday, May 22, 2016

OIM Scheduled Job Multi-threading Example

Tested On: Oracle Identity Manager 11.1.2.3.0
Description: Demonstrates using multi-threading in an Oracle Identity Manager scheduled job. The scheduled task example applies changes to OIM users using data given from a CSV file. A thread is created per data entry in CSV file and uses OIM API service to apply changes to the OIM user.
References: 
https://blogs.oracle.com/OIM11gR2/entry/oim_11g_multi_thread_approach

Files
FlatFileUserModificationTestDriver.java : Used to test remotely via OIMClient. Not part of plugin.

users.csv : Sample CSV file.

UserProcessor.java : Runnable object used for threading.

FlatFileUserModification.java : OIM scheduled task code.

FlatFileUserModificationMetadata.xml and plugin.xml : Metadata for OIM scheduled task and OIM plugin.

package com.blogspot.oraclestack.scheduledtasks;
import com.blogspot.oraclestack.objects.UserProcessor;
import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Arrays;
import java.util.HashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import oracle.core.ojdl.logging.ODLLevel;
import oracle.core.ojdl.logging.ODLLogger;
import oracle.iam.identity.usermgmt.api.UserManager;
import oracle.iam.platform.Platform;
import oracle.iam.scheduler.vo.TaskSupport;
/**
* An example of a multi-threaded scheduled task.
* The scheduled task applies changes to the OIM users
* using data given from a CSV file. A thread is created per row
* in CSV file excluding the header row.
* @author rayedchan
*/
public class FlatFileUserModification extends TaskSupport
{
// Logger
private static final ODLLogger LOGGER = ODLLogger.getODLLogger(FlatFileUserModification.class.getName());
// OIM Services
// private UserManager usrMgr = Platform.getService(UserManager.class); // Getting a NullPointer Exception when using service in a threading context
private UserManager usrMgr = Platform.getServiceForEventHandlers(UserManager.class, null, "ADMIN","FlatFileUserModification", null);
/**
* Main method for scheduled job execution
* @param hm Map of the scheduled job parameters
* @throws Exception
*/
@Override
public void execute(HashMap hm) throws Exception
{
BufferedReader bReader = null;
try
{
// Get the parameters from the scheduled job
String keyAttrName = (String) hm.get("Key Attribute Name"); // Key Attribute Name to identify OIM User
String filePath = (String) hm.get("File Path");
String delimiter = (String) hm.get("Delimiter");
int numThreads = ((Long) hm.get("Number of Threads")).intValue();
LOGGER.log(ODLLevel.NOTIFICATION, "Scheduled Job Parameters: Key Attribute Name = {0}, File Path = {1}, Delimiter = {2}, Number of Threads = {3}", new Object[]{keyAttrName, filePath, delimiter, numThreads});
if(numThreads <= 0)
{
LOGGER.log(ODLLevel.SEVERE, "Threads Parameter is not valid. Value must be greater than 0.");
throw new Exception("Task Mode Parameter is not valid. Value must be greater than 0.");
}
// Load CSV file for reading
FileReader fReader = new FileReader(filePath);
bReader = new BufferedReader(fReader);
// Get Header Line
String line = bReader.readLine();
if(line == null || "".equalsIgnoreCase(line))
{
throw new Exception("Header must be provided as the first entry in file.");
}
String[] header = line.split(delimiter);
LOGGER.log(ODLLevel.NOTIFICATION, "Header: {0}", new Object[]{Arrays.asList(header)});
// Create Thread Pool
ExecutorService threadExecutor = Executors.newFixedThreadPool(numThreads);
// Initialize base configuration
UserProcessor.initializeConfig(header, delimiter, LOGGER, usrMgr, keyAttrName);
// Process data entries using multi-threading
line = bReader.readLine();
while(line != null)
{
threadExecutor.execute(new UserProcessor(line)); // Create new thread to process line
line = bReader.readLine(); // read next line
}
// Initate thread shutdown
threadExecutor.shutdown();
while(!threadExecutor.isTerminated())
{
// Wait for all event processor threads to complete
}
LOGGER.log(ODLLevel.NOTIFICATION, "Finished scheduled job.");
}
catch(Exception ex)
{
LOGGER.log(ODLLevel.SEVERE, "", ex);
}
finally
{
if(bReader != null)
{
bReader.close();
}
}
}
@Override
public HashMap getAttributes()
{
return null;
}
@Override
public void setAttributes()
{
}
}
<?xml version="1.0" encoding="UTF-8"?>
<scheduledTasks xmlns="http://xmlns.oracle.com/oim/scheduler">
<task>
<name>Flat File User Modification</name>
<class>com.blogspot.oraclestack.scheduledtasks.FlatFileUserModification</class>
<description>Modifies OIM users using data given in a CSV file</description>
<retry>5</retry>
<parameters>
<string-param required="true" encrypted="false" helpText="Key Attribute Name to identify OIM User">Key Attribute Name</string-param>
<string-param required="true" encrypted="false" helpText="Absolute File Path">File Path</string-param>
<string-param required="true" encrypted="false" helpText="Delimiter">Delimiter</string-param>
<number-param required="true" encrypted="false" helpText="Number of Threads">Number of Threads</number-param>
</parameters>
</task>
</scheduledTasks>
package com.blogspot.oraclestack.testdriver;
import com.blogspot.oraclestack.objects.UserProcessor;
import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Arrays;
import java.util.Hashtable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import oracle.core.ojdl.logging.ODLLevel;
import oracle.core.ojdl.logging.ODLLogger;
import oracle.iam.identity.usermgmt.api.UserManager;
import oracle.iam.platform.OIMClient;
import oracle.iam.reconciliation.api.EventMgmtService;
import oracle.iam.reconciliation.api.ReconOperationsService;
/**
* Test Driver for testing multi-threading
* Example: Modifies OIM Users using data from CSV file
* @author rayedchan
*/
public class FlatFileUserModificationTestDriver
{
// Logger
private static final ODLLogger LOGGER = ODLLogger.getODLLogger(FlatFileUserModificationTestDriver.class.getName());
// Adjust constant variables according to you OIM environment
public static final String OIM_HOSTNAME = "localhost";
public static final String OIM_PORT = "14000"; // For SSL, use 14001; For non-SSL, use 14000
public static final String OIM_PROVIDER_URL = "t3://"+ OIM_HOSTNAME + ":" + OIM_PORT; // For SSL, use t3s protocol; For non-SSL, use t3 protocol
public static final String AUTHWL_PATH = "lib/config/authwl.conf";
public static final String APPSERVER_TYPE = "wls";
public static final String FACTORY_INITIAL_TYPE = "weblogic.jndi.WLInitialContextFactory";
// Use if using SSL connection for OIMClient
public static final String TRUST_KEYSTORE_FOR_SSL = "/home/oracle/Oracle/Middleware/wlserver_10.3/server/lib/DemoTrust.jks";
// OIM Administrator Credentials
public static final String OIM_ADMIN_USERNAME = "xelsysadm";
public static final String OIM_ADMIN_PASSWORD = "Password1";
public static void main(String[] args) throws Exception
{
OIMClient oimClient = null;
BufferedReader bReader = null;
try
{
// Set system properties required for OIMClient
System.setProperty("java.security.auth.login.config", AUTHWL_PATH);
System.setProperty("APPSERVER_TYPE", APPSERVER_TYPE);
System.setProperty("weblogic.security.SSL.trustedCAKeyStore", TRUST_KEYSTORE_FOR_SSL); // Provide if using SSL
// Create an instance of OIMClient with OIM environment information
Hashtable<String, String> env = new Hashtable<String, String>();
env.put(OIMClient.JAVA_NAMING_FACTORY_INITIAL, FACTORY_INITIAL_TYPE);
env.put(OIMClient.JAVA_NAMING_PROVIDER_URL, OIM_PROVIDER_URL);
// Establish an OIM Client
oimClient = new OIMClient(env);
// Login to OIM with System Administrator Credentials
oimClient.login(OIM_ADMIN_USERNAME, OIM_ADMIN_PASSWORD.toCharArray());
// Get OIM services
ReconOperationsService reconOps = oimClient.getService(ReconOperationsService.class);
EventMgmtService eventService = oimClient.getService(EventMgmtService.class);
UserManager usrMgr = oimClient.getService(UserManager.class);
// Parameters to change
String keyAttrName = "User Login";
String filePath = "/home/oracle/Desktop/users.csv";
String delimiter = ",";
int numOfThreads = 3;
// File Reader
FileReader fReader = new FileReader(filePath);
bReader = new BufferedReader(fReader);
// Header Line
String line = bReader.readLine();
if(line == null || "".equalsIgnoreCase(line))
{
throw new Exception("Header must be provided as the first entry in file.");
}
String[] header = line.split(delimiter);
System.out.println(Arrays.asList(header));
// Create thread pool
ExecutorService threadExecutor = Executors.newFixedThreadPool(numOfThreads);
// Initialize base configuration
UserProcessor.initializeConfig(header, delimiter, LOGGER, usrMgr, keyAttrName);
// Process data entries using multi-threading
line = bReader.readLine();
while(line != null)
{
threadExecutor.execute(new UserProcessor(line));
line = bReader.readLine();
}
// Initate thread shutdown
threadExecutor.shutdown();
while(!threadExecutor.isTerminated())
{
// Wait for all event processor threads to complete
}
}
catch(Exception ex)
{
LOGGER.log(ODLLevel.ERROR, "", ex);
}
finally
{
// Logout of OIM client
if(oimClient != null)
{
oimClient.logout();
}
if(bReader != null)
{
bReader.close();
}
}
}
}
<?xml version="1.0" encoding="UTF-8"?>
<oimplugins xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<plugins pluginpoint="oracle.iam.scheduler.vo.TaskSupport">
<plugin pluginclass="com.blogspot.oraclestack.scheduledtasks.FlatFileUserModification" version="1.0" name="FlatFileUserModification"/>
</plugins>
</oimplugins>
view raw plugin.xml hosted with ❤ by GitHub
package com.blogspot.oraclestack.objects;
import java.text.MessageFormat;
import java.util.Arrays;
import oracle.core.ojdl.logging.ODLLevel;
import oracle.core.ojdl.logging.ODLLogger;
import oracle.iam.identity.exception.NoSuchUserException;
import oracle.iam.identity.exception.SearchKeyNotUniqueException;
import oracle.iam.identity.exception.UserModifyException;
import oracle.iam.identity.exception.ValidationFailedException;
import oracle.iam.identity.usermgmt.api.UserManager;
import oracle.iam.identity.usermgmt.vo.User;
import oracle.iam.platform.authz.exception.AccessDeniedException;
/**
* Each thread modifies a user
* @author rayechan
*/
public class UserProcessor implements Runnable
{
// Class Fields needed for every thread
private static String[] header;
private static String delimiter;
private static ODLLogger logger;
private static UserManager usrMgr;
private static String keyAttrName;
// Row in a file
private String userEntryLine;
/**
* Initializes the class variables needed to process each row
* @param header Header row from CSV file
* @param delimiter Separator for file
* @param logger Logger
* @param usrMgr OIM User Manager Service
* @param keyAttrName Key User Attribute in order to identify OIM user
*/
public static void initializeConfig(String[] header, String delimiter, ODLLogger logger, UserManager usrMgr, String keyAttrName)
{
UserProcessor.header = header;
UserProcessor.delimiter = delimiter;
UserProcessor.logger = logger;
UserProcessor.usrMgr = usrMgr;
UserProcessor.keyAttrName = keyAttrName;
}
/**
* Constructor
* @param line Line from CSV file
*/
public UserProcessor(String line)
{
this.userEntryLine = line;
}
/**
* Execution method for thread
*/
@Override
public void run()
{
try
{
String[] entry = userEntryLine.split(delimiter);
logger.log(ODLLevel.NOTIFICATION,"Start processing line: {0}", new Object[]{Arrays.asList(userEntryLine)});
User modUser = new User("");
String attrKeyValue = null;
// Iterate entry columns adding attribute to modify on given user
for(int i = 0; i < entry.length; i++)
{
// One to One correlation with header row and data entry row
String attributeName = header[i];
String attributeValue = entry[i];
// Get key user attribute in order identify OIM user to modify
if(attributeName.equals(keyAttrName))
{
attrKeyValue = attributeValue;
}
// Regular attribute to modify on user
else
{
// Add attribute to modification user object
modUser.setAttribute(attributeName, attributeValue);
}
}
usrMgr.modify(keyAttrName, attrKeyValue, modUser); // Apply changes to OIM user
logger.log(ODLLevel.NOTIFICATION,"Processed {0} = {1} with {2}", new Object[]{keyAttrName, attrKeyValue, modUser});
}
catch (ValidationFailedException ex)
{
logger.log(ODLLevel.SEVERE, MessageFormat.format("Failed to process entry: {0}", new Object[]{userEntryLine}), ex);
}
catch (AccessDeniedException ex)
{
logger.log(ODLLevel.SEVERE, MessageFormat.format("Failed to process entry: {0}", new Object[]{userEntryLine}), ex);
}
catch (UserModifyException ex)
{
logger.log(ODLLevel.SEVERE, MessageFormat.format("Failed to process entry: {0}", new Object[]{userEntryLine}), ex);
}
catch (NoSuchUserException ex)
{
logger.log(ODLLevel.SEVERE, MessageFormat.format("Failed to process entry: {0}", new Object[]{userEntryLine}), ex);
}
catch (SearchKeyNotUniqueException ex)
{
logger.log(ODLLevel.SEVERE, MessageFormat.format("Failed to process entry: {0}", new Object[]{userEntryLine}), ex);
}
catch (Exception ex)
{
logger.log(ODLLevel.SEVERE, MessageFormat.format("Failed to process entry: {0}", new Object[]{userEntryLine}), ex);
}
}
}
User Login Employee Number Title Department Number Common Name Initials
JPROUDMOORE 100000 Wizard Master 11 Jaina Proudmoore JP
AWINDRUNNER 100001 Archer Captain 21 Alleria Windrunner AW
MSTORMRAGE 100002 Nature Proctector 31 Malfurion Stormage MS
GHELLSCREAM 100003 Horde Leader 41 Garrosh Hellscream GH
AWRYNN 100004 Stormwind Prince 51 Anduin Wrynn AW
ULIGHTBRINGER 100005 Silverhand Founder 61 Uther Lightbringer UL
RBEAST 100006 Beast Lover 71 Rexxar Beast RB
LLIADRIN 100007 Paladin 81 Lady Liadrin LL
VSANGUINAR 100008 Assassin Master 91 Valeera Sanguinar VS
TELEMENTS 100009 Horde Warchief 101 Thrall Elements TE
GCORRUPT 100010 Sorcerer Prodigy 111 Guldan Corrupt GC
EVANCLEEF 100011 Defias Brotherhood Founder 121 Edwin Vancleef EV
view raw users.csv hosted with ❤ by GitHub

Scheduled Job Parameters
Delimiter : Separator used in flat file
File Path : Absolute path of file on the machine where OIM is deployed
Key Attribute Name : User attribute name to uniquely identify OIM user
Number of Threads : Number of threads in pool

Scheduled Job Parameters

Things to Take Note Of
1. Had to use "Platform.getServiceForEventHandlers" instead of "Platform.getService" when using OIM API service in threading context. The exception below is thrown when using "Platform.getService" for multi-threading in a scheduled job.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
java.lang.NullPointerException
 at oracle.iam.request.impl.RequestEngine.performValidations(RequestEngine.java:4628)
 at oracle.iam.request.impl.RequestEngine.doOperation(RequestEngine.java:4520)
 at oracle.iam.impl.OIMServiceImpl.doOperation(OIMServiceImpl.java:43)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
 at java.lang.reflect.Method.invoke(Method.java:597)
 at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:307)
 at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:182)
 at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:149)
 at oracle.iam.platform.utils.DMSMethodInterceptor.invoke(DMSMethodInterceptor.java:35)
 at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:171)
 at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:204)
 at $Proxy353.doOperation(Unknown Source)
 at oracle.iam.identity.utils.Utils.invokeUnifiedService(Utils.java:3849)
 at oracle.iam.identity.usermgmt.impl.UserManagerImpl.modify(UserManagerImpl.java:648)
 at oracle.iam.identity.usermgmt.impl.UserManagerImpl.modify(UserManagerImpl.java:741)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
 at java.lang.reflect.Method.invoke(Method.java:597)
 at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:307)
 at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:182)
 at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:149)
 at oracle.iam.platform.utils.DMSMethodInterceptor.invoke(DMSMethodInterceptor.java:35)
 at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:171)
 at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:204)
 at $Proxy476.modify(Unknown Source)
 at com.blogspot.oraclestack.objects.UserProcessor.run(UserProcessor.java:91)
 at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
 at java.lang.Thread.run(Thread.java:662)
 
]]

No comments:

Post a Comment