Using Function Compute to stream-gzip a single huge file on ECS

Background

In some business scenarios an ECS instance keeps producing huge log files or other large files. These files need to be moved off the machine promptly and stored in OSS as gz archives, but the file to be compressed can be larger than 3 GB and exceed the maximum memory of the Function Compute execution environment. This article presents a streaming solution to the problem, with the following prerequisites:

  • Function Compute is configured with a VPC so that it can reach the ECS instance over the internal network
  • OSS and Function Compute are in the same region, so data is transferred over the internal network
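
The core trick is that a .gz file may consist of several gzip members concatenated back to back, so a huge file can be compressed chunk by chunk with gzip.compress while only one chunk is held in memory, and the compressed members can simply be appended to the same OSS object. A minimal local sketch of this property (the sample byte strings are placeholders):

import gzip

chunk1 = b"first part of a huge log\n" * 1000
chunk2 = b"second part of a huge log\n" * 1000

# Compress each chunk independently and concatenate the resulting gzip members.
combined = gzip.compress(chunk1) + gzip.compress(chunk2)

# A multi-member gzip stream decompresses back to the original bytes.
assert gzip.decompress(combined) == chunk1 + chunk2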

Sample code

The solution depends on the third-party library paramiko. Out of the box, paramiko caps the transfer rate for large files, so the code needs the adjustment shown below: pass sufficiently large window_size and max_packet_size values when constructing the paramiko.SFTPClient.


import paramiko
import gzip
import oss2
import logging
import os
import time

logging.getLogger("oss2.api").setLevel(logging.ERROR)
logging.getLogger("oss2.auth").setLevel(logging.ERROR)

# config
BUCKET_NAME = "oss-demo"
ECS_INNER_IP = "192.168.22.3"
SSH_PORT = 22
USR_NAME = 'xiaoming'
USR_PWD = '123456'

def handler(event, context):
    start = time.time()
    region = context.region
    creds = context.credentials
    auth = oss2.StsAuth(creds.accessKeyId, creds.accessKeySecret, creds.securityToken)
    bucket = oss2.Bucket(auth, 'oss-' + region + '-internal.aliyuncs.com', BUCKET_NAME)

    scp = paramiko.Transport((ECS_INNER_IP, SSH_PORT))
    scp.connect(username=USR_NAME, password=USR_PWD)
    # Enlarged SFTP window/packet sizes work around paramiko's default throughput limit.
    window_size = 2 ** 28
    max_packet_size = 2 ** 26
    sftp = paramiko.SFTPClient.from_transport(scp, window_size=window_size, max_packet_size=max_packet_size)
    
    pos = 0
    CHUNK_SIZE = 1024*1024   # read 1 MB from the remote file per iteration
    APPEND_SIZE = 128        # append to OSS once 128 compressed chunks are buffered
     
    LOG_FILE = "test-1.log"
    DST_OBJ = 'dst/' + LOG_FILE + '.gz'
    
    data_out = []
    with sftp.open("/root/" + LOG_FILE, "r") as f:
        while True:
            data = f.read(CHUNK_SIZE)
            if not data:
                # Flush whatever is still buffered before exiting.
                if data_out:
                    bucket.append_object(DST_OBJ, pos, b"".join(data_out))
                break
            # Each chunk becomes an independent gzip member; concatenated members
            # still form a valid .gz stream.
            data_out.append(gzip.compress(data))
            if len(data_out) == APPEND_SIZE:
                upload_data = b"".join(data_out)
                # append_object requires pos to equal the current object length.
                bucket.append_object(DST_OBJ, pos, upload_data)
                pos += len(upload_data)
                data_out = []
    
    print("total time = ", time.time() - start)
    return "OK"

Although this approach works around the Function Compute memory limit, for some very large files, say 15 GB and above, the 10-minute execution time limit becomes the next bottleneck.

Improvement

OSS supports multipart upload. Combining multipart upload with FC's elastic scale-out, the following solution becomes possible:
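
As a quick reminder of the oss2 multipart API that both functions below rely on: init_multipart_upload returns an upload_id, each upload_part returns an etag for its part_number, and complete_multipart_upload stitches the parts together in part_number order. A minimal sketch, assuming placeholder credentials, endpoint and data:

import oss2

# Placeholder credentials / endpoint; the functions below build the bucket
# from the Function Compute STS credentials instead.
auth = oss2.Auth("<access-key-id>", "<access-key-secret>")
bucket = oss2.Bucket(auth, "oss-cn-hangzhou-internal.aliyuncs.com", "oss-demo")

key = "dst/example.gz"
upload_id = bucket.init_multipart_upload(key).upload_id

parts = []
# Every part except the last must be at least 100 KB.
for part_number, data in enumerate([b"a" * 200 * 1024, b"b" * 200 * 1024], start=1):
    result = bucket.upload_part(key, upload_id, part_number, data)
    parts.append(oss2.models.PartInfo(part_number, result.etag))

bucket.complete_multipart_upload(key, upload_id, parts)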


Sample code:

master:

# -*- coding: utf-8 -*-
import logging
import fc2
import oss2
import paramiko
import math
import json
import time
from threading import Thread

logging.getLogger("oss2.api").setLevel(logging.ERROR)
logging.getLogger("oss2.auth").setLevel(logging.ERROR)

SRC_LOG_FILE =  "/root/huge.log"
DST_FILE = "jiahe/push_log_50000_5G.gz"
BUCKET_NAME = "oss-demo"

ECS_INNER_IP = "192.168.2.12"
SSH_PORT = 22
USR_NAME = 'xiaoming'
USR_PWD = '123456'

SUB_LOG = 5*1024*1024*1024  # size of the log slice handled by each worker invocation
PART_SIZE = 16 * 1024 * 1024  # part size each worker uploads; must match PART_SIZE in the worker function

SERVICE_NAME = "fc_demo"    # service that contains the worker function; ideally the same service as the master
SUB_FUNCTION_NAME = "worker"  # name of the worker function

parts = []
def sub_gz(fcClient, subevent):
    content = fcClient.invoke_function(SERVICE_NAME, SUB_FUNCTION_NAME, json.dumps(subevent)).data
    global parts
    #print(content)
    parts.extend(json.loads(content))

def handler(event, context):
    #start = time.time()
    global parts
    parts = []
    region = context.region
    creds = context.credentials
    auth = oss2.StsAuth(creds.accessKeyId, creds.accessKeySecret, creds.securityToken)
    bucket = oss2.Bucket(auth, 'oss-' + region + '-internal.aliyuncs.com', BUCKET_NAME)

    scp = paramiko.Transport((ECS_INNER_IP, SSH_PORT))
    scp.connect(username=USR_NAME, password=USR_PWD)
    sftp = paramiko.SFTPClient.from_transport(scp)
    info = sftp.stat(SRC_LOG_FILE)
    total_size = info.st_size
    print(total_size)
    sftp.close()

    endpoint = "http://{0}.{1}-internal.fc.aliyuncs.com".format(context.account_id, context.region)
    fcClient = fc2.Client(endpoint=endpoint, accessKeyID=creds.accessKeyId, accessKeySecret=creds.accessKeySecret, securityToken=creds.securityToken, Timeout=900)

    threadNum = int(math.ceil(float(total_size)/SUB_LOG))
    key = DST_FILE
    upload_id = bucket.init_multipart_upload(key).upload_id

    part_step = int(math.ceil(float(SUB_LOG)/PART_SIZE))
    ts = []
    left_size = total_size
    for i in range(threadNum):
        part_start = part_step * i + 1
        size = SUB_LOG if SUB_LOG < left_size else left_size
        subEvt = {
          "src": SRC_LOG_FILE,
          "dst": DST_FILE,
          "offset": i * SUB_LOG,
          "size": size,
          "part_number":  part_start, 
          "upload_id" : upload_id,
        }
        print(i, subEvt)
        t = Thread(target=sub_gz, args=(fcClient, subEvt,))
        left_size = left_size - SUB_LOG
        t.start()
        ts.append(t)

    for t in ts:
        t.join()

    parts.sort(key=lambda k: (k.get('part_number', 0)))
    #print(parts)
    part_objs = []
    for part in parts:
        part_objs.append(oss2.models.PartInfo(part["part_number"], part["etag"], size = part["size"], part_crc = part["part_crc"]))
    bucket.complete_multipart_upload(key, upload_id, part_objs)
    #print(time.time() - start)
    return "ok"

worker:

import paramiko
import gzip
import oss2
import logging
import os
import time
import json

logging.getLogger("oss2.api").setLevel(logging.ERROR)
logging.getLogger("oss2.auth").setLevel(logging.ERROR)

# config
BUCKET_NAME = "oss-demo"
ECS_INNER_IP = "192.168.2.12"
SSH_PORT = 22
USR_NAME = 'xiaoming'
USR_PWD = '123456'

CHUNK_SIZE = 1024*1024
PART_SIZE = 16 * CHUNK_SIZE

def handler(event, context):
    region = context.region
    creds = context.credentials
    auth = oss2.StsAuth(creds.accessKeyId, creds.accessKeySecret, creds.securityToken)
    bucket = oss2.Bucket(auth, 'oss-' + region + '-internal.aliyuncs.com', BUCKET_NAME)

    scp = paramiko.Transport((ECS_INNER_IP, SSH_PORT))
    scp.connect(username=USR_NAME, password=USR_PWD)
    # Enlarged SFTP window/packet sizes work around paramiko's default throughput limit.
    window_size = 2 ** 30
    max_packet_size = 2 ** 28
    sftp = paramiko.SFTPClient.from_transport(scp, window_size=window_size, max_packet_size=max_packet_size)

    evt = json.loads(event)
    src_log_file = evt['src']
    offset = evt['offset']
    size = evt['size']
    dst_gz_file = evt['dst']
    part_number = int(evt['part_number'])
    upload_id = evt['upload_id']

    key =  dst_gz_file
    data_out = []
    partsInfo = []

    with sftp.open(src_log_file, "r") as f:
        f.seek(offset)
        while True:
            data = f.read(CHUNK_SIZE)
            cur = f.tell()
            if not data or (cur > offset + size):
                if data_out:
                    upload_data = b"".join(data_out)
                    size_to_upload = len(upload_data)
                    # A part may still end up smaller than 100 KB here: e.g. for a 15 GB + 1 KB file the
                    # trailing 1 KB is left over, and even data larger than 100 KB can compress to less
                    # than 100 KB.
                    # For log files, one option is to pad the tail with some harmless filler characters.
                    if size_to_upload < 100 * 1024:
                        # fill_data=b"\n\n\n\n\n\n\n\n\n\nAliyunFCFill" + os.urandom(102400)
                        fill_data=b"\t\t\t\t\t\t\t\t\t\t\t\n"*1024*1024*5
                        upload_data += gzip.compress(fill_data) # fill_data 压缩后的结果为>100K
                        size_to_upload = len(upload_data)
                    result = bucket.upload_part(key, upload_id, part_number, upload_data)
                    partsInfo.append({
                        "part_number":part_number,
                        "etag":result.etag,
                        "size":size_to_upload,
                        "part_crc":result.crc})
                break
            data_out.append(gzip.compress(data))
            # Every 16 MB of input is compressed into one part; OSS requires each part to be at least
            # 100 KB (102400 bytes), and 16 MB of data normally compresses to well above 100 KB.
            if (cur - offset) % PART_SIZE == 0: 
                upload_data = b"".join(data_out)
                size_to_upload = len(upload_data)
                result = bucket.upload_part(key, upload_id, part_number, upload_data)
                partsInfo.append({
                      "part_number":part_number,
                      "etag":result.etag,
                      "size":size_to_upload,
                      "part_crc":result.crc})
                part_number += 1
                data_out = []
    
    return json.dumps(partsInfo)
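
With both functions deployed, the whole job is started by a single invocation of the master function, which fans out to the workers and completes the multipart upload. A minimal client-side sketch using the fc2 SDK, with placeholder endpoint/credentials and assuming the master is deployed as function "master" in service "fc_demo":

import fc2

# Placeholder endpoint / credentials; the service and function names are assumptions.
client = fc2.Client(
    endpoint="http://<account-id>.<region>.fc.aliyuncs.com",
    accessKeyID="<access-key-id>",
    accessKeySecret="<access-key-secret>",
    Timeout=900)

resp = client.invoke_function("fc_demo", "master")
print(resp.data)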