Hike News
Hike News

深度學習-tensorflow基礎-讀取數據-API應用-csv檔讀取

構造文件名隊列(string_input_producer)

不管任何形式的文件(csv,圖片,二進制)都會使用此API來構造文件名的隊列

  • tf.train.string_input_producer(string_tensor,shuffle=True)
    • string_tensor:含有文件名的1階張量
      • 列表裡面含有各個文件的路徑+文件名
      • 可直接傳入列表,會將其轉換成一階張量
    • num_epochs
      • 當讀入的量比較少時,且批處理隊列所需要的數據量比較大時,會重複的使用已經讀取過的文件
      • 預設是過無限次的數據,直到滿足批處理隊列的大小為止,或者可限定讀取次數
    • shuffle:bool值,是否亂序讀取,預設為True
    • 返回一個具有路徑+檔名的字符串隊列
  • 將輸出字符串(例如:文件名)輸入到管道隊列

構造文件閱讀器

用於讀取隊列內容不同文件格式所使用的閱讀器亦不同

文本文件

  • 通常是以逗號作為分隔值的csv或txt格式
  • 預設是按行讀取,且一次只讀取一行
  • tf.TextLineReader()
    • 返回一個讀取器的實例

讀取TfRecords文件

  • tf.TFRecordReader()
    • 返回一個讀取器實例

method

以上三種讀取器都有一個共同的方法:

  • read(file_queue):從隊列中讀取指定數量內容
    • 返回一個tensor元組
      • key為文件名;value為一個樣本內容(行,字節)

文件內容解碼器

由於從文件中讀取的是字符串或bits,需要函數去解析這些內容到張量

CSV解碼

  • tf.decode_csv(records,record_defaults=None,field_delim=None,name=None)
    • records:tensor型字符串,每個字符串是csv中讀取器讀取出來的記錄行
      • 也就是閱讀器read方法返回的value值
    • field_delim:分割符號,預設的分割符號為”,”
    • record_defaults:決定所得張量類型(指定每樣本的每一column的類型),並設置一個缺失值的默認值
      • 可按照不同數據類型進行解碼
        • [[1],["None"],[0.0]]:有三個colmn分別為int類型(缺失值預設為1),string型(缺失值預設為”None”),float型(缺失值預設為0.0)
      • 必須為一個二維的列表
    • 返回每個樣本每一列之值
      • 一列有多少值就用多少變量去接收
  • 將csv轉換為張量,與tf.TextLineReader搭配使用

開啟子線程操作

開啟線程進行讀取操作(於會話當中運行),為固定寫法

  • tf.train.strat_queue_runners(sess=None,coord=None)
    • 收集所有圖中的隊列線程,並啟動線程
    • coord:其開啟線程後仍需要有回收的機制,因此須定義線程協調器

範例(讀取csv文件)

欲讀取A.csv,B.csv,C.csv中的數據其數據內容如下
A.csv:

1
2
3
Alpha1,A1
Alpha2,A2
Alpha3,A3

B.csv:

1
2
3
Beta1,B1
Beta2,B2
Beta3,B3

C.csv:

1
2
3
Gama1,C1
Gama2,C2
Gama3,C3

代碼

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import tensorflow as tf
import os
# 定一個csv讀取函數其接收存放數據的路徑+文件列表
def csvread(FileList):
# 1. 構造文件隊列
file_queue = tf.train.string_input_producer(FileList)

# 2. 構造csv閱讀器讀取隊列文件中的數據(按行讀取)
Reader = tf.TextLineReader()
key, val = Reader.read(file_queue) # 調用閱讀器的read方法,返回key與value

# 3. 對每一行內容解碼
records = [["None"],["None"]] # 根據column數決定二維陣列中有幾個列表,並填入默認值
Feature,Label = tf.decode_csv(val,record_defaults=records) #共兩列數據因此用兩個變量接收

return Feature,Label # 將每一列得到的值返回


if __name__ == '__main__':

