diff options
Diffstat (limited to 'python-client/usbmux.py')
| -rw-r--r-- | python-client/usbmux.py | 246 | 
1 files changed, 0 insertions, 246 deletions
| diff --git a/python-client/usbmux.py b/python-client/usbmux.py deleted file mode 100644 index 79ec26a..0000000 --- a/python-client/usbmux.py +++ /dev/null @@ -1,246 +0,0 @@ -#!/usr/bin/python -# -*- coding: utf-8 -*- -# -#	usbmux.py - usbmux client library for Python -# -# Copyright (C) 2009	Hector Martin "marcan" <hector@marcansoft.com> -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 2 or version 3. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA - -import socket, struct, select, sys - -try: -	import plistlib -	haveplist = True -except: -	haveplist = False - -class MuxError(Exception): -	pass - -class MuxVersionError(MuxError): -	pass - -class SafeStreamSocket: -	def __init__(self, address, family): -		self.sock = socket.socket(family, socket.SOCK_STREAM) -		self.sock.connect(address) -	def send(self, msg): -		totalsent = 0 -		while totalsent < len(msg): -			sent = self.sock.send(msg[totalsent:]) -			if sent == 0: -				raise MuxError("socket connection broken") -			totalsent = totalsent + sent -	def recv(self, size): -		msg = '' -		while len(msg) < size: -			chunk = self.sock.recv(size-len(msg)) -			if chunk == '': -				raise MuxError("socket connection broken") -			msg = msg + chunk -		return msg - -class MuxDevice(object): -	def __init__(self, devid, usbprod, serial, location): -		self.devid = devid -		self.usbprod = usbprod -		self.serial = serial -		self.location = location -	def __str__(self): -		return "<MuxDevice: ID %d ProdID 0x%04x Serial '%s' Location 0x%x>"%(self.devid, self.usbprod, self.serial, self.location) - -class BinaryProtocol(object): -	TYPE_RESULT = 1 -	TYPE_CONNECT = 2 -	TYPE_LISTEN = 3 -	TYPE_DEVICE_ADD = 4 -	TYPE_DEVICE_REMOVE = 5 -	VERSION = 0 -	def __init__(self, socket): -		self.socket = socket -		self.connected = False - -	def _pack(self, req, payload): -		if req == self.TYPE_CONNECT: -			return struct.pack("IH", payload['DeviceID'], payload['PortNumber']) + "\x00\x00" -		elif req == self.TYPE_LISTEN: -			return "" -		else: -			raise ValueError("Invalid outgoing request type %d"%req) -	 -	def _unpack(self, resp, payload): -		if resp == self.TYPE_RESULT: -			return {'Number':struct.unpack("I", payload)[0]} -		elif resp == self.TYPE_DEVICE_ADD: -			devid, usbpid, serial, pad, location = struct.unpack("IH256sHI", payload) -			serial = serial.split("\0")[0] -			return {'DeviceID': devid, 'Properties': {'LocationID': location, 'SerialNumber': serial, 'ProductID': usbpid}} -		elif resp == self.TYPE_DEVICE_REMOVE: -			devid = struct.unpack("I", payload)[0] -			return {'DeviceID': devid} -		else: -			raise MuxError("Invalid incoming request type %d"%req) - -	def sendpacket(self, req, tag, payload={}): -		payload = self._pack(req, payload) -		if self.connected: -			raise MuxError("Mux is connected, cannot issue control packets") -		length = 16 + len(payload) -		data = struct.pack("IIII", length, self.VERSION, req, tag) + payload -		self.socket.send(data) -	def getpacket(self): -		if self.connected: -			raise MuxError("Mux is connected, cannot issue control packets") -		dlen = self.socket.recv(4) -		dlen = struct.unpack("I", dlen)[0] -		body = self.socket.recv(dlen - 4) -		version, resp, tag = struct.unpack("III",body[:0xc]) -		if version != self.VERSION: -			raise MuxVersionError("Version mismatch: expected %d, got %d"%(self.VERSION,version)) -		payload = self._unpack(resp, body[0xc:]) -		return (resp, tag, payload) - -class PlistProtocol(BinaryProtocol): -	TYPE_RESULT = "Result" -	TYPE_CONNECT = "Connect" -	TYPE_LISTEN = "Listen" -	TYPE_DEVICE_ADD = "Attached" -	TYPE_DEVICE_REMOVE = "Detached" #??? -	TYPE_PLIST = 8 -	VERSION = 1 -	def __init__(self, socket): -		if not haveplist: -			raise Exception("You need the plistlib module") -		BinaryProtocol.__init__(self, socket) -	 -	def _pack(self, req, payload): -		return payload -	 -	def _unpack(self, resp, payload): -		return payload -	 -	def sendpacket(self, req, tag, payload={}): -		payload['ClientVersionString'] = 'usbmux.py by marcan' -		if isinstance(req, int): -			req = [self.TYPE_CONNECT, self.TYPE_LISTEN][req-2] -		payload['MessageType'] = req -		payload['ProgName'] = 'tcprelay' -		BinaryProtocol.sendpacket(self, self.TYPE_PLIST, tag, plistlib.writePlistToString(payload)) -	def getpacket(self): -		resp, tag, payload = BinaryProtocol.getpacket(self) -		if resp != self.TYPE_PLIST: -			raise MuxError("Received non-plist type %d"%resp) -		payload = plistlib.readPlistFromString(payload) -		return payload['MessageType'], tag, payload - -class MuxConnection(object): -	def __init__(self, socketpath, protoclass): -		self.socketpath = socketpath -		if sys.platform in ['win32', 'cygwin']: -			family = socket.AF_INET -			address = ('127.0.0.1', 27015) -		else: -			family = socket.AF_UNIX -			address = self.socketpath -		self.socket = SafeStreamSocket(address, family) -		self.proto = protoclass(self.socket) -		self.pkttag = 1 -		self.devices = [] - -	def _getreply(self): -		while True: -			resp, tag, data = self.proto.getpacket() -			if resp == self.proto.TYPE_RESULT: -				return tag, data -			else: -				raise MuxError("Invalid packet type received: %d"%resp) -	def _processpacket(self): -		resp, tag, data = self.proto.getpacket() -		if resp == self.proto.TYPE_DEVICE_ADD: -			self.devices.append(MuxDevice(data['DeviceID'], data['Properties']['ProductID'], data['Properties']['SerialNumber'], data['Properties']['LocationID'])) -		elif resp == self.proto.TYPE_DEVICE_REMOVE: -			for dev in self.devices: -				if dev.devid == data['DeviceID']: -					self.devices.remove(dev) -		elif resp == self.proto.TYPE_RESULT: -			raise MuxError("Unexpected result: %d"%resp) -		else: -			raise MuxError("Invalid packet type received: %d"%resp) -	def _exchange(self, req, payload={}): -		mytag = self.pkttag -		self.pkttag += 1 -		self.proto.sendpacket(req, mytag, payload) -		recvtag, data = self._getreply() -		if recvtag != mytag: -			raise MuxError("Reply tag mismatch: expected %d, got %d"%(mytag, recvtag)) -		return data['Number'] - -	def listen(self): -		ret = self._exchange(self.proto.TYPE_LISTEN) -		if ret != 0: -			raise MuxError("Listen failed: error %d"%ret) -	def process(self, timeout=None): -		if self.proto.connected: -			raise MuxError("Socket is connected, cannot process listener events") -		rlo, wlo, xlo = select.select([self.socket.sock], [], [self.socket.sock], timeout) -		if xlo: -			self.socket.sock.close() -			raise MuxError("Exception in listener socket") -		if rlo: -			self._processpacket() -	def connect(self, device, port): -		ret = self._exchange(self.proto.TYPE_CONNECT, {'DeviceID':device.devid, 'PortNumber':((port<<8) & 0xFF00) | (port>>8)}) -		if ret != 0: -			raise MuxError("Connect failed: error %d"%ret) -		self.proto.connected = True -		return self.socket.sock -	def close(self): -		self.socket.sock.close() - -class USBMux(object): -	def __init__(self, socketpath=None): -		if socketpath is None: -			if sys.platform == 'darwin': -				socketpath = "/var/run/usbmuxd" -			else: -				socketpath = "/var/run/usbmuxd" -		self.socketpath = socketpath -		self.listener = MuxConnection(socketpath, BinaryProtocol) -		try: -			self.listener.listen() -			self.version = 0 -			self.protoclass = BinaryProtocol -		except MuxVersionError: -			self.listener = MuxConnection(socketpath, PlistProtocol) -			self.listener.listen() -			self.protoclass = PlistProtocol -			self.version = 1 -		self.devices = self.listener.devices -	def process(self, timeout=None): -		self.listener.process(timeout) -	def connect(self, device, port): -		connector = MuxConnection(self.socketpath, self.protoclass) -		return connector.connect(device, port) - -if __name__ == "__main__": -	mux = USBMux() -	print "Waiting for devices..." -	if not mux.devices: -		mux.process(0.1) -	while True: -		print "Devices:" -		for dev in mux.devices: -			print dev -		mux.process() | 
