StegaStamp代码分析

说明

课题的思路可能还是得从源码入手,我认为当下的思路应该是:复现->改进->创新。

这里分析论文:StegaStamp: Invisible Hyperlinks in Physical Photographs的源码,我也会在分析过程中寻找可以改进的点。

数据集预处理

代码使用的是MIRFLICKR数据集(重采样到400x400分辨率),水印信息是一个二进制串。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#TRAIN_PATH是数据集路径,先读取数据集。
files_list = glob.glob(join(TRAIN_PATH, "**/*"))

images, secrets = get_img_batch(files_list=files_list,
secret_size=args.secret_size,
batch_size=args.batch_size,
size=(height, width))
#get_img_batch(...)
def get_img_batch(files_list, #数据集的图像表
secret_size, #水印长度
batch_size=4, #训练数据集可以分为一个或多个Batch,一次batch获取的图像个数
size=(400, 400)): #重采样大小
batch_cover = []
batch_secret = []

for i in range(batch_size):
img_cover_path = random.choice(files_list) #随机取一个图像
try:
img_cover = Image.open(img_cover_path).convert("RGB") #获取RGB的图像矩阵
img_cover = ImageOps.fit(img_cover, size) #ImageOps.fit()方法返回一个指定大小的裁剪过的图像(重采样为400x400)
img_cover = np.array(img_cover, dtype=np.float32) / 255. #归一化
except:
img_cover = np.zeros((size[0], size[1], 3), dtype=np.float32)
batch_cover.append(img_cover) #将图像矩阵加入到batch_cover[]中

secret = np.random.binomial(1, .5, secret_size) #生成随机的二进制串,01出现概率是1:1
batch_secret.append(secret) #将二进制串加入到batch_secret[]

batch_cover, batch_secret = np.array(batch_cover), np.array(batch_secret)
return batch_cover, batch_secret #返回batch_cover[]和batch_secret

占位符的设置

占位变量是一种TensorFlow用来解决读取大量训练数据问题的机制,它允许你现在不用给它赋值,随着训练的开始,再把训练数据传送给训练网络学习

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
	secret_pl = tf.placeholder(shape=[None, args.secret_size], dtype=tf.float32, name="input_prep")
#传入的秘密信息(一串随机生成的二进制串)
image_pl = tf.placeholder(shape=[None, height, width, 3], dtype=tf.float32, name="input_hide")
#载体图片(400*400)
M_pl = tf.placeholder(shape=[None, 2, 8], dtype=tf.float32, name="input_transform")
#用于投影变换的矩阵,投影变换矩阵的生成也是随机的
global_step_tensor = tf.Variable(0, trainable=False, name='global_step')
#全局步数,初始化为0,记录训练达到第几轮,什么阶段执行什么样的操作
loss_scales_pl = tf.placeholder(shape=[4], dtype=tf.float32, name="input_loss_scales")
#损失函数权重,见代码:loss_op = loss_scales[0]*image_loss_op + loss_scales[1]*lpips_loss_op + loss_scales[2]*secret
l2_edge_gain_pl = tf.placeholder(shape=[1], dtype=tf.float32, name="input_edge_gain")
#用于falloff_im *= l2_edge_gain_pl
#falloff_im是一个权重矩阵,其元素大小依靠元素位置计算,越靠近图像外侧越接近1,越靠近中心约接近0
#乘l2_edge_gain_pl将0-1的权重差异扩大
yuv_scales_pl = tf.placeholder(shape=[3], dtype=tf.float32, name="input_yuv_scales")
#image_loss_op = tf.tensordot(yuv_loss_op, yuv_scales, axes=1),yuv各个通道在视觉损失中的占比不同
log_decode_mod_pl = tf.placeholder(shape=[], dtype=tf.float32, name="input_log_decode_mod")
#打开之前训练一半的模型

建立模型

1
2
3
4
5
6
 encoder = models.StegaStampEncoder(height=height, width=width)
#编码器模型
decoder = models.StegaStampDecoder(secret_size=args.secret_size, height=height, width=width)
#解码器模型
discriminator = models.Discriminator()
#一个评论网络,可以预测信息是否被编码到图像中,并作为编解码器模型的感知损失,是全部损失函数的一部分
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
 loss_op, secret_loss_op, D_loss_op, summary_op, image_summary_op, _ = models.build_model(
encoder=encoder,
decoder=decoder,
discriminator=discriminator,
secret_input=secret_pl,
image_input=image_pl,
l2_edge_gain=l2_edge_gain_pl,
borders=args.borders,
secret_size=args.secret_size,
M=M_pl,
loss_scales=loss_scales_pl,
yuv_scales=yuv_scales_pl,
args=args,
global_step=global_step_tensor)

