import numpy as np
import pandas as pd
import torch
import torchvision.datasets as dsets
import torchvision.transforms as transforms
from matplotlib import pyplot as plt
from skorch import NeuralNet
# Aliased so it does not rebind the torch ``LogisticRegression`` class below.
from sklearn.linear_model import LogisticRegression as SklearnLogisticRegression
from sklearn.model_selection import train_test_split
from spacecutter.callbacks import AscensionCallback
from spacecutter.losses import CumulativeLinkLoss
from spacecutter.models import OrdinalLogisticModel
from torch import nn, optim
# No longer used in this file; kept so the module still exposes the name.
from torch.autograd import Variable  # noqa: F401


class LogisticRegression(nn.Module):
    """Single linear layer used as a multinomial logistic-regression model.

    The layer returns raw scores; no softmax is applied here because the
    training code pairs it with ``nn.CrossEntropyLoss``.
    """

    def __init__(self, input_dim, output_dim):
        """Create the ``input_dim -> output_dim`` linear layer."""
        super().__init__()
        self.linear = nn.Linear(input_dim, output_dim)

    def forward(self, x):
        """Return the linear layer's output for ``x``."""
        return self.linear(x)


# PyTorch logistic regression on MNIST data
def regression_on_mnist():
    """Train ``LogisticRegression`` on MNIST and print test accuracy.

    Reads MNIST from ``./torch_test/data`` (the training set is opened with
    ``download=False``, so the files must already be there), trains with SGD,
    and prints loss and test-set accuracy every 500 optimisation steps.
    """
    batch_size = 100
    n_iters = 5000
    input_dim = 784  # 28 * 28 pixels per flattened image
    output_dim = 10
    lr_rate = 0.001

    train_dataset = dsets.MNIST(
        root='./torch_test/data',
        train=True,
        transform=transforms.ToTensor(),
        download=False,
    )
    test_dataset = dsets.MNIST(
        root='./torch_test/data',
        train=False,
        transform=transforms.ToTensor(),
    )
    train_loader = torch.utils.data.DataLoader(
        dataset=train_dataset, batch_size=batch_size, shuffle=True
    )
    test_loader = torch.utils.data.DataLoader(
        dataset=test_dataset, batch_size=batch_size, shuffle=False
    )

    model = LogisticRegression(input_dim, output_dim)
    # Cross-entropy loss computed over the softmax distribution.
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr=lr_rate)

    # Whole epochs needed to reach roughly ``n_iters`` optimisation steps.
    epochs = int(n_iters / (len(train_dataset) / batch_size))
    iteration = 0
    for _ in range(epochs):
        for images, labels in train_loader:
            optimizer.zero_grad()
            outputs = model(images.view(-1, input_dim))
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            iteration += 1
            if iteration % 500 != 0:
                continue

            # Compute accuracy on the test set. Gradients are not needed
            # here, and separate variable names keep the training batch
            # untouched.
            correct = 0
            total = 0
            with torch.no_grad():
                for test_images, test_labels in test_loader:
                    test_outputs = model(test_images.view(-1, input_dim))
                    _, predicted = torch.max(test_outputs, 1)
                    total += test_labels.size(0)
                    # When running on a GPU, predictions and labels must be
                    # brought back to the CPU before counting in Python;
                    # ``.item()`` yields a plain Python int either way.
                    correct += (predicted == test_labels).sum().item()
            accuracy = 100 * correct / total
            print("Iteration: {}. Loss: {}. Accuracy: {}.".format(iteration, loss.item(), accuracy))


# PyTorch + skorch ordinal logistic regression
def ordinal_regression():
    """Fit an ordinal logistic model on a three-sample toy set and plot the loss."""
    # 0. Data preparation
    X = np.array([
        [0.5, 0.1, -0.1],
        [1.0, 0.2, 0.6],
        [-2.0, 0.4, 0.8]
    ], dtype=np.float32)
    # Explicit int64 so the label dtype does not depend on the platform's
    # default integer size (the loss presumably indexes with these labels --
    # TODO confirm against spacecutter).
    y = np.array([0, 1, 2], dtype=np.int64).reshape(-1, 1)

    num_features = X.shape[1]
    num_classes = len(np.unique(y))

    # 1. Ordinal logistic regression
    predictor = nn.Sequential(  # predictor network producing one score per sample
        nn.Linear(num_features, num_features),
        nn.ReLU(),
        nn.Linear(num_features, 1)
    )
    # Sanity check of the untrained model:
    #   model = OrdinalLogisticModel(predictor, num_classes)
    #   y_pred = model(torch.as_tensor(X))
    #   print(y_pred)
    #   # sample rows seen: [0.2324, 0.2191, 0.5485], [0.2607, 0.2287, 0.5106]

    # 2. Train the model with skorch
    skorch_model = NeuralNet(
        module=OrdinalLogisticModel,
        module__predictor=predictor,
        module__num_classes=num_classes,
        # Adam converges quickly; this is not the SGD optimiser.
        optimizer=torch.optim.Adam,
        # Cumulative link loss matches OrdinalLogisticModel; the usual
        # torch.nn.CrossEntropyLoss is not applicable here.
        criterion=CumulativeLinkLoss,
        train_split=None,
        # Number of epochs (epoch = one pass over all samples; iteration =
        # one step on a batch-size subset of the samples).
        max_epochs=5000,
        callbacks=[
            ('ascension', AscensionCallback())
        ],
    )
    skorch_model.fit(X, y)  # train
    # y_proba = skorch_model.predict_proba(X)  # predict

    # 3. Matplotlib visualisation
    train_loss = skorch_model.history[:, 'train_loss']
    plt.plot(train_loss, label='Train Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.show()


def test():
    """Fit scikit-learn logistic regression on CGSS 2021 data and print predictions.

    Reads the Stata file, writes a CSV copy next to it, then trains on a
    75/25 split of the selected feature columns against the ``label`` column.
    """
    data = pd.read_stata('./torch_test/data/CGSS/CGSS2021_20240607.dta', convert_categoricals=False)
    data.to_csv('./torch_test/data/CGSS/CGSS2021_20240607.csv', encoding='UTF-8')

    feature_cols = ['id', 'provinces', 'type', 'A00', 'A1', 'A12_2', 'A12a']
    X = data[feature_cols]  # features
    y = data['label']  # target variable

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.25, random_state=16)

    logreg = SklearnLogisticRegression(random_state=16)
    # Fit the model with the training data.
    logreg.fit(X_train, y_train)
    y_pred = logreg.predict(X_test)
    print(y_pred)


# Ordinal logistic regression based on sklearn