Compare commits
2 Commits
beforeRunt
...
21.395
| Author | SHA1 | Date | |
|---|---|---|---|
| d7b7da6fc5 | |||
| 15cfbe315c |
1
image-inpainting/.gitignore
vendored
1
image-inpainting/.gitignore
vendored
@@ -2,4 +2,3 @@ data/*
|
|||||||
*.zip
|
*.zip
|
||||||
*.jpg
|
*.jpg
|
||||||
*.pt
|
*.pt
|
||||||
__pycache__/
|
|
||||||
@@ -1,16 +0,0 @@
|
|||||||
{
|
|
||||||
"learningrate": 0.0003,
|
|
||||||
"weight_decay": 1e-05,
|
|
||||||
"n_updates": 150000,
|
|
||||||
"plot_at": 400,
|
|
||||||
"early_stopping_patience": 40,
|
|
||||||
"print_stats_at": 200,
|
|
||||||
"print_train_stats_at": 50,
|
|
||||||
"validate_at": 200,
|
|
||||||
"accumulation_steps": 1,
|
|
||||||
"commands": {
|
|
||||||
"save_checkpoint": false,
|
|
||||||
"run_test_validation": false,
|
|
||||||
"generate_predictions": false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
BIN
image-inpainting/src/__pycache__/architecture.cpython-313.pyc
Normal file
BIN
image-inpainting/src/__pycache__/architecture.cpython-313.pyc
Normal file
Binary file not shown.
BIN
image-inpainting/src/__pycache__/architecture.cpython-314.pyc
Normal file
BIN
image-inpainting/src/__pycache__/architecture.cpython-314.pyc
Normal file
Binary file not shown.
BIN
image-inpainting/src/__pycache__/datasets.cpython-313.pyc
Normal file
BIN
image-inpainting/src/__pycache__/datasets.cpython-313.pyc
Normal file
Binary file not shown.
BIN
image-inpainting/src/__pycache__/datasets.cpython-314.pyc
Normal file
BIN
image-inpainting/src/__pycache__/datasets.cpython-314.pyc
Normal file
Binary file not shown.
BIN
image-inpainting/src/__pycache__/train.cpython-313.pyc
Normal file
BIN
image-inpainting/src/__pycache__/train.cpython-313.pyc
Normal file
Binary file not shown.
BIN
image-inpainting/src/__pycache__/train.cpython-314.pyc
Normal file
BIN
image-inpainting/src/__pycache__/train.cpython-314.pyc
Normal file
Binary file not shown.
BIN
image-inpainting/src/__pycache__/utils.cpython-313.pyc
Normal file
BIN
image-inpainting/src/__pycache__/utils.cpython-313.pyc
Normal file
Binary file not shown.
BIN
image-inpainting/src/__pycache__/utils.cpython-314.pyc
Normal file
BIN
image-inpainting/src/__pycache__/utils.cpython-314.pyc
Normal file
Binary file not shown.
@@ -20,46 +20,28 @@ def init_weights(m):
|
|||||||
nn.init.constant_(m.bias, 0)
|
nn.init.constant_(m.bias, 0)
|
||||||
|
|
||||||
|
|
||||||
class GatedSkipConnection(nn.Module):
|
class ChannelAttention(nn.Module):
|
||||||
"""Gated skip connection for better feature fusion"""
|
"""Channel attention module (squeeze-and-excitation style)"""
|
||||||
def __init__(self, up_channels, skip_channels):
|
def __init__(self, channels, reduction=16):
|
||||||
super().__init__()
|
|
||||||
self.gate = nn.Sequential(
|
|
||||||
nn.Conv2d(up_channels + skip_channels, up_channels, 1),
|
|
||||||
nn.Sigmoid()
|
|
||||||
)
|
|
||||||
# Project skip to match up_channels if they differ
|
|
||||||
if skip_channels != up_channels:
|
|
||||||
self.skip_proj = nn.Conv2d(skip_channels, up_channels, 1)
|
|
||||||
else:
|
|
||||||
self.skip_proj = nn.Identity()
|
|
||||||
|
|
||||||
def forward(self, x, skip):
|
|
||||||
skip_proj = self.skip_proj(skip)
|
|
||||||
combined = torch.cat([x, skip], dim=1)
|
|
||||||
gate = self.gate(combined)
|
|
||||||
return x * gate + skip_proj * (1 - gate)
|
|
||||||
|
|
||||||
|
|
||||||
class EfficientChannelAttention(nn.Module):
|
|
||||||
"""Efficient channel attention without dimensionality reduction"""
|
|
||||||
def __init__(self, channels):
|
|
||||||
super().__init__()
|
super().__init__()
|
||||||
self.avg_pool = nn.AdaptiveAvgPool2d(1)
|
self.avg_pool = nn.AdaptiveAvgPool2d(1)
|
||||||
self.conv = nn.Conv1d(1, 1, kernel_size=3, padding=1, bias=False)
|
self.max_pool = nn.AdaptiveMaxPool2d(1)
|
||||||
|
reduced = max(channels // reduction, 8)
|
||||||
|
self.fc = nn.Sequential(
|
||||||
|
nn.Conv2d(channels, reduced, 1, bias=False),
|
||||||
|
nn.ReLU(inplace=True),
|
||||||
|
nn.Conv2d(reduced, channels, 1, bias=False)
|
||||||
|
)
|
||||||
self.sigmoid = nn.Sigmoid()
|
self.sigmoid = nn.Sigmoid()
|
||||||
|
|
||||||
def forward(self, x):
|
def forward(self, x):
|
||||||
# Global pooling
|
avg_out = self.fc(self.avg_pool(x))
|
||||||
y = self.avg_pool(x)
|
max_out = self.fc(self.max_pool(x))
|
||||||
# 1D convolution on channel dimension
|
return x * self.sigmoid(avg_out + max_out)
|
||||||
y = self.conv(y.squeeze(-1).transpose(-1, -2)).transpose(-1, -2).unsqueeze(-1)
|
|
||||||
y = self.sigmoid(y)
|
|
||||||
return x * y.expand_as(x)
|
|
||||||
|
|
||||||
|
|
||||||
class SpatialAttention(nn.Module):
|
class SpatialAttention(nn.Module):
|
||||||
"""Efficient spatial attention module"""
|
"""Spatial attention module"""
|
||||||
def __init__(self, kernel_size=7):
|
def __init__(self, kernel_size=7):
|
||||||
super().__init__()
|
super().__init__()
|
||||||
self.conv = nn.Conv2d(2, 1, kernel_size, padding=kernel_size // 2, bias=False)
|
self.conv = nn.Conv2d(2, 1, kernel_size, padding=kernel_size // 2, bias=False)
|
||||||
@@ -73,12 +55,12 @@ class SpatialAttention(nn.Module):
|
|||||||
return x * attn
|
return x * attn
|
||||||
|
|
||||||
|
|
||||||
class EfficientAttention(nn.Module):
|
class CBAM(nn.Module):
|
||||||
"""Lightweight attention module combining channel and spatial"""
|
"""Convolutional Block Attention Module"""
|
||||||
def __init__(self, channels):
|
def __init__(self, channels, reduction=16):
|
||||||
super().__init__()
|
super().__init__()
|
||||||
self.channel_attn = EfficientChannelAttention(channels)
|
self.channel_attn = ChannelAttention(channels, reduction)
|
||||||
self.spatial_attn = SpatialAttention(kernel_size=5)
|
self.spatial_attn = SpatialAttention()
|
||||||
|
|
||||||
def forward(self, x):
|
def forward(self, x):
|
||||||
x = self.channel_attn(x)
|
x = self.channel_attn(x)
|
||||||
@@ -86,222 +68,198 @@ class EfficientAttention(nn.Module):
|
|||||||
return x
|
return x
|
||||||
|
|
||||||
|
|
||||||
|
class MultiScaleFeatureExtraction(nn.Module):
|
||||||
|
"""Multi-scale feature extraction using dilated convolutions"""
|
||||||
|
def __init__(self, channels):
|
||||||
|
super().__init__()
|
||||||
|
self.branch1 = nn.Sequential(
|
||||||
|
nn.Conv2d(channels, channels // 4, 1),
|
||||||
|
nn.BatchNorm2d(channels // 4),
|
||||||
|
nn.LeakyReLU(0.1, inplace=True)
|
||||||
|
)
|
||||||
|
self.branch2 = nn.Sequential(
|
||||||
|
nn.Conv2d(channels, channels // 4, 3, padding=2, dilation=2),
|
||||||
|
nn.BatchNorm2d(channels // 4),
|
||||||
|
nn.LeakyReLU(0.1, inplace=True)
|
||||||
|
)
|
||||||
|
self.branch3 = nn.Sequential(
|
||||||
|
nn.Conv2d(channels, channels // 4, 3, padding=4, dilation=4),
|
||||||
|
nn.BatchNorm2d(channels // 4),
|
||||||
|
nn.LeakyReLU(0.1, inplace=True)
|
||||||
|
)
|
||||||
|
self.branch4 = nn.Sequential(
|
||||||
|
nn.Conv2d(channels, channels // 4, 3, padding=8, dilation=8),
|
||||||
|
nn.BatchNorm2d(channels // 4),
|
||||||
|
nn.LeakyReLU(0.1, inplace=True)
|
||||||
|
)
|
||||||
|
self.fusion = nn.Sequential(
|
||||||
|
nn.Conv2d(channels, channels, 1),
|
||||||
|
nn.BatchNorm2d(channels),
|
||||||
|
nn.LeakyReLU(0.1, inplace=True)
|
||||||
|
)
|
||||||
|
|
||||||
|
def forward(self, x):
|
||||||
|
b1 = self.branch1(x)
|
||||||
|
b2 = self.branch2(x)
|
||||||
|
b3 = self.branch3(x)
|
||||||
|
b4 = self.branch4(x)
|
||||||
|
out = torch.cat([b1, b2, b3, b4], dim=1)
|
||||||
|
out = self.fusion(out)
|
||||||
|
return out + x # Residual connection
|
||||||
|
|
||||||
|
|
||||||
class ConvBlock(nn.Module):
|
class ConvBlock(nn.Module):
|
||||||
"""Convolutional block with Conv2d -> BatchNorm -> LeakyReLU"""
|
"""Convolutional block with Conv2d -> BatchNorm -> LeakyReLU"""
|
||||||
def __init__(self, in_channels, out_channels, kernel_size=3, padding=1, dilation=1, dropout=0.0, separable=False):
|
def __init__(self, in_channels, out_channels, kernel_size=3, padding=1, dropout=0.0):
|
||||||
super().__init__()
|
super().__init__()
|
||||||
if separable and in_channels > 1:
|
self.conv = nn.Conv2d(in_channels, out_channels, kernel_size, padding=padding)
|
||||||
# Depthwise separable convolution for efficiency
|
|
||||||
self.conv = nn.Sequential(
|
|
||||||
nn.Conv2d(in_channels, in_channels, kernel_size, padding=padding, dilation=dilation, groups=in_channels),
|
|
||||||
nn.Conv2d(in_channels, out_channels, 1)
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
self.conv = nn.Conv2d(in_channels, out_channels, kernel_size, padding=padding, dilation=dilation)
|
|
||||||
self.bn = nn.BatchNorm2d(out_channels)
|
self.bn = nn.BatchNorm2d(out_channels)
|
||||||
self.relu = nn.LeakyReLU(0.2, inplace=True)
|
self.relu = nn.LeakyReLU(0.1, inplace=True)
|
||||||
self.dropout = nn.Dropout2d(dropout) if dropout > 0 else nn.Identity()
|
self.dropout = nn.Dropout2d(dropout) if dropout > 0 else nn.Identity()
|
||||||
|
|
||||||
def forward(self, x):
|
def forward(self, x):
|
||||||
return self.dropout(self.relu(self.bn(self.conv(x))))
|
return self.dropout(self.relu(self.bn(self.conv(x))))
|
||||||
|
|
||||||
|
|
||||||
class DenseBlock(nn.Module):
|
|
||||||
"""Lightweight dense block for better gradient flow"""
|
|
||||||
def __init__(self, channels, growth_rate=8, num_layers=2, dropout=0.0):
|
|
||||||
super().__init__()
|
|
||||||
self.layers = nn.ModuleList()
|
|
||||||
for i in range(num_layers):
|
|
||||||
self.layers.append(ConvBlock(channels + i * growth_rate, growth_rate, dropout=dropout))
|
|
||||||
self.fusion = nn.Conv2d(channels + num_layers * growth_rate, channels, 1)
|
|
||||||
self.bn = nn.BatchNorm2d(channels)
|
|
||||||
self.relu = nn.LeakyReLU(0.2, inplace=True)
|
|
||||||
|
|
||||||
def forward(self, x):
|
|
||||||
features = [x]
|
|
||||||
for layer in self.layers:
|
|
||||||
out = layer(torch.cat(features, dim=1))
|
|
||||||
features.append(out)
|
|
||||||
out = self.fusion(torch.cat(features, dim=1))
|
|
||||||
out = self.relu(self.bn(out))
|
|
||||||
return out + x # Residual connection
|
|
||||||
|
|
||||||
class ResidualConvBlock(nn.Module):
|
class ResidualConvBlock(nn.Module):
|
||||||
"""Improved residual convolutional block with pre-activation"""
|
"""Residual convolutional block for better gradient flow"""
|
||||||
def __init__(self, channels, dropout=0.0):
|
def __init__(self, channels, dropout=0.0):
|
||||||
super().__init__()
|
super().__init__()
|
||||||
self.bn1 = nn.BatchNorm2d(channels)
|
|
||||||
self.relu1 = nn.LeakyReLU(0.2, inplace=True)
|
|
||||||
self.conv1 = nn.Conv2d(channels, channels, 3, padding=1)
|
self.conv1 = nn.Conv2d(channels, channels, 3, padding=1)
|
||||||
self.bn2 = nn.BatchNorm2d(channels)
|
self.bn1 = nn.BatchNorm2d(channels)
|
||||||
self.relu2 = nn.LeakyReLU(0.2, inplace=True)
|
|
||||||
self.conv2 = nn.Conv2d(channels, channels, 3, padding=1)
|
self.conv2 = nn.Conv2d(channels, channels, 3, padding=1)
|
||||||
|
self.bn2 = nn.BatchNorm2d(channels)
|
||||||
|
self.relu = nn.LeakyReLU(0.1, inplace=True)
|
||||||
self.dropout = nn.Dropout2d(dropout) if dropout > 0 else nn.Identity()
|
self.dropout = nn.Dropout2d(dropout) if dropout > 0 else nn.Identity()
|
||||||
|
|
||||||
def forward(self, x):
|
def forward(self, x):
|
||||||
residual = x
|
residual = x
|
||||||
out = self.relu1(self.bn1(x))
|
out = self.relu(self.bn1(self.conv1(x)))
|
||||||
out = self.conv1(out)
|
|
||||||
out = self.relu2(self.bn2(out))
|
|
||||||
out = self.dropout(out)
|
out = self.dropout(out)
|
||||||
out = self.conv2(out)
|
out = self.bn2(self.conv2(out))
|
||||||
return out + residual
|
out = out + residual
|
||||||
|
return self.relu(out)
|
||||||
|
|
||||||
|
|
||||||
class DownBlock(nn.Module):
|
class DownBlock(nn.Module):
|
||||||
"""Enhanced downsampling block with dense and residual connections"""
|
"""Downsampling block with conv blocks, residual connection, attention, and max pooling"""
|
||||||
def __init__(self, in_channels, out_channels, dropout=0.1, use_attention=True, use_dense=False):
|
def __init__(self, in_channels, out_channels, dropout=0.1):
|
||||||
super().__init__()
|
super().__init__()
|
||||||
self.conv1 = ConvBlock(in_channels, out_channels, dropout=dropout, separable=True)
|
self.conv1 = ConvBlock(in_channels, out_channels, dropout=dropout)
|
||||||
self.conv2 = ConvBlock(out_channels, out_channels, dropout=dropout)
|
self.conv2 = ConvBlock(out_channels, out_channels, dropout=dropout)
|
||||||
if use_dense:
|
self.residual = ResidualConvBlock(out_channels, dropout=dropout)
|
||||||
self.dense = DenseBlock(out_channels, growth_rate=8, num_layers=2, dropout=dropout)
|
self.attention = CBAM(out_channels)
|
||||||
else:
|
|
||||||
self.dense = ResidualConvBlock(out_channels, dropout=dropout)
|
|
||||||
self.attention = EfficientAttention(out_channels) if use_attention else nn.Identity()
|
|
||||||
self.pool = nn.MaxPool2d(2)
|
self.pool = nn.MaxPool2d(2)
|
||||||
|
|
||||||
def forward(self, x):
|
def forward(self, x):
|
||||||
x = self.conv1(x)
|
x = self.conv1(x)
|
||||||
x = self.conv2(x)
|
x = self.conv2(x)
|
||||||
x = self.dense(x)
|
x = self.residual(x)
|
||||||
skip = self.attention(x)
|
skip = self.attention(x)
|
||||||
return self.pool(skip), skip
|
return self.pool(skip), skip
|
||||||
|
|
||||||
class UpBlock(nn.Module):
|
class UpBlock(nn.Module):
|
||||||
"""Enhanced upsampling block with gated skip connections"""
|
"""Upsampling block with transposed conv, residual connection, attention, and conv blocks"""
|
||||||
def __init__(self, in_channels, out_channels, dropout=0.1, use_attention=True, use_dense=False):
|
def __init__(self, in_channels, out_channels, dropout=0.1):
|
||||||
super().__init__()
|
super().__init__()
|
||||||
self.up = nn.ConvTranspose2d(in_channels, out_channels, kernel_size=2, stride=2)
|
self.up = nn.ConvTranspose2d(in_channels, out_channels, kernel_size=2, stride=2)
|
||||||
# Skip connection has in_channels, upsampled has out_channels
|
# After concat: out_channels (from upconv) + in_channels (from skip)
|
||||||
self.gated_skip = GatedSkipConnection(out_channels, in_channels)
|
self.conv1 = ConvBlock(out_channels + in_channels, out_channels, dropout=dropout)
|
||||||
# After gated skip: out_channels
|
|
||||||
self.conv1 = ConvBlock(out_channels, out_channels, dropout=dropout, separable=True)
|
|
||||||
self.conv2 = ConvBlock(out_channels, out_channels, dropout=dropout)
|
self.conv2 = ConvBlock(out_channels, out_channels, dropout=dropout)
|
||||||
if use_dense:
|
self.residual = ResidualConvBlock(out_channels, dropout=dropout)
|
||||||
self.dense = DenseBlock(out_channels, growth_rate=8, num_layers=2, dropout=dropout)
|
self.attention = CBAM(out_channels)
|
||||||
else:
|
|
||||||
self.dense = ResidualConvBlock(out_channels, dropout=dropout)
|
|
||||||
self.attention = EfficientAttention(out_channels) if use_attention else nn.Identity()
|
|
||||||
|
|
||||||
def forward(self, x, skip):
|
def forward(self, x, skip):
|
||||||
x = self.up(x)
|
x = self.up(x)
|
||||||
# Handle dimension mismatch
|
# Handle dimension mismatch by interpolating x to match skip's size
|
||||||
if x.shape[2:] != skip.shape[2:]:
|
if x.shape[2:] != skip.shape[2:]:
|
||||||
x = F.interpolate(x, size=skip.shape[2:], mode='bilinear', align_corners=False)
|
x = F.interpolate(x, size=skip.shape[2:], mode='bilinear', align_corners=False)
|
||||||
x = self.gated_skip(x, skip)
|
x = torch.cat([x, skip], dim=1)
|
||||||
x = self.conv1(x)
|
x = self.conv1(x)
|
||||||
x = self.conv2(x)
|
x = self.conv2(x)
|
||||||
x = self.dense(x)
|
x = self.residual(x)
|
||||||
x = self.attention(x)
|
x = self.attention(x)
|
||||||
return x
|
return x
|
||||||
|
|
||||||
class MyModel(nn.Module):
|
class MyModel(nn.Module):
|
||||||
"""Enhanced U-Net architecture with dense connections and efficient attention"""
|
"""Improved U-Net style architecture for image inpainting with attention and residual connections"""
|
||||||
def __init__(self, n_in_channels: int, base_channels: int = 64, dropout: float = 0.1):
|
def __init__(self, n_in_channels: int, base_channels: int = 64, dropout: float = 0.1):
|
||||||
super().__init__()
|
super().__init__()
|
||||||
|
|
||||||
# Separate mask processing for better feature extraction
|
# Initial convolution with larger receptive field
|
||||||
self.mask_conv = nn.Sequential(
|
self.init_conv = nn.Sequential(
|
||||||
nn.Conv2d(1, base_channels // 4, 3, padding=1),
|
ConvBlock(n_in_channels, base_channels, kernel_size=7, padding=3),
|
||||||
nn.LeakyReLU(0.2, inplace=True),
|
|
||||||
nn.Conv2d(base_channels // 4, base_channels // 4, 3, padding=1),
|
|
||||||
nn.LeakyReLU(0.2, inplace=True)
|
|
||||||
)
|
|
||||||
|
|
||||||
# Image processing path
|
|
||||||
self.image_conv = nn.Sequential(
|
|
||||||
ConvBlock(3, base_channels, kernel_size=5, padding=2),
|
|
||||||
ConvBlock(base_channels, base_channels)
|
|
||||||
)
|
|
||||||
|
|
||||||
# Fusion of mask and image features
|
|
||||||
self.fusion = nn.Sequential(
|
|
||||||
nn.Conv2d(base_channels + base_channels // 4, base_channels, 1),
|
|
||||||
nn.BatchNorm2d(base_channels),
|
|
||||||
nn.LeakyReLU(0.2, inplace=True)
|
|
||||||
)
|
|
||||||
|
|
||||||
# Encoder with progressive feature extraction
|
|
||||||
self.down1 = DownBlock(base_channels, base_channels * 2, dropout=dropout*0.5, use_attention=False, use_dense=False)
|
|
||||||
self.down2 = DownBlock(base_channels * 2, base_channels * 4, dropout=dropout*0.7, use_attention=True, use_dense=True)
|
|
||||||
self.down3 = DownBlock(base_channels * 4, base_channels * 8, dropout=dropout, use_attention=True, use_dense=True)
|
|
||||||
|
|
||||||
# Enhanced bottleneck with multi-scale features and dense connections
|
|
||||||
self.bottleneck = nn.Sequential(
|
|
||||||
ConvBlock(base_channels * 8, base_channels * 8, dropout=dropout),
|
|
||||||
DenseBlock(base_channels * 8, growth_rate=10, num_layers=3, dropout=dropout),
|
|
||||||
ConvBlock(base_channels * 8, base_channels * 8, dilation=2, padding=2, dropout=dropout),
|
|
||||||
ResidualConvBlock(base_channels * 8, dropout=dropout),
|
|
||||||
EfficientAttention(base_channels * 8)
|
|
||||||
)
|
|
||||||
|
|
||||||
# Decoder with progressive reconstruction
|
|
||||||
self.up1 = UpBlock(base_channels * 8, base_channels * 4, dropout=dropout, use_attention=True, use_dense=True)
|
|
||||||
self.up2 = UpBlock(base_channels * 4, base_channels * 2, dropout=dropout*0.7, use_attention=True, use_dense=True)
|
|
||||||
self.up3 = UpBlock(base_channels * 2, base_channels, dropout=dropout*0.5, use_attention=False, use_dense=False)
|
|
||||||
|
|
||||||
# Multi-scale feature fusion with dense connections
|
|
||||||
self.multiscale_fusion = nn.Sequential(
|
|
||||||
ConvBlock(base_channels * 2, base_channels),
|
|
||||||
DenseBlock(base_channels, growth_rate=8, num_layers=2, dropout=dropout//2),
|
|
||||||
ConvBlock(base_channels, base_channels)
|
|
||||||
)
|
|
||||||
|
|
||||||
# Output with residual connection to input
|
|
||||||
self.pre_output = nn.Sequential(
|
|
||||||
ConvBlock(base_channels, base_channels),
|
ConvBlock(base_channels, base_channels),
|
||||||
ConvBlock(base_channels, base_channels // 2)
|
ResidualConvBlock(base_channels)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Encoder (downsampling path)
|
||||||
|
self.down1 = DownBlock(base_channels, base_channels * 2, dropout=dropout)
|
||||||
|
self.down2 = DownBlock(base_channels * 2, base_channels * 4, dropout=dropout)
|
||||||
|
self.down3 = DownBlock(base_channels * 4, base_channels * 8, dropout=dropout)
|
||||||
|
self.down4 = DownBlock(base_channels * 8, base_channels * 16, dropout=dropout)
|
||||||
|
|
||||||
|
# Bottleneck with multiple residual blocks and multi-scale features
|
||||||
|
self.bottleneck = nn.Sequential(
|
||||||
|
ConvBlock(base_channels * 16, base_channels * 16, dropout=dropout),
|
||||||
|
ResidualConvBlock(base_channels * 16, dropout=dropout),
|
||||||
|
MultiScaleFeatureExtraction(base_channels * 16),
|
||||||
|
ResidualConvBlock(base_channels * 16, dropout=dropout),
|
||||||
|
ResidualConvBlock(base_channels * 16, dropout=dropout),
|
||||||
|
CBAM(base_channels * 16)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Decoder (upsampling path)
|
||||||
|
self.up1 = UpBlock(base_channels * 16, base_channels * 8, dropout=dropout)
|
||||||
|
self.up2 = UpBlock(base_channels * 8, base_channels * 4, dropout=dropout)
|
||||||
|
self.up3 = UpBlock(base_channels * 4, base_channels * 2, dropout=dropout)
|
||||||
|
self.up4 = UpBlock(base_channels * 2, base_channels, dropout=dropout)
|
||||||
|
|
||||||
|
# Final refinement layers
|
||||||
|
self.final_conv = nn.Sequential(
|
||||||
|
ConvBlock(base_channels * 2, base_channels),
|
||||||
|
ResidualConvBlock(base_channels),
|
||||||
|
ConvBlock(base_channels, base_channels)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Output layer with smooth transition
|
||||||
self.output = nn.Sequential(
|
self.output = nn.Sequential(
|
||||||
nn.Conv2d(base_channels // 2 + 3, base_channels // 2, 3, padding=1),
|
nn.Conv2d(base_channels, base_channels // 2, kernel_size=3, padding=1),
|
||||||
nn.LeakyReLU(0.2, inplace=True),
|
nn.LeakyReLU(0.1, inplace=True),
|
||||||
nn.Conv2d(base_channels // 2, 3, 1),
|
nn.Conv2d(base_channels // 2, 3, kernel_size=1),
|
||||||
nn.Sigmoid()
|
nn.Sigmoid() # Ensure output is in [0, 1] range
|
||||||
)
|
)
|
||||||
|
|
||||||
# Apply weight initialization
|
# Apply weight initialization
|
||||||
self.apply(init_weights)
|
self.apply(init_weights)
|
||||||
|
|
||||||
def forward(self, x):
|
def forward(self, x):
|
||||||
# Split input into image and mask
|
# Initial convolution
|
||||||
image = x[:, :3, :, :]
|
x0 = self.init_conv(x)
|
||||||
mask = x[:, 3:4, :, :]
|
|
||||||
|
|
||||||
# Process mask and image separately
|
|
||||||
mask_features = self.mask_conv(mask)
|
|
||||||
image_features = self.image_conv(image)
|
|
||||||
|
|
||||||
# Fuse features
|
|
||||||
x0 = self.fusion(torch.cat([image_features, mask_features], dim=1))
|
|
||||||
|
|
||||||
# Encoder
|
# Encoder
|
||||||
x1, skip1 = self.down1(x0)
|
x1, skip1 = self.down1(x0)
|
||||||
x2, skip2 = self.down2(x1)
|
x2, skip2 = self.down2(x1)
|
||||||
x3, skip3 = self.down3(x2)
|
x3, skip3 = self.down3(x2)
|
||||||
|
x4, skip4 = self.down4(x3)
|
||||||
|
|
||||||
# Bottleneck
|
# Bottleneck
|
||||||
x = self.bottleneck(x3)
|
x = self.bottleneck(x4)
|
||||||
|
|
||||||
# Decoder with skip connections
|
# Decoder with skip connections
|
||||||
x = self.up1(x, skip3)
|
x = self.up1(x, skip4)
|
||||||
x = self.up2(x, skip2)
|
x = self.up2(x, skip3)
|
||||||
x = self.up3(x, skip1)
|
x = self.up3(x, skip2)
|
||||||
|
x = self.up4(x, skip1)
|
||||||
|
|
||||||
# Handle dimension mismatch for final fusion
|
# Handle dimension mismatch for final concatenation
|
||||||
if x.shape[2:] != x0.shape[2:]:
|
if x.shape[2:] != x0.shape[2:]:
|
||||||
x = F.interpolate(x, size=x0.shape[2:], mode='bilinear', align_corners=False)
|
x = F.interpolate(x, size=x0.shape[2:], mode='bilinear', align_corners=False)
|
||||||
|
|
||||||
# Multi-scale fusion with initial features
|
# Concatenate with initial features for better detail preservation
|
||||||
x = torch.cat([x, x0], dim=1)
|
x = torch.cat([x, x0], dim=1)
|
||||||
x = self.multiscale_fusion(x)
|
x = self.final_conv(x)
|
||||||
|
|
||||||
# Pre-output processing
|
# Output
|
||||||
x = self.pre_output(x)
|
|
||||||
|
|
||||||
# Concatenate with original masked image for residual learning
|
|
||||||
x = torch.cat([x, image], dim=1)
|
|
||||||
x = self.output(x)
|
x = self.output(x)
|
||||||
|
|
||||||
return x
|
return x
|
||||||
@@ -10,11 +10,50 @@ import numpy as np
|
|||||||
import random
|
import random
|
||||||
import glob
|
import glob
|
||||||
import os
|
import os
|
||||||
from PIL import Image, ImageEnhance
|
from PIL import Image, ImageEnhance, ImageFilter
|
||||||
|
|
||||||
IMAGE_DIMENSION = 100
|
IMAGE_DIMENSION = 100
|
||||||
|
|
||||||
|
|
||||||
|
class DataAugmentation:
|
||||||
|
"""Data augmentation pipeline for improved generalization"""
|
||||||
|
|
||||||
|
def __init__(self, p=0.5):
|
||||||
|
self.p = p
|
||||||
|
|
||||||
|
def __call__(self, image: Image.Image) -> Image.Image:
|
||||||
|
# Random horizontal flip
|
||||||
|
if random.random() < self.p:
|
||||||
|
image = image.transpose(Image.FLIP_LEFT_RIGHT)
|
||||||
|
|
||||||
|
# Random vertical flip
|
||||||
|
if random.random() < self.p * 0.5:
|
||||||
|
image = image.transpose(Image.FLIP_TOP_BOTTOM)
|
||||||
|
|
||||||
|
# Random rotation (90 degree increments)
|
||||||
|
if random.random() < self.p * 0.3:
|
||||||
|
angle = random.choice([90, 180, 270])
|
||||||
|
image = image.rotate(angle)
|
||||||
|
|
||||||
|
# Color jittering
|
||||||
|
if random.random() < self.p * 0.4:
|
||||||
|
# Brightness
|
||||||
|
enhancer = ImageEnhance.Brightness(image)
|
||||||
|
image = enhancer.enhance(random.uniform(0.85, 1.15))
|
||||||
|
|
||||||
|
if random.random() < self.p * 0.4:
|
||||||
|
# Contrast
|
||||||
|
enhancer = ImageEnhance.Contrast(image)
|
||||||
|
image = enhancer.enhance(random.uniform(0.85, 1.15))
|
||||||
|
|
||||||
|
if random.random() < self.p * 0.3:
|
||||||
|
# Saturation
|
||||||
|
enhancer = ImageEnhance.Color(image)
|
||||||
|
image = enhancer.enhance(random.uniform(0.85, 1.15))
|
||||||
|
|
||||||
|
return image
|
||||||
|
|
||||||
|
|
||||||
def create_arrays_from_image(image_array: np.ndarray, offset: tuple, spacing: tuple) -> tuple[np.ndarray, np.ndarray]:
|
def create_arrays_from_image(image_array: np.ndarray, offset: tuple, spacing: tuple) -> tuple[np.ndarray, np.ndarray]:
|
||||||
image_array = np.transpose(image_array, (2, 0, 1))
|
image_array = np.transpose(image_array, (2, 0, 1))
|
||||||
known_array = np.zeros_like(image_array)
|
known_array = np.zeros_like(image_array)
|
||||||
@@ -32,77 +71,48 @@ def resize(img: Image):
|
|||||||
transforms.CenterCrop((IMAGE_DIMENSION, IMAGE_DIMENSION))
|
transforms.CenterCrop((IMAGE_DIMENSION, IMAGE_DIMENSION))
|
||||||
])
|
])
|
||||||
return resize_transforms(img)
|
return resize_transforms(img)
|
||||||
|
|
||||||
def preprocess(input_array: np.ndarray):
|
def preprocess(input_array: np.ndarray):
|
||||||
input_array = np.asarray(input_array, dtype=np.float32) / 255.0
|
input_array = np.asarray(input_array, dtype=np.float32) / 255.0
|
||||||
return input_array
|
return input_array
|
||||||
|
|
||||||
def augment_image(img: Image, strength: float = 0.7) -> Image:
|
|
||||||
"""Apply comprehensive data augmentation for better generalization"""
|
|
||||||
# Random horizontal flip
|
|
||||||
if random.random() > 0.5:
|
|
||||||
img = img.transpose(Image.FLIP_LEFT_RIGHT)
|
|
||||||
|
|
||||||
# Random vertical flip
|
|
||||||
if random.random() > 0.5:
|
|
||||||
img = img.transpose(Image.FLIP_TOP_BOTTOM)
|
|
||||||
|
|
||||||
# Random rotation (90, 180, 270 degrees)
|
|
||||||
if random.random() > 0.5:
|
|
||||||
angle = random.choice([90, 180, 270])
|
|
||||||
img = img.rotate(angle)
|
|
||||||
|
|
||||||
# Color augmentation - more aggressive for long training
|
|
||||||
rand = random.random()
|
|
||||||
if rand > 0.75:
|
|
||||||
# Brightness
|
|
||||||
factor = 1.0 + random.uniform(-0.2, 0.2) * strength
|
|
||||||
img = ImageEnhance.Brightness(img).enhance(factor)
|
|
||||||
elif rand > 0.5:
|
|
||||||
# Contrast
|
|
||||||
factor = 1.0 + random.uniform(-0.2, 0.2) * strength
|
|
||||||
img = ImageEnhance.Contrast(img).enhance(factor)
|
|
||||||
elif rand > 0.25:
|
|
||||||
# Saturation
|
|
||||||
factor = 1.0 + random.uniform(-0.15, 0.15) * strength
|
|
||||||
img = ImageEnhance.Color(img).enhance(factor)
|
|
||||||
|
|
||||||
return img
|
|
||||||
|
|
||||||
class ImageDataset(torch.utils.data.Dataset):
|
class ImageDataset(torch.utils.data.Dataset):
|
||||||
"""
|
"""
|
||||||
Dataset class for loading images from a folder with augmentation support
|
Dataset class for loading images from a folder with augmentation
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, datafolder: str, augment: bool = True, augment_strength: float = 0.7):
|
def __init__(self, datafolder: str, augment: bool = True):
|
||||||
self.imagefiles = sorted(glob.glob(os.path.join(datafolder,"**","*.jpg"),recursive=True))
|
self.imagefiles = sorted(glob.glob(os.path.join(datafolder, "**", "*.jpg"), recursive=True))
|
||||||
self.augment = augment
|
self.augment = augment
|
||||||
self.augment_strength = augment_strength
|
self.augmentation = DataAugmentation(p=0.5) if augment else None
|
||||||
|
|
||||||
def __len__(self):
|
def __len__(self):
|
||||||
return len(self.imagefiles)
|
return len(self.imagefiles)
|
||||||
|
|
||||||
def __getitem__(self, idx:int):
|
def __getitem__(self, idx: int):
|
||||||
index = int(idx)
|
index = int(idx)
|
||||||
|
|
||||||
image = Image.open(self.imagefiles[index])
|
image = Image.open(self.imagefiles[index]).convert('RGB')
|
||||||
|
|
||||||
|
# Apply augmentation before resize
|
||||||
|
if self.augment and self.augmentation is not None:
|
||||||
|
image = self.augmentation(image)
|
||||||
|
|
||||||
image = resize(image)
|
image = resize(image)
|
||||||
|
|
||||||
# Apply augmentation
|
|
||||||
if self.augment:
|
|
||||||
image = augment_image(image, self.augment_strength)
|
|
||||||
|
|
||||||
image = np.asarray(image)
|
image = np.asarray(image)
|
||||||
image = preprocess(image)
|
image = preprocess(image)
|
||||||
spacing_x = random.randint(2,6)
|
|
||||||
spacing_y = random.randint(2,6)
|
# More varied spacing for better generalization
|
||||||
offset_x = random.randint(0,8)
|
spacing_x = random.randint(2, 8)
|
||||||
offset_y = random.randint(0,8)
|
spacing_y = random.randint(2, 8)
|
||||||
|
offset_x = random.randint(0, min(spacing_x - 1, 8))
|
||||||
|
offset_y = random.randint(0, min(spacing_y - 1, 8))
|
||||||
spacing = (spacing_x, spacing_y)
|
spacing = (spacing_x, spacing_y)
|
||||||
offset = (offset_x, offset_y)
|
offset = (offset_x, offset_y)
|
||||||
|
|
||||||
input_array, known_array = create_arrays_from_image(image.copy(), offset, spacing)
|
input_array, known_array = create_arrays_from_image(image.copy(), offset, spacing)
|
||||||
target_image = torch.from_numpy(np.transpose(image, (2,0,1)))
|
target_image = torch.from_numpy(np.transpose(image, (2, 0, 1)))
|
||||||
input_array = torch.from_numpy(input_array)
|
input_array = torch.from_numpy(input_array)
|
||||||
known_array = torch.from_numpy(known_array)
|
known_array = torch.from_numpy(known_array)
|
||||||
input_array = torch.cat((input_array, known_array), dim=0)
|
input_array = torch.cat((input_array, known_array), dim=0)
|
||||||
|
|
||||||
return input_array, target_image
|
return input_array, target_image
|
||||||
@@ -24,22 +24,22 @@ if __name__ == '__main__':
|
|||||||
config_dict['results_path'] = os.path.join(project_root, "results")
|
config_dict['results_path'] = os.path.join(project_root, "results")
|
||||||
config_dict['data_path'] = os.path.join(project_root, "data", "dataset")
|
config_dict['data_path'] = os.path.join(project_root, "data", "dataset")
|
||||||
config_dict['device'] = None
|
config_dict['device'] = None
|
||||||
config_dict['learningrate'] = 3e-4 # More stable learning rate
|
config_dict['learningrate'] = 2e-4 # Slightly lower for stable training
|
||||||
config_dict['weight_decay'] = 1e-4 # Proper regularization
|
config_dict['weight_decay'] = 5e-5 # Reduced weight decay
|
||||||
config_dict['n_updates'] = 40000 # Extended training
|
config_dict['n_updates'] = 8000 # More updates for better convergence
|
||||||
config_dict['batchsize'] = 96 # Maximize batch size for better gradients
|
config_dict['batchsize'] = 8 # Smaller batch for better gradient estimates
|
||||||
config_dict['early_stopping_patience'] = 20 # More patience for convergence
|
config_dict['early_stopping_patience'] = 15 # More patience for complex model
|
||||||
config_dict['use_wandb'] = False
|
config_dict['use_wandb'] = False
|
||||||
|
|
||||||
config_dict['print_train_stats_at'] = 50
|
config_dict['print_train_stats_at'] = 10
|
||||||
config_dict['print_stats_at'] = 200
|
config_dict['print_stats_at'] = 100
|
||||||
config_dict['plot_at'] = 500
|
config_dict['plot_at'] = 300
|
||||||
config_dict['validate_at'] = 500 # Regular validation
|
config_dict['validate_at'] = 300 # Validate more frequently
|
||||||
|
|
||||||
network_config = {
|
network_config = {
|
||||||
'n_in_channels': 4,
|
'n_in_channels': 4,
|
||||||
'base_channels': 64,
|
'base_channels': 56, # Increased capacity for better feature learning
|
||||||
'dropout': 0.1 # Proper dropout for regularization
|
'dropout': 0.08 # Slightly less dropout with augmentation
|
||||||
}
|
}
|
||||||
|
|
||||||
config_dict['network_config'] = network_config
|
config_dict['network_config'] = network_config
|
||||||
|
|||||||
@@ -16,30 +16,91 @@ import os
|
|||||||
|
|
||||||
from torch.utils.data import DataLoader
|
from torch.utils.data import DataLoader
|
||||||
from torch.utils.data import Subset
|
from torch.utils.data import Subset
|
||||||
|
from torch.optim.lr_scheduler import CosineAnnealingWarmRestarts
|
||||||
|
|
||||||
import wandb
|
import wandb
|
||||||
|
|
||||||
|
|
||||||
class EnhancedRMSELoss(nn.Module):
|
def gaussian_kernel(window_size=11, sigma=1.5):
|
||||||
"""Enhanced RMSE loss with edge weighting for sharper predictions"""
|
"""Create a Gaussian kernel for SSIM computation"""
|
||||||
def __init__(self):
|
x = torch.arange(window_size).float() - window_size // 2
|
||||||
|
gauss = torch.exp(-x.pow(2) / (2 * sigma ** 2))
|
||||||
|
kernel = gauss / gauss.sum()
|
||||||
|
kernel_2d = kernel.unsqueeze(1) * kernel.unsqueeze(0)
|
||||||
|
return kernel_2d.unsqueeze(0).unsqueeze(0)
|
||||||
|
|
||||||
|
|
||||||
|
class SSIMLoss(nn.Module):
|
||||||
|
"""Structural Similarity Index Loss for perceptual quality"""
|
||||||
|
def __init__(self, window_size=11, sigma=1.5):
|
||||||
super().__init__()
|
super().__init__()
|
||||||
|
self.window_size = window_size
|
||||||
|
kernel = gaussian_kernel(window_size, sigma)
|
||||||
|
self.register_buffer('kernel', kernel)
|
||||||
|
self.C1 = 0.01 ** 2
|
||||||
|
self.C2 = 0.03 ** 2
|
||||||
|
|
||||||
def forward(self, pred, target):
|
def forward(self, pred, target):
|
||||||
# Compute per-pixel squared error
|
# Apply to each channel
|
||||||
se = (pred - target) ** 2
|
channels = pred.shape[1]
|
||||||
|
kernel = self.kernel.repeat(channels, 1, 1, 1)
|
||||||
|
|
||||||
# Weight edges more heavily for sharper results
|
mu_pred = F.conv2d(pred, kernel, padding=self.window_size // 2, groups=channels)
|
||||||
edge_weight = 1.0 + 0.3 * torch.abs(target[:, :, 1:, :] - target[:, :, :-1, :]).mean(dim=1, keepdim=True)
|
mu_target = F.conv2d(target, kernel, padding=self.window_size // 2, groups=channels)
|
||||||
edge_weight = F.pad(edge_weight, (0, 0, 0, 1), value=1.0)
|
|
||||||
|
|
||||||
# Apply weighting
|
mu_pred_sq = mu_pred.pow(2)
|
||||||
weighted_se = se * edge_weight
|
mu_target_sq = mu_target.pow(2)
|
||||||
|
mu_pred_target = mu_pred * mu_target
|
||||||
|
|
||||||
# Compute RMSE
|
sigma_pred_sq = F.conv2d(pred * pred, kernel, padding=self.window_size // 2, groups=channels) - mu_pred_sq
|
||||||
mse = weighted_se.mean()
|
sigma_target_sq = F.conv2d(target * target, kernel, padding=self.window_size // 2, groups=channels) - mu_target_sq
|
||||||
rmse = torch.sqrt(mse + 1e-8)
|
sigma_pred_target = F.conv2d(pred * target, kernel, padding=self.window_size // 2, groups=channels) - mu_pred_target
|
||||||
return rmse
|
|
||||||
|
ssim = ((2 * mu_pred_target + self.C1) * (2 * sigma_pred_target + self.C2)) / \
|
||||||
|
((mu_pred_sq + mu_target_sq + self.C1) * (sigma_pred_sq + sigma_target_sq + self.C2))
|
||||||
|
|
||||||
|
return 1 - ssim.mean()
|
||||||
|
|
||||||
|
|
||||||
|
class CombinedLoss(nn.Module):
|
||||||
|
"""Combined loss: MSE + L1 + SSIM + Edge for comprehensive image reconstruction"""
|
||||||
|
def __init__(self, mse_weight=1.0, l1_weight=0.5, edge_weight=0.15, ssim_weight=0.3):
|
||||||
|
super().__init__()
|
||||||
|
self.mse_weight = mse_weight
|
||||||
|
self.l1_weight = l1_weight
|
||||||
|
self.edge_weight = edge_weight
|
||||||
|
self.ssim_weight = ssim_weight
|
||||||
|
self.mse = nn.MSELoss()
|
||||||
|
self.l1 = nn.L1Loss()
|
||||||
|
self.ssim = SSIMLoss(window_size=7)
|
||||||
|
|
||||||
|
# Sobel filters for edge detection
|
||||||
|
sobel_x = torch.tensor([[-1, 0, 1], [-2, 0, 2], [-1, 0, 1]], dtype=torch.float32).view(1, 1, 3, 3)
|
||||||
|
sobel_y = torch.tensor([[-1, -2, -1], [0, 0, 0], [1, 2, 1]], dtype=torch.float32).view(1, 1, 3, 3)
|
||||||
|
self.register_buffer('sobel_x', sobel_x.repeat(3, 1, 1, 1))
|
||||||
|
self.register_buffer('sobel_y', sobel_y.repeat(3, 1, 1, 1))
|
||||||
|
|
||||||
|
def edge_loss(self, pred, target):
|
||||||
|
"""Compute edge-aware loss using Sobel filters"""
|
||||||
|
pred_edge_x = F.conv2d(pred, self.sobel_x, padding=1, groups=3)
|
||||||
|
pred_edge_y = F.conv2d(pred, self.sobel_y, padding=1, groups=3)
|
||||||
|
target_edge_x = F.conv2d(target, self.sobel_x, padding=1, groups=3)
|
||||||
|
target_edge_y = F.conv2d(target, self.sobel_y, padding=1, groups=3)
|
||||||
|
|
||||||
|
edge_loss = self.l1(pred_edge_x, target_edge_x) + self.l1(pred_edge_y, target_edge_y)
|
||||||
|
return edge_loss
|
||||||
|
|
||||||
|
def forward(self, pred, target):
|
||||||
|
mse_loss = self.mse(pred, target)
|
||||||
|
l1_loss = self.l1(pred, target)
|
||||||
|
edge_loss = self.edge_loss(pred, target)
|
||||||
|
ssim_loss = self.ssim(pred, target)
|
||||||
|
|
||||||
|
total_loss = (self.mse_weight * mse_loss +
|
||||||
|
self.l1_weight * l1_loss +
|
||||||
|
self.edge_weight * edge_loss +
|
||||||
|
self.ssim_weight * ssim_loss)
|
||||||
|
return total_loss
|
||||||
|
|
||||||
|
|
||||||
def train(seed, testset_ratio, validset_ratio, data_path, results_path, early_stopping_patience, device, learningrate,
|
def train(seed, testset_ratio, validset_ratio, data_path, results_path, early_stopping_patience, device, learningrate,
|
||||||
@@ -55,10 +116,6 @@ def train(seed, testset_ratio, validset_ratio, data_path, results_path, early_st
|
|||||||
if isinstance(device, str):
|
if isinstance(device, str):
|
||||||
device = torch.device(device)
|
device = torch.device(device)
|
||||||
|
|
||||||
# Enable mixed precision training for memory efficiency
|
|
||||||
use_amp = torch.cuda.is_available()
|
|
||||||
scaler = torch.amp.GradScaler('cuda') if use_amp else None
|
|
||||||
|
|
||||||
if use_wandb:
|
if use_wandb:
|
||||||
wandb.login()
|
wandb.login()
|
||||||
wandb.init(project="image_inpainting", config={
|
wandb.init(project="image_inpainting", config={
|
||||||
@@ -102,17 +159,15 @@ def train(seed, testset_ratio, validset_ratio, data_path, results_path, early_st
|
|||||||
network.to(device)
|
network.to(device)
|
||||||
network.train()
|
network.train()
|
||||||
|
|
||||||
# defining the loss - Enhanced RMSE for sharper predictions
|
# defining the loss - combined loss for better reconstruction
|
||||||
rmse_loss = EnhancedRMSELoss().to(device)
|
combined_loss = CombinedLoss(mse_weight=1.0, l1_weight=0.5, edge_weight=0.15, ssim_weight=0.3).to(device)
|
||||||
mse_loss = torch.nn.MSELoss() # Keep for evaluation
|
mse_loss = torch.nn.MSELoss() # Keep for evaluation
|
||||||
|
|
||||||
# defining the optimizer with AdamW for better weight decay handling
|
# defining the optimizer with AdamW for better weight decay handling
|
||||||
optimizer = torch.optim.AdamW(network.parameters(), lr=learningrate, weight_decay=weight_decay, betas=(0.9, 0.999), eps=1e-8)
|
optimizer = torch.optim.AdamW(network.parameters(), lr=learningrate, weight_decay=weight_decay)
|
||||||
|
|
||||||
# Cosine annealing with warm restarts for gradual learning rate decay
|
# Learning rate scheduler for better convergence
|
||||||
scheduler = torch.optim.lr_scheduler.CosineAnnealingWarmRestarts(
|
scheduler = CosineAnnealingWarmRestarts(optimizer, T_0=50, T_mult=2, eta_min=1e-6)
|
||||||
optimizer, T_0=n_updates//4, T_mult=1, eta_min=learningrate/100
|
|
||||||
)
|
|
||||||
|
|
||||||
if use_wandb:
|
if use_wandb:
|
||||||
wandb.watch(network, mse_loss, log="all", log_freq=10)
|
wandb.watch(network, mse_loss, log="all", log_freq=10)
|
||||||
@@ -137,31 +192,17 @@ def train(seed, testset_ratio, validset_ratio, data_path, results_path, early_st
|
|||||||
|
|
||||||
optimizer.zero_grad()
|
optimizer.zero_grad()
|
||||||
|
|
||||||
# Mixed precision training for memory efficiency
|
output = network(input)
|
||||||
if use_amp:
|
|
||||||
with torch.amp.autocast('cuda'):
|
|
||||||
output = network(input)
|
|
||||||
loss = rmse_loss(output, target)
|
|
||||||
|
|
||||||
scaler.scale(loss).backward()
|
loss = combined_loss(output, target)
|
||||||
|
|
||||||
# Gradient clipping for training stability
|
loss.backward()
|
||||||
scaler.unscale_(optimizer)
|
|
||||||
torch.nn.utils.clip_grad_norm_(network.parameters(), max_norm=1.0)
|
|
||||||
|
|
||||||
scaler.step(optimizer)
|
# Gradient clipping for training stability
|
||||||
scaler.update()
|
torch.nn.utils.clip_grad_norm_(network.parameters(), max_norm=1.0)
|
||||||
else:
|
|
||||||
output = network(input)
|
|
||||||
loss = rmse_loss(output, target)
|
|
||||||
loss.backward()
|
|
||||||
|
|
||||||
# Gradient clipping for training stability
|
optimizer.step()
|
||||||
torch.nn.utils.clip_grad_norm_(network.parameters(), max_norm=1.0)
|
scheduler.step(i + len(loss_list) / len(dataloader_train))
|
||||||
|
|
||||||
optimizer.step()
|
|
||||||
|
|
||||||
scheduler.step()
|
|
||||||
|
|
||||||
loss_list.append(loss.item())
|
loss_list.append(loss.item())
|
||||||
|
|
||||||
@@ -172,11 +213,7 @@ def train(seed, testset_ratio, validset_ratio, data_path, results_path, early_st
|
|||||||
# plotting
|
# plotting
|
||||||
if (i + 1) % plot_at == 0:
|
if (i + 1) % plot_at == 0:
|
||||||
print(f"Plotting images, current update {i + 1}")
|
print(f"Plotting images, current update {i + 1}")
|
||||||
# Convert to float32 for matplotlib compatibility
|
plot(input.cpu().numpy(), target.detach().cpu().numpy(), output.detach().cpu().numpy(), plotpath, i)
|
||||||
plot(input.float().cpu().numpy(),
|
|
||||||
target.detach().float().cpu().numpy(),
|
|
||||||
output.detach().float().cpu().numpy(),
|
|
||||||
plotpath, i)
|
|
||||||
|
|
||||||
# evaluating model every validate_at sample
|
# evaluating model every validate_at sample
|
||||||
if (i + 1) % validate_at == 0:
|
if (i + 1) % validate_at == 0:
|
||||||
|
|||||||
@@ -81,9 +81,42 @@ def read_compressed_file(file_path: str):
|
|||||||
return input_arrays, known_arrays
|
return input_arrays, known_arrays
|
||||||
|
|
||||||
|
|
||||||
def create_predictions(model_config, state_dict_path, testset_path, device, save_path, plot_path, plot_at=20, rmse_value=None):
|
def apply_tta(model, input_tensor, device):
|
||||||
"""
|
"""
|
||||||
Here, one might needs to adjust the code based on the used preprocessing
|
Apply Test-Time Augmentation for better predictions.
|
||||||
|
Averages predictions from original and augmented versions.
|
||||||
|
"""
|
||||||
|
outputs = []
|
||||||
|
|
||||||
|
# Original
|
||||||
|
out = model(input_tensor)
|
||||||
|
outputs.append(out)
|
||||||
|
|
||||||
|
# Horizontal flip
|
||||||
|
flipped_h = torch.flip(input_tensor, dims=[3])
|
||||||
|
out_h = model(flipped_h)
|
||||||
|
out_h = torch.flip(out_h, dims=[3])
|
||||||
|
outputs.append(out_h)
|
||||||
|
|
||||||
|
# Vertical flip
|
||||||
|
flipped_v = torch.flip(input_tensor, dims=[2])
|
||||||
|
out_v = model(flipped_v)
|
||||||
|
out_v = torch.flip(out_v, dims=[2])
|
||||||
|
outputs.append(out_v)
|
||||||
|
|
||||||
|
# Both flips
|
||||||
|
flipped_hv = torch.flip(input_tensor, dims=[2, 3])
|
||||||
|
out_hv = model(flipped_hv)
|
||||||
|
out_hv = torch.flip(out_hv, dims=[2, 3])
|
||||||
|
outputs.append(out_hv)
|
||||||
|
|
||||||
|
# Average all predictions
|
||||||
|
return torch.stack(outputs, dim=0).mean(dim=0)
|
||||||
|
|
||||||
|
|
||||||
|
def create_predictions(model_config, state_dict_path, testset_path, device, save_path, plot_path, plot_at=20, rmse_value=None, use_tta=True):
|
||||||
|
"""
|
||||||
|
Create predictions with optional Test-Time Augmentation for improved results.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
if device is None:
|
if device is None:
|
||||||
@@ -94,7 +127,7 @@ def create_predictions(model_config, state_dict_path, testset_path, device, save
|
|||||||
device = torch.device(device)
|
device = torch.device(device)
|
||||||
|
|
||||||
model = MyModel(**model_config)
|
model = MyModel(**model_config)
|
||||||
model.load_state_dict(torch.load(state_dict_path))
|
model.load_state_dict(torch.load(state_dict_path, weights_only=True))
|
||||||
model.to(device)
|
model.to(device)
|
||||||
model.eval()
|
model.eval()
|
||||||
|
|
||||||
@@ -111,9 +144,14 @@ def create_predictions(model_config, state_dict_path, testset_path, device, save
|
|||||||
with torch.no_grad():
|
with torch.no_grad():
|
||||||
for i in range(len(input_arrays)):
|
for i in range(len(input_arrays)):
|
||||||
print(f"Processing image {i + 1}/{len(input_arrays)}")
|
print(f"Processing image {i + 1}/{len(input_arrays)}")
|
||||||
input_array = torch.from_numpy(input_arrays[i]).to(
|
input_array = torch.from_numpy(input_arrays[i]).to(device)
|
||||||
device)
|
input_tensor = input_array.unsqueeze(0) if input_array.dim() == 3 else input_array
|
||||||
output = model(input_array.unsqueeze(0) if hasattr(input_array, 'dim') and input_array.dim() == 3 else input_array)
|
|
||||||
|
if use_tta:
|
||||||
|
output = apply_tta(model, input_tensor, device)
|
||||||
|
else:
|
||||||
|
output = model(input_tensor)
|
||||||
|
|
||||||
output = output.cpu().numpy()
|
output = output.cpu().numpy()
|
||||||
predictions.append(output)
|
predictions.append(output)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user