#建立模型在models.py中从184行开始
def build_model(
...
):
input_warped = tf.contrib.image.transform(image_input, M[:,1,:], interpolation='BILINEAR')
#根据M[:,1,:]矩阵进行投影变形
#train.py:178 M = utils.get_rand_transform_matrix(width, np.floor(width * rnd_tran), args.batch_size)
#获取一个随机的形变矩阵(具体算法可以看utils.py)
mask_warped = tf.contrib.image.transform(tf.ones_like(input_warped), M[:,1,:], interpolation='BILINEAR')
input_warped += (1-mask_warped) * image_input
#得到一个变形后的输入图像
residual_warped = encoder((secret_input, input_warped))
#将变形后的输入图像和secret_input一起输入到编码器模型中,进行编码,得到一个变形后的residual图像,里面蕴含着水印信息
encoded_warped = residual_warped + input_warped
#将含有水印信息的residual图像加回到变形后的输入图像,得到一个变形的含水印图像
residual = tf.contrib.image.transform(residual_warped, M[:,0,:], interpolation='BILINEAR')
#再通过形变矩阵的逆,将变形的residual变回去

#下面是在各种边框下,对编码图像的处理,并进行透视畸变的逆变换
if borders == 'no_edge': #无边框
encoded_image = image_input + residual
elif borders == 'black': #黑色边框
encoded_image = residual_warped + input_warped
encoded_image = tf.contrib.image.transform(encoded_image, M[:,0,:], interpolation='BILINEAR')
input_unwarped = tf.contrib.image.transform(input_warped, M[:,0,:], interpolation='BILINEAR')
elif borders.startswith('random'): #随机RGB的一种作为背景
mask = tf.contrib.image.transform(tf.ones_like(residual), M[:,0,:], interpolation='BILINEAR')
encoded_image = residual_warped + input_warped
encoded_image = tf.contrib.image.transform(encoded_image, M[:,0,:], interpolation='BILINEAR')
input_unwarped = tf.contrib.image.transform(input_warped, M[:,0,:], interpolation='BILINEAR')
ch = 3 if borders.endswith('rgb') else 1
encoded_image += (1-mask) * tf.ones_like(residual) * tf.random.uniform([ch])
elif borders == 'white': #白色边框
mask = tf.contrib.image.transform(tf.ones_like(residual), M[:,0,:], interpolation='BILINEAR')
encoded_image = residual_warped + input_warped
encoded_image = tf.contrib.image.transform(encoded_image, M[:,0,:], interpolation='BILINEAR')
input_unwarped = tf.contrib.image.transform(input_warped, M[:,0,:], interpolation='BILINEAR')
encoded_image += (1-mask) * tf.ones_like(residual)
elif borders == 'image': #图片边框-以原图作为畸变后的背景
mask = tf.contrib.image.transform(tf.ones_like(residual), M[:,0,:], interpolation='BILINEAR')
encoded_image = residual_warped + input_warped
encoded_image = tf.contrib.image.transform(encoded_image, M[:,0,:], interpolation='BILINEAR')
encoded_image += (1-mask) * tf.manip.roll(image_input, shift=1, axis=0)


transformed_image, transform_summaries = transform_net(encoded_image, args, global_step)
#让编码图像进入变换网络(透视变形,运动/散焦模糊,颜色处理(打印机和显示器的色域有限),噪声),得到一个噪声图像
decoded_secret = decoder(transformed_image)
#让噪声图像进入编码器,得到解码信息

...
#后面是计算各种loss函数,不细说,两行比较重要的代码
#1、计算信息恢复的准确率,函数具体在utils.py
bit_acc, str_acc = get_secret_acc(secret_input, decoded_secret)
#2、计算总loss函数
loss_op = loss_scales[0]*image_loss_op + loss_scales[1]*lpips_loss_op + loss_scales[2]*secret_loss_op
...

return loss_op, secret_loss_op, D_loss, summary_op, image_summary_op, bit_acc

