summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPeter Maydell <peter.maydell@linaro.org>2018-09-25 16:47:35 +0100
committerPeter Maydell <peter.maydell@linaro.org>2018-09-25 16:47:35 +0100
commitc5e4e49258e9b89cb34c085a419dd9f862935c48 (patch)
treeaa7547e348af2bf5070321c329e2dbe3daf6290a
parent0a736f7ab83df26dbe5421bb3d1f7892def05dee (diff)
parent9c76ff9c16be890e70fce30754b096ff9950d1ee (diff)
downloadqemu-c5e4e49258e9b89cb34c085a419dd9f862935c48.zip
Merge remote-tracking branch 'remotes/xanclic/tags/pull-block-2018-09-25' into staging
Block layer patches: - Drain fixes - node-name parameters for block-commit - Refactor block jobs to use transactional callbacks for exiting # gpg: Signature made Tue 25 Sep 2018 16:12:44 BST # gpg: using RSA key F407DB0061D5CF40 # gpg: Good signature from "Max Reitz <mreitz@redhat.com>" # Primary key fingerprint: 91BE B60A 30DB 3E88 57D1 1829 F407 DB00 61D5 CF40 * remotes/xanclic/tags/pull-block-2018-09-25: (42 commits) test-bdrv-drain: Test draining job source child and parent block: Use a single global AioWait test-bdrv-drain: Fix outdated comments test-bdrv-drain: AIO_WAIT_WHILE() in job .commit/.abort job: Avoid deadlocks in job_completed_txn_abort() test-bdrv-drain: Test nested poll in bdrv_drain_poll_top_level() block: Remove aio_poll() in bdrv_drain_poll variants blockjob: Lie better in child_job_drained_poll() block-backend: Decrease in_flight only after callback block-backend: Fix potential double blk_delete() block-backend: Add .drained_poll callback block: Add missing locking in bdrv_co_drain_bh_cb() test-bdrv-drain: Test AIO_WAIT_WHILE() in completion callback job: Use AIO_WAIT_WHILE() in job_finish_sync() test-blockjob: Acquire AioContext around job_cancel_sync() test-bdrv-drain: Drain with block jobs in an I/O thread aio-wait: Increase num_waiters even in home thread blockjob: Wake up BDS when job becomes idle job: Fix missing locking due to mismerge job: Fix nested aio_poll() hanging in job_txn_apply ... Signed-off-by: Peter Maydell <peter.maydell@linaro.org>
-rw-r--r--block.c6
-rw-r--r--block/block-backend.c31
-rw-r--r--block/commit.c97
-rw-r--r--block/io.c30
-rw-r--r--block/linux-aio.c2
-rw-r--r--block/mirror.c49
-rw-r--r--block/stream.c28
-rw-r--r--blockdev.c84
-rw-r--r--blockjob.c9
-rw-r--r--hmp.c5
-rw-r--r--include/block/aio-wait.h28
-rw-r--r--include/block/block.h6
-rw-r--r--include/block/block_int.h18
-rw-r--r--include/block/blockjob.h3
-rw-r--r--include/qemu/coroutine.h5
-rw-r--r--include/qemu/job.h23
-rw-r--r--job.c144
-rw-r--r--qapi/block-core.json104
-rwxr-xr-xtests/qemu-iotests/04052
-rw-r--r--tests/qemu-iotests/040.out4
-rwxr-xr-xtests/qemu-iotests/0513
-rw-r--r--tests/qemu-iotests/051.out3
-rw-r--r--tests/qemu-iotests/051.pc.out3
-rw-r--r--tests/test-bdrv-drain.c294
-rw-r--r--tests/test-blockjob-txn.c4
-rw-r--r--tests/test-blockjob.c120
-rw-r--r--util/aio-wait.c11
-rw-r--r--util/async.c2
-rw-r--r--util/qemu-coroutine.c5
29 files changed, 856 insertions, 317 deletions
diff --git a/block.c b/block.c
index 0dbb1fcc7b..c298ca6a19 100644
--- a/block.c
+++ b/block.c
@@ -2792,6 +2792,7 @@ static BlockDriverState *bdrv_open_inherit(const char *filename,
bdrv_parent_cb_change_media(bs, true);
qobject_unref(options);
+ options = NULL;
/* For snapshot=on, create a temporary qcow2 overlay. bs points to the
* temporary snapshot afterwards. */
@@ -4885,11 +4886,6 @@ AioContext *bdrv_get_aio_context(BlockDriverState *bs)
return bs ? bs->aio_context : qemu_get_aio_context();
}
-AioWait *bdrv_get_aio_wait(BlockDriverState *bs)
-{
- return bs ? &bs->wait : NULL;
-}
-
void bdrv_coroutine_enter(BlockDriverState *bs, Coroutine *co)
{
aio_co_enter(bdrv_get_aio_context(bs), co);
diff --git a/block/block-backend.c b/block/block-backend.c
index 14a1b7ac6a..7b1ec5071b 100644
--- a/block/block-backend.c
+++ b/block/block-backend.c
@@ -88,7 +88,6 @@ struct BlockBackend {
* Accessed with atomic ops.
*/
unsigned int in_flight;
- AioWait wait;
};
typedef struct BlockBackendAIOCB {
@@ -121,6 +120,7 @@ static void blk_root_inherit_options(int *child_flags, QDict *child_options,
abort();
}
static void blk_root_drained_begin(BdrvChild *child);
+static bool blk_root_drained_poll(BdrvChild *child);
static void blk_root_drained_end(BdrvChild *child);
static void blk_root_change_media(BdrvChild *child, bool load);
@@ -294,6 +294,7 @@ static const BdrvChildRole child_root = {
.get_parent_desc = blk_root_get_parent_desc,
.drained_begin = blk_root_drained_begin,
+ .drained_poll = blk_root_drained_poll,
.drained_end = blk_root_drained_end,
.activate = blk_root_activate,
@@ -433,6 +434,7 @@ int blk_get_refcnt(BlockBackend *blk)
*/
void blk_ref(BlockBackend *blk)
{
+ assert(blk->refcnt > 0);
blk->refcnt++;
}
@@ -445,7 +447,13 @@ void blk_unref(BlockBackend *blk)
{
if (blk) {
assert(blk->refcnt > 0);
- if (!--blk->refcnt) {
+ if (blk->refcnt > 1) {
+ blk->refcnt--;
+ } else {
+ blk_drain(blk);
+ /* blk_drain() cannot resurrect blk, nobody held a reference */
+ assert(blk->refcnt == 1);
+ blk->refcnt = 0;
blk_delete(blk);
}
}
@@ -1289,7 +1297,7 @@ static void blk_inc_in_flight(BlockBackend *blk)
static void blk_dec_in_flight(BlockBackend *blk)
{
atomic_dec(&blk->in_flight);
- aio_wait_kick(&blk->wait);
+ aio_wait_kick();
}
static void error_callback_bh(void *opaque)
@@ -1330,8 +1338,8 @@ static const AIOCBInfo blk_aio_em_aiocb_info = {
static void blk_aio_complete(BlkAioEmAIOCB *acb)
{
if (acb->has_returned) {
- blk_dec_in_flight(acb->rwco.blk);
acb->common.cb(acb->common.opaque, acb->rwco.ret);
+ blk_dec_in_flight(acb->rwco.blk);
qemu_aio_unref(acb);
}
}
@@ -1590,9 +1598,8 @@ void blk_drain(BlockBackend *blk)
}
/* We may have -ENOMEDIUM completions in flight */
- AIO_WAIT_WHILE(&blk->wait,
- blk_get_aio_context(blk),
- atomic_mb_read(&blk->in_flight) > 0);
+ AIO_WAIT_WHILE(blk_get_aio_context(blk),
+ atomic_mb_read(&blk->in_flight) > 0);
if (bs) {
bdrv_drained_end(bs);
@@ -1611,8 +1618,7 @@ void blk_drain_all(void)
aio_context_acquire(ctx);
/* We may have -ENOMEDIUM completions in flight */
- AIO_WAIT_WHILE(&blk->wait, ctx,
- atomic_mb_read(&blk->in_flight) > 0);
+ AIO_WAIT_WHILE(ctx, atomic_mb_read(&blk->in_flight) > 0);
aio_context_release(ctx);
}
@@ -2189,6 +2195,13 @@ static void blk_root_drained_begin(BdrvChild *child)
}
}
+static bool blk_root_drained_poll(BdrvChild *child)
+{
+ BlockBackend *blk = child->opaque;
+ assert(blk->quiesce_counter);
+ return !!blk->in_flight;
+}
+
static void blk_root_drained_end(BdrvChild *child)
{
BlockBackend *blk = child->opaque;
diff --git a/block/commit.c b/block/commit.c
index da69165de3..a2da5740b0 100644
--- a/block/commit.c
+++ b/block/commit.c
@@ -36,6 +36,7 @@ typedef struct CommitBlockJob {
BlockDriverState *commit_top_bs;
BlockBackend *top;
BlockBackend *base;
+ BlockDriverState *base_bs;
BlockdevOnError on_error;
int base_flags;
char *backing_file_str;
@@ -68,61 +69,67 @@ static int coroutine_fn commit_populate(BlockBackend *bs, BlockBackend *base,
return 0;
}
-static void commit_exit(Job *job)
+static int commit_prepare(Job *job)
{
CommitBlockJob *s = container_of(job, CommitBlockJob, common.job);
- BlockJob *bjob = &s->common;
- BlockDriverState *top = blk_bs(s->top);
- BlockDriverState *base = blk_bs(s->base);
- BlockDriverState *commit_top_bs = s->commit_top_bs;
- bool remove_commit_top_bs = false;
-
- /* Make sure commit_top_bs and top stay around until bdrv_replace_node() */
- bdrv_ref(top);
- bdrv_ref(commit_top_bs);
/* Remove base node parent that still uses BLK_PERM_WRITE/RESIZE before
* the normal backing chain can be restored. */
blk_unref(s->base);
+ s->base = NULL;
+
+ /* FIXME: bdrv_drop_intermediate treats total failures and partial failures
+ * identically. Further work is needed to disambiguate these cases. */
+ return bdrv_drop_intermediate(s->commit_top_bs, s->base_bs,
+ s->backing_file_str);
+}
- if (!job_is_cancelled(job) && job->ret == 0) {
- /* success */
- job->ret = bdrv_drop_intermediate(s->commit_top_bs, base,
- s->backing_file_str);
- } else {
- /* XXX Can (or should) we somehow keep 'consistent read' blocked even
- * after the failed/cancelled commit job is gone? If we already wrote
- * something to base, the intermediate images aren't valid any more. */
- remove_commit_top_bs = true;
+static void commit_abort(Job *job)
+{
+ CommitBlockJob *s = container_of(job, CommitBlockJob, common.job);
+ BlockDriverState *top_bs = blk_bs(s->top);
+
+ /* Make sure commit_top_bs and top stay around until bdrv_replace_node() */
+ bdrv_ref(top_bs);
+ bdrv_ref(s->commit_top_bs);
+
+ if (s->base) {
+ blk_unref(s->base);
}
+ /* free the blockers on the intermediate nodes so that bdrv_replace_nodes
+ * can succeed */
+ block_job_remove_all_bdrv(&s->common);
+
+ /* If bdrv_drop_intermediate() failed (or was not invoked), remove the
+ * commit filter driver from the backing chain now. Do this as the final
+ * step so that the 'consistent read' permission can be granted.
+ *
+ * XXX Can (or should) we somehow keep 'consistent read' blocked even
+ * after the failed/cancelled commit job is gone? If we already wrote
+ * something to base, the intermediate images aren't valid any more. */
+ bdrv_child_try_set_perm(s->commit_top_bs->backing, 0, BLK_PERM_ALL,
+ &error_abort);
+ bdrv_replace_node(s->commit_top_bs, backing_bs(s->commit_top_bs),
+ &error_abort);
+
+ bdrv_unref(s->commit_top_bs);
+ bdrv_unref(top_bs);
+}
+
+static void commit_clean(Job *job)
+{
+ CommitBlockJob *s = container_of(job, CommitBlockJob, common.job);
+
/* restore base open flags here if appropriate (e.g., change the base back
* to r/o). These reopens do not need to be atomic, since we won't abort
* even on failure here */
- if (s->base_flags != bdrv_get_flags(base)) {
- bdrv_reopen(base, s->base_flags, NULL);
+ if (s->base_flags != bdrv_get_flags(s->base_bs)) {
+ bdrv_reopen(s->base_bs, s->base_flags, NULL);
}
+
g_free(s->backing_file_str);
blk_unref(s->top);
-
- /* If there is more than one reference to the job (e.g. if called from
- * job_finish_sync()), job_completed() won't free it and therefore the
- * blockers on the intermediate nodes remain. This would cause
- * bdrv_set_backing_hd() to fail. */
- block_job_remove_all_bdrv(bjob);
-
- /* If bdrv_drop_intermediate() didn't already do that, remove the commit
- * filter driver from the backing chain. Do this as the final step so that
- * the 'consistent read' permission can be granted. */
- if (remove_commit_top_bs) {
- bdrv_child_try_set_perm(commit_top_bs->backing, 0, BLK_PERM_ALL,
- &error_abort);
- bdrv_replace_node(commit_top_bs, backing_bs(commit_top_bs),
- &error_abort);
- }
-
- bdrv_unref(commit_top_bs);
- bdrv_unref(top);
}
static int coroutine_fn commit_run(Job *job, Error **errp)
@@ -211,7 +218,9 @@ static const BlockJobDriver commit_job_driver = {
.user_resume = block_job_user_resume,
.drain = block_job_drain,
.run = commit_run,
- .exit = commit_exit,
+ .prepare = commit_prepare,
+ .abort = commit_abort,
+ .clean = commit_clean
},
};
@@ -249,7 +258,8 @@ static BlockDriver bdrv_commit_top = {
};
void commit_start(const char *job_id, BlockDriverState *bs,
- BlockDriverState *base, BlockDriverState *top, int64_t speed,
+ BlockDriverState *base, BlockDriverState *top,
+ int creation_flags, int64_t speed,
BlockdevOnError on_error, const char *backing_file_str,
const char *filter_node_name, Error **errp)
{
@@ -267,7 +277,7 @@ void commit_start(const char *job_id, BlockDriverState *bs,
}
s = block_job_create(job_id, &commit_job_driver, NULL, bs, 0, BLK_PERM_ALL,
- speed, JOB_DEFAULT, NULL, NULL, errp);
+ speed, creation_flags, NULL, NULL, errp);
if (!s) {
return;
}
@@ -344,6 +354,7 @@ void commit_start(const char *job_id, BlockDriverState *bs,
if (ret < 0) {
goto fail;
}
+ s->base_bs = base;
/* Required permissions are already taken with block_job_add_bdrv() */
s->top = blk_new(0, BLK_PERM_ALL);
diff --git a/block/io.c b/block/io.c
index 7100344c7b..bd9d688f8b 100644
--- a/block/io.c
+++ b/block/io.c
@@ -38,8 +38,6 @@
/* Maximum bounce buffer for copy-on-read and write zeroes, in bytes */
#define MAX_BOUNCE_BUFFER (32768 << BDRV_SECTOR_BITS)
-static AioWait drain_all_aio_wait;
-
static void bdrv_parent_cb_resize(BlockDriverState *bs);
static int coroutine_fn bdrv_co_do_pwrite_zeroes(BlockDriverState *bs,
int64_t offset, int bytes, BdrvRequestFlags flags);
@@ -268,10 +266,6 @@ bool bdrv_drain_poll(BlockDriverState *bs, bool recursive,
static bool bdrv_drain_poll_top_level(BlockDriverState *bs, bool recursive,
BdrvChild *ignore_parent)
{
- /* Execute pending BHs first and check everything else only after the BHs
- * have executed. */
- while (aio_poll(bs->aio_context, false));
-
return bdrv_drain_poll(bs, recursive, ignore_parent, false);
}
@@ -288,6 +282,18 @@ static void bdrv_co_drain_bh_cb(void *opaque)
BlockDriverState *bs = data->bs;
if (bs) {
+ AioContext *ctx = bdrv_get_aio_context(bs);
+ AioContext *co_ctx = qemu_coroutine_get_aio_context(co);
+
+ /*
+ * When the coroutine yielded, the lock for its home context was
+ * released, so we need to re-acquire it here. If it explicitly
+ * acquired a different context, the lock is still held and we don't
+ * want to lock it a second time (or AIO_WAIT_WHILE() would hang).
+ */
+ if (ctx == co_ctx) {
+ aio_context_acquire(ctx);
+ }
bdrv_dec_in_flight(bs);
if (data->begin) {
bdrv_do_drained_begin(bs, data->recursive, data->parent,
@@ -296,6 +302,9 @@ static void bdrv_co_drain_bh_cb(void *opaque)
bdrv_do_drained_end(bs, data->recursive, data->parent,
data->ignore_bds_parents);
}
+ if (ctx == co_ctx) {
+ aio_context_release(ctx);
+ }
} else {
assert(data->begin);
bdrv_drain_all_begin();
@@ -496,10 +505,6 @@ static bool bdrv_drain_all_poll(void)
BlockDriverState *bs = NULL;
bool result = false;
- /* Execute pending BHs first (may modify the graph) and check everything
- * else only after the BHs have executed. */
- while (aio_poll(qemu_get_aio_context(), false));
-
/* bdrv_drain_poll() can't make changes to the graph and we are holding the
* main AioContext lock, so iterating bdrv_next_all_states() is safe. */
while ((bs = bdrv_next_all_states(bs))) {
@@ -550,7 +555,7 @@ void bdrv_drain_all_begin(void)
}
/* Now poll the in-flight requests */
- AIO_WAIT_WHILE(&drain_all_aio_wait, NULL, bdrv_drain_all_poll());
+ AIO_WAIT_WHILE(NULL, bdrv_drain_all_poll());
while ((bs = bdrv_next_all_states(bs))) {
bdrv_drain_assert_idle(bs);
@@ -706,8 +711,7 @@ void bdrv_inc_in_flight(BlockDriverState *bs)
void bdrv_wakeup(BlockDriverState *bs)
{
- aio_wait_kick(bdrv_get_aio_wait(bs));
- aio_wait_kick(&drain_all_aio_wait);
+ aio_wait_kick();
}
void bdrv_dec_in_flight(BlockDriverState *bs)
diff --git a/block/linux-aio.c b/block/linux-aio.c
index 19eb922fdd..217ce60138 100644
--- a/block/linux-aio.c
+++ b/block/linux-aio.c
@@ -234,9 +234,9 @@ static void qemu_laio_process_completions(LinuxAioState *s)
static void qemu_laio_process_completions_and_submit(LinuxAioState *s)
{
+ aio_context_acquire(s->aio_context);
qemu_laio_process_completions(s);
- aio_context_acquire(s->aio_context);
if (!s->io_q.plugged && !QSIMPLEQ_EMPTY(&s->io_q.pending)) {
ioq_submit(s);
}
diff --git a/block/mirror.c b/block/mirror.c
index b8941db6c1..56d9ef7474 100644
--- a/block/mirror.c
+++ b/block/mirror.c
@@ -79,6 +79,7 @@ typedef struct MirrorBlockJob {
int max_iov;
bool initial_zeroing_ongoing;
int in_active_write_counter;
+ bool prepared;
} MirrorBlockJob;
typedef struct MirrorBDSOpaque {
@@ -607,7 +608,12 @@ static void mirror_wait_for_all_io(MirrorBlockJob *s)
}
}
-static void mirror_exit(Job *job)
+/**
+ * mirror_exit_common: handle both abort() and prepare() cases.
+ * for .prepare, returns 0 on success and -errno on failure.
+ * for .abort cases, denoted by abort = true, MUST return 0.
+ */
+static int mirror_exit_common(Job *job)
{
MirrorBlockJob *s = container_of(job, MirrorBlockJob, common.job);
BlockJob *bjob = &s->common;
@@ -617,7 +623,13 @@ static void mirror_exit(Job *job)
BlockDriverState *target_bs = blk_bs(s->target);
BlockDriverState *mirror_top_bs = s->mirror_top_bs;
Error *local_err = NULL;
- int ret = job->ret;
+ bool abort = job->ret < 0;
+ int ret = 0;
+
+ if (s->prepared) {
+ return 0;
+ }
+ s->prepared = true;
bdrv_release_dirty_bitmap(src, s->dirty_bitmap);
@@ -642,7 +654,7 @@ static void mirror_exit(Job *job)
* required before it could become a backing file of target_bs. */
bdrv_child_try_set_perm(mirror_top_bs->backing, 0, BLK_PERM_ALL,
&error_abort);
- if (s->backing_mode == MIRROR_SOURCE_BACKING_CHAIN) {
+ if (!abort && s->backing_mode == MIRROR_SOURCE_BACKING_CHAIN) {
BlockDriverState *backing = s->is_none_mode ? src : s->base;
if (backing_bs(target_bs) != backing) {
bdrv_set_backing_hd(target_bs, backing, &local_err);
@@ -658,11 +670,8 @@ static void mirror_exit(Job *job)
aio_context_acquire(replace_aio_context);
}
- if (s->should_complete && ret == 0) {
- BlockDriverState *to_replace = src;
- if (s->to_replace) {
- to_replace = s->to_replace;
- }
+ if (s->should_complete && !abort) {
+ BlockDriverState *to_replace = s->to_replace ?: src;
if (bdrv_get_flags(target_bs) != bdrv_get_flags(to_replace)) {
bdrv_reopen(target_bs, bdrv_get_flags(to_replace), NULL);
@@ -711,7 +720,18 @@ static void mirror_exit(Job *job)
bdrv_unref(mirror_top_bs);
bdrv_unref(src);
- job->ret = ret;
+ return ret;
+}
+
+static int mirror_prepare(Job *job)
+{
+ return mirror_exit_common(job);
+}
+
+static void mirror_abort(Job *job)
+{
+ int ret = mirror_exit_common(job);
+ assert(ret == 0);
}
static void mirror_throttle(MirrorBlockJob *s)
@@ -1132,7 +1152,8 @@ static const BlockJobDriver mirror_job_driver = {
.user_resume = block_job_user_resume,
.drain = block_job_drain,
.run = mirror_run,
- .exit = mirror_exit,
+ .prepare = mirror_prepare,
+ .abort = mirror_abort,
.pause = mirror_pause,
.complete = mirror_complete,
},
@@ -1149,7 +1170,8 @@ static const BlockJobDriver commit_active_job_driver = {
.user_resume = block_job_user_resume,
.drain = block_job_drain,
.run = mirror_run,
- .exit = mirror_exit,
+ .prepare = mirror_prepare,
+ .abort = mirror_abort,
.pause = mirror_pause,
.complete = mirror_complete,
},
@@ -1639,7 +1661,8 @@ fail:
void mirror_start(const char *job_id, BlockDriverState *bs,
BlockDriverState *target, const char *replaces,
- int64_t speed, uint32_t granularity, int64_t buf_size,
+ int creation_flags, int64_t speed,
+ uint32_t granularity, int64_t buf_size,
MirrorSyncMode mode, BlockMirrorBackingMode backing_mode,
BlockdevOnError on_source_error,
BlockdevOnError on_target_error,
@@ -1655,7 +1678,7 @@ void mirror_start(const char *job_id, BlockDriverState *bs,
}
is_none_mode = mode == MIRROR_SYNC_MODE_NONE;
base = mode == MIRROR_SYNC_MODE_TOP ? backing_bs(bs) : NULL;
- mirror_start_job(job_id, bs, JOB_DEFAULT, target, replaces,
+ mirror_start_job(job_id, bs, creation_flags, target, replaces,
speed, granularity, buf_size, backing_mode,
on_source_error, on_target_error, unmap, NULL, NULL,
&mirror_job_driver, is_none_mode, base, false,
diff --git a/block/stream.c b/block/stream.c
index 67e1e72e23..81a7ec8ece 100644
--- a/block/stream.c
+++ b/block/stream.c
@@ -54,16 +54,16 @@ static int coroutine_fn stream_populate(BlockBackend *blk,
return blk_co_preadv(blk, offset, qiov.size, &qiov, BDRV_REQ_COPY_ON_READ);
}
-static void stream_exit(Job *job)
+static int stream_prepare(Job *job)
{
StreamBlockJob *s = container_of(job, StreamBlockJob, common.job);
BlockJob *bjob = &s->common;
BlockDriverState *bs = blk_bs(bjob->blk);
BlockDriverState *base = s->base;
Error *local_err = NULL;
- int ret = job->ret;
+ int ret = 0;
- if (!job_is_cancelled(job) && bs->backing && ret == 0) {
+ if (bs->backing) {
const char *base_id = NULL, *base_fmt = NULL;
if (base) {
base_id = s->backing_file_str;
@@ -75,12 +75,19 @@ static void stream_exit(Job *job)
bdrv_set_backing_hd(bs, base, &local_err);
if (local_err) {
error_report_err(local_err);
- ret = -EPERM;
- goto out;
+ return -EPERM;
}
}
-out:
+ return ret;
+}
+
+static void stream_clean(Job *job)
+{
+ StreamBlockJob *s = container_of(job, StreamBlockJob, common.job);
+ BlockJob *bjob = &s->common;
+ BlockDriverState *bs = blk_bs(bjob->blk);
+
/* Reopen the image back in read-only mode if necessary */
if (s->bs_flags != bdrv_get_flags(bs)) {
/* Give up write permissions before making it read-only */
@@ -89,7 +96,6 @@ out:
}
g_free(s->backing_file_str);
- job->ret = ret;
}
static int coroutine_fn stream_run(Job *job, Error **errp)
@@ -206,7 +212,8 @@ static const BlockJobDriver stream_job_driver = {
.job_type = JOB_TYPE_STREAM,
.free = block_job_free,
.run = stream_run,
- .exit = stream_exit,
+ .prepare = stream_prepare,
+ .clean = stream_clean,
.user_resume = block_job_user_resume,
.drain = block_job_drain,
},
@@ -214,7 +221,8 @@ static const BlockJobDriver stream_job_driver = {
void stream_start(const char *job_id, BlockDriverState *bs,
BlockDriverState *base, const char *backing_file_str,
- int64_t speed, BlockdevOnError on_error, Error **errp)
+ int creation_flags, int64_t speed,
+ BlockdevOnError on_error, Error **errp)
{
StreamBlockJob *s;
BlockDriverState *iter;
@@ -236,7 +244,7 @@ void stream_start(const char *job_id, BlockDriverState *bs,
BLK_PERM_GRAPH_MOD,
BLK_PERM_CONSISTENT_READ | BLK_PERM_WRITE_UNCHANGED |
BLK_PERM_WRITE,
- speed, JOB_DEFAULT, NULL, NULL, errp);
+ speed, creation_flags, NULL, NULL, errp);
if (!s) {
goto fail;
}
diff --git a/blockdev.c b/blockdev.c
index 72f5347df5..a8755bd908 100644
--- a/blockdev.c
+++ b/blockdev.c
@@ -2182,7 +2182,13 @@ static const BlkActionOps actions[] = {
.instance_size = sizeof(BlockDirtyBitmapState),
.prepare = block_dirty_bitmap_disable_prepare,
.abort = block_dirty_bitmap_disable_abort,
- }
+ },
+ /* Where are transactions for MIRROR, COMMIT and STREAM?
+ * Although these blockjobs use transaction callbacks like the backup job,
+ * these jobs do not necessarily adhere to transaction semantics.
+ * These jobs may not fully undo all of their actions on abort, nor do they
+ * necessarily work in transactions with more than one job in them.
+ */
};
/**
@@ -3116,6 +3122,8 @@ void qmp_block_stream(bool has_job_id, const char *job_id, const char *device,
bool has_backing_file, const char *backing_file,
bool has_speed, int64_t speed,
bool has_on_error, BlockdevOnError on_error,
+ bool has_auto_finalize, bool auto_finalize,
+ bool has_auto_dismiss, bool auto_dismiss,
Error **errp)
{
BlockDriverState *bs, *iter;
@@ -3123,6 +3131,7 @@ void qmp_block_stream(bool has_job_id, const char *job_id, const char *device,
AioContext *aio_context;
Error *local_err = NULL;
const char *base_name = NULL;
+ int job_flags = JOB_DEFAULT;
if (!has_on_error) {
on_error = BLOCKDEV_ON_ERROR_REPORT;
@@ -3184,8 +3193,15 @@ void qmp_block_stream(bool has_job_id, const char *job_id, const char *device,
/* backing_file string overrides base bs filename */
base_name = has_backing_file ? backing_file : base_name;
+ if (has_auto_finalize && !auto_finalize) {
+ job_flags |= JOB_MANUAL_FINALIZE;
+ }
+ if (has_auto_dismiss && !auto_dismiss) {
+ job_flags |= JOB_MANUAL_DISMISS;
+ }
+
stream_start(has_job_id ? job_id : NULL, bs, base_bs, base_name,
- has_speed ? speed : 0, on_error, &local_err);
+ job_flags, has_speed ? speed : 0, on_error, &local_err);
if (local_err) {
error_propagate(errp, local_err);
goto out;
@@ -3198,11 +3214,15 @@ out:
}
void qmp_block_commit(bool has_job_id, const char *job_id, const char *device,
+ bool has_base_node, const char *base_node,
bool has_base, const char *base,
+ bool has_top_node, const char *top_node,
bool has_top, const char *top,
bool has_backing_file, const char *backing_file,
bool has_speed, int64_t speed,
bool has_filter_node_name, const char *filter_node_name,
+ bool has_auto_finalize, bool auto_finalize,
+ bool has_auto_dismiss, bool auto_dismiss,
Error **errp)
{
BlockDriverState *bs;
@@ -3214,6 +3234,7 @@ void qmp_block_commit(bool has_job_id, const char *job_id, const char *device,
* BlockdevOnError change for blkmirror makes it in
*/
BlockdevOnError on_error = BLOCKDEV_ON_ERROR_REPORT;
+ int job_flags = JOB_DEFAULT;
if (!has_speed) {
speed = 0;
@@ -3221,6 +3242,12 @@ void qmp_block_commit(bool has_job_id, const char *job_id, const char *device,
if (!has_filter_node_name) {
filter_node_name = NULL;
}
+ if (has_auto_finalize && !auto_finalize) {
+ job_flags |= JOB_MANUAL_FINALIZE;
+ }
+ if (has_auto_dismiss && !auto_dismiss) {
+ job_flags |= JOB_MANUAL_DISMISS;
+ }
/* Important Note:
* libvirt relies on the DeviceNotFound error class in order to probe for
@@ -3250,7 +3277,20 @@ void qmp_block_commit(bool has_job_id, const char *job_id, const char *device,
/* default top_bs is the active layer */
top_bs = bs;
- if (has_top && top) {
+ if (has_top_node && has_top) {
+ error_setg(errp, "'top-node' and 'top' are mutually exclusive");
+ goto out;
+ } else if (has_top_node) {
+ top_bs = bdrv_lookup_bs(NULL, top_node, errp);
+ if (top_bs == NULL) {
+ goto out;
+ }
+ if (!bdrv_chain_contains(bs, top_bs)) {
+ error_setg(errp, "'%s' is not in this backing file chain",
+ top_node);
+ goto out;
+ }
+ } else if (has_top && top) {
if (strcmp(bs->filename, top) != 0) {
top_bs = bdrv_find_backing_image(bs, top);
}
@@ -3263,7 +3303,20 @@ void qmp_block_commit(bool has_job_id, const char *job_id, const char *device,
assert(bdrv_get_aio_context(top_bs) == aio_context);
- if (has_base && base) {
+ if (has_base_node && has_base) {
+ error_setg(errp, "'base-node' and 'base' are mutually exclusive");
+ goto out;
+ } else if (has_base_node) {
+ base_bs = bdrv_lookup_bs(NULL, base_node, errp);
+ if (base_bs == NULL) {
+ goto out;
+ }
+ if (!bdrv_chain_contains(top_bs, base_bs)) {
+ error_setg(errp, "'%s' is not in this backing file chain",
+ base_node);
+ goto out;
+ }
+ } else if (has_base && base) {
base_bs = bdrv_find_backing_image(top_bs, base);
} else {
base_bs = bdrv_find_base(top_bs);
@@ -3295,15 +3348,15 @@ void qmp_block_commit(bool has_job_id, const char *job_id, const char *device,
goto out;
}
commit_active_start(has_job_id ? job_id : NULL, bs, base_bs,
- JOB_DEFAULT, speed, on_error,
+ job_flags, speed, on_error,
filter_node_name, NULL, NULL, false, &local_err);
} else {
BlockDriverState *overlay_bs = bdrv_find_overlay(bs, top_bs);
if (bdrv_op_is_blocked(overlay_bs, BLOCK_OP_TYPE_COMMIT_TARGET, errp)) {
goto out;
}
- commit_start(has_job_id ? job_id : NULL, bs, base_bs, top_bs, speed,
- on_error, has_backing_file ? backing_file : NULL,
+ commit_start(has_job_id ? job_id : NULL, bs, base_bs, top_bs, job_flags,
+ speed, on_error, has_backing_file ? backing_file : NULL,
filter_node_name, &local_err);
}
if (local_err != NULL) {
@@ -3587,8 +3640,11 @@ static void blockdev_mirror_common(const char *job_id, BlockDriverState *bs,
bool has_filter_node_name,
const char *filter_node_name,
bool has_copy_mode, MirrorCopyMode copy_mode,
+ bool has_auto_finalize, bool auto_finalize,
+ bool has_auto_dismiss, bool auto_dismiss,
Error **errp)
{
+ int job_flags = JOB_DEFAULT;
if (!has_speed) {
speed = 0;
@@ -3614,6 +3670,12 @@ static void blockdev_mirror_common(const char *job_id, BlockDriverState *bs,
if (!has_copy_mode) {
copy_mode = MIRROR_COPY_MODE_BACKGROUND;
}
+ if (has_auto_finalize && !auto_finalize) {
+ job_flags |= JOB_MANUAL_FINALIZE;
+ }
+ if (has_auto_dismiss && !auto_dismiss) {
+ job_flags |= JOB_MANUAL_DISMISS;
+ }
if (granularity != 0 && (granularity < 512 || granularity > 1048576 * 64)) {
error_setg(errp, QERR_INVALID_PARAMETER_VALUE, "granularity",
@@ -3641,7 +3703,7 @@ static void blockdev_mirror_common(const char *job_id, BlockDriverState *bs,
* and will allow to check whether the node still exist at mirror completion
*/
mirror_start(job_id, bs, target,
- has_replaces ? replaces : NULL,
+ has_replaces ? replaces : NULL, job_flags,
speed, granularity, buf_size, sync, backing_mode,
on_source_error, on_target_error, unmap, filter_node_name,
copy_mode, errp);
@@ -3791,6 +3853,8 @@ void qmp_drive_mirror(DriveMirror *arg, Error **errp)
arg->has_unmap, arg->unmap,
false, NULL,
arg->has_copy_mode, arg->copy_mode,
+ arg->has_auto_finalize, arg->auto_finalize,
+ arg->has_auto_dismiss, arg->auto_dismiss,
&local_err);
bdrv_unref(target_bs);
error_propagate(errp, local_err);
@@ -3812,6 +3876,8 @@ void qmp_blockdev_mirror(bool has_job_id, const char *job_id,
bool has_filter_node_name,
const char *filter_node_name,
bool has_copy_mode, MirrorCopyMode copy_mode,
+ bool has_auto_finalize, bool auto_finalize,
+ bool has_auto_dismiss, bool auto_dismiss,
Error **errp)
{
BlockDriverState *bs;
@@ -3845,6 +3911,8 @@ void qmp_blockdev_mirror(bool has_job_id, const char *job_id,
true, true,
has_filter_node_name, filter_node_name,
has_copy_mode, copy_mode,
+ has_auto_finalize, auto_finalize,
+ has_auto_dismiss, auto_dismiss,
&local_err);
error_propagate(errp, local_err);
diff --git a/blockjob.c b/blockjob.c
index bf7ef48f98..58de8cb024 100644
--- a/blockjob.c
+++ b/blockjob.c
@@ -164,7 +164,7 @@ static bool child_job_drained_poll(BdrvChild *c)
/* An inactive or completed job doesn't have any pending requests. Jobs
* with !job->busy are either already paused or have a pause point after
* being reentered, so no job driver code will run before they pause. */
- if (!job->busy || job_is_completed(job) || job->deferred_to_main_loop) {
+ if (!job->busy || job_is_completed(job)) {
return false;
}
@@ -221,6 +221,11 @@ int block_job_add_bdrv(BlockJob *job, const char *name, BlockDriverState *bs,
return 0;
}
+static void block_job_on_idle(Notifier *n, void *opaque)
+{
+ aio_wait_kick();
+}
+
bool block_job_is_internal(BlockJob *job)
{
return (job->job.id == NULL);
@@ -416,6 +421,7 @@ void *block_job_create(const char *job_id, const BlockJobDriver *driver,
job->finalize_completed_notifier.notify = block_job_event_completed;
job->pending_notifier.notify = block_job_event_pending;
job->ready_notifier.notify = block_job_event_ready;
+ job->idle_notifier.notify = block_job_on_idle;
notifier_list_add(&job->job.on_finalize_cancelled,
&job->finalize_cancelled_notifier);
@@ -423,6 +429,7 @@ void *block_job_create(const char *job_id, const BlockJobDriver *driver,
&job->finalize_completed_notifier);
notifier_list_add(&job->job.on_pending, &job->pending_notifier);
notifier_list_add(&job->job.on_ready, &job->ready_notifier);
+ notifier_list_add(&job->job.on_idle, &job->idle_notifier);
error_setg(&job->blocker, "block device is in use by block job: %s",
job_type_str(&job->job));
diff --git a/hmp.c b/hmp.c
index 80a95fc269..3a9f797677 100644
--- a/hmp.c
+++ b/hmp.c
@@ -1907,8 +1907,9 @@ void hmp_block_stream(Monitor *mon, const QDict *qdict)
int64_t speed = qdict_get_try_int(qdict, "speed", 0);
qmp_block_stream(true, device, device, base != NULL, base, false, NULL,
- false, NULL, qdict_haskey(qdict, "speed"), speed,
- true, BLOCKDEV_ON_ERROR_REPORT, &error);
+ false, NULL, qdict_haskey(qdict, "speed"), speed, true,
+ BLOCKDEV_ON_ERROR_REPORT, false, false, false, false,
+ &error);
hmp_handle_error(mon, &error);
}
diff --git a/include/block/aio-wait.h b/include/block/aio-wait.h
index c85a62f798..afd0ff7eb8 100644
--- a/include/block/aio-wait.h
+++ b/include/block/aio-wait.h
@@ -30,14 +30,15 @@
/**
* AioWait:
*
- * An object that facilitates synchronous waiting on a condition. The main
- * loop can wait on an operation running in an IOThread as follows:
+ * An object that facilitates synchronous waiting on a condition. A single
+ * global AioWait object (global_aio_wait) is used internally.
+ *
+ * The main loop can wait on an operation running in an IOThread as follows:
*
- * AioWait *wait = ...;
* AioContext *ctx = ...;
* MyWork work = { .done = false };
* schedule_my_work_in_iothread(ctx, &work);
- * AIO_WAIT_WHILE(wait, ctx, !work.done);
+ * AIO_WAIT_WHILE(ctx, !work.done);
*
* The IOThread must call aio_wait_kick() to notify the main loop when
* work.done changes:
@@ -46,7 +47,7 @@
* {
* ...
* work.done = true;
- * aio_wait_kick(wait);
+ * aio_wait_kick();
* }
*/
typedef struct {
@@ -54,9 +55,10 @@ typedef struct {
unsigned num_waiters;
} AioWait;
+extern AioWait global_aio_wait;
+
/**
* AIO_WAIT_WHILE:
- * @wait: the aio wait object
* @ctx: the aio context, or NULL if multiple aio contexts (for which the
* caller does not hold a lock) are involved in the polling condition.
* @cond: wait while this conditional expression is true
@@ -72,10 +74,12 @@ typedef struct {
* wait on conditions between two IOThreads since that could lead to deadlock,
* go via the main loop instead.
*/
-#define AIO_WAIT_WHILE(wait, ctx, cond) ({ \
+#define AIO_WAIT_WHILE(ctx, cond) ({ \
bool waited_ = false; \
- AioWait *wait_ = (wait); \
+ AioWait *wait_ = &global_aio_wait; \
AioContext *ctx_ = (ctx); \
+ /* Increment wait_->num_waiters before evaluating cond. */ \
+ atomic_inc(&wait_->num_waiters); \
if (ctx_ && in_aio_context_home_thread(ctx_)) { \
while ((cond)) { \
aio_poll(ctx_, true); \
@@ -84,8 +88,6 @@ typedef struct {
} else { \
assert(qemu_get_current_aio_context() == \
qemu_get_aio_context()); \
- /* Increment wait_->num_waiters before evaluating cond. */ \
- atomic_inc(&wait_->num_waiters); \
while ((cond)) { \
if (ctx_) { \
aio_context_release(ctx_); \
@@ -96,20 +98,18 @@ typedef struct {
} \
waited_ = true; \
} \
- atomic_dec(&wait_->num_waiters); \
} \
+ atomic_dec(&wait_->num_waiters); \
waited_; })
/**
* aio_wait_kick:
- * @wait: the aio wait object that should re-evaluate its condition
- *
* Wake up the main thread if it is waiting on AIO_WAIT_WHILE(). During
* synchronous operations performed in an IOThread, the main thread lets the
* IOThread's event loop run, waiting for the operation to complete. A
* aio_wait_kick() call will wake up the main thread.
*/
-void aio_wait_kick(AioWait *wait);
+void aio_wait_kick(void);
/**
* aio_wait_bh_oneshot:
diff --git a/include/block/block.h b/include/block/block.h
index 4e0871aaf9..4edc1e8afa 100644
--- a/include/block/block.h
+++ b/include/block/block.h
@@ -410,13 +410,9 @@ void bdrv_drain_all_begin(void);
void bdrv_drain_all_end(void);
void bdrv_drain_all(void);
-/* Returns NULL when bs == NULL */
-AioWait *bdrv_get_aio_wait(BlockDriverState *bs);
-
#define BDRV_POLL_WHILE(bs, cond) ({ \
BlockDriverState *bs_ = (bs); \
- AIO_WAIT_WHILE(bdrv_get_aio_wait(bs_), \
- bdrv_get_aio_context(bs_), \
+ AIO_WAIT_WHILE(bdrv_get_aio_context(bs_), \
cond); })
int bdrv_pdiscard(BdrvChild *child, int64_t offset, int bytes);
diff --git a/include/block/block_int.h b/include/block/block_int.h
index 903b9c1034..92ecbd866e 100644
--- a/include/block/block_int.h
+++ b/include/block/block_int.h
@@ -794,9 +794,6 @@ struct BlockDriverState {
unsigned int in_flight;
unsigned int serialising_in_flight;
- /* Kicked to signal main loop when a request completes. */
- AioWait wait;
-
/* counter for nested bdrv_io_plug.
* Accessed with atomic ops.
*/
@@ -958,6 +955,8 @@ int is_windows_drive(const char *filename);
* flatten the whole backing file chain onto @bs.
* @backing_file_str: The file name that will be written to @bs as the
* the new backing file if the job completes. Ignored if @base is %NULL.
+ * @creation_flags: Flags that control the behavior of the Job lifetime.
+ * See @BlockJobCreateFlags
* @speed: The maximum speed, in bytes per second, or 0 for unlimited.
* @on_error: The action to take upon error.
* @errp: Error object.
@@ -971,7 +970,8 @@ int is_windows_drive(const char *filename);
*/
void stream_start(const char *job_id, BlockDriverState *bs,
BlockDriverState *base, const char *backing_file_str,
- int64_t speed, BlockdevOnError on_error, Error **errp);
+ int creation_flags, int64_t speed,
+ BlockdevOnError on_error, Error **errp);
/**
* commit_start:
@@ -980,6 +980,8 @@ void stream_start(const char *job_id, BlockDriverState *bs,
* @bs: Active block device.
* @top: Top block device to be committed.
* @base: Block device that will be written into, and become the new top.
+ * @creation_flags: Flags that control the behavior of the Job lifetime.
+ * See @BlockJobCreateFlags
* @speed: The maximum speed, in bytes per second, or 0 for unlimited.
* @on_error: The action to take upon error.
* @backing_file_str: String to use as the backing file in @top's overlay
@@ -990,7 +992,8 @@ void stream_start(const char *job_id, BlockDriverState *bs,
*
*/
void commit_start(const char *job_id, BlockDriverState *bs,
- BlockDriverState *base, BlockDriverState *top, int64_t speed,
+ BlockDriverState *base, BlockDriverState *top,
+ int creation_flags, int64_t speed,
BlockdevOnError on_error, const char *backing_file_str,
const char *filter_node_name, Error **errp);
/**
@@ -1026,6 +1029,8 @@ void commit_active_start(const char *job_id, BlockDriverState *bs,
* @target: Block device to write to.
* @replaces: Block graph node name to replace once the mirror is done. Can
* only be used when full mirroring is selected.
+ * @creation_flags: Flags that control the behavior of the Job lifetime.
+ * See @BlockJobCreateFlags
* @speed: The maximum speed, in bytes per second, or 0 for unlimited.
* @granularity: The chosen granularity for the dirty bitmap.
* @buf_size: The amount of data that can be in flight at one time.
@@ -1047,7 +1052,8 @@ void commit_active_start(const char *job_id, BlockDriverState *bs,
*/
void mirror_start(const char *job_id, BlockDriverState *bs,
BlockDriverState *target, const char *replaces,
- int64_t speed, uint32_t granularity, int64_t buf_size,
+ int creation_flags, int64_t speed,
+ uint32_t granularity, int64_t buf_size,
MirrorSyncMode mode, BlockMirrorBackingMode backing_mode,
BlockdevOnError on_source_error,
BlockdevOnError on_target_error,
diff --git a/include/block/blockjob.h b/include/block/blockjob.h
index 32c00b7dc0..ede0bd8dcb 100644
--- a/include/block/blockjob.h
+++ b/include/block/blockjob.h
@@ -70,6 +70,9 @@ typedef struct BlockJob {
/** Called when the job transitions to READY */
Notifier ready_notifier;
+ /** Called when the job coroutine yields or terminates */
+ Notifier idle_notifier;
+
/** BlockDriverStates that are involved in this block job */
GSList *nodes;
} BlockJob;
diff --git a/include/qemu/coroutine.h b/include/qemu/coroutine.h
index 6f8a487041..9801e7f5a4 100644
--- a/include/qemu/coroutine.h
+++ b/include/qemu/coroutine.h
@@ -90,6 +90,11 @@ void qemu_aio_coroutine_enter(AioContext *ctx, Coroutine *co);
void coroutine_fn qemu_coroutine_yield(void);
/**
+ * Get the AioContext of the given coroutine
+ */
+AioContext *coroutine_fn qemu_coroutine_get_aio_context(Coroutine *co);
+
+/**
* Get the currently executing coroutine
*/
Coroutine *coroutine_fn qemu_coroutine_self(void);
diff --git a/include/qemu/job.h b/include/qemu/job.h
index e0cff702b7..9e7cd1e4a0 100644
--- a/include/qemu/job.h
+++ b/include/qemu/job.h
@@ -76,6 +76,9 @@ typedef struct Job {
* Set to false by the job while the coroutine has yielded and may be
* re-entered by job_enter(). There may still be I/O or event loop activity
* pending. Accessed under block_job_mutex (in blockjob.c).
+ *
+ * When the job is deferred to the main loop, busy is true as long as the
+ * bottom half is still pending.
*/
bool busy;
@@ -156,6 +159,9 @@ typedef struct Job {
/** Notifiers called when the job transitions to READY */
NotifierList on_ready;
+ /** Notifiers called when the job coroutine yields or terminates */
+ NotifierList on_idle;
+
/** Element of the list of jobs */
QLIST_ENTRY(Job) job_list;
@@ -222,17 +228,6 @@ struct JobDriver {
void (*drain)(Job *job);
/**
- * If the callback is not NULL, exit will be invoked from the main thread
- * when the job's coroutine has finished, but before transactional
- * convergence; before @prepare or @abort.
- *
- * FIXME TODO: This callback is only temporary to transition remaining jobs
- * to prepare/commit/abort/clean callbacks and will be removed before 3.1.
- * is released.
- */
- void (*exit)(Job *job);
-
- /**
* If the callback is not NULL, prepare will be invoked when all the jobs
* belonging to the same transaction complete; or upon this job's completion
* if it is not in a transaction.
@@ -532,6 +527,8 @@ void job_user_cancel(Job *job, bool force, Error **errp);
*
* Returns the return value from the job if the job actually completed
* during the call, or -ECANCELED if it was canceled.
+ *
+ * Callers must hold the AioContext lock of job->aio_context.
*/
int job_cancel_sync(Job *job);
@@ -549,6 +546,8 @@ void job_cancel_sync_all(void);
* function).
*
* Returns the return value from the job.
+ *
+ * Callers must hold the AioContext lock of job->aio_context.
*/
int job_complete_sync(Job *job, Error **errp);
@@ -574,6 +573,8 @@ void job_dismiss(Job **job, Error **errp);
*
* Returns 0 if the job is successfully completed, -ECANCELED if the job was
* cancelled before completing, and -errno in other error cases.
+ *
+ * Callers must hold the AioContext lock of job->aio_context.
*/
int job_finish_sync(Job *job, void (*finish)(Job *, Error **errp), Error **errp);
diff --git a/job.c b/job.c
index 2327d790cc..c65e01bbfa 100644
--- a/job.c
+++ b/job.c
@@ -29,6 +29,7 @@
#include "qemu/job.h"
#include "qemu/id.h"
#include "qemu/main-loop.h"
+#include "block/aio-wait.h"
#include "trace-root.h"
#include "qapi/qapi-events-job.h"
@@ -136,21 +137,13 @@ static void job_txn_del_job(Job *job)
}
}
-static int job_txn_apply(JobTxn *txn, int fn(Job *), bool lock)
+static int job_txn_apply(JobTxn *txn, int fn(Job *))
{
- AioContext *ctx;
Job *job, *next;
int rc = 0;
QLIST_FOREACH_SAFE(job, &txn->jobs, txn_list, next) {
- if (lock) {
- ctx = job->aio_context;
- aio_context_acquire(ctx);
- }
rc = fn(job);
- if (lock) {
- aio_context_release(ctx);
- }
if (rc) {
break;
}
@@ -410,6 +403,11 @@ static void job_event_ready(Job *job)
notifier_list_notify(&job->on_ready, job);
}
+static void job_event_idle(Job *job)
+{
+ notifier_list_notify(&job->on_idle, job);
+}
+
void job_enter_cond(Job *job, bool(*fn)(Job *job))
{
if (!job_started(job)) {
@@ -455,6 +453,7 @@ static void coroutine_fn job_do_yield(Job *job, uint64_t ns)
timer_mod(&job->sleep_timer, ns);
}
job->busy = false;
+ job_event_idle(job);
job_unlock();
qemu_coroutine_yield();
@@ -535,49 +534,6 @@ void job_drain(Job *job)
}
}
-static void job_completed(Job *job);
-
-static void job_exit(void *opaque)
-{
- Job *job = (Job *)opaque;
- AioContext *aio_context = job->aio_context;
-
- if (job->driver->exit) {
- aio_context_acquire(aio_context);
- job->driver->exit(job);
- aio_context_release(aio_context);
- }
- job_completed(job);
-}
-
-/**
- * All jobs must allow a pause point before entering their job proper. This
- * ensures that jobs can be paused prior to being started, then resumed later.
- */
-static void coroutine_fn job_co_entry(void *opaque)
-{
- Job *job = opaque;
-
- assert(job && job->driver && job->driver->run);
- job_pause_point(job);
- job->ret = job->driver->run(job, &job->err);
- job->deferred_to_main_loop = true;
- aio_bh_schedule_oneshot(qemu_get_aio_context(), job_exit, job);
-}
-
-
-void job_start(Job *job)
-{
- assert(job && !job_started(job) && job->paused &&
- job->driver && job->driver->run);
- job->co = qemu_coroutine_create(job_co_entry, job);
- job->pause_count--;
- job->busy = true;
- job->paused = false;
- job_state_transition(job, JOB_STATUS_RUNNING);
- aio_co_enter(job->aio_context, job->co);
-}
-
/* Assumes the block_job_mutex is held */
static bool job_timer_not_pending(Job *job)
{
@@ -762,6 +718,7 @@ static void job_cancel_async(Job *job, bool force)
static void job_completed_txn_abort(Job *job)
{
+ AioContext *outer_ctx = job->aio_context;
AioContext *ctx;
JobTxn *txn = job->txn;
Job *other_job;
@@ -775,23 +732,26 @@ static void job_completed_txn_abort(Job *job)
txn->aborting = true;
job_txn_ref(txn);
- /* We are the first failed job. Cancel other jobs. */
- QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
- ctx = other_job->aio_context;
- aio_context_acquire(ctx);
- }
+ /* We can only hold the single job's AioContext lock while calling
+ * job_finalize_single() because the finalization callbacks can involve
+ * calls of AIO_WAIT_WHILE(), which could deadlock otherwise. */
+ aio_context_release(outer_ctx);
/* Other jobs are effectively cancelled by us, set the status for
* them; this job, however, may or may not be cancelled, depending
* on the caller, so leave it. */
QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
if (other_job != job) {
+ ctx = other_job->aio_context;
+ aio_context_acquire(ctx);
job_cancel_async(other_job, false);
+ aio_context_release(ctx);
}
}
while (!QLIST_EMPTY(&txn->jobs)) {
other_job = QLIST_FIRST(&txn->jobs);
ctx = other_job->aio_context;
+ aio_context_acquire(ctx);
if (!job_is_completed(other_job)) {
assert(job_is_cancelled(other_job));
job_finish_sync(other_job, NULL, NULL);
@@ -800,6 +760,8 @@ static void job_completed_txn_abort(Job *job)
aio_context_release(ctx);
}
+ aio_context_acquire(outer_ctx);
+
job_txn_unref(txn);
}
@@ -823,11 +785,11 @@ static void job_do_finalize(Job *job)
assert(job && job->txn);
/* prepare the transaction to complete */
- rc = job_txn_apply(job->txn, job_prepare, true);
+ rc = job_txn_apply(job->txn, job_prepare);
if (rc) {
job_completed_txn_abort(job);
} else {
- job_txn_apply(job->txn, job_finalize_single, true);
+ job_txn_apply(job->txn, job_finalize_single);
}
}
@@ -873,10 +835,10 @@ static void job_completed_txn_success(Job *job)
assert(other_job->ret == 0);
}
- job_txn_apply(txn, job_transition_to_pending, false);
+ job_txn_apply(txn, job_transition_to_pending);
/* If no jobs need manual finalization, automatically do so */
- if (job_txn_apply(txn, job_needs_finalize, false) == 0) {
+ if (job_txn_apply(txn, job_needs_finalize) == 0) {
job_do_finalize(job);
}
}
@@ -894,6 +856,54 @@ static void job_completed(Job *job)
}
}
+/** Useful only as a type shim for aio_bh_schedule_oneshot. */
+static void job_exit(void *opaque)
+{
+ Job *job = (Job *)opaque;
+ AioContext *ctx = job->aio_context;
+
+ aio_context_acquire(ctx);
+
+ /* This is a lie, we're not quiescent, but still doing the completion
+ * callbacks. However, completion callbacks tend to involve operations that
+ * drain block nodes, and if .drained_poll still returned true, we would
+ * deadlock. */
+ job->busy = false;
+ job_event_idle(job);
+
+ job_completed(job);
+
+ aio_context_release(ctx);
+}
+
+/**
+ * All jobs must allow a pause point before entering their job proper. This
+ * ensures that jobs can be paused prior to being started, then resumed later.
+ */
+static void coroutine_fn job_co_entry(void *opaque)
+{
+ Job *job = opaque;
+
+ assert(job && job->driver && job->driver->run);
+ job_pause_point(job);
+ job->ret = job->driver->run(job, &job->err);
+ job->deferred_to_main_loop = true;
+ job->busy = true;
+ aio_bh_schedule_oneshot(qemu_get_aio_context(), job_exit, job);
+}
+
+void job_start(Job *job)
+{
+ assert(job && !job_started(job) && job->paused &&
+ job->driver && job->driver->run);
+ job->co = qemu_coroutine_create(job_co_entry, job);
+ job->pause_count--;
+ job->busy = true;
+ job->paused = false;
+ job_state_transition(job, JOB_STATUS_RUNNING);
+ aio_co_enter(job->aio_context, job->co);
+}
+
void job_cancel(Job *job, bool force)
{
if (job->status == JOB_STATUS_CONCLUDED) {
@@ -980,14 +990,10 @@ int job_finish_sync(Job *job, void (*finish)(Job *, Error **errp), Error **errp)
job_unref(job);
return -EBUSY;
}
- /* job_drain calls job_enter, and it should be enough to induce progress
- * until the job completes or moves to the main thread. */
- while (!job->deferred_to_main_loop && !job_is_completed(job)) {
- job_drain(job);
- }
- while (!job_is_completed(job)) {
- aio_poll(qemu_get_aio_context(), true);
- }
+
+ AIO_WAIT_WHILE(job->aio_context,
+ (job_drain(job), !job_is_completed(job)));
+
ret = (job_is_cancelled(job) && job->ret == 0) ? -ECANCELED : job->ret;
job_unref(job);
return ret;
diff --git a/qapi/block-core.json b/qapi/block-core.json
index 4c7a37afdc..ac3b48ee54 100644
--- a/qapi/block-core.json
+++ b/qapi/block-core.json
@@ -1272,13 +1272,14 @@
# a different block device than @device).
#
# @auto-finalize: When false, this job will wait in a PENDING state after it has
-# finished its work, waiting for @block-job-finalize.
-# When true, this job will automatically perform its abort or
-# commit actions.
+# finished its work, waiting for @block-job-finalize before
+# making any block graph changes.
+# When true, this job will automatically
+# perform its abort or commit actions.
# Defaults to true. (Since 2.12)
#
# @auto-dismiss: When false, this job will wait in a CONCLUDED state after it
-# has completed ceased all work, and wait for @block-job-dismiss.
+# has completely ceased all work, and awaits @block-job-dismiss.
# When true, this job will automatically disappear from the query
# list without user intervention.
# Defaults to true. (Since 2.12)
@@ -1327,13 +1328,14 @@
# a different block device than @device).
#
# @auto-finalize: When false, this job will wait in a PENDING state after it has
-# finished its work, waiting for @block-job-finalize.
-# When true, this job will automatically perform its abort or
-# commit actions.
+# finished its work, waiting for @block-job-finalize before
+# making any block graph changes.
+# When true, this job will automatically
+# perform its abort or commit actions.
# Defaults to true. (Since 2.12)
#
# @auto-dismiss: When false, this job will wait in a CONCLUDED state after it
-# has completed ceased all work, and wait for @block-job-dismiss.
+# has completely ceased all work, and awaits @block-job-dismiss.
# When true, this job will automatically disappear from the query
# list without user intervention.
# Defaults to true. (Since 2.12)
@@ -1455,12 +1457,23 @@
#
# @device: the device name or node-name of a root node
#
-# @base: The file name of the backing image to write data into.
-# If not specified, this is the deepest backing image.
+# @base-node: The node name of the backing image to write data into.
+# If not specified, this is the deepest backing image.
+# (since: 3.1)
#
-# @top: The file name of the backing image within the image chain,
-# which contains the topmost data to be committed down. If
-# not specified, this is the active layer.
+# @base: Same as @base-node, except that it is a file name rather than a node
+# name. This must be the exact filename string that was used to open the
+# node; other strings, even if addressing the same file, are not
+# accepted (deprecated, use @base-node instead)
+#
+# @top-node: The node name of the backing image within the image chain
+# which contains the topmost data to be committed down. If
+# not specified, this is the active layer. (since: 3.1)
+#
+# @top: Same as @top-node, except that it is a file name rather than a node
+# name. This must be the exact filename string that was used to open the
+# node; other strings, even if addressing the same file, are not
+# accepted (deprecated, use @base-node instead)
#
# @backing-file: The backing file string to write into the overlay
# image of 'top'. If 'top' is the active layer,
@@ -1498,6 +1511,19 @@
# above @top. If this option is not given, a node name is
# autogenerated. (Since: 2.9)
#
+# @auto-finalize: When false, this job will wait in a PENDING state after it has
+# finished its work, waiting for @block-job-finalize before
+# making any block graph changes.
+# When true, this job will automatically
+# perform its abort or commit actions.
+# Defaults to true. (Since 3.1)
+#
+# @auto-dismiss: When false, this job will wait in a CONCLUDED state after it
+# has completely ceased all work, and awaits @block-job-dismiss.
+# When true, this job will automatically disappear from the query
+# list without user intervention.
+# Defaults to true. (Since 3.1)
+#
# Returns: Nothing on success
# If @device does not exist, DeviceNotFound
# Any other error returns a GenericError.
@@ -1513,9 +1539,11 @@
#
##
{ 'command': 'block-commit',
- 'data': { '*job-id': 'str', 'device': 'str', '*base': 'str', '*top': 'str',
+ 'data': { '*job-id': 'str', 'device': 'str', '*base-node': 'str',
+ '*base': 'str', '*top-node': 'str', '*top': 'str',
'*backing-file': 'str', '*speed': 'int',
- '*filter-node-name': 'str' } }
+ '*filter-node-name': 'str',
+ '*auto-finalize': 'bool', '*auto-dismiss': 'bool' } }
##
# @drive-backup:
@@ -1715,6 +1743,18 @@
# @copy-mode: when to copy data to the destination; defaults to 'background'
# (Since: 3.0)
#
+# @auto-finalize: When false, this job will wait in a PENDING state after it has
+# finished its work, waiting for @block-job-finalize before
+# making any block graph changes.
+# When true, this job will automatically
+# perform its abort or commit actions.
+# Defaults to true. (Since 3.1)
+#
+# @auto-dismiss: When false, this job will wait in a CONCLUDED state after it
+# has completely ceased all work, and awaits @block-job-dismiss.
+# When true, this job will automatically disappear from the query
+# list without user intervention.
+# Defaults to true. (Since 3.1)
# Since: 1.3
##
{ 'struct': 'DriveMirror',
@@ -1724,7 +1764,8 @@
'*speed': 'int', '*granularity': 'uint32',
'*buf-size': 'int', '*on-source-error': 'BlockdevOnError',
'*on-target-error': 'BlockdevOnError',
- '*unmap': 'bool', '*copy-mode': 'MirrorCopyMode' } }
+ '*unmap': 'bool', '*copy-mode': 'MirrorCopyMode',
+ '*auto-finalize': 'bool', '*auto-dismiss': 'bool' } }
##
# @BlockDirtyBitmap:
@@ -1990,6 +2031,18 @@
# @copy-mode: when to copy data to the destination; defaults to 'background'
# (Since: 3.0)
#
+# @auto-finalize: When false, this job will wait in a PENDING state after it has
+# finished its work, waiting for @block-job-finalize before
+# making any block graph changes.
+# When true, this job will automatically
+# perform its abort or commit actions.
+# Defaults to true. (Since 3.1)
+#
+# @auto-dismiss: When false, this job will wait in a CONCLUDED state after it
+# has completely ceased all work, and awaits @block-job-dismiss.
+# When true, this job will automatically disappear from the query
+# list without user intervention.
+# Defaults to true. (Since 3.1)
# Returns: nothing on success.
#
# Since: 2.6
@@ -2011,7 +2064,8 @@
'*buf-size': 'int', '*on-source-error': 'BlockdevOnError',
'*on-target-error': 'BlockdevOnError',
'*filter-node-name': 'str',
- '*copy-mode': 'MirrorCopyMode' } }
+ '*copy-mode': 'MirrorCopyMode',
+ '*auto-finalize': 'bool', '*auto-dismiss': 'bool' } }
##
# @block_set_io_throttle:
@@ -2277,6 +2331,19 @@
# 'stop' and 'enospc' can only be used if the block device
# supports io-status (see BlockInfo). Since 1.3.
#
+# @auto-finalize: When false, this job will wait in a PENDING state after it has
+# finished its work, waiting for @block-job-finalize before
+# making any block graph changes.
+# When true, this job will automatically
+# perform its abort or commit actions.
+# Defaults to true. (Since 3.1)
+#
+# @auto-dismiss: When false, this job will wait in a CONCLUDED state after it
+# has completely ceased all work, and awaits @block-job-dismiss.
+# When true, this job will automatically disappear from the query
+# list without user intervention.
+# Defaults to true. (Since 3.1)
+#
# Returns: Nothing on success. If @device does not exist, DeviceNotFound.
#
# Since: 1.1
@@ -2292,7 +2359,8 @@
{ 'command': 'block-stream',
'data': { '*job-id': 'str', 'device': 'str', '*base': 'str',
'*base-node': 'str', '*backing-file': 'str', '*speed': 'int',
- '*on-error': 'BlockdevOnError' } }
+ '*on-error': 'BlockdevOnError',
+ '*auto-finalize': 'bool', '*auto-dismiss': 'bool' } }
##
# @block-job-set-speed:
diff --git a/tests/qemu-iotests/040 b/tests/qemu-iotests/040
index 1beb5e6dab..1cb1ceeb33 100755
--- a/tests/qemu-iotests/040
+++ b/tests/qemu-iotests/040
@@ -57,9 +57,12 @@ class ImageCommitTestCase(iotests.QMPTestCase):
self.assert_no_active_block_jobs()
self.vm.shutdown()
- def run_commit_test(self, top, base, need_ready=False):
+ def run_commit_test(self, top, base, need_ready=False, node_names=False):
self.assert_no_active_block_jobs()
- result = self.vm.qmp('block-commit', device='drive0', top=top, base=base)
+ if node_names:
+ result = self.vm.qmp('block-commit', device='drive0', top_node=top, base_node=base)
+ else:
+ result = self.vm.qmp('block-commit', device='drive0', top=top, base=base)
self.assert_qmp(result, 'return', {})
self.wait_for_complete(need_ready)
@@ -101,6 +104,11 @@ class TestSingleDrive(ImageCommitTestCase):
self.assertEqual(-1, qemu_io('-f', 'raw', '-c', 'read -P 0xab 0 524288', backing_img).find("verification failed"))
self.assertEqual(-1, qemu_io('-f', 'raw', '-c', 'read -P 0xef 524288 524288', backing_img).find("verification failed"))
+ def test_commit_node(self):
+ self.run_commit_test("mid", "base", node_names=True)
+ self.assertEqual(-1, qemu_io('-f', 'raw', '-c', 'read -P 0xab 0 524288', backing_img).find("verification failed"))
+ self.assertEqual(-1, qemu_io('-f', 'raw', '-c', 'read -P 0xef 524288 524288', backing_img).find("verification failed"))
+
def test_device_not_found(self):
result = self.vm.qmp('block-commit', device='nonexistent', top='%s' % mid_img)
self.assert_qmp(result, 'error/class', 'DeviceNotFound')
@@ -123,6 +131,30 @@ class TestSingleDrive(ImageCommitTestCase):
self.assert_qmp(result, 'error/class', 'GenericError')
self.assert_qmp(result, 'error/desc', 'Base \'badfile\' not found')
+ def test_top_node_invalid(self):
+ self.assert_no_active_block_jobs()
+ result = self.vm.qmp('block-commit', device='drive0', top_node='badfile', base_node='base')
+ self.assert_qmp(result, 'error/class', 'GenericError')
+ self.assert_qmp(result, 'error/desc', "Cannot find device= nor node_name=badfile")
+
+ def test_base_node_invalid(self):
+ self.assert_no_active_block_jobs()
+ result = self.vm.qmp('block-commit', device='drive0', top_node='mid', base_node='badfile')
+ self.assert_qmp(result, 'error/class', 'GenericError')
+ self.assert_qmp(result, 'error/desc', "Cannot find device= nor node_name=badfile")
+
+ def test_top_path_and_node(self):
+ self.assert_no_active_block_jobs()
+ result = self.vm.qmp('block-commit', device='drive0', top_node='mid', base_node='base', top='%s' % mid_img)
+ self.assert_qmp(result, 'error/class', 'GenericError')
+ self.assert_qmp(result, 'error/desc', "'top-node' and 'top' are mutually exclusive")
+
+ def test_base_path_and_node(self):
+ self.assert_no_active_block_jobs()
+ result = self.vm.qmp('block-commit', device='drive0', top_node='mid', base_node='base', base='%s' % backing_img)
+ self.assert_qmp(result, 'error/class', 'GenericError')
+ self.assert_qmp(result, 'error/desc', "'base-node' and 'base' are mutually exclusive")
+
def test_top_is_active(self):
self.run_commit_test(test_img, backing_img, need_ready=True)
self.assertEqual(-1, qemu_io('-f', 'raw', '-c', 'read -P 0xab 0 524288', backing_img).find("verification failed"))
@@ -139,6 +171,22 @@ class TestSingleDrive(ImageCommitTestCase):
self.assert_qmp(result, 'error/class', 'GenericError')
self.assert_qmp(result, 'error/desc', 'Base \'%s\' not found' % mid_img)
+ def test_top_and_base_node_reversed(self):
+ self.assert_no_active_block_jobs()
+ result = self.vm.qmp('block-commit', device='drive0', top_node='base', base_node='top')
+ self.assert_qmp(result, 'error/class', 'GenericError')
+ self.assert_qmp(result, 'error/desc', "'top' is not in this backing file chain")
+
+ def test_top_node_in_wrong_chain(self):
+ self.assert_no_active_block_jobs()
+
+ result = self.vm.qmp('blockdev-add', driver='null-co', node_name='null')
+ self.assert_qmp(result, 'return', {})
+
+ result = self.vm.qmp('block-commit', device='drive0', top_node='null', base_node='base')
+ self.assert_qmp(result, 'error/class', 'GenericError')
+ self.assert_qmp(result, 'error/desc', "'null' is not in this backing file chain")
+
# When the job is running on a BB that is automatically deleted on hot
# unplug, the job is cancelled when the device disappears
def test_hot_unplug(self):
diff --git a/tests/qemu-iotests/040.out b/tests/qemu-iotests/040.out
index e20a75ce4f..802ffaa0c0 100644
--- a/tests/qemu-iotests/040.out
+++ b/tests/qemu-iotests/040.out
@@ -1,5 +1,5 @@
-.............................
+...........................................
----------------------------------------------------------------------
-Ran 29 tests
+Ran 43 tests
OK
diff --git a/tests/qemu-iotests/051 b/tests/qemu-iotests/051
index ee9c820d0f..25d3b2d478 100755
--- a/tests/qemu-iotests/051
+++ b/tests/qemu-iotests/051
@@ -354,6 +354,9 @@ printf %b "qemu-io $device_id \"write -P 0x33 0 4k\"\ncommit $device_id\n" |
$QEMU_IO -c "read -P 0x33 0 4k" "$TEST_IMG" | _filter_qemu_io
+# Using snapshot=on with a non-existent TMPDIR
+TMPDIR=/nonexistent run_qemu -drive driver=null-co,snapshot=on
+
# success, all done
echo "*** done"
rm -f $seq.full
diff --git a/tests/qemu-iotests/051.out b/tests/qemu-iotests/051.out
index b7273505c7..793af2ab96 100644
--- a/tests/qemu-iotests/051.out
+++ b/tests/qemu-iotests/051.out
@@ -455,4 +455,7 @@ wrote 4096/4096 bytes at offset 0
read 4096/4096 bytes at offset 0
4 KiB, X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec)
+Testing: -drive driver=null-co,snapshot=on
+QEMU_PROG: -drive driver=null-co,snapshot=on: Could not get temporary filename: No such file or directory
+
*** done
diff --git a/tests/qemu-iotests/051.pc.out b/tests/qemu-iotests/051.pc.out
index e9257fe318..ca64edae6a 100644
--- a/tests/qemu-iotests/051.pc.out
+++ b/tests/qemu-iotests/051.pc.out
@@ -527,4 +527,7 @@ wrote 4096/4096 bytes at offset 0
read 4096/4096 bytes at offset 0
4 KiB, X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec)
+Testing: -drive driver=null-co,snapshot=on
+QEMU_PROG: -drive driver=null-co,snapshot=on: Could not get temporary filename: No such file or directory
+
*** done
diff --git a/tests/test-bdrv-drain.c b/tests/test-bdrv-drain.c
index 89ac15e88a..c9f29c8b10 100644
--- a/tests/test-bdrv-drain.c
+++ b/tests/test-bdrv-drain.c
@@ -174,6 +174,28 @@ static void do_drain_end(enum drain_type drain_type, BlockDriverState *bs)
}
}
+static void do_drain_begin_unlocked(enum drain_type drain_type, BlockDriverState *bs)
+{
+ if (drain_type != BDRV_DRAIN_ALL) {
+ aio_context_acquire(bdrv_get_aio_context(bs));
+ }
+ do_drain_begin(drain_type, bs);
+ if (drain_type != BDRV_DRAIN_ALL) {
+ aio_context_release(bdrv_get_aio_context(bs));
+ }
+}
+
+static void do_drain_end_unlocked(enum drain_type drain_type, BlockDriverState *bs)
+{
+ if (drain_type != BDRV_DRAIN_ALL) {
+ aio_context_acquire(bdrv_get_aio_context(bs));
+ }
+ do_drain_end(drain_type, bs);
+ if (drain_type != BDRV_DRAIN_ALL) {
+ aio_context_release(bdrv_get_aio_context(bs));
+ }
+}
+
static void test_drv_cb_common(enum drain_type drain_type, bool recursive)
{
BlockBackend *blk;
@@ -614,6 +636,17 @@ static void test_iothread_aio_cb(void *opaque, int ret)
qemu_event_set(&done_event);
}
+static void test_iothread_main_thread_bh(void *opaque)
+{
+ struct test_iothread_data *data = opaque;
+
+ /* Test that the AioContext is not yet locked in a random BH that is
+ * executed during drain, otherwise this would deadlock. */
+ aio_context_acquire(bdrv_get_aio_context(data->bs));
+ bdrv_flush(data->bs);
+ aio_context_release(bdrv_get_aio_context(data->bs));
+}
+
/*
* Starts an AIO request on a BDS that runs in the AioContext of iothread 1.
* The request involves a BH on iothread 2 before it can complete.
@@ -683,6 +716,8 @@ static void test_iothread_common(enum drain_type drain_type, int drain_thread)
aio_context_acquire(ctx_a);
}
+ aio_bh_schedule_oneshot(ctx_a, test_iothread_main_thread_bh, &data);
+
/* The request is running on the IOThread a. Draining its block device
* will make sure that it has completed as far as the BDS is concerned,
* but the drain in this thread can continue immediately after
@@ -749,23 +784,56 @@ static void test_iothread_drain_subtree(void)
typedef struct TestBlockJob {
BlockJob common;
+ int run_ret;
+ int prepare_ret;
+ bool running;
bool should_complete;
} TestBlockJob;
+static int test_job_prepare(Job *job)
+{
+ TestBlockJob *s = container_of(job, TestBlockJob, common.job);
+
+ /* Provoke an AIO_WAIT_WHILE() call to verify there is no deadlock */
+ blk_flush(s->common.blk);
+ return s->prepare_ret;
+}
+
+static void test_job_commit(Job *job)
+{
+ TestBlockJob *s = container_of(job, TestBlockJob, common.job);
+
+ /* Provoke an AIO_WAIT_WHILE() call to verify there is no deadlock */
+ blk_flush(s->common.blk);
+}
+
+static void test_job_abort(Job *job)
+{
+ TestBlockJob *s = container_of(job, TestBlockJob, common.job);
+
+ /* Provoke an AIO_WAIT_WHILE() call to verify there is no deadlock */
+ blk_flush(s->common.blk);
+}
+
static int coroutine_fn test_job_run(Job *job, Error **errp)
{
TestBlockJob *s = container_of(job, TestBlockJob, common.job);
+ /* We are running the actual job code past the pause point in
+ * job_co_entry(). */
+ s->running = true;
+
job_transition_to_ready(&s->common.job);
while (!s->should_complete) {
- /* Avoid block_job_sleep_ns() because it marks the job as !busy. We
- * want to emulate some actual activity (probably some I/O) here so
- * that drain has to wait for this acitivity to stop. */
- qemu_co_sleep_ns(QEMU_CLOCK_REALTIME, 100000);
+ /* Avoid job_sleep_ns() because it marks the job as !busy. We want to
+ * emulate some actual activity (probably some I/O) here so that drain
+ * has to wait for this activity to stop. */
+ qemu_co_sleep_ns(QEMU_CLOCK_REALTIME, 1000000);
+
job_pause_point(&s->common.job);
}
- return 0;
+ return s->run_ret;
}
static void test_job_complete(Job *job, Error **errp)
@@ -782,36 +850,115 @@ BlockJobDriver test_job_driver = {
.drain = block_job_drain,
.run = test_job_run,
.complete = test_job_complete,
+ .prepare = test_job_prepare,
+ .commit = test_job_commit,
+ .abort = test_job_abort,
},
};
-static void test_blockjob_common(enum drain_type drain_type)
+enum test_job_result {
+ TEST_JOB_SUCCESS,
+ TEST_JOB_FAIL_RUN,
+ TEST_JOB_FAIL_PREPARE,
+};
+
+enum test_job_drain_node {
+ TEST_JOB_DRAIN_SRC,
+ TEST_JOB_DRAIN_SRC_CHILD,
+ TEST_JOB_DRAIN_SRC_PARENT,
+};
+
+static void test_blockjob_common_drain_node(enum drain_type drain_type,
+ bool use_iothread,
+ enum test_job_result result,
+ enum test_job_drain_node drain_node)
{
BlockBackend *blk_src, *blk_target;
- BlockDriverState *src, *target;
+ BlockDriverState *src, *src_backing, *src_overlay, *target, *drain_bs;
BlockJob *job;
+ TestBlockJob *tjob;
+ IOThread *iothread = NULL;
+ AioContext *ctx;
int ret;
src = bdrv_new_open_driver(&bdrv_test, "source", BDRV_O_RDWR,
&error_abort);
+ src_backing = bdrv_new_open_driver(&bdrv_test, "source-backing",
+ BDRV_O_RDWR, &error_abort);
+ src_overlay = bdrv_new_open_driver(&bdrv_test, "source-overlay",
+ BDRV_O_RDWR, &error_abort);
+
+ bdrv_set_backing_hd(src_overlay, src, &error_abort);
+ bdrv_unref(src);
+ bdrv_set_backing_hd(src, src_backing, &error_abort);
+ bdrv_unref(src_backing);
+
blk_src = blk_new(BLK_PERM_ALL, BLK_PERM_ALL);
- blk_insert_bs(blk_src, src, &error_abort);
+ blk_insert_bs(blk_src, src_overlay, &error_abort);
+
+ switch (drain_node) {
+ case TEST_JOB_DRAIN_SRC:
+ drain_bs = src;
+ break;
+ case TEST_JOB_DRAIN_SRC_CHILD:
+ drain_bs = src_backing;
+ break;
+ case TEST_JOB_DRAIN_SRC_PARENT:
+ drain_bs = src_overlay;
+ break;
+ default:
+ g_assert_not_reached();
+ }
+
+ if (use_iothread) {
+ iothread = iothread_new();
+ ctx = iothread_get_aio_context(iothread);
+ blk_set_aio_context(blk_src, ctx);
+ } else {
+ ctx = qemu_get_aio_context();
+ }
target = bdrv_new_open_driver(&bdrv_test, "target", BDRV_O_RDWR,
&error_abort);
blk_target = blk_new(BLK_PERM_ALL, BLK_PERM_ALL);
blk_insert_bs(blk_target, target, &error_abort);
- job = block_job_create("job0", &test_job_driver, NULL, src, 0, BLK_PERM_ALL,
- 0, 0, NULL, NULL, &error_abort);
+ aio_context_acquire(ctx);
+ tjob = block_job_create("job0", &test_job_driver, NULL, src,
+ 0, BLK_PERM_ALL,
+ 0, 0, NULL, NULL, &error_abort);
+ job = &tjob->common;
block_job_add_bdrv(job, "target", target, 0, BLK_PERM_ALL, &error_abort);
+
+ switch (result) {
+ case TEST_JOB_SUCCESS:
+ break;
+ case TEST_JOB_FAIL_RUN:
+ tjob->run_ret = -EIO;
+ break;
+ case TEST_JOB_FAIL_PREPARE:
+ tjob->prepare_ret = -EIO;
+ break;
+ }
+
job_start(&job->job);
+ aio_context_release(ctx);
+
+ if (use_iothread) {
+ /* job_co_entry() is run in the I/O thread, wait for the actual job
+ * code to start (we don't want to catch the job in the pause point in
+ * job_co_entry(). */
+ while (!tjob->running) {
+ aio_poll(qemu_get_aio_context(), false);
+ }
+ }
g_assert_cmpint(job->job.pause_count, ==, 0);
g_assert_false(job->job.paused);
- g_assert_true(job->job.busy); /* We're in job_sleep_ns() */
+ g_assert_true(tjob->running);
+ g_assert_true(job->job.busy); /* We're in qemu_co_sleep_ns() */
- do_drain_begin(drain_type, src);
+ do_drain_begin_unlocked(drain_type, drain_bs);
if (drain_type == BDRV_DRAIN_ALL) {
/* bdrv_drain_all() drains both src and target */
@@ -822,7 +969,14 @@ static void test_blockjob_common(enum drain_type drain_type)
g_assert_true(job->job.paused);
g_assert_false(job->job.busy); /* The job is paused */
- do_drain_end(drain_type, src);
+ do_drain_end_unlocked(drain_type, drain_bs);
+
+ if (use_iothread) {
+ /* paused is reset in the I/O thread, wait for it */
+ while (job->job.paused) {
+ aio_poll(qemu_get_aio_context(), false);
+ }
+ }
g_assert_cmpint(job->job.pause_count, ==, 0);
g_assert_false(job->job.paused);
@@ -841,32 +995,113 @@ static void test_blockjob_common(enum drain_type drain_type)
do_drain_end(drain_type, target);
+ if (use_iothread) {
+ /* paused is reset in the I/O thread, wait for it */
+ while (job->job.paused) {
+ aio_poll(qemu_get_aio_context(), false);
+ }
+ }
+
g_assert_cmpint(job->job.pause_count, ==, 0);
g_assert_false(job->job.paused);
- g_assert_true(job->job.busy); /* We're in job_sleep_ns() */
+ g_assert_true(job->job.busy); /* We're in qemu_co_sleep_ns() */
+ aio_context_acquire(ctx);
ret = job_complete_sync(&job->job, &error_abort);
- g_assert_cmpint(ret, ==, 0);
+ g_assert_cmpint(ret, ==, (result == TEST_JOB_SUCCESS ? 0 : -EIO));
+
+ if (use_iothread) {
+ blk_set_aio_context(blk_src, qemu_get_aio_context());
+ }
+ aio_context_release(ctx);
blk_unref(blk_src);
blk_unref(blk_target);
- bdrv_unref(src);
+ bdrv_unref(src_overlay);
bdrv_unref(target);
+
+ if (iothread) {
+ iothread_join(iothread);
+ }
+}
+
+static void test_blockjob_common(enum drain_type drain_type, bool use_iothread,
+ enum test_job_result result)
+{
+ test_blockjob_common_drain_node(drain_type, use_iothread, result,
+ TEST_JOB_DRAIN_SRC);
+ test_blockjob_common_drain_node(drain_type, use_iothread, result,
+ TEST_JOB_DRAIN_SRC_CHILD);
+ if (drain_type == BDRV_SUBTREE_DRAIN) {
+ test_blockjob_common_drain_node(drain_type, use_iothread, result,
+ TEST_JOB_DRAIN_SRC_PARENT);
+ }
}
static void test_blockjob_drain_all(void)
{
- test_blockjob_common(BDRV_DRAIN_ALL);
+ test_blockjob_common(BDRV_DRAIN_ALL, false, TEST_JOB_SUCCESS);
}
static void test_blockjob_drain(void)
{
- test_blockjob_common(BDRV_DRAIN);
+ test_blockjob_common(BDRV_DRAIN, false, TEST_JOB_SUCCESS);
}
static void test_blockjob_drain_subtree(void)
{
- test_blockjob_common(BDRV_SUBTREE_DRAIN);
+ test_blockjob_common(BDRV_SUBTREE_DRAIN, false, TEST_JOB_SUCCESS);
+}
+
+static void test_blockjob_error_drain_all(void)
+{
+ test_blockjob_common(BDRV_DRAIN_ALL, false, TEST_JOB_FAIL_RUN);
+ test_blockjob_common(BDRV_DRAIN_ALL, false, TEST_JOB_FAIL_PREPARE);
+}
+
+static void test_blockjob_error_drain(void)
+{
+ test_blockjob_common(BDRV_DRAIN, false, TEST_JOB_FAIL_RUN);
+ test_blockjob_common(BDRV_DRAIN, false, TEST_JOB_FAIL_PREPARE);
+}
+
+static void test_blockjob_error_drain_subtree(void)
+{
+ test_blockjob_common(BDRV_SUBTREE_DRAIN, false, TEST_JOB_FAIL_RUN);
+ test_blockjob_common(BDRV_SUBTREE_DRAIN, false, TEST_JOB_FAIL_PREPARE);
+}
+
+static void test_blockjob_iothread_drain_all(void)
+{
+ test_blockjob_common(BDRV_DRAIN_ALL, true, TEST_JOB_SUCCESS);
+}
+
+static void test_blockjob_iothread_drain(void)
+{
+ test_blockjob_common(BDRV_DRAIN, true, TEST_JOB_SUCCESS);
+}
+
+static void test_blockjob_iothread_drain_subtree(void)
+{
+ test_blockjob_common(BDRV_SUBTREE_DRAIN, true, TEST_JOB_SUCCESS);
+}
+
+static void test_blockjob_iothread_error_drain_all(void)
+{
+ test_blockjob_common(BDRV_DRAIN_ALL, true, TEST_JOB_FAIL_RUN);
+ test_blockjob_common(BDRV_DRAIN_ALL, true, TEST_JOB_FAIL_PREPARE);
+}
+
+static void test_blockjob_iothread_error_drain(void)
+{
+ test_blockjob_common(BDRV_DRAIN, true, TEST_JOB_FAIL_RUN);
+ test_blockjob_common(BDRV_DRAIN, true, TEST_JOB_FAIL_PREPARE);
+}
+
+static void test_blockjob_iothread_error_drain_subtree(void)
+{
+ test_blockjob_common(BDRV_SUBTREE_DRAIN, true, TEST_JOB_FAIL_RUN);
+ test_blockjob_common(BDRV_SUBTREE_DRAIN, true, TEST_JOB_FAIL_PREPARE);
}
@@ -1338,6 +1573,27 @@ int main(int argc, char **argv)
g_test_add_func("/bdrv-drain/blockjob/drain_subtree",
test_blockjob_drain_subtree);
+ g_test_add_func("/bdrv-drain/blockjob/error/drain_all",
+ test_blockjob_error_drain_all);
+ g_test_add_func("/bdrv-drain/blockjob/error/drain",
+ test_blockjob_error_drain);
+ g_test_add_func("/bdrv-drain/blockjob/error/drain_subtree",
+ test_blockjob_error_drain_subtree);
+
+ g_test_add_func("/bdrv-drain/blockjob/iothread/drain_all",
+ test_blockjob_iothread_drain_all);
+ g_test_add_func("/bdrv-drain/blockjob/iothread/drain",
+ test_blockjob_iothread_drain);
+ g_test_add_func("/bdrv-drain/blockjob/iothread/drain_subtree",
+ test_blockjob_iothread_drain_subtree);
+
+ g_test_add_func("/bdrv-drain/blockjob/iothread/error/drain_all",
+ test_blockjob_iothread_error_drain_all);
+ g_test_add_func("/bdrv-drain/blockjob/iothread/error/drain",
+ test_blockjob_iothread_error_drain);
+ g_test_add_func("/bdrv-drain/blockjob/iothread/error/drain_subtree",
+ test_blockjob_iothread_error_drain_subtree);
+
g_test_add_func("/bdrv-drain/deletion/drain", test_delete_by_drain);
g_test_add_func("/bdrv-drain/detach/drain_all", test_detach_by_drain_all);
g_test_add_func("/bdrv-drain/detach/drain", test_detach_by_drain);
diff --git a/tests/test-blockjob-txn.c b/tests/test-blockjob-txn.c
index ef29f35e44..86606f92b3 100644
--- a/tests/test-blockjob-txn.c
+++ b/tests/test-blockjob-txn.c
@@ -24,7 +24,7 @@ typedef struct {
int *result;
} TestBlockJob;
-static void test_block_job_exit(Job *job)
+static void test_block_job_clean(Job *job)
{
BlockJob *bjob = container_of(job, BlockJob, job);
BlockDriverState *bs = blk_bs(bjob->blk);
@@ -73,7 +73,7 @@ static const BlockJobDriver test_block_job_driver = {
.user_resume = block_job_user_resume,
.drain = block_job_drain,
.run = test_block_job_run,
- .exit = test_block_job_exit,
+ .clean = test_block_job_clean,
},
};
diff --git a/tests/test-blockjob.c b/tests/test-blockjob.c
index ad4a65bc78..652d1e8359 100644
--- a/tests/test-blockjob.c
+++ b/tests/test-blockjob.c
@@ -160,15 +160,8 @@ typedef struct CancelJob {
BlockBackend *blk;
bool should_converge;
bool should_complete;
- bool completed;
} CancelJob;
-static void cancel_job_exit(Job *job)
-{
- CancelJob *s = container_of(job, CancelJob, common.job);
- s->completed = true;
-}
-
static void cancel_job_complete(Job *job, Error **errp)
{
CancelJob *s = container_of(job, CancelJob, common.job);
@@ -201,23 +194,24 @@ static const BlockJobDriver test_cancel_driver = {
.user_resume = block_job_user_resume,
.drain = block_job_drain,
.run = cancel_job_run,
- .exit = cancel_job_exit,
.complete = cancel_job_complete,
},
};
-static CancelJob *create_common(BlockJob **pjob)
+static CancelJob *create_common(Job **pjob)
{
BlockBackend *blk;
- BlockJob *job;
+ Job *job;
+ BlockJob *bjob;
CancelJob *s;
blk = create_blk(NULL);
- job = mk_job(blk, "Steve", &test_cancel_driver, true,
- JOB_MANUAL_FINALIZE | JOB_MANUAL_DISMISS);
- job_ref(&job->job);
- assert(job->job.status == JOB_STATUS_CREATED);
- s = container_of(job, CancelJob, common);
+ bjob = mk_job(blk, "Steve", &test_cancel_driver, true,
+ JOB_MANUAL_FINALIZE | JOB_MANUAL_DISMISS);
+ job = &bjob->job;
+ job_ref(job);
+ assert(job->status == JOB_STATUS_CREATED);
+ s = container_of(bjob, CancelJob, common);
s->blk = blk;
*pjob = job;
@@ -229,6 +223,10 @@ static void cancel_common(CancelJob *s)
BlockJob *job = &s->common;
BlockBackend *blk = s->blk;
JobStatus sts = job->job.status;
+ AioContext *ctx;
+
+ ctx = job->job.aio_context;
+ aio_context_acquire(ctx);
job_cancel_sync(&job->job);
if (sts != JOB_STATUS_CREATED && sts != JOB_STATUS_CONCLUDED) {
@@ -238,11 +236,13 @@ static void cancel_common(CancelJob *s)
assert(job->job.status == JOB_STATUS_NULL);
job_unref(&job->job);
destroy_blk(blk);
+
+ aio_context_release(ctx);
}
static void test_cancel_created(void)
{
- BlockJob *job;
+ Job *job;
CancelJob *s;
s = create_common(&job);
@@ -251,119 +251,123 @@ static void test_cancel_created(void)
static void test_cancel_running(void)
{
- BlockJob *job;
+ Job *job;
CancelJob *s;
s = create_common(&job);
- job_start(&job->job);
- assert(job->job.status == JOB_STATUS_RUNNING);
+ job_start(job);
+ assert(job->status == JOB_STATUS_RUNNING);
cancel_common(s);
}
static void test_cancel_paused(void)
{
- BlockJob *job;
+ Job *job;
CancelJob *s;
s = create_common(&job);
- job_start(&job->job);
- assert(job->job.status == JOB_STATUS_RUNNING);
+ job_start(job);
+ assert(job->status == JOB_STATUS_RUNNING);
- job_user_pause(&job->job, &error_abort);
- job_enter(&job->job);
- assert(job->job.status == JOB_STATUS_PAUSED);
+ job_user_pause(job, &error_abort);
+ job_enter(job);
+ assert(job->status == JOB_STATUS_PAUSED);
cancel_common(s);
}
static void test_cancel_ready(void)
{
- BlockJob *job;
+ Job *job;
CancelJob *s;
s = create_common(&job);
- job_start(&job->job);
- assert(job->job.status == JOB_STATUS_RUNNING);
+ job_start(job);
+ assert(job->status == JOB_STATUS_RUNNING);
s->should_converge = true;
- job_enter(&job->job);
- assert(job->job.status == JOB_STATUS_READY);
+ job_enter(job);
+ assert(job->status == JOB_STATUS_READY);
cancel_common(s);
}
static void test_cancel_standby(void)
{
- BlockJob *job;
+ Job *job;
CancelJob *s;
s = create_common(&job);
- job_start(&job->job);
- assert(job->job.status == JOB_STATUS_RUNNING);
+ job_start(job);
+ assert(job->status == JOB_STATUS_RUNNING);
s->should_converge = true;
- job_enter(&job->job);
- assert(job->job.status == JOB_STATUS_READY);
+ job_enter(job);
+ assert(job->status == JOB_STATUS_READY);
- job_user_pause(&job->job, &error_abort);
- job_enter(&job->job);
- assert(job->job.status == JOB_STATUS_STANDBY);
+ job_user_pause(job, &error_abort);
+ job_enter(job);
+ assert(job->status == JOB_STATUS_STANDBY);
cancel_common(s);
}
static void test_cancel_pending(void)
{
- BlockJob *job;
+ Job *job;
CancelJob *s;
s = create_common(&job);
- job_start(&job->job);
- assert(job->job.status == JOB_STATUS_RUNNING);
+ job_start(job);
+ assert(job->status == JOB_STATUS_RUNNING);
s->should_converge = true;
- job_enter(&job->job);
- assert(job->job.status == JOB_STATUS_READY);
+ job_enter(job);
+ assert(job->status == JOB_STATUS_READY);
- job_complete(&job->job, &error_abort);
- job_enter(&job->job);
- while (!s->completed) {
+ job_complete(job, &error_abort);
+ job_enter(job);
+ while (!job->deferred_to_main_loop) {
aio_poll(qemu_get_aio_context(), true);
}
- assert(job->job.status == JOB_STATUS_PENDING);
+ assert(job->status == JOB_STATUS_READY);
+ aio_poll(qemu_get_aio_context(), true);
+ assert(job->status == JOB_STATUS_PENDING);
cancel_common(s);
}
static void test_cancel_concluded(void)
{
- BlockJob *job;
+ Job *job;
CancelJob *s;
s = create_common(&job);
- job_start(&job->job);
- assert(job->job.status == JOB_STATUS_RUNNING);
+ job_start(job);
+ assert(job->status == JOB_STATUS_RUNNING);
s->should_converge = true;
- job_enter(&job->job);
- assert(job->job.status == JOB_STATUS_READY);
+ job_enter(job);
+ assert(job->status == JOB_STATUS_READY);
- job_complete(&job->job, &error_abort);
- job_enter(&job->job);
- while (!s->completed) {
+ job_complete(job, &error_abort);
+ job_enter(job);
+ while (!job->deferred_to_main_loop) {
aio_poll(qemu_get_aio_context(), true);
}
- assert(job->job.status == JOB_STATUS_PENDING);
+ assert(job->status == JOB_STATUS_READY);
+ aio_poll(qemu_get_aio_context(), true);
+ assert(job->status == JOB_STATUS_PENDING);
- job_finalize(&job->job, &error_abort);
- assert(job->job.status == JOB_STATUS_CONCLUDED);
+ job_finalize(job, &error_abort);
+ assert(job->status == JOB_STATUS_CONCLUDED);
cancel_common(s);
}
diff --git a/util/aio-wait.c b/util/aio-wait.c
index b8a8f86dba..b4877493f8 100644
--- a/util/aio-wait.c
+++ b/util/aio-wait.c
@@ -26,21 +26,22 @@
#include "qemu/main-loop.h"
#include "block/aio-wait.h"
+AioWait global_aio_wait;
+
static void dummy_bh_cb(void *opaque)
{
/* The point is to make AIO_WAIT_WHILE()'s aio_poll() return */
}
-void aio_wait_kick(AioWait *wait)
+void aio_wait_kick(void)
{
/* The barrier (or an atomic op) is in the caller. */
- if (atomic_read(&wait->num_waiters)) {
+ if (atomic_read(&global_aio_wait.num_waiters)) {
aio_bh_schedule_oneshot(qemu_get_aio_context(), dummy_bh_cb, NULL);
}
}
typedef struct {
- AioWait wait;
bool done;
QEMUBHFunc *cb;
void *opaque;
@@ -54,7 +55,7 @@ static void aio_wait_bh(void *opaque)
data->cb(data->opaque);
data->done = true;
- aio_wait_kick(&data->wait);
+ aio_wait_kick();
}
void aio_wait_bh_oneshot(AioContext *ctx, QEMUBHFunc *cb, void *opaque)
@@ -67,5 +68,5 @@ void aio_wait_bh_oneshot(AioContext *ctx, QEMUBHFunc *cb, void *opaque)
assert(qemu_get_current_aio_context() == qemu_get_aio_context());
aio_bh_schedule_oneshot(ctx, aio_wait_bh, &data);
- AIO_WAIT_WHILE(&data.wait, ctx, !data.done);
+ AIO_WAIT_WHILE(ctx, !data.done);
}
diff --git a/util/async.c b/util/async.c
index 05979f8014..c10642a385 100644
--- a/util/async.c
+++ b/util/async.c
@@ -400,7 +400,7 @@ static void co_schedule_bh_cb(void *opaque)
/* Protected by write barrier in qemu_aio_coroutine_enter */
atomic_set(&co->scheduled, NULL);
- qemu_coroutine_enter(co);
+ qemu_aio_coroutine_enter(ctx, co);
aio_context_release(ctx);
}
}
diff --git a/util/qemu-coroutine.c b/util/qemu-coroutine.c
index 1ba4191b84..2295928d33 100644
--- a/util/qemu-coroutine.c
+++ b/util/qemu-coroutine.c
@@ -198,3 +198,8 @@ bool qemu_coroutine_entered(Coroutine *co)
{
return co->caller;
}
+
+AioContext *coroutine_fn qemu_coroutine_get_aio_context(Coroutine *co)
+{
+ return co->ctx;
+}