package org.apache.seatunnel.connectors.seatunnel.redis.sink;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.seatunnel.api.serialization.SerializationSchema;
import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
import org.apache.seatunnel.connectors.seatunnel.redis.client.RedisClient;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisDataType;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
import org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisConnectorException;
import org.apache.seatunnel.format.json.JsonSerializationSchema;

/* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.class */
public class RedisSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> implements SupportMultiTableSinkWriter<Void> {
    private final SeaTunnelRowType seaTunnelRowType;
    private final RedisParameters redisParameters;
    private final SerializationSchema serializationSchema;
    private final RedisClient redisClient;
    private final int batchSize;
    private final List<String> keyBuffer;
    private final List<String> valueBuffer;

    public RedisSinkWriter(SeaTunnelRowType seaTunnelRowType, RedisParameters redisParameters) {
        this.seaTunnelRowType = seaTunnelRowType;
        this.redisParameters = redisParameters;
        this.serializationSchema = new JsonSerializationSchema(seaTunnelRowType);
        this.redisClient = redisParameters.buildRedisClient();
        this.batchSize = redisParameters.getBatchSize();
        this.keyBuffer = new ArrayList(this.batchSize);
        this.valueBuffer = new ArrayList(this.batchSize);
    }

    public void write(SeaTunnelRow seaTunnelRow) throws IOException {
        String str = new String(this.serializationSchema.serialize(seaTunnelRow));
        String keyField = this.redisParameters.getKeyField();
        List asList = Arrays.asList(this.seaTunnelRowType.getFieldNames());
        this.keyBuffer.add(asList.contains(keyField) ? seaTunnelRow.getField(asList.indexOf(keyField)).toString() : keyField);
        this.valueBuffer.add(str);
        if (this.keyBuffer.size() >= this.batchSize) {
            doBatchWrite();
            clearBuffer();
        }
    }

    private void clearBuffer() {
        this.keyBuffer.clear();
        this.valueBuffer.clear();
    }

    private void doBatchWrite() {
        RedisDataType redisDataType = this.redisParameters.getRedisDataType();
        if (RedisDataType.KEY.equals(redisDataType) || RedisDataType.STRING.equals(redisDataType)) {
            this.redisClient.batchWriteString(this.keyBuffer, this.valueBuffer, this.redisParameters.getExpire());
            return;
        }
        if (RedisDataType.LIST.equals(redisDataType)) {
            this.redisClient.batchWriteList(this.keyBuffer, this.valueBuffer, this.redisParameters.getExpire());
            return;
        }
        if (RedisDataType.SET.equals(redisDataType)) {
            this.redisClient.batchWriteSet(this.keyBuffer, this.valueBuffer, this.redisParameters.getExpire());
        } else if (RedisDataType.HASH.equals(redisDataType)) {
            this.redisClient.batchWriteHash(this.keyBuffer, this.valueBuffer, this.redisParameters.getExpire());
        } else {
            if (!RedisDataType.ZSET.equals(redisDataType)) {
                throw new RedisConnectorException((SeaTunnelErrorCode) CommonErrorCode.UNSUPPORTED_DATA_TYPE, "UnSupport redisDataType,only support string,list,hash,set,zset");
            }
            this.redisClient.batchWriteZset(this.keyBuffer, this.valueBuffer, this.redisParameters.getExpire());
        }
    }

    public void close() throws IOException {
        if (this.keyBuffer.isEmpty()) {
            return;
        }
        doBatchWrite();
        clearBuffer();
    }
}
