Changeset 306 for kraken-filter
- Timestamp:
- 06/14/09 23:45:32 (14 months ago)
- Location:
- kraken-filter
- Files:
-
- 4 added
- 28 modified
- 7 moved
-
pom.xml (modified) (2 diffs)
-
src/main/java/org/krakenapps/filter/ActiveFilter.java (modified) (2 diffs)
-
src/main/java/org/krakenapps/filter/ComponentDescription.java (modified) (1 diff)
-
src/main/java/org/krakenapps/filter/ComponentDescriptionParser.java (modified) (1 diff)
-
src/main/java/org/krakenapps/filter/DefaultFilter.java (moved) (moved from kraken-filter/src/main/java/org/krakenapps/filter/AbstractFilter.java) (3 diffs)
-
src/main/java/org/krakenapps/filter/DefaultFilterEventListener.java (added)
-
src/main/java/org/krakenapps/filter/DefaultMessageSpec.java (moved) (moved from kraken-filter/src/main/java/org/krakenapps/filter/MessageSpecImpl.java) (1 diff)
-
src/main/java/org/krakenapps/filter/DefaultMessageSpecVersion.java (moved) (moved from kraken-filter/src/main/java/org/krakenapps/filter/MessageSpecVersionImpl.java) (1 diff)
-
src/main/java/org/krakenapps/filter/DefaultMessageSpecVersionRange.java (moved) (moved from kraken-filter/src/main/java/org/krakenapps/filter/MessageSpecVersionRangeImpl.java) (2 diffs)
-
src/main/java/org/krakenapps/filter/Filter.java (modified) (2 diffs)
-
src/main/java/org/krakenapps/filter/FilterChain.java (modified) (1 diff)
-
src/main/java/org/krakenapps/filter/FilterEventListener.java (modified) (1 diff)
-
src/main/java/org/krakenapps/filter/FilterHandler.java (modified) (8 diffs)
-
src/main/java/org/krakenapps/filter/FilterManager.java (modified) (1 diff)
-
src/main/java/org/krakenapps/filter/FilterScript.java (modified) (4 diffs)
-
src/main/java/org/krakenapps/filter/Message.java (modified) (1 diff)
-
src/main/java/org/krakenapps/filter/MessageBuilder.java (modified) (6 diffs)
-
src/main/java/org/krakenapps/filter/MessageSpec.java (modified) (1 diff)
-
src/main/java/org/krakenapps/filter/MessageSpecVersion.java (modified) (1 diff)
-
src/main/java/org/krakenapps/filter/MessageSpecVersionRange.java (modified) (1 diff)
-
src/main/java/org/krakenapps/filter/exception/AlreadyBoundException.java (modified) (1 diff)
-
src/main/java/org/krakenapps/filter/exception/ConfigurationException.java (modified) (1 diff)
-
src/main/java/org/krakenapps/filter/exception/DuplicatedFilterNameException.java (modified) (1 diff)
-
src/main/java/org/krakenapps/filter/exception/FilterFactoryNotFoundException.java (modified) (1 diff)
-
src/main/java/org/krakenapps/filter/exception/FilterNotBoundException.java (modified) (1 diff)
-
src/main/java/org/krakenapps/filter/exception/FilterNotFoundException.java (modified) (1 diff)
-
src/main/java/org/krakenapps/filter/exception/MessageSpecMismatchException.java (modified) (1 diff)
-
src/main/java/org/krakenapps/filter/exception/package-info.java (added)
-
src/main/java/org/krakenapps/filter/impl/ActiveFilterRunner.java (modified) (8 diffs)
-
src/main/java/org/krakenapps/filter/impl/DefaultFilterChain.java (moved) (moved from kraken-filter/src/main/java/org/krakenapps/filter/impl/FilterChainImpl.java) (1 diff)
-
src/main/java/org/krakenapps/filter/impl/DefaultFilterManager.java (moved) (moved from kraken-filter/src/main/java/org/krakenapps/filter/impl/FilterManagerImpl.java) (31 diffs)
-
src/main/java/org/krakenapps/filter/impl/DefaultMessage.java (moved) (moved from kraken-filter/src/main/java/org/krakenapps/filter/impl/MessageImpl.java) (1 diff)
-
src/main/java/org/krakenapps/filter/impl/FilterConfig.java (modified) (29 diffs)
-
src/main/java/org/krakenapps/filter/impl/FilterFactoryTracker.java (modified) (6 diffs)
-
src/main/java/org/krakenapps/filter/impl/FilterInstance.java (modified) (1 diff)
-
src/main/java/org/krakenapps/filter/impl/FilterScriptFactory.java (modified) (1 diff)
-
src/main/java/org/krakenapps/filter/impl/package-info.java (added)
-
src/main/java/org/krakenapps/filter/package-info.java (added)
-
src/main/resources/metadata.xml (modified) (1 diff)
Legend:
- Unmodified
- Added
- Removed
-
kraken-filter/pom.xml
r301 r306 23 23 <Bundle-SymbolicName>org.krakenapps.filter 24 24 </Bundle-SymbolicName> 25 <Export-Package>org.krakenapps.filter ,26 org.krakenapps.filter.exception </Export-Package>27 <Private-Package>org.krakenapps.filter.impl 25 <Export-Package>org.krakenapps.filter;version=1.0.0, 26 org.krakenapps.filter.exception;version=1.0.0</Export-Package> 27 <Private-Package>org.krakenapps.filter.impl;version=1.0.0 28 28 </Private-Package> 29 29 <Import-Package>!org.junit,org.hsqldb,* … … 63 63 </dependency> 64 64 <dependency> 65 <groupId>org.apache.felix</groupId>66 <artifactId>org.apache.felix.configadmin67 </artifactId>68 <version>1.0.10</version>69 </dependency>70 <dependency>71 65 <groupId>org.krakenapps</groupId> 72 66 <artifactId>kraken-api</artifactId> -
kraken-filter/src/main/java/org/krakenapps/filter/ActiveFilter.java
r302 r306 18 18 import org.krakenapps.filter.exception.ConfigurationException; 19 19 20 public abstract class ActiveFilter extends AbstractFilter { 20 /** 21 * ActiveFilter class should be implemented by any filter class which filters 22 * are intended to be excuted by a thread. 23 * 24 * @author xeraph 25 * @since 1.0.0 26 * @see Filter 27 */ 28 public abstract class ActiveFilter extends DefaultFilter { 21 29 private boolean isRunning; 22 30 31 /** 32 * Returns true if thread is running. 33 * 34 * @return true if thread is running 35 */ 23 36 public boolean isRunning() { 24 37 return isRunning; … … 29 42 } 30 43 44 /** 45 * Initialize an active filter instance. ActiveFilterRunner calls this 46 * method before the run loop. 47 * 48 * @throws ConfigurationException 49 * if failed to configure 50 */ 31 51 public void open() throws ConfigurationException { 32 52 } 33 53 54 /** 55 * Finalize an active filter instance. ActiveFilterRunner calls this method 56 * after the run loop. 57 */ 34 58 public void close() { 35 59 } 36 60 61 /** 62 * ActiveFilterRunner calls this method in each loop. Thread will sleep some 63 * milliseconds after run. 64 * 65 * @throws InterruptedException 66 * if thread is interrupted 67 */ 37 68 abstract public void run() throws InterruptedException; 38 69 } -
kraken-filter/src/main/java/org/krakenapps/filter/ComponentDescription.java
r302 r306 19 19 import java.util.List; 20 20 21 /** 22 * Represents iPOJO component description. 23 * 24 * @author xeraph 25 * @since 1.0.0 26 */ 21 27 public class ComponentDescription { 22 28 private String instanceName; -
kraken-filter/src/main/java/org/krakenapps/filter/ComponentDescriptionParser.java
r302 r306 19 19 import java.util.List; 20 20 21 21 /** 22 * Parses iPOJO component description string. 23 * 24 * @author xeraph 25 * @since 1.0.0 26 */ 22 27 public class ComponentDescriptionParser { 23 28 public static ComponentDescription parse(String instanceName, String description) { -
kraken-filter/src/main/java/org/krakenapps/filter/DefaultFilter.java
r302 r306 23 23 import org.krakenapps.filter.exception.ConfigurationException; 24 24 25 public abstract class AbstractFilter implements Filter { 25 /** 26 * This class provides default implementations for the {@link Filter} interface. 27 * 28 * @author xeraph 29 * @since 1.0.0 30 */ 31 public class DefaultFilter implements Filter { 26 32 private Map<String, Object> properties; 27 33 28 public AbstractFilter() { 34 /** 35 * Creates a new default filter instance. 36 */ 37 public DefaultFilter() { 29 38 properties = new ConcurrentHashMap<String, Object>(); 30 39 } 31 40 41 /** 42 * No input message specifications are supported. 43 */ 32 44 @Override 33 public MessageSpec[] get ReceiveSpecs() {45 public MessageSpec[] getInputMessageSpecs() { 34 46 return null; 35 47 } 36 48 49 /** 50 * No output message specification is supported. 51 */ 37 52 @Override 38 public MessageSpec get SendSpec() {53 public MessageSpec getOutputMessageSpec() { 39 54 return null; 40 55 } 41 56 57 /** 58 * Empty message processing. 59 */ 42 60 @Override 43 61 public void process(Message message) { 44 62 } 45 63 64 /** 65 * Returns the value of the specified property. 66 */ 46 67 @Override 47 68 public Object getProperty(String key) { … … 49 70 } 50 71 72 /** 73 * Returns a key set of properties. 74 */ 51 75 @Override 52 76 public Set<String> getPropertyKeys() { … … 54 78 } 55 79 80 /** 81 * 82 */ 56 83 @Override 57 84 public void setProperty(String key, Object value) { -
kraken-filter/src/main/java/org/krakenapps/filter/DefaultMessageSpec.java
r277 r306 16 16 package org.krakenapps.filter; 17 17 18 public class MessageSpecImpl implements MessageSpec { 18 /** 19 * 20 * @author xeraph 21 * @since 1.0.0 22 */ 23 public class DefaultMessageSpec implements MessageSpec { 19 24 private String name; 20 25 private String description; 21 26 private MessageSpecVersionRange range; 22 27 23 public MessageSpecImpl(String name, MessageSpecVersion version) {28 public DefaultMessageSpec(String name, MessageSpecVersion version) { 24 29 this(name, null, version); 25 30 } 26 31 27 public MessageSpecImpl(String name, int majorVersion, int minorVersion) {28 this(name, new MessageSpecVersionImpl(majorVersion, minorVersion));32 public DefaultMessageSpec(String name, int majorVersion, int minorVersion) { 33 this(name, new DefaultMessageSpecVersion(majorVersion, minorVersion)); 29 34 } 30 35 31 public MessageSpecImpl(String name, String description, MessageSpecVersion version) {32 this(name, description, new MessageSpecVersionRangeImpl(version, version));36 public DefaultMessageSpec(String name, String description, MessageSpecVersion version) { 37 this(name, description, new DefaultMessageSpecVersionRange(version, version)); 33 38 } 34 39 35 public MessageSpecImpl(String name, String description, MessageSpecVersionRange range) {40 public DefaultMessageSpec(String name, String description, MessageSpecVersionRange range) { 36 41 this.name = name; 37 42 this.description = description; -
kraken-filter/src/main/java/org/krakenapps/filter/DefaultMessageSpecVersion.java
r215 r306 16 16 package org.krakenapps.filter; 17 17 18 public class MessageSpecVersionImpl implements MessageSpecVersion { 18 /** 19 * 20 * @author xeraph 21 * @since 1.0.0 22 */ 23 public class DefaultMessageSpecVersion implements MessageSpecVersion { 19 24 private int majorVersion; 20 25 private int minorVersion; 21 26 22 public MessageSpecVersionImpl(int majorVersion, int minorVersion) {27 public DefaultMessageSpecVersion(int majorVersion, int minorVersion) { 23 28 this.majorVersion = majorVersion; 24 29 this.minorVersion = minorVersion; -
kraken-filter/src/main/java/org/krakenapps/filter/DefaultMessageSpecVersionRange.java
r215 r306 16 16 package org.krakenapps.filter; 17 17 18 public class MessageSpecVersionRangeImpl implements MessageSpecVersionRange { 18 /** 19 * 20 * @author xeraph 21 * @since 1.0.0 22 */ 23 public class DefaultMessageSpecVersionRange implements MessageSpecVersionRange { 19 24 private MessageSpecVersion lowerBound; 20 25 private MessageSpecVersion upperBound; 21 26 22 public MessageSpecVersionRangeImpl(MessageSpecVersion v1, MessageSpecVersion v2) {27 public DefaultMessageSpecVersionRange(MessageSpecVersion v1, MessageSpecVersion v2) { 23 28 if (isLowerThan(v1, v2)) { 24 29 this.lowerBound = v1; … … 44 49 return true; 45 50 46 if ((lhs.getMajorVersion() == rhs.getMajorVersion()) && lhs.getMinorVersion() < rhs.getMinorVersion()) { 51 if ((lhs.getMajorVersion() == rhs.getMajorVersion()) 52 && lhs.getMinorVersion() < rhs.getMinorVersion()) { 47 53 return true; 48 54 } -
kraken-filter/src/main/java/org/krakenapps/filter/Filter.java
r302 r306 20 20 import org.krakenapps.filter.exception.ConfigurationException; 21 21 22 /** 23 * Filter is a message processing unit. Filter can receive various message types 24 * from binded filters and send messages to binded filters. 25 * 26 * @author xeraph 27 * @since 1.0.0 28 */ 22 29 public interface Filter { 23 MessageSpec[] getReceiveSpecs(); 30 /** 31 * Returns the input message specifications that can be bound to this 32 * filter. 33 * 34 * @return the input message specifications array. null or empty array if no 35 * types are supported 36 */ 37 MessageSpec[] getInputMessageSpecs(); 24 38 25 MessageSpec getSendSpec(); 39 /** 40 * Returns the output message specification that can be bound to this 41 * filter. 42 * 43 * @return the output message specification. null if no type is supported 44 */ 45 MessageSpec getOutputMessageSpec(); 26 46 27 /* 47 /** 28 48 * Process a message object pushed from other filters. ActiveFilter usually 29 49 * queues incoming message in this method, rather than processes it … … 32 52 void process(Message message); 33 53 54 /** 55 * Returns a Set of the keys of properties. 56 */ 34 57 Set<String> getPropertyKeys(); 35 58 59 /** 60 * Returns the value of specified property. 61 * 62 * @param key 63 * the name of the property 64 * @return the value of specified property 65 */ 36 66 Object getProperty(String key); 37 67 68 /** 69 * Sets the property. 70 * 71 * @param key 72 * the name of the property 73 * @param value 74 * the value of the property 75 */ 38 76 void setProperty(String key, Object value); 39 77 78 /** 79 * Removes the specified property. 80 * 81 * @param key 82 * the name of the property 83 */ 40 84 void unsetProperty(String key); 41 85 86 /** 87 * Validates current configuration of the filter. 88 * 89 * @throws ConfigurationException 90 * if failed to validate 91 */ 42 92 void validateConfiguration() throws ConfigurationException; 43 93 } -
kraken-filter/src/main/java/org/krakenapps/filter/FilterChain.java
r282 r306 16 16 package org.krakenapps.filter; 17 17 18 /** 19 * A container of {@link Filter}s that forwards {@link Message}s to the 20 * consisting filters sequentially. 21 * 22 * @author xeraph 23 * @since 1.0.0 24 */ 18 25 public interface FilterChain { 26 /** 27 * Processes a {@link Message}. 28 * 29 * @param message 30 * the forwarding message 31 */ 19 32 void process(Message message); 20 33 } -
kraken-filter/src/main/java/org/krakenapps/filter/FilterEventListener.java
r154 r306 16 16 package org.krakenapps.filter; 17 17 18 /** 19 * The listener interface for receiving filter events. 20 * 21 * @author xeraph 22 * @since 1.0.0 23 */ 18 24 public interface FilterEventListener { 25 /** 26 * Invoked when a filter is loaded. 27 */ 19 28 void onFilterLoaded(String filterId); 20 29 30 /** 31 * Invoked when a filter is unloading. 32 */ 21 33 void onFilterUnloading(String filterId); 22 34 35 /** 36 * Invoked when filters are bound. 37 * 38 * @param fromFilterId 39 * the source filter id 40 * @param toFilterId 41 * the destination filter id 42 */ 43 void onFilterBound(String fromFilterId, String toFilterId); 44 45 /** 46 * Invoked when filters are unbounding. 47 * 48 * @param fromFilterId 49 * the source filter id 50 * @param toFilterId 51 * the destination filter id 52 */ 53 void onFilterUnbinding(String fromFilterId, String toFilterId); 54 55 /** 56 * Invoked when a property is set. 57 * 58 * @param filterId 59 * the target filter id 60 * @param name 61 * the name of the property 62 * @param value 63 * the value of the property 64 */ 65 void onFilterSet(String filterId, String name, Object value); 66 67 /** 68 * Invoked when a property is removed. 69 * 70 * @param filterId 71 * the target filter id 72 * @param name 73 * the name of the property 74 */ 75 void onFilterUnset(String filterId, String name); 23 76 } -
kraken-filter/src/main/java/org/krakenapps/filter/FilterHandler.java
r302 r306 17 17 18 18 import java.util.Dictionary; 19 import java.util.Enumeration;20 19 import java.util.Properties; 21 20 … … 28 27 import org.krakenapps.filter.exception.FilterNotFoundException; 29 28 import org.krakenapps.filter.exception.MessageSpecMismatchException; 30 import org.krakenapps.filter.impl. FilterChainImpl;29 import org.krakenapps.filter.impl.DefaultFilterChain; 31 30 import org.krakenapps.filter.impl.FilterConfig; 32 31 import org.slf4j.Logger; 33 32 import org.slf4j.LoggerFactory; 34 33 34 /** 35 * This handler class provides filter extension of iPOJO component instance. 36 * FilterHanler injects a {@link FilterChain} instance and loads all properties. 37 * Dynamic filter binding feature is implemented based on iPOJO code injection. 38 * 39 * @author xeraph 40 * @since 1.0.0 41 */ 35 42 public class FilterHandler extends PrimitiveHandler { 43 /** 44 * the slf4j logger 45 */ 36 46 final Logger logger = LoggerFactory.getLogger(FilterHandler.class.getName()); 37 47 48 /** 49 * the filter manager instance 50 */ 38 51 private FilterManager filterManager; 52 53 /** 54 * the filter config instance 55 */ 39 56 private FilterConfig filterConfig; 57 58 /** 59 * the injected filter chain instance 60 */ 40 61 private volatile FilterChain filterChain; 41 private String pid;42 62 63 /** 64 * the filter id 65 */ 66 private String filterId; 67 68 /** 69 * Invoked when a filter component instance is created. Inject filter chain 70 * field here. 71 */ 43 72 @SuppressWarnings("unchecked") 44 73 @Override 45 74 public void configure(Element metadata, Dictionary config) throws ConfigurationException { 46 Enumeration iterator = config.keys();47 while (iterator.hasMoreElements()) {48 String key = (String) iterator.nextElement();49 Object value = config.get(key);50 51 logger.info("configuring " + key + " " + value.toString());52 }53 75 54 76 filterManager = (FilterManager) config.get("filter.manager"); 55 77 filterConfig = (FilterConfig) config.get("filter.config"); 56 pid = (String) config.get("instance.name");78 filterId = (String) config.get("instance.name"); 57 79 80 // inspects all fields and inject filter chain field. 58 81 PojoMetadata pojoMetadata = getPojoMetadata(); 59 82 for (FieldMetadata fieldMetadata : pojoMetadata.getFields()) { … … 63 86 } 64 87 65 filterChain = new FilterChainImpl(new Filter[0]); 88 // set instance.name property. 89 Filter filter = getFilter(); 90 filter.setProperty("instance.name", filterId); 66 91 67 Filter filter = (Filter) getInstanceManager().getPojoObject();68 filter .setProperty("instance.name", pid);92 // creates a filter chain instance with empty filter bindings. 93 filterChain = new DefaultFilterChain(filter, new Filter[0]); 69 94 70 logger. info("filter [{}] configuration succeeded.", pid);95 logger.trace("filter [{}] configuration succeeded.", filterId); 71 96 } 72 97 98 /** 99 * Invoked when a filter is validated. Register the filter instance and 100 * restore previous states. (properties and bind states) 101 */ 73 102 @Override 74 103 public void start() { 75 logger. info("Kraken Filter Handlerstarted.");104 logger.debug("Kraken filter handler is started."); 76 105 77 106 Filter filter = getFilter(); 78 filterManager.registerFilter(this, pid, filter);107 filterManager.registerFilter(this, filterId, filter); 79 108 80 109 loadProperties(filter); 81 bindAutomatically( pid);110 bindAutomatically(filterId); 82 111 } 83 112 … … 88 117 } 89 118 90 // write filter instance information to d b.91 filterConfig.addFilterInstance( pid, filter.getClass().getName(), filterType);119 // write filter instance information to data store. 120 filterConfig.addFilterInstance(filterId, filter.getClass().getName(), filterType); 92 121 93 // load properties from d bif exists.94 Properties props = filterConfig.getFilterProperties( pid);122 // load properties from data store if exists. 123 Properties props = filterConfig.getFilterProperties(filterId); 95 124 for (Object key : props.keySet()) { 96 125 filter.setProperty((String) key, (String) props.get(key)); … … 98 127 } 99 128 129 /** 130 * Invoked when a filter is invalidated. 131 */ 100 132 @Override 101 133 public void stop() { 102 logger. info("Kraken Filter Handlerstopped.");134 logger.debug("Kraken filter handler is stopped."); 103 135 104 filterManager.unregisterFilter( pid);136 filterManager.unregisterFilter(filterId); 105 137 } 106 138 … … 110 142 try { 111 143 filterManager.bindFilter(pid, target); 112 logger. info(pid + " -> " + target + " binded.");144 logger.trace(pid + " -> " + target + " binded."); 113 145 } catch (FilterNotFoundException e) { 114 146 logger.warn("filter not found:", e); … … 136 168 } 137 169 170 /** 171 * Invoked when the registered filter chain field is accessed. 172 */ 138 173 @Override 139 174 public Object onGet(Object pojo, String fieldName, Object value) { 140 if (filterChain == null) {141 logger.warn("filterChain is null!");142 }143 175 return filterChain; 144 176 } … … 148 180 } 149 181 150 public void stateChanged(Filter[] bindedFilters) { 151 filterChain = new FilterChainImpl(bindedFilters); 182 /** 183 * Invoked when a filter's binding states are changed cause of bind command 184 * or unbind command. 185 * 186 * @param boundOutputFilters 187 * the bound output filters 188 */ 189 public void stateChanged(Filter[] boundOutputFilters) { 190 filterChain = new DefaultFilterChain(getFilter(), boundOutputFilters); 152 191 } 153 192 } -
kraken-filter/src/main/java/org/krakenapps/filter/FilterManager.java
r302 r306 26 26 import org.krakenapps.filter.exception.MessageSpecMismatchException; 27 27 28 /** 29 * The interface for filter management. 30 * 31 * @author xeraph 32 * @since 1.0.0 33 */ 28 34 public interface FilterManager { 35 /** 36 * Returns a list of filter factory type name. 37 */ 29 38 List<String> getFilterFactoryNames(); 30 39 40 /** 41 * Returns a list of all filter instance descriptions. 42 */ 31 43 List<ComponentDescription> getFilterInstanceDescriptions(); 32 44 33 ComponentDescription getComponentDescription(String filterFactoryName) throws FilterFactoryNotFoundException; 34 35 void loadFilter(int filterFactoryId, String pid) throws FilterFactoryNotFoundException, 45 /** 46 * Loads a filter using filter factory index. 47 * 48 * @param filterFactoryIndex 49 * the filter factory index which is listed before. 50 * @param filterId 51 * the filter id 52 * @throws FilterFactoryNotFoundException 53 * if filter factory is not found 54 * @throws DuplicatedFilterNameException 55 * if filter id is duplicated 56 */ 57 void loadFilter(int filterFactoryIndex, String filterId) throws FilterFactoryNotFoundException, 36 58 DuplicatedFilterNameException; 37 59 38 void loadFilter(String filterFactoryName, String pid) throws FilterFactoryNotFoundException, 39 DuplicatedFilterNameException; 40 41 void unloadFilter(String pid) throws FilterNotFoundException; 42 43 void runFilter(String pid, long period) throws FilterNotFoundException, IllegalThreadStateException; 44 45 void stopFilter(String pid) throws FilterNotFoundException; 46 47 void bindFilter(String fromFilterPid, String toFilterPid) throws FilterNotFoundException, 60 /** 61 * Loads a filter using the filter factory name. 62 * 63 * @param filterFactoryName 64 * the filter factory name 65 * @param filterId 66 * the filter id 67 * @throws FilterFactoryNotFoundException 68 * if filter factory is not found 69 * @throws DuplicatedFilterNameException 70 * if filter id is duplicated 71 */ 72 void loadFilter(String filterFactoryName, String filterId) 73 throws FilterFactoryNotFoundException, DuplicatedFilterNameException; 74 75 /** 76 * Unloads the filter. 77 * 78 * @param filterId 79 * the filter id 80 * @throws FilterNotFoundException 81 * if filter is not found 82 */ 83 void unloadFilter(String filterId) throws FilterNotFoundException; 84 85 /** 86 * Starts an {@link ActiveFilter} thread with specified milliseconds 87 * interval. 88 * 89 * @param filterId 90 * the {@link ActiveFilter} id 91 * @param period 92 * the sleep interval in milliseconds 93 * @throws FilterNotFoundException 94 * if active filter is not found 95 * @throws IllegalThreadStateException 96 * if active filter is already running 97 */ 98 void runFilter(String filterId, long period) throws FilterNotFoundException, 99 IllegalThreadStateException; 100 101 /** 102 * Stops the active filter. 103 * 104 * @param filterId 105 * the {@link ActiveFilter} id 106 * @throws FilterNotFoundException 107 * if active filter is not found 108 */ 109 void stopFilter(String filterId) throws FilterNotFoundException; 110 111 /** 112 * Binds two filters. 113 * 114 * @param fromFilterId 115 * the source filter id 116 * @param toFilterId 117 * the destination filter id 118 * @throws FilterNotFoundException 119 * if source or destination filter is not found 120 * @throws AlreadyBoundException 121 * if filters are already bound 122 * @throws MessageSpecMismatchException 123 * if source filter's message specification and destinatino 124 * filter's message specification is not matched. 125 */ 126 void bindFilter(String fromFilterId, String toFilterId) throws FilterNotFoundException, 48 127 AlreadyBoundException, MessageSpecMismatchException; 49 128 50 void unbindFilter(String fromFilterPid, String toFilterPid) throws FilterNotFoundException, 129 /** 130 * Unbind filters. 131 * 132 * @param fromFilterId 133 * the source filter id 134 * @param toFilterId 135 * the destination filter id 136 * @throws FilterNotFoundException 137 * if source or destination filter is not found 138 * @throws FilterNotBoundException 139 * if filters are not bound 140 */ 141 void unbindFilter(String fromFilterId, String toFilterId) throws FilterNotFoundException, 51 142 FilterNotBoundException; 52 143 53 Filter getFilter(String pid); 54 55 Filter[] getInputFilters(String pid); 56 57 Filter[] getOutputFilters(String pid); 58 59 void registerFilter(FilterHandler filterHandler, String pid, Filter filter); 60 61 void unregisterFilter(String pid); 62 63 Set<String> getPropertyKeys(String pid) throws FilterNotFoundException; 64 65 Object getProperty(String pid, String key) throws FilterNotFoundException; 66 67 void setProperty(String pid, String key, String value) throws FilterNotFoundException; 68 69 void unsetProperty(String pid, String key) throws FilterNotFoundException; 70 144 /** 145 * Returns the filter 146 * 147 * @param filterId 148 * the filter id 149 * @return the filter. returns null if not exists. 150 */ 151 Filter getFilter(String filterId); 152 153 /** 154 * Returns all bound input filters. 155 * 156 * @param filterId 157 * the filter id 158 * @return all bound input filters. empty array if input filter does not 159 * exists 160 */ 161 Filter[] getInputFilters(String filterId); 162 163 /** 164 * Returns all bound output filters. 165 * 166 * @param filterId 167 * the filter id 168 * @return all bound output filters. empty array if output filter does not 169 * exists 170 */ 171 Filter[] getOutputFilters(String filterId); 172 173 /** 174 * Register the filter with filter id and handler. Can be used as 175 * sub-routine of loadFilter. Invoked from {@link FilterHandler}. 176 * 177 * @param filterHandler 178 * the filter handler 179 * @param filterId 180 * the filter id 181 * @param filter 182 * the filter 183 */ 184 void registerFilter(FilterHandler filterHandler, String filterId, Filter filter); 185 186 /** 187 * Unregister the filter. Can be used as sub-routine of unloadFilter. 188 * Invoked from {@link FilterHandler}. 189 * 190 * @param filterId 191 * the filter id 192 */ 193 void unregisterFilter(String filterId); 194 195 /** 196 * Returns a Set of property keys 197 * 198 * @param filterId 199 * the filter id 200 * @return a Set of property keys 201 * @throws FilterNotFoundException 202 * if filter is not found 203 */ 204 Set<String> getPropertyKeys(String filterId) throws FilterNotFoundException; 205 206 /** 207 * Returns the value of the property. 208 * 209 * @param filterId 210 * the filter id 211 * @param key 212 * the key of property 213 * @return the property value 214 * @throws FilterNotFoundException 215 * if filter is not found 216 */ 217 Object getProperty(String filterId, String key) throws FilterNotFoundException; 218 219 /** 220 * Sets the property. 221 * 222 * @param filterId 223 * the filter id 224 * @param key 225 * the key of property 226 * @param value 227 * the value of property 228 * @throws FilterNotFoundException 229 * if filter is not found 230 */ 231 void setProperty(String filterId, String key, String value) throws FilterNotFoundException; 232 233 /** 234 * Removes the property. 235 * 236 * @param filterId 237 * the filter id 238 * @param key 239 * the key of property 240 * @throws FilterNotFoundException 241 * if filter is not found 242 */ 243 void unsetProperty(String filterId, String key) throws FilterNotFoundException; 244 245 /** 246 * Subscribes filter events. 247 * 248 * @param listener 249 * the filter event listener 250 */ 71 251 void subscribeFilterEvent(FilterEventListener listener); 72 252 253 /** 254 * Unsubscribes filter events. 255 * 256 * @param listener 257 * the filter event listener 258 */ 73 259 void unsubscribeFilterEvent(FilterEventListener listener); 74 260 } -
kraken-filter/src/main/java/org/krakenapps/filter/FilterScript.java
r302 r306 29 29 import org.slf4j.LoggerFactory; 30 30 31 /** 32 * Provides filter management command scripts. 33 * 34 * @author xeraph 35 * @since 1.0.0 36 */ 31 37 public class FilterScript implements Script { 32 38 final Logger logger = LoggerFactory.getLogger(FilterScript.class.getName()); … … 137 143 long period = Long.parseLong(args[1]); 138 144 manager.runFilter(pid, period); 139 context.println("[%s] filter's thread started. run every %d millisecond.", pid, period); 145 context.println("[%s] filter's thread started. run every %d millisecond.", pid, 146 period); 140 147 } 141 148 } catch (Exception e) { … … 161 168 context.println("========================"); 162 169 for (ComponentDescription description : descriptions) { 163 context.println(description.getInstanceName() + " -> " + description.getFactoryName()); 170 context.println(description.getInstanceName() + " -> " 171 + description.getFactoryName()); 164 172 } 165 173 } else if (args.length == 1) { … … 172 180 173 181 for (String key : keys) { 174 context.println("Key: [%s], Value: [%s]", key, manager.getProperty(pid, key).toString()); 182 context.println("Key: [%s], Value: [%s]", key, manager.getProperty(pid, key) 183 .toString()); 175 184 } 176 185 } catch (FilterNotFoundException e) { -
kraken-filter/src/main/java/org/krakenapps/filter/Message.java
r146 r306 18 18 import java.util.Set; 19 19 20 /** 21 * Represents the communication unit of filter. Composed of header and 22 * properties. Message is immutable. 23 * 24 * @author xeraph 25 * @since 1.0.0 26 */ 20 27 public interface Message { 28 /** 29 * Returns the message specification. 30 */ 21 31 public MessageSpec getMessageSpec(); 22 32 33 /** 34 * Returns a Set of property keys. 35 */ 23 36 public Set<String> keySet(); 24 37 38 /** 39 * Checks if specific key is contained. 40 * 41 * @param key 42 * the name of property 43 * @return true if key exists. 44 */ 25 45 public boolean containsKey(String key); 26 46 47 /** 48 * Returns the value of the property 49 * 50 * @param key 51 * the name of property 52 * @returns the value of the property. null if not exists. 53 */ 27 54 public Object get(String key); 28 55 56 /** 57 * Returns a Set of header keys. 58 */ 29 59 public Set<String> headerKeySet(); 30 60 61 /** 62 * Checks if specific header is contained. 63 * 64 * @param key 65 * the name of header 66 * @return true if header key exists 67 */ 31 68 public boolean containsHeader(String key); 32 69 70 /** 71 * Returns the value of the header. 72 * 73 * @param key 74 * the name of header 75 * @return the value of the header. null if not exists. 76 */ 33 77 public Object getHeader(String key); 34 78 } -
kraken-filter/src/main/java/org/krakenapps/filter/MessageBuilder.java
r302 r306 19 19 import java.util.Map; 20 20 21 import org.krakenapps.filter.impl. MessageImpl;21 import org.krakenapps.filter.impl.DefaultMessage; 22 22 23 23 /** 24 * Builds a message of specific message specification. This class is for 25 * convenience. 26 * 27 * @author xeraph 28 * @since 1.0.0 29 */ 24 30 public class MessageBuilder { 25 31 private MessageSpec spec; … … 27 33 private Map<String, Object> headers; 28 34 35 /** 36 * Prepare a builder of specific message specification. 37 * 38 * @param spec 39 * the message specification 40 */ 29 41 public MessageBuilder(MessageSpec spec) { 30 42 this.spec = spec; … … 33 45 } 34 46 47 /** 48 * Copy from the other message 49 * 50 * @param message 51 * the source message 52 * @return builder for method chaining 53 */ 35 54 public MessageBuilder setBase(Message message) { 36 55 this.spec = message.getMessageSpec(); 37 56 38 57 for (String key : message.keySet()) { 39 58 fields.put(key, message.get(key)); … … 47 66 } 48 67 68 /** 69 * Sets a property 70 * 71 * @param key 72 * the name of the property 73 * @param value 74 * the value of the property 75 * @return builder for method chaining 76 */ 49 77 public MessageBuilder set(String key, Object value) { 50 78 fields.put(key, value); … … 52 80 } 53 81 82 /** 83 * Sets a header 84 * 85 * @param key 86 * the name of the header 87 * @param value 88 * the value of the header 89 * @return builder for method chaining 90 */ 54 91 public MessageBuilder setHeader(String key, Object value) { 55 92 headers.put(key, value); … … 57 94 } 58 95 96 /** 97 * Creates a message instance. 98 * 99 * @return the immutable message instance. 100 */ 59 101 public Message build() { 60 Message message = new MessageImpl(spec, fields, headers);102 Message message = new DefaultMessage(spec, fields, headers); 61 103 fields = new HashMap<String, Object>(); 62 104 headers = new HashMap<String, Object>(); -
kraken-filter/src/main/java/org/krakenapps/filter/MessageSpec.java
r147 r306 16 16 package org.krakenapps.filter; 17 17 18 /** 19 * 20 * @author xeraph 21 * @since 1.0.0 22 */ 18 23 public interface MessageSpec { 19 24 public String getName(); 25 20 26 public String getDescription(); 27 21 28 public MessageSpecVersion getLatestVersion(); 29 22 30 public MessageSpecVersionRange getVersionRange(); 23 31 } -
kraken-filter/src/main/java/org/krakenapps/filter/MessageSpecVersion.java
r215 r306 16 16 package org.krakenapps.filter; 17 17 18 /** 19 * 20 * @author xeraph 21 * @since 1.0.0 22 */ 18 23 public interface MessageSpecVersion { 19 24 int getMajorVersion(); 25 20 26 int getMinorVersion(); 27 21 28 boolean isInRange(MessageSpecVersionRange range); 22 29 } -
kraken-filter/src/main/java/org/krakenapps/filter/MessageSpecVersionRange.java
r215 r306 16 16 package org.krakenapps.filter; 17 17 18 /** 19 * 20 * @author xeraph 21 * @since 1.0.0 22 */ 18 23 public interface MessageSpecVersionRange { 19 24 MessageSpecVersion getLowerBound(); 25 20 26 MessageSpecVersion getUpperBound(); 21 27 } -
kraken-filter/src/main/java/org/krakenapps/filter/exception/AlreadyBoundException.java
r302 r306 16 16 package org.krakenapps.filter.exception; 17 17 18 /** 19 * Unchecked exception thrown when the filter is already bound to the specified 20 * filter. 21 * 22 * @author xeraph 23 * @since 1.0.0 24 */ 18 25 public class AlreadyBoundException extends RuntimeException { 19 26 private static final long serialVersionUID = 1L; 20 private String fromPid; 21 private String toPid; 22 23 public AlreadyBoundException(String fromPid, String toPid) { 24 this.fromPid = fromPid; 25 this.toPid = toPid; 27 private String fromFilterId; 28 private String toFilterId; 29 30 /** 31 * Creates an exception with source and destination filter informations. 32 * 33 * @param fromFilterId 34 * the source filter id 35 * @param toFilterId 36 * the destination filter id 37 */ 38 public AlreadyBoundException(String fromFilterId, String toFilterId) { 39 this.fromFilterId = fromFilterId; 40 this.toFilterId = toFilterId; 26 41 } 27 42 28 43 @Override 29 44 public String getMessage() { 30 return to Pid + " is already bound to " + fromPid;45 return toFilterId + " is already bound to " + fromFilterId; 31 46 } 32 47 } -
kraken-filter/src/main/java/org/krakenapps/filter/exception/ConfigurationException.java
r302 r306 1 1 package org.krakenapps.filter.exception; 2 2 3 /** 4 * Unchecked exception thrown when the filter can not accept current 5 * configurations. 6 * 7 * @author xeraph 8 * @since 1.0.0 9 */ 3 10 public class ConfigurationException extends RuntimeException { 4 11 private static final long serialVersionUID = 1L; 5 private String name; 12 13 /** 14 * property name or internal configuration name 15 */ 16 private String configName; 17 18 /** 19 * cause of the error 20 */ 6 21 private String errorMessage; 7 8 public ConfigurationException(String name) {9 this. name = name;22 23 public ConfigurationException(String configName) { 24 this.configName = configName; 10 25 } 11 26 12 27 public ConfigurationException(String name, String errorMessage) { 13 this. name = name;28 this.configName = name; 14 29 this.errorMessage = errorMessage; 15 30 } 16 31 17 32 public String getConfigurationName() { 18 return name;33 return configName; 19 34 } 20 35 21 36 public String getErrorMessage() { 22 37 return errorMessage; -
kraken-filter/src/main/java/org/krakenapps/filter/exception/DuplicatedFilterNameException.java
r302 r306 1 1 package org.krakenapps.filter.exception; 2 2 3 /** 4 * Unchecked exception thrown when the requested filter id already exists at 5 * filter loading. 6 * 7 * @author xeraph 8 * @since 1.0.0 9 */ 3 10 public class DuplicatedFilterNameException extends RuntimeException { 4 11 private static final long serialVersionUID = 1L; 5 private String name;12 private String filterId; 6 13 7 public DuplicatedFilterNameException(String name) {8 this. name = name;14 public DuplicatedFilterNameException(String filterId) { 15 this.filterId = filterId; 9 16 } 10 17 11 public String getFilter Name() {12 return name;18 public String getFilterId() { 19 return filterId; 13 20 } 14 21 } -
kraken-filter/src/main/java/org/krakenapps/filter/exception/FilterFactoryNotFoundException.java
r302 r306 1 1 package org.krakenapps.filter.exception; 2 2 3 /** 4 * Unchecked exception thrown when the filter factory is not found. 5 * 6 * @author xeraph 7 * @since 1.0.0 8 */ 3 9 public class FilterFactoryNotFoundException extends RuntimeException { 4 10 private static final long serialVersionUID = 1L; 5 private String filterTypeName;6 11 7 public FilterFactoryNotFoundException(String filterTypeName) { 8 this.filterTypeName = filterTypeName; 12 private String filterFactoryName; 13 14 /** 15 * Creates an exception with filter factory name 16 * 17 * @param filterFactoryName 18 * filter class name in general, but can have alias name. 19 */ 20 public FilterFactoryNotFoundException(String filterFactoryName) { 21 this.filterFactoryName = filterFactoryName; 9 22 } 10 23 11 public String getFilterTypeName() { 12 return filterTypeName; 24 /** 25 * Returns the filter factory name. 26 */ 27 public String getFilterFactoryName() { 28 return filterFactoryName; 13 29 } 14 30 -
kraken-filter/src/main/java/org/krakenapps/filter/exception/FilterNotBoundException.java
r302 r306 16 16 package org.krakenapps.filter.exception; 17 17 18 /** 19 * Unchecked exception thrown when filters are not bound at unbind operation. 20 * 21 * @author xeraph 22 * @since 1.0.0 23 */ 18 24 public class FilterNotBoundException extends RuntimeException { 19 25 private static final long serialVersionUID = 1L; 20 private String from Pid;21 private String to Pid;22 23 public FilterNotBoundException(String from Pid, String toPid) {24 this.from Pid = fromPid;25 this.to Pid = toPid;26 private String fromFilterId; 27 private String toFilterId; 28 29 public FilterNotBoundException(String fromFilterId, String toFilterId) { 30 this.fromFilterId = fromFilterId; 31 this.toFilterId = toFilterId; 26 32 } 27 33 28 34 @Override 29 35 public String getMessage() { 30 return to Pid + " is not bound to " + fromPid;36 return toFilterId + " is not bound to " + fromFilterId; 31 37 } 32 38 } -
kraken-filter/src/main/java/org/krakenapps/filter/exception/FilterNotFoundException.java
r302 r306 16 16 package org.krakenapps.filter.exception; 17 17 18 /** 19 * Unchecked exception thrown when the specified filter is not found. 20 * 21 * @author xeraph 22 * @since 1.0.0 23 */ 18 24 public class FilterNotFoundException extends RuntimeException { 19 25 private static final long serialVersionUID = 1L; 20 private String pid;21 22 public FilterNotFoundException(String pid) {23 this. pid = pid;26 private String filterId; 27 28 public FilterNotFoundException(String filterId) { 29 this.filterId = filterId; 24 30 } 25 31 26 32 @Override 27 33 public String getMessage() { 28 return pid + " filter not found.";34 return filterId + " filter not found."; 29 35 } 30 36 } -
kraken-filter/src/main/java/org/krakenapps/filter/exception/MessageSpecMismatchException.java
r302 r306 18 18 import org.krakenapps.filter.MessageSpec; 19 19 20 /** 21 * Unchecked exception thrown when filters can not be bound cause of message 22 * specification mismatch. 23 * 24 * @author xeraph 25 * @since 1.0.0 26 */ 20 27 public class MessageSpecMismatchException extends RuntimeException { 21 28 private static final long serialVersionUID = 1L; 22 private String from Pid;23 private String to Pid;24 private MessageSpec sendMessageSpec;25 private MessageSpec[] receiveMessageSpecs;29 private String fromFilterId; 30 private String toFilterId; 31 private MessageSpec outputMessageSpec; 32 private MessageSpec[] inputMessageSpecs; 26 33 27 public MessageSpecMismatchException(String from Pid, String toPid, MessageSpec sendMessageSpec,28 MessageSpec [] receiveMessageSpec) {29 this.from Pid = fromPid;30 this.to Pid = toPid;31 this. sendMessageSpec = sendMessageSpec;32 this. receiveMessageSpecs = receiveMessageSpec;34 public MessageSpecMismatchException(String fromFilterId, String toFilterId, 35 MessageSpec outputMessageSpec, MessageSpec[] inputMessageSpec) { 36 this.fromFilterId = fromFilterId; 37 this.toFilterId = toFilterId; 38 this.outputMessageSpec = outputMessageSpec; 39 this.inputMessageSpecs = inputMessageSpec; 33 40 } 34 41 35 42 @Override 36 43 public String getMessage() { 37 return String.format( 38 "message spec does not match: %s -> %s\n" + "source filter message spec: %s\n" 39 + "destination filter message specs: %s\n", 40 fromPid, 41 toPid, 42 sendMessageSpec.getName(), 43 getReceiveMessageSpecs()); 44 return String.format("message spec does not match: %s -> %s\n" 45 + "source filter message spec: %s\n" + "destination filter message specs: %s\n", 46 fromFilterId, toFilterId, outputMessageSpec.getName(), 47 getCommaSeparatedInputMessageSpecs()); 44 48 } 45 49 46 private String get ReceiveMessageSpecs() {50 private String getCommaSeparatedInputMessageSpecs() { 47 51 String buffer = ""; 48 52 49 int length = receiveMessageSpecs.length;53 int length = inputMessageSpecs.length; 50 54 for (int i = 0; i < length; i++) { 51 MessageSpec spec = receiveMessageSpecs[i];55 MessageSpec spec = inputMessageSpecs[i]; 52 56 buffer += spec.getName(); 53 57 if (i < length - 1) { -
kraken-filter/src/main/java/org/krakenapps/filter/impl/ActiveFilterRunner.java
r302 r306 21 21 import org.slf4j.LoggerFactory; 22 22 23 /** 24 * This class provides thread management for the active filter. 25 * 26 * @author xeraph 27 * @since 1.0.0 28 */ 23 29 public class ActiveFilterRunner implements Runnable { 24 30 final Logger logger = LoggerFactory.getLogger(ActiveFilterRunner.class); … … 28 34 private Thread thread; 29 35 36 /** 37 * Creates an active filter with 1 second of sleep interval. 38 * 39 * @param activeFilter 40 */ 30 41 public ActiveFilterRunner(ActiveFilter activeFilter) { 31 42 this(activeFilter, 1000); 32 43 } 33 44 34 /* 35 * Initializes concurrent filter that operates periodically Period is36 * expressed in millisecond. TODO : make period configurable45 /** 46 * Creates an active filter that operates periodically Period is expressed 47 * in millisecond. 37 48 */ 38 49 public ActiveFilterRunner(ActiveFilter activeFilter, long period) { … … 43 54 } 44 55 56 /** 57 * Runs the infinite loop of the active filter. 58 */ 45 59 @Override 46 60 public void run() { 47 logger.info("starting active filter: " + activeFilter.getClass().getName()); 61 logger.info("starting active filter: class [{}], filter id [{}]", activeFilter.getClass() 62 .getName(), activeFilter.getProperty("instance.name")); 48 63 try { 49 64 activeFilter.validateConfiguration(); … … 51 66 activeFilter.setRunning(true); 52 67 } catch (ConfigurationException e) { 53 logger.error("configuration error: {} - {}", e.getConfigurationName(), e.getErrorMessage()); 68 logger.error("configuration error: {} - {}", e.getConfigurationName(), e 69 .getErrorMessage()); 54 70 return; 55 71 } … … 71 87 } 72 88 } catch (InterruptedException e) { 73 logger.info("active filter interrupted."); 89 logger.info("active filter interrupted: " 90 + activeFilter.getProperty("instance.name")); 74 91 break; 75 92 } catch (Exception e) { … … 83 100 84 101 logger.info("active filter thread stopped."); 85 102 86 103 // create new thread instance for next run 87 104 thread = new Thread(this); 88 105 } 89 106 90 public long getPeriodInMilli() { 107 /** 108 * Returns the sleep interval of the active filter in milliseconds unit. 109 */ 110 public long getSleepInterval() { 91 111 return sleepMilliseconds; 92 112 } 93 113 114 /** 115 * Starts the thread for the active filter. 116 */ 94 117 public void start() { 95 118 logger.info("starting active filter runner thread."); … … 97 120 } 98 121 122 /** 123 * Requests to stop thread of the active filter. 124 */ 99 125 public void stop() { 100 126 doStop = true; … … 102 128 } 103 129 130 /** 131 * Waits termination of the active filter. 132 */ 104 133 @SuppressWarnings("deprecation") 105 134 public void waitToFinish() { -
kraken-filter/src/main/java/org/krakenapps/filter/impl/DefaultFilterChain.java
r302 r306 22 22 import org.slf4j.LoggerFactory; 23 23 24 public class FilterChainImpl implements FilterChain { 25 final Logger logger = LoggerFactory.getLogger(FilterChainImpl.class.getName()); 26 private Filter[] filters; 24 /** 25 * This class provides default implementation for the {@link FilterChain} 26 * interface. 27 * 28 * @author xeraph 29 * @since 1.0.0 30 */ 31 public class DefaultFilterChain implements FilterChain { 32 final Logger logger = LoggerFactory.getLogger(DefaultFilterChain.class.getName()); 27 33 28 public FilterChainImpl(Filter[] filters) { 29 this.filters = filters; 34 private Filter hostFilter; 35 private Filter[] outputFilters; 36 37 public DefaultFilterChain(Filter hostFilter, Filter[] outputFilters) { 38 this.hostFilter = hostFilter; 39 this.outputFilters = outputFilters; 30 40 } 31 41 32 42 @Override 33 43 public void process(Message message) { 34 for (Filter filter : filters) {44 for (Filter filter : outputFilters) { 35 45 try { 36 46 filter.process(message); 37 47 } catch (Exception e) { 38 logger.warn( "filter chain error: ", e);48 logger.warn(getHostFilterId() + " filter chain error: ", e); 39 49 } 40 50 } 41 51 } 52 53 private String getHostFilterId() { 54 return (String) hostFilter.getProperty("instance.name"); 55 } 42 56 } -
kraken-filter/src/main/java/org/krakenapps/filter/impl/DefaultFilterManager.java
r302 r306 50 50 import org.slf4j.LoggerFactory; 51 51 52 public class FilterManagerImpl implements FilterManager { 53 final Logger logger = LoggerFactory.getLogger(FilterManagerImpl.class.getName()); 52 /** 53 * This class provides default implementation for the {@link FilterManager} 54 * interface. 55 * 56 * @author xeraph 57 * @since 1.0.0 58 */ 59 public class DefaultFilterManager implements FilterManager { 60 final Logger logger = LoggerFactory.getLogger(DefaultFilterManager.class.getName()); 54 61 55 62 private static final String KRAKEN_FILTER_INTERFACE = "org.krakenapps.filter.Filter"; … … 58 65 private Map<String, Filter> filterMap; 59 66 private Map<String, ActiveFilterRunner> runnerMap; 60 private Map<String, List<Filter>> bindMap;67 private Map<String, List<Filter>> forwardBindMap; 61 68 private Map<String, List<Filter>> reverseBindMap; 62 69 private Map<String, FilterHandler> handlerMap; … … 67 74 private FilterFactoryTracker factoryTracker; 68 75 69 public FilterManagerImpl(BundleContext context) {76 public DefaultFilterManager(BundleContext context) { 70 77 filterMap = new HashMap<String, Filter>(); 71 78 runnerMap = new HashMap<String, ActiveFilterRunner>(); 72 bindMap = new HashMap<String, List<Filter>>();79 forwardBindMap = new HashMap<String, List<Filter>>(); 73 80 reverseBindMap = new HashMap<String, List<Filter>>(); 74 81 handlerMap = new HashMap<String, FilterHandler>(); … … 83 90 try { 84 91 Class.forName("org.hsqldb.jdbcDriver"); 85 filterConfig = new FilterConfig( "jdbc:hsqldb:file:filter-config;shutdown=true", "sa", "");92 filterConfig = new FilterConfig(); 86 93 87 94 factoryTracker = new FilterFactoryTracker(context, this, filterConfig); … … 93 100 94 101 @Override 95 public void loadFilter(int filterTypeIndex, String pid) throws FilterFactoryNotFoundException,96 DuplicatedFilterNameException {102 public void loadFilter(int filterTypeIndex, String filterId) 103 throws FilterFactoryNotFoundException, DuplicatedFilterNameException { 97 104 if (latestFilterFactories == null) { 98 105 throw new FilterFactoryNotFoundException(null); … … 104 111 } 105 112 106 loadFilter(filterFactoryName, pid);107 } 108 109 @Override 110 public void loadFilter(String factoryName, String pid) throws FilterFactoryNotFoundException,111 DuplicatedFilterNameException {113 loadFilter(filterFactoryName, filterId); 114 } 115 116 @Override 117 public void loadFilter(String factoryName, String filterId) 118 throws FilterFactoryNotFoundException, DuplicatedFilterNameException { 112 119 ServiceReference factoryReference = findFilterFactoryReference(factoryName); 113 120 if (factoryReference == null) … … 117 124 118 125 Factory factory = (Factory) context.getService(factoryReference); 119 Properties configuration = newServiceConfiguration( pid, description);126 Properties configuration = newServiceConfiguration(filterId, description); 120 127 ComponentInstance instance = null; 121 128 try { … … 123 130 } catch (UnacceptableConfiguration e) { 124 131 logger.warn("load filter error: ", e); 125 throw new DuplicatedFilterNameException( pid);132 throw new DuplicatedFilterNameException(filterId); 126 133 } catch (MissingHandlerException e) { 127 134 logger.warn("load filter error: ", e); … … 136 143 137 144 if (instance == null) { 138 logger.warn("[{} -> {}] component initialization failed.", factoryName, pid);145 logger.warn("[{} -> {}] component initialization failed.", factoryName, filterId); 139 146 return; 140 147 } 141 148 142 149 synchronized (this) { 143 componentMap.put( pid, instance);150 componentMap.put(filterId, instance); 144 151 145 152 for (FilterEventListener listener : listeners) { 146 listener.onFilterLoaded( pid);153 listener.onFilterLoaded(filterId); 147 154 } 148 155 } … … 152 159 153 160 @Override 154 public void unloadFilter(String pid) throws FilterNotFoundException {155 synchronized (this) { 156 ComponentInstance instance = componentMap.remove( pid);161 public void unloadFilter(String filterId) throws FilterNotFoundException { 162 synchronized (this) { 163 ComponentInstance instance = componentMap.remove(filterId); 157 164 if (instance == null) { 158 logger.warn("filter not found: " + pid);159 throw new FilterNotFoundException( pid);165 logger.warn("filter not found: " + filterId); 166 throw new FilterNotFoundException(filterId); 160 167 } 161 168 162 169 for (FilterEventListener listener : listeners) { 163 listener.onFilterUnloading( pid);170 listener.onFilterUnloading(filterId); 164 171 } 165 172 … … 170 177 171 178 @Override 172 public void runFilter(String pid, long period) throws FilterNotFoundException,179 public void runFilter(String filterId, long period) throws FilterNotFoundException, 173 180 IllegalThreadStateException { 174 Filter filter = getFilterInstance( pid);181 Filter filter = getFilterInstance(filterId); 175 182 176 183 if (filter instanceof ActiveFilter) { 177 184 ActiveFilter activeFilter = (ActiveFilter) filter; 178 185 if (activeFilter.isRunning()) 179 throw new IllegalThreadStateException( pid + " thread is already running.");186 throw new IllegalThreadStateException(filterId + " thread is already running."); 180 187 181 188 ActiveFilterRunner runner = new ActiveFilterRunner(activeFilter, period); 182 189 synchronized (this) { 183 runnerMap.put( pid, runner);190 runnerMap.put(filterId, runner); 184 191 } 185 192 runner.start(); 186 193 } else { 187 throw new FilterNotFoundException( pid);188 } 189 } 190 191 @Override 192 public void stopFilter(String pid) throws FilterNotFoundException {193 ActiveFilterRunner runner = runnerMap.get( pid);194 throw new FilterNotFoundException(filterId); 195 } 196 } 197 198 @Override 199 public void stopFilter(String filterId) throws FilterNotFoundException { 200 ActiveFilterRunner runner = runnerMap.get(filterId); 194 201 if (runner == null) 195 throw new FilterNotFoundException( pid);202 throw new FilterNotFoundException(filterId); 196 203 197 204 runner.stop(); … … 199 206 200 207 @Override 201 public void registerFilter(FilterHandler filterHandler, String pid, Filter filter) {202 logger.info("registering filter: " + pid);203 204 synchronized (this) { 205 filterMap.put( pid, filter);206 bindMap.put(pid, new ArrayList<Filter>());207 reverseBindMap.put( pid, new ArrayList<Filter>());208 handlerMap.put( pid, filterHandler);209 } 210 } 211 212 @Override 213 public void unregisterFilter(String pid) {214 logger.info("unregistering filter: " + pid);208 public void registerFilter(FilterHandler filterHandler, String filterId, Filter filter) { 209 logger.info("registering filter: " + filterId); 210 211 synchronized (this) { 212 filterMap.put(filterId, filter); 213 forwardBindMap.put(filterId, new ArrayList<Filter>()); 214 reverseBindMap.put(filterId, new ArrayList<Filter>()); 215 handlerMap.put(filterId, filterHandler); 216 } 217 } 218 219 @Override 220 public void unregisterFilter(String filterId) { 221 logger.info("unregistering filter: " + filterId); 215 222 216 223 /* … … 221 228 synchronized (this) { 222 229 // remove filter 223 Filter filter = filterMap.remove( pid);230 Filter filter = filterMap.remove(filterId); 224 231 225 232 // stop thread … … 227 234 ActiveFilter activeFilter = (ActiveFilter) filter; 228 235 if (activeFilter.isRunning()) { 229 ActiveFilterRunner runner = runnerMap.remove( pid);236 ActiveFilterRunner runner = runnerMap.remove(filterId); 230 237 if (runner != null) { 231 logger.debug("[{}] thread stopping.", pid);238 logger.debug("[{}] thread stopping.", filterId); 232 239 runner.stop(); 233 240 runner.waitToFinish(); 234 logger.debug("[{}] thread stopped.", pid);241 logger.debug("[{}] thread stopped.", filterId); 235 242 } 236 243 } … … 238 245 239 246 // remove 2->3 240 filterMap.remove( pid);241 List<Filter> filters2to3 = bindMap.remove(pid);247 filterMap.remove(filterId); 248 List<Filter> filters2to3 = forwardBindMap.remove(filterId); 242 249 243 250 // remove 3->2 … … 252 259 253 260 // remove 2->1 254 List<Filter> filters2to1 = reverseBindMap.get( pid);261 List<Filter> filters2to1 = reverseBindMap.get(filterId); 255 262 256 263 // remove 1->2 … … 259 266 // forward lookup and remove pid 260 267 String filter1Pid = (String) filter1.getProperty("instance.name"); 261 List<Filter> filters1to2 = bindMap.get(filter1Pid);268 List<Filter> filters1to2 = forwardBindMap.get(filter1Pid); 262 269 filters1to2.remove(filter); 263 270 … … 269 276 270 277 // remove event handler 271 handlerMap.remove( pid);278 handlerMap.remove(filterId); 272 279 273 280 // remove from db 274 filterConfig.removeFilterInstance( pid);275 } 276 } 277 278 private Properties newServiceConfiguration(String pid, ComponentDescription description) {281 filterConfig.removeFilterInstance(filterId); 282 } 283 } 284 285 private Properties newServiceConfiguration(String filterId, ComponentDescription description) { 279 286 Properties configuration = new Properties(); 280 configuration.put("instance.name", pid);287 configuration.put("instance.name", filterId); 281 288 configuration.put("filter.manager", this); 282 289 configuration.put("filter.config", filterConfig); … … 286 293 private ServiceReference findFilterFactoryReference(String factoryName) { 287 294 try { 288 ServiceReference[] refs = context.getServiceReferences( 289 Factory.class.getName(), 295 ServiceReference[] refs = context.getServiceReferences(Factory.class.getName(), 290 296 "(component.class=" + factoryName + ")"); 291 297 … … 323 329 } 324 330 325 @Override326 public ComponentDescription getComponentDescription(String filterFactoryName)327 throws FilterFactoryNotFoundException {328 ServiceReference ref = findFilterFactoryReference(filterFactoryName);329 return getComponentDescription(ref);330 }331 332 331 private ComponentDescription getComponentDescription(ServiceReference ref) { 333 332 if (ref == null) … … 358 357 if (refs != null) { 359 358 for (ServiceReference ref : refs) { 360 359 361 360 ComponentDescription description = new ComponentDescription(); 362 361 description.setInstanceName((String) ref.getProperty("instance.name")); … … 373 372 374 373 @Override 375 public void bindFilter(String fromFilter Pid, String toFilterPid) throws FilterNotFoundException,374 public void bindFilter(String fromFilterId, String toFilterId) throws FilterNotFoundException, 376 375 AlreadyBoundException, MessageSpecMismatchException { 377 376 synchronized (this) { 378 377 // get filter instances. 379 Filter fromFilter = getFilterInstance(fromFilter Pid);380 Filter toFilter = getFilterInstance(toFilter Pid);378 Filter fromFilter = getFilterInstance(fromFilterId); 379 Filter toFilter = getFilterInstance(toFilterId); 381 380 382 381 // match message specification 383 MessageSpec sendMessageSpec = fromFilter.get SendSpec();384 MessageSpec[] receiveMessageSpecs = toFilter.get ReceiveSpecs();382 MessageSpec sendMessageSpec = fromFilter.getOutputMessageSpec(); 383 MessageSpec[] receiveMessageSpecs = toFilter.getInputMessageSpecs(); 385 384 if (matchMessageSpec(sendMessageSpec, receiveMessageSpecs) == false) 386 throw new MessageSpecMismatchException(fromFilter Pid, toFilterPid, sendMessageSpec,385 throw new MessageSpecMismatchException(fromFilterId, toFilterId, sendMessageSpec, 387 386 receiveMessageSpecs); 388 387 389 388 // check if same filter already exists. 390 List<Filter> bindedFilters = bindMap.get(fromFilterPid);389 List<Filter> bindedFilters = forwardBindMap.get(fromFilterId); 391 390 for (Filter bindedFilter : bindedFilters) { 392 391 String instanceName = (String) bindedFilter.getProperty("instance.name"); 393 if (instanceName.equals(toFilter Pid)) {394 throw new AlreadyBoundException(fromFilter Pid, toFilterPid);392 if (instanceName.equals(toFilterId)) { 393 throw new AlreadyBoundException(fromFilterId, toFilterId); 395 394 } 396 395 } 397 396 398 397 // double check if same filter already exists. 399 List<Filter> reverseBindedFilters = reverseBindMap.get(toFilter Pid);398 List<Filter> reverseBindedFilters = reverseBindMap.get(toFilterId); 400 399 for (Filter reverseBindedFilter : reverseBindedFilters) { 401 400 String instanceName = (String) reverseBindedFilter.getProperty("instance.name"); 402 if (instanceName.equals(fromFilter Pid)) {403 throw new AlreadyBoundException(fromFilter Pid, toFilterPid);401 if (instanceName.equals(fromFilterId)) { 402 throw new AlreadyBoundException(fromFilterId, toFilterId); 404 403 } 405 404 } … … 410 409 411 410 // save bind status 412 filterConfig.bindFilter(fromFilter Pid, toFilterPid);411 filterConfig.bindFilter(fromFilterId, toFilterId); 413 412 414 413 // notify changed state. 415 FilterHandler handler = handlerMap.get(fromFilter Pid);414 FilterHandler handler = handlerMap.get(fromFilterId); 416 415 handler.stateChanged(convertToArray(bindedFilters)); 417 416 } … … 434 433 435 434 @Override 436 public void unbindFilter(String fromFilter Pid, String toFilterPid) throws FilterNotFoundException,437 FilterNotBoundException {435 public void unbindFilter(String fromFilterId, String toFilterId) 436 throws FilterNotFoundException, FilterNotBoundException { 438 437 synchronized (this) { 439 438 // get filter instances. 440 Filter fromFilter = getFilterInstance(fromFilter Pid); // for439 Filter fromFilter = getFilterInstance(fromFilterId); // for 441 440 // validation 442 Filter toFilter = getFilterInstance(toFilter Pid);441 Filter toFilter = getFilterInstance(toFilterId); 443 442 444 443 // unbind it. 445 List<Filter> bindedFilters = bindMap.get(fromFilterPid);444 List<Filter> bindedFilters = forwardBindMap.get(fromFilterId); 446 445 boolean isRemoved = bindedFilters.remove(toFilter); 447 446 448 List<Filter> reverseBindedFilters = reverseBindMap.get(toFilter Pid);447 List<Filter> reverseBindedFilters = reverseBindMap.get(toFilterId); 449 448 reverseBindedFilters.remove(fromFilter); 450 449 451 450 // save bind status 452 filterConfig.unbindFilter(fromFilter Pid, toFilterPid);453 451 filterConfig.unbindFilter(fromFilterId, toFilterId); 452 454 453 // notify changed state. 455 FilterHandler handler = handlerMap.get(fromFilter Pid);454 FilterHandler handler = handlerMap.get(fromFilterId); 456 455 handler.stateChanged(convertToArray(bindedFilters)); 457 456 458 457 // raise exception if filter not found. 459 458 if (isRemoved == false) 460 throw new FilterNotBoundException(fromFilter Pid, toFilterPid);461 } 462 } 463 464 @Override 465 public Filter[] getInputFilters(String pid) {466 synchronized (this) { 467 List<Filter> bindedFilters = reverseBindMap.get( pid);459 throw new FilterNotBoundException(fromFilterId, toFilterId); 460 } 461 } 462 463 @Override 464 public Filter[] getInputFilters(String filterId) { 465 synchronized (this) { 466 List<Filter> bindedFilters = reverseBindMap.get(filterId); 468 467 if (bindedFilters == null) 469 468 return new Filter[0]; … … 474 473 475 474 @Override 476 public Filter[] getOutputFilters(String pid) {477 synchronized (this) { 478 List<Filter> bindedFilters = bindMap.get(pid);475 public Filter[] getOutputFilters(String filterId) { 476 synchronized (this) { 477 List<Filter> bindedFilters = forwardBindMap.get(filterId); 479 478 if (bindedFilters == null) 480 479 return new Filter[0]; … … 497 496 */ 498 497 @Override 499 public Object getProperty(String pid, String key) throws FilterNotFoundException {500 synchronized (this) { 501 Filter filter = getFilterInstance( pid);498 public Object getProperty(String filterId, String key) throws FilterNotFoundException { 499 synchronized (this) { 500 Filter filter = getFilterInstance(filterId); 502 501 return filter.getProperty(key); 503 502 } … … 508 507 */ 509 508 @Override 510 public Set<String> getPropertyKeys(String pid) throws FilterNotFoundException {511 Filter filter = getFilterInstance( pid);509 public Set<String> getPropertyKeys(String filterId) throws FilterNotFoundException { 510 Filter filter = getFilterInstance(filterId); 512 511 return filter.getPropertyKeys(); 513 512 } … … 517 516 */ 518 517 @Override 519 public void setProperty(String pid, String key, String value) throws FilterNotFoundException { 520 synchronized (this) { 521 Filter filter = getFilterInstance(pid); 518 public void setProperty(String filterId, String key, String value) 519 throws FilterNotFoundException { 520 synchronized (this) { 521 Filter filter = getFilterInstance(filterId); 522 522 if (filter != null) { 523 filterConfig.setFilterProperty( pid, key, value);523 filterConfig.setFilterProperty(filterId, key, value); 524 524 filter.setProperty(key, value); 525 525 } else { 526 throw new FilterNotFoundException( pid);526 throw new FilterNotFoundException(filterId); 527 527 } 528 528 } … … 533 533 */ 534 534 @Override 535 public void unsetProperty(String pid, String key) throws FilterNotFoundException {536 synchronized (this) { 537 Filter filter = getFilterInstance( pid);535 public void unsetProperty(String filterId, String key) throws FilterNotFoundException { 536 synchronized (this) { 537 Filter filter = getFilterInstance(filterId); 538 538 if (filter != null) { 539 filterConfig.unsetFilterProperty( pid, key);539 filterConfig.unsetFilterProperty(filterId, key); 540 540 filter.unsetProperty(key); 541 541 } else { 542 throw new FilterNotFoundException( pid);542 throw new FilterNotFoundException(filterId); 543 543 } 544 544 } … … 548 548 * Return filter instance or throws exception. 549 549 */ 550 private Filter getFilterInstance(String pid) throws FilterNotFoundException {551 Filter filter = filterMap.get( pid);550 private Filter getFilterInstance(String filterId) throws FilterNotFoundException { 551 Filter filter = filterMap.get(filterId); 552 552 if (filter == null) 553 throw new FilterNotFoundException( pid);553 throw new FilterNotFoundException(filterId); 554 554 return filter; 555 555 } … … 559 559 */ 560 560 @Override 561 public Filter getFilter(String pid) {562 synchronized (this) { 563 return filterMap.get( pid);561 public Filter getFilter(String filterId) { 562 synchronized (this) { 563 return filterMap.get(filterId); 564 564 } 565 565 } -
kraken-filter/src/main/java/org/krakenapps/filter/impl/DefaultMessage.java
r302 r306 23 23 import org.krakenapps.filter.MessageSpec; 24 24 25 26 public class MessageImpl implements Message { 25 /** 26 * This class provides default implementation for the {@link Message} interface. 27 * 28 * @author xeraph 29 * @since 1.0.0 30 */ 31 public class DefaultMessage implements Message { 27 32 private MessageSpec spec; 28 33 private Map<String, Object> fields; 29 34 private Map<String, Object> headers; 30 31 public MessageImpl(MessageSpec spec, Map<String, Object> fields, Map<String, Object> headers) {35 36 public DefaultMessage(MessageSpec spec, Map<String, Object> fields, Map<String, Object> headers) { 32 37 this.spec = spec; 33 38 this.fields = new HashMap<String, Object>(); 34 39 this.headers = new HashMap<String, Object>(); 35 40 36 41 for (String key : fields.keySet()) { 37 42 this.fields.put(key, fields.get(key)); 38 43 } 39 44 40 45 for (String key : headers.keySet()) { 41 46 this.headers.put(key, headers.get(key)); 42 47 } 43 48 } 44 45 public MessageImpl(Message message) {49 50 public DefaultMessage(Message message) { 46 51 this.spec = message.getMessageSpec(); 47 52 this.fields = new HashMap<String, Object>(); 48 53 this.headers = new HashMap<String, Object>(); 49 54 50 55 for (String key : message.keySet()) { 51 56 this.fields.put(key, message.get(key)); 52 57 } 53 58 54 59 for (String key : message.headerKeySet()) { 55 60 this.headers.put(key, message.getHeader(key)); -
kraken-filter/src/main/java/org/krakenapps/filter/impl/FilterConfig.java
r243 r306 29 29 import org.slf4j.LoggerFactory; 30 30 31 /** 32 * Loads and saves filter states. 33 * 34 * @author xeraph 35 * @since 1.0.0 36 */ 31 37 public class FilterConfig { 32 38 final Logger logger = LoggerFactory.getLogger(FilterConfig.class.getName()); 39 40 /** 41 * hsqldb connection 42 */ 33 43 private Connection connection; 44 45 /** 46 * connection string 47 */ 34 48 private String connectionString; 49 50 /** 51 * jdbc user 52 */ 35 53 private String user; 54 55 /** 56 * jdbc password 57 */ 36 58 private String password; 37 59 38 public FilterConfig(String connectionString, String user, String password) { 39 this.connectionString = connectionString; 40 this.user = user; 41 this.password = password; 60 /** 61 * Creates a filter config instance and prepare tables. 62 */ 63 public FilterConfig() { 64 connectionString = "jdbc:hsqldb:file:filter-config;shutdown=true"; 65 user = "sa"; 66 password = ""; 42 67 43 68 initializeSchema(); … … 76 101 } 77 102 103 /** 104 * Creates filter state tables if not exists. 105 */ 78 106 private void initializeSchema() { 79 107 logger.info("initailizing filter table schema."); … … 84 112 } 85 113 114 /** 115 * Returns a List of loaded filter instance informations for specific filter 116 * class name. 117 * 118 * @param className 119 * the filter class name 120 * @return a List of loaded filter instance informations 121 */ 86 122 public List<FilterInstance> loadFilterInstances(String className) { 87 123 PreparedStatement st = null; … … 109 145 } catch (SQLException e) { 110 146 rollback(); 111 e.printStackTrace();147 logger.error("load filter instances error: ", e); 112 148 return null; 113 149 } finally { … … 117 153 } 118 154 155 /** 156 * Returns a List of all loaded filter instance informations. 157 */ 119 158 public List<FilterInstance> loadFilterInstances() { 120 159 Statement st = null; … … 148 187 } 149 188 150 public boolean addFilterInstance(String name, String className, int filterType) { 189 /** 190 * Save filter instance information to data store. 191 * 192 * @param filterId 193 * the filter id 194 * @param filterClassName 195 * the filter class name 196 * @param filterType 197 * active filter or not 198 * @return true if saved successfully 199 */ 200 public boolean addFilterInstance(String filterId, String filterClassName, int filterType) { 151 201 PreparedStatement st1 = null, st2 = null; 152 202 try { … … 155 205 // duplicate check by name 156 206 st1 = connection.prepareStatement("select * from filter_instances where name = ?"); 157 st1.setString(1, name);207 st1.setString(1, filterId); 158 208 if (st1.executeQuery().next()) 159 209 return false; 160 210 161 logger.info("adding filter: " + name + " " + className);211 logger.info("adding filter: " + filterId + " " + filterClassName); 162 212 st2 = connection 163 213 .prepareStatement("insert into filter_instances (name, class_name, filter_type) values (?, ?, ?)"); 164 st2.setString(1, name);165 st2.setString(2, className);214 st2.setString(1, filterId); 215 st2.setString(2, filterClassName); 166 216 st2.setInt(3, filterType); 167 217 int ret = st2.executeUpdate(); … … 173 223 } catch (Exception e) { 174 224 rollback(); 175 e.printStackTrace();225 logger.warn("add filter instance error: ", e); 176 226 return false; 177 227 } finally { … … 182 232 } 183 233 184 public void removeFilterInstance(String name) { 234 /** 235 * Remove the filter instance information from data store. 236 * 237 * @param filterId 238 * the filter id 239 */ 240 public void removeFilterInstance(String filterId) { 185 241 PreparedStatement st = null; 186 242 try { … … 189 245 // delete instance 190 246 st = connection.prepareStatement("delete from filter_instances where name = ?"); 191 st.setString(1, name);247 st.setString(1, filterId); 192 248 st.executeUpdate(); 193 249 … … 195 251 closeStatement(st); 196 252 st = connection.prepareStatement("delete from input_filters where input_filter_id = ?"); 197 st.setString(1, name);253 st.setString(1, filterId); 198 254 st.executeUpdate(); 199 255 200 256 closeStatement(st); 201 257 st = connection.prepareStatement("delete from output_filters where filter_id = ?"); 202 st.setString(1, name);258 st.setString(1, filterId); 203 259 st.executeUpdate(); 204 260 … … 206 262 closeStatement(st); 207 263 st = connection.prepareStatement("delete from filter_properties where filter_id = ?"); 208 st.setString(1, name); 209 st.executeUpdate(); 210 211 connection.commit(); 212 } catch (Exception e) { 213 rollback(); 214 e.printStackTrace(); 215 } finally { 216 closeStatement(st); 217 shutdown(); 218 } 219 } 220 264 st.setString(1, filterId); 265 st.executeUpdate(); 266 267 connection.commit(); 268 } catch (Exception e) { 269 rollback(); 270 logger.error("remove filter instance error: ", e); 271 } finally { 272 closeStatement(st); 273 shutdown(); 274 } 275 } 276 277 /** 278 * Returns the properties of the filter. 279 * 280 * @param filterId 281 * the filter id 282 * @return the properties of the filter. null if error occurred. 283 */ 221 284 public Properties getFilterProperties(String filterId) { 222 285 Properties props = new Properties(); … … 226 289 connect(); 227 290 228 st = connection.prepareStatement("select name, value from filter_properties where filter_id = ?"); 291 st = connection 292 .prepareStatement("select name, value from filter_properties where filter_id = ?"); 229 293 st.setString(1, filterId); 230 294 ResultSet rs = st.executeQuery(); … … 239 303 } catch (Exception e) { 240 304 rollback(); 241 e.printStackTrace();305 logger.error("get filter properties error: ", e); 242 306 return null; 243 307 &nbs
