Skip to content
Open
7 changes: 7 additions & 0 deletions dataframe.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ library
DataFrame.IO.CSV,
DataFrame.IO.JSON,
DataFrame.IO.Unstable.CSV,
DataFrame.IO.Unstable.Parquet.Utils,
DataFrame.IO.Unstable.Parquet.Thrift,
DataFrame.IO.Unstable.Parquet.PageParser,
DataFrame.IO.Unstable.Parquet,
DataFrame.IO.Utils.RandomAccess,
DataFrame.IO.Parquet,
DataFrame.IO.Parquet.Binary,
DataFrame.IO.Parquet.Dictionary,
Expand Down Expand Up @@ -148,6 +153,8 @@ library
http-conduit >= 2.3 && < 3,
streamly-core,
streamly-bytestring,
pinch >= 0.5.1.0 && <= 0.5.2.0,
streamly-core >= 0.3.0,

hs-source-dirs: src
c-sources: cbits/process_csv.c
Expand Down
4 changes: 4 additions & 0 deletions src/DataFrame.hs
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ module DataFrame (
module CSV,
module UnstableCSV,
module Parquet,
module UnstableParquet,

-- * Type conversion
module Typing,
Expand Down Expand Up @@ -272,6 +273,9 @@ import DataFrame.IO.Unstable.CSV as UnstableCSV (
readCsvUnstable,
readTsvUnstable,
)
import DataFrame.IO.Unstable.Parquet as UnstableParquet (
readParquetUnstable,
)
import DataFrame.Internal.Column as Column (
Column,
fromList,
Expand Down
46 changes: 25 additions & 21 deletions src/DataFrame/IO/Parquet/Page.hs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,29 @@ isDictionaryPage page = case pageTypeHeader (pageHeader page) of
DictionaryPageHeader{..} -> True
_ -> False

-- | Strictly decompress one page's bytes according to @codec@.
--
-- ZSTD drives the streaming decoder to completion and accumulates its
-- output chunks; SNAPPY and GZIP are one-shot.  Decompression is strict:
-- the whole page is materialised before the page parser sees it.
-- Unsupported codecs abort via 'error'.
decompressData :: CompressionCodec -> BS.ByteString -> IO BS.ByteString
decompressData codec compressed = case codec of
  ZSTD -> do
    st <- Zstd.decompress
    pump st compressed []
    where
      -- Drive the ZSTD state machine; output chunks are collected in
      -- reverse (O(1) per step) and concatenated once at the end.
      pump (Zstd.Consume feed) input chunksRev = do
        st <- feed input
        pump st BS.empty chunksRev
      pump (Zstd.Produce chunk more) _ chunksRev = do
        st <- more
        pump st BS.empty (chunk : chunksRev)
      pump (Zstd.Done lastChunk) _ chunksRev =
        pure $ BS.concat (reverse (lastChunk : chunksRev))
      pump (Zstd.Error msg msg2) _ _ =
        error ("ZSTD error: " ++ msg ++ " " ++ msg2)
  SNAPPY -> either (error . show) pure (Snappy.decompress compressed)
  UNCOMPRESSED -> pure compressed
  -- NOTE(review): 'BS.fromStrict' relies on the module aliased to BS
  -- exporting 'fromStrict' — confirm against this file's import list.
  GZIP -> pure (LB.toStrict (GZip.decompress (BS.fromStrict compressed)))
  other -> error ("Unsupported compression type: " ++ show other)

readPage :: CompressionCodec -> BS.ByteString -> IO (Maybe Page, BS.ByteString)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not related and out of context: This looks like an Unfold to me.

readPage c columnBytes =
if BS.null columnBytes
Expand All @@ -42,27 +65,8 @@ readPage c columnBytes =

let compressed = BS.take (fromIntegral $ compressedPageSize hdr) rem

fullData <- case c of
ZSTD -> do
result <- Zstd.decompress
drainZstd result compressed []
where
drainZstd (Zstd.Consume f) input acc = do
result <- f input
drainZstd result BS.empty acc
drainZstd (Zstd.Produce chunk next) _ acc = do
result <- next
drainZstd result BS.empty (chunk : acc)
drainZstd (Zstd.Done final) _ acc =
pure $ BS.concat (reverse (final : acc))
drainZstd (Zstd.Error msg msg2) _ _ =
error ("ZSTD error: " ++ msg ++ " " ++ msg2)
SNAPPY -> case Snappy.decompress compressed of
Left e -> error (show e)
Right res -> pure res
UNCOMPRESSED -> pure compressed
GZIP -> pure (LB.toStrict (GZip.decompress (BS.fromStrict compressed)))
other -> error ("Unsupported compression type: " ++ show other)
fullData <- decompressData c compressed

pure
( Just $ Page hdr fullData
, BS.drop (fromIntegral $ compressedPageSize hdr) rem
Expand Down
2 changes: 1 addition & 1 deletion src/DataFrame/IO/Parquet/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ data ParquetType
| PBYTE_ARRAY
| PFIXED_LEN_BYTE_ARRAY
| PARQUET_TYPE_UNKNOWN
deriving (Show, Eq)
deriving (Show, Eq, Enum)

parquetTypeFromInt :: Int32 -> ParquetType
parquetTypeFromInt 0 = PBOOLEAN
Expand Down
188 changes: 188 additions & 0 deletions src/DataFrame/IO/Unstable/Parquet.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
{-# LANGUAGE ExplicitForAll #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE OverloadedRecordDot #-}

module DataFrame.IO.Unstable.Parquet (readParquetUnstable) where

import Control.Monad.IO.Class (MonadIO (..))
import Data.Bits (Bits (shiftL), (.|.))
import qualified Data.ByteString as BS
import Data.Functor ((<&>))
import Data.List (foldl', transpose)
import qualified Data.Map as Map
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can use Data.Map.Strict by default, there is no need to be lazy here

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed

import Data.Maybe (fromJust, fromMaybe, isNothing)
import Data.Text (Text)
import qualified Data.Vector as Vector
import DataFrame.IO.Parquet.Dictionary (readDictVals)
import DataFrame.IO.Parquet.Page (decompressData)
import DataFrame.IO.Parquet.Types (DictVals)
import DataFrame.IO.Unstable.Parquet.PageParser (parsePage)
import DataFrame.IO.Unstable.Parquet.Thrift (
ColumnChunk (..),
ColumnMetaData (..),
CompressionCodec (..),
DictionaryPageHeader (..),
FileMetadata (..),
PageHeader (..),
RowGroup (..),
SchemaElement (..),
pinchCompressionToParquetCompression,
pinchThriftTypeToParquetType,
unField,
)
import DataFrame.IO.Unstable.Parquet.Utils (
ColumnDescription,
PageDescription (PageDescription),
foldColumns,
generateColumnDescriptions,
)
import DataFrame.IO.Utils.RandomAccess (
RandomAccess (..),
Range (Range),
ReaderIO (runReaderIO),
)
import DataFrame.Internal.Column (Column)
import DataFrame.Internal.DataFrame (DataFrame (..))
import Pinch (decodeWithLeftovers)
import qualified Pinch
import Streamly.Data.Stream (Stream)
import qualified Streamly.Data.Stream as Stream
import Streamly.Data.Unfold (Unfold)
import qualified Streamly.Internal.Data.Unfold as Unfold
import qualified System.IO as IO

-- | Read a parquet file into a 'DataFrame' (experimental streaming API).
-- The file handle is held open for the duration of the parse and closed
-- by 'IO.withFile' afterwards.
readParquetUnstable :: FilePath -> IO DataFrame
readParquetUnstable path =
  IO.withFile path IO.ReadMode (runReaderIO parseParquet)

-- | Parse an entire parquet file: the metadata footer first, then every
-- column chunk, materialising one 'Column' per leaf of the schema.
parseParquet :: (RandomAccess r, MonadIO r) => r DataFrame
parseParquet = do
  metadata <- parseFileMetadata
  let vectorLength = fromIntegral . unField $ metadata.num_rows :: Int
      columnStreams = parseColumns metadata
  columnList <- mapM (foldColumns vectorLength) columnStreams
  let columns = Vector.fromListN (length columnList) columnList
      -- Leaf columns are the schema elements with no children (group
      -- nodes carry a child count); hlint fix applied: brackets moved
      -- to avoid redundant '$'.
      columnNames :: [Text]
      columnNames =
        map (unField . name)
          . filter
            ( \se ->
                isNothing (unField $ num_children se)
                  || unField se.num_children == Just 0
            )
          $ unField metadata.schema
      columnIndices = Map.fromList $ zip columnNames [0 ..]
      dataframeDimensions = (vectorLength, length columnStreams)
  return $ DataFrame columns columnIndices dataframeDimensions Map.empty

-- | Read the parquet footer.  The file's last 8 bytes are a 4-byte
-- little-endian metadata size followed by the magic bytes; the
-- thrift-encoded 'FileMetadata' sits immediately before them.
parseFileMetadata ::
  (RandomAccess r) => r FileMetadata
parseFileMetadata = do
  footerOffset <- readSuffix 8
  let size = getMetadataSize footerOffset
  rawMetadata <- readSuffix (size + 8) <&> BS.take size
  -- TODO: surface a proper error type instead of 'error' (review note).
  either (error . show) return $
    Pinch.decode Pinch.compactProtocol rawMetadata
  where
    -- Assemble the little-endian size from the first four footer bytes.
    getMetadataSize footer =
      let sizes :: [Int]
          sizes = map (fromIntegral . BS.index footer) [0 .. 3]
       in foldl' (.|.) 0 $ zipWith shiftL sizes [0, 8 .. 24]

-- | One stream of 'Column's per leaf column of the schema.  Column chunks
-- are regrouped ('transpose') from per-row-group order into per-column
-- order before being unfolded into pages.
parseColumns :: (RandomAccess r, MonadIO r) => FileMetadata -> [Stream r Column]
parseColumns metadata
  | nChunks /= nDescs =
      error $
        "Column count mismatch: got "
          <> show nChunks
          <> " columns but the schema implied "
          <> show nDescs
          <> " columns"
  | otherwise = zipWith parse colChunks columnDescriptions
  where
    columnDescriptions = generateColumnDescriptions $ unField $ schema metadata
    colChunks = columnChunks metadata
    nChunks = length colChunks
    nDescs = length columnDescriptions

    -- Chunks arrive grouped by row group; transpose to group by column.
    columnChunks :: (RandomAccess r) => FileMetadata -> [Stream r ColumnChunk]
    columnChunks =
      map Stream.fromList
        . transpose
        . map (unField . rg_columns)
        . unField
        . row_groups

-- | Expand a stream of column chunks into the stream of the columns
-- their data pages decode to.
parse ::
  (RandomAccess r, MonadIO r) =>
  Stream r ColumnChunk -> ColumnDescription -> Stream r Column
parse chunks desc =
  Stream.unfoldEach (parseColumnChunk desc) chunks

-- | State threaded through the column-chunk unfold ('parseColumnChunk').
data ColumnChunkState
  = ColumnChunkState
  { remainingBytes :: !BS.ByteString
    -- ^ Bytes of the chunk not yet consumed by page parsing.
  , codec :: !CompressionCodec
    -- ^ Compression codec shared by every page of the chunk.
  , dictionary :: !(Maybe DictVals)
    -- ^ Decoded dictionary values, populated once the (at most one)
    -- dictionary page has been seen.
  , parquetType :: !Int
    -- ^ Physical parquet type stored as its 'fromEnum' index; converted
    -- back with 'toEnum' when decoding dictionary values.
  }

-- | Unfold one 'ColumnChunk' into the stream of data-page 'Column's it
-- contains.  'inject' fetches the chunk's bytes once up front; 'step'
-- then parses page headers sequentially, decompressing each page and
-- yielding one 'Column' per data page.
parseColumnChunk ::
  (RandomAccess r, MonadIO r) => ColumnDescription -> Unfold r ColumnChunk Column
parseColumnChunk description = Unfold.Unfold step inject
  where
    -- Locate the chunk in the file (dictionary page first, when present)
    -- and read its full compressed extent.
    inject :: (RandomAccess r) => ColumnChunk -> r ColumnChunkState
    inject columnChunk = do
      let columnMetadata = fromJust $ unField $ cc_meta_data columnChunk
          dataOffset = unField $ cmd_data_page_offset columnMetadata
          dictOffset = fromMaybe dataOffset (unField $ cmd_dictionary_page_offset columnMetadata)
          startOffset = min dataOffset dictOffset
          compressedSize = unField $ cmd_total_compressed_size columnMetadata
          chunkCodec = unField $ cmd_codec columnMetadata
          parquetType = fromEnum $ pinchThriftTypeToParquetType (unField $ cmd_type columnMetadata)
          range = Range (fromIntegral startOffset) (fromIntegral compressedSize)
      rawBytes <- readBytes range
      return $ ColumnChunkState rawBytes chunkCodec Nothing parquetType

    step ::
      (RandomAccess r, MonadIO r) =>
      ColumnChunkState -> r (Unfold.Step ColumnChunkState Column)
    -- Guard clause (review suggestion): stop once the chunk is exhausted.
    step (ColumnChunkState remaining _ _ _) | BS.null remaining = pure Unfold.Stop
    step (ColumnChunkState remaining chunkCodec dict parquetType) =
      case parsePageHeader remaining of
        -- TODO: propagate parse failures instead of calling 'error'.
        Left e -> error $ show e
        Right (remainder, header) -> do
          let compressedPageSize = fromIntegral $ unField $ ph_compressed_page_size header
              (pageData, rest) = BS.splitAt compressedPageSize remainder
          uncompressedData <-
            liftIO $
              decompressData (pinchCompressionToParquetCompression chunkCodec) pageData
          case unField $ ph_dictionary_page_header header of
            Just dictHeader -> do
              {-
              The dictionary page must be placed at the first position of the column chunk
              if it is partly or completely dictionary encoded. At most one dictionary page
              can be placed in a column chunk.
              This allows us to maintain the parsed DictVals for the chunk and pass it along
              to subsequent data pages.
              https://github.com/apache/parquet-format/blob/master/src/main/thrift/parquet.thrift#L698C1-L712C2
              -}
              let numValues = fromIntegral $ unField $ diph_num_values dictHeader
                  newDict = readDictVals (toEnum parquetType) uncompressedData (Just numValues)
              -- Fix (review): 'Skip' with the updated state instead of
              -- recursing into 'step' — recursion breaks stream fusion
              -- and is harder to reason about.
              return $ Unfold.Skip (ColumnChunkState rest chunkCodec (Just newDict) parquetType)
            Nothing -> do
              -- It's a data page. Yield it.
              column <-
                parsePage
                  description
                  (PageDescription uncompressedData header chunkCodec dict parquetType)
              return $ Unfold.Yield column (ColumnChunkState rest chunkCodec dict parquetType)

-- | Decode a thrift compact-protocol 'PageHeader' from the front of the
-- buffer, returning the unconsumed remainder alongside it.  Point-free
-- per review suggestion: the manual case only re-wrapped the 'Either'.
parsePageHeader :: BS.ByteString -> Either String (BS.ByteString, PageHeader)
parsePageHeader = decodeWithLeftovers Pinch.compactProtocol
Comment on lines +186 to +188
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
parsePageHeader bytes = case decodeWithLeftovers Pinch.compactProtocol bytes of
Left e -> Left e
Right header -> Right header
parsePageHeader = decodeWithLeftovers Pinch.compactProtocol

78 changes: 78 additions & 0 deletions src/DataFrame/IO/Unstable/Parquet/PageParser.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE ScopedTypeVariables #-}

module DataFrame.IO.Unstable.Parquet.PageParser (parsePage) where

import Control.Monad.IO.Class (MonadIO (liftIO))
import DataFrame.IO.Parquet (applyLogicalType, decodePageData)
import DataFrame.IO.Parquet.Levels (readLevelsV1, readLevelsV2)
import DataFrame.IO.Parquet.Types (parquetTypeFromInt)
import DataFrame.IO.Unstable.Parquet.Thrift
import DataFrame.IO.Unstable.Parquet.Utils (
ColumnDescription (..),
PageDescription (..),
)
import DataFrame.IO.Utils.RandomAccess (RandomAccess)
import DataFrame.Internal.Column (Column)

-- | Decode one data page (v1 or v2) of a column chunk into a 'Column'.
--
-- Definition/repetition levels are read first ('readLevelsV1' /
-- 'readLevelsV2'); the remaining bytes go to 'decodePageData' together
-- with the chunk's dictionary, and the column's logical type (when
-- present) is applied on top of the physical decode.
parsePage ::
  (RandomAccess r, MonadIO r) => ColumnDescription -> PageDescription -> r Column
parsePage description (PageDescription pageBytes header _ dictValsM pType') = do
  let maxDef = fromIntegral $ maxDefinitionLevel description
      maxRep = fromIntegral $ maxRepetitionLevel description
      -- We do not have type lengths threaded effectively for Fixed Len yet,
      -- assume Nothing for now unless handled correctly.
      logicalType = pinchLogicalTypeToLogicalType <$> colLogicalType description
      maybeTypeLen = Nothing
      pType = parquetTypeFromInt . fromIntegral $ pType'
      -- Post-process the physically decoded column with the logical type, if any.
      applyLogical col = maybe col (`applyLogicalType` col) logicalType

  liftIO $ case unField (ph_data_page_header header) of
    Just dph -> do
      let n = fromIntegral $ unField (dph_num_values dph)
          enc = parquetEncodingFromPinch (unField (dph_encoding dph))
          (defLvls, repLvls, afterLvls) = readLevelsV1 n maxDef maxRep pageBytes
          -- A value is present when its definition level hits the maximum.
          nPresent = length (filter (== maxDef) defLvls)
      column <-
        decodePageData
          dictValsM
          (maxDef, maxRep)
          pType
          maybeTypeLen
          enc
          defLvls
          repLvls
          nPresent
          afterLvls
          "v1"
      -- Fix: v1 pages previously skipped the logical-type conversion that
      -- v2 pages received; apply it uniformly.
      -- NOTE(review): assuming the v1 omission was accidental — confirm.
      return (applyLogical column)
    Nothing -> case unField (ph_data_page_header_v2 header) of
      Just dph2 -> do
        let n = fromIntegral $ unField (dph2_num_values dph2)
            enc = parquetEncodingFromPinch (unField (dph2_encoding dph2))
            (defLvls, repLvls, afterLvls) =
              readLevelsV2
                n
                maxDef
                maxRep
                (unField $ dph2_definition_levels_byte_length dph2)
                (unField $ dph2_repetition_levels_byte_length dph2)
                pageBytes
            -- V2 headers carry the null count directly; fall back to
            -- counting max-definition levels when it is zero.
            nPresent
              | unField (dph2_num_nulls dph2) > 0 =
                  fromIntegral (unField (dph2_num_values dph2) - unField (dph2_num_nulls dph2))
              | otherwise = length (filter (== maxDef) defLvls)
        column <-
          decodePageData
            dictValsM
            (maxDef, maxRep)
            pType
            maybeTypeLen
            enc
            defLvls
            repLvls
            nPresent
            afterLvls
            "v2"
        return (applyLogical column)
      Nothing -> error "Page header is neither v1 nor v2 data page"
Loading
Loading