# 找到欲訓練的文件,將所有文件的 路徑+文件名 放入列表
FileName = os.listdir("./Training_Data")
filelist = [os.path.join("./Training_Data/",name) for name in FileName] # 拼接路徑

Feature,Label = csvread(filelist)

with tf.Session() as sess:
# 定義一個線程協調器
coord = tf.train.Coordinator()

# 開啟讀取文件的線程
threads = tf.train.start_queue_runners(sess,coord=coord)

# 讀取內容(訓練操作)
print(sess.run([Feature,Label]))

# 回收子線程
coord.request_stop()
coord.join(threads)

Result

1
[b'Alpha1', b'A1']
  • 讀取一行的數據,如想要讀取多個數據須進行批處理操作

管道讀端批處理

由於預設只會讀取一個樣本,如要進行批處理(進行多個數據的讀取),使用以下兩個API,便於之後指定每批次多個樣本的訓練

  • tf.train.batch(tensors,batch_size,num_threads=1,capacity=32,name=None)
    • 其可讀取指定大小(個數)的張量
    • tensors:可以是包含張量的列表(一行有多少數據),欲批處理的內容都要放到列表當中
    • batch_size:從隊列中讀取的批處理大小
      • 每一次讀取多少條數據
      • 批處理大小,跟隊列,數據的數量多少沒有影響,只決定當前此批次取多少數據
      • 取的數據有無重複,對訓練並無影響
    • num_threads:進入隊列的線程數
    • capacity:整數,隊列中元素的最大數量
    • 返回批處理完成多個value值的tensor
  • tf.train.shuffle_batch(tensors,batch_size,num_threads=1,capacity,min_after_dequeue)
    • 亂序讀取指定大小(個數)的張量
    • min_after_dequeue:留下隊列裡的張量個數,能夠保持隨機打亂的狀態

改善代碼

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import tensorflow as tf
import os

def csvread(FileList):

file_queue = tf.train.string_input_producer(FileList)

Reader = tf.TextLineReader()
key, val = Reader.read(file_queue) # 調用閱讀器的read方法,返回key與value

records = [["None"],["None"]] # 根據column數決定二維陣列中有幾個列表,並填入默認值
Feature,Label = tf.decode_csv(val,record_defaults=records) #共兩列數據因此用兩個變量接收

# 4. 欲讀取多個數據,批處理
Feature_batch, Label_batch = tf.train.batch([Feature,Label],batch_size=20,num_threads=1,capacity=9)

return Feature_batch,Label_batch # 將每一列得到的值返回


if __name__ == '__main__':

FileName = os.listdir("./Training_Data")
filelist = [os.path.join("./Training_Data/",name) for name in FileName] # 拼接路徑

Feature_batch,Label_batch = csvread(filelist)

with tf.Session() as sess:

coord = tf.train.Coordinator()

threads = tf.train.start_queue_runners(sess,coord=coord)

# 讀取內容(訓練操作),要取幾批次的數據進行訓練自行使用for循環訓練
print(sess.run([Feature_batch,Label_batch]))

coord.request_stop()
coord.join(threads)

Result

1
2
3
4
5
6
7
[array([b'Beta1', b'Beta2', b'Beta3', b'Gama1', b'Gama2', b'Gama3',
b'Alpha1', b'Alpha2', b'Alpha3', b'Gama1', b'Gama2', b'Gama3',
b'Alpha1', b'Alpha2', b'Alpha3', b'Beta1', b'Beta2', b'Beta3',
b'Gama1', b'Gama2'], dtype=object),
array([b'B1', b'B2', b'B3', b'C1', b'C2', b'C3', b'A1', b'A2', b'A3',
b'C1', b'C2', b'C3', b'A1', b'A2', b'A3', b'B1', b'B2', b'B3',
b'C1', b'C2'], dtype=object)]
  • 即使文本中只有9條數據,但此批次取20個樣本,會有重複的樣本被取出,但對訓練並無影響
  • 但一般batch_sizecapacity都設為相同之值