/* device.cpp
*
* Copyright (C) 2013 Austin Hendrix
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
/*
* Implementation of CEC Device class for Python
*
* Author: Austin Hendrix
*/
// request the std format macros
#define __STDC_FORMAT_MACROS
#include "device.h"
#include <inttypes.h>
using namespace CEC;
// libcec adapter used by every Device method in this file; it is assigned by
// DeviceTypeInit() and is NULL until that function has been called.
static ICECAdapter * adapter;
/* Getter for the "address" attribute.
 *
 * Returns the device's logical address as a new Python integer reference.
 * The closure argument is unused.
 */
static PyObject * Device_getAddr(Device * self, void * closure) {
  (void)closure;
  const int logical = self->addr;
  return Py_BuildValue("b", logical);
}
/* Getter for the "physical_address" attribute.
 *
 * Hands out a new reference to the object cached in self->physicalAddress.
 * The closure argument is unused.
 */
static PyObject * Device_getPhysicalAddress(Device * self,
    void * closure) {
  (void)closure;
  PyObject * cached = self->physicalAddress;
  Py_INCREF(cached);
  return cached;
}
/* Getter for the "vendor" attribute.
 *
 * Hands out a new reference to the object cached in self->vendorId.
 * The closure argument is unused.
 */
static PyObject * Device_getVendor(Device * self, void * closure) {
  (void)closure;
  PyObject * cached = self->vendorId;
  Py_INCREF(cached);
  return cached;
}
/* Getter for the "osd_string" attribute.
 *
 * Hands out a new reference to the object cached in self->osdName.
 * The closure argument is unused.
 */
static PyObject * Device_getOsdString(Device * self, void * closure) {
  (void)closure;
  PyObject * cached = self->osdName;
  Py_INCREF(cached);
  return cached;
}
/* Getter for the "cec_version" attribute.
 *
 * Hands out a new reference to the object cached in self->cecVersion.
 * The closure argument is unused.
 */
static PyObject * Device_getCECVersion(Device * self,
    void * closure) {
  (void)closure;
  PyObject * cached = self->cecVersion;
  Py_INCREF(cached);
  return cached;
}
/* Getter for the "language" attribute.
 *
 * Hands out a new reference to the object cached in self->lang.
 * The closure argument is unused.
 */
static PyObject * Device_getLanguage(Device * self, void * closure) {
  (void)closure;
  PyObject * cached = self->lang;
  Py_INCREF(cached);
  return cached;
}
// The tables below initialise char-pointer fields from string literals;
// silence the resulting -Wwrite-strings warning (presumably needed because
// older Python headers declare those fields as non-const char * -- confirm).
#pragma GCC diagnostic ignored "-Wwrite-strings"
/* Read-only attributes exposed on cec.Device.
 *
 * Every entry has a getter and a NULL setter, so assigning to any of these
 * attributes from Python fails. The table ends with a NULL sentinel entry.
 */
static PyGetSetDef Device_getset[] = {
  {"address", (getter)Device_getAddr, (setter)NULL,
    "Logical Address"},
  {"physical_address", (getter)Device_getPhysicalAddress, (setter)NULL,
    "Physical Address"},
  {"vendor", (getter)Device_getVendor, (setter)NULL,
    "Vendor ID"},
  {"osd_string", (getter)Device_getOsdString, (setter)NULL,
    "OSD String"},
  {"cec_version", (getter)Device_getCECVersion, (setter)NULL,
    "CEC Version"},
  {"language", (getter)Device_getLanguage, (setter)NULL,
    "Language"},
  {NULL}
};
/* Device.is_on(): query the adapter for this device's power status.
 *
 * Returns True for ON and for the ON->STANDBY transition, False for STANDBY
 * and for the STANDBY->ON transition (a transition is reported as the state
 * being left). Any other status raises IOError and returns NULL.
 * The GIL is released while the adapter is queried.
 */
