1. 源碼解析 | Flink Timer(定時器)機制與實現(xiàn)

        共 1309字,需瀏覽 3分鐘

         ·

        2020-09-19 18:20

        點擊上方藍色字體,選擇“設為星標

        回復”資源“獲取更多資源

        大數(shù)據技術與架構
        點擊右側關注,大數(shù)據開發(fā)領域最強公眾號!

        暴走大數(shù)據
        點擊右側關注,暴走大數(shù)據!


        Timer簡介

        Timer(定時器)是Flink Streaming API提供的用于感知并利用處理時間/事件時間變化的機制。官網上上給出的描述如下:

        Timers are what make Flink streaming applications reactive and adaptable to processing and event time changes.

        對于普通用戶來說,最常見的顯式利用Timer的地方就是KeyedProcessFunction。我們在其processElement()方法中注冊Timer,然后覆寫其onTimer()方法作為Timer觸發(fā)時的回調邏輯。根據時間特征的不同:

        • 處理時間—調用Context.timerService().registerProcessingTimeTimer()注冊;onTimer()在系統(tǒng)時間戳達到Timer設定的時間戳時觸發(fā)。

        • 事件時間—調用Context.timerService().registerEventTimeTimer()注冊;onTimer()在Flink內部水印達到或超過Timer設定的時間戳時觸發(fā)。

        舉個栗子,按天實時統(tǒng)計指標并存儲在狀態(tài)中,每天0點清除狀態(tài)重新統(tǒng)計,就可以在processElement()方法里注冊Timer。

        ctx.timerService().registerProcessingTimeTimer(
        tomorrowZeroTimestampMs(System.currentTimeMillis(), 8) + 1
        );

        public static long tomorrowZeroTimestampMs(long now, int timeZone) {
        return now - (now + timeZone * 3600000) % 86400000 + 86400000;
        }

        再在onTimer()方法里執(zhí)行state.clear()。so easy。

        除了KeyedProcessFunction之外,Timer在窗口機制中也有重要的地位。提起窗口自然就能想到Trigger,即觸發(fā)器。來看下Flink自帶的EventTimeTrigger的部分代碼,它是事件時間特征下的默認觸發(fā)器。


            @Override
        public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
        return TriggerResult.FIRE;
        } else {
        ctx.registerEventTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
        }
        }

        @Override
        public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() ?
        TriggerResult.FIRE :
        TriggerResult.CONTINUE;
        }

        可見,當水印還沒有到達窗口右邊沿時,就注冊以窗口右邊沿為時間戳的Timer。Timer到期后觸發(fā)onEventTime()方法,進而觸發(fā)該窗口相關聯(lián)的Trigger。

        文章開頭引用的blog從用戶的角度給出了Flink Timer的4大特點,如下圖所示。

        經由上面的介紹,我們有了兩個入手點(KeyedProcessFunction、Trigger)來分析Timer的細節(jié)。接下來從前者入手,let's get our hands dirty。

        TimerService、InternalTimerService

        負責實際執(zhí)行KeyedProcessFunction的算子是KeyedProcessOperator,其中以內部類的形式實現(xiàn)了KeyedProcessFunction需要的上下文類Context,如下所示。

            private class ContextImpl extends KeyedProcessFunction<K, IN, OUT>.Context {
        private final TimerService timerService;
        private StreamRecord<IN> element;

        ContextImpl(KeyedProcessFunction<K, IN, OUT> function, TimerService timerService) {
        function.super();
        this.timerService = checkNotNull(timerService);
        }

        @Override
        public Long timestamp() {
        checkState(element != null);
        if (element.hasTimestamp()) {
        return element.getTimestamp();
        } else {
        return null;
        }
        }

        @Override
        public TimerService timerService() {
        return timerService;
        }

        // 以下略...
        }

        可見timerService()方法返回的是外部傳入的TimerService實例,那么我們就回到KeyedProcessOperator看一下它的實現(xiàn),順便放個類圖。

        public class KeyedProcessOperator<K, IN, OUT>
        extends AbstractUdfStreamOperator<OUT, KeyedProcessFunction<K, IN, OUT>>
        implements OneInputStreamOperator<IN, OUT>, Triggerable<K, VoidNamespace> {
        private static final long serialVersionUID = 1L;
        private transient TimestampedCollector<OUT> collector;
        private transient ContextImpl context;
        private transient OnTimerContextImpl onTimerContext;

        public KeyedProcessOperator(KeyedProcessFunction<K, IN, OUT> function) {
        super(function);
        chainingStrategy = ChainingStrategy.ALWAYS;
        }

        @Override
        public void open() throws Exception {
        super.open();
        collector = new TimestampedCollector<>(output);
        InternalTimerService<VoidNamespace> internalTimerService =
        getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);
        TimerService timerService = new SimpleTimerService(internalTimerService);
        context = new ContextImpl(userFunction, timerService);
        onTimerContext = new OnTimerContextImpl(userFunction, timerService);
        }

        @Override
        public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
        collector.setAbsoluteTimestamp(timer.getTimestamp());
        invokeUserFunction(TimeDomain.EVENT_TIME, timer);
        }

        @Override
        public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
        collector.eraseTimestamp();
        invokeUserFunction(TimeDomain.PROCESSING_TIME, timer);
        }

        @Override
        public void processElement(StreamRecord<IN> element) throws Exception {
        collector.setTimestamp(element);
        context.element = element;
        userFunction.processElement(element.getValue(), context, collector);
        context.element = null;
        }

        private void invokeUserFunction(
        TimeDomain timeDomain,
        InternalTimer<K, VoidNamespace> timer) throws Exception {
        onTimerContext.timeDomain = timeDomain;
        onTimerContext.timer = timer;
        userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
        onTimerContext.timeDomain = null;
        onTimerContext.timer = null;
        }

        // 以下略...
        }

        通過閱讀上述代碼,可以總結出:

        • TimerService接口的實現(xiàn)類為SimpleTimerService,它實際上又是InternalTimerService的非常簡單的代理(真的很簡單,代碼略去)。

        • InternalTimerService的實例由getInternalTimerService()方法取得,該方法定義在所有算子的基類AbstractStreamOperator中。它比較重要,后面再提。

        • KeyedProcessOperator.processElement()方法調用用戶自定義函數(shù)的processElement()方法,順便將上下文實例ContextImpl傳了進去,所以用戶可以由它獲得TimerService來注冊Timer。

        • Timer在代碼中叫做InternalTimer(是個接口)。

        • 當Timer觸發(fā)時,實際上是根據時間特征調用onProcessingTime()/onEventTime()方法(這兩個方法來自Triggerable接口),進而觸發(fā)用戶函數(shù)的onTimer()回調邏輯。后面還會見到它們。

        接下來就看看InternalTimerService是如何取得的。


            /**
        * Returns a {@link InternalTimerService} that can be used to query current processing time
        * and event time and to set timers. An operator can have several timer services, where
        * each has its own namespace serializer. Timer services are differentiated by the string
        * key that is given when requesting them, if you call this method with the same key
        * multiple times you will get the same timer service instance in subsequent requests.
        *
        *

        Timers are always scoped to a key, the currently active key of a keyed stream operation.
        * When a timer fires, this key will also be set as the currently active key.
        *
        *

        Each timer has attached metadata, the namespace. Different timer services
        * can have a different namespace type. If you don't need namespace differentiation you
        * can use {@link VoidNamespaceSerializer} as the namespace serializer.
        *
        * @param name The name of the requested timer service. If no service exists under the given
        * name a new one will be created and returned.
        * @param namespaceSerializer {@code TypeSerializer} for the timer namespace.
        * @param triggerable The {@link Triggerable} that should be invoked when timers fire
        * @param The type of the timer namespace.
        */
        public <K, N> InternalTimerService<N> getInternalTimerService(
        String name,
        TypeSerializer<N> namespaceSerializer,
        Triggerable<K, N> triggerable) {
        checkTimerServiceInitialization();

        KeyedStateBackend<K> keyedStateBackend = getKeyedStateBackend();
        TypeSerializer<K> keySerializer = keyedStateBackend.getKeySerializer();
        InternalTimeServiceManager<K> keyedTimeServiceHandler = (InternalTimeServiceManager<K>) timeServiceManager;
        TimerSerializer<K, N> timerSerializer = new TimerSerializer<>(keySerializer, namespaceSerializer);
        return keyedTimeServiceHandler.getInternalTimerService(name, timerSerializer, triggerable);
        }

        該方法的注釋描述非常清楚,所以一起粘貼過來。簡單來講:

        • 每個算子可以有一個或多個InternalTimerService。

        • InternalTimerService的四要素是:名稱、命名空間類型N(及其序列化器)、鍵類型K(及其序列化器),還有上文所述Triggerable接口的實現(xiàn)。

        • InternalTimerService經由InternalTimeServiceManager.getInternalTimerService()方法取得。

        例如,上文KeyedProcessOperator初始化的InternalTimerService,名稱為"user-timers",命名空間類型為空(VoidNamespace),Triggerable實現(xiàn)類則是其本身。如果是WindowOperator的話,其InternalTimerService的名稱就是"window-timers",命名空間類型則是Window。

        InternalTimerService在代碼中仍然是一個接口,其代碼如下。方法的簽名除了多了命名空間之外(命名空間對用戶透明),其他都與TimerService提供的相同。

        public interface InternalTimerService<N> {
        long currentProcessingTime();
        long currentWatermark();

        void registerProcessingTimeTimer(N namespace, long time);
        void deleteProcessingTimeTimer(N namespace, long time);

        void registerEventTimeTimer(N namespace, long time);
        void deleteEventTimeTimer(N namespace, long time);

        // ...
        }

        下面更進一步,看看InternalTimeServiceManager是如何實現(xiàn)的。

        InternalTimeServiceManager、TimerHeapInternalTimer

        顧名思義,InternalTimeServiceManager用于管理各個InternalTimeService。部分代碼如下:

        public class InternalTimeServiceManager<K> {
        @VisibleForTesting
        static final String TIMER_STATE_PREFIX = "_timer_state";
        @VisibleForTesting
        static final String PROCESSING_TIMER_PREFIX = TIMER_STATE_PREFIX + "/processing_";
        @VisibleForTesting
        static final String EVENT_TIMER_PREFIX = TIMER_STATE_PREFIX + "/event_";

        private final KeyGroupRange localKeyGroupRange;
        private final KeyContext keyContext;
        private final PriorityQueueSetFactory priorityQueueSetFactory;
        private final ProcessingTimeService processingTimeService;
        private final Map<String, InternalTimerServiceImpl<K, ?>> timerServices;
        private final boolean useLegacySynchronousSnapshots;

        @SuppressWarnings("unchecked")
        public <N> InternalTimerService<N> getInternalTimerService(
        String name,
        TimerSerializer<K, N> timerSerializer,
        Triggerable<K, N> triggerable) {
        InternalTimerServiceImpl<K, N> timerService = registerOrGetTimerService(name, timerSerializer);
        timerService.startTimerService(
        timerSerializer.getKeySerializer(),
        timerSerializer.getNamespaceSerializer(),
        triggerable);
        return timerService;
        }

        @SuppressWarnings("unchecked")
        <N> InternalTimerServiceImpl<K, N> registerOrGetTimerService(String name, TimerSerializer<K, N> timerSerializer) {
        InternalTimerServiceImpl<K, N> timerService = (InternalTimerServiceImpl<K, N>) timerServices.get(name);
        if (timerService == null) {
        timerService = new InternalTimerServiceImpl<>(
        localKeyGroupRange,
        keyContext,
        processingTimeService,
        createTimerPriorityQueue(PROCESSING_TIMER_PREFIX + name, timerSerializer),
        createTimerPriorityQueue(EVENT_TIMER_PREFIX + name, timerSerializer));
        timerServices.put(name, timerService);
        }
        return timerService;
        }

        private <N> KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> createTimerPriorityQueue(
        String name,
        TimerSerializer<K, N> timerSerializer) {
        return priorityQueueSetFactory.create(
        name,
        timerSerializer);
        }

        // 以下略...
        }

        從上面的代碼可以得知:

        • Flink中InternalTimerService的最終實現(xiàn)實際上是InternalTimerServiceImpl類,而InternalTimer的最終實現(xiàn)是TimerHeapInternalTimer類。

        • InternalTimeServiceManager會用HashMap維護一個特定鍵類型K下所有InternalTimerService的名稱與實例映射。如果名稱已經存在,就會直接返回,不會重新創(chuàng)建。

        • 初始化InternalTimerServiceImpl時,會同時創(chuàng)建兩個包含TimerHeapInternalTimer的優(yōu)先隊列(該優(yōu)先隊列是Flink自己實現(xiàn)的),分別用于維護事件時間和處理時間的Timer。

        說了這么多,最需要注意的是,Timer是維護在JVM堆內存中的,如果頻繁注冊大量Timer,或者同時觸發(fā)大量Timer,也是一筆不小的開銷。

        TimerHeapInternalTimer的實現(xiàn)比較簡單,主要就是4個字段和1個方法。為了少打點字,把注釋也弄過來。

            /**
        * The key for which the timer is scoped.
        */

        @Nonnull
        private final K key;
        /**
        * The namespace for which the timer is scoped.
        */

        @Nonnull
        private final N namespace;
        /**
        * The expiration timestamp.
        */

        private final long timestamp;
        /**
        * This field holds the current physical index of this timer when it is managed by a timer heap so that we can
        * support fast deletes.
        */

        private transient int timerHeapIndex;

        @Override
        public int comparePriorityTo(@Nonnull InternalTimer<?, ?> other) {
        return Long.compare(timestamp, other.getTimestamp());
        }
        }

        可見,Timer的scope有兩個,一是數(shù)據的key,二是命名空間。但是用戶不會感知到命名空間的存在,所以我們可以簡單地認為Timer是以key級別注冊的(Timer四大特點之1)。正確估計key的量可以幫助我們控制Timer的量。

        timerHeapIndex是這個Timer在優(yōu)先隊列里存儲的下標。優(yōu)先隊列通常用二叉堆實現(xiàn),而二叉堆可以直接用數(shù)組存儲,所以讓Timer持有其對應的下標可以較快地從隊列里刪除它。

        comparePriorityTo()方法則用于確定Timer的優(yōu)先級,顯然Timer的優(yōu)先隊列是一個按Timer時間戳為關鍵字排序的最小堆。下面粗略看看該最小堆的實現(xiàn)。

        HeapPriorityQueueSet

        上面代碼中PriorityQueueSetFactory.create()方法創(chuàng)建的優(yōu)先隊列實際上的類型是HeapPriorityQueueSet。它的基本思路與Java自帶的PriorityQueue相同,但是在其基礎上加入了按key去重的邏輯(Timer四大特點之2)。不妨列出它的部分代碼。

            private final HashMap<T, T>[] deduplicationMapsByKeyGroup;
        private final KeyGroupRange keyGroupRange;

        @Override
        @Nullable
        public T poll() {
        final T toRemove = super.poll();
        return toRemove != null ? getDedupMapForElement(toRemove).remove(toRemove) : null;
        }

        @Override
        public boolean add(@Nonnull T element) {
        return getDedupMapForElement(element).putIfAbsent(element, element) == null && super.add(element);
        }

        @Override
        public boolean remove(@Nonnull T toRemove) {
        T storedElement = getDedupMapForElement(toRemove).remove(toRemove);
        return storedElement != null && super.remove(storedElement);
        }

        private HashMap<T, T> getDedupMapForKeyGroup(
        @Nonnegative int keyGroupId) {
        return deduplicationMapsByKeyGroup[globalKeyGroupToLocalIndex(keyGroupId)];
        }

        private HashMap<T, T> getDedupMapForElement(T element) {
        int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(
        keyExtractor.extractKeyFromElement(element),
        totalNumberOfKeyGroups);
        return getDedupMapForKeyGroup(keyGroup);
        }

        private int globalKeyGroupToLocalIndex(int keyGroup) {
        checkArgument(keyGroupRange.contains(keyGroup), "%s does not contain key group %s", keyGroupRange, keyGroup);
        return keyGroup - keyGroupRange.getStartKeyGroup();
        }

        要搞懂它,必須解釋一下KeyGroup和KeyGroupRange。KeyGroup是Flink內部KeyedState的原子單位,亦即一些key的組合。一個Flink App的KeyGroup數(shù)量與最大并行度相同,將key分配到KeyGroup的操作則是經典的取hashCode+取模。而KeyGroupRange則是一些連續(xù)KeyGroup的范圍,每個Flink sub-task都只包含一個KeyGroupRange。也就是說,KeyGroupRange可以看做當前sub-task在本地維護的所有key。

        解釋完畢。容易得知,上述代碼中的那個HashMap[]數(shù)組就是在KeyGroup級別對key進行去重的容器,數(shù)組中每個元素對應一個KeyGroup。以插入一個Timer的流程為例:

        • 從Timer中取出key,計算該key屬于哪一個KeyGroup;

        • 計算出該KeyGroup在整個KeyGroupRange中的偏移量,按該偏移量定位到HashMap[]數(shù)組的下標;

        • 根據putIfAbsent()方法的語義,只有當對應HashMap不存在該Timer的key時,才將Timer插入最小堆中。

        接下來回到主流程,InternalTimerServiceImpl。


        InternalTimerServiceImpl

        在這里,我們終于可以看到注冊和移除Timer方法的最底層實現(xiàn)了。注意ProcessingTimeService是Flink內部產生處理時間的時間戳的服務。

            private final ProcessingTimeService processingTimeService;
        private final KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> processingTimeTimersQueue;
        private final KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> eventTimeTimersQueue;
        private ScheduledFuture<?> nextTimer;

        @Override
        public void registerProcessingTimeTimer(N namespace, long time) {
        InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek();
        if (processingTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace))) {
        long nextTriggerTime = oldHead != null ? oldHead.getTimestamp() : Long.MAX_VALUE;
        if (time < nextTriggerTime) {
        if (nextTimer != null) {
        nextTimer.cancel(false);
        }
        nextTimer = processingTimeService.registerTimer(time, this);
        }
        }
        }

        @Override
        public void registerEventTimeTimer(N namespace, long time) {
        eventTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace));
        }

        @Override
        public void deleteProcessingTimeTimer(N namespace, long time) {
        processingTimeTimersQueue.remove(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace));
        }

        @Override
        public void deleteEventTimeTimer(N namespace, long time) {
        eventTimeTimersQueue.remove(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace));
        }

        由此可見,注冊Timer實際上就是為它們賦予對應的時間戳、key和命名空間,并將它們加入對應的優(yōu)先隊列。特別地,當注冊基于處理時間的Timer時,會先檢查要注冊的Timer時間戳與當前在最小堆堆頂?shù)腡imer的時間戳的大小關系。如果前者比后者要早,就會用前者替代掉后者,因為處理時間是永遠線性增長的。

        Timer注冊好了之后是如何觸發(fā)的呢?先來看處理時間的情況。

        InternalTimerServiceImpl類繼承了ProcessingTimeCallback接口,表示它可以觸發(fā)處理時間的回調。該接口只要求實現(xiàn)一個方法,如下。

            @Override
        private Triggerable<K, N> triggerTarget;

        public void onProcessingTime(long time) throws Exception {
        nextTimer = null;
        InternalTimer<K, N> timer;

        while ((timer = processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
        processingTimeTimersQueue.poll();
        keyContext.setCurrentKey(timer.getKey());
        triggerTarget.onProcessingTime(timer);
        }

        if (timer != null && nextTimer == null) {
        nextTimer = processingTimeService.registerTimer(timer.getTimestamp(), this);
        }
        }

        可見,當onProcessingTime()方法被觸發(fā)回調時,就會按順序從隊列中獲取到比時間戳time小的所有Timer,并挨個執(zhí)行Triggerable.onProcessingTime()方法,也就是在上文KeyedProcessOperator的同名方法,用戶自定義的onTimer()邏輯也就被執(zhí)行了。

        最后來到ProcessingTimeService的實現(xiàn)類SystemProcessingTimeService,它是用調度線程池實現(xiàn)回調的。相關的代碼如下。

            private final ScheduledThreadPoolExecutor timerService;

        @Override
        public ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback target) {
        long delay = Math.max(timestamp - getCurrentProcessingTime(), 0) + 1;

        try {
        return timerService.schedule(
        new TriggerTask(status, task, checkpointLock, target, timestamp), delay, TimeUnit.MILLISECONDS);
        } catch (RejectedExecutionException e) {
        final int status = this.status.get();
        if (status == STATUS_QUIESCED) {
        return new NeverCompleteFuture(delay);
        } else if (status == STATUS_SHUTDOWN) {
        throw new IllegalStateException("Timer service is shut down");
        } else {
        throw e;
        }
        }
        }

        // 注意:這個是TriggerTask線程的run()方法
        @Override
        public void run() {
        synchronized (lock) {
        try {
        if (serviceStatus.get() == STATUS_ALIVE) {
        target.onProcessingTime(timestamp);
        }
        } catch (Throwable t) {
        TimerException asyncException = new TimerException(t);
        exceptionHandler.handleAsyncException("Caught exception while processing timer.", asyncException);
        }
        }
        }

        可見,onProcessingTime()在TriggerTask線程中被回調,而TriggerTask線程按照Timer的時間戳來調度。到這里,處理時間Timer的情況就講述完畢了。

        再來看事件時間的情況。事件時間與內部時間戳無關,而與水印有關。以下是InternalTimerServiceImpl.advanceWatermark()方法的代碼。

            public void advanceWatermark(long time) throws Exception {
        currentWatermark = time;
        InternalTimer<K, N> timer;

        while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
        eventTimeTimersQueue.poll();
        keyContext.setCurrentKey(timer.getKey());
        triggerTarget.onEventTime(timer);
        }
        }

        該邏輯與處理時間相似,只不過從回調onProcessingTime()變成了回調onEventTime()而已。然后追蹤它的調用鏈,回到InternalTimeServiceManager的同名方法。

            public void advanceWatermark(Watermark watermark) throws Exception {
        for (InternalTimerServiceImpl<?, ?> service : timerServices.values()) {
        service.advanceWatermark(watermark.getTimestamp());
        }
        }

        繼續(xù)向上追溯,到達終點:算子基類AbstractStreamOperator中處理水印的方法processWatermark()。當水印到來時,就會按著上述調用鏈流轉到InternalTimerServiceImpl中,并觸發(fā)所有早于水印時間戳的Timer了。

            public void processWatermark(Watermark mark) throws Exception {
        if (timeServiceManager != null) {
        timeServiceManager.advanceWatermark(mark);
        }
        output.emitWatermark(mark);
        }

        至此,我們算是基本打通了Flink Timer機制的實現(xiàn)細節(jié)。

        版權聲明:

        本文為大數(shù)據技術與架構整理,原作者獨家授權。未經原作者允許轉載追究侵權責任。
        編輯|冷眼丶
        微信公眾號|import_bigdata


        歡迎點贊+收藏+轉發(fā)朋友圈素質三連


        文章不錯?點個【在看】吧!??

        瀏覽 124
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        評論
        圖片
        表情
        推薦
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
          
          

            1. 日本骚少妇 | AV综合 | 女同一区二区三区 | 日韩人妻在线观看 | 无码成人网 |