package com.aliyun.openservices.ons.client.rocketmq.impl;

import com.aliyun.openservices.ons.api.OffsetStore;
import com.aliyun.openservices.ons.api.TopicPartition;
import com.aliyun.openservices.ons.shaded.com.google.common.base.Optional;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.utility.ExecutorServices;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.utility.ThreadFactoryImpl;
import com.aliyun.openservices.ons.shaded.org.slf4j.Logger;
import com.aliyun.openservices.ons.shaded.org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/* loaded from: input_file:com/aliyun/openservices/ons/client/rocketmq/impl/AbstractOffsetStore.class */
/**
 * Base {@link OffsetStore} that keeps the latest offset of each {@link TopicPartition} in an
 * in-memory concurrent map and periodically hands that map to {@link #persistOffset(Map)}.
 *
 * <p>Subclasses supply the actual storage by implementing {@link #loadOffset()} and
 * {@link #persistOffset(Map)}. Persistence runs on a dedicated single-threaded scheduler, so
 * {@link #persistOffset(Map)} may execute concurrently with {@link #updateOffset} and
 * {@link #readOffset} calls made from other threads.
 */
public abstract class AbstractOffsetStore implements OffsetStore {
    private static final Logger log = LoggerFactory.getLogger(AbstractOffsetStore.class);

    /** Used as both the initial delay and the delay between persist runs, in seconds. */
    private final long persistPeriodSeconds;

    /** Latest known offset per partition; shared between callers and the persist task. */
    private final ConcurrentMap<TopicPartition, Long> offsetTable =
        new ConcurrentHashMap<TopicPartition, Long>();

    /** Single-threaded scheduler that runs the periodic persist task. */
    private final ScheduledExecutorService offsetPersistScheduler =
        new ScheduledThreadPoolExecutor(1, new ThreadFactoryImpl("OffsetPersistScheduler"));

    /**
     * Creates the store without starting any background work.
     *
     * @param j persist period in seconds; it is passed to
     *          {@link ScheduledExecutorService#scheduleWithFixedDelay} by {@link #start()}, which
     *          rejects a non-positive delay with {@link IllegalArgumentException}.
     */
    public AbstractOffsetStore(long j) {
        this.persistPeriodSeconds = j;
    }

    /**
     * Seeds the in-memory table from {@link #loadOffset()} and schedules the periodic persist task.
     *
     * <p>A {@code null} result from {@link #loadOffset()} is treated as "nothing to load". The
     * first persist run happens one period after this call, and each subsequent run starts one
     * period after the previous run finished.
     */
    @Override // com.aliyun.openservices.ons.api.OffsetStore
    public void start() {
        final Map<TopicPartition, Long> loaded = loadOffset();
        if (loaded != null) {
            offsetTable.putAll(loaded);
        }
        final Runnable persistTask = new Runnable() {
            @Override // java.lang.Runnable
            public void run() {
                try {
                    persistOffset(offsetTable);
                } catch (Throwable t) {
                    // An exception escaping a scheduled task suppresses all further executions,
                    // so failures are logged here and the next run is still attempted.
                    log.error("Exception occurs while trying to persist offset", t);
                }
            }
        };
        offsetPersistScheduler.scheduleWithFixedDelay(
            persistTask, persistPeriodSeconds, persistPeriodSeconds, TimeUnit.SECONDS);
    }

    /**
     * Stops the persist scheduler via {@code ExecutorServices.awaitTerminated}.
     *
     * <p>Failures are logged rather than propagated. If the failure is an
     * {@link InterruptedException}, the calling thread's interrupt status is restored so that
     * callers further up the stack can still observe the interruption.
     *
     * <p>NOTE(review): no final {@link #persistOffset(Map)} is issued here, so offsets updated
     * since the last periodic run may not be persisted unless the subclass handles that itself.
     */
    @Override // com.aliyun.openservices.ons.api.OffsetStore
    public void shutdown() {
        try {
            if (!ExecutorServices.awaitTerminated(offsetPersistScheduler)) {
                log.error("[Bug] Timeout to shutdown the offset persist scheduler.");
            }
        } catch (Throwable t) {
            log.error("Failed to shutdown the offset persist scheduler.", t);
            if (t instanceof InterruptedException) {
                // Catching Throwable would otherwise silently clear the interrupt request.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Loads previously stored offsets; called once from {@link #start()}.
     *
     * @return the stored offsets keyed by partition, or {@code null} if there is nothing to load
     */
    public abstract Map<TopicPartition, Long> loadOffset();

    /**
     * Writes the given offsets to the backing storage; invoked periodically from the scheduler
     * thread.
     *
     * @param map the live in-memory offset table (not a copy); it may be modified by other threads
     *            while this method runs
     */
    public abstract void persistOffset(Map<TopicPartition, Long> map);

    /**
     * Records {@code j} as the current offset of the given partition, replacing any previous
     * value. Only the in-memory table is touched; persistence happens on the next scheduled run.
     *
     * @param topicPartition partition whose offset is being updated
     * @param j              the new offset
     */
    @Override // com.aliyun.openservices.ons.api.OffsetStore
    public void updateOffset(TopicPartition topicPartition, long j) {
        offsetTable.put(topicPartition, j);
    }

    /**
     * Looks up the in-memory offset of the given partition.
     *
     * @param topicPartition partition to look up
     * @return the recorded offset, or {@link Optional#absent()} if none is recorded
     */
    @Override // com.aliyun.openservices.ons.api.OffsetStore
    public Optional<Long> readOffset(TopicPartition topicPartition) {
        final Long offset = offsetTable.get(topicPartition);
        if (offset == null) {
            return Optional.absent();
        }
        return Optional.of(offset);
    }
}