static PyObject * Device_is_on(Device * self) {
  cec_power_status status;
  Py_BEGIN_ALLOW_THREADS
  status = adapter->GetDevicePowerStatus(self->addr);
  Py_END_ALLOW_THREADS

  if( status == CEC_POWER_STATUS_ON ||
      status == CEC_POWER_STATUS_IN_TRANSITION_ON_TO_STANDBY ) {
    Py_RETURN_TRUE;
  }
  if( status == CEC_POWER_STATUS_STANDBY ||
      status == CEC_POWER_STATUS_IN_TRANSITION_STANDBY_TO_ON ) {
    Py_RETURN_FALSE;
  }

  // CEC_POWER_STATUS_UNKNOWN and any value not listed above
  PyErr_SetString(PyExc_IOError, "Power status not found");
  return NULL;
}
/* Device.power_on(): ask the adapter to power on this device.
 *
 * Returns True or False according to the result of PowerOnDevices().
 * The GIL is released during the adapter call.
 */
static PyObject * Device_power_on(Device * self) {
  bool ok;
  Py_BEGIN_ALLOW_THREADS
  ok = adapter->PowerOnDevices(self->addr);
  Py_END_ALLOW_THREADS

  PyObject * result = ok ? Py_True : Py_False;
  Py_INCREF(result);
  return result;
}
/* Device.standby(): ask the adapter to put this device into standby.
 *
 * Returns True or False according to the result of StandbyDevices().
 * The GIL is released during the adapter call.
 */
static PyObject * Device_standby(Device * self) {
  bool ok;
  Py_BEGIN_ALLOW_THREADS
  ok = adapter->StandbyDevices(self->addr);
  Py_END_ALLOW_THREADS

  PyObject * result = ok ? Py_True : Py_False;
  Py_INCREF(result);
  return result;
}
/* Device.is_active(): ask the adapter whether this device is the active
 * source.
 *
 * Returns True or False according to the result of IsActiveSource().
 * The GIL is released during the adapter call.
 */
static PyObject * Device_is_active(Device * self) {
  bool active;
  Py_BEGIN_ALLOW_THREADS
  active = adapter->IsActiveSource(self->addr);
  Py_END_ALLOW_THREADS

  PyObject * result = active ? Py_True : Py_False;
  Py_INCREF(result);
  return result;
}
/* Device.set_av_input(input): send a USER_CONTROL_PRESSED command carrying
 * the bytes 0x69 and `input` to this device.
 *
 * `input` is parsed with the "b" format unit (one unsigned byte). Returns
 * True or False according to the result of Transmit(), or NULL with an
 * exception set when the argument cannot be parsed.
 */
static PyObject * Device_av_input(Device * self, PyObject * args) {
  unsigned char source;
  if( !PyArg_ParseTuple(args, "b:set_av_input", &source) ) {
    return NULL;
  }

  cec_command cmd;
  bool sent;
  Py_BEGIN_ALLOW_THREADS
  cmd.initiator = adapter->GetLogicalAddresses().primary;
  cmd.destination = self->addr;
  cmd.opcode = CEC_OPCODE_USER_CONTROL_PRESSED;
  cmd.opcode_set = 1;
  // 0x69: presumably the "select A/V input function" user control code
  cmd.PushBack(0x69);
  cmd.PushBack(source);
  sent = adapter->Transmit(cmd);
  Py_END_ALLOW_THREADS

  PyObject * result = sent ? Py_True : Py_False;
  Py_INCREF(result);
  return result;
}
/* Device.set_audio_input(input): send a USER_CONTROL_PRESSED command
 * carrying the bytes 0x6a and `input` to this device.
 *
 * `input` is parsed with the "b" format unit (one unsigned byte). Returns
 * True or False according to the result of Transmit(), or NULL with an
 * exception set when the argument cannot be parsed.
 */
static PyObject * Device_audio_input(Device * self, PyObject * args) {
  unsigned char source;
  if( !PyArg_ParseTuple(args, "b:set_audio_input", &source) ) {
    return NULL;
  }

  cec_command cmd;
  bool sent;
  Py_BEGIN_ALLOW_THREADS
  cmd.initiator = adapter->GetLogicalAddresses().primary;
  cmd.destination = self->addr;
  cmd.opcode = CEC_OPCODE_USER_CONTROL_PRESSED;
  cmd.opcode_set = 1;
  // 0x6a: presumably the "select audio input function" user control code
  cmd.PushBack(0x6a);
  cmd.PushBack(source);
  sent = adapter->Transmit(cmd);
  Py_END_ALLOW_THREADS

  PyObject * result = sent ? Py_True : Py_False;
  Py_INCREF(result);
  return result;
}
/* Device.transmit(opcode[, params]): send a raw CEC command to this device.
 *
 * opcode  one unsigned byte ("b" format unit), used as the CEC opcode.
 * params  optional string/buffer ("s#"); each byte is appended to the
 *         command as one parameter.
 *
 * Returns True or False according to the result of Transmit(). Returns NULL
 * with an exception set when the arguments cannot be parsed, or with
 * ValueError when more than CEC_MAX_DATA_PACKET_SIZE parameter bytes are
 * given.
 *
 * NOTE(review): "s#" only stores a Py_ssize_t length when PY_SSIZE_T_CLEAN is
 * defined before Python.h is included -- confirm device.h does so.
 */
