springboot + aop + Lua distributed current limiting best practices

Posted May 27, 202010 min read

This article is included in personal blog: http://www.chengxy-nds.top , share resources and exchange technology

1. What is current limiting? Why limit current?

I do n t know if you have ever done the metro of the Imperial City, that is, the kind that you have to queue up when you enter the subway station. The answer is to limit current! Because the capacity of a subway is limited, squeezing too many people at a time will cause crowding of the platform and overload of the train, and there will be certain safety risks. In the same way, our program is the same, and its ability to process requests is also limited. Once the number of requests exceeds its processing limit, it will crash. In order not to have the worst crash situation, we can only delay the time for everyone to enter the station.
Insert picture description here
Current limiting is an important means to ensure high availability of the system! ! !

Due to the huge traffic of Internet companies, the system will go online to do a peak traffic evaluation, especially like various spike promotion activities. In order to ensure that the system is not overwhelmed by huge traffic, when the system traffic reaches a certain threshold, some of the traffic will be rejected .

The current limit will cause the user to be unavailable for a short period of time(this time period is in the millisecond range). Generally, the indicator for measuring the processing capacity of the system is QPS or TPS per second, assuming that the system s traffic threshold per second is 1000, in theory, when a 1001 request comes in within a second, then this request will be limited.

Second, the current limit program

1. Counter

Java can also use the atomic counters AtomicInteger and Semaphore semaphores to do simple current limiting.

//The number of current limit
    private int maxCount = 10;
    //within the specified time
    private long interval = 60;
    //Atomic counter
    private AtomicInteger atomicInteger = new AtomicInteger(0);
    //start time
    private long startTime = System.currentTimeMillis();

    public boolean limit(int maxCount, int interval) {
        atomicInteger.addAndGet(1);
        if(atomicInteger.get() == 1) {
            startTime = System.currentTimeMillis();
            atomicInteger.addAndGet(1);
            return true;
        }
        //When the interval time is exceeded, restart counting directly
        if(System.currentTimeMillis()-startTime> interval * 1000) {
            startTime = System.currentTimeMillis();
            atomicInteger.set(1);
            return true;
        }
        //In the interval, check whether the number of current limit is exceeded
        if(atomicInteger.get()> maxCount) {
            return false;
        }
        return true;
    }
2. Leaky bucket algorithm

The leaky bucket algorithm is very simple. We compare water to request and leaky bucket to system processing capacity limit. Water first enters the leaky bucket, and the water in the leaky bucket flows out at a certain rate. When the flow rate is less than the inflow rate, due to the limited capacity of the leaking bucket, subsequent incoming water directly overflows(rejects the request), thereby achieving flow limitation.
Insert picture description here

3.Token bucket algorithm

The principle of the token bucket algorithm is also relatively simple. We can understand that it is a registered hospital to see a doctor. Only after receiving the number can the doctor be diagnosed.

The system maintains a token(token) bucket and puts a token( token) into the bucket at a constant speed. If a request comes in and wants to be processed, you need to get one from the bucket first. Token(token), when there is no token( token) in the bucket, the request will be denied service. The token bucket algorithm achieves the limit on requests by controlling the capacity of the bucket and the rate of issuing tokens.
Insert picture description here

4, Redis + Lua

Many students do not know what Lua is? Personally, the stored procedures of the Lua script and the MySQL database are relatively similar. They execute a set of commands, and the execution of all commands either succeeds or fails in order to achieve atomicity. You can also understand the Lua script as a piece of code with business logic.

And Lua itself is a programming language. Although redis does not directly provide the corresponding API for current limiting, it supports the function of Lua script, which can be used to implement complex token buckets or leaky bucket Algorithms are also one of the main ways to achieve current limiting in distributed systems.

Compared with Redis transactions, the advantages of Lua script:

  • Reduce network overhead:Use Lua script, no need to send multiple requests to Redis, just execute it once, reduce network transmission
  • Atomic operation:Redis executes the entire Lua script as a command, atomic, no need to worry about concurrency
  • Reuse:Once the Lua script is executed, it will be permanently saved in Redis, other clients can reuse

The logic of the Lua script is roughly as follows:

-Get the first key value passed in when the script is called
local key = KEYS [1]
-Get the first parameter value(current limit size) passed when calling the script
local limit = tonumber(ARGV [1])

-Get the current traffic size
local curentLimit = tonumber(redis.call('get', key) or "0")

-Whether the current limit is exceeded
if curentLimit + 1> limit then
    -Return(reject)
    return 0
else
    -Did not exceed value + 1
    redis.call("INCRBY", key, 1)
    -Set expiration time
    redis.call("EXPIRE", key, 2)
    -Return(release)
    return 1
end
  • Obtain incoming key parameters through KEYS [1]
  • Get the incoming limit parameter through ARGV [1]
  • The redis.call method returns the values related to get and key from the cache, if it is null then it returns 0
  • Next, determine whether the value recorded in the cache will be greater than the limit size. If it exceeds the limit, it means that the current is limited and returns 0.
  • If not exceeded, then the cache value of the key is +1, and set the expiration time to 1 second later, and return the cache value +1

