#!/usr/bin/env python
#
# Test case for NBD's blockdev-add interface
#
# Copyright (C) 2016 Red Hat, Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

import os
import random
import socket
import stat
import time
import iotests
from iotests import cachemode, imgfmt, qemu_img, qemu_nbd, qemu_nbd_early_pipe

# Half-open port ranges that the inet tests pick random ports from.  The
# IPv6 range starts where the IPv4 range ends, so the two never overlap.
NBD_PORT_START = 32768
NBD_PORT_END = NBD_PORT_START + 1024
NBD_IPV6_PORT_START = NBD_PORT_END
NBD_IPV6_PORT_END = NBD_IPV6_PORT_START + 1024

# Image exported by the NBD servers and the UNIX socket they listen on.
test_img = os.path.join(iotests.test_dir, 'test.img')
unix_socket = os.path.join(iotests.sock_dir, 'nbd.socket')


# Converts a nested socket address of the form
#   { 'type': T, 'data': { ... } }
# into a flat dict holding 'type' next to all members of 'data'.
# The input dict is not modified.
def flatten_sock_addr(crumpled_address):
    result = { 'type': crumpled_address['type'] }
    result.update(crumpled_address['data'])
    return result


# Common client-side logic: attach an NBD export to self.vm through
# blockdev-add and verify the filename QEMU reports for the new node.
# Subclasses are expected to provide self.vm and to start an NBD server.
class NBDBlockdevAddBase(iotests.QMPTestCase):
    # Builds the blockdev-add arguments for a read-only NBD node below a
    # 'raw' format node named @node_name.  @address is used as the NBD
    # 'server' option as-is; 'export' is only set if @export is not None.
    def blockdev_add_options(self, address, export, node_name):
        options = { 'node-name': node_name,
                    'driver': 'raw',
                    'file': {
                        'driver': 'nbd',
                        'read-only': True,
                        'server': address
                    } }
        if export is not None:
            options['file']['export'] = export
        return options

    # Adds the node, looks it up in query-named-block-nodes and checks its
    # image filename against @filename:
    #  - a str is compared through assert_qmp() on 'image/filename'
    #  - anything else is handed to assert_json_filename_equal()
    # Fails the test if no node named @node_name is reported.
    # If @delete is true, the node is removed again with blockdev-del;
    # otherwise removing it is left to the caller.
    def client_test(self, filename, address, export=None,
                    node_name='nbd-blockdev', delete=True):
        bao = self.blockdev_add_options(address, export, node_name)
        result = self.vm.qmp('blockdev-add', **bao)
        self.assert_qmp(result, 'return', {})

        found = False
        result = self.vm.qmp('query-named-block-nodes')
        for node in result['return']:
            if node['node-name'] == node_name:
                found = True
                if isinstance(filename, str):
                    self.assert_qmp(node, 'image/filename', filename)
                else:
                    self.assert_json_filename_equal(node['image']['filename'],
                                                    filename)
                break
        self.assertTrue(found)

        if delete:
            result = self.vm.qmp('blockdev-del', node_name=node_name)
            self.assert_qmp(result, 'return', {})


