Ali-Sentinel-集群流控

归档

  • GitHub: Ali-Sentinel-集群流控

测试

  • 参考:热点流控-测试

  • 新建 ClusterDemoApplication2

public class ClusterDemoApplication2 {
    public static void main(String[] args) {
        System.setProperty("csp.sentinel.dashboard.server", "127.0.0.1:8080");
        System.setProperty("project.name", "My-Cluster-8866");
        System.setProperty("server.port", "10012");
        SpringApplication.run(ClusterDemoApplication.class, args);
    }
}
  • 启动 2 应用后

    • 访问:http://localhost:10010/hello/test1
    • 访问:http://localhost:10012/hello/test1
  • 在控制台设置

    • 点击 集群流控:http://127.0.0.1:8080/#/dashboard/cluster/server/My-Cluster-8866
    • 点击 + 新增 Token Server
      • 机器类型 使用默认:应用内机器
      • 选择机器 选第一个即可
      • Server 端口 使用默认:18730
      • 最大允许 QPS 设置为:2
      • 请从中选取 client:
        • 选第一个,再点击
    • 点击 保存
      // POST http://127.0.0.1:8080//cluster/assign/single_server/My-Cluster-8866
      {
          "clusterMap": {
              "machineId": "10.32.51.130@8720",
              "ip": "10.32.51.130",
              "port": 18730,
              "clientSet": [
                  "10.32.51.130@8721"
              ],
              "belongToApp": true,
              "maxAllowedQps": 2
          },
          "remainingList": []
      }
      
    • 点击 簇点链路:http://127.0.0.1:8080/#/dashboard/identity/My-Cluster-8866
      • sayHello 列,点击 + 流控
        • 是否集群 打勾
        • 集群阈值模式总体阈值
        • 集群阈值 设置为:2
        • 其他使用默认值
      • 点击 新增
        // POST http://127.0.0.1:8080//v1/flow/rule
        {
          "enable": false,
          "strategy": 0,
          "grade": 1,
          "controlBehavior": 0,
          "resource": "sayHello",
          "limitApp": "default",
          "clusterMode": true,
          "clusterConfig": {
              "thresholdType": "1"
          },
          "app": "My-Cluster-8866",
          "ip": "10.32.51.130",
          "port": "8720",
          "count": 2
        }
        
  • 再测试

    • 连续刷 3 次:http://localhost:10010/hello/test1
    • 出现:Oops, [test1] blocked by Sentinel

原理

改模式

  • Path: /cluster/assign/single_server/{app}
控制台
  • com.alibaba.csp.sentinel.dashboard.controller.cluster.ClusterAssignController
@RestController
@RequestMapping("/cluster/assign")
public class ClusterAssignController {

    // 全路径为: /cluster/assign/single_server/{app}
    @PostMapping("/single_server/{app}")
    public Result<ClusterAppAssignResultVO> apiAssignSingleClusterServersOfApp(
        @PathVariable String app,
        @RequestBody ClusterAppSingleServerAssignRequest assignRequest
    ) {
        ... // 参数校验
        try {
            return Result.ofSuccess(
                clusterAssignService.applyAssignToApp(  // ref: sign_m_100
                    app, Collections.singletonList(assignRequest.getClusterMap()),
                    assignRequest.getRemainingList()
                )
            );
        } ... // catch
    }
}
  • com.alibaba.csp.sentinel.dashboard.service.ClusterAssignServiceImpl
@Service
public class ClusterAssignServiceImpl implements ClusterAssignService {