static PyObject * Device_transmit(Device * self, PyObject * args) {
  unsigned char opcode;
  const char * params = NULL;
  Py_ssize_t param_count = 0;

  if( !PyArg_ParseTuple(args, "b|s#:transmit", &opcode,
        &params, &param_count) ) {
    return NULL;
  }

  if( param_count > CEC_MAX_DATA_PACKET_SIZE ) {
    char errstr[1024];
    snprintf(errstr, sizeof(errstr), "Too many parameters, maximum is %d",
        CEC_MAX_DATA_PACKET_SIZE);
    PyErr_SetString(PyExc_ValueError, errstr);
    return NULL;
  }

  cec_command data;
  bool success;
  Py_BEGIN_ALLOW_THREADS
  data.initiator = adapter->GetLogicalAddresses().primary;
  data.destination = self->addr;
  data.opcode = (cec_opcode)opcode;
  data.opcode_set = 1;
  // params stays NULL when the optional argument was omitted
  if( params ) {
    for( Py_ssize_t i=0; i<param_count; i++ ) {
      data.PushBack((uint8_t)params[i]);
    }
  }
  success = adapter->Transmit(data);
  Py_END_ALLOW_THREADS

  if( success ) {
    Py_RETURN_TRUE;
  } else {
    Py_RETURN_FALSE;
  }
}
/* Query the vendor ID of `addr` and format it as upper-case hex, zero-padded
 * to six digits (longer values are truncated to the first six characters).
 * Returns a new reference, or NULL with an exception set. */
static PyObject * Device_queryVendor(cec_logical_address addr) {
  uint64_t vendor;
  Py_BEGIN_ALLOW_THREADS
  vendor = adapter->GetDeviceVendorId(addr);
  Py_END_ALLOW_THREADS

  char vendor_str[7];
  snprintf(vendor_str, sizeof(vendor_str), "%06" PRIX64, vendor);
  return Py_BuildValue("s", vendor_str);
}

/* Query the 16-bit physical address of `addr` and format its four nibbles,
 * most significant first, as "a.b.c.d" in lower-case hex.
 * Returns a new reference, or NULL with an exception set. */
static PyObject * Device_queryPhysicalAddress(cec_logical_address addr) {
  uint16_t physical;
  Py_BEGIN_ALLOW_THREADS
  physical = adapter->GetDevicePhysicalAddress(addr);
  Py_END_ALLOW_THREADS

  // four single hex digits + three dots + NUL == 8 bytes
  char strAddr[8];
  snprintf(strAddr, sizeof(strAddr), "%x.%x.%x.%x",
      (unsigned)((physical >> 12) & 0xF),
      (unsigned)((physical >> 8) & 0xF),
      (unsigned)((physical >> 4) & 0xF),
      (unsigned)(physical & 0xF));
  return Py_BuildValue("s", strAddr);
}

/* Query the CEC version of `addr` and map it to "1.2", "1.2a", "1.3",
 * "1.3a", "1.4" or "Unknown" (for every other value).
 * Returns a new reference, or NULL with an exception set. */
static PyObject * Device_queryCecVersion(cec_logical_address addr) {
  cec_version ver;
  Py_BEGIN_ALLOW_THREADS
  ver = adapter->GetDeviceCecVersion(addr);
  Py_END_ALLOW_THREADS

  const char * ver_str;
  switch(ver) {
    case CEC_VERSION_1_2:
      ver_str = "1.2";
      break;
    case CEC_VERSION_1_2A:
      ver_str = "1.2a";
      break;
    case CEC_VERSION_1_3:
      ver_str = "1.3";
      break;
    case CEC_VERSION_1_3A:
      ver_str = "1.3a";
      break;
    case CEC_VERSION_1_4:
      ver_str = "1.4";
      break;
    case CEC_VERSION_UNKNOWN:
    default:
      ver_str = "Unknown";
      break;
  }
  return Py_BuildValue("s", ver_str);
}

