em->getRepository(Import::class)->find($message->importId); if (!$import) { $this->logger->error('Import not found', ['importId' => $message->importId]); return; } try { $csvContent = $this->defaultStorage->read($import->getFilePath()); $tmpFile = tempnam(sys_get_temp_dir(), 'import_'); file_put_contents($tmpFile, $csvContent); try { $ltbxdMovies = $this->ltbxdGateway->parseFileFromPath($tmpFile); } finally { unlink($tmpFile); } $totalFilms = count($ltbxdMovies); $totalBatches = (int) ceil($totalFilms / self::BATCH_SIZE); $import->setTotalFilms($totalFilms); $import->setTotalBatches($totalBatches); $import->setStatus(Import::STATUS_PROCESSING); $this->em->flush(); $batches = array_chunk($ltbxdMovies, self::BATCH_SIZE); foreach ($batches as $batch) { $films = array_map(fn ($movie) => [ 'name' => $movie->getName(), 'year' => $movie->getYear(), 'ltbxdUri' => $movie->getLtbxdUri(), 'date' => $movie->getDate()->format('Y-m-d'), ], $batch); $this->bus->dispatch(new ImportFilmsBatchMessage( importId: $import->getId(), films: $films, )); } } catch (\Throwable $e) { $this->logger->error('Import processing failed', [ 'importId' => $import->getId(), 'error' => $e->getMessage(), ]); $import->setStatus(Import::STATUS_FAILED); $this->em->flush(); } } }