    // sign_m_100
    @Override
    public ClusterAppAssignResultVO applyAssignToApp(
        String app, List<ClusterAppAssignMap> clusterMap,
        Set<String> remainingSet
    ) {
        ... // 参数校验

        // 更改集群模式--为服务端 & 推送配置
        clusterMap.stream()
            .filter(Objects::nonNull)
            .filter(ClusterAppAssignMap::getBelongToApp)
            .map(e -> {
                String ip = e.getIp();          // 传过来的值为: 10.32.51.130 (相当于连接到控制台的 Sentinel 使用者的机器 IP)
                int commandPort = parsePort(e); // 从 "machineId": "10.32.51.130@8720" 里取,取的值为: 8720
                CompletableFuture<Void> f = modifyMode(ip, commandPort, ClusterStateManager.CLUSTER_SERVER) // 改集群模式, ref: sign_m_101
                    .thenCompose(v -> applyServerConfigChange(app, ip, commandPort, e));                    // 推送配置,   ref: sign_m_102 
                return Tuple2.of(e.getMachineId(), f);
            })
            .forEach(t -> handleFutureSync(t, failedServerSet));    // 失败或超时 (10s) 则记录到集合里

        // 更改集群模式--为客户端 & 推送配置
        clusterMap.parallelStream()
            .filter(Objects::nonNull)
            .forEach(e -> applyAllClientConfigChange(app, e, failedClientSet)); // 改集群模式为客户端并推送配置, ref: sign_m_110

        // 解绑剩余 (未分配的) 机器
        applyAllRemainingMachineSet(app, remainingSet, failedClientSet);

        return new ClusterAppAssignResultVO() ... // 组装失败的 Client/Server 集合并返回
    }

    // sign_m_101 改集群模式
    private CompletableFuture<Void> modifyMode(String ip, int port, int mode) {
        return sentinelApiClient.modifyClusterMode(ip, port, mode); // 命令为: setClusterMode, 处理者 ref: sign_c_110
    }
    
    // sign_m_102 推送配置给服务端
    private CompletableFuture<Void> applyServerConfigChange(
        String app, String ip, int commandPort,
        ClusterAppAssignMap assignMap
    ) {
        ServerTransportConfig transportConfig = new ServerTransportConfig()
            .setPort(assignMap.getPort())   // 传过来的值为: 18730
            .setIdleSeconds(600);
        /**
         * 推送服务端配置 (服务端口) 
         * 命令为: cluster/server/modifyTransportConfig
         */
        return sentinelApiClient.modifyClusterServerTransportConfig(app, ip, commandPort, transportConfig)
            .thenCompose(v -> applyServerFlowConfigChange(app, ip, commandPort, assignMap))     // 推送集群整体流控 QPS, ref: sign_m_103
            .thenCompose(v -> applyServerNamespaceSetConfig(app, ip, commandPort, assignMap));  // 传过来的 namespaceSet 为空,不会处理,略...
    }

    // sign_m_103 推送集群整体流控 QPS
    private CompletableFuture<Void> applyServerFlowConfigChange(
        String app, String ip, int commandPort,
        ClusterAppAssignMap assignMap
    ) {
        Double maxAllowedQps = assignMap.getMaxAllowedQps();
        ... // maxAllowedQps 校验。注: QPS 大于 20w 则不推送
        /**
         * 推送集群流控配置
         * 命令为: cluster/server/modifyFlowConfig
         */
        return sentinelApiClient.modifyClusterServerFlowConfig(app, ip, commandPort,
            new ServerFlowConfig().setMaxAllowedQps(maxAllowedQps));
    }

    // sign_m_110 改集群模式为客户端并推送配置
    private void applyAllClientConfigChange(
        String app, ClusterAppAssignMap assignMap,
        Set<String> failedSet
    ) {
        Set<String> clientSet = assignMap.getClientSet();
        ... // 空校验
        final String serverIp = assignMap.getIp();  // 集群服务端 IP:  10.32.51.130
        final int serverPort = assignMap.getPort(); // 集群服务端端口: 18730
        clientSet.stream()
            .map(MachineUtils::parseCommandIpAndPort)
            .filter(Optional::isPresent)
            .map(Optional::get)
            .map(ipPort -> {    // 将客户端机器 ID 用 @ 符截取成 [ip, port] 元组
                CompletableFuture<Void> f = sentinelApiClient
                    /**
                     * 改集群模式为客户端,命令为: setClusterMode
                     * 处理者 ref: sign_c_110
                     */
                    .modifyClusterMode(ipPort.r1, ipPort.r2, ClusterStateManager.CLUSTER_CLIENT)
                    /**
                     * 推送客户端配置 (服务端的 IP 和端口) 
                     * 命令为: cluster/client/modifyConfig
                     */
                    .thenCompose(v -> sentinelApiClient.modifyClusterClientConfig(app, ipPort.r1, ipPort.r2,
                        new ClusterClientConfig().setRequestTimeout(20)
                            .setServerHost(serverIp)
                            .setServerPort(serverPort)
                    ));
                return Tuple2.of(ipPort.r1 + '@' + ipPort.r2, f);
            })
            .forEach(t -> handleFutureSync(t, failedSet));  // 失败或超时 (10s) 则记录到集合里
    }

}
应用机器
  • com.alibaba.csp.sentinel.command.handler.cluster.ModifyClusterModeCommandHandler
