123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143 |
- import torch
- from torch.autograd import Variable
- from torch import nn, optim
- import torchvision.transforms as transforms
- import torchvision.datasets as dsets
- import numpy as np
- from skorch import NeuralNet
- from matplotlib import pyplot as plt
- from spacecutter.callbacks import AscensionCallback
- from spacecutter.losses import CumulativeLinkLoss
- from spacecutter.models import OrdinalLogisticModel
class LogisticRegression(nn.Module):
    """Logistic-regression model expressed as a single linear layer.

    ``forward`` returns the raw output of the linear layer; no softmax or
    sigmoid is applied inside the module.

    Args:
        input_dim: Size of each input sample (``in_features`` of the layer).
        output_dim: Size of each output sample (``out_features`` of the layer).
    """

    def __init__(self, input_dim: int, output_dim: int) -> None:
        # Zero-argument super() does not look the class up through its global
        # name, so construction keeps working even if that name is rebound
        # later (this file subsequently imports sklearn's LogisticRegression
        # under the same name).
        super().__init__()
        self.linear = nn.Linear(input_dim, output_dim)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return the linear layer's output for ``x``."""
        return self.linear(x)
# Logistic regression on the MNIST data set with PyTorch.
def regression_on_mnist(batch_size=100, n_iters=5000, lr_rate=0.001,
                        data_root='./torch_test/data'):
    """Train a linear classifier on MNIST with SGD and print test accuracy.

    Every 500 optimisation steps the model is evaluated on the MNIST test
    split and the step count, the latest training loss and the accuracy (in
    percent) are printed.

    Args:
        batch_size: Number of samples per mini-batch for both data loaders.
        n_iters: Target number of optimisation steps; it is converted into a
            whole number of epochs, so the actual step count may be lower.
        lr_rate: Learning rate passed to ``optim.SGD``.
        data_root: Directory handed to ``dsets.MNIST`` as ``root``. The
            training split is opened with ``download=False``, so the data is
            expected to be present there already.
    """
    input_dim = 28 * 28
    output_dim = 10

    to_tensor = transforms.ToTensor()
    train_dataset = dsets.MNIST(root=data_root, train=True,
                                transform=to_tensor, download=False)
    test_dataset = dsets.MNIST(root=data_root, train=False,
                               transform=to_tensor)
    train_loader = torch.utils.data.DataLoader(
        dataset=train_dataset, batch_size=batch_size, shuffle=True)
    test_loader = torch.utils.data.DataLoader(
        dataset=test_dataset, batch_size=batch_size, shuffle=False)

    # NOTE(review): further down this file imports sklearn's
    # LogisticRegression under the same name; once that import has run, this
    # call no longer reaches the torch module defined above. Confirm and
    # rename/alias one of the two at file level.
    model = LogisticRegression(input_dim, output_dim)
    # Cross-entropy loss computed on top of a softmax over the model outputs.
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr=lr_rate)

    # Convert the requested number of steps into whole epochs (truncating).
    epochs = int(n_iters / (len(train_dataset) / batch_size))
    step = 0
    for _ in range(epochs):
        for images, labels in train_loader:
            # Flatten each 28x28 image into a vector of length input_dim.
            images = images.view(-1, input_dim)

            optimizer.zero_grad()
            loss = criterion(model(images), labels)
            loss.backward()
            optimizer.step()

            step += 1
            if step % 500 != 0:
                continue

            # Compute the accuracy on the test split. Gradients are not
            # needed here, so autograd bookkeeping is switched off.
            correct = 0
            total = 0
            with torch.no_grad():
                for test_images, test_labels in test_loader:
                    scores = model(test_images.view(-1, input_dim))
                    _, predicted = torch.max(scores, 1)
                    total += test_labels.size(0)
                    # When running on a GPU, predictions and labels must be
                    # moved back to the CPU before counting in Python.
                    correct += (predicted == test_labels).sum().item()
            accuracy = 100 * correct / total
            print("Iteration: {}. Loss: {}. Accuracy: {}.".format(step, loss.item(), accuracy))