编码器模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
class StegaStampEncoder(Layer):
def __init__(self, height, width):
#初始化的时候传入图片长和宽
super(StegaStampEncoder, self).__init__()
self.secret_dense = Dense(7500, activation='relu', kernel_initializer='he_normal')
#将输入的100bit图像扩展成7500bit大小
self.conv1 = Conv2D(32, 3, activation='relu', padding='same', kernel_initializer='he_normal')
self.conv2 = Conv2D(32, 3, activation='relu', strides=2, padding='same', kernel_initializer='he_normal')
self.conv3 = Conv2D(64, 3, activation='relu', strides=2, padding='same', kernel_initializer='he_normal')
self.conv4 = Conv2D(128, 3, activation='relu', strides=2, padding='same', kernel_initializer='he_normal')
self.conv5 = Conv2D(256, 3, activation='relu', strides=2, padding='same', kernel_initializer='he_normal')
self.up6 = Conv2D(128, 2, activation='relu', padding='same', kernel_initializer='he_normal')
self.conv6 = Conv2D(128, 3, activation='relu', padding='same', kernel_initializer='he_normal')
self.up7 = Conv2D(64, 2, activation='relu', padding='same', kernel_initializer='he_normal')
self.conv7 = Conv2D(64, 3, activation='relu', padding='same', kernel_initializer='he_normal')
self.up8 = Conv2D(32, 2, activation='relu', padding='same', kernel_initializer='he_normal')
self.conv8 = Conv2D(32, 3, activation='relu', padding='same', kernel_initializer='he_normal')
self.up9 = Conv2D(32, 2, activation='relu', padding='same', kernel_initializer='he_normal')
self.conv9 = Conv2D(32, 3, activation='relu', padding='same', kernel_initializer='he_normal')
self.conv10 = Conv2D(32, 3, activation='relu', padding='same', kernel_initializer='he_normal')
self.residual = Conv2D(3, 1, activation=None, padding='same', kernel_initializer='he_normal')

def call(self, inputs):
secret, image = inputs
secret = secret - .5
image = image - .5
#对输入的水印信息和图像矩阵进行处理
secret = self.secret_dense(secret)
#见上方的 self.secret_dense ,将输入的100bit图像扩展成7500bit大小
secret = Reshape((50, 50, 3))(secret)
#将一维水印信息变为3维,7500 = 50 * 50 * 3
secret_enlarged = UpSampling2D(size=(8,8))(secret)
#(50,50,3)进行上采样变为(400,400,3)
inputs = concatenate([secret_enlarged, image], axis=-1)
#将secret和image拼接到一起,concatenate()函数没太明白拼接过程,axis = -1指在最后一个通道处拼接
#(B,H,W,C)在C通道拼接,这里secret(B,400,400,3)和image(B,400,400,3)变为(B,400,400,6)
conv1 = self.conv1(inputs)
conv2 = self.conv2(conv1)
conv3 = self.conv3(conv2)
conv4 = self.conv4(conv3)
conv5 = self.conv5(conv4)
up6 = self.up6(UpSampling2D(size=(2,2))(conv5))
merge6 = concatenate([conv4,up6], axis=3)
conv6 = self.conv6(merge6)
up7 = self.up7(UpSampling2D(size=(2,2))(conv6))
merge7 = concatenate([conv3,up7], axis=3)
conv7 = self.conv7(merge7)
up8 = self.up8(UpSampling2D(size=(2,2))(conv7))
merge8 = concatenate([conv2,up8], axis=3)
conv8 = self.conv8(merge8)
up9 = self.up9(UpSampling2D(size=(2,2))(conv8))
merge9 = concatenate([conv1,up9,inputs], axis=3)
conv9 = self.conv9(merge9)
conva = self.conv9(merge9)
conv10 = self.conv10(conv9)
residual = self.residual(conv9)
return residual

解码器模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
class StegaStampDecoder(Layer):
def __init__(self, secret_size, height, width):
#初始化传入图片长和宽,还有水印信息长度
super(StegaStampDecoder, self).__init__()
self.height = height
self.width = width
#空间变换网络。
self.stn_params = Sequential([
Conv2D(32, (3, 3), strides=2, activation='relu', padding='same'),
Conv2D(64, (3, 3), strides=2, activation='relu', padding='same'),
Conv2D(128, (3, 3), strides=2, activation='relu', padding='same'),
Flatten(),
Dense(128, activation='relu')
])
#初始化权重和偏执
initial = np.array([[1., 0, 0], [0, 1., 0]])
initial = initial.astype('float32').flatten()

self.W_fc1 = tf.Variable(tf.zeros([128, 6]), name='W_fc1')
self.b_fc1 = tf.Variable(initial_value=initial, name='b_fc1')

self.decoder = Sequential([
Conv2D(32, (3, 3), strides=2, activation='relu', padding='same'),
Conv2D(32, (3, 3), activation='relu', padding='same'),
Conv2D(64, (3, 3), strides=2, activation='relu', padding='same'),
Conv2D(64, (3, 3), activation='relu', padding='same'),
Conv2D(64, (3, 3), strides=2, activation='relu', padding='same'),
Conv2D(128, (3, 3), strides=2, activation='relu', padding='same'),
Conv2D(128, (3, 3), strides=2, activation='relu', padding='same'),
Flatten(),
Dense(512, activation='relu'),
Dense(secret_size)
])