// sign_c_110 改集群模式命令处理者
@CommandMapping(name = "setClusterMode", desc = "set cluster mode...")
public class ModifyClusterModeCommandHandler implements CommandHandler<String> {

    @Override
    public CommandResponse<String> handle(CommandRequest request) {
        try {
            int mode = Integer.valueOf(request.getParam("mode"));
            ... // 对模式对应的 SPI 的校验

            ClusterStateManager.applyState(mode);   // 改模式, ref: sign_m_200
            return CommandResponse.ofSuccess("success");
        } ... // catch
    }
}
  • com.alibaba.csp.sentinel.cluster.ClusterStateManager
    • DynamicSentinelProperty 参考:入口控制-设置规则 sign_c_100
public final class ClusterStateManager {

    private static volatile SentinelProperty<Integer> stateProperty = new DynamicSentinelProperty<Integer>();
    private static final PropertyListener<Integer> PROPERTY_LISTENER = new ClusterStatePropertyListener();

    static {
        InitExecutor.doInit();
        stateProperty.addListener(PROPERTY_LISTENER);
    }
    
    // sign_m_200 改模式
    public static void applyState(Integer state) {
        stateProperty.updateValue(state);   // 最终触发内部处理, ref: sign_m_210
    }
    
    private static class ClusterStatePropertyListener implements PropertyListener<Integer> {
        ... // configLoad 实现

        @Override
        public synchronized void configUpdate(Integer value) {
            applyStateInternal(value);
        }
    }
    
    // sign_m_210 状态更改处理
    private static boolean applyStateInternal(Integer state) {
        ... // state 校验
        try {
            switch (state) {
                case CLUSTER_CLIENT:
                    return setToClient();   // 设置为客户端模式, ref: sign_m_211
                case CLUSTER_SERVER:
                    return setToServer();   // 设置为服务端模式, ref: sign_m_215
                case CLUSTER_NOT_STARTED:
                    setStop();
                    return true;
                ... // default 处理
            }
        } ... // catch
    }

    // sign_m_211 设置为客户端模式
    public static boolean setToClient() {
        if (mode == CLUSTER_CLIENT) {
            return true;
        }
        mode = CLUSTER_CLIENT;
        sleepIfNeeded();        // 转换完之后,至少要等 5s 才能换模式
        lastModified = TimeUtil.currentTimeMillis();
        return startClient();   // ref: sign_m_212
    }

    // sign_m_212 启动客户端并关闭服务端
    private static boolean startClient() {
        try {
            EmbeddedClusterTokenServer server = EmbeddedClusterTokenServerProvider.getServer();
            if (server != null) {
                server.stop();
            }
            ClusterTokenClient tokenClient = TokenClientProvider.getClient();
            if (tokenClient != null) {  // 默认实现为 DefaultClusterTokenClient, ref: sign_c_230
                tokenClient.start();    // ref: sign_m_230
                RecordLog.info("[ClusterStateManager] Changing cluster mode to client");
                return true;
            } ... // else
        } ... // catch
    }

    // sign_m_215 设置为服务端模式
    public static boolean setToServer() {
        ... // 同样要等 5s 才能换模式
        return startServer();   // ref: sign_m_216
    }

    // sign_m_216 启动服务端并关闭客户端
    private static boolean startServer() {
        try {
            ClusterTokenClient tokenClient = TokenClientProvider.getClient();
            if (tokenClient != null) {
                tokenClient.stop();
            }
            EmbeddedClusterTokenServer server = EmbeddedClusterTokenServerProvider.getServer();
            if (server != null) {       // 默认实现为 DefaultEmbeddedTokenServer, ref: sign_c_220
                server.start();         // ref: sign_m_218
                RecordLog.info("[ClusterStateManager] Changing cluster mode to server");
                return true;
            } ... // else
        } ... // catch
    }
}
启用服务端
  • com.alibaba.csp.sentinel.cluster.server.DefaultEmbeddedTokenServer
/** sign_c_220 默认内嵌 Token 服务器 */
public class DefaultEmbeddedTokenServer implements EmbeddedClusterTokenServer {

