Skip to content

Instantly share code, notes, and snippets.

@derekperkins
Created December 19, 2022 04:59
Show Gist options
  • Save derekperkins/8f5232d897ccf5a20691a1556574cfce to your computer and use it in GitHub Desktop.
Save derekperkins/8f5232d897ccf5a20691a1556574cfce to your computer and use it in GitHub Desktop.
olric kubernetes adapter
package nolric
import (
	"context"
	"errors"
	"log"

	"github.com/buraksezer/olric/pkg/service_discovery"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)
// Compile-time assertion that *k8sDiscovery satisfies
// service_discovery.ServiceDiscovery.
var _ service_discovery.ServiceDiscovery = (*k8sDiscovery)(nil)
// k8sDiscovery is a service discovery adapter that finds peers by listing
// pods through the Kubernetes API.
type k8sDiscovery struct {
// c is the context handed to the pod List request made by DiscoverPeers.
c context.Context
// namespace is the Kubernetes namespace whose pods are listed.
namespace string
// clientset is the Kubernetes API client used to list pods.
clientset *kubernetes.Clientset
}
// newK8sDiscovery builds a k8sDiscovery for the given namespace from the
// in-cluster Kubernetes configuration. The context is retained and reused for
// the pod List requests issued by DiscoverPeers.
//
// It returns an error when the in-cluster configuration cannot be loaded or
// the clientset cannot be created from it.
func newK8sDiscovery(c context.Context, namespace string) (*k8sDiscovery, error) {
	restCfg, err := rest.InClusterConfig()
	if err != nil {
		return nil, err
	}

	// Build the API client from the in-cluster configuration.
	cs, err := kubernetes.NewForConfig(restCfg)
	if err != nil {
		return nil, err
	}

	disc := &k8sDiscovery{
		clientset: cs,
		namespace: namespace,
		c:         c,
	}
	return disc, nil
}
// DiscoverPeers lists the pods in the configured namespace and returns the
// addresses that podAddrs extracts from them. It returns an error if the pod
// list request fails or if no address is found.
func (d *k8sDiscovery) DiscoverPeers() ([]string, error) {
	// No label selector (for example "app=olric") is applied, so every pod in
	// the namespace is considered.
	var opts metav1.ListOptions

	podList, err := d.clientset.CoreV1().Pods(d.namespace).List(d.c, opts)
	if err != nil {
		return nil, err
	}

	peers, err := podAddrs(podList)
	switch {
	case err != nil:
		return nil, err
	case len(peers) == 0:
		return nil, errors.New("no peers found")
	}
	return peers, nil
}
// podAddrs collects the pod IPs of every pod in the list that is in the
// Running phase, has no Ready condition with a status other than True, and
// reports a non-empty pod IP. The returned error is always nil.
// adapted from https://github.com/hashicorp/go-discover/blob/49f60c093101c9c5f6b04d5b1c80164251a761a6/provider/k8s/k8s_discover.go#L122-L183
func podAddrs(pods *corev1.PodList) ([]string, error) {
	var ips []string

	for i := range pods.Items {
		status := &pods.Items[i].Status
		if status.Phase != corev1.PodRunning {
			continue
		}

		// A Ready condition, when present, must be true. Pods that carry no
		// Ready condition at all are accepted.
		notReady := false
		for _, cond := range status.Conditions {
			if cond.Type == corev1.PodReady && cond.Status != corev1.ConditionTrue {
				notReady = true
				break
			}
		}
		if notReady {
			continue
		}

		// The pod IP can be empty according to the API docs, so only keep
		// addresses that are actually set.
		if ip := status.PodIP; ip != "" {
			ips = append(ips, ip)
		}
	}

	return ips, nil
}
// Initialize is a no-op; it always returns nil.
func (d *k8sDiscovery) Initialize() error {
	return nil
}
// SetLogger is a no-op; the supplied logger is not used.
func (d *k8sDiscovery) SetLogger(l *log.Logger) {
}
// SetConfig is a no-op; the supplied configuration is not used and nil is
// always returned.
func (d *k8sDiscovery) SetConfig(cfg map[string]interface{}) error {
	return nil
}
// Register is a no-op; it always returns nil.
func (d *k8sDiscovery) Register() error {
	return nil
}
// Deregister is a no-op; it always returns nil.
func (d *k8sDiscovery) Deregister() error {
	return nil
}
// Close is a no-op; it always returns nil.
func (d *k8sDiscovery) Close() error {
	return nil
}
package nolric
import (
	"context"
	"fmt"
	"sync"

	"github.com/buraksezer/olric"
	"github.com/buraksezer/olric/config"
)
// New starts an embedded Olric node that discovers its peers through the
// Kubernetes adapter built by newK8sDiscovery, and returns an embedded client
// for that node.
//
// New blocks until Olric invokes the Started callback. It returns an error if
// the discovery adapter cannot be built, if the Olric instance cannot be
// created, or if Start returns before the node reports that it is ready.
//
// After a successful start, a background goroutine waits for one of:
// gracefulShutdownChan delivering a value or being closed, c being done, or
// Start returning on its own. It then closes the returned client and shuts the
// node down. Errors seen during that teardown are printed to standard output.
func New(c context.Context, namespace string, gracefulShutdownChan <-chan struct{}) (*olric.EmbeddedClient, error) {
	// configure our olric service discovery k8s adapter
	k8sDisc, err := newK8sDiscovery(c, namespace)
	if err != nil {
		return nil, err
	}

	// create a new Olric configuration
	cfg := config.New("lan") // default configuration
	cfg.ReplicationMode = config.AsyncReplicationMode
	cfg.ServiceDiscovery = map[string]interface{}{
		"plugin": k8sDisc,
	}
	cfg.DMaps.EvictionPolicy = config.LRUEviction
	cfg.DMaps.MaxInuse = 100_000_000 // 100 MB
	// cfg.LogLevel = "WARN"
	// cfg.LogVerbosity = 1

	// started is closed by the Started callback. The sync.Once keeps a second
	// invocation of the callback from panicking on a double close.
	started := make(chan struct{})
	var startedOnce sync.Once
	cfg.Started = func() {
		startedOnce.Do(func() { close(started) })
	}

	// create the actual Olric instance
	cache, err := olric.New(cfg)
	if err != nil {
		return nil, err
	}

	// Start the instance, which triggers the k8s service discovery and forms
	// the cluster. Its result is delivered over a buffered channel so that the
	// goroutine can always exit, and so that a failure is reported to the
	// caller instead of panicking or leaving New blocked forever.
	startResult := make(chan error, 1)
	go func() {
		startResult <- cache.Start()
	}()

	// wait for the cluster to be ready, or for Start to give up first
	select {
	case <-started:
	case startErr := <-startResult:
		if startErr != nil {
			return nil, fmt.Errorf("starting olric: %w", startErr)
		}
		return nil, fmt.Errorf("starting olric: instance stopped before becoming ready")
	}

	// we'll be returning this client for our users, put here so we can close it on shutdown
	embClient := cache.NewEmbeddedClient()

	// listen for graceful shutdown signal to leave the cache cluster
	go func() {
		select {
		case <-gracefulShutdownChan:
		case <-c.Done():
		case runErr := <-startResult:
			// Start returned on its own after the node became ready.
			if runErr != nil {
				fmt.Println(runErr)
			}
		}

		// c may already be cancelled here, so tear down with a context that
		// is not; otherwise the teardown calls would receive a dead context.
		shutdownCtx := context.Background()
		if closeErr := embClient.Close(shutdownCtx); closeErr != nil {
			fmt.Println(closeErr)
		}
		if shutdownErr := cache.Shutdown(shutdownCtx); shutdownErr != nil {
			fmt.Println(shutdownErr)
		}
	}()

	return embClient, nil
}
@Tochemey
Copy link

@zhp007 I had that issue, and after I changed both my MemberCountQuorum and my ReplicaCount to 1, the k8s pods were able to form the cluster. I hope this helps.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment