Changeset 306 for kraken-filter

Show
Ignore:
Timestamp:
06/14/09 23:45:32 (14 months ago)
Author:
xeraph
Message:

kraken-filter 1.0.0 release.
- refactored a lot of codes in consistent way.
- javadoc documentation added.

Location:
kraken-filter
Files:
4 added
28 modified
7 moved

Legend:

Unmodified
Added
Removed
  • kraken-filter/pom.xml

    r301 r306  
    2323                                                <Bundle-SymbolicName>org.krakenapps.filter 
    2424                                                </Bundle-SymbolicName> 
    25                                                 <Export-Package>org.krakenapps.filter, 
    26                                                         org.krakenapps.filter.exception</Export-Package> 
    27                                                 <Private-Package>org.krakenapps.filter.impl 
     25                                                <Export-Package>org.krakenapps.filter;version=1.0.0, 
     26                                                        org.krakenapps.filter.exception;version=1.0.0</Export-Package> 
     27                                                <Private-Package>org.krakenapps.filter.impl;version=1.0.0 
    2828                                                </Private-Package> 
    2929                                                <Import-Package>!org.junit,org.hsqldb,* 
     
    6363                </dependency> 
    6464                <dependency> 
    65                         <groupId>org.apache.felix</groupId> 
    66                         <artifactId>org.apache.felix.configadmin 
    67                         </artifactId> 
    68                         <version>1.0.10</version> 
    69                 </dependency> 
    70                 <dependency> 
    7165                        <groupId>org.krakenapps</groupId> 
    7266                        <artifactId>kraken-api</artifactId> 
  • kraken-filter/src/main/java/org/krakenapps/filter/ActiveFilter.java

    r302 r306  
    1818import org.krakenapps.filter.exception.ConfigurationException; 
    1919 
    20 public abstract class ActiveFilter extends AbstractFilter { 
     20/** 
     21 * ActiveFilter class should be implemented by any filter class which filters 
     22 * are intended to be excuted by a thread. 
     23 *  
     24 * @author xeraph 
     25 * @since 1.0.0 
     26 * @see Filter 
     27 */ 
     28public abstract class ActiveFilter extends DefaultFilter { 
    2129        private boolean isRunning; 
    2230 
     31        /** 
     32         * Returns true if thread is running. 
     33         *  
     34         * @return true if thread is running 
     35         */ 
    2336        public boolean isRunning() { 
    2437                return isRunning; 
     
    2942        } 
    3043 
     44        /** 
     45         * Initialize an active filter instance. ActiveFilterRunner calls this 
     46         * method before the run loop. 
     47         *  
     48         * @throws ConfigurationException 
     49         *             if failed to configure 
     50         */ 
    3151        public void open() throws ConfigurationException { 
    3252        } 
    3353 
     54        /** 
     55         * Finalize an active filter instance. ActiveFilterRunner calls this method 
     56         * after the run loop. 
     57         */ 
    3458        public void close() { 
    3559        } 
    3660 
     61        /** 
     62         * ActiveFilterRunner calls this method in each loop. Thread will sleep some 
     63         * milliseconds after run. 
     64         *  
     65         * @throws InterruptedException 
     66         *             if thread is interrupted 
     67         */ 
    3768        abstract public void run() throws InterruptedException; 
    3869} 
  • kraken-filter/src/main/java/org/krakenapps/filter/ComponentDescription.java

    r302 r306  
    1919import java.util.List; 
    2020 
     21/** 
     22 * Represents iPOJO component description. 
     23 *  
     24 * @author xeraph 
     25 * @since 1.0.0 
     26 */ 
    2127public class ComponentDescription { 
    2228        private String instanceName; 
  • kraken-filter/src/main/java/org/krakenapps/filter/ComponentDescriptionParser.java

    r302 r306  
    1919import java.util.List; 
    2020 
    21  
     21/** 
     22 * Parses iPOJO component description string. 
     23 *  
     24 * @author xeraph 
     25 * @since 1.0.0 
     26 */ 
    2227public class ComponentDescriptionParser { 
    2328        public static ComponentDescription parse(String instanceName, String description) { 
  • kraken-filter/src/main/java/org/krakenapps/filter/DefaultFilter.java

    r302 r306  
    2323import org.krakenapps.filter.exception.ConfigurationException; 
    2424 
    25 public abstract class AbstractFilter implements Filter { 
     25/** 
     26 * This class provides default implementations for the {@link Filter} interface. 
     27 *  
     28 * @author xeraph 
     29 * @since 1.0.0 
     30 */ 
     31public class DefaultFilter implements Filter { 
    2632        private Map<String, Object> properties; 
    2733 
    28         public AbstractFilter() { 
     34        /** 
     35         * Creates a new default filter instance. 
     36         */ 
     37        public DefaultFilter() { 
    2938                properties = new ConcurrentHashMap<String, Object>(); 
    3039        } 
    3140 
     41        /** 
     42         * No input message specifications are supported. 
     43         */ 
    3244        @Override 
    33         public MessageSpec[] getReceiveSpecs() { 
     45        public MessageSpec[] getInputMessageSpecs() { 
    3446                return null; 
    3547        } 
    3648 
     49        /** 
     50         * No output message specification is supported. 
     51         */ 
    3752        @Override 
    38         public MessageSpec getSendSpec() { 
     53        public MessageSpec getOutputMessageSpec() { 
    3954                return null; 
    4055        } 
    4156 
     57        /** 
     58         * Empty message processing. 
     59         */ 
    4260        @Override 
    4361        public void process(Message message) { 
    4462        } 
    4563 
     64        /** 
     65         * Returns the value of the specified property. 
     66         */ 
    4667        @Override 
    4768        public Object getProperty(String key) { 
     
    4970        } 
    5071 
     72        /** 
     73         * Returns a key set of properties. 
     74         */ 
    5175        @Override 
    5276        public Set<String> getPropertyKeys() { 
     
    5478        } 
    5579 
     80        /** 
     81         *  
     82         */ 
    5683        @Override 
    5784        public void setProperty(String key, Object value) { 
  • kraken-filter/src/main/java/org/krakenapps/filter/DefaultMessageSpec.java

    r277 r306  
    1616package org.krakenapps.filter; 
    1717 
    18 public class MessageSpecImpl implements MessageSpec { 
     18/** 
     19 *  
     20 * @author xeraph 
     21 * @since 1.0.0 
     22 */ 
     23public class DefaultMessageSpec implements MessageSpec { 
    1924        private String name; 
    2025        private String description; 
    2126        private MessageSpecVersionRange range; 
    2227 
    23         public MessageSpecImpl(String name, MessageSpecVersion version) { 
     28        public DefaultMessageSpec(String name, MessageSpecVersion version) { 
    2429                this(name, null, version); 
    2530        } 
    2631 
    27         public MessageSpecImpl(String name, int majorVersion, int minorVersion) { 
    28                 this(name, new MessageSpecVersionImpl(majorVersion, minorVersion)); 
     32        public DefaultMessageSpec(String name, int majorVersion, int minorVersion) { 
     33                this(name, new DefaultMessageSpecVersion(majorVersion, minorVersion)); 
    2934        } 
    3035 
    31         public MessageSpecImpl(String name, String description, MessageSpecVersion version) { 
    32                 this(name, description, new MessageSpecVersionRangeImpl(version, version)); 
     36        public DefaultMessageSpec(String name, String description, MessageSpecVersion version) { 
     37                this(name, description, new DefaultMessageSpecVersionRange(version, version)); 
    3338        } 
    3439 
    35         public MessageSpecImpl(String name, String description, MessageSpecVersionRange range) { 
     40        public DefaultMessageSpec(String name, String description, MessageSpecVersionRange range) { 
    3641                this.name = name; 
    3742                this.description = description; 
  • kraken-filter/src/main/java/org/krakenapps/filter/DefaultMessageSpecVersion.java

    r215 r306  
    1616package org.krakenapps.filter; 
    1717 
    18 public class MessageSpecVersionImpl implements MessageSpecVersion { 
     18/** 
     19 *  
     20 * @author xeraph 
     21 * @since 1.0.0 
     22 */ 
     23public class DefaultMessageSpecVersion implements MessageSpecVersion { 
    1924        private int majorVersion; 
    2025        private int minorVersion; 
    2126         
    22         public MessageSpecVersionImpl(int majorVersion, int minorVersion) { 
     27        public DefaultMessageSpecVersion(int majorVersion, int minorVersion) { 
    2328                this.majorVersion = majorVersion; 
    2429                this.minorVersion = minorVersion; 
  • kraken-filter/src/main/java/org/krakenapps/filter/DefaultMessageSpecVersionRange.java

    r215 r306  
    1616package org.krakenapps.filter; 
    1717 
    18 public class MessageSpecVersionRangeImpl implements MessageSpecVersionRange { 
     18/** 
     19 *  
     20 * @author xeraph 
     21 * @since 1.0.0 
     22 */ 
     23public class DefaultMessageSpecVersionRange implements MessageSpecVersionRange { 
    1924        private MessageSpecVersion lowerBound; 
    2025        private MessageSpecVersion upperBound; 
    2126 
    22         public MessageSpecVersionRangeImpl(MessageSpecVersion v1, MessageSpecVersion v2) { 
     27        public DefaultMessageSpecVersionRange(MessageSpecVersion v1, MessageSpecVersion v2) { 
    2328                if (isLowerThan(v1, v2)) { 
    2429                        this.lowerBound = v1; 
     
    4449                        return true; 
    4550 
    46                 if ((lhs.getMajorVersion() == rhs.getMajorVersion()) && lhs.getMinorVersion() < rhs.getMinorVersion()) { 
     51                if ((lhs.getMajorVersion() == rhs.getMajorVersion()) 
     52                                && lhs.getMinorVersion() < rhs.getMinorVersion()) { 
    4753                        return true; 
    4854                } 
  • kraken-filter/src/main/java/org/krakenapps/filter/Filter.java

    r302 r306  
    2020import org.krakenapps.filter.exception.ConfigurationException; 
    2121 
     22/** 
     23 * Filter is a message processing unit. Filter can receive various message types 
     24 * from binded filters and send messages to binded filters. 
     25 *  
     26 * @author xeraph 
     27 * @since 1.0.0 
     28 */ 
    2229public interface Filter { 
    23         MessageSpec[] getReceiveSpecs(); 
     30        /** 
     31         * Returns the input message specifications that can be bound to this 
     32         * filter. 
     33         *  
     34         * @return the input message specifications array. null or empty array if no 
     35         *         types are supported 
     36         */ 
     37        MessageSpec[] getInputMessageSpecs(); 
    2438 
    25         MessageSpec getSendSpec(); 
     39        /** 
     40         * Returns the output message specification that can be bound to this 
     41         * filter. 
     42         *  
     43         * @return the output message specification. null if no type is supported 
     44         */ 
     45        MessageSpec getOutputMessageSpec(); 
    2646 
    27         /* 
     47        /** 
    2848         * Process a message object pushed from other filters. ActiveFilter usually 
    2949         * queues incoming message in this method, rather than processes it 
     
    3252        void process(Message message); 
    3353 
     54        /** 
     55         * Returns a Set of the keys of properties. 
     56         */ 
    3457        Set<String> getPropertyKeys(); 
    3558 
     59        /** 
     60         * Returns the value of specified property. 
     61         *  
     62         * @param key 
     63         *            the name of the property 
     64         * @return the value of specified property 
     65         */ 
    3666        Object getProperty(String key); 
    3767 
     68        /** 
     69         * Sets the property. 
     70         *  
     71         * @param key 
     72         *            the name of the property 
     73         * @param value 
     74         *            the value of the property 
     75         */ 
    3876        void setProperty(String key, Object value); 
    3977 
     78        /** 
     79         * Removes the specified property. 
     80         *  
     81         * @param key 
     82         *            the name of the property 
     83         */ 
    4084        void unsetProperty(String key); 
    4185 
     86        /** 
     87         * Validates current configuration of the filter. 
     88         *  
     89         * @throws ConfigurationException 
     90         *             if failed to validate 
     91         */ 
    4292        void validateConfiguration() throws ConfigurationException; 
    4393} 
  • kraken-filter/src/main/java/org/krakenapps/filter/FilterChain.java

    r282 r306  
    1616package org.krakenapps.filter; 
    1717 
     18/** 
     19 * A container of {@link Filter}s that forwards {@link Message}s to the 
     20 * consisting filters sequentially. 
     21 *  
     22 * @author xeraph 
     23 * @since 1.0.0 
     24 */ 
    1825public interface FilterChain { 
     26        /** 
     27         * Processes a {@link Message}. 
     28         *  
     29         * @param message 
     30         *            the forwarding message 
     31         */ 
    1932        void process(Message message); 
    2033} 
  • kraken-filter/src/main/java/org/krakenapps/filter/FilterEventListener.java

    r154 r306  
    1616package org.krakenapps.filter; 
    1717 
     18/** 
     19 * The listener interface for receiving filter events. 
     20 *  
     21 * @author xeraph 
     22 * @since 1.0.0 
     23 */ 
    1824public interface FilterEventListener { 
     25        /** 
     26         * Invoked when a filter is loaded. 
     27         */ 
    1928        void onFilterLoaded(String filterId); 
    2029 
     30        /** 
     31         * Invoked when a filter is unloading. 
     32         */ 
    2133        void onFilterUnloading(String filterId); 
    2234 
     35        /** 
     36         * Invoked when filters are bound. 
     37         *  
     38         * @param fromFilterId 
     39         *            the source filter id 
     40         * @param toFilterId 
     41         *            the destination filter id 
     42         */ 
     43        void onFilterBound(String fromFilterId, String toFilterId); 
     44 
     45        /** 
     46         * Invoked when filters are unbounding. 
     47         *  
     48         * @param fromFilterId 
     49         *            the source filter id 
     50         * @param toFilterId 
     51         *            the destination filter id 
     52         */ 
     53        void onFilterUnbinding(String fromFilterId, String toFilterId); 
     54 
     55        /** 
     56         * Invoked when a property is set. 
     57         *  
     58         * @param filterId 
     59         *            the target filter id 
     60         * @param name 
     61         *            the name of the property 
     62         * @param value 
     63         *            the value of the property 
     64         */ 
     65        void onFilterSet(String filterId, String name, Object value); 
     66 
     67        /** 
     68         * Invoked when a property is removed. 
     69         *  
     70         * @param filterId 
     71         *            the target filter id 
     72         * @param name 
     73         *            the name of the property 
     74         */ 
     75        void onFilterUnset(String filterId, String name); 
    2376} 
  • kraken-filter/src/main/java/org/krakenapps/filter/FilterHandler.java

    r302 r306  
    1717 
    1818import java.util.Dictionary; 
    19 import java.util.Enumeration; 
    2019import java.util.Properties; 
    2120 
     
    2827import org.krakenapps.filter.exception.FilterNotFoundException; 
    2928import org.krakenapps.filter.exception.MessageSpecMismatchException; 
    30 import org.krakenapps.filter.impl.FilterChainImpl; 
     29import org.krakenapps.filter.impl.DefaultFilterChain; 
    3130import org.krakenapps.filter.impl.FilterConfig; 
    3231import org.slf4j.Logger; 
    3332import org.slf4j.LoggerFactory; 
    3433 
     34/** 
     35 * This handler class provides filter extension of iPOJO component instance. 
     36 * FilterHanler injects a {@link FilterChain} instance and loads all properties. 
     37 * Dynamic filter binding feature is implemented based on iPOJO code injection. 
     38 *  
     39 * @author xeraph 
     40 * @since 1.0.0 
     41 */ 
    3542public class FilterHandler extends PrimitiveHandler { 
     43        /** 
     44         * the slf4j logger 
     45         */ 
    3646        final Logger logger = LoggerFactory.getLogger(FilterHandler.class.getName()); 
    3747 
     48        /** 
     49         * the filter manager instance 
     50         */ 
    3851        private FilterManager filterManager; 
     52 
     53        /** 
     54         * the filter config instance 
     55         */ 
    3956        private FilterConfig filterConfig; 
     57 
     58        /** 
     59         * the injected filter chain instance 
     60         */ 
    4061        private volatile FilterChain filterChain; 
    41         private String pid; 
    4262 
     63        /** 
     64         * the filter id 
     65         */ 
     66        private String filterId; 
     67 
     68        /** 
     69         * Invoked when a filter component instance is created. Inject filter chain 
     70         * field here. 
     71         */ 
    4372        @SuppressWarnings("unchecked") 
    4473        @Override 
    4574        public void configure(Element metadata, Dictionary config) throws ConfigurationException { 
    46                 Enumeration iterator = config.keys(); 
    47                 while (iterator.hasMoreElements()) { 
    48                         String key = (String) iterator.nextElement(); 
    49                         Object value = config.get(key); 
    50  
    51                         logger.info("configuring " + key + " " + value.toString()); 
    52                 } 
    5375 
    5476                filterManager = (FilterManager) config.get("filter.manager"); 
    5577                filterConfig = (FilterConfig) config.get("filter.config"); 
    56                 pid = (String) config.get("instance.name"); 
     78                filterId = (String) config.get("instance.name"); 
    5779 
     80                // inspects all fields and inject filter chain field. 
    5881                PojoMetadata pojoMetadata = getPojoMetadata(); 
    5982                for (FieldMetadata fieldMetadata : pojoMetadata.getFields()) { 
     
    6386                } 
    6487 
    65                 filterChain = new FilterChainImpl(new Filter[0]); 
     88                // set instance.name property. 
     89                Filter filter = getFilter(); 
     90                filter.setProperty("instance.name", filterId); 
    6691 
    67                 Filter filter = (Filter) getInstanceManager().getPojoObject(); 
    68                 filter.setProperty("instance.name", pid); 
     92                // creates a filter chain instance with empty filter bindings. 
     93                filterChain = new DefaultFilterChain(filter, new Filter[0]); 
    6994 
    70                 logger.info("filter [{}] configuration succeeded.", pid); 
     95                logger.trace("filter [{}] configuration succeeded.", filterId); 
    7196        } 
    7297 
     98        /** 
     99         * Invoked when a filter is validated. Register the filter instance and 
     100         * restore previous states. (properties and bind states) 
     101         */ 
    73102        @Override 
    74103        public void start() { 
    75                 logger.info("Kraken Filter Handler started."); 
     104                logger.debug("Kraken filter handler is started."); 
    76105 
    77106                Filter filter = getFilter(); 
    78                 filterManager.registerFilter(this, pid, filter); 
     107                filterManager.registerFilter(this, filterId, filter); 
    79108 
    80109                loadProperties(filter); 
    81                 bindAutomatically(pid); 
     110                bindAutomatically(filterId); 
    82111        } 
    83112 
     
    88117                } 
    89118 
    90                 // write filter instance information to db. 
    91                 filterConfig.addFilterInstance(pid, filter.getClass().getName(), filterType); 
     119                // write filter instance information to data store. 
     120                filterConfig.addFilterInstance(filterId, filter.getClass().getName(), filterType); 
    92121 
    93                 // load properties from db if exists. 
    94                 Properties props = filterConfig.getFilterProperties(pid); 
     122                // load properties from data store if exists. 
     123                Properties props = filterConfig.getFilterProperties(filterId); 
    95124                for (Object key : props.keySet()) { 
    96125                        filter.setProperty((String) key, (String) props.get(key)); 
     
    98127        } 
    99128 
     129        /** 
     130         * Invoked when a filter is invalidated. 
     131         */ 
    100132        @Override 
    101133        public void stop() { 
    102                 logger.info("Kraken Filter Handler stopped."); 
     134                logger.debug("Kraken filter handler is stopped."); 
    103135 
    104                 filterManager.unregisterFilter(pid); 
     136                filterManager.unregisterFilter(filterId); 
    105137        } 
    106138 
     
    110142                                try { 
    111143                                        filterManager.bindFilter(pid, target); 
    112                                         logger.info(pid + " -> " + target + " binded."); 
     144                                        logger.trace(pid + " -> " + target + " binded."); 
    113145                                } catch (FilterNotFoundException e) { 
    114146                                        logger.warn("filter not found:", e); 
     
    136168        } 
    137169 
     170        /** 
     171         * Invoked when the registered filter chain field is accessed. 
     172         */ 
    138173        @Override 
    139174        public Object onGet(Object pojo, String fieldName, Object value) { 
    140                 if (filterChain == null) { 
    141                         logger.warn("filterChain is null!"); 
    142                 } 
    143175                return filterChain; 
    144176        } 
     
    148180        } 
    149181 
    150         public void stateChanged(Filter[] bindedFilters) { 
    151                 filterChain = new FilterChainImpl(bindedFilters); 
     182        /** 
     183         * Invoked when a filter's binding states are changed cause of bind command 
     184         * or unbind command. 
     185         *  
     186         * @param boundOutputFilters 
     187         *            the bound output filters 
     188         */ 
     189        public void stateChanged(Filter[] boundOutputFilters) { 
     190                filterChain = new DefaultFilterChain(getFilter(), boundOutputFilters); 
    152191        } 
    153192} 
  • kraken-filter/src/main/java/org/krakenapps/filter/FilterManager.java

    r302 r306  
    2626import org.krakenapps.filter.exception.MessageSpecMismatchException; 
    2727 
     28/** 
     29 * The interface for filter management. 
     30 *  
     31 * @author xeraph 
     32 * @since 1.0.0 
     33 */ 
    2834public interface FilterManager { 
     35        /** 
     36         * Returns a list of filter factory type name. 
     37         */ 
    2938        List<String> getFilterFactoryNames(); 
    3039 
     40        /** 
     41         * Returns a list of all filter instance descriptions. 
     42         */ 
    3143        List<ComponentDescription> getFilterInstanceDescriptions(); 
    3244 
    33         ComponentDescription getComponentDescription(String filterFactoryName) throws FilterFactoryNotFoundException; 
    34  
    35         void loadFilter(int filterFactoryId, String pid) throws FilterFactoryNotFoundException, 
     45        /** 
     46         * Loads a filter using filter factory index. 
     47         *  
     48         * @param filterFactoryIndex 
     49         *            the filter factory index which is listed before. 
     50         * @param filterId 
     51         *            the filter id 
     52         * @throws FilterFactoryNotFoundException 
     53         *             if filter factory is not found 
     54         * @throws DuplicatedFilterNameException 
     55         *             if filter id is duplicated 
     56         */ 
     57        void loadFilter(int filterFactoryIndex, String filterId) throws FilterFactoryNotFoundException, 
    3658                        DuplicatedFilterNameException; 
    3759 
    38         void loadFilter(String filterFactoryName, String pid) throws FilterFactoryNotFoundException, 
    39                         DuplicatedFilterNameException; 
    40  
    41         void unloadFilter(String pid) throws FilterNotFoundException; 
    42  
    43         void runFilter(String pid, long period) throws FilterNotFoundException, IllegalThreadStateException; 
    44  
    45         void stopFilter(String pid) throws FilterNotFoundException; 
    46  
    47         void bindFilter(String fromFilterPid, String toFilterPid) throws FilterNotFoundException, 
     60        /** 
     61         * Loads a filter using the filter factory name. 
     62         *  
     63         * @param filterFactoryName 
     64         *            the filter factory name 
     65         * @param filterId 
     66         *            the filter id 
     67         * @throws FilterFactoryNotFoundException 
     68         *             if filter factory is not found 
     69         * @throws DuplicatedFilterNameException 
     70         *             if filter id is duplicated 
     71         */ 
     72        void loadFilter(String filterFactoryName, String filterId) 
     73                        throws FilterFactoryNotFoundException, DuplicatedFilterNameException; 
     74 
     75        /** 
     76         * Unloads the filter. 
     77         *  
     78         * @param filterId 
     79         *            the filter id 
     80         * @throws FilterNotFoundException 
     81         *             if filter is not found 
     82         */ 
     83        void unloadFilter(String filterId) throws FilterNotFoundException; 
     84 
     85        /** 
     86         * Starts an {@link ActiveFilter} thread with specified milliseconds 
     87         * interval. 
     88         *  
     89         * @param filterId 
     90         *            the {@link ActiveFilter} id 
     91         * @param period 
     92         *            the sleep interval in milliseconds 
     93         * @throws FilterNotFoundException 
     94         *             if active filter is not found 
     95         * @throws IllegalThreadStateException 
     96         *             if active filter is already running 
     97         */ 
     98        void runFilter(String filterId, long period) throws FilterNotFoundException, 
     99                        IllegalThreadStateException; 
     100 
     101        /** 
     102         * Stops the active filter. 
     103         *  
     104         * @param filterId 
     105         *            the {@link ActiveFilter} id 
     106         * @throws FilterNotFoundException 
     107         *             if active filter is not found 
     108         */ 
     109        void stopFilter(String filterId) throws FilterNotFoundException; 
     110 
     111        /** 
     112         * Binds two filters. 
     113         *  
     114         * @param fromFilterId 
     115         *            the source filter id 
     116         * @param toFilterId 
     117         *            the destination filter id 
     118         * @throws FilterNotFoundException 
     119         *             if source or destination filter is not found 
     120         * @throws AlreadyBoundException 
     121         *             if filters are already bound 
     122         * @throws MessageSpecMismatchException 
     123         *             if source filter's message specification and destinatino 
     124         *             filter's message specification is not matched. 
     125         */ 
     126        void bindFilter(String fromFilterId, String toFilterId) throws FilterNotFoundException, 
    48127                        AlreadyBoundException, MessageSpecMismatchException; 
    49128 
    50         void unbindFilter(String fromFilterPid, String toFilterPid) throws FilterNotFoundException, 
     129        /** 
     130         * Unbind filters. 
     131         *  
     132         * @param fromFilterId 
     133         *            the source filter id 
     134         * @param toFilterId 
     135         *            the destination filter id 
     136         * @throws FilterNotFoundException 
     137         *             if source or destination filter is not found 
     138         * @throws FilterNotBoundException 
     139         *             if filters are not bound 
     140         */ 
     141        void unbindFilter(String fromFilterId, String toFilterId) throws FilterNotFoundException, 
    51142                        FilterNotBoundException; 
    52143 
    53         Filter getFilter(String pid); 
    54  
    55         Filter[] getInputFilters(String pid); 
    56  
    57         Filter[] getOutputFilters(String pid); 
    58  
    59         void registerFilter(FilterHandler filterHandler, String pid, Filter filter); 
    60  
    61         void unregisterFilter(String pid); 
    62  
    63         Set<String> getPropertyKeys(String pid) throws FilterNotFoundException; 
    64  
    65         Object getProperty(String pid, String key) throws FilterNotFoundException; 
    66  
    67         void setProperty(String pid, String key, String value) throws FilterNotFoundException; 
    68  
    69         void unsetProperty(String pid, String key) throws FilterNotFoundException; 
    70  
     144        /** 
     145         * Returns the filter 
     146         *  
     147         * @param filterId 
     148         *            the filter id 
     149         * @return the filter. returns null if not exists. 
     150         */ 
     151        Filter getFilter(String filterId); 
     152 
     153        /** 
     154         * Returns all bound input filters. 
     155         *  
     156         * @param filterId 
     157         *            the filter id 
     158         * @return all bound input filters. empty array if input filter does not 
     159         *         exists 
     160         */ 
     161        Filter[] getInputFilters(String filterId); 
     162 
     163        /** 
     164         * Returns all bound output filters. 
     165         *  
     166         * @param filterId 
     167         *            the filter id 
     168         * @return all bound output filters. empty array if output filter does not 
     169         *         exists 
     170         */ 
     171        Filter[] getOutputFilters(String filterId); 
     172 
     173        /** 
     174         * Register the filter with filter id and handler. Can be used as 
     175         * sub-routine of loadFilter. Invoked from {@link FilterHandler}. 
     176         *  
     177         * @param filterHandler 
     178         *            the filter handler 
     179         * @param filterId 
     180         *            the filter id 
     181         * @param filter 
     182         *            the filter 
     183         */ 
     184        void registerFilter(FilterHandler filterHandler, String filterId, Filter filter); 
     185 
     186        /** 
     187         * Unregister the filter. Can be used as sub-routine of unloadFilter. 
     188         * Invoked from {@link FilterHandler}. 
     189         *  
     190         * @param filterId 
     191         *            the filter id 
     192         */ 
     193        void unregisterFilter(String filterId); 
     194 
     195        /** 
     196         * Returns a Set of property keys 
     197         *  
     198         * @param filterId 
     199         *            the filter id 
     200         * @return a Set of property keys 
     201         * @throws FilterNotFoundException 
     202         *             if filter is not found 
     203         */ 
     204        Set<String> getPropertyKeys(String filterId) throws FilterNotFoundException; 
     205 
     206        /** 
     207         * Returns the value of the property. 
     208         *  
     209         * @param filterId 
     210         *            the filter id 
     211         * @param key 
     212         *            the key of property 
     213         * @return the property value 
     214         * @throws FilterNotFoundException 
     215         *             if filter is not found 
     216         */ 
     217        Object getProperty(String filterId, String key) throws FilterNotFoundException; 
     218 
     219        /** 
     220         * Sets the property. 
     221         *  
     222         * @param filterId 
     223         *            the filter id 
     224         * @param key 
     225         *            the key of property 
     226         * @param value 
     227         *            the value of property 
     228         * @throws FilterNotFoundException 
     229         *             if filter is not found 
     230         */ 
     231        void setProperty(String filterId, String key, String value) throws FilterNotFoundException; 
     232 
     233        /** 
     234         * Removes the property. 
     235         *  
     236         * @param filterId 
     237         *            the filter id 
     238         * @param key 
     239         *            the key of property 
     240         * @throws FilterNotFoundException 
     241         *             if filter is not found 
     242         */ 
     243        void unsetProperty(String filterId, String key) throws FilterNotFoundException; 
     244 
     245        /** 
     246         * Subscribes filter events. 
     247         *  
     248         * @param listener 
     249         *            the filter event listener 
     250         */ 
    71251        void subscribeFilterEvent(FilterEventListener listener); 
    72252 
     253        /** 
     254         * Unsubscribes filter events. 
     255         *  
     256         * @param listener 
     257         *            the filter event listener 
     258         */ 
    73259        void unsubscribeFilterEvent(FilterEventListener listener); 
    74260} 
  • kraken-filter/src/main/java/org/krakenapps/filter/FilterScript.java

    r302 r306  
    2929import org.slf4j.LoggerFactory; 
    3030 
     31/** 
     32 * Provides filter management command scripts. 
     33 *  
     34 * @author xeraph 
     35 * @since 1.0.0 
     36 */ 
    3137public class FilterScript implements Script { 
    3238        final Logger logger = LoggerFactory.getLogger(FilterScript.class.getName()); 
     
    137143                                long period = Long.parseLong(args[1]); 
    138144                                manager.runFilter(pid, period); 
    139                                 context.println("[%s] filter's thread started. run every %d millisecond.", pid, period); 
     145                                context.println("[%s] filter's thread started. run every %d millisecond.", pid, 
     146                                                period); 
    140147                        } 
    141148                } catch (Exception e) { 
     
    161168                        context.println("========================"); 
    162169                        for (ComponentDescription description : descriptions) { 
    163                                 context.println(description.getInstanceName() + " -> " + description.getFactoryName()); 
     170                                context.println(description.getInstanceName() + " -> " 
     171                                                + description.getFactoryName()); 
    164172                        } 
    165173                } else if (args.length == 1) { 
     
    172180 
    173181                                for (String key : keys) { 
    174                                         context.println("Key: [%s], Value: [%s]", key, manager.getProperty(pid, key).toString()); 
     182                                        context.println("Key: [%s], Value: [%s]", key, manager.getProperty(pid, key) 
     183                                                        .toString()); 
    175184                                } 
    176185                        } catch (FilterNotFoundException e) { 
  • kraken-filter/src/main/java/org/krakenapps/filter/Message.java

    r146 r306  
    1818import java.util.Set; 
    1919 
     20/** 
     21 * Represents the communication unit of filter. Composed of header and 
     22 * properties. Message is immutable. 
     23 *  
     24 * @author xeraph 
     25 * @since 1.0.0 
     26 */ 
    2027public interface Message { 
     28        /** 
     29         * Returns the message specification. 
     30         */ 
    2131        public MessageSpec getMessageSpec(); 
    22          
     32 
     33        /** 
     34         * Returns a Set of property keys. 
     35         */ 
    2336        public Set<String> keySet(); 
    2437 
     38        /** 
     39         * Checks if specific key is contained. 
     40         *  
     41         * @param key 
     42         *            the name of property 
     43         * @return true if key exists. 
     44         */ 
    2545        public boolean containsKey(String key); 
    2646 
     47        /** 
     48         * Returns the value of the property 
     49         *  
     50         * @param key 
     51         *            the name of property 
     52         * @returns the value of the property. null if not exists. 
     53         */ 
    2754        public Object get(String key); 
    28          
     55 
     56        /** 
     57         * Returns a Set of header keys. 
     58         */ 
    2959        public Set<String> headerKeySet(); 
    3060 
     61        /** 
     62         * Checks if specific header is contained. 
     63         *  
     64         * @param key 
     65         *            the name of header 
     66         * @return true if header key exists 
     67         */ 
    3168        public boolean containsHeader(String key); 
    3269 
     70        /** 
     71         * Returns the value of the header. 
     72         *  
     73         * @param key 
     74         *            the name of header 
     75         * @return the value of the header. null if not exists. 
     76         */ 
    3377        public Object getHeader(String key); 
    3478} 
  • kraken-filter/src/main/java/org/krakenapps/filter/MessageBuilder.java

    r302 r306  
    1919import java.util.Map; 
    2020 
    21 import org.krakenapps.filter.impl.MessageImpl; 
     21import org.krakenapps.filter.impl.DefaultMessage; 
    2222 
    23  
     23/** 
     24 * Builds a message of specific message specification. This class is for 
     25 * convenience. 
     26 *  
     27 * @author xeraph 
     28 * @since 1.0.0 
     29 */ 
    2430public class MessageBuilder { 
    2531        private MessageSpec spec; 
     
    2733        private Map<String, Object> headers; 
    2834 
     35        /** 
     36         * Prepare a builder of specific message specification. 
     37         *  
     38         * @param spec 
     39         *            the message specification 
     40         */ 
    2941        public MessageBuilder(MessageSpec spec) { 
    3042                this.spec = spec; 
     
    3345        } 
    3446 
     47        /** 
     48         * Copy from the other message 
     49         *  
     50         * @param message 
     51         *            the source message 
     52         * @return builder for method chaining 
     53         */ 
    3554        public MessageBuilder setBase(Message message) { 
    3655                this.spec = message.getMessageSpec(); 
    37                  
     56 
    3857                for (String key : message.keySet()) { 
    3958                        fields.put(key, message.get(key)); 
     
    4766        } 
    4867 
     68        /** 
     69         * Sets a property 
     70         *  
     71         * @param key 
     72         *            the name of the property 
     73         * @param value 
     74         *            the value of the property 
     75         * @return builder for method chaining 
     76         */ 
    4977        public MessageBuilder set(String key, Object value) { 
    5078                fields.put(key, value); 
     
    5280        } 
    5381 
     82        /** 
     83         * Sets a header 
     84         *  
     85         * @param key 
     86         *            the name of the header 
     87         * @param value 
     88         *            the value of the header 
     89         * @return builder for method chaining 
     90         */ 
    5491        public MessageBuilder setHeader(String key, Object value) { 
    5592                headers.put(key, value); 
     
    5794        } 
    5895 
     96        /** 
     97         * Creates a message instance. 
     98         *  
     99         * @return the immutable message instance. 
     100         */ 
    59101        public Message build() { 
    60                 Message message = new MessageImpl(spec, fields, headers); 
     102                Message message = new DefaultMessage(spec, fields, headers); 
    61103                fields = new HashMap<String, Object>(); 
    62104                headers = new HashMap<String, Object>(); 
  • kraken-filter/src/main/java/org/krakenapps/filter/MessageSpec.java

    r147 r306  
    1616package org.krakenapps.filter; 
    1717 
     18/** 
     19 *  
     20 * @author xeraph 
     21 * @since 1.0.0 
     22 */ 
    1823public interface MessageSpec { 
    1924        public String getName(); 
     25 
    2026        public String getDescription(); 
     27 
    2128        public MessageSpecVersion getLatestVersion(); 
     29 
    2230        public MessageSpecVersionRange getVersionRange(); 
    2331} 
  • kraken-filter/src/main/java/org/krakenapps/filter/MessageSpecVersion.java

    r215 r306  
    1616package org.krakenapps.filter; 
    1717 
     18/** 
     19 *  
     20 * @author xeraph 
     21 * @since 1.0.0 
     22 */ 
    1823public interface MessageSpecVersion { 
    1924        int getMajorVersion(); 
     25 
    2026        int getMinorVersion(); 
     27 
    2128        boolean isInRange(MessageSpecVersionRange range); 
    2229} 
  • kraken-filter/src/main/java/org/krakenapps/filter/MessageSpecVersionRange.java

    r215 r306  
    1616package org.krakenapps.filter; 
    1717 
     18/** 
     19 *  
     20 * @author xeraph 
     21 * @since 1.0.0 
     22 */ 
    1823public interface MessageSpecVersionRange { 
    1924        MessageSpecVersion getLowerBound(); 
     25 
    2026        MessageSpecVersion getUpperBound(); 
    2127} 
  • kraken-filter/src/main/java/org/krakenapps/filter/exception/AlreadyBoundException.java

    r302 r306  
    1616package org.krakenapps.filter.exception; 
    1717 
     18/** 
     19 * Unchecked exception thrown when the filter is already bound to the specified 
     20 * filter. 
     21 *  
     22 * @author xeraph 
     23 * @since 1.0.0 
     24 */ 
    1825public class AlreadyBoundException extends RuntimeException { 
    1926        private static final long serialVersionUID = 1L; 
    20         private String fromPid; 
    21         private String toPid; 
    22          
    23         public AlreadyBoundException(String fromPid, String toPid) { 
    24                 this.fromPid = fromPid; 
    25                 this.toPid = toPid; 
     27        private String fromFilterId; 
     28        private String toFilterId; 
     29 
     30        /** 
     31         * Creates an exception with source and destination filter informations. 
     32         *  
     33         * @param fromFilterId 
     34         *            the source filter id 
     35         * @param toFilterId 
     36         *            the destination filter id 
     37         */ 
     38        public AlreadyBoundException(String fromFilterId, String toFilterId) { 
     39                this.fromFilterId = fromFilterId; 
     40                this.toFilterId = toFilterId; 
    2641        } 
    2742 
    2843        @Override 
    2944        public String getMessage() { 
    30                 return toPid + " is already bound to " + fromPid; 
     45                return toFilterId + " is already bound to " + fromFilterId; 
    3146        } 
    3247} 
  • kraken-filter/src/main/java/org/krakenapps/filter/exception/ConfigurationException.java

    r302 r306  
    11package org.krakenapps.filter.exception; 
    22 
     3/** 
     4 * Unchecked exception thrown when the filter can not accept current 
     5 * configurations. 
     6 *  
     7 * @author xeraph 
     8 * @since 1.0.0 
     9 */ 
    310public class ConfigurationException extends RuntimeException { 
    411        private static final long serialVersionUID = 1L; 
    5         private String name; 
     12 
     13        /** 
     14         * property name or internal configuration name 
     15         */ 
     16        private String configName; 
     17 
     18        /** 
     19         * cause of the error 
     20         */ 
    621        private String errorMessage; 
    7          
    8         public ConfigurationException(String name) { 
    9                 this.name = name; 
     22 
     23        public ConfigurationException(String configName) { 
     24                this.configName = configName; 
    1025        } 
    11          
     26 
    1227        public ConfigurationException(String name, String errorMessage) { 
    13                 this.name = name; 
     28                this.configName = name; 
    1429                this.errorMessage = errorMessage; 
    1530        } 
    16          
     31 
    1732        public String getConfigurationName() { 
    18                 return name; 
     33                return configName; 
    1934        } 
    20          
     35 
    2136        public String getErrorMessage() { 
    2237                return errorMessage; 
  • kraken-filter/src/main/java/org/krakenapps/filter/exception/DuplicatedFilterNameException.java

    r302 r306  
    11package org.krakenapps.filter.exception; 
    22 
     3/** 
     4 * Unchecked exception thrown when the requested filter id already exists at 
     5 * filter loading. 
     6 *  
     7 * @author xeraph 
     8 * @since 1.0.0 
     9 */ 
    310public class DuplicatedFilterNameException extends RuntimeException { 
    411        private static final long serialVersionUID = 1L; 
    5         private String name; 
     12        private String filterId; 
    613 
    7         public DuplicatedFilterNameException(String name) { 
    8                 this.name = name; 
     14        public DuplicatedFilterNameException(String filterId) { 
     15                this.filterId = filterId; 
    916        } 
    1017 
    11         public String getFilterName() { 
    12                 return name; 
     18        public String getFilterId() { 
     19                return filterId; 
    1320        } 
    1421} 
  • kraken-filter/src/main/java/org/krakenapps/filter/exception/FilterFactoryNotFoundException.java

    r302 r306  
    11package org.krakenapps.filter.exception; 
    22 
     3/** 
     4 * Unchecked exception thrown when the filter factory is not found. 
     5 *  
     6 * @author xeraph 
     7 * @since 1.0.0 
     8 */ 
    39public class FilterFactoryNotFoundException extends RuntimeException { 
    410        private static final long serialVersionUID = 1L; 
    5         private String filterTypeName; 
    611 
    7         public FilterFactoryNotFoundException(String filterTypeName) { 
    8                 this.filterTypeName = filterTypeName; 
     12        private String filterFactoryName; 
     13 
     14        /** 
     15         * Creates an exception with filter factory name 
     16         *  
     17         * @param filterFactoryName 
     18         *            filter class name in general, but can have alias name. 
     19         */ 
     20        public FilterFactoryNotFoundException(String filterFactoryName) { 
     21                this.filterFactoryName = filterFactoryName; 
    922        } 
    1023 
    11         public String getFilterTypeName() { 
    12                 return filterTypeName; 
     24        /** 
     25         * Returns the filter factory name. 
     26         */ 
     27        public String getFilterFactoryName() { 
     28                return filterFactoryName; 
    1329        } 
    1430 
  • kraken-filter/src/main/java/org/krakenapps/filter/exception/FilterNotBoundException.java

    r302 r306  
    1616package org.krakenapps.filter.exception; 
    1717 
     18/** 
     19 * Unchecked exception thrown when filters are not bound at unbind operation. 
     20 *  
     21 * @author xeraph 
     22 * @since 1.0.0 
     23 */ 
    1824public class FilterNotBoundException extends RuntimeException { 
    1925        private static final long serialVersionUID = 1L; 
    20         private String fromPid; 
    21         private String toPid; 
    22          
    23         public FilterNotBoundException(String fromPid, String toPid) { 
    24                 this.fromPid = fromPid; 
    25                 this.toPid = toPid; 
     26        private String fromFilterId; 
     27        private String toFilterId; 
     28 
     29        public FilterNotBoundException(String fromFilterId, String toFilterId) { 
     30                this.fromFilterId = fromFilterId; 
     31                this.toFilterId = toFilterId; 
    2632        } 
    2733 
    2834        @Override 
    2935        public String getMessage() { 
    30                 return toPid + " is not bound to " + fromPid; 
     36                return toFilterId + " is not bound to " + fromFilterId; 
    3137        } 
    3238} 
  • kraken-filter/src/main/java/org/krakenapps/filter/exception/FilterNotFoundException.java

    r302 r306  
    1616package org.krakenapps.filter.exception; 
    1717 
     18/** 
     19 * Unchecked exception thrown when the specified filter is not found. 
     20 *  
     21 * @author xeraph 
     22 * @since 1.0.0 
     23 */ 
    1824public class FilterNotFoundException extends RuntimeException { 
    1925        private static final long serialVersionUID = 1L; 
    20         private String pid; 
    21          
    22         public FilterNotFoundException(String pid) { 
    23                 this.pid = pid; 
     26        private String filterId; 
     27 
     28        public FilterNotFoundException(String filterId) { 
     29                this.filterId = filterId; 
    2430        } 
    2531 
    2632        @Override 
    2733        public String getMessage() { 
    28                 return pid + " filter not found."; 
     34                return filterId + " filter not found."; 
    2935        } 
    3036} 
  • kraken-filter/src/main/java/org/krakenapps/filter/exception/MessageSpecMismatchException.java

    r302 r306  
    1818import org.krakenapps.filter.MessageSpec; 
    1919 
     20/** 
     21 * Unchecked exception thrown when filters can not be bound cause of message 
     22 * specification mismatch. 
     23 *  
     24 * @author xeraph 
     25 * @since 1.0.0 
     26 */ 
    2027public class MessageSpecMismatchException extends RuntimeException { 
    2128        private static final long serialVersionUID = 1L; 
    22         private String fromPid; 
    23         private String toPid; 
    24         private MessageSpec sendMessageSpec; 
    25         private MessageSpec[] receiveMessageSpecs; 
     29        private String fromFilterId; 
     30        private String toFilterId; 
     31        private MessageSpec outputMessageSpec; 
     32        private MessageSpec[] inputMessageSpecs; 
    2633 
    27         public MessageSpecMismatchException(String fromPid, String toPid, MessageSpec sendMessageSpec, 
    28                         MessageSpec[] receiveMessageSpec) { 
    29                 this.fromPid = fromPid; 
    30                 this.toPid = toPid; 
    31                 this.sendMessageSpec = sendMessageSpec; 
    32                 this.receiveMessageSpecs = receiveMessageSpec; 
     34        public MessageSpecMismatchException(String fromFilterId, String toFilterId, 
     35                        MessageSpec outputMessageSpec, MessageSpec[] inputMessageSpec) { 
     36                this.fromFilterId = fromFilterId; 
     37                this.toFilterId = toFilterId; 
     38                this.outputMessageSpec = outputMessageSpec; 
     39                this.inputMessageSpecs = inputMessageSpec; 
    3340        } 
    3441 
    3542        @Override 
    3643        public String getMessage() { 
    37                 return String.format( 
    38                                 "message spec does not match: %s -> %s\n" + "source filter message spec: %s\n" 
    39                                                 + "destination filter message specs: %s\n", 
    40                                 fromPid, 
    41                                 toPid, 
    42                                 sendMessageSpec.getName(), 
    43                                 getReceiveMessageSpecs()); 
     44                return String.format("message spec does not match: %s -> %s\n" 
     45                                + "source filter message spec: %s\n" + "destination filter message specs: %s\n", 
     46                                fromFilterId, toFilterId, outputMessageSpec.getName(), 
     47                                getCommaSeparatedInputMessageSpecs()); 
    4448        } 
    4549 
    46         private String getReceiveMessageSpecs() { 
     50        private String getCommaSeparatedInputMessageSpecs() { 
    4751                String buffer = ""; 
    4852 
    49                 int length = receiveMessageSpecs.length; 
     53                int length = inputMessageSpecs.length; 
    5054                for (int i = 0; i < length; i++) { 
    51                         MessageSpec spec = receiveMessageSpecs[i]; 
     55                        MessageSpec spec = inputMessageSpecs[i]; 
    5256                        buffer += spec.getName(); 
    5357                        if (i < length - 1) { 
  • kraken-filter/src/main/java/org/krakenapps/filter/impl/ActiveFilterRunner.java

    r302 r306  
    2121import org.slf4j.LoggerFactory; 
    2222 
     23/** 
     24 * This class provides thread management for the active filter. 
     25 *  
     26 * @author xeraph 
     27 * @since 1.0.0 
     28 */ 
    2329public class ActiveFilterRunner implements Runnable { 
    2430        final Logger logger = LoggerFactory.getLogger(ActiveFilterRunner.class); 
     
    2834        private Thread thread; 
    2935 
     36        /** 
     37         * Creates an active filter with 1 second of sleep interval. 
     38         *  
     39         * @param activeFilter 
     40         */ 
    3041        public ActiveFilterRunner(ActiveFilter activeFilter) { 
    3142                this(activeFilter, 1000); 
    3243        } 
    3344 
    34         /* 
    35          * Initializes concurrent filter that operates periodically Period is 
    36          * expressed in millisecond. TODO : make period configurable 
     45        /** 
     46         * Creates an active filter that operates periodically Period is expressed 
     47         * in millisecond. 
    3748         */ 
    3849        public ActiveFilterRunner(ActiveFilter activeFilter, long period) { 
     
    4354        } 
    4455 
     56        /** 
     57         * Runs the infinite loop of the active filter. 
     58         */ 
    4559        @Override 
    4660        public void run() { 
    47                 logger.info("starting active filter: " + activeFilter.getClass().getName()); 
     61                logger.info("starting active filter: class [{}], filter id [{}]", activeFilter.getClass() 
     62                                .getName(), activeFilter.getProperty("instance.name")); 
    4863                try { 
    4964                        activeFilter.validateConfiguration(); 
     
    5166                        activeFilter.setRunning(true); 
    5267                } catch (ConfigurationException e) { 
    53                         logger.error("configuration error: {} - {}", e.getConfigurationName(), e.getErrorMessage()); 
     68                        logger.error("configuration error: {} - {}", e.getConfigurationName(), e 
     69                                        .getErrorMessage()); 
    5470                        return; 
    5571                } 
     
    7187                                } 
    7288                        } catch (InterruptedException e) { 
    73                                 logger.info("active filter interrupted."); 
     89                                logger.info("active filter interrupted: " 
     90                                                + activeFilter.getProperty("instance.name")); 
    7491                                break; 
    7592                        } catch (Exception e) { 
     
    83100 
    84101                logger.info("active filter thread stopped."); 
    85                  
     102 
    86103                // create new thread instance for next run 
    87104                thread = new Thread(this); 
    88105        } 
    89106 
    90         public long getPeriodInMilli() { 
     107        /** 
     108         * Returns the sleep interval of the active filter in milliseconds unit. 
     109         */ 
     110        public long getSleepInterval() { 
    91111                return sleepMilliseconds; 
    92112        } 
    93113 
     114        /** 
     115         * Starts the thread for the active filter. 
     116         */ 
    94117        public void start() { 
    95118                logger.info("starting active filter runner thread."); 
     
    97120        } 
    98121 
     122        /** 
     123         * Requests to stop thread of the active filter. 
     124         */ 
    99125        public void stop() { 
    100126                doStop = true; 
     
    102128        } 
    103129 
     130        /** 
     131         * Waits termination of the active filter. 
     132         */ 
    104133        @SuppressWarnings("deprecation") 
    105134        public void waitToFinish() { 
  • kraken-filter/src/main/java/org/krakenapps/filter/impl/DefaultFilterChain.java

    r302 r306  
    2222import org.slf4j.LoggerFactory; 
    2323 
    24 public class FilterChainImpl implements FilterChain { 
    25         final Logger logger = LoggerFactory.getLogger(FilterChainImpl.class.getName()); 
    26         private Filter[] filters; 
     24/** 
     25 * This class provides default implementation for the {@link FilterChain} 
     26 * interface. 
     27 *  
     28 * @author xeraph 
     29 * @since 1.0.0 
     30 */ 
     31public class DefaultFilterChain implements FilterChain { 
     32        final Logger logger = LoggerFactory.getLogger(DefaultFilterChain.class.getName()); 
    2733 
    28         public FilterChainImpl(Filter[] filters) { 
    29                 this.filters = filters; 
     34        private Filter hostFilter; 
     35        private Filter[] outputFilters; 
     36 
     37        public DefaultFilterChain(Filter hostFilter, Filter[] outputFilters) { 
     38                this.hostFilter = hostFilter; 
     39                this.outputFilters = outputFilters; 
    3040        } 
    3141 
    3242        @Override 
    3343        public void process(Message message) { 
    34                 for (Filter filter : filters) { 
     44                for (Filter filter : outputFilters) { 
    3545                        try { 
    3646                                filter.process(message); 
    3747                        } catch (Exception e) { 
    38                                 logger.warn("filter chain error: ", e); 
     48                                logger.warn(getHostFilterId() + " filter chain error: ", e); 
    3949                        } 
    4050                } 
    4151        } 
     52 
     53        private String getHostFilterId() { 
     54                return (String) hostFilter.getProperty("instance.name"); 
     55        } 
    4256} 
  • kraken-filter/src/main/java/org/krakenapps/filter/impl/DefaultFilterManager.java

    r302 r306  
    5050import org.slf4j.LoggerFactory; 
    5151 
    52 public class FilterManagerImpl implements FilterManager { 
    53         final Logger logger = LoggerFactory.getLogger(FilterManagerImpl.class.getName()); 
     52/** 
     53 * This class provides default implementation for the {@link FilterManager} 
     54 * interface. 
     55 *  
     56 * @author xeraph 
     57 * @since 1.0.0 
     58 */ 
     59public class DefaultFilterManager implements FilterManager { 
     60        final Logger logger = LoggerFactory.getLogger(DefaultFilterManager.class.getName()); 
    5461 
    5562        private static final String KRAKEN_FILTER_INTERFACE = "org.krakenapps.filter.Filter"; 
     
    5865        private Map<String, Filter> filterMap; 
    5966        private Map<String, ActiveFilterRunner> runnerMap; 
    60         private Map<String, List<Filter>> bindMap; 
     67        private Map<String, List<Filter>> forwardBindMap; 
    6168        private Map<String, List<Filter>> reverseBindMap; 
    6269        private Map<String, FilterHandler> handlerMap; 
     
    6774        private FilterFactoryTracker factoryTracker; 
    6875 
    69         public FilterManagerImpl(BundleContext context) { 
     76        public DefaultFilterManager(BundleContext context) { 
    7077                filterMap = new HashMap<String, Filter>(); 
    7178                runnerMap = new HashMap<String, ActiveFilterRunner>(); 
    72                 bindMap = new HashMap<String, List<Filter>>(); 
     79                forwardBindMap = new HashMap<String, List<Filter>>(); 
    7380                reverseBindMap = new HashMap<String, List<Filter>>(); 
    7481                handlerMap = new HashMap<String, FilterHandler>(); 
     
    8390                try { 
    8491                        Class.forName("org.hsqldb.jdbcDriver"); 
    85                         filterConfig = new FilterConfig("jdbc:hsqldb:file:filter-config;shutdown=true", "sa", ""); 
     92                        filterConfig = new FilterConfig(); 
    8693 
    8794                        factoryTracker = new FilterFactoryTracker(context, this, filterConfig); 
     
    93100 
    94101        @Override 
    95         public void loadFilter(int filterTypeIndex, String pid) throws FilterFactoryNotFoundException, 
    96                         DuplicatedFilterNameException { 
     102        public void loadFilter(int filterTypeIndex, String filterId) 
     103                        throws FilterFactoryNotFoundException, DuplicatedFilterNameException { 
    97104                if (latestFilterFactories == null) { 
    98105                        throw new FilterFactoryNotFoundException(null); 
     
    104111                } 
    105112 
    106                 loadFilter(filterFactoryName, pid); 
    107         } 
    108  
    109         @Override 
    110         public void loadFilter(String factoryName, String pid) throws FilterFactoryNotFoundException, 
    111                         DuplicatedFilterNameException { 
     113                loadFilter(filterFactoryName, filterId); 
     114        } 
     115 
     116        @Override 
     117        public void loadFilter(String factoryName, String filterId) 
     118                        throws FilterFactoryNotFoundException, DuplicatedFilterNameException { 
    112119                ServiceReference factoryReference = findFilterFactoryReference(factoryName); 
    113120                if (factoryReference == null) 
     
    117124 
    118125                Factory factory = (Factory) context.getService(factoryReference); 
    119                 Properties configuration = newServiceConfiguration(pid, description); 
     126                Properties configuration = newServiceConfiguration(filterId, description); 
    120127                ComponentInstance instance = null; 
    121128                try { 
     
    123130                } catch (UnacceptableConfiguration e) { 
    124131                        logger.warn("load filter error: ", e); 
    125                         throw new DuplicatedFilterNameException(pid); 
     132                        throw new DuplicatedFilterNameException(filterId); 
    126133                } catch (MissingHandlerException e) { 
    127134                        logger.warn("load filter error: ", e); 
     
    136143 
    137144                if (instance == null) { 
    138                         logger.warn("[{} -> {}] component initialization failed.", factoryName, pid); 
     145                        logger.warn("[{} -> {}] component initialization failed.", factoryName, filterId); 
    139146                        return; 
    140147                } 
    141148 
    142149                synchronized (this) { 
    143                         componentMap.put(pid, instance); 
     150                        componentMap.put(filterId, instance); 
    144151 
    145152                        for (FilterEventListener listener : listeners) { 
    146                                 listener.onFilterLoaded(pid); 
     153                                listener.onFilterLoaded(filterId); 
    147154                        } 
    148155                } 
     
    152159 
    153160        @Override 
    154         public void unloadFilter(String pid) throws FilterNotFoundException { 
    155                 synchronized (this) { 
    156                         ComponentInstance instance = componentMap.remove(pid); 
     161        public void unloadFilter(String filterId) throws FilterNotFoundException { 
     162                synchronized (this) { 
     163                        ComponentInstance instance = componentMap.remove(filterId); 
    157164                        if (instance == null) { 
    158                                 logger.warn("filter not found: " + pid); 
    159                                 throw new FilterNotFoundException(pid); 
     165                                logger.warn("filter not found: " + filterId); 
     166                                throw new FilterNotFoundException(filterId); 
    160167                        } 
    161168 
    162169                        for (FilterEventListener listener : listeners) { 
    163                                 listener.onFilterUnloading(pid); 
     170                                listener.onFilterUnloading(filterId); 
    164171                        } 
    165172 
     
    170177 
    171178        @Override 
    172         public void runFilter(String pid, long period) throws FilterNotFoundException, 
     179        public void runFilter(String filterId, long period) throws FilterNotFoundException, 
    173180                        IllegalThreadStateException { 
    174                 Filter filter = getFilterInstance(pid); 
     181                Filter filter = getFilterInstance(filterId); 
    175182 
    176183                if (filter instanceof ActiveFilter) { 
    177184                        ActiveFilter activeFilter = (ActiveFilter) filter; 
    178185                        if (activeFilter.isRunning()) 
    179                                 throw new IllegalThreadStateException(pid + " thread is already running."); 
     186                                throw new IllegalThreadStateException(filterId + " thread is already running."); 
    180187 
    181188                        ActiveFilterRunner runner = new ActiveFilterRunner(activeFilter, period); 
    182189                        synchronized (this) { 
    183                                 runnerMap.put(pid, runner); 
     190                                runnerMap.put(filterId, runner); 
    184191                        } 
    185192                        runner.start(); 
    186193                } else { 
    187                         throw new FilterNotFoundException(pid); 
    188                 } 
    189         } 
    190  
    191         @Override 
    192         public void stopFilter(String pid) throws FilterNotFoundException { 
    193                 ActiveFilterRunner runner = runnerMap.get(pid); 
     194                        throw new FilterNotFoundException(filterId); 
     195                } 
     196        } 
     197 
     198        @Override 
     199        public void stopFilter(String filterId) throws FilterNotFoundException { 
     200                ActiveFilterRunner runner = runnerMap.get(filterId); 
    194201                if (runner == null) 
    195                         throw new FilterNotFoundException(pid); 
     202                        throw new FilterNotFoundException(filterId); 
    196203 
    197204                runner.stop(); 
     
    199206 
    200207        @Override 
    201         public void registerFilter(FilterHandler filterHandler, String pid, Filter filter) { 
    202                 logger.info("registering filter: " + pid); 
    203  
    204                 synchronized (this) { 
    205                         filterMap.put(pid, filter); 
    206                         bindMap.put(pid, new ArrayList<Filter>()); 
    207                         reverseBindMap.put(pid, new ArrayList<Filter>()); 
    208                         handlerMap.put(pid, filterHandler); 
    209                 } 
    210         } 
    211  
    212         @Override 
    213         public void unregisterFilter(String pid) { 
    214                 logger.info("unregistering filter: " + pid); 
     208        public void registerFilter(FilterHandler filterHandler, String filterId, Filter filter) { 
     209                logger.info("registering filter: " + filterId); 
     210 
     211                synchronized (this) { 
     212                        filterMap.put(filterId, filter); 
     213                        forwardBindMap.put(filterId, new ArrayList<Filter>()); 
     214                        reverseBindMap.put(filterId, new ArrayList<Filter>()); 
     215                        handlerMap.put(filterId, filterHandler); 
     216                } 
     217        } 
     218 
     219        @Override 
     220        public void unregisterFilter(String filterId) { 
     221                logger.info("unregistering filter: " + filterId); 
    215222 
    216223                /* 
     
    221228                synchronized (this) { 
    222229                        // remove filter 
    223                         Filter filter = filterMap.remove(pid); 
     230                        Filter filter = filterMap.remove(filterId); 
    224231 
    225232                        // stop thread 
     
    227234                                ActiveFilter activeFilter = (ActiveFilter) filter; 
    228235                                if (activeFilter.isRunning()) { 
    229                                         ActiveFilterRunner runner = runnerMap.remove(pid); 
     236                                        ActiveFilterRunner runner = runnerMap.remove(filterId); 
    230237                                        if (runner != null) { 
    231                                                 logger.debug("[{}] thread stopping.", pid); 
     238                                                logger.debug("[{}] thread stopping.", filterId); 
    232239                                                runner.stop(); 
    233240                                                runner.waitToFinish(); 
    234                                                 logger.debug("[{}] thread stopped.", pid); 
     241                                                logger.debug("[{}] thread stopped.", filterId); 
    235242                                        } 
    236243                                } 
     
    238245 
    239246                        // remove 2->3 
    240                         filterMap.remove(pid); 
    241                         List<Filter> filters2to3 = bindMap.remove(pid); 
     247                        filterMap.remove(filterId); 
     248                        List<Filter> filters2to3 = forwardBindMap.remove(filterId); 
    242249 
    243250                        // remove 3->2 
     
    252259 
    253260                        // remove 2->1 
    254                         List<Filter> filters2to1 = reverseBindMap.get(pid); 
     261                        List<Filter> filters2to1 = reverseBindMap.get(filterId); 
    255262 
    256263                        // remove 1->2 
     
    259266                                        // forward lookup and remove pid 
    260267                                        String filter1Pid = (String) filter1.getProperty("instance.name"); 
    261                                         List<Filter> filters1to2 = bindMap.get(filter1Pid); 
     268                                        List<Filter> filters1to2 = forwardBindMap.get(filter1Pid); 
    262269                                        filters1to2.remove(filter); 
    263270 
     
    269276 
    270277                        // remove event handler 
    271                         handlerMap.remove(pid); 
     278                        handlerMap.remove(filterId); 
    272279 
    273280                        // remove from db 
    274                         filterConfig.removeFilterInstance(pid); 
    275                 } 
    276         } 
    277  
    278         private Properties newServiceConfiguration(String pid, ComponentDescription description) { 
     281                        filterConfig.removeFilterInstance(filterId); 
     282                } 
     283        } 
     284 
     285        private Properties newServiceConfiguration(String filterId, ComponentDescription description) { 
    279286                Properties configuration = new Properties(); 
    280                 configuration.put("instance.name", pid); 
     287                configuration.put("instance.name", filterId); 
    281288                configuration.put("filter.manager", this); 
    282289                configuration.put("filter.config", filterConfig); 
     
    286293        private ServiceReference findFilterFactoryReference(String factoryName) { 
    287294                try { 
    288                         ServiceReference[] refs = context.getServiceReferences( 
    289                                         Factory.class.getName(), 
     295                        ServiceReference[] refs = context.getServiceReferences(Factory.class.getName(), 
    290296                                        "(component.class=" + factoryName + ")"); 
    291297 
     
    323329        } 
    324330 
    325         @Override 
    326         public ComponentDescription getComponentDescription(String filterFactoryName) 
    327                         throws FilterFactoryNotFoundException { 
    328                 ServiceReference ref = findFilterFactoryReference(filterFactoryName); 
    329                 return getComponentDescription(ref); 
    330         } 
    331  
    332331        private ComponentDescription getComponentDescription(ServiceReference ref) { 
    333332                if (ref == null) 
     
    358357                        if (refs != null) { 
    359358                                for (ServiceReference ref : refs) { 
    360                                          
     359 
    361360                                        ComponentDescription description = new ComponentDescription(); 
    362361                                        description.setInstanceName((String) ref.getProperty("instance.name")); 
     
    373372 
    374373        @Override 
    375         public void bindFilter(String fromFilterPid, String toFilterPid) throws FilterNotFoundException, 
     374        public void bindFilter(String fromFilterId, String toFilterId) throws FilterNotFoundException, 
    376375                        AlreadyBoundException, MessageSpecMismatchException { 
    377376                synchronized (this) { 
    378377                        // get filter instances. 
    379                         Filter fromFilter = getFilterInstance(fromFilterPid); 
    380                         Filter toFilter = getFilterInstance(toFilterPid); 
     378                        Filter fromFilter = getFilterInstance(fromFilterId); 
     379                        Filter toFilter = getFilterInstance(toFilterId); 
    381380 
    382381                        // match message specification 
    383                         MessageSpec sendMessageSpec = fromFilter.getSendSpec(); 
    384                         MessageSpec[] receiveMessageSpecs = toFilter.getReceiveSpecs(); 
     382                        MessageSpec sendMessageSpec = fromFilter.getOutputMessageSpec(); 
     383                        MessageSpec[] receiveMessageSpecs = toFilter.getInputMessageSpecs(); 
    385384                        if (matchMessageSpec(sendMessageSpec, receiveMessageSpecs) == false) 
    386                                 throw new MessageSpecMismatchException(fromFilterPid, toFilterPid, sendMessageSpec, 
     385                                throw new MessageSpecMismatchException(fromFilterId, toFilterId, sendMessageSpec, 
    387386                                                receiveMessageSpecs); 
    388387 
    389388                        // check if same filter already exists. 
    390                         List<Filter> bindedFilters = bindMap.get(fromFilterPid); 
     389                        List<Filter> bindedFilters = forwardBindMap.get(fromFilterId); 
    391390                        for (Filter bindedFilter : bindedFilters) { 
    392391                                String instanceName = (String) bindedFilter.getProperty("instance.name"); 
    393                                 if (instanceName.equals(toFilterPid)) { 
    394                                         throw new AlreadyBoundException(fromFilterPid, toFilterPid); 
     392                                if (instanceName.equals(toFilterId)) { 
     393                                        throw new AlreadyBoundException(fromFilterId, toFilterId); 
    395394                                } 
    396395                        } 
    397396 
    398397                        // double check if same filter already exists. 
    399                         List<Filter> reverseBindedFilters = reverseBindMap.get(toFilterPid); 
     398                        List<Filter> reverseBindedFilters = reverseBindMap.get(toFilterId); 
    400399                        for (Filter reverseBindedFilter : reverseBindedFilters) { 
    401400                                String instanceName = (String) reverseBindedFilter.getProperty("instance.name"); 
    402                                 if (instanceName.equals(fromFilterPid)) { 
    403                                         throw new AlreadyBoundException(fromFilterPid, toFilterPid); 
     401                                if (instanceName.equals(fromFilterId)) { 
     402                                        throw new AlreadyBoundException(fromFilterId, toFilterId); 
    404403                                } 
    405404                        } 
     
    410409 
    411410                        // save bind status 
    412                         filterConfig.bindFilter(fromFilterPid, toFilterPid); 
     411                        filterConfig.bindFilter(fromFilterId, toFilterId); 
    413412 
    414413                        // notify changed state. 
    415                         FilterHandler handler = handlerMap.get(fromFilterPid); 
     414                        FilterHandler handler = handlerMap.get(fromFilterId); 
    416415                        handler.stateChanged(convertToArray(bindedFilters)); 
    417416                } 
     
    434433 
    435434        @Override 
    436         public void unbindFilter(String fromFilterPid, String toFilterPid) throws FilterNotFoundException, 
    437                         FilterNotBoundException { 
     435        public void unbindFilter(String fromFilterId, String toFilterId) 
     436                        throws FilterNotFoundException, FilterNotBoundException { 
    438437                synchronized (this) { 
    439438                        // get filter instances. 
    440                         Filter fromFilter = getFilterInstance(fromFilterPid); // for 
     439                        Filter fromFilter = getFilterInstance(fromFilterId); // for 
    441440                        // validation 
    442                         Filter toFilter = getFilterInstance(toFilterPid); 
     441                        Filter toFilter = getFilterInstance(toFilterId); 
    443442 
    444443                        // unbind it. 
    445                         List<Filter> bindedFilters = bindMap.get(fromFilterPid); 
     444                        List<Filter> bindedFilters = forwardBindMap.get(fromFilterId); 
    446445                        boolean isRemoved = bindedFilters.remove(toFilter); 
    447446 
    448                         List<Filter> reverseBindedFilters = reverseBindMap.get(toFilterPid); 
     447                        List<Filter> reverseBindedFilters = reverseBindMap.get(toFilterId); 
    449448                        reverseBindedFilters.remove(fromFilter); 
    450449 
    451450                        // save bind status 
    452                         filterConfig.unbindFilter(fromFilterPid, toFilterPid); 
    453                          
     451                        filterConfig.unbindFilter(fromFilterId, toFilterId); 
     452 
    454453                        // notify changed state. 
    455                         FilterHandler handler = handlerMap.get(fromFilterPid); 
     454                        FilterHandler handler = handlerMap.get(fromFilterId); 
    456455                        handler.stateChanged(convertToArray(bindedFilters)); 
    457456 
    458457                        // raise exception if filter not found. 
    459458                        if (isRemoved == false) 
    460                                 throw new FilterNotBoundException(fromFilterPid, toFilterPid); 
    461                 } 
    462         } 
    463  
    464         @Override 
    465         public Filter[] getInputFilters(String pid) { 
    466                 synchronized (this) { 
    467                         List<Filter> bindedFilters = reverseBindMap.get(pid); 
     459                                throw new FilterNotBoundException(fromFilterId, toFilterId); 
     460                } 
     461        } 
     462 
     463        @Override 
     464        public Filter[] getInputFilters(String filterId) { 
     465                synchronized (this) { 
     466                        List<Filter> bindedFilters = reverseBindMap.get(filterId); 
    468467                        if (bindedFilters == null) 
    469468                                return new Filter[0]; 
     
    474473 
    475474        @Override 
    476         public Filter[] getOutputFilters(String pid) { 
    477                 synchronized (this) { 
    478                         List<Filter> bindedFilters = bindMap.get(pid); 
     475        public Filter[] getOutputFilters(String filterId) { 
     476                synchronized (this) { 
     477                        List<Filter> bindedFilters = forwardBindMap.get(filterId); 
    479478                        if (bindedFilters == null) 
    480479                                return new Filter[0]; 
     
    497496         */ 
    498497        @Override 
    499         public Object getProperty(String pid, String key) throws FilterNotFoundException { 
    500                 synchronized (this) { 
    501                         Filter filter = getFilterInstance(pid); 
     498        public Object getProperty(String filterId, String key) throws FilterNotFoundException { 
     499                synchronized (this) { 
     500                        Filter filter = getFilterInstance(filterId); 
    502501                        return filter.getProperty(key); 
    503502                } 
     
    508507         */ 
    509508        @Override 
    510         public Set<String> getPropertyKeys(String pid) throws FilterNotFoundException { 
    511                 Filter filter = getFilterInstance(pid); 
     509        public Set<String> getPropertyKeys(String filterId) throws FilterNotFoundException { 
     510                Filter filter = getFilterInstance(filterId); 
    512511                return filter.getPropertyKeys(); 
    513512        } 
     
    517516         */ 
    518517        @Override 
    519         public void setProperty(String pid, String key, String value) throws FilterNotFoundException { 
    520                 synchronized (this) { 
    521                         Filter filter = getFilterInstance(pid); 
     518        public void setProperty(String filterId, String key, String value) 
     519                        throws FilterNotFoundException { 
     520                synchronized (this) { 
     521                        Filter filter = getFilterInstance(filterId); 
    522522                        if (filter != null) { 
    523                                 filterConfig.setFilterProperty(pid, key, value); 
     523                                filterConfig.setFilterProperty(filterId, key, value); 
    524524                                filter.setProperty(key, value); 
    525525                        } else { 
    526                                 throw new FilterNotFoundException(pid); 
     526                                throw new FilterNotFoundException(filterId); 
    527527                        } 
    528528                } 
     
    533533         */ 
    534534        @Override 
    535         public void unsetProperty(String pid, String key) throws FilterNotFoundException { 
    536                 synchronized (this) { 
    537                         Filter filter = getFilterInstance(pid); 
     535        public void unsetProperty(String filterId, String key) throws FilterNotFoundException { 
     536                synchronized (this) { 
     537                        Filter filter = getFilterInstance(filterId); 
    538538                        if (filter != null) { 
    539                                 filterConfig.unsetFilterProperty(pid, key); 
     539                                filterConfig.unsetFilterProperty(filterId, key); 
    540540                                filter.unsetProperty(key); 
    541541                        } else { 
    542                                 throw new FilterNotFoundException(pid); 
     542                                throw new FilterNotFoundException(filterId); 
    543543                        } 
    544544                } 
     
    548548         * Return filter instance or throws exception. 
    549549         */ 
    550         private Filter getFilterInstance(String pid) throws FilterNotFoundException { 
    551                 Filter filter = filterMap.get(pid); 
     550        private Filter getFilterInstance(String filterId) throws FilterNotFoundException { 
     551                Filter filter = filterMap.get(filterId); 
    552552                if (filter == null) 
    553                         throw new FilterNotFoundException(pid); 
     553                        throw new FilterNotFoundException(filterId); 
    554554                return filter; 
    555555        } 
     
    559559         */ 
    560560        @Override 
    561         public Filter getFilter(String pid) { 
    562                 synchronized (this) { 
    563                         return filterMap.get(pid); 
     561        public Filter getFilter(String filterId) { 
     562                synchronized (this) { 
     563                        return filterMap.get(filterId); 
    564564                } 
    565565        } 
  • kraken-filter/src/main/java/org/krakenapps/filter/impl/DefaultMessage.java

    r302 r306  
    2323import org.krakenapps.filter.MessageSpec; 
    2424 
    25  
    26 public class MessageImpl implements Message { 
     25/** 
     26 * This class provides default implementation for the {@link Message} interface. 
     27 *  
     28 * @author xeraph 
     29 * @since 1.0.0 
     30 */ 
     31public class DefaultMessage implements Message { 
    2732        private MessageSpec spec; 
    2833        private Map<String, Object> fields; 
    2934        private Map<String, Object> headers; 
    30          
    31         public MessageImpl(MessageSpec spec, Map<String, Object> fields, Map<String, Object> headers) { 
     35 
     36        public DefaultMessage(MessageSpec spec, Map<String, Object> fields, Map<String, Object> headers) { 
    3237                this.spec = spec; 
    3338                this.fields = new HashMap<String, Object>(); 
    3439                this.headers = new HashMap<String, Object>(); 
    35                  
     40 
    3641                for (String key : fields.keySet()) { 
    3742                        this.fields.put(key, fields.get(key)); 
    3843                } 
    39                  
     44 
    4045                for (String key : headers.keySet()) { 
    4146                        this.headers.put(key, headers.get(key)); 
    4247                } 
    4348        } 
    44          
    45         public MessageImpl(Message message) { 
     49 
     50        public DefaultMessage(Message message) { 
    4651                this.spec = message.getMessageSpec(); 
    4752                this.fields = new HashMap<String, Object>(); 
    4853                this.headers = new HashMap<String, Object>(); 
    49                  
     54 
    5055                for (String key : message.keySet()) { 
    5156                        this.fields.put(key, message.get(key)); 
    5257                } 
    53                  
     58 
    5459                for (String key : message.headerKeySet()) { 
    5560                        this.headers.put(key, message.getHeader(key)); 
  • kraken-filter/src/main/java/org/krakenapps/filter/impl/FilterConfig.java

    r243 r306  
    2929import org.slf4j.LoggerFactory; 
    3030 
     31/** 
     32 * Loads and saves filter states. 
     33 *  
     34 * @author xeraph 
     35 * @since 1.0.0 
     36 */ 
    3137public class FilterConfig { 
    3238        final Logger logger = LoggerFactory.getLogger(FilterConfig.class.getName()); 
     39 
     40        /** 
     41         * hsqldb connection 
     42         */ 
    3343        private Connection connection; 
     44 
     45        /** 
     46         * connection string 
     47         */ 
    3448        private String connectionString; 
     49 
     50        /** 
     51         * jdbc user 
     52         */ 
    3553        private String user; 
     54 
     55        /** 
     56         * jdbc password 
     57         */ 
    3658        private String password; 
    3759 
    38         public FilterConfig(String connectionString, String user, String password) { 
    39                 this.connectionString = connectionString; 
    40                 this.user = user; 
    41                 this.password = password; 
     60        /** 
     61         * Creates a filter config instance and prepare tables. 
     62         */ 
     63        public FilterConfig() { 
     64                connectionString = "jdbc:hsqldb:file:filter-config;shutdown=true"; 
     65                user = "sa"; 
     66                password = ""; 
    4267 
    4368                initializeSchema(); 
     
    76101        } 
    77102 
     103        /** 
     104         * Creates filter state tables if not exists. 
     105         */ 
    78106        private void initializeSchema() { 
    79107                logger.info("initailizing filter table schema."); 
     
    84112        } 
    85113 
     114        /** 
     115         * Returns a List of loaded filter instance informations for specific filter 
     116         * class name. 
     117         *  
     118         * @param className 
     119         *            the filter class name 
     120         * @return a List of loaded filter instance informations 
     121         */ 
    86122        public List<FilterInstance> loadFilterInstances(String className) { 
    87123                PreparedStatement st = null; 
     
    109145                } catch (SQLException e) { 
    110146                        rollback(); 
    111                         e.printStackTrace(); 
     147                        logger.error("load filter instances error: ", e); 
    112148                        return null; 
    113149                } finally { 
     
    117153        } 
    118154 
     155        /** 
     156         * Returns a List of all loaded filter instance informations. 
     157         */ 
    119158        public List<FilterInstance> loadFilterInstances() { 
    120159                Statement st = null; 
     
    148187        } 
    149188 
    150         public boolean addFilterInstance(String name, String className, int filterType) { 
     189        /** 
     190         * Save filter instance information to data store. 
     191         *  
     192         * @param filterId 
     193         *            the filter id 
     194         * @param filterClassName 
     195         *            the filter class name 
     196         * @param filterType 
     197         *            active filter or not 
     198         * @return true if saved successfully 
     199         */ 
     200        public boolean addFilterInstance(String filterId, String filterClassName, int filterType) { 
    151201                PreparedStatement st1 = null, st2 = null; 
    152202                try { 
     
    155205                        // duplicate check by name 
    156206                        st1 = connection.prepareStatement("select * from filter_instances where name = ?"); 
    157                         st1.setString(1, name); 
     207                        st1.setString(1, filterId); 
    158208                        if (st1.executeQuery().next()) 
    159209                                return false; 
    160210 
    161                         logger.info("adding filter: " + name + " " + className); 
     211                        logger.info("adding filter: " + filterId + " " + filterClassName); 
    162212                        st2 = connection 
    163213                                        .prepareStatement("insert into filter_instances (name, class_name, filter_type) values (?, ?, ?)"); 
    164                         st2.setString(1, name); 
    165                         st2.setString(2, className); 
     214                        st2.setString(1, filterId); 
     215                        st2.setString(2, filterClassName); 
    166216                        st2.setInt(3, filterType); 
    167217                        int ret = st2.executeUpdate(); 
     
    173223                } catch (Exception e) { 
    174224                        rollback(); 
    175                         e.printStackTrace(); 
     225                        logger.warn("add filter instance error: ", e); 
    176226                        return false; 
    177227                } finally { 
     
    182232        } 
    183233 
    184         public void removeFilterInstance(String name) { 
     234        /** 
     235         * Remove the filter instance information from data store. 
     236         *  
     237         * @param filterId 
     238         *            the filter id 
     239         */ 
     240        public void removeFilterInstance(String filterId) { 
    185241                PreparedStatement st = null; 
    186242                try { 
     
    189245                        // delete instance 
    190246                        st = connection.prepareStatement("delete from filter_instances where name = ?"); 
    191                         st.setString(1, name); 
     247                        st.setString(1, filterId); 
    192248                        st.executeUpdate(); 
    193249 
     
    195251                        closeStatement(st); 
    196252                        st = connection.prepareStatement("delete from input_filters where input_filter_id = ?"); 
    197                         st.setString(1, name); 
     253                        st.setString(1, filterId); 
    198254                        st.executeUpdate(); 
    199255 
    200256                        closeStatement(st); 
    201257                        st = connection.prepareStatement("delete from output_filters where filter_id = ?"); 
    202                         st.setString(1, name); 
     258                        st.setString(1, filterId); 
    203259                        st.executeUpdate(); 
    204260 
     
    206262                        closeStatement(st); 
    207263                        st = connection.prepareStatement("delete from filter_properties where filter_id = ?"); 
    208                         st.setString(1, name); 
    209                         st.executeUpdate(); 
    210  
    211                         connection.commit(); 
    212                 } catch (Exception e) { 
    213                         rollback(); 
    214                         e.printStackTrace(); 
    215                 } finally { 
    216                         closeStatement(st); 
    217                         shutdown(); 
    218                 } 
    219         } 
    220  
     264                        st.setString(1, filterId); 
     265                        st.executeUpdate(); 
     266 
     267                        connection.commit(); 
     268                } catch (Exception e) { 
     269                        rollback(); 
     270                        logger.error("remove filter instance error: ", e); 
     271                } finally { 
     272                        closeStatement(st); 
     273                        shutdown(); 
     274                } 
     275        } 
     276 
     277        /** 
     278         * Returns the properties of the filter. 
     279         *  
     280         * @param filterId 
     281         *            the filter id 
     282         * @return the properties of the filter. null if error occurred. 
     283         */ 
    221284        public Properties getFilterProperties(String filterId) { 
    222285                Properties props = new Properties(); 
     
    226289                        connect(); 
    227290 
    228                         st = connection.prepareStatement("select name, value from filter_properties where filter_id = ?"); 
     291                        st = connection 
     292                                        .prepareStatement("select name, value from filter_properties where filter_id = ?"); 
    229293                        st.setString(1, filterId); 
    230294                        ResultSet rs = st.executeQuery(); 
     
    239303                } catch (Exception e) { 
    240304                        rollback(); 
    241                         e.printStackTrace(); 
     305                        logger.error("get filter properties error: ", e); 
    242306                        return null; 
    243307        &nbs