/* Query the OSD name of `addr`.
 * Returns a new reference, or NULL with an exception set. */
static PyObject * Device_queryOsdName(cec_logical_address addr) {
#if CEC_LIB_VERSION_MAJOR >= 4
  std::string name;
  Py_BEGIN_ALLOW_THREADS
  name = adapter->GetDeviceOSDName(addr);
  Py_END_ALLOW_THREADS
  return Py_BuildValue("s#", name.c_str(), name.length());
#else
  cec_osd_name name;
  Py_BEGIN_ALLOW_THREADS
  name = adapter->GetDeviceOSDName(addr);
  Py_END_ALLOW_THREADS
  return Py_BuildValue("s", name.name);
#endif
}

/* Query the menu language of `addr`.
 * Returns a new reference, or NULL with an exception set. */
static PyObject * Device_queryLanguage(cec_logical_address addr) {
#if CEC_LIB_VERSION_MAJOR >= 4
  std::string lang;
  Py_BEGIN_ALLOW_THREADS
  lang = adapter->GetDeviceMenuLanguage(addr);
  Py_END_ALLOW_THREADS
  return Py_BuildValue("s#", lang.c_str(), lang.length());
#else
  cec_menu_language lang;
  // The result of the call below is not checked; start from an empty string
  // so an unfilled struct is never read as uninitialised memory.
  lang.language[0] = '\0';
  Py_BEGIN_ALLOW_THREADS
  adapter->GetDeviceMenuLanguage(addr, &lang);
  Py_END_ALLOW_THREADS
  return Py_BuildValue("s", lang.language);
#endif
}

/* tp_new for cec.Device: Device(address).
 *
 * Parses one logical address (0..15), queries the adapter once for the
 * vendor ID, physical address, CEC version, OSD name and menu language, and
 * stores the results on the new object as Python strings.
 *
 * Returns the new object, or NULL with an exception set:
 *   - whatever PyArg_ParseTuple raises for a bad argument,
 *   - ValueError when the address is greater than 15,
 *   - whatever Py_BuildValue / tp_alloc raise on failure.
 *
 * All attribute objects are built before the Device is allocated, so a
 * failure never leaves a partially initialised Device behind. kwds is
 * ignored.
 */
static PyObject * Device_new(PyTypeObject * type, PyObject * args,
    PyObject * kwds) {
  (void)kwds;
  unsigned char addr;

  if( !PyArg_ParseTuple(args, "b:Device new", &addr) ) {
    return NULL;
  }
  // addr is unsigned, so only the upper bound needs checking
  if( addr > 15 ) {
    PyErr_SetString(PyExc_ValueError, "Logical address should be < 16");
    return NULL;
  }
  const cec_logical_address logical = (cec_logical_address)addr;

  PyObject * vendorId = NULL;
  PyObject * physicalAddress = NULL;
  PyObject * cecVersion = NULL;
  PyObject * osdName = NULL;
  PyObject * lang = NULL;
  Device * self = NULL;

  // && short-circuits, so the first failure stops further adapter queries
  if( (vendorId = Device_queryVendor(logical)) &&
      (physicalAddress = Device_queryPhysicalAddress(logical)) &&
      (cecVersion = Device_queryCecVersion(logical)) &&
      (osdName = Device_queryOsdName(logical)) &&
      (lang = Device_queryLanguage(logical)) ) {
    self = (Device*)type->tp_alloc(type, 0);
  }

  if( self == NULL ) {
    // an exception is already set; drop whatever was built so far
    Py_XDECREF(vendorId);
    Py_XDECREF(physicalAddress);
    Py_XDECREF(cecVersion);
    Py_XDECREF(osdName);
    Py_XDECREF(lang);
    return NULL;
  }

  // the object takes ownership of the five references
  self->addr = logical;
  self->vendorId = vendorId;
  self->physicalAddress = physicalAddress;
  self->cecVersion = cecVersion;
  self->osdName = osdName;
  self->lang = lang;
  return (PyObject *)self;
}
/* tp_dealloc for cec.Device: release the cached attribute objects, then free
 * the object itself through its type's tp_free.
 *
 * Py_XDECREF is used because a member may still be NULL if the object was
 * only partially initialised (for example when building one of the
 * attribute strings failed in Device_new).
 */