def call(self, image):
image = image - .5
#对输入的解码图像进行预处理,主要是空间变换
stn_params = self.stn_params(image)
x = tf.matmul(stn_params, self.W_fc1) + self.b_fc1
transformed_image = stn_transformer(image, x, [self.height, self.width, 3])
#纠正透视形变后,输入给解码器,返回恢复的水印信息
return self.decoder(transformed_image)

模拟真实环境变换网络

这一部分看的太困难了,大致就是对编码图像进行各种处理(透视变形,运动/散焦模糊,颜色处理(打印机和显示器的色域有限),噪声)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
def transform_net(encoded_image, args, global_step):	#传入的参数args中有对该网络的设置,设置了噪声强度
sh = tf.shape(encoded_image)

ramp_fn = lambda ramp : tf.minimum(tf.to_float(global_step) / ramp, 1.)
#随机产生亮度变化、色调变化
rnd_bri = ramp_fn(args.rnd_bri_ramp) * args.rnd_bri
rnd_hue = ramp_fn(args.rnd_hue_ramp) * args.rnd_hue
rnd_brightness = utils.get_rnd_brightness_tf(rnd_bri, rnd_hue, args.batch_size)
#jpeg压缩
jpeg_quality = 100. - tf.random.uniform([]) * ramp_fn(args.jpeg_quality_ramp) * (100.-args.jpeg_quality)
jpeg_factor = tf.cond(tf.less(jpeg_quality, 50), lambda: 5000. / jpeg_quality, lambda: 200. - jpeg_quality * 2) / 100. + .0001
#产生随机噪声
rnd_noise = tf.random.uniform([]) * ramp_fn(args.rnd_noise_ramp) * args.rnd_noise
#对比度变化
contrast_low = 1. - (1. - args.contrast_low) * ramp_fn(args.contrast_ramp)
contrast_high = 1. + (args.contrast_high - 1.) * ramp_fn(args.contrast_ramp)
contrast_params = [contrast_low, contrast_high]
#饱和度变化
rnd_sat = tf.random.uniform([]) * ramp_fn(args.rnd_sat_ramp) * args.rnd_sat

#运动/散焦模糊
f = utils.random_blur_kernel(probs=[.25,.25], N_blur=7,
sigrange_gauss=[1.,3.], sigrange_line=[.25,1.], wmin_line=3)
encoded_image = tf.nn.conv2d(encoded_image, f, [1,1,1,1], padding='SAME')
#应用上述生成的噪声
noise = tf.random_normal(shape=tf.shape(encoded_image), mean=0.0, stddev=rnd_noise, dtype=tf.float32)
encoded_image = encoded_image + noise
encoded_image = tf.clip_by_value(encoded_image, 0, 1) #色彩通道变为0~1
#对比度变化程度
contrast_scale = tf.random_uniform(shape=[tf.shape(encoded_image)[0]], minval=contrast_params[0], maxval=contrast_params[1])
contrast_scale = tf.reshape(contrast_scale, shape=[tf.shape(encoded_image)[0],1,1,1])
#应用对比度变化
encoded_image = encoded_image * contrast_scale
#应用生成的随机亮度/色调变化
encoded_image = encoded_image + rnd_brightness
encoded_image = tf.clip_by_value(encoded_image, 0, 1)

#饱和度变化
encoded_image_lum = tf.expand_dims(tf.reduce_sum(encoded_image * tf.constant([.3,.6,.1]), axis=3), 3)
encoded_image = (1 - rnd_sat) * encoded_image + rnd_sat * encoded_image_lum

encoded_image = tf.reshape(encoded_image, [-1,400,400,3])
if not args.no_jpeg: #jpeg压缩变化
encoded_image = utils.jpeg_compress_decompress(encoded_image, rounding=utils.round_only_at_0, factor=jpeg_factor, downsample_c=True)

summaries = [tf.summary.scalar('transformer/rnd_bri', rnd_bri),
tf.summary.scalar('transformer/rnd_sat', rnd_sat),
tf.summary.scalar('transformer/rnd_hue', rnd_hue),
tf.summary.scalar('transformer/rnd_noise', rnd_noise),
tf.summary.scalar('transformer/contrast_low', contrast_low),
tf.summary.scalar('transformer/contrast_high', contrast_high),
tf.summary.scalar('transformer/jpeg_quality', jpeg_quality)]
#返回一个经过模拟现实网络后的编码图像
return encoded_image, summaries
  • Copyright: Copyright is owned by the author. For commercial reprints, please contact the author for authorization. For non-commercial reprints, please indicate the source.

扫一扫,分享到微信

微信分享二维码
  • Copyrights © 2020-2023 YYz
  • Visitors: | Views:

请我喝杯咖啡吧~

支付宝
微信