From 74621170fe7431e5011f040d23fed7660a10e473 Mon Sep 17 00:00:00 2001 From: ByteHamster Date: Wed, 30 Oct 2019 11:10:44 +0100 Subject: Converted our own FeedSyncThread to a task+executor --- .../core/service/download/DownloadService.java | 56 +++-- .../service/download/handler/FeedParserTask.java | 2 +- .../service/download/handler/FeedSyncTask.java | 57 +++++ .../service/download/handler/FeedSyncThread.java | 234 --------------------- 4 files changed, 82 insertions(+), 267 deletions(-) create mode 100644 core/src/main/java/de/danoeh/antennapod/core/service/download/handler/FeedSyncTask.java delete mode 100644 core/src/main/java/de/danoeh/antennapod/core/service/download/handler/FeedSyncThread.java (limited to 'core/src/main') diff --git a/core/src/main/java/de/danoeh/antennapod/core/service/download/DownloadService.java b/core/src/main/java/de/danoeh/antennapod/core/service/download/DownloadService.java index 324e3e7b8..1d8f2bad0 100644 --- a/core/src/main/java/de/danoeh/antennapod/core/service/download/DownloadService.java +++ b/core/src/main/java/de/danoeh/antennapod/core/service/download/DownloadService.java @@ -8,7 +8,6 @@ import android.content.Context; import android.content.Intent; import android.content.IntentFilter; import android.os.Binder; -import android.os.Build; import android.os.Handler; import android.os.IBinder; import android.text.TextUtils; @@ -17,9 +16,7 @@ import android.webkit.URLUtil; import androidx.annotation.NonNull; import androidx.annotation.Nullable; import androidx.annotation.VisibleForTesting; -import androidx.core.app.NotificationCompat; import de.danoeh.antennapod.core.ClientConfig; -import de.danoeh.antennapod.core.R; import de.danoeh.antennapod.core.event.DownloadEvent; import de.danoeh.antennapod.core.event.FeedItemEvent; import de.danoeh.antennapod.core.feed.Feed; @@ -29,17 +26,13 @@ import de.danoeh.antennapod.core.preferences.GpodnetPreferences; import de.danoeh.antennapod.core.preferences.UserPreferences; import de.danoeh.antennapod.core.service.GpodnetSyncService; import de.danoeh.antennapod.core.service.download.handler.FailedDownloadHandler; -import de.danoeh.antennapod.core.service.download.handler.FeedSyncThread; +import de.danoeh.antennapod.core.service.download.handler.FeedSyncTask; import de.danoeh.antennapod.core.service.download.handler.MediaDownloadedHandler; import de.danoeh.antennapod.core.storage.DBReader; import de.danoeh.antennapod.core.storage.DBTasks; import de.danoeh.antennapod.core.storage.DBWriter; import de.danoeh.antennapod.core.storage.DownloadRequester; import de.danoeh.antennapod.core.util.DownloadError; -import de.danoeh.antennapod.core.util.gui.NotificationUtils; -import org.apache.commons.io.FileUtils; -import org.greenrobot.eventbus.EventBus; - import java.io.File; import java.io.IOException; import java.net.HttpURLConnection; @@ -55,6 +48,8 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.commons.io.FileUtils; +import org.greenrobot.eventbus.EventBus; /** * Manages the download of feedfiles in the app. Downloads can be enqueued via the startService intent. @@ -94,7 +89,6 @@ public class DownloadService extends Service { private ExecutorService syncExecutor; private CompletionService downloadExecutor; - private FeedSyncThread feedSyncThread; private DownloadRequester requester; @@ -145,18 +139,36 @@ public class DownloadService extends Service { Log.d(TAG, "Received 'Download Complete' - message."); removeDownload(downloader); DownloadStatus status = downloader.getResult(); + DownloadRequest request = downloader.getDownloadRequest(); boolean successful = status.isSuccessful(); - final int type = status.getFeedfileType(); + if (successful) { if (type == Feed.FEEDFILETYPE_FEED) { Log.d(TAG, "Handling completed Feed Download"); - feedSyncThread.submitCompletedDownload(downloader.getDownloadRequest()); + syncExecutor.execute(() -> { + FeedSyncTask task = new FeedSyncTask(DownloadService.this, request); + boolean success = task.run(); + + if (success) { + // we create a 'successful' download log if the feed's last refresh failed + List log = DBReader.getFeedDownloadLog(request.getFeedfileId()); + if (log.size() > 0 && !log.get(0).isSuccessful()) { + saveDownloadStatus(task.getDownloadStatus()); + } + } else { + saveDownloadStatus(task.getDownloadStatus()); + } + numberOfDownloads.decrementAndGet(); + queryDownloadsAsync(); + + }); + } else if (type == FeedMedia.FEEDFILETYPE_FEEDMEDIA) { Log.d(TAG, "Handling completed FeedMedia Download"); syncExecutor.execute(() -> { MediaDownloadedHandler handler = new MediaDownloadedHandler(DownloadService.this, - status, downloader.getDownloadRequest()); + status, request); handler.run(); saveDownloadStatus(handler.getUpdatedStatus()); numberOfDownloads.decrementAndGet(); @@ -267,25 +279,6 @@ public class DownloadService extends Service { }, (r, executor) -> Log.w(TAG, "SchedEx rejected submission of new task") ); downloadCompletionThread.start(); - feedSyncThread = new FeedSyncThread(DownloadService.this, new FeedSyncThread.FeedSyncCallback() { - @Override - public void finishedSyncingFeeds(int numberOfCompletedFeeds) { - numberOfDownloads.addAndGet(-numberOfCompletedFeeds); - queryDownloadsAsync(); - } - - @Override - public void failedSyncingFeed() { - numberOfDownloads.decrementAndGet(); - } - - @Override - public void downloadStatusGenerated(DownloadStatus downloadStatus) { - saveDownloadStatus(downloadStatus); - } - }); - feedSyncThread.start(); - requester = DownloadRequester.getInstance(); notificationManager = new DownloadServiceNotification(this); Notification notification = notificationManager.updateNotifications( @@ -319,7 +312,6 @@ public class DownloadService extends Service { downloadCompletionThread.interrupt(); syncExecutor.shutdown(); schedExecutor.shutdown(); - feedSyncThread.shutdown(); cancelNotificationUpdater(); unregisterReceiver(cancelDownloadReceiver); diff --git a/core/src/main/java/de/danoeh/antennapod/core/service/download/handler/FeedParserTask.java b/core/src/main/java/de/danoeh/antennapod/core/service/download/handler/FeedParserTask.java index 96119ad47..10d5bfa15 100644 --- a/core/src/main/java/de/danoeh/antennapod/core/service/download/handler/FeedParserTask.java +++ b/core/src/main/java/de/danoeh/antennapod/core/service/download/handler/FeedParserTask.java @@ -31,7 +31,7 @@ public class FeedParserTask implements Callable { } @Override - public FeedHandlerResult call() throws Exception { + public FeedHandlerResult call() { Feed feed = new Feed(request.getSource(), request.getLastModified()); feed.setFile_url(request.getDestination()); feed.setId(request.getFeedfileId()); diff --git a/core/src/main/java/de/danoeh/antennapod/core/service/download/handler/FeedSyncTask.java b/core/src/main/java/de/danoeh/antennapod/core/service/download/handler/FeedSyncTask.java new file mode 100644 index 000000000..718faaa0a --- /dev/null +++ b/core/src/main/java/de/danoeh/antennapod/core/service/download/handler/FeedSyncTask.java @@ -0,0 +1,57 @@ +package de.danoeh.antennapod.core.service.download.handler; + +import android.content.Context; +import android.util.Log; +import de.danoeh.antennapod.core.ClientConfig; +import de.danoeh.antennapod.core.feed.Feed; +import de.danoeh.antennapod.core.service.download.DownloadRequest; +import de.danoeh.antennapod.core.service.download.DownloadStatus; +import de.danoeh.antennapod.core.storage.DBReader; +import de.danoeh.antennapod.core.storage.DBTasks; +import de.danoeh.antennapod.core.storage.DownloadRequestException; +import de.danoeh.antennapod.core.storage.DownloadRequester; +import de.danoeh.antennapod.core.syndication.handler.FeedHandlerResult; +import java.util.List; + +public class FeedSyncTask { + private static final String TAG = "FeedParserTask"; + private final DownloadRequest request; + private final Context context; + private DownloadStatus downloadStatus; + + public FeedSyncTask(Context context, DownloadRequest request) { + this.request = request; + this.context = context; + } + + public boolean run() { + FeedParserTask task = new FeedParserTask(request); + FeedHandlerResult result = task.call(); + downloadStatus = task.getDownloadStatus(); + + if (!task.isSuccessful()) { + return false; + } + + Feed[] savedFeeds = DBTasks.updateFeed(context, result.feed); + Feed savedFeed = savedFeeds[0]; + // If loadAllPages=true, check if another page is available and queue it for download + final boolean loadAllPages = request.getArguments().getBoolean(DownloadRequester.REQUEST_ARG_LOAD_ALL_PAGES); + final Feed feed = result.feed; + if (loadAllPages && feed.getNextPageLink() != null) { + try { + feed.setId(savedFeed.getId()); + DBTasks.loadNextPageOfFeed(context, savedFeed, true); + } catch (DownloadRequestException e) { + Log.e(TAG, "Error trying to load next page", e); + } + } + + ClientConfig.downloadServiceCallbacks.onFeedParsed(context, savedFeed); + return true; + } + + public DownloadStatus getDownloadStatus() { + return downloadStatus; + } +} diff --git a/core/src/main/java/de/danoeh/antennapod/core/service/download/handler/FeedSyncThread.java b/core/src/main/java/de/danoeh/antennapod/core/service/download/handler/FeedSyncThread.java deleted file mode 100644 index 21cab4ef6..000000000 --- a/core/src/main/java/de/danoeh/antennapod/core/service/download/handler/FeedSyncThread.java +++ /dev/null @@ -1,234 +0,0 @@ -package de.danoeh.antennapod.core.service.download.handler; - -import android.content.Context; -import android.util.Log; -import android.util.Pair; -import de.danoeh.antennapod.core.ClientConfig; -import de.danoeh.antennapod.core.feed.Feed; -import de.danoeh.antennapod.core.service.download.DownloadRequest; -import de.danoeh.antennapod.core.service.download.DownloadStatus; -import de.danoeh.antennapod.core.storage.DBReader; -import de.danoeh.antennapod.core.storage.DBTasks; -import de.danoeh.antennapod.core.storage.DownloadRequestException; -import de.danoeh.antennapod.core.storage.DownloadRequester; -import de.danoeh.antennapod.core.syndication.handler.FeedHandlerResult; - -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CompletionService; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorCompletionService; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingDeque; - -/** - * Takes a single Feed, parses the corresponding file and refreshes - * information in the manager. - */ -public class FeedSyncThread extends Thread { - private static final String TAG = "FeedSyncThread"; - private static final long WAIT_TIMEOUT = 3000; - - private final BlockingQueue completedRequests = new LinkedBlockingDeque<>(); - private final CompletionService> parserService = - new ExecutorCompletionService<>(Executors.newSingleThreadExecutor()); - private final ExecutorService dbService = Executors.newSingleThreadExecutor(); - private final Context context; - private Future dbUpdateFuture; - private final FeedSyncCallback feedSyncCallback; - private volatile boolean isActive = true; - private volatile boolean isCollectingRequests = false; - - public FeedSyncThread(Context context, FeedSyncCallback feedSyncCallback) { - super("FeedSyncThread"); - this.context = context; - this.feedSyncCallback = feedSyncCallback; - } - - /** - * Waits for completed requests. Once the first request has been taken, the method will wait WAIT_TIMEOUT ms longer to - * collect more completed requests. - * - * @return Collected feeds or null if the method has been interrupted during the first waiting period. - */ - private List> collectCompletedRequests() { - List> results = new LinkedList<>(); - DownloadRequester requester = DownloadRequester.getInstance(); - int tasks = 0; - - try { - DownloadRequest request = completedRequests.take(); - submitParseRequest(request); - tasks++; - } catch (InterruptedException e) { - Log.e(TAG, "FeedSyncThread was interrupted"); - return null; - } - - tasks += pollCompletedDownloads(); - - isCollectingRequests = true; - - if (requester.isDownloadingFeeds()) { - // wait for completion of more downloads - long startTime = System.currentTimeMillis(); - long currentTime = startTime; - while (requester.isDownloadingFeeds() && (currentTime - startTime) < WAIT_TIMEOUT) { - try { - Log.d(TAG, "Waiting for " + (startTime + WAIT_TIMEOUT - currentTime) + " ms"); - sleep(startTime + WAIT_TIMEOUT - currentTime); - } catch (InterruptedException e) { - Log.d(TAG, "interrupted while waiting for more downloads"); - tasks += pollCompletedDownloads(); - } finally { - currentTime = System.currentTimeMillis(); - } - } - - tasks += pollCompletedDownloads(); - - } - - isCollectingRequests = false; - - for (int i = 0; i < tasks; i++) { - try { - Pair result = parserService.take().get(); - if (result != null) { - results.add(result); - } - } catch (InterruptedException e) { - Log.e(TAG, "FeedSyncThread was interrupted"); - } catch (ExecutionException e) { - Log.e(TAG, "ExecutionException in FeedSyncThread: " + e.getMessage()); - e.printStackTrace(); - } - } - - return results; - } - - private void submitParseRequest(DownloadRequest request) { - parserService.submit(() -> { - FeedParserTask task = new FeedParserTask(request); - FeedHandlerResult result = task.call(); - - if (task.isSuccessful()) { - // we create a 'successful' download log if the feed's last refresh failed - List log = DBReader.getFeedDownloadLog(request.getFeedfileId()); - if (log.size() > 0 && !log.get(0).isSuccessful()) { - feedSyncCallback.downloadStatusGenerated(task.getDownloadStatus()); - } - return Pair.create(request, result); - } else { - feedSyncCallback.failedSyncingFeed(); - feedSyncCallback.downloadStatusGenerated(task.getDownloadStatus()); - return null; - } - }); - } - - private int pollCompletedDownloads() { - int tasks = 0; - while (!completedRequests.isEmpty()) { - DownloadRequest request = completedRequests.poll(); - submitParseRequest(request); - tasks++; - } - return tasks; - } - - @Override - public void run() { - while (isActive) { - final List> results = collectCompletedRequests(); - - if (results == null) { - continue; - } - - Log.d(TAG, "Bundling " + results.size() + " feeds"); - - // Save information of feed in DB - if (dbUpdateFuture != null) { - try { - dbUpdateFuture.get(); - } catch (InterruptedException e) { - Log.e(TAG, "FeedSyncThread was interrupted"); - } catch (ExecutionException e) { - Log.e(TAG, "ExecutionException in FeedSyncThread: " + e.getMessage()); - e.printStackTrace(); - } - } - - dbUpdateFuture = dbService.submit(() -> { - Feed[] savedFeeds = DBTasks.updateFeed(context, getFeeds(results)); - - for (int i = 0; i < savedFeeds.length; i++) { - Feed savedFeed = savedFeeds[i]; - - // If loadAllPages=true, check if another page is available and queue it for download - final boolean loadAllPages = results.get(i).first.getArguments() - .getBoolean(DownloadRequester.REQUEST_ARG_LOAD_ALL_PAGES); - final Feed feed = results.get(i).second.feed; - if (loadAllPages && feed.getNextPageLink() != null) { - try { - feed.setId(savedFeed.getId()); - DBTasks.loadNextPageOfFeed(context, savedFeed, true); - } catch (DownloadRequestException e) { - Log.e(TAG, "Error trying to load next page", e); - } - } - - ClientConfig.downloadServiceCallbacks.onFeedParsed(context, savedFeed); - } - feedSyncCallback.finishedSyncingFeeds(savedFeeds.length); - }); - - } - - if (dbUpdateFuture != null) { - try { - dbUpdateFuture.get(); - } catch (InterruptedException e) { - Log.e(TAG, "interrupted while updating the db"); - } catch (ExecutionException e) { - Log.e(TAG, "ExecutionException while updating the db: " + e.getMessage()); - } - } - - Log.d(TAG, "Shutting down"); - } - - private Feed[] getFeeds(List> results) { - Feed[] feeds = new Feed[results.size()]; - for (int i = 0; i < results.size(); i++) { - feeds[i] = results.get(i).second.feed; - } - return feeds; - } - - public void shutdown() { - isActive = false; - if (isCollectingRequests) { - interrupt(); - } - } - - public void submitCompletedDownload(DownloadRequest request) { - completedRequests.offer(request); - if (isCollectingRequests) { - interrupt(); - } - } - - public interface FeedSyncCallback { - void finishedSyncingFeeds(int numberOfCompletedFeeds); - void failedSyncingFeed(); - void downloadStatusGenerated(DownloadStatus downloadStatus); - } - -} \ No newline at end of file -- cgit v1.2.3