Apache Kafka Connect JNDI 注入漏洞(CVE-2023-25194)分析

前言

Apache Kafka 是一种分布式流处理平台。它可以在分布式系统中可靠地发布和订阅流数据。 Kafka 用作实时数据流的中央数据管道,能有效地连接各种系统和应用。在Apache Kafka Connect中存在JNDI注入漏洞,当攻击者可访问Kafka Connect Worker,且可以创建或修改连接器时,通过设置sasl.jaas.config属性为com.sun.security.auth.module.JndiLoginModule,进而可导致JNDI注入

漏洞分析

1686127828050.png

调用KafkaProducer#KafkaProducer()构造函数

1686127929595.png

然后调用他的重载方法

1686128264173.png

再调用重载方法

1686128482304.png

一共经历四次调用重载方法后,进入newSender()

1686137249398.png
1686130699510.png
1686130891741.png

跟进到ClientUtils#createChannelBuilder(),这里的security.protocolsasl.mechanism是我们传进去的,如果不手动指定,则会抛出null异常。

1686132081641.png

跟进到ChannelBuiders#cilentChannelBuilder(),这里需要contextTypeclientSaslMechanism不为空

1686132351462.png

调用ChannelBuilders#create()

private static ChannelBuilder create(SecurityProtocol securityProtocol, Mode mode, JaasContext.Type contextType, AbstractConfig config, ListenerName listenerName, boolean isInterBrokerListener, String clientSaslMechanism, boolean saslHandshakeRequestEnable, CredentialCache credentialCache, DelegationTokenCache tokenCache, Time time, LogContext logContext, Supplier<ApiVersionsResponse> apiVersionSupplier) {
        Map<String, Object> configs = channelBuilderConfigs(config, listenerName);
        Object channelBuilder;
        switch (securityProtocol) {
            case SSL:
                requireNonNullMode(mode, securityProtocol);
                channelBuilder = new SslChannelBuilder(mode, listenerName, isInterBrokerListener, logContext);
                break;
            case SASL_SSL:
            case SASL_PLAINTEXT:
                requireNonNullMode(mode, securityProtocol);
                String sslClientAuthOverride = null;
                Object jaasContexts;
                if (mode != Mode.SERVER) {
                    JaasContext jaasContext = contextType == Type.CLIENT ? JaasContext.loadClientContext(configs) : JaasContext.loadServerContext(listenerName, clientSaslMechanism, configs);
                    jaasContexts = Collections.singletonMap(clientSaslMechanism, jaasContext);
                } else {
                    List<String> enabledMechanisms = (List)configs.get("sasl.enabled.mechanisms");
                    jaasContexts = new HashMap(enabledMechanisms.size());
                    Iterator var18 = enabledMechanisms.iterator();

                    String listenerClientAuth;
                    while(var18.hasNext()) {
                        listenerClientAuth = (String)var18.next();
                        ((Map)jaasContexts).put(listenerClientAuth, JaasContext.loadServerContext(listenerName, listenerClientAuth, configs));
                    }

                    if (listenerName != null && securityProtocol == SecurityProtocol.SASL_SSL) {
                        String configuredClientAuth = (String)configs.get("ssl.client.auth");
                        listenerClientAuth = (String)config.originalsWithPrefix(listenerName.configPrefix(), true).get("ssl.client.auth");
                        if (listenerClientAuth == null) {
                            sslClientAuthOverride = SslClientAuth.NONE.name().toLowerCase(Locale.ROOT);
                            if (configuredClientAuth != null && !configuredClientAuth.equalsIgnoreCase(SslClientAuth.NONE.name())) {
                                log.warn("Broker configuration '{}' is applied only to SSL listeners. Listener-prefixed configuration can be used to enable SSL client authentication for SASL_SSL listeners. In future releases, broker-wide option without listener prefix may be applied to SASL_SSL listeners as well. All configuration options intended for specific listeners should be listener-prefixed.", "ssl.client.auth");
                            }
                        }
                    }
                }

                channelBuilder = new SaslChannelBuilder(mode, (Map)jaasContexts, securityProtocol, listenerName, isInterBrokerListener, clientSaslMechanism, saslHandshakeRequestEnable, credentialCache, tokenCache, sslClientAuthOverride, time, logContext, apiVersionSupplier);
                break;
            case PLAINTEXT:
                channelBuilder = new PlaintextChannelBuilder(listenerName);
                break;
            default:
                throw new IllegalArgumentException("Unexpected securityProtocol " + securityProtocol);
        }

        ((ChannelBuilder)channelBuilder).configure(configs);
        return (ChannelBuilder)channelBuilder;
    }