    // 相当于对此进行一层封装
    private final ClusterTokenServer server = new SentinelDefaultTokenServer(true); // ref: sign_cm_221

    // sign_m_218
    @Override
    public void start() throws Exception {
        server.start(); // ref: sign_m_220
    }
}
  • com.alibaba.csp.sentinel.cluster.server.SentinelDefaultTokenServer
public class SentinelDefaultTokenServer implements ClusterTokenServer {

    // sign_cm_221
    public SentinelDefaultTokenServer(boolean embedded) {
        this.embedded = embedded;
        ClusterServerConfigManager.addTransportConfigChangeObserver(new ServerTransportConfigObserver() {
            @Override
            public void onTransportConfigChange(ServerTransportConfig config) {
                changeServerConfig(config); // ref: sign_m_221
            }
        });
        initNewServer();    // 只是创建,并未启动...
    }

    // sign_m_220
    @Override
    public void start() throws Exception {
        if (shouldStart.compareAndSet(false, true)) {
            startServerIfScheduled();   // ref: sign_m_222
        }
    }

    // sign_m_221
    private synchronized void changeServerConfig(ServerTransportConfig config) {
        ...
        int newPort = config.getPort();
        ... // 新端口与旧端口相同,则返回。(相当于界面用非默认端口 `18730`,则会启动 2 次 Netty 服务)
        try {
            if (server != null) {
                stopServer();
            }
            this.server = new NettyTransportServer(newPort);
            this.port = newPort;
            startServerIfScheduled();   
        } ... // catch
    }

    // sign_m_222
    private void startServerIfScheduled() throws Exception {
        if (shouldStart.get()) {
            if (server != null) {
                server.start();         // 启动 Netty 服务, ref: sign_m_223
                ClusterStateManager.markToServer();
                if (embedded) {
                    handleEmbeddedStart();
                }
            }
        }
    }
}
Netty-服务
  • com.alibaba.csp.sentinel.cluster.server.NettyTransportServer
public class NettyTransportServer implements ClusterTokenServer {
    
    private final int port;

    public NettyTransportServer(int port) {
        this.port = port;
    }

    // sign_m_223 启动 Netty 服务
    @Override
    public void start() {
        if (!currentState.compareAndSet(SERVER_STATUS_OFF, SERVER_STATUS_STARTING)) {
            return;
        }

        ServerBootstrap b = new ServerBootstrap();
        this.bossGroup = new NioEventLoopGroup(1);
        this.workerGroup = new NioEventLoopGroup(DEFAULT_EVENT_LOOP_THREADS);
        b.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .option(ChannelOption.SO_BACKLOG, 128)
            .handler(new LoggingHandler(LogLevel.INFO))
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline p = ch.pipeline();
                    p.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2));
                    p.addLast(new NettyRequestDecoder());
                    p.addLast(new LengthFieldPrepender(2));
                    p.addLast(new NettyResponseEncoder());
                    p.addLast(new TokenServerHandler(connectionPool));
                }
            })
            ... // childOption
        b.bind(port).addListener(new GenericFutureListener<ChannelFuture>() {
            @Override
            public void operationComplete(ChannelFuture future) {
                if (future.cause() != null) {
                    ...
                    try {
                        Thread.sleep(failCount * RETRY_SLEEP_MS);   // 等待 n * 2s
                        start();    // 有异常,则尝试 3 次启动, ref: sign_m_223
                    } ... // catch
                } else {
                    // 没有异常,则改状态为启动完成
                    currentState.compareAndSet(SERVER_STATUS_STARTING, SERVER_STATUS_STARTED);
                }
            }
        });
    }
}
启用客户端
  • com.alibaba.csp.sentinel.cluster.client.DefaultClusterTokenClient
/** sign_c_230 默认客户端 */
public class DefaultClusterTokenClient implements ClusterTokenClient {

    // SPI 加载时会调用
    public DefaultClusterTokenClient() {
        ClusterClientConfigManager.addServerChangeObserver(new ServerChangeObserver() {
            @Override
            public void onRemoteServerChange(ClusterClientAssignConfig assignConfig) {
                changeServer(assignConfig); // ref: sign_m_231
            }
        });
        initNewConnection();    // host 为空,不会创建连接...
    }
    
    // sign_m_230
    @Override
    public void start() throws Exception {
        if (shouldStart.compareAndSet(false, true)) {
            startClientIfScheduled();   // ref: sign_m_232
        }
    }
    