# Runs the client test against a server started via qemu_nbd_early_pipe().
class QemuNBD(NBDBlockdevAddBase):
    # Creates the 64k test image and launches the client VM.
    def setUp(self):
        qemu_img('create', '-f', iotests.imgfmt, test_img, '64k')
        self.vm = iotests.VM()
        self.vm.launch()

    # Shuts the VM down and removes the test image.  The UNIX socket only
    # exists for some tests, so failing to remove it is not an error.
    def tearDown(self):
        self.vm.shutdown()
        os.remove(test_img)
        try:
            os.remove(unix_socket)
        except OSError:
            pass

    # Starts the server on test_img with the extra arguments @args.
    # Returns True if the reported status is 0.
    # Returns False if the message contains 'Address already in use'.
    # Fails the test with the reported message on any other error.
    def _try_server_up(self, *args):
        status, msg = qemu_nbd_early_pipe('-f', imgfmt, test_img, *args)
        if status == 0:
            return True
        if 'Address already in use' in msg:
            return False
        self.fail(msg)

    # Like _try_server_up(), but an address that is already in use fails
    # the test, too.
    def _server_up(self, *args):
        self.assertTrue(self._try_server_up(*args))

    def test_inet(self):
        # Keep drawing random ports until the server manages to bind one
        while True:
            nbd_port = random.randrange(NBD_PORT_START, NBD_PORT_END)
            if self._try_server_up('-b', 'localhost', '-p', str(nbd_port)):
                break

        address = { 'type': 'inet',
                    'data': {
                        'host': 'localhost',
                        'port': str(nbd_port)
                    } }
        self.client_test('nbd://localhost:%i' % nbd_port,
                         flatten_sock_addr(address))

    def test_unix(self):
        self._server_up('-k', unix_socket)
        address = { 'type': 'unix',
                    'data': { 'path': unix_socket } }
        self.client_test('nbd+unix://?socket=' + unix_socket,
                         flatten_sock_addr(address))