This method is the solution recommended in this article, and the specific implementation will be elaborated later.

5. Gateway layer current limiting

Current limiting is often done at the gateway layer, such as Nginx, Openresty, kong, zuul, Spring Cloud Gateway, etc., and the underlying implementation principle of spring cloud-gateway gateway current limiting Based on Redis + Lua, through the built-in Lua current limiting script.
Insert picture description here

Three, Redis + Lua current limit implementation

Below we use "custom annotations", aop, Redis + Lua to achieve current limiting, the steps will be more detailed, in order to let Bai Bai get started quickly, and experienced veterans will take more care.

1. Environment preparation

springboot project creation address: https://start.spring.io , very convenient and practical tool.
Insert picture description here

2. Introduce dependency packages

The following dependency packages are added to the pom file. The key ones are spring-boot-starter-data-redis and spring-boot-starter-aop.

    <dependencies>
        <dependency>
            <groupId> org.springframework.boot </groupId>
            <artifactId> spring-boot-starter-web </artifactId>
        </dependency>
        <dependency>
            <groupId> org.springframework.boot </groupId>
            <artifactId> spring-boot-starter-data-redis </artifactId>
        </dependency>
        <dependency>
            <groupId> org.springframework.boot </groupId>
            <artifactId> spring-boot-starter-aop </artifactId>
        </dependency>
        <dependency>
            <groupId> com.google.guava </groupId>
            <artifactId> guava </artifactId>
            <version> 21.0 </version>
        </dependency>
        <dependency>
            <groupId> org.springframework.boot </groupId>
            <artifactId> spring-boot-starter-test </artifactId>
        </dependency>
        <dependency>
            <groupId> org.apache.commons </groupId>
            <artifactId> commons-lang3 </artifactId>
        </dependency>

        <dependency>
            <groupId> org.springframework.boot </groupId>
            <artifactId> spring-boot-starter-test </artifactId>
            <scope> test </scope>
            <exclusions>
                <exclusion>
                    <groupId> org.junit.vintage </groupId>
                    <artifactId> junit-vintage-engine </artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>
3. Configure application.properties

Configure the pre-built redis service address and port in the application.properties file.

spring.redis.host = 127.0.0.1

spring.redis.port = 6379
4. Configure RedisTemplate instance
@Configuration
public class RedisLimiterHelper {

    @Bean
    public RedisTemplate <String, Serializable> limitRedisTemplate(LettuceConnectionFactory redisConnectionFactory) {
        RedisTemplate <String, Serializable> template = new RedisTemplate <>();
        template.setKeySerializer(new StringRedisSerializer());
        template.setValueSerializer(new GenericJackson2JsonRedisSerializer());
        template.setConnectionFactory(redisConnectionFactory);
        return template;
    }
}

Current limit type enumeration class