    // sign_m_231
    private void changeServer(ClusterClientAssignConfig config) {
        ... // 服务端的 IP 和端口没改,则返回不处理

        try {
            if (transportClient != null) {
                transportClient.stop();
            }
            this.transportClient = new NettyTransportClient(config.getServerHost(), config.getServerPort());
            ... // 记录配置
            startClientIfScheduled();   // ref: sign_m_232
        } ... // catch
    }

    // sign_m_232
    private void startClientIfScheduled() throws Exception {
        if (shouldStart.get()) {
            if (transportClient != null) {
                transportClient.start();    // 启动 Netty 客户端连接, ref: sign_m_233
            } ... // else
        }
    }
}
Netty-客户端连接
  • com.alibaba.csp.sentinel.cluster.client.NettyTransportClient
public class NettyTransportClient implements ClusterTransportClient {

    private final String host;
    private final int port;

    public NettyTransportClient(String host, int port) {
        ... // 校验参数
        this.host = host;
        this.port = port;
    }

    // sign_m_233 启动 Netty 客户端连接
    @Override
    public void start() throws Exception {
        shouldRetry.set(true);
        startInternal();    // ref: sign_m_234
    }

    // sign_m_234
    private void startInternal() {
        connect(initClientBootstrap()); // sign_m_235 || sign_m_236
    }

    // sign_m_235
    private Bootstrap initClientBootstrap() {
        Bootstrap b = new Bootstrap();
        eventLoopGroup = new NioEventLoopGroup();
        b.group(eventLoopGroup)
            .channel(NioSocketChannel.class)
            ... // option
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    clientHandler = new TokenClientHandler(currentState, disconnectCallback);

                    ChannelPipeline pipeline = ch.pipeline();
                    pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2));
                    pipeline.addLast(new NettyResponseDecoder());
                    pipeline.addLast(new LengthFieldPrepender(2));
                    pipeline.addLast(new NettyRequestEncoder());
                    pipeline.addLast(clientHandler);
                }
            });

        return b;
    }

    // sign_m_236
    private void connect(Bootstrap b) {
        if (currentState.compareAndSet(CLIENT_STATUS_OFF, CLIENT_STATUS_PENDING)) {
            b.connect(host, port)
                    .addListener(new GenericFutureListener<ChannelFuture>() {
                        @Override
                        public void operationComplete(ChannelFuture future) {
                            if (future.cause() != null) {   // 有异常,连接失败
                                ... // log
                                failConnectedTime.incrementAndGet();
                                channel = null;
                            } else {                        // 无异常,连接成功
                                failConnectedTime.set(0);
                                channel = future.channel();
                                ... // log
                            }
                        }
                    });
        }
    }
}

更新规则

  • 参考:WebUI-更新规则-应用端 sign_c_200
    • 服务端并没有将规则同步到客户端
  • 更新规则的入口
    • ModifyClusterFlowRulesCommandHandler ( cluster/server/modifyFlowRules )
      • ClusterFlowRuleManager #loadRules
      • ClusterFlowRuleManager #FLOW_RULES
  • 集群规则现在还没有同步,需要自己改造控制台
    • ref: https://github.com/alibaba/Sentinel/issues/1670
    • 可用 V2,也可用配置源
      • 然后在 ModifyRulesCommandHandler 用 SPI 做下扩展
    • 最主要的是要保持 ruleId 一致

流控原理

  • 参考:

    • 链路控制-ClusterBuilderSlot
    • 链路控制-FlowSlot sign_m_721
  • com.alibaba.csp.sentinel.slots.block.flow.FlowRuleChecker

public class FlowRuleChecker {

    // sign_m_411 集群流控 (调用者, ref: sign_m_721)
    private static boolean passClusterCheck(
        FlowRule rule, Context context, DefaultNode node, int acquireCount,
        boolean prioritized
    ) {
        try {
            // 客户端模式下,SPI 提供的实现者为: DefaultClusterTokenClient, ref: sign_c_421
            TokenService clusterService = pickClusterService();
            ... // clusterService 为空则回退到本地单机校验
            long flowId = rule.getClusterConfig().getFlowId();
            // 根据规则 id 请求 token 服务器,以获取流控结果. ref: sign_m_421 
            TokenResult result = clusterService.requestToken(flowId, acquireCount, prioritized);
            return applyTokenResult(result, rule, context, node, acquireCount, prioritized); // ref: sign_m_412
        } ... // catch
        // 回退到本地单机校验
        return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
    }

