SNOW-300951 removing cloud SDKs #725
Conversation
import json
import os
import xml.etree.cElementTree as ET
Since ET is only used here to construct a small and simple XML document, maybe just using a string template is enough, and faster.
While you are right in this file, we do use ET for parsing responses from blob storage services:
err = ET.fromstring(message)
Given that we need to keep ET anyway, I'd like to keep our usage consistent across the driver: where we deal with XML we should keep using ET.
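For reference, a minimal sketch of the two approaches under discussion, assuming an S3-style multipart-completion body (the tag names are illustrative, not necessarily what this file constructs):

```python
# Hedged sketch of the two approaches discussed above for building a small,
# fixed-shape XML body. The tag names are illustrative only.
import xml.etree.ElementTree as ET

def build_with_et(part_etags):
    # ElementTree handles escaping and keeps XML handling consistent
    # with the parsing code elsewhere in the driver.
    root = ET.Element("CompleteMultipartUpload")
    for number, etag in enumerate(part_etags, start=1):
        part = ET.SubElement(root, "Part")
        ET.SubElement(part, "PartNumber").text = str(number)
        ET.SubElement(part, "ETag").text = etag
    return ET.tostring(root, encoding="unicode")

def build_with_template(part_etags):
    # A string template is shorter and faster for a tiny document,
    # but escaping has to be handled manually if values ever need it.
    parts = "".join(
        f"<Part><PartNumber>{n}</PartNumber><ETag>{e}</ETag></Part>"
        for n, e in enumerate(part_etags, start=1)
    )
    return f"<CompleteMultipartUpload>{parts}</CompleteMultipartUpload>"
```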
def transfer(self, metas: List["SnowflakeFileMeta"]) -> None:
    max_concurrency = self._parallel
    network_tpe = ThreadPoolExecutor(max_concurrency)
    preprocess_tpe = ThreadPoolExecutor(min(len(metas), os.cpu_count()))
cpu_count() could be too many; I would suggest using half of the CPU count.
We ran a benchmark and found that os.cpu_count() was better for files around 200 MB in size (which is what we suggest customers use), but this number does seem to be too high for very large files.
cv_chunk_process = (
    threading.Condition()
)  # to get more chunks into the chunk_tpe
files = [self._create_file_transfer_client(m) for m in metas]
So here we create one client for each meta, and each client fills in the authenticated headers and arguments at least once; we might be able to save some duplicated work here.
I think this should be handled as an enhancement after we release this feature. Created SNOW-398380 to track it.
else:
    logger.debug(f"Finished preparing file {done_client.meta.name}")
with cv_chunk_process:
    while transfer_metadata.chunks_in_queue > 2 * max_concurrency:
Two times max_concurrency? Does that mean 2 * max_concurrency chunks are being transferred at once?
No, at most max_concurrency of them are being transferred, but up to 2 * max_concurrency are waiting in the queue ready to go.
fd = (
    self.meta.real_src_stream
    or self.meta.src_stream
    or open(self.data_file, "rb")
For the same file, it seems we are opening the file once for each chunk; this could be optimized.
Not without buffering data in memory, which previously led to really high memory usage.
I think nowadays most people are using SSDs, so I don't see the point in optimizing for HDDs.
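For context, a minimal sketch of the per-chunk open being discussed; chunk_id, chunk_size, and data_file are assumed names for illustration, not necessarily the driver's attributes:

```python
# Hedged sketch: each chunk opens the source file independently and seeks to
# its own offset, so worker threads never share a single file object.
def read_chunk(data_file: str, chunk_id: int, chunk_size: int) -> bytes:
    with open(data_file, "rb") as fd:
        fd.seek(chunk_id * chunk_size)
        return fd.read(chunk_size)
```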
qtask,
self._results,
triggers,
with cv_chunk_process:
We try to lock cv_chunk_process here when a chunk is done, but cv_chunk_process in preprocess_done_cb is held across a big loop, and preprocess_done_cb is triggered from multiple threads. I am afraid some early-finished transfer chunks might be blocked here (probably not for too long) until the loop in preprocess_done_cb releases the Condition lock.
Let me link the line where this cv is used: https://github.com/snowflakedb/snowflake-connector-python/blob/9ad4b67e21fa4a7919c56d05426c705b443158ca/src/snowflake/connector/file_transfer_agent.py#L447
I think it's safe to say that the code at this spot is non-blocking and doesn't do much: it checks a variable and sleeps on the CV. And there are at most cpu_count threads in this CV section, so I think it is safe to leave it as is.
I could add extra logging that would indicate that this is an issue. Is that okay with you?
# Arguments for work fn
_chunk_id,
)
transfer_metadata.chunks_in_queue += 1
Once submitted, chunks_in_queue increases by one. A large file with many chunks (e.g. 1000 chunks) will cross the 2 * max_concurrency limit (e.g. 32) a lot. In that case, every transfer_done_cb will trigger cv_chunk_process.notify() and cause the while loop (at line 444) in some other thread to be woken up and then wait again; maybe a semaphore could help reduce the cost here:
while transfer_metadata.chunks_in_queue > 2 * max_concurrency:
    cv_chunk_process.wait()
Good point, a semaphore is a better fit here!
The use case is not something that we recommend though; do you think we could treat this as an enhancement later instead of delaying this feature?
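For illustration, a hedged sketch of the semaphore-based backpressure being suggested; the names follow the snippets above, but the wiring is hypothetical, not the code in this PR:

```python
# Hypothetical alternative: bound the number of queued chunks with a
# BoundedSemaphore instead of a Condition plus a chunks_in_queue counter.
import threading
from concurrent.futures import ThreadPoolExecutor

max_concurrency = 4
network_tpe = ThreadPoolExecutor(max_concurrency)
# max_concurrency chunks in flight plus 2 * max_concurrency waiting.
queue_slots = threading.BoundedSemaphore(3 * max_concurrency)

def transfer_chunk(chunk):
    ...  # placeholder for the actual upload/download of one chunk

def submit_chunk(chunk):
    queue_slots.acquire()  # blocks only while the queue is full
    future = network_tpe.submit(transfer_chunk, chunk)
    # Releasing in the done callback wakes exactly one waiting submitter,
    # instead of notifying every thread sleeping on a condition variable.
    future.add_done_callback(lambda _f: queue_slots.release())
```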
Co-authored-by: Sophie Tan <sophie.tan@snowflake.com>
Co-authored-by: Kushan Zaveri <kushan.zaveri@snowflake.com>
SNOW-300951
Introduction
Back when we started this project, the goal was mainly to decrease the number of our dependencies and the overall size of snowflake-connector-python, since we can disregard all of the SDKs' capabilities other than blob storage.
Since the start we have run into a few other issues with the cloud SDKs, such as some of the more exotic proxies not being supported by them, and the SDKs not supporting asyncio.
Design
Initial design
Different blob storage services work mostly the same way, but there are subtle differences. For example, for a multipart upload on S3 we need to keep track of the part numbers and the returned ETags for all the pieces, while on Azure we specify our own block IDs when putting the pieces and then send the list of those IDs to an endpoint to commit them. Because of this we quickly arrived at a class design like this:
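A minimal sketch of what such a class hierarchy could look like; the class and method names here are illustrative assumptions, not the driver's actual API:

```python
# Hedged sketch: shared chunking logic in a base class, cloud-specific
# multipart bookkeeping in subclasses. All names are illustrative.
from abc import ABC, abstractmethod

class StorageClient(ABC):
    def upload_chunk(self, chunk_id: int, data: bytes) -> None:
        token = self._put_chunk(chunk_id, data)
        self._record_chunk(chunk_id, token)

    @abstractmethod
    def _put_chunk(self, chunk_id: int, data: bytes) -> str: ...
    @abstractmethod
    def _record_chunk(self, chunk_id: int, token: str) -> None: ...
    @abstractmethod
    def finish_upload(self) -> None: ...

class S3Client(StorageClient):
    def __init__(self):
        self.etags = {}  # part number -> ETag returned by each part PUT

    def _put_chunk(self, chunk_id, data):
        ...  # PUT the part, return the ETag response header
    def _record_chunk(self, chunk_id, token):
        self.etags[chunk_id] = token
    def finish_upload(self):
        ...  # POST CompleteMultipartUpload with all part numbers and ETags

class AzureClient(StorageClient):
    def __init__(self):
        self.block_ids = []  # block IDs we generate ourselves

    def _put_chunk(self, chunk_id, data):
        block_id = f"{chunk_id:08d}"
        ...  # Put Block with our chosen block ID
        return block_id
    def _record_chunk(self, chunk_id, token):
        self.block_ids.append(token)
    def finish_upload(self):
        ...  # Put Block List with the collected block IDs
```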
While performing PUT and GET operations, most of the time is spent on network calls, so we knew from the start that we needed to perform the network calls in parallel.
Previously our method of parallelizing execution was to use ThreadPoolExecutors (I'll be referring to them as tpes from now on), which already handle some concurrency hurdles for us, but during this project their limitations became obvious. Notice that the idea was that the main thread would be doing most of the work and then let the network_tpe take care of the network calls (sketched below). Once the implementation was semi-complete we ran benchmarks and found that performance suffered greatly, at least on machines with limited computing power relative to their network speed (such as cheaper EC2 machines). As it turned out, when only the network calls were parallelized, compression became the most expensive operation. So after some back-and-forth we arrived at the final design.
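A minimal sketch of that initial flow, with illustrative placeholder names (prepare_file, upload_file):

```python
# Hedged sketch of the initial design: one ThreadPoolExecutor for network
# calls, with all CPU-bound work still done on the main thread.
from concurrent.futures import ThreadPoolExecutor, wait

def prepare_file(path):
    ...  # placeholder: compress and encrypt the file on the main thread

def upload_file(prepared):
    ...  # placeholder: the actual network call

def transfer_initial(files, max_concurrency):
    network_tpe = ThreadPoolExecutor(max_concurrency)
    futures = []
    for path in files:
        prepared = prepare_file(path)  # compression serializes here...
        futures.append(network_tpe.submit(upload_file, prepared))
    wait(futures)  # ...so preparation, not the network, becomes the bottleneck
```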
Final design
The new idea in this iteration is that we have separate thread pools for CPU-bound and network-bound tasks.
When designing the flow of files we wanted to make sure that if a chunk needed to be retried, the queue to the network_tpe wasn't too long, so we introduced a condition variable on which each file waits before submitting its chunks to the network_tpe while the queue's length is over 2 * max_concurrency. Once all of the chunks of a file have been processed by the network_tpe during a download, the file gets submitted to the postprocess_tpe to be decrypted. In the case of an upload, the last chunk to finish in the network_tpe makes a second call to finalize the file on blob storage. Note that the only difference between preprocess_tpe and postprocess_tpe is that we did not want to pollute the queue to the network_tpe with tasks that are basically guaranteed to work.
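A hedged sketch of that flow; variable names mirror the description above, but the wiring is illustrative rather than the driver's exact code:

```python
# Hedged sketch of the final design: separate pools for CPU-bound and
# network-bound tasks, with a Condition limiting the network queue length.
import os
import threading
from concurrent.futures import ThreadPoolExecutor

max_concurrency = 4
network_tpe = ThreadPoolExecutor(max_concurrency)
preprocess_tpe = ThreadPoolExecutor(os.cpu_count() or 1)   # compress/encrypt
postprocess_tpe = ThreadPoolExecutor(os.cpu_count() or 1)  # decrypt downloads
cv_chunk_process = threading.Condition()
chunks_in_queue = 0

def transfer_chunk(chunk):
    ...  # placeholder for the actual upload/download of one chunk

def submit_chunk(chunk):
    global chunks_in_queue
    with cv_chunk_process:
        # Backpressure: keep at most 2 * max_concurrency chunks queued,
        # so a retried chunk never waits behind a very long queue.
        while chunks_in_queue > 2 * max_concurrency:
            cv_chunk_process.wait()
        chunks_in_queue += 1
    network_tpe.submit(transfer_chunk, chunk).add_done_callback(_chunk_done)

def _chunk_done(_future):
    global chunks_in_queue
    with cv_chunk_process:
        chunks_in_queue -= 1
        cv_chunk_process.notify()
```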
Issues with ThreadPoolExecutors
Callbacks might be executed synchronously: this is a gotcha of the tpes. If a thread submits a job and that job is already done by the time the thread attaches a callback to the future, then the callback is executed by the thread that is attaching it. For this reason we introduced the function_and_callback_wrapper function, which makes sure that the callback is called concurrently from the tpe.
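A minimal demonstration of the gotcha, plus one possible shape of such a wrapper; the driver's actual function_and_callback_wrapper may differ:

```python
# Hedged sketch of the synchronous-callback gotcha and one way around it.
from concurrent.futures import ThreadPoolExecutor

tpe = ThreadPoolExecutor(4)

# Gotcha: if the future is already done, add_done_callback runs the callback
# immediately on the *calling* thread instead of on a pool thread.
future = tpe.submit(pow, 2, 10)
future.result()  # the job is finished by now
future.add_done_callback(lambda f: print(f.result()))  # runs on this thread

# Workaround sketch: bundle the work and its callback into a single pool
# task, so the callback is always executed by the executor.
def function_and_callback_wrapper(fn, callback, *args, **kwargs):
    def wrapped():
        result = fn(*args, **kwargs)
        callback(result)
        return result
    return wrapped

tpe.submit(function_and_callback_wrapper(pow, print, 2, 10))
```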
Error handling in callbacks is non-existent in tpes: we need to be careful how we handle errors inside of tpes. While calling .result() on a future raises the same exception that was raised in the callable that was executed, there is no such logic for exceptions raised inside a callback function. For this reason we manually track errors and handle them inside of SnowflakeFileTransferAgent.result.
Closes SNOW-141545: Request - Reduce package dependencies chaos #284
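As an illustration of that manual error tracking, a hedged sketch; the structure and names below are assumptions, not the driver's actual code:

```python
# Hedged sketch: exceptions raised inside add_done_callback callbacks are
# only logged by ThreadPoolExecutor, so we record them ourselves and
# re-raise them later, roughly in the spirit of how errors are surfaced
# from SnowflakeFileTransferAgent.result. Names here are illustrative.
errors: list = []

def process_result(value):
    ...  # placeholder for whatever the callback does with the result

def tracking_callback(future):
    try:
        process_result(future.result())  # .result() re-raises the task error
    except Exception as exc:
        errors.append(exc)  # keep it instead of letting the executor drop it

def result():
    # Called at the end of the transfer: surface the first recorded error.
    if errors:
        raise errors[0]
```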