1686133882006.png

security.protocol值为SSL时,channelBuilderSslChannelBuilder

security.protocol值为SASL、SASL_PLAINTEXT时,channelBuilderSaslChannelBuilder

security.protocol值为PLAINTEXT时,channelBuilderPlaintextChannelBuilder

若都不是,则抛出IllegalArgumentException错误

这里基于config信息生成了JaasContext实例,然后SaslChannelBuilder对象

在实例化JaasContext的时候,会new JaasConfig(globalContextName, dynamicJaasConfig.value());sasl.jaas.config中的字符串解析成AppConfigurationEntry对象,其中包含loginModuleName以及其他参数

1686202401949.png
1686199852392.png

跟进到SaslChannelBuilder#configure()

1686134608855.png

继续跟进到LoginManager#acquireLoginManger

public static LoginManager acquireLoginManager(JaasContext jaasContext, String saslMechanism, Class<? extends Login> defaultLoginClass, Map<String, ?> configs) throws LoginException {
        Class<? extends Login> loginClass = configuredClassOrDefault(configs, jaasContext, saslMechanism, "sasl.login.class", defaultLoginClass);
        Class<? extends AuthenticateCallbackHandler> defaultLoginCallbackHandlerClass = "OAUTHBEARER".equals(saslMechanism) ? OAuthBearerUnsecuredLoginCallbackHandler.class : AbstractLogin.DefaultLoginCallbackHandler.class;
        Class<? extends AuthenticateCallbackHandler> loginCallbackClass = configuredClassOrDefault(configs, jaasContext, saslMechanism, "sasl.login.callback.handler.class", defaultLoginCallbackHandlerClass);
        Class var7 = LoginManager.class;
        synchronized(LoginManager.class) {
            Password jaasConfigValue = jaasContext.dynamicJaasConfig();
            LoginManager loginManager;
            LoginMetadata loginMetadata;
            if (jaasConfigValue != null) {
                loginMetadata = new LoginMetadata(jaasConfigValue, loginClass, loginCallbackClass);
                loginManager = (LoginManager)DYNAMIC_INSTANCES.get(loginMetadata);
                if (loginManager == null) {
                    loginManager = new LoginManager(jaasContext, saslMechanism, configs, loginMetadata);
                    DYNAMIC_INSTANCES.put(loginMetadata, loginManager);
                }
            } else {
                loginMetadata = new LoginMetadata(jaasContext.name(), loginClass, loginCallbackClass);
                loginManager = (LoginManager)STATIC_INSTANCES.get(loginMetadata);
                if (loginManager == null) {
                    loginManager = new LoginManager(jaasContext, saslMechanism, configs, loginMetadata);
                    STATIC_INSTANCES.put(loginMetadata, loginManager);
                }
            }

            SecurityUtils.addConfiguredSecurityProviders(configs);
            return loginManager.acquire();
        }
    }
1686135717346.png

跟进LoginManager的构造函数

1686135829245.png

调用KerberosLogin#login(),调用父类的login

1686136198763.png

AbstractLogin#login(),下面会new一个LoginContext类,调用login

public LoginContext login() throws LoginException {
        this.loginContext = new LoginContext(this.contextName, (Subject)null, this.loginCallbackHandler, this.configuration);
        this.loginContext.login();
        log.info("Successfully logged in.");
        return this.loginContext;
    }
1686136840138.png

经过一系列调用,最后走到LoginContext#incoke()反射获取到com.sun.security.auth.module.JndiLoginModule类,然后后面实例化了。

1686203738226.png

然后进入JndiLoginModule#login(),其中userProvider和groupProvider都不能为null。

1686204110066.png

当useFirstPass=true或tryFirstPass=true时,进入到attemptAuthentication();

1686204234701.png
1686204253349.png

这里调用InitialContext#lookup,触发JNDI注入

1686204302396.png

总结:当攻击者可以控制kafka-clients连接时的属性:

  • security.protocol设置为SASL_PLAINTEXT SASL_SSL,并且指定sasl.mechanism
  • sasl.jaas.config设置为com.sun.security.auth.module.JndiLoginModule,且userProvider以及groupProvider都不为null,并且设置了useFirstPass=true或tryFirstPass=true

此时kafka client发起连接,就会发起JNDI连接,导致JDNI注入

1686138837562.png