    // sign_m_412 根据流控结果进行处理
    private static boolean applyTokenResult(
        TokenResult result, FlowRule rule, Context context,
        DefaultNode node,
        int acquireCount, boolean prioritized
    ) {
        switch (result.getStatus()) {
            case TokenResultStatus.OK:
                return true;    // 放行
            case TokenResultStatus.SHOULD_WAIT:
                try {
                    Thread.sleep(result.getWaitInMs());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return true;    // 等待指定时间后放行
            case TokenResultStatus.NO_RULE_EXISTS:
            case TokenResultStatus.BAD_REQUEST:
            case TokenResultStatus.FAIL:
            case TokenResultStatus.TOO_MANY_REQUEST:
                // 回退到本地单机校验
                return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
            case TokenResultStatus.BLOCKED:
            default:
                return false;
        }
    }

}
  • com.alibaba.csp.sentinel.cluster.client.DefaultClusterTokenClient
// sign_c_421
public class DefaultClusterTokenClient implements ClusterTokenClient {

    // sign_m_421 请求 token 服务器,获取流控结果
    @Override
    public TokenResult requestToken(Long flowId, int acquireCount, boolean prioritized) {
        ...
        FlowRequestData data = new FlowRequestData().setCount(acquireCount)
            .setFlowId(flowId).setPriority(prioritized);
        // 此请求的处理者为 FlowRequestProcessor, ref: sign_c_510 | sign_m_510
        ClusterRequest<FlowRequestData> request = new ClusterRequest<>(ClusterConstants.MSG_TYPE_FLOW, data);
        try {
            TokenResult result = sendTokenRequest(request); // ref: sign_m_422
            ...
            return result;
        } ... // catch
    }

    // sign_m_422
    private TokenResult sendTokenRequest(ClusterRequest request) throws Exception {
        ...
        // 通过 Netty 客户端 (NettyTransportClient) 发送
        ClusterResponse response = transportClient.sendRequest(request);
        TokenResult result = new TokenResult(response.getStatus());
        if (response.getData() != null) {
            FlowTokenResponseData responseData = (FlowTokenResponseData) response.getData();
            result.setRemaining(responseData.getRemainingCount())
                .setWaitInMs(responseData.getWaitInMs());
        }
        return result;
    }
}
服务端流控
  • com.alibaba.csp.sentinel.cluster.server.processor.FlowRequestProcessor
// sign_c_510 流控请求处理器
@RequestType(ClusterConstants.MSG_TYPE_FLOW)
public class FlowRequestProcessor implements RequestProcessor<FlowRequestData, FlowTokenResponseData> {

    // sign_m_510
    @Override
    public ClusterResponse<FlowTokenResponseData> processRequest(ClusterRequest<FlowRequestData> request) {
        // 服务端模式下,SPI 提供的实现者为 DefaultTokenService, ref: sign_c_521
        TokenService tokenService = TokenServiceProvider.getService();

        long flowId = request.getData().getFlowId();
        int count = request.getData().getCount();
        boolean prioritized = request.getData().isPriority();

        TokenResult result = tokenService.requestToken(flowId, count, prioritized); // ref: sign_m_521
        return toResponse(result, request); // ref: sign_m_511
    }

    // sign_m_511
    private ClusterResponse<FlowTokenResponseData> toResponse(TokenResult result, ClusterRequest request) {
        return new ClusterResponse<>(request.getId(), request.getType(), result.getStatus(),
            new FlowTokenResponseData()
                .setRemainingCount(result.getRemaining())
                .setWaitInMs(result.getWaitInMs())
        );
    }

}
  • com.alibaba.csp.sentinel.cluster.flow.DefaultTokenService
// sign_c_521
@Spi(isDefault = true)
public class DefaultTokenService implements TokenService {

