// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. #include "arrow/python/numpy_interop.h" #include "arrow/python/numpy_convert.h" #include #include #include #include #include "arrow/buffer.h" #include "arrow/sparse_tensor.h" #include "arrow/tensor.h" #include "arrow/type.h" #include "arrow/util/logging.h" #include "arrow/python/common.h" #include "arrow/python/pyarrow.h" #include "arrow/python/type_traits.h" namespace arrow { namespace py { NumPyBuffer::NumPyBuffer(PyObject* ao) : Buffer(nullptr, 0) { PyAcquireGIL lock; arr_ = ao; Py_INCREF(ao); if (PyArray_Check(ao)) { PyArrayObject* ndarray = reinterpret_cast(ao); auto ptr = reinterpret_cast(PyArray_DATA(ndarray)); data_ = const_cast(ptr); size_ = PyArray_NBYTES(ndarray); capacity_ = size_; is_mutable_ = !!(PyArray_FLAGS(ndarray) & NPY_ARRAY_WRITEABLE); } } NumPyBuffer::~NumPyBuffer() { PyAcquireGIL lock; Py_XDECREF(arr_); } #define TO_ARROW_TYPE_CASE(NPY_NAME, FACTORY) \ case NPY_##NPY_NAME: \ return FACTORY(); namespace { Result> GetTensorType(PyObject* dtype) { if (!PyObject_TypeCheck(dtype, &PyArrayDescr_Type)) { return Status::TypeError("Did not pass numpy.dtype object"); } PyArray_Descr* descr = reinterpret_cast(dtype); int type_num = fix_numpy_type_num(descr->type_num); switch (type_num) { TO_ARROW_TYPE_CASE(BOOL, uint8); TO_ARROW_TYPE_CASE(INT8, int8); TO_ARROW_TYPE_CASE(INT16, int16); TO_ARROW_TYPE_CASE(INT32, int32); TO_ARROW_TYPE_CASE(INT64, int64); TO_ARROW_TYPE_CASE(UINT8, uint8); TO_ARROW_TYPE_CASE(UINT16, uint16); TO_ARROW_TYPE_CASE(UINT32, uint32); TO_ARROW_TYPE_CASE(UINT64, uint64); TO_ARROW_TYPE_CASE(FLOAT16, float16); TO_ARROW_TYPE_CASE(FLOAT32, float32); TO_ARROW_TYPE_CASE(FLOAT64, float64); } return Status::NotImplemented("Unsupported numpy type ", descr->type_num); } Status GetNumPyType(const DataType& type, int* type_num) { #define NUMPY_TYPE_CASE(ARROW_NAME, NPY_NAME) \ case Type::ARROW_NAME: \ *type_num = NPY_##NPY_NAME; \ break; switch (type.id()) { NUMPY_TYPE_CASE(UINT8, UINT8); NUMPY_TYPE_CASE(INT8, INT8); NUMPY_TYPE_CASE(UINT16, UINT16); NUMPY_TYPE_CASE(INT16, INT16); NUMPY_TYPE_CASE(UINT32, UINT32); NUMPY_TYPE_CASE(INT32, INT32); NUMPY_TYPE_CASE(UINT64, UINT64); NUMPY_TYPE_CASE(INT64, INT64); NUMPY_TYPE_CASE(HALF_FLOAT, FLOAT16); NUMPY_TYPE_CASE(FLOAT, FLOAT32); NUMPY_TYPE_CASE(DOUBLE, FLOAT64); default: { return Status::NotImplemented("Unsupported tensor type: ", type.ToString()); } } #undef NUMPY_TYPE_CASE return Status::OK(); } } // namespace Result> NumPyScalarToArrowDataType(PyObject* scalar) { PyArray_Descr* descr = PyArray_DescrFromScalar(scalar); OwnedRef descr_ref(reinterpret_cast(descr)); return NumPyDtypeToArrow(descr); } Result> NumPyDtypeToArrow(PyObject* dtype) { if (!PyObject_TypeCheck(dtype, &PyArrayDescr_Type)) { return Status::TypeError("Did not pass numpy.dtype object"); } PyArray_Descr* descr = reinterpret_cast(dtype); return NumPyDtypeToArrow(descr); } Result> NumPyDtypeToArrow(PyArray_Descr* descr) { int type_num = fix_numpy_type_num(descr->type_num); switch (type_num) { TO_ARROW_TYPE_CASE(BOOL, boolean); TO_ARROW_TYPE_CASE(INT8, int8); TO_ARROW_TYPE_CASE(INT16, int16); TO_ARROW_TYPE_CASE(INT32, int32); TO_ARROW_TYPE_CASE(INT64, int64); TO_ARROW_TYPE_CASE(UINT8, uint8); TO_ARROW_TYPE_CASE(UINT16, uint16); TO_ARROW_TYPE_CASE(UINT32, uint32); TO_ARROW_TYPE_CASE(UINT64, uint64); TO_ARROW_TYPE_CASE(FLOAT16, float16); TO_ARROW_TYPE_CASE(FLOAT32, float32); TO_ARROW_TYPE_CASE(FLOAT64, float64); TO_ARROW_TYPE_CASE(STRING, binary); TO_ARROW_TYPE_CASE(UNICODE, utf8); case NPY_DATETIME: { auto date_dtype = reinterpret_cast(PyDataType_C_METADATA(descr)); switch (date_dtype->meta.base) { case NPY_FR_s: return timestamp(TimeUnit::SECOND); case NPY_FR_ms: return timestamp(TimeUnit::MILLI); case NPY_FR_us: return timestamp(TimeUnit::MICRO); case NPY_FR_ns: return timestamp(TimeUnit::NANO); case NPY_FR_D: return date32(); case NPY_FR_GENERIC: return Status::NotImplemented("Unbound or generic datetime64 time unit"); default: return Status::NotImplemented("Unsupported datetime64 time unit"); } } break; case NPY_TIMEDELTA: { auto timedelta_dtype = reinterpret_cast(PyDataType_C_METADATA(descr)); switch (timedelta_dtype->meta.base) { case NPY_FR_s: return duration(TimeUnit::SECOND); case NPY_FR_ms: return duration(TimeUnit::MILLI); case NPY_FR_us: return duration(TimeUnit::MICRO); case NPY_FR_ns: return duration(TimeUnit::NANO); case NPY_FR_GENERIC: return Status::NotImplemented("Unbound or generic timedelta64 time unit"); default: return Status::NotImplemented("Unsupported timedelta64 time unit"); } } break; } return Status::NotImplemented("Unsupported numpy type ", descr->type_num); } #undef TO_ARROW_TYPE_CASE Status NdarrayToTensor(MemoryPool* pool, PyObject* ao, const std::vector& dim_names, std::shared_ptr* out) { if (!PyArray_Check(ao)) { return Status::TypeError("Did not pass ndarray object"); } PyArrayObject* ndarray = reinterpret_cast(ao); // TODO(wesm): What do we want to do with non-contiguous memory and negative strides? int ndim = PyArray_NDIM(ndarray); std::shared_ptr data = std::make_shared(ao); std::vector shape(ndim); std::vector strides(ndim); npy_intp* array_strides = PyArray_STRIDES(ndarray); npy_intp* array_shape = PyArray_SHAPE(ndarray); for (int i = 0; i < ndim; ++i) { if (array_strides[i] < 0) { return Status::Invalid("Negative ndarray strides not supported"); } shape[i] = array_shape[i]; strides[i] = array_strides[i]; } ARROW_ASSIGN_OR_RAISE( auto type, GetTensorType(reinterpret_cast(PyArray_DESCR(ndarray)))); *out = std::make_shared(type, data, shape, strides, dim_names); return Status::OK(); } Status TensorToNdarray(const std::shared_ptr& tensor, PyObject* base, PyObject** out) { int type_num = 0; RETURN_NOT_OK(GetNumPyType(*tensor->type(), &type_num)); PyArray_Descr* dtype = PyArray_DescrNewFromType(type_num); RETURN_IF_PYERROR(); const int ndim = tensor->ndim(); std::vector npy_shape(ndim); std::vector npy_strides(ndim); for (int i = 0; i < ndim; ++i) { npy_shape[i] = tensor->shape()[i]; npy_strides[i] = tensor->strides()[i]; } const void* immutable_data = nullptr; if (tensor->data()) { immutable_data = tensor->data()->data(); } // Remove const =( void* mutable_data = const_cast(immutable_data); int array_flags = 0; if (tensor->is_row_major()) { array_flags |= NPY_ARRAY_C_CONTIGUOUS; } if (tensor->is_column_major()) { array_flags |= NPY_ARRAY_F_CONTIGUOUS; } if (tensor->is_mutable()) { array_flags |= NPY_ARRAY_WRITEABLE; } PyObject* result = PyArray_NewFromDescr(&PyArray_Type, dtype, ndim, npy_shape.data(), npy_strides.data(), mutable_data, array_flags, nullptr); RETURN_IF_PYERROR(); if (base == Py_None || base == nullptr) { base = py::wrap_tensor(tensor); } else { Py_XINCREF(base); } PyArray_SetBaseObject(reinterpret_cast(result), base); *out = result; return Status::OK(); } // Wrap the dense data of a sparse tensor in a ndarray static Status SparseTensorDataToNdarray(const SparseTensor& sparse_tensor, std::vector data_shape, PyObject* base, PyObject** out_data) { int type_num_data = 0; RETURN_NOT_OK(GetNumPyType(*sparse_tensor.type(), &type_num_data)); PyArray_Descr* dtype_data = PyArray_DescrNewFromType(type_num_data); RETURN_IF_PYERROR(); const void* immutable_data = sparse_tensor.data()->data(); // Remove const =( void* mutable_data = const_cast(immutable_data); int array_flags = NPY_ARRAY_C_CONTIGUOUS | NPY_ARRAY_F_CONTIGUOUS; if (sparse_tensor.is_mutable()) { array_flags |= NPY_ARRAY_WRITEABLE; } *out_data = PyArray_NewFromDescr(&PyArray_Type, dtype_data, static_cast(data_shape.size()), data_shape.data(), nullptr, mutable_data, array_flags, nullptr); RETURN_IF_PYERROR(); Py_XINCREF(base); PyArray_SetBaseObject(reinterpret_cast(*out_data), base); return Status::OK(); } Status SparseCOOTensorToNdarray(const std::shared_ptr& sparse_tensor, PyObject* base, PyObject** out_data, PyObject** out_coords) { const auto& sparse_index = arrow::internal::checked_cast( *sparse_tensor->sparse_index()); // Wrap tensor data OwnedRef result_data; RETURN_NOT_OK(SparseTensorDataToNdarray( *sparse_tensor, {static_cast(sparse_tensor->non_zero_length()), 1}, base, result_data.ref())); // Wrap indices PyObject* result_coords; RETURN_NOT_OK(TensorToNdarray(sparse_index.indices(), base, &result_coords)); *out_data = result_data.detach(); *out_coords = result_coords; return Status::OK(); } Status SparseCSXMatrixToNdarray(const std::shared_ptr& sparse_tensor, PyObject* base, PyObject** out_data, PyObject** out_indptr, PyObject** out_indices) { // Wrap indices OwnedRef result_indptr; OwnedRef result_indices; switch (sparse_tensor->format_id()) { case SparseTensorFormat::CSR: { const auto& sparse_index = arrow::internal::checked_cast( *sparse_tensor->sparse_index()); RETURN_NOT_OK(TensorToNdarray(sparse_index.indptr(), base, result_indptr.ref())); RETURN_NOT_OK(TensorToNdarray(sparse_index.indices(), base, result_indices.ref())); break; } case SparseTensorFormat::CSC: { const auto& sparse_index = arrow::internal::checked_cast( *sparse_tensor->sparse_index()); RETURN_NOT_OK(TensorToNdarray(sparse_index.indptr(), base, result_indptr.ref())); RETURN_NOT_OK(TensorToNdarray(sparse_index.indices(), base, result_indices.ref())); break; } default: return Status::NotImplemented("Invalid SparseTensor type."); } // Wrap tensor data OwnedRef result_data; RETURN_NOT_OK(SparseTensorDataToNdarray( *sparse_tensor, {static_cast(sparse_tensor->non_zero_length()), 1}, base, result_data.ref())); *out_data = result_data.detach(); *out_indptr = result_indptr.detach(); *out_indices = result_indices.detach(); return Status::OK(); } Status SparseCSRMatrixToNdarray(const std::shared_ptr& sparse_tensor, PyObject* base, PyObject** out_data, PyObject** out_indptr, PyObject** out_indices) { return SparseCSXMatrixToNdarray(sparse_tensor, base, out_data, out_indptr, out_indices); } Status SparseCSCMatrixToNdarray(const std::shared_ptr& sparse_tensor, PyObject* base, PyObject** out_data, PyObject** out_indptr, PyObject** out_indices) { return SparseCSXMatrixToNdarray(sparse_tensor, base, out_data, out_indptr, out_indices); } Status SparseCSFTensorToNdarray(const std::shared_ptr& sparse_tensor, PyObject* base, PyObject** out_data, PyObject** out_indptr, PyObject** out_indices) { const auto& sparse_index = arrow::internal::checked_cast( *sparse_tensor->sparse_index()); // Wrap tensor data OwnedRef result_data; RETURN_NOT_OK(SparseTensorDataToNdarray( *sparse_tensor, {static_cast(sparse_tensor->non_zero_length()), 1}, base, result_data.ref())); // Wrap indices int ndim = static_cast(sparse_index.indices().size()); OwnedRef indptr(PyList_New(ndim - 1)); OwnedRef indices(PyList_New(ndim)); RETURN_IF_PYERROR(); for (int i = 0; i < ndim - 1; ++i) { PyObject* item; RETURN_NOT_OK(TensorToNdarray(sparse_index.indptr()[i], base, &item)); if (PyList_SetItem(indptr.obj(), i, item) < 0) { Py_XDECREF(item); RETURN_IF_PYERROR(); } } for (int i = 0; i < ndim; ++i) { PyObject* item; RETURN_NOT_OK(TensorToNdarray(sparse_index.indices()[i], base, &item)); if (PyList_SetItem(indices.obj(), i, item) < 0) { Py_XDECREF(item); RETURN_IF_PYERROR(); } } *out_indptr = indptr.detach(); *out_indices = indices.detach(); *out_data = result_data.detach(); return Status::OK(); } Status NdarraysToSparseCOOTensor(MemoryPool* pool, PyObject* data_ao, PyObject* coords_ao, const std::vector& shape, const std::vector& dim_names, std::shared_ptr* out) { if (!PyArray_Check(data_ao) || !PyArray_Check(coords_ao)) { return Status::TypeError("Did not pass ndarray object"); } PyArrayObject* ndarray_data = reinterpret_cast(data_ao); std::shared_ptr data = std::make_shared(data_ao); ARROW_ASSIGN_OR_RAISE( auto type_data, GetTensorType(reinterpret_cast(PyArray_DESCR(ndarray_data)))); std::shared_ptr coords; RETURN_NOT_OK(NdarrayToTensor(pool, coords_ao, {}, &coords)); ARROW_CHECK_EQ(coords->type_id(), Type::INT64); // Should be ensured by caller ARROW_ASSIGN_OR_RAISE(std::shared_ptr sparse_index, SparseCOOIndex::Make(coords)); *out = std::make_shared>(sparse_index, type_data, data, shape, dim_names); return Status::OK(); } template Status NdarraysToSparseCSXMatrix(MemoryPool* pool, PyObject* data_ao, PyObject* indptr_ao, PyObject* indices_ao, const std::vector& shape, const std::vector& dim_names, std::shared_ptr>* out) { if (!PyArray_Check(data_ao) || !PyArray_Check(indptr_ao) || !PyArray_Check(indices_ao)) { return Status::TypeError("Did not pass ndarray object"); } PyArrayObject* ndarray_data = reinterpret_cast(data_ao); std::shared_ptr data = std::make_shared(data_ao); ARROW_ASSIGN_OR_RAISE( auto type_data, GetTensorType(reinterpret_cast(PyArray_DESCR(ndarray_data)))); std::shared_ptr indptr, indices; RETURN_NOT_OK(NdarrayToTensor(pool, indptr_ao, {}, &indptr)); RETURN_NOT_OK(NdarrayToTensor(pool, indices_ao, {}, &indices)); ARROW_CHECK_EQ(indptr->type_id(), Type::INT64); // Should be ensured by caller ARROW_CHECK_EQ(indices->type_id(), Type::INT64); // Should be ensured by caller auto sparse_index = std::make_shared( std::static_pointer_cast>(indptr), std::static_pointer_cast>(indices)); *out = std::make_shared>(sparse_index, type_data, data, shape, dim_names); return Status::OK(); } Status NdarraysToSparseCSFTensor(MemoryPool* pool, PyObject* data_ao, PyObject* indptr_ao, PyObject* indices_ao, const std::vector& shape, const std::vector& axis_order, const std::vector& dim_names, std::shared_ptr* out) { if (!PyArray_Check(data_ao)) { return Status::TypeError("Did not pass ndarray object for data"); } const int ndim = static_cast(shape.size()); PyArrayObject* ndarray_data = reinterpret_cast(data_ao); std::shared_ptr data = std::make_shared(data_ao); ARROW_ASSIGN_OR_RAISE( auto type_data, GetTensorType(reinterpret_cast(PyArray_DESCR(ndarray_data)))); std::vector> indptr(ndim - 1); std::vector> indices(ndim); for (int i = 0; i < ndim - 1; ++i) { #ifdef Py_GIL_DISABLED PyObject* item = PySequence_ITEM(indptr_ao, i); RETURN_IF_PYERROR(); OwnedRef item_ref(item); #else PyObject* item = PySequence_Fast_GET_ITEM(indptr_ao, i); #endif if (!PyArray_Check(item)) { return Status::TypeError("Did not pass ndarray object for indptr"); } RETURN_NOT_OK(NdarrayToTensor(pool, item, {}, &indptr[i])); ARROW_CHECK_EQ(indptr[i]->type_id(), Type::INT64); // Should be ensured by caller } for (int i = 0; i < ndim; ++i) { #ifdef Py_GIL_DISABLED PyObject* item = PySequence_ITEM(indices_ao, i); RETURN_IF_PYERROR(); OwnedRef item_ref(item); #else PyObject* item = PySequence_Fast_GET_ITEM(indices_ao, i); #endif if (!PyArray_Check(item)) { return Status::TypeError("Did not pass ndarray object for indices"); } RETURN_NOT_OK(NdarrayToTensor(pool, item, {}, &indices[i])); ARROW_CHECK_EQ(indices[i]->type_id(), Type::INT64); // Should be ensured by caller } auto sparse_index = std::make_shared(indptr, indices, axis_order); *out = std::make_shared>(sparse_index, type_data, data, shape, dim_names); return Status::OK(); } Status NdarraysToSparseCSRMatrix(MemoryPool* pool, PyObject* data_ao, PyObject* indptr_ao, PyObject* indices_ao, const std::vector& shape, const std::vector& dim_names, std::shared_ptr* out) { return NdarraysToSparseCSXMatrix(pool, data_ao, indptr_ao, indices_ao, shape, dim_names, out); } Status NdarraysToSparseCSCMatrix(MemoryPool* pool, PyObject* data_ao, PyObject* indptr_ao, PyObject* indices_ao, const std::vector& shape, const std::vector& dim_names, std::shared_ptr* out) { return NdarraysToSparseCSXMatrix(pool, data_ao, indptr_ao, indices_ao, shape, dim_names, out); } Status TensorToSparseCOOTensor(const std::shared_ptr& tensor, std::shared_ptr* out) { return SparseCOOTensor::Make(*tensor).Value(out); } Status TensorToSparseCSRMatrix(const std::shared_ptr& tensor, std::shared_ptr* out) { return SparseCSRMatrix::Make(*tensor).Value(out); } Status TensorToSparseCSCMatrix(const std::shared_ptr& tensor, std::shared_ptr* out) { return SparseCSCMatrix::Make(*tensor).Value(out); } Status TensorToSparseCSFTensor(const std::shared_ptr& tensor, std::shared_ptr* out) { return SparseCSFTensor::Make(*tensor).Value(out); } } // namespace py } // namespace arrow