# Runs the client test against the NBD server built into a second QEMU
# instance (self.server), which is controlled through the nbd-server-*
# QMP commands.
class BuiltinNBD(NBDBlockdevAddBase):
    # Creates the 64k test image, launches the client VM and a second VM
    # that has the image attached as drive 'nbd-export'.
    def setUp(self):
        qemu_img('create', '-f', iotests.imgfmt, test_img, '64k')
        self.vm = iotests.VM()
        self.vm.launch()
        self.server = iotests.VM('.server')
        self.server.add_drive_raw('if=none,id=nbd-export,' +
                                  'file=%s,' % test_img +
                                  'format=%s,' % imgfmt +
                                  'cache=%s' % cachemode)
        self.server.launch()

    # Shuts both VMs down and removes the test image.  The UNIX socket
    # only exists for some tests, so failing to remove it is not an error.
    def tearDown(self):
        self.vm.shutdown()
        self.server.shutdown()
        os.remove(test_img)
        try:
            os.remove(unix_socket)
        except OSError:
            pass

    # Starts the server on @address and exports 'nbd-export' once without
    # a name (if @export_name is None) or as @export_name, and optionally
    # a second time as @export_name2.
    # Returns False on EADDRINUSE; fails an assertion on other errors.
    # Returns True on success.
    def _try_server_up(self, address, export_name=None, export_name2=None):
        result = self.server.qmp('nbd-server-start', addr=address)
        if 'error' in result and \
           'Address already in use' in result['error']['desc']:
            return False
        self.assert_qmp(result, 'return', {})

        if export_name is None:
            result = self.server.qmp('nbd-server-add', device='nbd-export')
        else:
            result = self.server.qmp('nbd-server-add', device='nbd-export',
                                     name=export_name)
        self.assert_qmp(result, 'return', {})

        if export_name2 is not None:
            result = self.server.qmp('nbd-server-add', device='nbd-export',
                                     name=export_name2)
            self.assert_qmp(result, 'return', {})

        return True

    # Like _try_server_up(), but an address that is already in use fails
    # the test, too.
    def _server_up(self, address, export_name=None, export_name2=None):
        self.assertTrue(self._try_server_up(address, export_name, export_name2))

    # Stops the built-in server; fails the test if QEMU reports an error.
    def _server_down(self):
        result = self.server.qmp('nbd-server-stop')
        self.assert_qmp(result, 'return', {})

    # Shared body of the test_inet_*_export_name tests: serves the image
    # on a random localhost port under @export_name and connects to it.
    def do_test_inet(self, export_name=None):
        # Keep drawing random ports until the server manages to bind one
        while True:
            nbd_port = random.randrange(NBD_PORT_START, NBD_PORT_END)
            address = { 'type': 'inet',
                        'data': {
                            'host': 'localhost',
                            'port': str(nbd_port)
                        } }
            if self._try_server_up(address, export_name):
                break

        # Without an explicit name, the client asks for 'nbd-export'
        # (presumably the server falls back to the device name)
        export_name = export_name or 'nbd-export'
        self.client_test('nbd://localhost:%i/%s' % (nbd_port, export_name),
                         flatten_sock_addr(address), export_name)
        self._server_down()

    def test_inet_default_export_name(self):
        self.do_test_inet()

    def test_inet_same_export_name(self):
        self.do_test_inet('nbd-export')

    def test_inet_different_export_name(self):
        self.do_test_inet('shadow')

    def test_inet_two_exports(self):
        # Keep drawing random ports until the server manages to bind one
        while True:
            nbd_port = random.randrange(NBD_PORT_START, NBD_PORT_END)
            address = { 'type': 'inet',
                        'data': {
                            'host': 'localhost',
                            'port': str(nbd_port)
                        } }
            if self._try_server_up(address, 'exp1', 'exp2'):
                break

        # Keep both nodes (delete=False) so that they are attached to the
        # two exports at the same time
        self.client_test('nbd://localhost:%i/%s' % (nbd_port, 'exp1'),
                         flatten_sock_addr(address), 'exp1', 'node1', False)
        self.client_test('nbd://localhost:%i/%s' % (nbd_port, 'exp2'),
                         flatten_sock_addr(address), 'exp2', 'node2', False)
        result = self.vm.qmp('blockdev-del', node_name='node1')
        self.assert_qmp(result, 'return', {})
        result = self.vm.qmp('blockdev-del', node_name='node2')
        self.assert_qmp(result, 'return', {})
        self._server_down()

    def test_inet6(self):
        try:
            socket.getaddrinfo("::0", "0", socket.AF_INET6,
                               socket.SOCK_STREAM, socket.IPPROTO_TCP,
                               socket.AI_ADDRCONFIG | socket.AI_CANONNAME)
        except socket.gaierror:
            # IPv6 not available, skip
            return

        # Keep drawing random ports until the server manages to bind one
        while True:
            nbd_port = random.randrange(NBD_IPV6_PORT_START, NBD_IPV6_PORT_END)
            address = { 'type': 'inet',
                        'data': {
                            'host': '::1',
                            'port': str(nbd_port),
                            'ipv4': False,
                            'ipv6': True
                        } }
            if self._try_server_up(address):
                break

        # The expected filename is given as an options dict here, so
        # client_test() compares it with assert_json_filename_equal()
        filename = { 'driver': 'raw',
                     'file': {
                         'driver': 'nbd',
                         'export': 'nbd-export',
                         'server': flatten_sock_addr(address)
                     } }
        self.client_test(filename, flatten_sock_addr(address), 'nbd-export')
        self._server_down()

    def test_unix(self):
        address = { 'type': 'unix',
                    'data': { 'path': unix_socket } }
        self._server_up(address)
        self.client_test('nbd+unix:///nbd-export?socket=' + unix_socket,
                         flatten_sock_addr(address), 'nbd-export')
        self._server_down()

    def test_fd(self):
        self._server_up({ 'type': 'unix',
                          'data': { 'path': unix_socket } })

        sockfd = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        # Close our copy of the socket explicitly, even if an assertion
        # below fails, instead of leaving it to the garbage collector.
        # It is only closed after the server has been stopped, i.e. at
        # the point where the local variable would go out of scope.
        try:
            sockfd.connect(unix_socket)

            result = self.vm.send_fd_scm(fd=sockfd.fileno())
            self.assertEqual(result, 0, 'Failed to send socket FD')

            # Register the passed FD under the name used in the address
            result = self.vm.qmp('getfd', fdname='nbd-fifo')
            self.assert_qmp(result, 'return', {})

            address = { 'type': 'fd',
                        'data': { 'str': 'nbd-fifo' } }
            filename = { 'driver': 'raw',
                         'file': {
                             'driver': 'nbd',
                             'export': 'nbd-export',
                             'server': flatten_sock_addr(address)
                         } }
            self.client_test(filename, flatten_sock_addr(address),
                             'nbd-export')

            self._server_down()
        finally:
            sockfd.close()


if __name__ == '__main__':
    iotests.main(supported_fmts=['raw'],
                 supported_protocols=['nbd'])