static void Device_dealloc(Device * self) {
  Py_XDECREF(self->vendorId);
  Py_XDECREF(self->physicalAddress);
  Py_XDECREF(self->cecVersion);
  Py_XDECREF(self->osdName);
  Py_XDECREF(self->lang);
  Py_TYPE(self)->tp_free((PyObject*)self);
}
/* tp_str for cec.Device: returns "CEC Device <logical address>" as a new
 * Python string. The text is formatted into a 16-byte buffer. */
static PyObject * Device_str(Device * self) {
  char text[16];
  snprintf(text, sizeof(text), "CEC Device %d", self->addr);
  return Py_BuildValue("s", text);
}
/* tp_repr for cec.Device: returns "Device(<logical address>)" as a new
 * Python string. The text is formatted into a 16-byte buffer. */
static PyObject * Device_repr(Device * self) {
  char text[16];
  snprintf(text, sizeof(text), "Device(%d)", self->addr);
  return Py_BuildValue("s", text);
}
/* Methods exposed on cec.Device.
 *
 * Functions that take only `self` are registered METH_NOARGS; functions that
 * parse an argument tuple are registered METH_VARARGS. is_active takes no
 * arguments, so it is METH_NOARGS like is_on, power_on and standby.
 * The table ends with a NULL sentinel entry.
 */
static PyMethodDef Device_methods[] = {
  {"is_on", (PyCFunction)Device_is_on, METH_NOARGS,
    "Get device power status"},
  {"power_on", (PyCFunction)Device_power_on, METH_NOARGS,
    "Power on this device"},
  {"standby", (PyCFunction)Device_standby, METH_NOARGS,
    "Put this device into standby"},
  {"is_active", (PyCFunction)Device_is_active, METH_NOARGS,
    "Check if this device is the active source on the bus"},
  {"set_av_input", (PyCFunction)Device_av_input, METH_VARARGS,
    "Select AV Input"},
  {"set_audio_input", (PyCFunction)Device_audio_input, METH_VARARGS,
    "Select Audio Input"},
  {"transmit", (PyCFunction)Device_transmit, METH_VARARGS,
    "Transmit a raw CEC command to this device"},
  {NULL}
};
/* Type object for cec.Device.
 *
 * Only the leading slots up to tp_doc are set here, positionally; the
 * remaining slots are zero-initialised. tp_new, tp_methods and tp_getset are
 * filled in at run time by DeviceTypeInit().
 */
static PyTypeObject DeviceType = {
PyVarObject_HEAD_INIT(NULL, 0)
"cec.Device", /*tp_name*/
sizeof(Device), /*tp_basicsize*/
0, /*tp_itemsize*/
(destructor)Device_dealloc, /*tp_dealloc*/
0, /*tp_print*/
0, /*tp_getattr*/
0, /*tp_setattr*/
0, /*tp_compare (this slot has a different name in Python 3)*/
(reprfunc)Device_repr, /*tp_repr*/
0, /*tp_as_number*/
0, /*tp_as_sequence*/
0, /*tp_as_mapping*/
0, /*tp_hash */
0, /*tp_call*/
(reprfunc)Device_str, /*tp_str*/
0, /*tp_getattro*/
0, /*tp_setattro*/
0, /*tp_as_buffer*/
Py_TPFLAGS_DEFAULT, /*tp_flags*/
"CEC Device objects", /* tp_doc */
};
/* Finish setting up the cec.Device type and remember the adapter to use.
 *
 * Stores `a` in the file-level adapter pointer, installs the constructor,
 * method table and attribute table on DeviceType, and returns a pointer to
 * it. PyType_Ready() is not called here; presumably the caller does that.
 */
PyTypeObject * DeviceTypeInit(ICECAdapter * a) {
  PyTypeObject * const type = &DeviceType;

  type->tp_getset = Device_getset;
  type->tp_methods = Device_methods;
  type->tp_new = Device_new;

  adapter = a;
  return type;
}