# Ordinal logistic regression with PyTorch + skorch.
def ordinal_regression():
    """Fit a spacecutter ordinal logistic model on toy data and plot the loss.

    A three-sample data set is built in place, wrapped in a skorch
    ``NeuralNet`` around ``OrdinalLogisticModel``, trained for 5000 epochs,
    and the recorded ``train_loss`` history is shown with matplotlib.
    """
    # 0. Data preparation: three samples with three features each and one
    #    label per sample, stored as a column vector.
    samples = [
        [0.5, 0.1, -0.1],
        [1.0, 0.2, 0.6],
        [-2.0, 0.4, 0.8],
    ]
    X = np.asarray(samples, dtype=np.float32)
    y = np.array([[0], [1], [2]])
    _, num_features = X.shape
    num_classes = np.unique(y).size

    # 1. Ordinal logistic regression: the predictor network maps each sample
    #    to a single scalar output.
    hidden_layer = nn.Linear(num_features, num_features)
    output_layer = nn.Linear(num_features, 1)
    predictor = nn.Sequential(hidden_layer, nn.ReLU(), output_layer)
    # The author's earlier experiment called the model directly, i.e.
    # OrdinalLogisticModel(predictor, num_classes)(torch.as_tensor(X)), and
    # recorded a 3x3 tensor of values (presumably per-class probabilities).

    # 2. Train the model through skorch.
    net_options = {
        'module': OrdinalLogisticModel,
        'module__predictor': predictor,
        'module__num_classes': num_classes,
        # Adam converges quickly (author's note: not an SGD-type optimiser).
        'optimizer': torch.optim.Adam,
        # Cumulative link loss that matches OrdinalLogisticModel; the usual
        # cross-entropy torch.nn.CrossEntropyLoss does not apply here.
        'criterion': CumulativeLinkLoss,
        'train_split': None,
        # Number of training passes (epoch = one pass over all samples;
        # iteration = one pass over a single batch, which only matters for
        # mini-batch optimisers such as SGD).
        'max_epochs': 5000,
        'callbacks': [('ascension', AscensionCallback())],
    }
    skorch_model = NeuralNet(**net_options)
    skorch_model.fit(X, y)  # train
    # Predictions were obtained in the author's earlier experiment via
    # skorch_model.predict_proba(X).

    # 3. Visualise the per-epoch training loss with matplotlib.
    loss_per_epoch = skorch_model.history[:, 'train_loss']
    plt.plot(loss_per_epoch, label='Train Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.show()
- import pandas as pd
- from sklearn.model_selection import train_test_split
- from sklearn.linear_model import LogisticRegression
def test():
    """Fit a logistic regression on selected CGSS columns and print predictions.

    Reads the CGSS Stata file, writes a CSV copy next to it, trains
    ``LogisticRegression`` on 75% of the rows using the columns listed in
    ``feature_cols`` with the ``label`` column as target, and prints the
    predictions for the remaining 25%.
    """
    stata_path = './torch_test/data/CGSS/CGSS2021_20240607.dta'
    csv_path = './torch_test/data/CGSS/CGSS2021_20240607.csv'

    # Load the raw survey data (categoricals are left as stored codes) and
    # export a CSV copy.
    survey = pd.read_stata(stata_path, convert_categoricals=False)
    survey.to_csv(csv_path, encoding='UTF-8')

    feature_cols = ['id', 'provinces', 'type', 'A00', 'A1', 'A12_2', 'A12a']
    features = survey[feature_cols]  # model inputs
    target = survey.label  # target variable

    # Fixed random_state keeps the split and the fit reproducible.
    split = train_test_split(features, target, test_size=0.25, random_state=16)
    X_train, X_test, y_train, y_test = split

    # Fit on the training part, then predict the held-out part.
    classifier = LogisticRegression(random_state=16)
    y_pred = classifier.fit(X_train, y_train).predict(X_test)
    print(y_pred)
# Ordinal logistic regression based on sklearn
|