pyzstd-mt-test.py

"""

Try to estimate if zstd's internal multithreading is useful

within the context of compressing borgbackup chunks.

The chunks of data yielded by borg's chunker are usually ~2MiB

in size (if the file is larger than that). Smaller files often

only yield 1 chunk with the whole file content in it and these

chunks can be arbitrarily small, down to 1B.

"""

import os
import random
import time

from pyzstd import CParameter, compress, zstd_support_multithread

def generate_random_data():
    # return chunks with a size around 2 MiB, like borg's chunker.
    # return fully random data, assuming that this leads to
    # maximum compression time (and thus gives multi-threading
    # an advantage).
    size = random.randint(512 * 1024, 4096 * 1024)  # 0.5 .. 4 MiB
    return os.urandom(size)
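
# Hedged side note (not used by the benchmark below): os.urandom() output is
# essentially incompressible, so zstd's match finder may give up on it quickly.
# To also exercise the compressible-data path, a hypothetical generator like
# this one (repeating a small random block) could be swapped in:
def generate_compressible_data():
    block = os.urandom(64 * 1024)
    repeats = random.randint(8, 64)  # 0.5 .. 4 MiB total, same range as above
    return block * repeats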

def generate_large_data():
    max_length = 10 * 2**30
    generated_length = 0
    while generated_length < max_length:
        data = generate_random_data()
        yield data
        generated_length += len(data)

def compress_large_data(level=1, workers=0):
    options = {CParameter.compressionLevel: level,
               CParameter.nbWorkers: workers,
               CParameter.jobSize: 512 * 1024,  # 512 KiB is the minimum jobSize
               }
    dlen = clen = 0
    for data in generate_large_data():
        data_compressed = compress(data, options)
        dlen += len(data)
        clen += len(data_compressed)
    return dlen, clen
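
# Minimal sketch of a variant (an assumption, not part of the original test):
# the one-shot compress() above sets up a fresh compression context, including
# its worker threads, for every chunk. Reusing one ZstdCompressor and ending a
# frame per chunk should amortize that startup cost:
def compress_large_data_reused(level=1, workers=0):
    from pyzstd import ZstdCompressor  # local import keeps this sketch self-contained
    options = {CParameter.compressionLevel: level,
               CParameter.nbWorkers: workers,
               CParameter.jobSize: 512 * 1024,
               }
    c = ZstdCompressor(options)
    dlen = clen = 0
    for data in generate_large_data():
        # FLUSH_FRAME closes a complete zstd frame per chunk, matching the
        # framing behavior of the one-shot compress()
        data_compressed = c.compress(data, c.FLUSH_FRAME)
        dlen += len(data)
        clen += len(data_compressed)
    return dlen, clen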

def measure_execution_time(level, workers=0):
    # workers == 0 means not to create worker threads
    # workers == 1 means threading, but only 1 worker
    # workers == 2+ means real multithreading, N workers
    start_time = time.time()
    dlen, clen = compress_large_data(level=level, workers=workers)
    end_time = time.time()
    execution_time = end_time - start_time
    print(f"level: {level}, workers: {workers}, len: {dlen/1e9:.3f}GB, "
          f"len compressed: {clen/1e9:.3f}GB, time: {execution_time:.3f}s")

assert zstd_support_multithread

for level in [1, 3, 6, 10, 15, 20]:
    for workers in [0, 1, 2, 4]:
        measure_execution_time(level, workers)