package com.sohu.jafka.console;

import com.github.zkclient.ZkClient;
import com.github.zkclient.exception.ZkInterruptedException;
import com.sohu.jafka.api.OffsetRequest;
import com.sohu.jafka.consumer.Consumer;
import com.sohu.jafka.consumer.ConsumerConfig;
import com.sohu.jafka.consumer.ConsumerConnector;
import com.sohu.jafka.consumer.MessageStream;
import com.sohu.jafka.message.Message;
import com.sohu.jafka.producer.serializer.MessageEncoders;
import com.sohu.jafka.utils.Closer;
import com.sohu.jafka.utils.ImmutableMap;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import joptsimple.ArgumentAcceptingOptionSpec;
import joptsimple.OptionException;
import joptsimple.OptionParser;
import joptsimple.OptionSet;
import joptsimple.OptionSpec;
import joptsimple.OptionSpecBuilder;

/* loaded from: classes2.dex */
public class ConsoleConsumer {
    static void checkRequiredArgs(OptionParser optionParser, OptionSet optionSet, OptionSpec<?>... optionSpecArr) throws IOException {
        for (OptionSpec<?> optionSpec : optionSpecArr) {
            if (!optionSet.has(optionSpec)) {
                System.err.println("Missing required argument " + optionSpec);
                optionParser.printHelpOn(System.err);
                System.exit(1);
            }
        }
    }

    public static void main(String[] strArr) throws Exception {
        OptionParser optionParser = new OptionParser();
        OptionSpec ofType = optionParser.accepts("topic", "REQUIRED: The topic id to consumer on.").withRequiredArg().describedAs("topic").ofType(String.class);
        final OptionSpec ofType2 = optionParser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port.  Multiple URLS can be given to allow fail-over.").withRequiredArg().describedAs("urls").ofType(String.class);
        final ArgumentAcceptingOptionSpec ofType3 = optionParser.accepts("group", "The group id to consume on.").withRequiredArg().describedAs("gid").defaultsTo("console-consumer-" + new Random().nextInt(100000), new String[0]).ofType(String.class);
        ArgumentAcceptingOptionSpec defaultsTo = optionParser.accepts("fetch-size", "The amount of data to fetch in a single request.").withRequiredArg().describedAs("size").ofType(Integer.class).defaultsTo(1048576, new Integer[0]);
        ArgumentAcceptingOptionSpec defaultsTo2 = optionParser.accepts("socket-buffer-size", "The size of the tcp RECV size.").withRequiredArg().describedAs("size").ofType(Integer.class).defaultsTo(2097152, new Integer[0]);
        ArgumentAcceptingOptionSpec defaultsTo3 = optionParser.accepts("consumer-timeout-ms", "consumer throws timeout exception after waiting this much of time without incoming messages").withRequiredArg().describedAs("prop").ofType(Integer.class).defaultsTo(-1, new Integer[0]);
        ArgumentAcceptingOptionSpec defaultsTo4 = optionParser.accepts("formatter", "The name of a class to use for formatting jafka messages for display.").withRequiredArg().describedAs("class").ofType(String.class).defaultsTo(NewlineMessageFormatter.class.getName(), new String[0]);
        OptionSpecBuilder accepts = optionParser.accepts("from-beginning", "If the consumer does not already have an established offset to consume from, start with the earliest message present in the log rather than the latest message.");
        ArgumentAcceptingOptionSpec defaultsTo5 = optionParser.accepts("autocommit.interval.ms", "The time interval at which to save the current offset in ms").withRequiredArg().describedAs("ms").ofType(Integer.class).defaultsTo(10000, new Integer[0]);
        OptionSpecBuilder accepts2 = optionParser.accepts("skip-message-on-error", "If there is an error when processing a message, skip it instead of halt.");
        final OptionSet tryParse = tryParse(optionParser, strArr);
        checkRequiredArgs(optionParser, tryParse, ofType, ofType2);
        Properties properties = new Properties();
        properties.put("groupid", tryParse.valueOf(ofType3));
        properties.put("socket.buffersize", ((Integer) tryParse.valueOf(defaultsTo2)).toString());
        properties.put("socket.buffersize", ((Integer) tryParse.valueOf(defaultsTo2)).toString());
        properties.put("fetch.size", ((Integer) tryParse.valueOf(defaultsTo)).toString());
        properties.put("auto.commit", "true");
        properties.put("autocommit.interval.ms", ((Integer) tryParse.valueOf(defaultsTo5)).toString());
        properties.put("autooffset.reset", tryParse.has(accepts) ? OffsetRequest.SMALLES_TIME_STRING : OffsetRequest.LARGEST_TIME_STRING);
        properties.put("zk.connect", tryParse.valueOf(ofType2));
        properties.put("consumer.timeout.ms", ((Integer) tryParse.valueOf(defaultsTo3)).toString());
        ConsumerConfig consumerConfig = new ConsumerConfig(properties);
        boolean has = tryParse.has(accepts2);
        String str = (String) tryParse.valueOf(ofType);
        Class<?> cls = Class.forName((String) tryParse.valueOf(defaultsTo4));
        final ConsumerConnector create = Consumer.create(consumerConfig);
        if (tryParse.has(accepts)) {
            tryCleanupZookeeper((String) tryParse.valueOf(ofType2), (String) tryParse.valueOf(ofType3));
        }
        Runtime.getRuntime().addShutdownHook(new Thread() { // from class: com.sohu.jafka.console.ConsoleConsumer.1
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                Closer.closeQuietly(ConsumerConnector.this);
                if (tryParse.has(ofType3)) {
                    return;
                }
                ConsoleConsumer.tryCleanupZookeeper((String) tryParse.valueOf(ofType2), (String) tryParse.valueOf(ofType3));
            }
        });
        MessageStream messageStream = (MessageStream) ((List) create.createMessageStreams(ImmutableMap.of(str, 1), new MessageEncoders()).get(str)).get(0);
        MessageFormatter messageFormatter = (MessageFormatter) cls.newInstance();
        try {
            Iterator it = messageStream.iterator();
            while (it.hasNext()) {
                try {
                    messageFormatter.writeTo((Message) it.next(), System.out);
                } catch (RuntimeException e) {
                    if (!has) {
                        throw e;
                    }
                    System.err.println(e.getMessage());
                    if (System.out.checkError()) {
                        System.err.println("Unable to write to standard out, closing consumer.");
                        messageFormatter.close();
                        create.close();
                        System.exit(1);
                    }
                }
            }
        } finally {
            System.out.flush();
            messageFormatter.close();
            create.close();
        }
    }

    static void tryCleanupZookeeper(String str, String str2) {
        try {
            ZkClient zkClient = new ZkClient(str, 30000, 30000);
            zkClient.deleteRecursive("/consumers/" + str2);
            zkClient.close();
        } catch (ZkInterruptedException e) {
            e.printStackTrace();
        }
    }

    static OptionSet tryParse(OptionParser optionParser, String[] strArr) {
        try {
            return optionParser.parse(strArr);
        } catch (OptionException e) {
            e.printStackTrace();
            return null;
        }
    }
}
