Created
August 3, 2016 07:25
-
-
Save jackeylu/edf600b66273d17db0438065ae362934 to your computer and use it in GitHub Desktop.
If p2p is enabled, CQ failed when 3rd node join the cluster.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package example.ignite; | |
import javax.cache.event.CacheEntryEvent; | |
import java.util.EventListener; | |
/** | |
* Created by jackeylv on 2016/8/3. | |
*/ | |
public interface CacheEventProcessor extends EventListener { | |
void entryInserted(CacheEntryEvent event); | |
void entryUpdated(CacheEntryEvent event); | |
void entryDeleted(CacheEntryEvent event); | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package example.ignite; | |
import com.tencent.perf.ignite.AlwaysTrueRemoteFilter; | |
import org.apache.ignite.Ignite; | |
import org.apache.ignite.IgniteCache; | |
import org.apache.ignite.IgniteLogger; | |
import org.apache.ignite.Ignition; | |
import org.apache.ignite.binary.BinaryBasicIdMapper; | |
import org.apache.ignite.binary.BinaryBasicNameMapper; | |
import org.apache.ignite.cache.query.ContinuousQuery; | |
import org.apache.ignite.cache.query.QueryCursor; | |
import org.apache.ignite.configuration.BinaryConfiguration; | |
import org.apache.ignite.configuration.IgniteConfiguration; | |
import org.apache.ignite.internal.binary.BinaryMarshaller; | |
import org.apache.ignite.internal.util.typedef.G; | |
import javax.cache.configuration.FactoryBuilder; | |
import javax.cache.event.CacheEntryEvent; | |
import javax.cache.event.CacheEntryListenerException; | |
import javax.cache.event.CacheEntryUpdatedListener; | |
import java.util.concurrent.TimeUnit; | |
/** | |
* The second node, as a client with a CQ listener. | |
* Created by jackeylv on 2016/8/3. | |
*/ | |
public class CQListenerClient { | |
public static void main(String[] args) throws InterruptedException { | |
IgniteConfiguration icfg = new IgniteConfiguration(); | |
icfg.setIgniteHome(System.getProperty("user.dir")); | |
icfg.setWorkDirectory(System.getProperty("user.dir")); | |
//FIXME, if p2p is enabled, the 3rd joined node will be failed when getting the remote filter. | |
icfg.setPeerClassLoadingEnabled(true); | |
BinaryConfiguration bCfg = new BinaryConfiguration(); | |
bCfg.setCompactFooter(true); | |
bCfg.setNameMapper(new BinaryBasicNameMapper(false)); | |
bCfg.setIdMapper(new BinaryBasicIdMapper(false)); | |
icfg.setMarshaller(new BinaryMarshaller()); | |
icfg.setBinaryConfiguration(bCfg); | |
Ignition.setClientMode(true); | |
QueryCursor cursor = null; | |
try { | |
Ignite ignite = Ignition.start(icfg); | |
IgniteCache cache = ignite.getOrCreateCache("Test"); | |
CacheEventProcessor listener = new EventPrinter(); | |
cursor = registerEventListener(cache, listener); | |
cache.put("Hello", "world"); | |
TimeUnit.SECONDS.sleep(Integer.MAX_VALUE); | |
}finally { | |
if (cursor!=null) | |
cursor.close(); | |
} | |
} | |
public static QueryCursor registerEventListener(final IgniteCache cache, | |
final CacheEventProcessor listener){ | |
final IgniteLogger logger = G.ignite().log(); | |
ContinuousQuery qry = new ContinuousQuery(); | |
qry.setAutoUnsubscribe(true); | |
qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() { | |
@Override | |
public void onUpdated(Iterable<CacheEntryEvent<? extends Object, ? extends Object>> cacheEntryEvents) | |
throws CacheEntryListenerException { | |
for (CacheEntryEvent event : | |
cacheEntryEvents) { | |
logger.info(Thread.currentThread().getName() + " receive CacheEntryEvent " + event ); | |
localEventProcess(listener, event, cache.getName() ,logger); | |
} | |
} | |
}); | |
AlwaysTrueRemoteFilter rmtFilter = new AlwaysTrueRemoteFilter(listener); | |
qry.setRemoteFilterFactory(FactoryBuilder.factoryOf(rmtFilter)); | |
QueryCursor cursor = cache.query(qry); | |
return cursor; | |
} | |
private static void localEventProcess(CacheEventProcessor listener, CacheEntryEvent event, String name, IgniteLogger logger) { | |
logger.info("listener = [" + listener + "], event = [" + event + "], name = [" + name + "], logger = [" + logger + "]"); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package example.ignite; | |
import javax.cache.event.CacheEntryEvent; | |
import java.io.Externalizable; | |
import java.io.IOException; | |
import java.io.ObjectInput; | |
import java.io.ObjectOutput; | |
/** | |
* Created by jackeylv on 2016/8/3. | |
*/ | |
public class EventPrinter implements CacheEventProcessor, Externalizable { | |
@Override | |
public void entryInserted(CacheEntryEvent event) { | |
System.out.println("EventPrinter.entryInserted " + event); | |
} | |
@Override | |
public void entryUpdated(CacheEntryEvent event) { | |
System.out.println("EventPrinter.entryUpdated " + event); | |
} | |
@Override | |
public void entryDeleted(CacheEntryEvent event) { | |
System.out.println("EventPrinter.entryDeleted " + event); | |
} | |
// dummy code, for Externalization | |
public EventPrinter(){ | |
} | |
@Override | |
public void writeExternal(ObjectOutput out) throws IOException { | |
} | |
@Override | |
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package example.ignite; | |
import org.apache.ignite.Ignition; | |
import org.apache.ignite.binary.BinaryBasicIdMapper; | |
import org.apache.ignite.binary.BinaryBasicNameMapper; | |
import org.apache.ignite.configuration.BinaryConfiguration; | |
import org.apache.ignite.configuration.IgniteConfiguration; | |
import org.apache.ignite.internal.binary.BinaryMarshaller; | |
/** | |
* The first node, as a server. | |
* Created by jackeylv on 2016/8/3. | |
*/ | |
public class IgniteServer { | |
public static void main(String[] args) { | |
IgniteConfiguration icfg = new IgniteConfiguration(); | |
icfg.setIgniteHome(System.getProperty("user.dir")); | |
icfg.setWorkDirectory(System.getProperty("user.dir")); | |
//FIXME, if p2p is enabled, the 3rd joined node will be failed when getting the remote filter. | |
icfg.setPeerClassLoadingEnabled(true); | |
BinaryConfiguration bCfg = new BinaryConfiguration(); | |
bCfg.setCompactFooter(true); | |
bCfg.setNameMapper(new BinaryBasicNameMapper(false)); | |
bCfg.setIdMapper(new BinaryBasicIdMapper(false)); | |
icfg.setMarshaller(new BinaryMarshaller()); | |
icfg.setBinaryConfiguration(bCfg); | |
Ignition.start(icfg); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package example.ignite; | |
import org.apache.ignite.Ignite; | |
import org.apache.ignite.IgniteCache; | |
import org.apache.ignite.Ignition; | |
import org.apache.ignite.binary.BinaryBasicIdMapper; | |
import org.apache.ignite.binary.BinaryBasicNameMapper; | |
import org.apache.ignite.configuration.BinaryConfiguration; | |
import org.apache.ignite.configuration.IgniteConfiguration; | |
import org.apache.ignite.internal.binary.BinaryMarshaller; | |
import java.util.concurrent.TimeUnit; | |
/** | |
* The 3rd node, try to put some key-value paris. | |
* Created by jackeylv on 2016/8/3. | |
*/ | |
public class IgniteSimpleClient { | |
public static void main(String[] args) throws InterruptedException { | |
IgniteConfiguration icfg = new IgniteConfiguration(); | |
icfg.setIgniteHome(System.getProperty("user.dir")); | |
icfg.setWorkDirectory(System.getProperty("user.dir")); | |
//FIXME, if p2p is enabled, the 3rd joined node will be failed when getting the remote filter. | |
icfg.setPeerClassLoadingEnabled(true); | |
BinaryConfiguration bCfg = new BinaryConfiguration(); | |
bCfg.setCompactFooter(true); | |
bCfg.setNameMapper(new BinaryBasicNameMapper(false)); | |
bCfg.setIdMapper(new BinaryBasicIdMapper(false)); | |
icfg.setMarshaller(new BinaryMarshaller()); | |
icfg.setBinaryConfiguration(bCfg); | |
Ignition.setClientMode(true); | |
Ignite ignite = Ignition.start(icfg); | |
IgniteCache cache = ignite.getOrCreateCache("Test"); | |
for (int i = 0; i < 100; i++) { | |
cache.put(i, i); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment