SNOW-300951 removing cloud SDKs by sfc-gh-mkeller · Pull Request #725 · snowflakedb/snowflake-connector-python · GitHub

Conversation

Collaborator

@sfc-gh-mkeller commented May 22, 2021

SNOW-300951

Introduction

Back when we started this project, the goal was mainly to decrease the number of our dependencies and the overall size of snowflake-connector-python, since we can disregard all of the cloud SDKs' capabilities other than blob storage.

Since the start we have run into a few other issues with the cloud SDKs, like some more exotic proxies not being supported by them and the SDKs not supporting asyncio.

Design

Initial design

Different blob storage services work mostly the same, but there are subtle differences. For example, on S3 we need to keep track of the part numbers and the returned ETags for all the pieces of a multipart upload, while on Azure we specify block IDs when putting the pieces and then send the list of those IDs to an endpoint. Because of this we quickly arrived at a class design like this:

simplified sdkless class UML
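The UML above roughly corresponds to a hierarchy like the following minimal sketch; the class and method names here are illustrative placeholders, not the connector's actual classes, and the HTTP calls are stubbed out:

from abc import ABC, abstractmethod
from typing import Dict, List


class StorageClient(ABC):
    """Shared chunked-transfer logic; subclasses handle provider-specific bookkeeping."""

    @abstractmethod
    def upload_chunk(self, chunk_id: int, data: bytes) -> None:
        ...

    @abstractmethod
    def finish_upload(self) -> None:
        ...


class S3LikeClient(StorageClient):
    """S3-style multipart uploads must remember the ETag returned for every part."""

    def __init__(self) -> None:
        self.etags: Dict[int, str] = {}

    def upload_chunk(self, chunk_id: int, data: bytes) -> None:
        # In the real client this would be an UploadPart HTTP call returning an ETag.
        self.etags[chunk_id + 1] = f"etag-{chunk_id + 1}"

    def finish_upload(self) -> None:
        # CompleteMultipartUpload takes the ordered (part number, ETag) pairs.
        print("complete multipart upload with", sorted(self.etags.items()))


class AzureLikeClient(StorageClient):
    """Azure-style blob uploads instead take caller-chosen block IDs, committed at the end."""

    def __init__(self) -> None:
        self.block_ids: List[str] = []

    def upload_chunk(self, chunk_id: int, data: bytes) -> None:
        # In the real client this would be a Put Block HTTP call.
        self.block_ids.append(f"{chunk_id:032d}")

    def finish_upload(self) -> None:
        # Put Block List commits the blocks in the given order.
        print("put block list with", self.block_ids)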

While performing PUT and GET operations most of the time is spent on the network calls, so we knew from the start that we needed to perform the network calls in parallel.
Previously our method of parallelizing execution was to use ThreadPoolExecutors (I'll be referring to them as tpes from now on), which already handle some concurrency hurdles for us, but during this project their limitations became obvious.

simple sdkless transfer flow

Notice that the idea was that the main thread would be doing most of the work and then it would let the network_tpe take care of network calls.

Once the implementation was semi-complete we ran benchmarks and found that performance suffered greatly, at least on machines with limited computing power compared to their network speed (such as cheaper EC2 machines).

As it turned out, when only the network calls were parallelized, compression became the most expensive operation. So after some back-and-forth we arrived at the final design.

Final design

sdkless transfer flow(1)

The new idea in this iteration is that we have separate thread pools for CPU-bound and network-bound tasks.

When designing the flow of files we wanted to make sure that, if a chunk needed to be retried, the queue to the network_tpe wasn't too long, so we introduced a condition variable on which each file waits before submitting its chunks to the network_tpe while the queue's length is over 2 * max_concurrency. Once all of the chunks of a file have been processed by the network_tpe during a download, the file gets submitted to the postprocess_tpe to be decrypted. In case of an upload, the last chunk to finish in the network_tpe makes a second call to finalize the file on blob storage. Note that the only reason preprocess_tpe and postprocess_tpe exist as separate pools is that we did not want to pollute the queue to the network_tpe with tasks that are basically guaranteed to work.
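As an illustration of that gating, here is a condition-variable-based throttle sketch; it is not the connector's exact code, and names like chunks_in_queue, upload_chunk and the pool sizes are stand-ins:

import threading
from concurrent.futures import ThreadPoolExecutor

max_concurrency = 4
network_tpe = ThreadPoolExecutor(max_concurrency)  # network-bound work
postprocess_tpe = ThreadPoolExecutor(2)            # CPU-bound work, e.g. decryption

cv_chunk_process = threading.Condition()
chunks_in_queue = 0


def upload_chunk(chunk_id: int) -> None:
    pass  # stand-in for the real network call


def transfer_done_cb(chunk_id: int) -> None:
    global chunks_in_queue
    with cv_chunk_process:
        chunks_in_queue -= 1
        cv_chunk_process.notify()  # wake a producer waiting to submit more chunks


def submit_chunk(chunk_id: int) -> None:
    """Block while the backlog is too long, then hand the chunk to the network pool."""
    global chunks_in_queue
    with cv_chunk_process:
        while chunks_in_queue > 2 * max_concurrency:
            cv_chunk_process.wait()
        chunks_in_queue += 1
    fut = network_tpe.submit(upload_chunk, chunk_id)
    fut.add_done_callback(lambda f, c=chunk_id: transfer_done_cb(c))


for chunk_id in range(100):
    submit_chunk(chunk_id)
network_tpe.shutdown(wait=True)
postprocess_tpe.shutdown(wait=True)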

Issues with ThreadPoolExecutors

  1. Callbacks might be executed synchronously: This is a gotcha of the tpes: if a thread submits a job and that job is done by the time the thread attaches a callback onto the future, then the callback will be executed by the thread that is attaching the callback to the future. For this reason we introduced the function_and_callback_wrapper function, which makes sure that the callback is called concurrently from the tpe (see the sketch after this list).

  2. Error handling in callbacks is non-existent in tpes: We need to be careful about how we handle errors inside of tpes. While calling .result() on a future raises the same exception that was raised in the callable that was executed, there is no such logic for exceptions raised inside of callback functions. For this reason we manually track errors and handle them inside of SnowflakeFileTransferAgent.result (see the sketch after this list).
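A minimal sketch of the idea behind both points; this approximates, rather than reproduces, the connector's function_and_callback_wrapper:

import logging
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, List

logger = logging.getLogger(__name__)


def function_and_callback_wrapper(
    fn: Callable[..., Any],
    callback: Callable[[Any], None],
    errors: List[Exception],
    *args: Any,
) -> Callable[[], None]:
    """Run fn and its callback in the same pool thread and record callback errors."""

    def wrapped() -> None:
        result = fn(*args)       # exceptions here are surfaced by future.result()
        try:
            callback(result)     # exceptions here would otherwise be swallowed
        except Exception as e:   # so track them manually for later inspection
            logger.error("callback failed: %s", e)
            errors.append(e)

    return wrapped


errors: List[Exception] = []
with ThreadPoolExecutor(2) as tpe:
    fut = tpe.submit(function_and_callback_wrapper(pow, print, errors, 2, 10))
    fut.result()  # re-raises anything pow() raised; callback errors end up in `errors`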
Closes SNOW-141545: Request - Reduce package dependencies chaos #284

@sfc-gh-mkeller force-pushed the mkeller/SNOW-300951-remove_cloud_sdks branch 5 times, most recently from 62ceeaf to 1d6089a on June 10, 2021 17:29
@sfc-gh-mkeller force-pushed the mkeller/SNOW-300951-remove_cloud_sdks branch 9 times, most recently from 0231a62 to 38c1c55 on June 16, 2021 06:58
@sfc-gh-mkeller force-pushed the mkeller/SNOW-300951-remove_cloud_sdks branch 3 times, most recently from e31ad0d to 3768635 on June 18, 2021 16:40
@sfc-gh-mkeller force-pushed the mkeller/SNOW-300951-remove_cloud_sdks branch 2 times, most recently from 09482c9 to c077cf7 on July 1, 2021 03:11

import json
import os
import xml.etree.cElementTree as ET
Contributor

As the usage of ET here is to construct a simple and small XML text, maybe just using a string template is enough and faster.

Collaborator Author

While you are right about this file, we do use ET for parsing responses from blob storage services:

Given that we need to keep it, I'd like to keep our usage consistent across the driver (where we deal with XML we should keep using ET).
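For context, a small sketch of the kind of ET-based response parsing meant here; the XML below is a made-up example of an S3-style error body, not an actual service response:

import xml.etree.ElementTree as ET

# Made-up example of an S3-style error response body.
response_body = b"""<?xml version="1.0" encoding="UTF-8"?>
<Error>
  <Code>NoSuchKey</Code>
  <Message>The specified key does not exist.</Message>
</Error>"""

root = ET.fromstring(response_body)
print(root.findtext("Code"), root.findtext("Message"))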

def transfer(self, metas: List["SnowflakeFileMeta"]) -> None:
    max_concurrency = self._parallel
    network_tpe = ThreadPoolExecutor(max_concurrency)
    preprocess_tpe = ThreadPoolExecutor(min(len(metas), os.cpu_count()))
Contributor

cpu_count() could be too many; I would suggest using half of the CPU count.

Collaborator Author

We ran a benchmark and found that os.cpu_count() was better for files with sizes around 200MB (which is what we suggest customers use), but this number seems to be too high for very large files.

cv_chunk_process = (
    threading.Condition()
)  # to get more chunks into the chunk_tpe
files = [self._create_file_transfer_client(m) for m in metas]
Contributor

So here we create one client for each meta, and each client fills in the authenticated headers and args at least once; we might be able to save some duplicated work here.

Collaborator Author

I think this should be handled as an enhancement after we release this feature. Created SNOW-398380 to track it.
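Purely to illustrate the suggested enhancement (none of the names below reflect the connector's actual classes), one option would be to compute the shared state once per command and hand it to every client:

from dataclasses import dataclass
from typing import Dict, List


@dataclass
class SharedTransferContext:
    """Hypothetical container for state that is identical for every file in a command."""
    auth_headers: Dict[str, str]
    endpoint: str


def build_shared_context() -> SharedTransferContext:
    # Computed once per PUT/GET command instead of once per file meta.
    return SharedTransferContext(
        auth_headers={"Authorization": "Bearer <token>"},
        endpoint="https://blob.example.invalid",
    )


class FileTransferClient:
    def __init__(self, meta_name: str, ctx: SharedTransferContext) -> None:
        self.meta_name = meta_name
        self.ctx = ctx  # reused instead of re-deriving headers per client


ctx = build_shared_context()
metas = ["file1.csv.gz", "file2.csv.gz"]
clients: List[FileTransferClient] = [FileTransferClient(m, ctx) for m in metas]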

else:
    logger.debug(f"Finished preparing file {done_client.meta.name}")
    with cv_chunk_process:
        while transfer_metadata.chunks_in_queue > 2 * max_concurrency:
Contributor

2 times max_concurrency? That means 2 times max_concurrency chunks are being transferred, right?

Collaborator Author

No, max_concurrency of them are being transferred, but 2 * max_concurrency are waiting in the queue ready to go.

fd = (
    self.meta.real_src_stream
    or self.meta.src_stream
    or open(self.data_file, "rb")
Contributor

For the same file, it seems we are opening the file once for each chunk - this could be optimized.

Collaborator Author

Not without buffering data in memory, which previously led to really high memory usage.
I think nowadays most people would be using SSDs, so I don't see the point in optimizing for HDDs.
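To spell out the trade-off being described, a sketch of reading a single chunk by seeking into the file so memory stays bounded by the chunk size; data_file, chunk_id and chunk_size are illustrative names:

def read_chunk(data_file: str, chunk_id: int, chunk_size: int) -> bytes:
    """Read one chunk's bytes by seeking to its offset; no whole-file buffering."""
    with open(data_file, "rb") as fd:
        fd.seek(chunk_id * chunk_size)
        return fd.read(chunk_size)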

qtask,
self._results,
triggers,
with cv_chunk_process:
Contributor

We try to lock cv_chunk_process here when a chunk is done, but cv_chunk_process in preprocess_done_cb is locked for a big loop, and since preprocess_done_cb is triggered in a multi-threaded manner, I am afraid some early-finished transfer chunks might be blocked here (maybe not for too long, still) until the loop in preprocess_done_cb releases this Condition lock.

Collaborator Author

Let me link the line where this cv is used: https://github.com/snowflakedb/snowflake-connector-python/blob/9ad4b67e21fa4a7919c56d05426c705b443158ca/src/snowflake/connector/file_transfer_agent.py#L447

I think it's safe to say that the code at this spot is non-blocking and doesn't do much: it checks a variable and sleeps on the CV. And there are at most cpu_count threads in this cv section. I think it is safe to leave it as is.

I could add extra logging that would indicate whether this is an issue. Is that okay with you?

# Arguments for work fn
_chunk_id,
)
transfer_metadata.chunks_in_queue += 1
Contributor
@sfc-gh-cshi Jul 15, 2021

Once submitted, chunks_in_queue adds one. A large file with many chunks (e.g. 1000 chunks) will cross the 2 * max_concurrency limit (e.g. 32) a lot; in this case, every transfer_done_cb will trigger cv_chunk_process.notify() and cause the while loop (at line 444) of some other thread to be awakened and then wait again. Maybe a semaphore could help reduce the cost here.

    while transfer_metadata.chunks_in_queue > 2 * max_concurrency:
        cv_chunk_process.wait()

Collaborator Author

Good point, a semaphore is a better fit here!
The use case is not something that we recommend though; do you think we could treat this as an enhancement later instead of delaying this feature?
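For reference, a rough sketch of the semaphore-based alternative being discussed (names and sizes are illustrative, not the connector's code): each submission acquires a slot and the done callback releases it, so only one blocked producer is woken at a time instead of notifying every waiter on the condition variable.

import threading
from concurrent.futures import ThreadPoolExecutor

max_concurrency = 4
# Roughly max_concurrency chunks in flight plus a 2 * max_concurrency backlog.
queue_slots = threading.BoundedSemaphore(3 * max_concurrency)
network_tpe = ThreadPoolExecutor(max_concurrency)


def upload_chunk(chunk_id: int) -> None:
    pass  # stand-in for the real network call


def transfer_done_cb(fut) -> None:
    queue_slots.release()  # frees a slot, waking at most one blocked producer


def submit_chunk(chunk_id: int) -> None:
    queue_slots.acquire()  # blocks only while every slot is taken
    fut = network_tpe.submit(upload_chunk, chunk_id)
    fut.add_done_callback(transfer_done_cb)


for chunk_id in range(100):
    submit_chunk(chunk_id)
network_tpe.shutdown(wait=True)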

@sfc-gh-mkeller force-pushed the mkeller/SNOW-300951-remove_cloud_sdks branch 4 times, most recently from e245543 to 63e8000 on August 3, 2021 22:39
@sfc-gh-mkeller force-pushed the mkeller/SNOW-300951-remove_cloud_sdks branch 2 times, most recently from 71201c5 to a8e8c42 on August 5, 2021 21:41
@sfc-gh-mkeller force-pushed the mkeller/SNOW-300951-remove_cloud_sdks branch from 18d87a2 to da49777 on August 20, 2021 00:17
@sfc-gh-mkeller merged commit 18ef9c0 into master Aug 20, 2021
@sfc-gh-mkeller deleted the mkeller/SNOW-300951-remove_cloud_sdks branch August 20, 2021 03:45
sfc-gh-aling pushed a commit that referenced this pull request Oct 17, 2022
Co-authored-by: Sophie Tan <sophie.tan@snowflake.com>
Co-authored-by: Kushan Zaveri <kushan.zaveri@snowflake.com>
