Leader election is an important part of building distributed, scalable web applications. Applications often run as multiple replicas and can be deployed in a number of ways. Using the principle of read-many-write-one and a consensus mechanism, the replicas coordinate over a lease so that only the lease holder performs write-sensitive tasks, avoiding data duplication or conflicts. Whether the application is a monolith or solely a scheduler, ensuring that only one replica writes greatly increases reliability.

I've written many Go applications that use Postgres as a datastore, and one of them has a helpful use case for leader election. Let's take a look!

❓ What is leader election?

A way for distributed programs to decide which of them takes a turn to coordinate or lead a task.

or from Wikipedia

"leader election is the process of designating a single process as the organizer of some task distributed among several computers (nodes)"

🌏 Where would I find leader election?

It's found in a number of implementations (Kubernetes itself among them) and a number of algorithms, with Raft being the most common in cloud software.

πŸ’­ Could my application do with leader election?

Maybe!

pros

  • ability to cleanly build scheduling into the leader
  • cleaner state management, since only one replica coordinates writes

cons

  • extra code and potential size increase
  • not suitable for applications which scale to zero, since the process isn't running all the time (use a cronjob to hit an endpoint instead)

☸️ Kubernetes leader election

Found at k8s.io/client-go/tools/leaderelection, the package contains all that's needed to configure and run leader election.

Here's a basic example with the default Kubernetes integration.

First, the program imports the needed packages, such as leaderelection and resourcelock.

package main

import (
	"context"
	"fmt"
	"log"
	"path/filepath"
	"time"

	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
	"k8s.io/client-go/util/homedir"
)

this is the part where the business logic goes, so fill in the blanks.

func work(ctx context.Context, cancel context.CancelFunc) {
	// fill in the application's write-sensitive work here; the context and
	// cancel func are available for shutting down cleanly.
	log.Println("WORK: performing work")
}

set up a client to talk to the Kubernetes API. Here, we're just using the kubeconfig in the home directory. Of course, since you won't be shipping your workstation into production, you can authenticate using an in-cluster clientset (see here) or a full out-of-cluster example (see here).
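As a rough sketch, the in-cluster variant looks something like the following, using rest.InClusterConfig from k8s.io/client-go/rest and assuming the pod has a ServiceAccount; the example below carries on with the kubeconfig approach.

	// inside a pod, the ServiceAccount token and cluster CA are mounted
	// automatically, so no kubeconfig is required.
	config, err := rest.InClusterConfig()
	if err != nil {
		panic(err.Error())
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err.Error())
	}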

func main() {
	config, err := clientcmd.BuildConfigFromFlags("", filepath.Join(homedir.HomeDir(), ".kube", "config"))
	if err != nil {
		panic(err.Error())
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err.Error())
	}

bring your own ID format, such as a timestamp, a random string, or the hostname. A timestamp is the shortest way to demonstrate it here, so that's what this example uses.

	id := fmt.Sprintf("%v", time.Now().Unix())
	log.Println("id:", id)

create a new resourcelock. This is the default logic and configuration; the lock is responsible for the calls to get, create, and update the lock, record events, and report the lock's description and identity.

Here, the type is set to the built-in resource Leases.coordination.k8s.io, with a Lease named leader-election-example in the default namespace.

	l, err := resourcelock.New(
		resourcelock.LeasesResourceLock,
		"default", "leader-election-example",
		clientset.CoreV1(),
		clientset.CoordinationV1(),
		resourcelock.ResourceLockConfig{
			Identity: id,
		},
	)
	if err != nil {
		log.Println("error: %w", err)
		return
	}

here is how a leader elector may be configured, using the lock and callbacks that log the state of the leader election.

	leaderelector, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
		Lock:            l,
		ReleaseOnCancel: true,
		LeaseDuration:   15 * time.Second,
		RenewDeadline:   10 * time.Second,
		RetryPeriod:     2 * time.Second,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: func(ctx context.Context) {
				log.Printf("i (%v) am now the leader!\n", id)
			},
			OnStoppedLeading: func() {
				log.Println("no longer the leader, staying inactive.")
			},
			OnNewLeader: func(currentID string) {
				if currentID == id {
					log.Printf("i (%v) am still the leader!\n", id)
					return
				}
				log.Printf("new/current leader is %s\n", currentID)
			},
		},
	})
	if err != nil {
		log.Fatalf("leader election error: %w\n", err)
		return
	}

perform the work in the background when elected as the leader, and run the leader election loop.

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go func() {
		for {
			if leaderelector.IsLeader() {
				work(ctx, cancel)
			}
			time.Sleep(10 * time.Second)
		}
	}()
	leaderelector.Run(ctx)
}

Alternatively, the value of IsLeader() can be exposed for use elsewhere, such as in a handler or health check; see the sketch below.
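For instance, here is a minimal sketch of serving the leader state over HTTP, registered before leaderelector.Run(ctx) blocks; the /leader path and port 8080 are arbitrary choices, and net/http would need adding to the imports.

	// report whether this replica currently holds the lease.
	http.HandleFunc("/leader", func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintf(w, "leader: %v\n", leaderelector.IsLeader())
	})
	go func() {
		if err := http.ListenAndServe(":8080", nil); err != nil {
			log.Println("http server error:", err)
		}
	}()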

Supporting this functionality, an RBAC Role is needed when deploying into a cluster. The Role requires a RoleBinding in the namespace and a ServiceAccount set for the application.

---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: leader-election-role
rules:
- apiGroups:
  - ""
  resources:
  - configmaps
  verbs:
  - get
  - list
  - watch
  - create
  - update
  - patch
  - delete
- apiGroups:
  - coordination.k8s.io
  resources:
  - leases
  verbs:
  - get
  - list
  - watch
  - create
  - update
  - patch
  - delete
- apiGroups:
  - ""
  resources:
  - events
  verbs:
  - create
  - patch

However, the RBAC role is not needed when running locally as a cluster admin.

πŸƒ In action

To begin, no lease lock exists.

On the left, a replica comes up and immediately becomes the leader. Shortly after, it begins performing work.

At almost the same time, another new replica comes up; it does not become the leader because there already is one. After two cycles of performing work, the current leader replica is shut down.

With only one replica left running, it's easy to see that it will become the leader, since it's not competing with anything. It becomes the leader and begins performing work.

example of a Lease:

apiVersion: coordination.k8s.io/v1
kind: Lease
metadata:
  creationTimestamp: "2024-06-28T22:06:43Z"
  name: leader-election-example
  namespace: default
  resourceVersion: "65177"
  uid: 578a5a6d-bad9-430f-9985-34dc5d4bffad
spec:
  acquireTime: "2024-06-28T22:41:47.805614Z"
  holderIdentity: "1719614490"
  leaseDurationSeconds: 15
  leaseTransitions: 2
  renewTime: "2024-06-28T22:41:49.822416Z"

☁️ Aside

Some maybe-obvious presumptions are required for this to function correctly

  • replicas run the same leader election code
  • there is no interference with the lock record's value
  • replicas have synced machine clocks, as the mechanism is not resistant to clock skew
  • leases are constantly being renewed by the leader until it goes away
  • no network between replicas is needed for coordination, since it's completely time-based, unlike Raft

Read the implementation notes here.

🐘 Adapting resource locks for Postgres

a table like this will store the lock state.

create table if not exists leader_election (
    name text not null,
    holderIdentity text not null,
    leaseDurationSeconds int not null,
    acquireTime bigint not null,
    renewTime bigint not null,
    leaderTransitions int not null,
    primary key(holderIdentity)
);
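
The Go snippets that follow assume roughly the following imports; treat the lib/pq driver as an assumption here, since any database/sql-compatible Postgres driver will do.

import (
	"context"
	"database/sql"
	"encoding/json"
	"fmt"
	"log"
	"time"

	_ "github.com/lib/pq" // registers the "postgres" driver for database/sql

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
)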

an object to hold the lock state and handle the read and write operations for the lock.

type lock struct {
	id   string
	name string
	db   *sql.DB

	lease *resourcelock.LeaderElectionRecord
}

The lock type will implement the methods which the resourcelock.Interface requires.
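
For reference, that interface looks roughly like the following in client-go (paraphrased; check the resourcelock package for the authoritative definition):

type Interface interface {
	// Get returns the current LeaderElectionRecord, plus its raw bytes
	Get(ctx context.Context) (*LeaderElectionRecord, []byte, error)
	// Create attempts to create a LeaderElectionRecord
	Create(ctx context.Context, ler LeaderElectionRecord) error
	// Update will update an existing LeaderElectionRecord
	Update(ctx context.Context, ler LeaderElectionRecord) error
	// RecordEvent is used to record events
	RecordEvent(string)
	// Identity returns the lock's identity
	Identity() string
	// Describe is used to convert details on the current resource lock into a string
	Describe() string
}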

query the database for the record, returning the typed value, the same value as raw JSON bytes, and an error.

func (l *lock) Get(ctx context.Context) (ler *resourcelock.LeaderElectionRecord, rb []byte, err error) {
	log.Printf("getting: %v\n", l.name)
	sqlStatement := `
	select
	    holderIdentity, leaseDurationSeconds, acquireTime, renewTime, leaderTransitions
	from leader_election where name = $1 limit 1`
	rows, err := l.db.Query(sqlStatement, l.name)
	if err != nil {
		return nil, nil, err
	}

always close your rows!

	defer func() {
		if err := rows.Close(); err != nil {
			log.Printf("error: failed to close rows: %v\n", err)
		}
	}()

although only one row is ever returned, this loop will use the last row read. TL;DR: read the row back and convert the integer timestamps into parsable times.

	for rows.Next() {
		ler = &resourcelock.LeaderElectionRecord{}
		var acquireTime, renewTime int
		if err = rows.Scan(&ler.HolderIdentity, &ler.LeaseDurationSeconds, &acquireTime, &renewTime, &ler.LeaderTransitions); err != nil {
			return nil, nil, err
		}
		ler.AcquireTime = metav1.NewTime(time.Unix(int64(acquireTime), 0))
		ler.RenewTime = metav1.NewTime(time.Unix(int64(renewTime), 0))
	}

if no record is found, return it as empty.

	if ler == nil {
		l.lease = nil
		return &resourcelock.LeaderElectionRecord{}, nil, nil
	}

if a record is found, store it and return it along with JSON bytes.

	l.lease = ler
	rb, err = json.Marshal(ler)
	if err != nil {
		return nil, nil, err
	}
	return ler, rb, nil
}

insert into the table with translated time values.

func (l *lock) Create(ctx context.Context, ler resourcelock.LeaderElectionRecord) error {
	log.Printf("creating: %+v\n", ler)
	sqlStatement := `
          insert into leader_election
              (name, holderIdentity, leaseDurationSeconds, acquireTime, renewTime, leaderTransitions)
          values ($1, $2, $3, $4, $5, $6)
              returning name, holderIdentity, leaseDurationSeconds, acquireTime, renewTime, leaderTransitions`
	rows, err := l.db.Query(
		sqlStatement,
		l.name,
		ler.HolderIdentity,
		ler.LeaseDurationSeconds,
		ler.AcquireTime.Unix(),
		ler.RenewTime.Unix(),
		ler.LeaderTransitions,
	)
	if err != nil {
		return err
	}

don't leak any connections or memory.

	defer func() {
		if err := rows.Close(); err != nil {
			log.Printf("error: failed to close rows: %v\n", err)
		}
	}()

ensure that the lease object is not nil and read the returned row back into it, converting the integer timestamps just like in Get.

	for rows.Next() {
		if l.lease == nil {
			l.lease = &resourcelock.LeaderElectionRecord{}
		}
		var name string
		var acquireTime, renewTime int64
		if err = rows.Scan(
			&name,
			&l.lease.HolderIdentity,
			&l.lease.LeaseDurationSeconds,
			&acquireTime,
			&renewTime,
			&l.lease.LeaderTransitions,
		); err != nil {
			return err
		}
		l.lease.AcquireTime = metav1.NewTime(time.Unix(acquireTime, 0))
		l.lease.RenewTime = metav1.NewTime(time.Unix(renewTime, 0))
	}
	return nil
}

perform an update or create if not found.

func (l *lock) Update(ctx context.Context, ler resourcelock.LeaderElectionRecord) error {
	if l.lease == nil {
		return l.Create(ctx, ler)
	}
	log.Printf("updating: %+v\n", ler)
	sqlStatement := `update leader_election
                     set holderIdentity = $2, leaseDurationSeconds = $3, acquireTime = $4, renewTime = $5, leaderTransitions = $6
                     where name = $1`
	rows, err := l.db.Query(
		sqlStatement,
		l.name,
		ler.HolderIdentity,
		ler.LeaseDurationSeconds,
		ler.AcquireTime.Unix(),
		ler.RenewTime.Unix(),
		ler.LeaderTransitions,
	)
	if err != nil {
		return err
	}

close the rows since we're done with the query.

	defer func() {
		if err := rows.Close(); err != nil {
			log.Printf("error: failed to close rows: %v\n", err)
		}
	}()
	return nil
}

this function could handle things like writing to a logs/events table, posting to a channel somewhere, or even emitting Prometheus-compatible data. With the standard resourcelock, it writes to Kubernetes Events.v1.

func (l *lock) RecordEvent(s string) {
	log.Println("leader election event:", s)
}
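
As a sketch of the "posting to a channel" option, a variant of RecordEvent could hand events off without blocking the election loop; the electionEvents channel here is hypothetical and not part of the lock struct above.

var electionEvents = make(chan string, 16)

func (l *lock) RecordEvent(s string) {
	log.Println("leader election event:", s)
	select {
	case electionEvents <- s:
		// picked up by whatever consumes the events elsewhere
	default:
		// drop the event rather than block leader election
	}
}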

get the record name and the lock instance identity.

func (l *lock) Describe() string {
	return l.name
}

func (l *lock) Identity() string {
	return l.id
}

create a new usable lock.

func newLock() *lock {
	db, err := sql.Open("postgres",
		"postgres://postgres:postgres@localhost:5432/postgres?sslmode=disable")
	if err != nil {
		panic(err.Error())
	}
	id := fmt.Sprintf("%v", time.Now().Unix())
	return &lock{
		id:    id,
		name:  "default",
		db:    db,
		lease: nil,
	}
}

now, the lock can be used like so, with the LeaderElectionConfig Lock field set to the lock created earlier.

...
	lock := newLock()
	log.Println("id:", lock.Identity())
	leaderelector, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
		Lock:            lock,
...

βš™οΈ Context: my use case

FlatTrack is an application I've been working on for about five years to date; it is collaboration software for people living together, currently handling shopping lists and flatmates. Thinking ahead to upcoming features like Tasks, which will help people keep track of their household obligations, it's important to have a way to schedule the next task without multiple writers.

Leader election answers that problem for task scheduling, which will be a future feature of the software.

One other thing I did to reduce dependency size was fork the leaderelection and resourcelock packages into the repo and remove unused code. This dramatically reduced the binary size.

repo link: gitlab.com/flattrack/flattrack

πŸ₯½ Clarifications

  • Kubernetes is not required and does not become a runtime dependency of the application when using a different backend like Postgres
  • you can use whichever backend is closest to you, such as Postgres, MariaDB, Redis, or even DNS if you really wanted to

πŸ”š Conclusion

Leader election is a piece of the puzzle for building scalable and reliable cloud software. Although there are many implementations and algorithms to choose from, it is helpful to use an existing, industry-proven implementation in a language I'm already using.

Kubernetes has lots of powerful tooling which, given the power of interfaces, can be used outside of its ecosystem.

Hope you have found this helpful. Thank you for reading and stopping by!

Check out my toot from earlier in the year: https://mastodon.nz/@calebwoodbine/111866177159653535