[Issue #736] pipeline execution of hash partitioned join (cont.) #771

Open
wants to merge 63 commits into base: master
Commits (63), showing changes from all commits:
d50c6d5  Minor fix (jasha64, Oct 17, 2024)
39a70a3  log in PixelsReaderStreamImpl (jasha64, Oct 24, 2024)
2f76bd6  Add isSmallTable in partition input, to fix a hardcode in base partit… (jasha64, Oct 24, 2024)
bd559fa  Fix log4j2.properties in pixels-turbo/pixels-worker-common (jasha64, Oct 24, 2024)
5d9be0d  Also fix hardcode in BasePartitionStreamWorker (jasha64, Oct 24, 2024)
643e6c2  Fix bug in BasePartitionedJoinStreamWorker (jasha64, Oct 27, 2024)
c9039e8  Use -2 as numPartitions when getting schema readers (jasha64, Oct 27, 2024)
4371236  Also fix hardcode in BasePartitionedJoinStreamWorker (jasha64, Oct 29, 2024)
a76601a  Add postPartitionId and postPartitionIsSmallTable in JoinInfo, to ext… (jasha64, Nov 2, 2024)
dea7cf2  Add postPartitionId and postPartitionIsSmallTable in JoinInfo, to ext… (jasha64, Nov 2, 2024)
bddce98  Comments (jasha64, Nov 2, 2024)
0d9054e  Comments (jasha64, Nov 2, 2024)
09bc92c  Interconnection between workers and stream workers, by modifying stor… (jasha64, Nov 2, 2024)
5dfb296  VhiveInvoker should not call blocking GRPC in its constructor (jasha64, Nov 4, 2024)
30baa67  Fix smallPartitionWorkerNum and largePartitionWorkerNum in Partitione… (jasha64, Nov 4, 2024)
66c9c47  Bug fix, support multiple partition workers (jasha64, Nov 4, 2024)
76cad0a  Minor fix (jasha64, Nov 4, 2024)
901ab03  Bug fix in pixels stream reader (jasha64, Nov 5, 2024)
e518599  Bug fix pixels stream reader (jasha64, Nov 5, 2024)
616e307  Comments and logs (jasha64, Nov 5, 2024)
2952fc3  Bug fix in stream reader and writer: process empty partition results (jasha64, Nov 6, 2024)
d9cef74  Bug fix in PartitionedJoinOperator (jasha64, Nov 6, 2024)
65d45d3  Revert "Interconnection between workers and stream workers, by modify… (jasha64, Nov 6, 2024)
27488c5  Merge branch 'master' into dev3 (jasha64, Nov 7, 2024)
260dc70  Add partitionId in PartitionInput, to fix a hardcode in BasePartition… (jasha64, Nov 7, 2024)
b630c48  Adapt PixelsPlanner to support streaming mode (jasha64, Nov 7, 2024)
b048d6c  Optimization in partitioned join stream worker (jasha64, Nov 12, 2024)
b398d10  Fix hardcode of streaming port numbers (jasha64, Nov 12, 2024)
4d0fc87  Format (jasha64, Nov 12, 2024)
958dd51  Modify Pixels stream writer to retry connection at 100ms interval (jasha64, Nov 13, 2024)
001e3ca  Minor fix (jasha64, Oct 17, 2024)
2bc7ce2  log in PixelsReaderStreamImpl (jasha64, Oct 24, 2024)
95c9820  Add isSmallTable in partition input, to fix a hardcode in base partit… (jasha64, Oct 24, 2024)
ad1a97c  Fix log4j2.properties in pixels-turbo/pixels-worker-common (jasha64, Oct 24, 2024)
a3a4da7  Also fix hardcode in BasePartitionStreamWorker (jasha64, Oct 24, 2024)
28142da  Fix bug in BasePartitionedJoinStreamWorker (jasha64, Oct 27, 2024)
4622503  Use -2 as numPartitions when getting schema readers (jasha64, Oct 27, 2024)
5f3346a  Also fix hardcode in BasePartitionedJoinStreamWorker (jasha64, Oct 29, 2024)
189a751  Add postPartitionId and postPartitionIsSmallTable in JoinInfo, to ext… (jasha64, Nov 2, 2024)
222145c  Add postPartitionId and postPartitionIsSmallTable in JoinInfo, to ext… (jasha64, Nov 2, 2024)
2bf5f52  Comments (jasha64, Nov 2, 2024)
83bce2c  Comments (jasha64, Nov 2, 2024)
c516ecb  Interconnection between workers and stream workers, by modifying stor… (jasha64, Nov 2, 2024)
e8d7a55  VhiveInvoker should not call blocking GRPC in its constructor (jasha64, Nov 4, 2024)
5842401  Fix smallPartitionWorkerNum and largePartitionWorkerNum in Partitione… (jasha64, Nov 4, 2024)
2d85258  Bug fix, support multiple partition workers (jasha64, Nov 4, 2024)
e1b6cca  Minor fix (jasha64, Nov 4, 2024)
1f0a159  Bug fix in pixels stream reader (jasha64, Nov 5, 2024)
0deb55e  Bug fix pixels stream reader (jasha64, Nov 5, 2024)
1a1aadb  Comments and logs (jasha64, Nov 5, 2024)
7ad765d  Bug fix in stream reader and writer: process empty partition results (jasha64, Nov 6, 2024)
b009faa  Bug fix in PartitionedJoinOperator (jasha64, Nov 6, 2024)
d0aa7d0  Revert "Interconnection between workers and stream workers, by modify… (jasha64, Nov 6, 2024)
3b03cdc  Add partitionId in PartitionInput, to fix a hardcode in BasePartition… (jasha64, Nov 7, 2024)
8e8a2bc  Adapt PixelsPlanner to support streaming mode (jasha64, Nov 7, 2024)
35dd99e  Optimization in partitioned join stream worker (jasha64, Nov 12, 2024)
4eb8fd9  Fix hardcode of streaming port numbers (jasha64, Nov 12, 2024)
bb0ae95  Format (jasha64, Nov 12, 2024)
56c0a75  Modify Pixels stream writer to retry connection at 100ms interval (jasha64, Nov 13, 2024)
1fafdf5  implement stream storage (huasiy, Nov 7, 2024)
1b88ede  fix no data bug in stream storage (huasiy, Nov 7, 2024)
36b71a8  implement physical reader and writer (huasiy, Nov 12, 2024)
bcf9c27  Merge branch 'dev3' of github.com:jasha64/pixels into dev3 (jasha64, Dec 6, 2024)
Files changed

StreamPath.java (new file, package io.pixelsdb.pixels.common.physical)
@@ -0,0 +1,37 @@
package io.pixelsdb.pixels.common.physical;

import static java.util.Objects.requireNonNull;

public class StreamPath
{
private String host;
private int port;
public boolean valid = false;

public StreamPath(String path)
{
requireNonNull(path);
if (path.contains(":///"))
{
path = path.substring(path.indexOf(":///") + 4);
}
int colon = path.indexOf(':');
if (colon > 0)
{
host = path.substring(0, colon);
port = Integer.parseInt(path.substring(colon + 1));
this.valid = true;
}
}

public String getHostName()
{
return host;
}

public int getPort()
{
return port;
}

}
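For illustration, a minimal usage sketch of the new class (the endpoint string and scheme below are hypothetical; the constructor strips anything up to and including ":///"):

// Hypothetical usage; not code from this PR.
StreamPath sp = new StreamPath("stream:///10.0.0.5:29420");
if (sp.valid)
{
    System.out.println(sp.getHostName()); // prints 10.0.0.5
    System.out.println(sp.getPort());     // prints 29420
}

Note that valid stays false for inputs without a host:port pair, so callers should check it before using the accessors.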
Constants.java

@@ -32,6 +32,7 @@ public final class Constants
public static final int S3_BUFFER_SIZE = 8 * 1024 * 1024;
public static final int REDIS_BUFFER_SIZE = 8 * 1024 * 1024;
public static final int GCS_BUFFER_SIZE = 8 * 1024 * 1024;
+ public static final int STREAM_BUFFER_SIZE = 8 * 1024 * 1024;

public static final int MIN_REPEAT = 3;
public static final int MAX_SCOPE = 512;
HttpServer.java

@@ -40,16 +40,20 @@
public final class HttpServer
{
final HttpServerInitializer initializer;
+ private EventLoopGroup bossGroup;
+ private EventLoopGroup workerGroup;
+ private Channel channel;

public HttpServer(HttpServerHandler handler) throws CertificateException, SSLException
{
this.initializer = new HttpServerInitializer(HttpServerUtil.buildSslContext(), handler);
+ handler.setServerCloser(this::close);
}

public void serve(int PORT) throws InterruptedException
{
- EventLoopGroup bossGroup = new NioEventLoopGroup(1);
- EventLoopGroup workerGroup = new NioEventLoopGroup();
+ bossGroup = new NioEventLoopGroup(1);
+ workerGroup = new NioEventLoopGroup();
try
{
ServerBootstrap b = new ServerBootstrap();
@@ -59,13 +63,28 @@ public void serve(int PORT) throws InterruptedException
.handler(new LoggingHandler(LogLevel.DEBUG))
.childHandler(this.initializer);

- Channel ch = b.bind(PORT).sync().channel();
-
- ch.closeFuture().sync();
+ channel = b.bind(PORT).sync().channel();
+ channel.closeFuture().sync();
} finally
{
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}

+ public void close()
+ {
+ if (channel != null)
+ {
+ channel.close();
+ }
+ if (bossGroup != null)
+ {
+ bossGroup.shutdownGracefully();
+ }
+ if (workerGroup != null)
+ {
+ workerGroup.shutdownGracefully();
+ }
+ }
}
HttpServerHandler.java

@@ -43,6 +43,7 @@
@ChannelHandler.Sharable
public class HttpServerHandler extends SimpleChannelInboundHandler<HttpObject>
{
+ protected Runnable serverCloser;

@Override
public void channelReadComplete(ChannelHandlerContext ctx)
@@ -100,4 +101,8 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
ChannelFuture f = ctx.writeAndFlush(response);
f.addListener(ChannelFutureListener.CLOSE);
}

+ public void setServerCloser(Runnable serverCloser) {
+ this.serverCloser = serverCloser;
+ }
}
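Taken together, the HttpServer and HttpServerHandler changes above wire a shutdown path from the handler back to the server: the HttpServer constructor passes this::close into the handler, so the handler (or any other owner) can stop a blocked serve() call by closing the bound channel. A minimal lifecycle sketch, with the port number and threading chosen for illustration:

// Illustrative only; imports and error handling are abbreviated.
HttpServerHandler handler = new HttpServerHandler();
HttpServer server = new HttpServer(handler); // constructor calls handler.setServerCloser(this::close)
new Thread(() -> {
    try
    {
        server.serve(29420); // blocks until the bound channel closes
    }
    catch (InterruptedException e)
    {
        Thread.currentThread().interrupt();
    }
}).start();
// Later, from the handler via serverCloser.run(), or from the owner directly:
server.close(); // closes the channel, unblocking serve(), then shuts down both event loop groups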
PixelsReaderStreamImpl.java

@@ -84,6 +84,7 @@ public class PixelsReaderStreamImpl implements PixelsReader
// In partitioned mode, we use byteBufBlockingMap to map hash value to corresponding ByteBuf
private final BlockingMap<Integer, ByteBuf> byteBufBlockingMap;
private final boolean partitioned;
+ private final int httpPort;
private final AtomicReference<Integer> numPartitionsReceived = new AtomicReference<>(0);
private final List<PixelsRecordReaderStreamImpl> recordReaders;

@@ -98,7 +99,7 @@

public PixelsReaderStreamImpl(String endpoint) throws Exception
{
- this(endpoint, false, -1);
+ this(endpoint, false, -2);
}

public PixelsReaderStreamImpl(int port) throws Exception
@@ -113,7 +114,7 @@ public PixelsReaderStreamImpl(String endpoint, boolean partitioned, int numParti
this.streamHeader = null;
URI uri = new URI(endpoint);
String IP = uri.getHost();
- int httpPort = uri.getPort();
+ this.httpPort = uri.getPort();
logger.debug("In Pixels stream reader constructor, IP: " + IP + ", port: " + httpPort +
", partitioned: " + partitioned + ", numPartitions: " + numPartitions);
if (!Objects.equals(IP, "127.0.0.1") && !Objects.equals(IP, "localhost"))
@@ -151,13 +152,6 @@ public void channelRead0(ChannelHandlerContext ctx, HttpObject msg)
", partition ID header: " + req.headers().get("X-Partition-Id") +
", HTTP request object body total length: " + req.content().readableBytes());

- // schema packet: only 1 packet expected, so close the connection immediately
- // partitioned mode: close the connection if all partitions received
- // else (non-partitioned mode, data packet): close connection if empty packet received
- boolean needCloseParentChannel = partitionId == PixelsWriterStreamImpl.PARTITION_ID_SCHEMA_WRITER ||
- (partitioned && numPartitionsReceived.get() == numPartitions) ||
- (Objects.equals(req.headers().get(CONNECTION), CLOSE.toString()) &&
- req.content().readableBytes() == 0);
ByteBuf byteBuf = req.content();
try
{
@@ -178,27 +172,29 @@ public void channelRead0(ChannelHandlerContext ctx, HttpObject msg)
catch (IOException e)
{
logger.error("Invalid stream header values: ", e);
- sendResponseAndClose(ctx, req, BAD_REQUEST, needCloseParentChannel);
+ sendResponseAndClose(ctx, req, BAD_REQUEST, false);
return;
}
}
else if (partitioned)
{
// In partitioned mode, every packet brings a streamHeader to prevent errors from possibly
// out-of-order packet arrivals, so we need to parse it, but do not need the return value
// (except for the first incoming packet processed above).
+ // XXX: Now we have each worker pass the schema in a separate packet, so this is no longer
+ // necessary. We can remove this block of code in PixelsWriterStreamImpl.
parseStreamHeader(byteBuf);
}
}
catch (InvalidProtocolBufferException | IndexOutOfBoundsException e)
{
logger.error("Malformed or corrupted stream header", e);
- sendResponseAndClose(ctx, req, BAD_REQUEST, needCloseParentChannel);
+ sendResponseAndClose(ctx, req, BAD_REQUEST, false);
return;
}

// We only need to put the byteBuf into the blocking queue to pass it to the recordReader, if the
- // client is a data writer rather than a schema writer. In the latter case,
+ // packet is a data packet rather than a schema packet. Because in the latter case,
// the schema packet has been processed when parsing the stream header above.
if (partitionId != PixelsWriterStreamImpl.PARTITION_ID_SCHEMA_WRITER)
{
@@ -209,7 +205,7 @@ else if (partitioned)
if (partitionId < 0 || partitionId >= numPartitions)
{
logger.warn("Client sent invalid partitionId value: " + partitionId);
- sendResponseAndClose(ctx, req, BAD_REQUEST, needCloseParentChannel);
+ sendResponseAndClose(ctx, req, BAD_REQUEST, false);
return;
}
byteBufBlockingMap.put(partitionId, byteBuf);
@@ -222,6 +218,13 @@ else if (partitioned)
}
}

+ // schema reader: only 1 packet expected, so close the connection immediately
+ // partitioned mode: close the connection if all partitions received
+ // else (non-partitioned mode, data packet): close connection if empty packet received
+ boolean needCloseParentChannel = numPartitions == PixelsWriterStreamImpl.PARTITION_ID_SCHEMA_WRITER ||
+ (partitioned && numPartitionsReceived.get() == numPartitions) ||
+ (numPartitions == -1 && Objects.equals(req.headers().get(CONNECTION), CLOSE.toString()) &&
+ req.content().readableBytes() == 0);
sendResponseAndClose(ctx, req, HttpResponseStatus.OK, needCloseParentChannel);
}
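Moving the close decision to the end of channelRead0() means it is evaluated after the packet has been counted, so numPartitionsReceived is up to date, and the error paths now simply pass false instead of a stale precomputed value. Distilled into a standalone predicate for illustration (the method and parameter names are ours, not the PR's):

// Illustrative restatement of the close condition above; not code from the PR.
static boolean needCloseParentChannel(int numPartitions, boolean partitioned,
                                      int numPartitionsReceived,
                                      boolean hasConnectionCloseHeader, int bodyLength)
{
    // schema reader: exactly one schema packet is expected, so close immediately
    if (numPartitions == PixelsWriterStreamImpl.PARTITION_ID_SCHEMA_WRITER) return true;
    // partitioned mode: close once all partitions have been received
    if (partitioned && numPartitionsReceived == numPartitions) return true;
    // non-partitioned mode: close on an empty packet carrying "Connection: close"
    return numPartitions == -1 && hasConnectionCloseHeader && bodyLength == 0;
}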

@@ -437,15 +440,7 @@ public int getRowGroupNum()
@Override
public boolean isPartitioned()
{
- try
- {
- streamHeaderLatch.await();
- }
- catch (InterruptedException e)
- {
- logger.error("Interrupted while waiting for stream header", e);
- }
- return this.streamHeader.hasPartitioned() && this.streamHeader.getPartitioned();
+ return partitioned;
}

/**
@@ -523,6 +518,7 @@ public PixelsProto.Footer getFooter()
public void close()
throws IOException
{
logger.debug("Closing PixelsReaderStreamImpl");
new Thread(() -> {
// Conditions for closing:
// 1. streamHeaderLatch.await() to ensure that the stream header has been received
@@ -539,11 +535,11 @@ public void close()

try
{
- if (!this.httpServerFuture.isDone()) this.httpServerFuture.get(5, TimeUnit.SECONDS);
+ if (!this.httpServerFuture.isDone()) this.httpServerFuture.get(300, TimeUnit.SECONDS);
}
catch (TimeoutException e)
{
logger.warn("In close(), HTTP server did not shut down in 5 seconds, doing forceful shutdown");
logger.warn("In close(), HTTP server on port " + httpPort + " did not shut down in 300 seconds, doing forceful shutdown");
this.httpServerFuture.cancel(true);
}
catch (InterruptedException | ExecutionException e)
PixelsWriterStreamImpl.java

@@ -512,6 +512,7 @@ public void addRowBatch(VectorizedRowBatch rowBatch, int hashValue) throws IOExc
currHashValue = hashValue;
hashValueIsSet = true;
curRowGroupDataLength = 0;
+ if (rowBatch == null) return;
curRowGroupNumOfRows += rowBatch.size;
writeColumnVectors(rowBatch.cols, rowBatch.size);
}
@@ -550,8 +551,9 @@ public void close()
{
try
{
- if (curRowGroupNumOfRows != 0)
+ if (partitioned || curRowGroupNumOfRows != 0)
{
+ // In partitioned mode, even an empty row group has to be sent to the server.
writeRowGroup();
}
// If the outgoing stream is empty (addRowBatch() and thus writeRowGroup() never called), we artificially
@@ -609,7 +611,9 @@ else if (isFirstRowGroup)

private void writeRowGroup() throws IOException
{
- if (isFirstRowGroup || partitioned)
+ // XXX: Now that we have each worker pass the schema in a separate packet in partitioned mode, it is no longer
+ // necessary to add a stream header to every packet. We can modify this block of code.
+ if (isFirstRowGroup || partitioned) // if (isFirstRowGroup)
{
writeStreamHeader();
isFirstRowGroup = false;
@@ -769,7 +773,8 @@ private void writeRowGroup() throws IOException
uri = URI.create(fileNameToUri(fileName));
}
String reqUri = partitioned ? uris.get(currHashValue).toString() : uri.toString();
logger.debug("Sending row group with length: " + byteBuf.writerIndex() + " to endpoint: " + reqUri);
logger.debug("Sending row group to endpoint: " + reqUri + ", length: " + byteBuf.writerIndex()
+ ", partitionId: " + partitionId);
Request req = httpClient.preparePost(reqUri)
.setBody(byteBuf.nioBuffer())
.addHeader("X-Partition-Id", String.valueOf(partitionId))
@@ -786,8 +791,8 @@ private void writeRowGroup() throws IOException
try
{
outstandingHTTPRequestSemaphore.acquire();
- int maxAttempts = 30000;
- long backoffMillis = 10;
+ int maxAttempts = 3000;
+ long backoffMillis = 100;
int attempt = 0;
boolean success = false;

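The last change above reworks the writer's connection-retry budget from 30000 attempts at a 10 ms interval to 3000 attempts at a 100 ms interval; both work out to roughly five minutes, matching the 300-second timeouts introduced in the reader's close() and in BlockingMap below. A sketch of the loop these constants drive, with postRowGroup() as a hypothetical stand-in for the HTTP POST shown earlier in the diff:

// Illustrative retry loop; the real logic lives in PixelsWriterStreamImpl.writeRowGroup().
int maxAttempts = 3000;
long backoffMillis = 100;
int attempt = 0;
boolean success = false;
while (!success && attempt < maxAttempts)
{
    try
    {
        postRowGroup(); // hypothetical: one HTTP POST of the serialized row group
        success = true;
    }
    catch (IOException e) // e.g. connection refused while the reader's HTTP server is still starting
    {
        attempt++;
        try
        {
            Thread.sleep(backoffMillis); // fixed 100 ms interval, not exponential backoff
        }
        catch (InterruptedException ie)
        {
            Thread.currentThread().interrupt();
            break;
        }
    }
}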
PixelsRecordReaderStreamImpl.java

@@ -37,6 +37,8 @@
import java.util.*;
import java.util.concurrent.BlockingQueue;

+ import static com.google.common.base.Preconditions.checkArgument;

/**
* PixelsRecordReaderStreamImpl is the variant of {@link PixelsRecordReaderImpl} for streaming mode.
* <p>
@@ -153,6 +155,12 @@ public PixelsRecordReaderStreamImpl(boolean partitioned,
*/
public void lateInitialization(PixelsStreamProto.StreamHeader streamHeader) throws IOException
{
+ if (this.streamHeader != null)
+ {
+ checkArgument(this.streamHeader == streamHeader,
+ "streamHeader used for lateInitialization() is not the same as the one in the RecordReader.");
+ return;
+ }
this.streamHeader = streamHeader;
checkBeforeRead();
}
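The guard above makes lateInitialization() idempotent: a repeated call with the same streamHeader object returns silently, while a different header fails the checkArgument with an IllegalArgumentException. A hypothetical call sequence (variable names are illustrative):

recordReader.lateInitialization(header); // first call: stores the header, runs checkBeforeRead()
recordReader.lateInitialization(header); // same object again: no-op
// recordReader.lateInitialization(otherHeader); // would throw IllegalArgumentException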
@@ -426,6 +434,7 @@ public int prepareBatch(int batchSize)
*/
private VectorizedRowBatch createEmptyEOFRowBatch(int size)
{
logger.debug("In createEmptyEOFRowBatch(), size = " + size);
TypeDescription resultSchema = TypeDescription.createSchema(new ArrayList<>());
VectorizedRowBatch resultRowBatch = resultSchema.createRowBatch(0);
resultRowBatch.projectionSize = 0;
@@ -494,6 +503,14 @@ public VectorizedRowBatch readBatch(int batchSize, boolean reuse)
}

int rgRowCount = (int) curRowGroupStreamFooter.getNumberOfRows();
+ if (rgRowCount == 0)
+ {
+ // Empty row group, mark the current row group as unreadable.
+ curRowGroupByteBuf.readerIndex(curRowGroupByteBuf.readerIndex() + curRowGroupByteBuf.readableBytes());
+ curRGIdx++;
+ return resultSchema.createRowBatch(0, resultColumnsEncoded);
+ }

int curBatchSize;
ColumnVector[] columnVectors = resultRowBatch.cols;

@@ -702,6 +719,7 @@ private void acquireNewRowGroup(boolean reuse) throws IOException
else
// incoming byteBuf unreadable, must be end of stream
{
logger.debug("In acquireNewRowGroup(), end of file");
// checkValid = false; // Issue #105: to reject continuous read.
if (reuse && resultRowBatch != null)
// XXX: Before we implement necessary checks, the close() below might be called before our readBatch()
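With the empty-row-group branch above, readBatch() can now return a zero-row batch without that meaning end of stream; only the EOF batch built by createEmptyEOFRowBatch() terminates reading. A hypothetical consumer loop, under the assumption that the EOF batch carries the endOfFile flag (process() and the batch size are placeholders):

VectorizedRowBatch batch;
do
{
    batch = recordReader.readBatch(10000, false); // batch size chosen for illustration
    if (batch.size > 0)
    {
        process(batch); // placeholder for the actual consumer
    }
    // batch.size == 0 and !batch.endOfFile means an empty row group: keep reading
} while (!batch.endOfFile);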
BlockingMap.java

@@ -51,7 +51,7 @@ public void put(K key, V value)

public V get(K key) throws InterruptedException
{
- V ret = getQueue(key).poll(60, TimeUnit.SECONDS);
+ V ret = getQueue(key).poll(300, TimeUnit.SECONDS);
if (ret == null)
{
throw new RuntimeException("BlockingMap.get() timed out");
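The timeout bump from 60 to 300 seconds lines BlockingMap.get() up with the reader's new 300-second shutdown wait. For context, a minimal re-implementation consistent with the two methods shown; the PR's actual class may differ in details:

import java.util.concurrent.*;

// Illustrative sketch of the BlockingMap pattern: each key owns a blocking queue,
// and get() blocks until a value arrives for that key or the timeout fires.
public class BlockingMapSketch<K, V>
{
    private final ConcurrentHashMap<K, BlockingQueue<V>> map = new ConcurrentHashMap<>();

    private BlockingQueue<V> getQueue(K key)
    {
        return map.computeIfAbsent(key, k -> new LinkedBlockingQueue<>());
    }

    public void put(K key, V value)
    {
        getQueue(key).add(value);
    }

    public V get(K key) throws InterruptedException
    {
        V ret = getQueue(key).poll(300, TimeUnit.SECONDS);
        if (ret == null)
        {
            throw new RuntimeException("BlockingMap.get() timed out");
        }
        return ret;
    }
}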