/**
* @author fu
* @description current limit type
* @date 2020/4/8 13:47
* /
public enum LimitType {

   /**
     * Custom key
     * /
    CUSTOMER,

   /**
     * Requester IP
     * /
    IP;
}
5. Custom annotations

We define a @ Limit annotation, and the annotation type is ElementType.METHOD, which acts on the method.

period represents the time limit for request, and count represents the number of times the request is allowed to be released during the period of period. limitType represents the type of current limit, according to the requested IP, custom key, if you do not pass the limitType attribute, the default method name is used as the default key.

/**
* @author fu
* @description custom current limit annotation
* @date 2020/4/8 13:15
* /
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface Limit {

   /**
     * first name
     * /
    String name() default "";

   /**
     * key
     * /
    String key() default "";

   /**
     * Key prefix
     * /
    String prefix() default "";

   /**
     * The given time range unit(seconds)
     * /
    int period();

   /**
     * Maximum number of visits in a certain time
     * /
    int count();

   /**
     * Type of current limit(user-defined key or request ip)
     * /
    LimitType limitType() default LimitType.CUSTOMER;
}
6. Aspect code implementation

/**
* @author fu
* @description current limiting aspect
* @date 2020/4/8 13:04
* /
@Aspect
@Configuration
public class LimitInterceptor {

    private static final Logger logger = LoggerFactory.getLogger(LimitInterceptor.class);

    private static final String UNKNOWN = "unknown";

    private final RedisTemplate <String, Serializable> limitRedisTemplate;

    @Autowired
    public LimitInterceptor(RedisTemplate <String, Serializable> limitRedisTemplate) {
        this.limitRedisTemplate = limitRedisTemplate;
    }

   /**
     * @param pjp
     * @author fu
     * @description cut
     * @date 2020/4/8 13:04
     * /
    @Around("execution(public * *(..)) && @annotation(com.xiaofu.limit.api.Limit)")
    public Object interceptor(ProceedingJoinPoint pjp) {
        MethodSignature signature =(MethodSignature) pjp.getSignature();
        Method method = signature.getMethod();
        Limit limitAnnotation = method.getAnnotation(Limit.class);
        LimitType limitType = limitAnnotation.limitType();
        String name = limitAnnotation.name();
        String key;
        int limitPeriod = limitAnnotation.period();
        int limitCount = limitAnnotation.count();

       /**
         * Obtain different keys according to the type of current limit
         * /
        switch(limitType) {
            case IP:
                key = getIpAddress();
                break;
            case CUSTOMER:
                key = limitAnnotation.key();
                break;
            default:
                key = StringUtils.upperCase(method.getName());
        }

        ImmutableList <String> keys = ImmutableList.of(StringUtils.join(limitAnnotation.prefix(), key));
        try {
            String luaScript = buildLuaScript();
            RedisScript <Number> redisScript = new DefaultRedisScript <>(luaScript, Number.class);
            Number count = limitRedisTemplate.execute(redisScript, keys, limitCount, limitPeriod);
            logger.info("Access try count is {} for name = {} and key = {}", count, name, key);
            if(count! = null && count.intValue() <= limitCount) {
                return pjp.proceed();
            } else {
                throw new RuntimeException("You have been dragged into the blacklist");
            }
        } catch(Throwable e) {
            if(e instanceof RuntimeException) {
                throw new RuntimeException(e.getLocalizedMessage());
            }
            throw new RuntimeException("server exception");
        }
    }

   /**
     * @author fu
     * @description write redis Lua current limit script
     * @date 2020/4/8 13:24
     * /
    public String buildLuaScript() {
        StringBuilder lua = new StringBuilder();
        lua.append("local c");
        lua.append("\ nc = redis.call('get', KEYS [1])");
        //The call does not exceed the maximum value, then return directly
        lua.append("\ nif c and tonumber(c)> tonumber(ARGV [1]) then");
        lua.append("\ nreturn c;");
        lua.append("\ nend");
        //Execute the calculator to add
        lua.append("\ nc = redis.call('incr', KEYS [1])");
        lua.append("\ nif tonumber(c) == 1 then");
        //The current limit starts from the first call, and the corresponding key value is set to expire
        lua.append("\ nredis.call('expire', KEYS [1], ARGV [2])");
        lua.append("\ nend");
        lua.append("\ nreturn c;");
        return lua.toString();
    }


   /**
     * @author fu
     * @description get id address
     * @date 2020/4/8 13:24
     * /
    public String getIpAddress() {
        HttpServletRequest request =((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()). GetRequest();
        String ip = request.getHeader("x-forwarded-for");
        if(ip == null || ip.length() == 0 || UNKNOWN.equalsIgnoreCase(ip)) {
            ip = request.getHeader("Proxy-Client-IP");
        }
        if(ip == null || ip.length() == 0 || UNKNOWN.equalsIgnoreCase(ip)) {
            ip = request.getHeader("WL-Proxy-Client-IP");
        }
        if(ip == null || ip.length() == 0 || UNKNOWN.equalsIgnoreCase(ip)) {
            ip = request.getRemoteAddr();
        }
        return ip;
    }
}
7. Control layer implementation

We apply the @ Limit annotation to the interface method that needs to limit the current. Below we set the @ Limit annotation to the method. Only 3requests are allowed to be released in10 seconds. This is for intuitive use. AtomicInteger` count.

/**
* @Author:fu
* @Description:
* /
@RestController
public class LimiterController {

    private static final AtomicInteger ATOMIC_INTEGER_1 = new AtomicInteger();
    private static final AtomicInteger ATOMIC_INTEGER_2 = new AtomicInteger();
    private static final AtomicInteger ATOMIC_INTEGER_3 = new AtomicInteger();

   /**
     * @author fu
     * @description
     * @date 2020/4/8 13:42
     * /
    @Limit(key = "limitTest", period = 10, count = 3)
    @GetMapping("/limitTest1")
    public int testLimiter1() {

        return ATOMIC_INTEGER_1.incrementAndGet();
    }

   /**
     * @author fu
     * @description
     * @date 2020/4/8 13:42
     * /
    @Limit(key = "customer_limit_test", period = 10, count = 3, limitType = LimitType.CUSTOMER)
    @GetMapping("/limitTest2")
    public int testLimiter2() {

        return ATOMIC_INTEGER_2.incrementAndGet();
    }

   /**
     * @author fu
     * @description
     * @date 2020/4/8 13:42
     * /
    @Limit(key = "ip_limit_test", period = 10, count = 3, limitType = LimitType.IP)
    @GetMapping("/limitTest3")
    public int testLimiter3() {

        return ATOMIC_INTEGER_3.incrementAndGet();
    }

}
8, test

Test Expected:3 consecutive requests can be successful, the fourth request is rejected. Next, let's see if it is the expected effect. The request address:http://127.0.0.1:8080/limitTest1, use postman to test, and whether the postman` URL is directly posted to the browser is the same.

Insert picture description here
It can be seen that when the fourth request was made, the application directly rejected the request, indicating that our Springboot + aop + lua current limiting solution was successfully built.
Insert picture description here

to sum up

The above springboot + aop + Lua current limit implementation is relatively simple, designed to let everyone know what is current limit? How to do a simple current limit function, the interview must know what this is. Although there are several schemes to achieve current limiting, but which one to choose depends on the specific business scenario and cannot be used for the purpose.