    // sign_m_521
    @Override
    public TokenResult requestToken(Long ruleId, int acquireCount, boolean prioritized) {
        ...
        FlowRule rule = ClusterFlowRuleManager.getFlowRuleById(ruleId);
        if (rule == null) { // 测试的时候,为 null (因为没同步)
            return new TokenResult(TokenResultStatus.NO_RULE_EXISTS);
        }
        // 集群流控校验 (逻辑较简单。集群指标在 ClusterFlowRuleManager #applyClusterFlowRule 里初始化)
        return ClusterFlowChecker.acquireClusterToken(rule, acquireCount, prioritized);
    }

}
总结
  • 集群流控时,每次请求都会转发到服务端(由服务端去控制)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/634544.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Polar 上传

Polar 上传 开题&#xff0c;是一个文件上传界面 对文件后缀有过滤 测试了一下是黑名单&#xff0c;过滤了php相关的文件&#xff0c;但是没过滤.ini、.htaccess后缀的文件 对内容的过滤是<?、file&#xff0c;所以不能用.user.ini配置文件绕过 我们选择使用.htaccess配置…

晶体振荡器

一、晶振与晶体区别 晶振是有源晶振的简称&#xff0c;又叫振荡器&#xff0c;英文名称是oscillator&#xff0c;内部有时钟电路&#xff0c;只需供电便可产生振荡信号&#xff1b;晶体是无源晶振的简称&#xff0c;也叫谐振器&#xff0c;英文名称是crystal&#xff0c;是无极…

第197题|奇偶性的四则运算,你掌握了吗?|函数强化训练(四)|武忠祥老师每日一题 5月22日

解题思路&#xff1a;这道题如果我们会21号的题的话&#xff0c;简直是小菜一碟&#xff01;主要就是要用到下面这个结论&#xff1a; &#xff08;A&#xff09; 直接看奇偶性我们不好看&#xff0c;我们需要拆项&#xff1a; 我们先看前一项的奇偶性&#xff0c;x是奇函数&a…

民国漫画杂志《时代漫画》第13期.PDF

时代漫画13.PDF: https://url03.ctfile.com/f/1779803-1247458360-14efab?p9586 (访问密码: 9586) 《时代漫画》的杂志在1934年诞生了&#xff0c;截止1937年6月战争来临被迫停刊共发行了39期。 ps:资源来源网络&#xff01;

P1【知识点】【数据结构】【链表LinkedList】C++版

链表是一种逻辑上连续&#xff0c;内存上分散的线性表数据结构&#xff0c;是用一组任意的空间&#xff08;可以连续&#xff0c;也可以不连续&#xff09;来存放数据元素。每个数据元素成为一个”结点“&#xff0c;每个结点由数据域和指针域组成。 访问元素&#xff08;Acce…

特征融合篇 | YOLOv8改进之引入轻量级跨尺度特征融合模块CCFM | 源自RT-DETR

前言:Hello大家好,我是小哥谈。CCFM(Cross-Scale Feature Fusion Module)即为跨尺度特征融合模块。这个模块的作用是将不同尺度的特征通过融合操作整合起来,以增强模型对于尺度变化的适应性和对小尺度对象的检测能力。CCFM可以有效地整合细节特征和上下文信息,从而提高模…

【全开源】简单商城系统(PC/UniAPP)

轻松构建您的在线商店 在当今数字化时代&#xff0c;拥有一个在线商店对于许多商家来说已成为必不可少的营销手段。为了满足这一需求&#xff0c;我们推出了“简单商城系统源码”&#xff0c;让您轻松构建并管理您的在线商店。 一、简单易用&#xff0c;快速上手 “简单商城…

python demo

文章背景&#xff0c;记录python 小demo 集合 1、使用python matplotlib库描绘曲线 import matplotlib.pyplot as NLAx_index [0, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60, 65, 70, 75, 80, 85, 90, 95, 100] y_index [6.1, 7.4, 9.1, 11.1, 12.69, 14.35, 16.1, 1…

成都爱尔周进院长提醒当双眼度数差距过大,我们该做些什么

每个人的用眼方式、用眼习惯且两只眼睛“天生条件”不一定相同&#xff0c;当发生近视&#xff0c;双眼近视程度也就可能不同&#xff0c;双眼度数必然会变得不一样。当双眼度数产生差异&#xff0c;尤其是当双眼度数差别过大时会引发哪些问题&#xff1f; 双眼度数不一致&…

面试八股之MySQL篇4——事务篇

&#x1f308;hello&#xff0c;你好鸭&#xff0c;我是Ethan&#xff0c;一名不断学习的码农&#xff0c;很高兴你能来阅读。 ✔️目前博客主要更新Java系列、项目案例、计算机必学四件套等。 &#x1f3c3;人生之义&#xff0c;在于追求&#xff0c;不在成败&#xff0c;勤通…

【ARFoundation自学03】AR Point Cloud 点云(参考点标记)功能详解

和平面识别框架一样 1为XR Origin添加AR Point Cloud Manager组件 然后你的ar应用就具备了点云识别功能&#xff0c;就这么简单 2.可视化这些云点 创建一个美术效果的预制体&#xff0c;人家提供了预设模板 然后拖到仓库&#xff08;ASSETS&#xff09;创建预制体&#xff…

Redis持久化之☞AOF、AOF是怎样执行持久化的?

AOF持久化机制&#xff1a; AOF&#xff08;Append Of File&#xff09;&#xff1a;将redis执行过的所有写指令记录下来&#xff0c;在下次redis重新启动时&#xff0c;只要把这些指令从前到后重复执行一遍&#xff0c;就可以实现数据恢复了。 以独立日志的方式记录每次写命…

本特利330878-90-00前置传感器在PLC系统中的应用与优势

本特利330878-90-00前置传感器在PLC系统中的应用与优势 一、引言 在现代工业自动化领域中&#xff0c;传感器作为信息获取的重要工具&#xff0c;其性能的稳定性和准确性直接影响到整个系统的运行效率。其中&#xff0c;本特利330878-90-00前置传感器以其卓越的性能和广泛的应…

查看主机的php参数short_open_tag 是否为 on

我想要查看主机的php参数short_open_tag 是否为 on&#xff0c;由于我使用的是Hostease的Linux虚拟主机产品&#xff0c;在cPanel面板中并没有找到这个参数选项&#xff0c;因此无法查看。这边联系了Hostease技术支持了解&#xff0c;可以通过以下方式进行查看。 1.先登陆cPane…

自定义横向思维导图,横向组织架构图,横向树图。可以自定义节点颜色,样式,还可以导出为图片

最近公司设计要求根据目录结构&#xff0c;横向展示。所以做了一个横向的思维导图&#xff0c;横向的树结构&#xff0c;横向的组织架构图&#xff0c;可以自定义节点颜色&#xff0c;样式&#xff0c;还可以导出为图片 话不多说&#xff0c;直接上图片&#xff0c;这个就是一…

nssctf(Web刷题)

[SWPUCTF 2021 新生赛]gift_F12 打开题目是一个时间页面&#xff0c;不过看了一会儿发现没有什么用 直接F12打开网页源代码 CtrlF搜索flag 找到了flag NSSCTF{We1c0me_t0_WLLMCTF_Th1s_1s_th3_G1ft} [第五空间 2021]签到题 NSSCTF{welcometo5space} [SWPUCTF 2021 新生赛…

cPanel中如何为数据库添加用户权限

本周有一个客户&#xff0c;购买Hostease的主机&#xff0c;询问我们的在线客服&#xff0c;他的网站安装后再还是无法访问。 客户购买的是Linux虚拟主机&#xff0c;带cPanel面板的。网站访问有如下数据库连接错误: 随后检查发现客户创建的数据库没有添加数据库用户权限。 下面…

期权策略交易怎么做?怎么选择期权策略?

今天期权懂带你了解期权策略交易怎么做&#xff1f;怎么选择期权策略&#xff1f;期权交易是一种金融衍生品交易方式&#xff0c;它给予购买者在未来特定时间内以特定价格购买&#xff08;或出售&#xff09;标的资产的权利。 期权策略交易怎么做&#xff1f; 配对看跌期权&am…

基于地理坐标的高阶几何编辑工具算法(3)——相离面吸附

文章目录 工具步骤应用场景算法输入算法输出算法示意图算法原理 工具步骤 点击面&#xff0c;点击“相离面吸附”工具&#xff0c;绘制一个面&#xff0c;双击结束后&#xff0c;与所有相交的面进行吸附 应用场景 为了让相离的两个几何面在空间上相邻&#xff0c;使用该工具…

llama_factory的使用

1.git clone llama_factory到本地 2.记得安环境&#xff0c;在clone后 3.多显卡要设置一下 4.数据文件放在data里面&#xff0c;仿照模板里的格式 5.进入llama_factory微调页面 python src/webui.py 6.llama_factory介绍&#xff1a;10分钟打造你个人专属的语言大模型&am…